# Rename notes (from commit message; kept as a comment so the file stays valid Python):
# - Rename files: cog_cli.py, test_cog.py, k8s/cog-*.yaml
# - Update all Python tool names: cog_* -> assay_*
# - Update FastAPI titles, MCP server names, URLs
# - Update K8s manifests: deployments, services, secrets, ingress
# - Update Docker env vars: COG_API -> ASSAY_API
# - Domain: cog.loop42.de -> assay.loop42.de
# - SQLite path: /tmp/cog_db.sqlite -> /tmp/assay_db.sqlite
"""
|
|
Assay Runtime Test Runner — parses testcases/*.md and executes against live API.
|
|
|
|
Usage:
|
|
pytest testcases/ -v # run all testcases
|
|
pytest testcases/counter_state.md -v # run one testcase
|
|
python runtime_test.py # standalone run all
|
|
python runtime_test.py testcases/pub_conversation.md # standalone run one
|
|
"""
|
|
|
|
import httpx
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
# Fix Windows console encoding: the default cp125x codec raises on non-ASCII
# test output, so force UTF-8 and substitute anything unencodable.
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
|
|
# Target API base URL; override with ASSAY_API for local/staging runs.
API = os.environ.get("ASSAY_API", "https://assay.loop42.de/api")
# Bearer token for the test user. Prefer the ASSAY_TOKEN env var so the
# credential does not have to live in source control; the literal below is
# kept only as a backward-compatible fallback for existing setups.
TOKEN = os.environ.get(
    "ASSAY_TOKEN",
    "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g",
)
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
|
|
|
|
|
|
# --- Markdown parser ---
|
|
|
|
def parse_testcase(path: Path) -> dict:
    """Parse a testcase markdown file into structured steps.

    Recognized markdown structure:
      - ``# Title``           -> testcase name (first such heading wins)
      - ``## Setup``          -> optional setup section, inserted as step 0
      - ``### N. Step name``  -> a numbered step (the "N." prefix is stripped)
      - ``- command``         -> a command line within the current step/setup

    Returns a dict of the form
    ``{"name": str, "steps": [{"name": str, "commands": [dict]}], "file": str}``.
    """
    text = path.read_text(encoding="utf-8")
    lines = text.split("\n")

    tc = {"name": "", "steps": [], "file": str(path)}
    current_step = None  # step dict currently accumulating commands

    in_setup = False  # True while collecting the "## Setup" pseudo-step
    for line in lines:
        line_stripped = line.strip()

        # Title
        if line_stripped.startswith("# ") and not tc["name"]:
            tc["name"] = line_stripped[2:].strip()
            continue

        # Setup section: commands here form a pseudo-step named "Setup".
        if line_stripped == "## Setup":
            in_setup = True
            current_step = {"name": "Setup", "commands": []}
            continue

        # End setup on next ## or ### — a non-empty Setup step is inserted
        # at position 0 so it always runs before the numbered steps.
        if line_stripped.startswith("## ") and in_setup:
            if current_step and current_step["commands"]:
                tc["steps"].insert(0, current_step)
            in_setup = False
            current_step = None

        # Step header
        if line_stripped.startswith("### "):
            if in_setup and current_step and current_step["commands"]:
                # Setup section ended implicitly by the first step header.
                tc["steps"].insert(0, current_step)
                in_setup = False
            elif current_step and not in_setup:
                # Close out the previous step before starting a new one.
                tc["steps"].append(current_step)
            step_text = line_stripped[4:].strip()
            # Strip an optional leading "N." step number from the name.
            m = re.match(r"\d+\.\s*(.*)", step_text)
            current_step = {"name": m.group(1) if m else step_text, "commands": []}
            continue

        # Commands within a step or setup (bullet lines like "- send: hi").
        # Unrecognized commands (where _parse_command returns None) are dropped.
        if (current_step or in_setup) and line_stripped.startswith("- "):
            cmd_text = line_stripped[2:].strip()
            cmd = _parse_command(cmd_text)
            if cmd and current_step:
                current_step["commands"].append(cmd)

    # Flush the final step. NOTE(review): a Setup section with no following
    # heading lands here and is APPENDED rather than inserted at 0 — confirm
    # whether that ordering is intended for setup-only files.
    if current_step:
        tc["steps"].append(current_step)

    return tc
|
|
|
|
|
|
def _parse_command(text: str) -> dict | None:
|
|
"""Parse a single command line like 'send: hello' or 'expect_response: contains foo'."""
|
|
# send: message |dashboard| [json]
|
|
# send: message (no dashboard)
|
|
if text.startswith("send:"):
|
|
val = text[5:].strip()
|
|
if "|dashboard|" in val:
|
|
parts = val.split("|dashboard|", 1)
|
|
msg_text = parts[0].strip()
|
|
try:
|
|
dashboard = json.loads(parts[1].strip())
|
|
except (json.JSONDecodeError, Exception):
|
|
dashboard = []
|
|
return {"type": "send", "text": msg_text, "dashboard": dashboard}
|
|
return {"type": "send", "text": val}
|
|
|
|
# action: action_name OR action: first matching "pattern" or "pattern2"
|
|
if text.startswith("action:"):
|
|
val = text[7:].strip()
|
|
m = re.match(r'first matching (.+)', val)
|
|
if m:
|
|
# Support: first matching "+1" or "inc" or "plus"
|
|
patterns = [p.strip().strip('"') for p in m.group(1).split(" or ")]
|
|
return {"type": "action_match", "patterns": patterns}
|
|
return {"type": "action", "action": val}
|
|
|
|
# expect_response: contains "foo"
|
|
if text.startswith("expect_response:"):
|
|
return {"type": "expect_response", "check": text[16:].strip()}
|
|
|
|
# expect_actions: length >= 2
|
|
if text.startswith("expect_actions:"):
|
|
return {"type": "expect_actions", "check": text[15:].strip()}
|
|
|
|
# expect_state: field contains/is "value"
|
|
if text.startswith("expect_state:"):
|
|
return {"type": "expect_state", "check": text[13:].strip()}
|
|
|
|
# clear history
|
|
if text == "clear history":
|
|
return {"type": "clear"}
|
|
|
|
# expect_trace: input.analysis.intent is "social"
|
|
# expect_trace: has reflex_path
|
|
# expect_trace: no thinker
|
|
if text.startswith("expect_trace:"):
|
|
return {"type": "expect_trace", "check": text[13:].strip()}
|
|
|
|
return None
|
|
|
|
|
|
# --- API client ---
|
|
|
|
class AssayClient:
    """Thin HTTP client for the Assay API used by the test runner.

    Caches the most recent response / memo / controls / trace so the
    assertion helpers can inspect the outcome of the last interaction.
    """

    def __init__(self):
        # Generous 90s timeout: LLM-backed endpoints can be slow.
        self.client = httpx.Client(timeout=90)
        self.last_response = ""  # text of the most recent assistant response
        self.last_memo = {}      # most recent memorizer state dict
        self.last_actions = []   # controls from the latest "controls" HUD event
        self.last_buttons = []   # subset of last_actions with type == "button"
        self.last_trace = []     # raw HUD trace lines from the last fetch

    def clear(self):
        """Reset the conversation history server-side."""
        self.client.post(f"{API}/clear", headers=HEADERS)
        time.sleep(0.3)  # give the server a moment to apply the reset

    def send(self, text: str, dashboard: list | None = None) -> dict:
        """POST a user message and block until the server has a result.

        Handles the async path: when the server answers status "queued",
        polls /result (up to 120 * 0.5s = 60s) for a terminal "done" or
        "error" payload matching the queued message id. Updates
        last_response / last_memo / last_trace as side effects.
        """
        body = {"text": text}
        if dashboard is not None:
            body["dashboard"] = dashboard
        r = self.client.post(f"{API}/send", json=body, headers=HEADERS)
        d = r.json()
        # Async send: poll for result, match on message ID
        if d.get("status") == "queued":
            msg_id = d.get("id", "")
            for _ in range(120):
                time.sleep(0.5)
                pr = self.client.get(f"{API}/result", headers=HEADERS)
                pd = pr.json()
                if pd.get("id") == msg_id and pd.get("status") == "done":
                    d = pd
                    break
                if pd.get("id") == msg_id and pd.get("status") == "error":
                    d = pd
                    break
        resp = d.get("response", "")
        # Coerce non-string responses so assertion helpers can lowercase it.
        self.last_response = resp if isinstance(resp, str) else str(resp)
        self.last_memo = d.get("memorizer", {})
        if not isinstance(self.last_memo, dict):
            self.last_memo = {}
        time.sleep(0.5)  # let trailing HUD events land before fetching trace
        self._fetch_trace()
        return d

    def send_action(self, action: str) -> dict:
        # Actions go through /api/send as ACTION: format (since we can't use WS from test)
        return self.send(f"ACTION: {action}")

    def _fetch_trace(self):
        """Refresh last_trace and derive last_actions/last_buttons from it."""
        r = self.client.get(f"{API}/trace?last=40", headers=HEADERS)
        self.last_trace = r.json().get("lines", [])
        # Extract controls from the most recent controls HUD event
        for t in reversed(self.last_trace):
            if t.get("event") == "controls":
                new_controls = t.get("controls", [])
                if new_controls:
                    self.last_actions = new_controls
                    self.last_buttons = [c for c in new_controls
                                         if isinstance(c, dict) and c.get("type") == "button"]
                # Stop at the first (i.e. newest) controls event, even when it
                # carries no controls; earlier events are deliberately ignored.
                break

    def get_state(self) -> dict:
        """Fetch server state and return (and cache) the memorizer dict."""
        r = self.client.get(f"{API}/state", headers=HEADERS)
        d = r.json()
        self.last_memo = d.get("memorizer", {})
        return self.last_memo

    def close(self):
        """Close the underlying HTTP connection pool."""
        self.client.close()
|
|
|
|
|
|
# --- Assertion engine ---
|
|
|
|
def check_response(response: str, check: str) -> tuple[bool, str]:
    """Evaluate a response assertion string against the response text.

    Supported checks:
      contains "a" or "b"      — case-insensitive substring, any term passes
      matches <regex>          — re.search against the raw response
      not contains "a" or "b"  — none of the terms may appear
      length > N               — response longer than N characters
    Returns (passed, human-readable detail).
    """
    contains_m = re.match(r'contains\s+"?(.+?)"?\s*$', check)
    if contains_m:
        wanted = [w.strip().strip('"') for w in contains_m.group(1).split(" or ")]
        lowered = response.lower()
        hit = next((w for w in wanted if w.lower() in lowered), None)
        if hit is not None:
            return True, f"found '{hit}'"
        return False, f"none of {wanted} found in: {response[:100]}"

    regex_m = re.match(r'matches\s+(.+)', check)
    if regex_m:
        pattern = regex_m.group(1).strip()
        if re.search(pattern, response) is not None:
            return True, f"matched /{pattern}/"
        return False, f"/{pattern}/ not found in: {response[:100]}"

    negated_m = re.match(r'not contains\s+"?(.+?)"?\s*$', check)
    if negated_m:
        banned = [w.strip().strip('"') for w in negated_m.group(1).split(" or ")]
        lowered = response.lower()
        for word in banned:
            if word.lower() in lowered:
                return False, f"found '{word}' but expected NOT to"
        return True, f"none of {banned} found (as expected)"

    length_m = re.match(r'length\s*>\s*(\d+)', check)
    if length_m:
        threshold = int(length_m.group(1))
        ok = len(response) > threshold
        if ok:
            return True, f"length {len(response)} > {threshold}"
        return False, f"length {len(response)} <= {threshold}"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
def check_actions(actions: list, check: str) -> tuple[bool, str]:
    """Evaluate an actions assertion against the current controls list.

    Supported checks:
      length >= N                       — at least N controls present
      has TYPE [or [has] TYPE2 ...]     — a control of any listed type exists
      any action contains "a" or "b"    — substring search over button JSON
    Returns (passed, human-readable detail).
    """
    # length >= N
    m = re.match(r'length\s*>=\s*(\d+)', check)
    if m:
        expected = int(m.group(1))
        if len(actions) >= expected:
            return True, f"{len(actions)} actions >= {expected}"
        return False, f"{len(actions)} actions < {expected}"

    # has TYPE or has TYPE2 ... — FIX: the old code split only on " or has ",
    # so the shorthand "has card or table" produced the single bogus type
    # "card or table" and never matched. Split on " or " and strip an
    # optional "has " prefix instead; both documented forms still work.
    m = re.match(r'has\s+(.+)', check)
    if m:
        types = [t.strip().removeprefix("has ").strip()
                 for t in re.split(r'\s+or\s+', m.group(1))]
        for a in actions:
            if isinstance(a, dict) and a.get("type") in types:
                atype = a.get("type")
                # Type-specific detail so failures are easy to diagnose.
                if atype == "table":
                    return True, f"table found: {len(a.get('columns', []))} cols, {len(a.get('data', []))} rows"
                elif atype == "card":
                    return True, f"card found: {a.get('title', '?')}, {len(a.get('fields', []))} fields"
                elif atype == "list":
                    return True, f"list found: {a.get('title', '?')}, {len(a.get('items', []))} items"
                else:
                    return True, f"{atype} found"
        return False, f"no {' or '.join(types)} in {len(actions)} controls ({[a.get('type','?') for a in actions if isinstance(a, dict)]})"

    # any action contains "foo" or "bar" — searches buttons only
    m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
        buttons = [a for a in actions if isinstance(a, dict) and a.get("type") == "button"]
        action_strs = [json.dumps(a).lower() for a in buttons]
        for term in terms:
            if any(term.lower() in s for s in action_strs):
                return True, f"found '{term}' in actions"
        return False, f"none of {terms} found in {len(buttons)} buttons"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
def check_state(memo: dict, check: str) -> tuple[bool, str]:
    """Evaluate a memorizer state assertion.

    Supported checks:
      FIELD contains "v" or "v2"  — case-insensitive substring on the field
      FIELD is "v" or "v2"        — case-insensitive equality
      facts any contains "v"      — substring over the joined facts list
    Returns (passed, human-readable detail).
    """
    def _split_terms(raw: str) -> list[str]:
        # '"a" or "b"' -> ["a", "b"]
        return [part.strip().strip('"') for part in raw.split(" or ")]

    contains_m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
    if contains_m:
        field = contains_m.group(1)
        terms = _split_terms(contains_m.group(2))
        value = memo.get(field, "")
        # Lists are flattened into one searchable string.
        if isinstance(value, list):
            haystack = " ".join(str(item) for item in value)
        else:
            haystack = str(value)
        lowered = haystack.lower()
        for term in terms:
            if term.lower() in lowered:
                return True, f"{field}={haystack[:50]} contains '{term}'"
        return False, f"{field}={haystack[:50]} doesn't contain any of {terms}"

    is_m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if is_m:
        field = is_m.group(1)
        terms = _split_terms(is_m.group(2))
        value = str(memo.get(field, ""))
        if any(value.lower() == term.lower() for term in terms):
            return True, f"{field}={value}"
        return False, f"{field}={value} not in {terms}"

    facts_m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
    if facts_m:
        terms = _split_terms(facts_m.group(1))
        facts = memo.get("facts", [])
        joined = " ".join(facts).lower()
        for term in terms:
            if term.lower() in joined:
                return True, f"found '{term}' in facts"
        return False, f"none of {terms} found in facts: {facts}"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
def check_trace(trace: list, check: str) -> tuple[bool, str]:
    """Evaluate a trace assertion. Checks HUD events from last request.

    Supported check forms:
      input.analysis.FIELD is "VALUE" [or "VALUE2"]
      has tool_call TOOL_NAME
      machine_created id="ID"
      has EVENT_NAME [or EVENT_NAME2 ...]
      no EVENT_NAME
    Returns (passed, human-readable detail).
    """
    # input.analysis.FIELD is "VALUE"
    m = re.match(r'input\.analysis\.(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        field, expected = m.group(1), m.group(2)
        terms = [t.strip().strip('"') for t in expected.split(" or ")]
        # Method 1: parse from LAST frame_trace event (v3 frame engine, most reliable)
        for t in reversed(trace):
            if t.get("event") == "frame_trace" and t.get("trace"):
                frames = t["trace"].get("frames", [])
                for fr in frames:
                    if fr.get("node") == "input" and fr.get("output"):
                        # The input frame's output is a space-separated
                        # "key=value" string; scan it for the wanted field.
                        out = fr["output"]
                        for part in out.split():
                            if "=" in part:
                                k, v = part.split("=", 1)
                                if k == field:
                                    for term in terms:
                                        if v.lower() == term.lower():
                                            return True, f"input.analysis.{field}={v} (from frame_trace)"
                                    return False, f"input.analysis.{field}={v}, expected one of {terms}"
                break  # only check the most recent frame_trace
        # Method 2: fallback to input node's "perceived" HUD event (v1/v2)
        for t in reversed(trace):
            if t.get("node") == "input" and t.get("event") == "perceived":
                analysis = t.get("analysis", {})
                actual = str(analysis.get(field, ""))
                for term in terms:
                    if actual.lower() == term.lower():
                        return True, f"input.analysis.{field}={actual}"
                return False, f"input.analysis.{field}={actual}, expected one of {terms}"
        return False, f"no input perceived event in trace"

    # has tool_call TOOL_NAME — checks if Thinker called a specific function tool.
    # Tool calls are not traced directly; they are inferred from the machine
    # lifecycle events that each tool emits as a side effect.
    m = re.match(r'has\s+tool_call\s+(\w+)', check)
    if m:
        tool_name = m.group(1)
        for t in trace:
            # Check machine_created/destroyed/etc events that are emitted by UI node
            # NOTE(review): this substring test never fires for the tool names
            # mapped below (none is a substring of its event name) — verify
            # whether it is dead code or covers other tool-name spellings.
            if t.get("event") in ("machine_created", "machine_destroyed", "machine_reset",
                                  "machine_state_added") and tool_name in t.get("event", ""):
                return True, f"found machine event for '{tool_name}'"
            # Check for the tool name in the event data
            if t.get("event") == "machine_created" and tool_name == "create_machine":
                return True, f"found create_machine via machine_created event"
            if t.get("event") == "machine_state_added" and tool_name == "add_state":
                return True, f"found add_state via machine_state_added event"
            if t.get("event") == "machine_reset" and tool_name == "reset_machine":
                return True, f"found reset_machine via machine_reset event"
            if t.get("event") == "machine_destroyed" and tool_name == "destroy_machine":
                return True, f"found destroy_machine via machine_destroyed event"
            if t.get("event") == "machine_data_updated" and tool_name == "update_machine":
                return True, f"found update_machine via machine_data_updated event"
            if t.get("event") == "machine_transitioned" and tool_name == "transition_machine":
                return True, f"found transition_machine via machine_transitioned event"
            if t.get("event") == "pa_retry" and tool_name == "pa_retry":
                return True, f"found pa_retry event"
        return False, f"no tool_call '{tool_name}' in trace"

    # machine_created id="NAV" — checks for specific machine creation
    m = re.match(r'machine_created\s+id="(\w+)"', check)
    if m:
        expected_id = m.group(1)
        for t in trace:
            if t.get("event") == "machine_created" and t.get("id") == expected_id:
                return True, f"machine '{expected_id}' created"
        return False, f"no machine_created event with id='{expected_id}'"

    # has EVENT_NAME or EVENT_NAME2 ...
    m = re.match(r'has\s+([\w\s]+(?:\s+or\s+\w+)*)', check)
    # NOTE(review): the tool_call exclusion below is redundant — the tool_call
    # branch above always returns before reaching this point.
    if m and not re.match(r'has\s+tool_call\s+\w+', check):
        names = [n.strip() for n in re.split(r'\s+or\s+', m.group(1))]
        for t in trace:
            if t.get("event") in names:
                return True, f"found event '{t.get('event')}'"
        return False, f"no '{' or '.join(names)}' event in trace"

    # no EVENT_NAME
    m = re.match(r'no\s+(\w+)', check)
    if m:
        event_name = m.group(1)
        for t in trace:
            if t.get("event") == event_name:
                return False, f"found unexpected event '{event_name}'"
        return True, f"no '{event_name}' event (as expected)"

    return False, f"unknown trace check: {check}"
|
|
|
|
|
|
# --- Runner ---
|
|
|
|
@dataclass
class StepResult:
    """Outcome of a single check within a test step.

    NOTE(review): the runner below builds plain dicts with these same keys;
    this dataclass appears unused by the visible code — confirm before removal.
    """
    step: str      # name of the step the check belongs to
    check: str     # textual description of the check performed
    status: str  # PASS, FAIL, SKIP
    detail: str = ""  # human-readable explanation of the outcome
|
|
|
|
|
|
class AssayTestRunner:
    """Executes parsed testcases against a live Assay API via AssayClient."""

    def __init__(self, on_result=None):
        self.client = AssayClient()
        self._on_result = on_result  # callback(result_dict) per check

    def run(self, testcase: dict) -> list[dict]:
        """Run all steps of a parsed testcase; closes the client when done."""
        results = []
        for step in testcase["steps"]:
            step_results = self._run_step(step)
            results.extend(step_results)
        self.client.close()
        return results

    def _add(self, results: list, result: dict):
        """Record a result and forward it to the live-status callback."""
        results.append(result)
        if self._on_result:
            self._on_result(result)

    def _run_step(self, step: dict) -> list[dict]:
        """Execute one step's commands, returning a result dict per command.

        send/action commands report PASS when the HTTP call succeeds (the
        response content itself is judged by later expect_* commands);
        expect_* commands delegate to the check_* helpers against the
        client's cached last_* state.
        """
        results = []
        step_name = step["name"]

        for cmd in step["commands"]:
            if cmd["type"] == "clear":
                self.client.clear()
                self._add(results, {"step": step_name, "check": "clear", "status": "PASS", "detail": "cleared"})

            elif cmd["type"] == "send":
                try:
                    self.client.send(cmd["text"], dashboard=cmd.get("dashboard"))
                    self._add(results, {"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "PASS",
                                        "detail": f"response: {self.client.last_response[:80]}"})
                except Exception as e:
                    self._add(results, {"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "FAIL",
                                        "detail": str(e)})

            elif cmd["type"] == "action":
                try:
                    self.client.send_action(cmd["action"])
                    self._add(results, {"step": step_name, "check": f"action: {cmd['action']}", "status": "PASS",
                                        "detail": f"response: {self.client.last_response[:80]}"})
                except Exception as e:
                    self._add(results, {"step": step_name, "check": f"action: {cmd['action']}", "status": "FAIL",
                                        "detail": str(e)})

            elif cmd["type"] == "action_match":
                # Find first button matching any pattern (patterns are tried
                # in order; first pattern with a hit wins).
                patterns = cmd["patterns"]
                matched = None
                for pattern in patterns:
                    pat = pattern.lower()
                    for a in self.client.last_buttons:
                        action_str = a.get("action", "") or ""
                        label_str = a.get("label", "") or ""
                        # Match against either the action id or the visible label.
                        if pat in action_str.lower() or pat in label_str.lower():
                            matched = a.get("action") or a.get("label", "")
                            break
                    if matched:
                        break
                if matched:
                    try:
                        self.client.send_action(matched)
                        self._add(results, {"step": step_name, "check": f"action: {matched}", "status": "PASS",
                                            "detail": f"response: {self.client.last_response[:80]}"})
                    except Exception as e:
                        self._add(results, {"step": step_name, "check": f"action: {matched}", "status": "FAIL",
                                            "detail": str(e)})
                else:
                    self._add(results, {"step": step_name, "check": f"action matching '{' or '.join(patterns)}'", "status": "FAIL",
                                        "detail": f"no action matching '{' or '.join(patterns)}' in {[a.get('action') or a.get('label') for a in self.client.last_actions]}"})

            elif cmd["type"] == "expect_response":
                passed, detail = check_response(self.client.last_response, cmd["check"])
                self._add(results, {"step": step_name, "check": f"response: {cmd['check']}",
                                    "status": "PASS" if passed else "FAIL", "detail": detail})

            elif cmd["type"] == "expect_actions":
                passed, detail = check_actions(self.client.last_actions, cmd["check"])
                self._add(results, {"step": step_name, "check": f"actions: {cmd['check']}",
                                    "status": "PASS" if passed else "FAIL", "detail": detail})

            elif cmd["type"] == "expect_state":
                # Refresh the memo from the server before asserting on it.
                self.client.get_state()
                passed, detail = check_state(self.client.last_memo, cmd["check"])
                self._add(results, {"step": step_name, "check": f"state: {cmd['check']}",
                                    "status": "PASS" if passed else "FAIL", "detail": detail})

            elif cmd["type"] == "expect_trace":
                passed, detail = check_trace(self.client.last_trace, cmd["check"])
                self._add(results, {"step": step_name, "check": f"trace: {cmd['check']}",
                                    "status": "PASS" if passed else "FAIL", "detail": detail})

        return results
|
|
|
|
|
|
# --- Live status push ---
|
|
|
|
def _push_status(event: str, **kwargs):
    """Best-effort push of a test-status event to the API for frontend display."""
    payload = {"event": event, **kwargs}
    try:
        httpx.post(f"{API}/test/status", json=payload,
                   headers=HEADERS, timeout=5)
    except Exception:
        # Status updates are purely cosmetic; never let them fail a test run.
        pass
|
|
|
|
|
|
# --- Standalone runner ---
|
|
|
|
def run_standalone(paths: list[Path] | None = None):
    """Run testcases outside pytest, printing progress and pushing live status.

    paths: explicit testcase files; defaults to every testcases/*.md.
    Writes testcases/results.json for the web UI and returns True iff no
    check failed.
    """
    if not paths:
        paths = sorted(Path("testcases").glob("*.md"))

    # Count total steps across all testcases for frontend progress
    all_tcs = [parse_testcase(p) for p in paths]
    total_steps = sum(len(s["commands"]) for tc in all_tcs for s in tc["steps"])
    first_suite = True

    all_results = {}
    for tc in all_tcs:
        path = tc["file"]  # NOTE(review): assigned but never used below
        print(f"\n{'='*60}")
        print(f" {tc['name']}")
        print(f"{'='*60}")
        if first_suite:
            # Only the first suite_start carries the grand total so the
            # frontend can size its progress bar once.
            _push_status("suite_start", suite=tc["name"], count=total_steps)
            first_suite = False
        else:
            _push_status("suite_start", suite=tc["name"])

        suite_name = tc["name"]

        def _on_result(r):
            # Per-check console line plus a live push to the frontend.
            icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP"
            print(f" {icon} [{r['step']}] {r['check']}")
            if r["detail"]:
                print(f" {r['detail']}")
            _push_status("step_result", suite=suite_name, result=r)

        runner = AssayTestRunner(on_result=_on_result)
        results = runner.run(tc)
        all_results[tc["name"]] = results

        passed = sum(1 for r in results if r["status"] == "PASS")
        failed = sum(1 for r in results if r["status"] == "FAIL")
        print(f"\n {passed} passed, {failed} failed")
        _push_status("suite_end", suite=tc["name"], passed=passed, failed=failed)

    # Summary
    print(f"\n{'='*60}")
    total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values())
    total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values())
    print(f" TOTAL: {total_pass} passed, {total_fail} failed")
    print(f"{'='*60}")

    # Write results JSON for web UI
    output = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "testcases": {name: results for name, results in all_results.items()},
        "summary": {"passed": total_pass, "failed": total_fail},
    }
    results_path = Path("testcases/results.json")
    results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\n Results written to {results_path}")

    return total_fail == 0
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI: optional testcase paths as arguments; no args means run everything.
    paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None
    success = run_standalone(paths)
    # Exit code mirrors overall pass/fail for CI consumption.
    sys.exit(0 if success else 1)
|