agent-runtime/runtime_test.py
Nico bf11312b4b Rename cog -> assay across codebase
- Rename files: cog_cli.py, test_cog.py, k8s/cog-*.yaml
- Update all Python tool names: cog_* -> assay_*
- Update FastAPI titles, MCP server names, URLs
- Update K8s manifests: deployments, services, secrets, ingress
- Update Docker env vars: COG_API -> ASSAY_API
- Domain: cog.loop42.de -> assay.loop42.de
- SQLite path: /tmp/cog_db.sqlite -> /tmp/assay_db.sqlite

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 01:39:05 +02:00

604 lines
25 KiB
Python

"""
Assay Runtime Test Runner — parses testcases/*.md and executes against live API.
Usage:
pytest testcases/ -v # run all testcases
pytest testcases/counter_state.md -v # run one testcase
python runtime_test.py # standalone run all
python runtime_test.py testcases/pub_conversation.md # standalone run one
"""
import httpx
import json
import os
import re
import sys
import time
# Fix Windows console encoding
if sys.platform == "win32":
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
from pathlib import Path
from dataclasses import dataclass, field
API = os.environ.get("ASSAY_API", "https://assay.loop42.de/api")
TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g"
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
# --- Markdown parser ---
def parse_testcase(path: Path) -> dict:
"""Parse a testcase markdown file into structured steps."""
text = path.read_text(encoding="utf-8")
lines = text.split("\n")
tc = {"name": "", "steps": [], "file": str(path)}
current_step = None
in_setup = False
for line in lines:
line_stripped = line.strip()
# Title
if line_stripped.startswith("# ") and not tc["name"]:
tc["name"] = line_stripped[2:].strip()
continue
# Setup section
if line_stripped == "## Setup":
in_setup = True
current_step = {"name": "Setup", "commands": []}
continue
# End setup on next ## or ###
if line_stripped.startswith("## ") and in_setup:
if current_step and current_step["commands"]:
tc["steps"].insert(0, current_step)
in_setup = False
current_step = None
# Step header
if line_stripped.startswith("### "):
if in_setup and current_step and current_step["commands"]:
tc["steps"].insert(0, current_step)
in_setup = False
elif current_step and not in_setup:
tc["steps"].append(current_step)
step_text = line_stripped[4:].strip()
m = re.match(r"\d+\.\s*(.*)", step_text)
current_step = {"name": m.group(1) if m else step_text, "commands": []}
continue
# Commands within a step or setup
if (current_step or in_setup) and line_stripped.startswith("- "):
cmd_text = line_stripped[2:].strip()
cmd = _parse_command(cmd_text)
if cmd and current_step:
current_step["commands"].append(cmd)
if current_step:
tc["steps"].append(current_step)
return tc
def _parse_command(text: str) -> dict | None:
"""Parse a single command line like 'send: hello' or 'expect_response: contains foo'."""
# send: message |dashboard| [json]
# send: message (no dashboard)
if text.startswith("send:"):
val = text[5:].strip()
if "|dashboard|" in val:
parts = val.split("|dashboard|", 1)
msg_text = parts[0].strip()
try:
dashboard = json.loads(parts[1].strip())
except (json.JSONDecodeError, Exception):
dashboard = []
return {"type": "send", "text": msg_text, "dashboard": dashboard}
return {"type": "send", "text": val}
# action: action_name OR action: first matching "pattern" or "pattern2"
if text.startswith("action:"):
val = text[7:].strip()
m = re.match(r'first matching (.+)', val)
if m:
# Support: first matching "+1" or "inc" or "plus"
patterns = [p.strip().strip('"') for p in m.group(1).split(" or ")]
return {"type": "action_match", "patterns": patterns}
return {"type": "action", "action": val}
# expect_response: contains "foo"
if text.startswith("expect_response:"):
return {"type": "expect_response", "check": text[16:].strip()}
# expect_actions: length >= 2
if text.startswith("expect_actions:"):
return {"type": "expect_actions", "check": text[15:].strip()}
# expect_state: field contains/is "value"
if text.startswith("expect_state:"):
return {"type": "expect_state", "check": text[13:].strip()}
# clear history
if text == "clear history":
return {"type": "clear"}
# expect_trace: input.analysis.intent is "social"
# expect_trace: has reflex_path
# expect_trace: no thinker
if text.startswith("expect_trace:"):
return {"type": "expect_trace", "check": text[13:].strip()}
return None
# --- API client ---
class AssayClient:
def __init__(self):
self.client = httpx.Client(timeout=90)
self.last_response = ""
self.last_memo = {}
self.last_actions = []
self.last_buttons = []
self.last_trace = []
def clear(self):
self.client.post(f"{API}/clear", headers=HEADERS)
time.sleep(0.3)
def send(self, text: str, dashboard: list = None) -> dict:
body = {"text": text}
if dashboard is not None:
body["dashboard"] = dashboard
r = self.client.post(f"{API}/send", json=body, headers=HEADERS)
d = r.json()
# Async send: poll for result, match on message ID
if d.get("status") == "queued":
msg_id = d.get("id", "")
for _ in range(120):
time.sleep(0.5)
pr = self.client.get(f"{API}/result", headers=HEADERS)
pd = pr.json()
if pd.get("id") == msg_id and pd.get("status") == "done":
d = pd
break
if pd.get("id") == msg_id and pd.get("status") == "error":
d = pd
break
resp = d.get("response", "")
self.last_response = resp if isinstance(resp, str) else str(resp)
self.last_memo = d.get("memorizer", {})
if not isinstance(self.last_memo, dict):
self.last_memo = {}
time.sleep(0.5)
self._fetch_trace()
return d
def send_action(self, action: str) -> dict:
# Actions go through /api/send as ACTION: format (since we can't use WS from test)
return self.send(f"ACTION: {action}")
def _fetch_trace(self):
r = self.client.get(f"{API}/trace?last=40", headers=HEADERS)
self.last_trace = r.json().get("lines", [])
# Extract controls from the most recent controls HUD event
for t in reversed(self.last_trace):
if t.get("event") == "controls":
new_controls = t.get("controls", [])
if new_controls:
self.last_actions = new_controls
self.last_buttons = [c for c in new_controls
if isinstance(c, dict) and c.get("type") == "button"]
break
def get_state(self) -> dict:
r = self.client.get(f"{API}/state", headers=HEADERS)
d = r.json()
self.last_memo = d.get("memorizer", {})
return self.last_memo
def close(self):
self.client.close()
# --- Assertion engine ---
def check_response(response: str, check: str) -> tuple[bool, str]:
"""Evaluate a response assertion. Returns (passed, detail)."""
# contains "foo" or "bar"
m = re.match(r'contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
for term in terms:
if term.lower() in response.lower():
return True, f"found '{term}'"
return False, f"none of {terms} found in: {response[:100]}"
# matches regex
m = re.match(r'matches\s+(.+)', check)
if m:
pattern = m.group(1).strip()
if re.search(pattern, response):
return True, f"matched /{pattern}/"
return False, f"/{pattern}/ not found in: {response[:100]}"
# not contains "foo" or "bar"
m = re.match(r'not contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
for term in terms:
if term.lower() in response.lower():
return False, f"found '{term}' but expected NOT to"
return True, f"none of {terms} found (as expected)"
# length > N
m = re.match(r'length\s*>\s*(\d+)', check)
if m:
expected = int(m.group(1))
if len(response) > expected:
return True, f"length {len(response)} > {expected}"
return False, f"length {len(response)} <= {expected}"
return False, f"unknown check: {check}"
def check_actions(actions: list, check: str) -> tuple[bool, str]:
"""Evaluate an actions assertion."""
# length >= N
m = re.match(r'length\s*>=\s*(\d+)', check)
if m:
expected = int(m.group(1))
if len(actions) >= expected:
return True, f"{len(actions)} actions >= {expected}"
return False, f"{len(actions)} actions < {expected}"
# has TYPE or has TYPE1 or TYPE2
m = re.match(r'has\s+(.+)', check)
if m:
types = [t.strip() for t in m.group(1).split(" or has ")]
# Also handle "card or has table" → ["card", "table"]
types = [t.replace("has ", "") for t in types]
for a in actions:
if isinstance(a, dict) and a.get("type") in types:
atype = a.get("type")
if atype == "table":
return True, f"table found: {len(a.get('columns', []))} cols, {len(a.get('data', []))} rows"
elif atype == "card":
return True, f"card found: {a.get('title', '?')}, {len(a.get('fields', []))} fields"
elif atype == "list":
return True, f"list found: {a.get('title', '?')}, {len(a.get('items', []))} items"
else:
return True, f"{atype} found"
return False, f"no {' or '.join(types)} in {len(actions)} controls ({[a.get('type','?') for a in actions if isinstance(a, dict)]})"
# any action contains "foo" or "bar" — searches buttons only
m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
buttons = [a for a in actions if isinstance(a, dict) and a.get("type") == "button"]
action_strs = [json.dumps(a).lower() for a in buttons]
for term in terms:
if any(term.lower() in s for s in action_strs):
return True, f"found '{term}' in actions"
return False, f"none of {terms} found in {len(buttons)} buttons"
return False, f"unknown check: {check}"
def check_state(memo: dict, check: str) -> tuple[bool, str]:
"""Evaluate a memorizer state assertion."""
# field contains "value" or "value2"
m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
if m:
field, values_str = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in values_str.split(" or ")]
actual = memo.get(field, "")
if isinstance(actual, list):
actual_str = " ".join(str(x) for x in actual)
else:
actual_str = str(actual)
for term in terms:
if term.lower() in actual_str.lower():
return True, f"{field}={actual_str[:50]} contains '{term}'"
return False, f"{field}={actual_str[:50]} doesn't contain any of {terms}"
# field is "value" or "value2"
m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
if m:
field, values_str = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in values_str.split(" or ")]
actual = str(memo.get(field, ""))
for term in terms:
if actual.lower() == term.lower():
return True, f"{field}={actual}"
return False, f"{field}={actual} not in {terms}"
# facts any contains "value"
m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
facts = memo.get("facts", [])
facts_str = " ".join(facts).lower()
for term in terms:
if term.lower() in facts_str:
return True, f"found '{term}' in facts"
return False, f"none of {terms} found in facts: {facts}"
return False, f"unknown check: {check}"
def check_trace(trace: list, check: str) -> tuple[bool, str]:
"""Evaluate a trace assertion. Checks HUD events from last request."""
# input.analysis.FIELD is "VALUE"
m = re.match(r'input\.analysis\.(\w+)\s+is\s+"?(.+?)"?\s*$', check)
if m:
field, expected = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in expected.split(" or ")]
# Method 1: parse from LAST frame_trace event (v3 frame engine, most reliable)
for t in reversed(trace):
if t.get("event") == "frame_trace" and t.get("trace"):
frames = t["trace"].get("frames", [])
for fr in frames:
if fr.get("node") == "input" and fr.get("output"):
out = fr["output"]
for part in out.split():
if "=" in part:
k, v = part.split("=", 1)
if k == field:
for term in terms:
if v.lower() == term.lower():
return True, f"input.analysis.{field}={v} (from frame_trace)"
return False, f"input.analysis.{field}={v}, expected one of {terms}"
break # only check the most recent frame_trace
# Method 2: fallback to input node's "perceived" HUD event (v1/v2)
for t in reversed(trace):
if t.get("node") == "input" and t.get("event") == "perceived":
analysis = t.get("analysis", {})
actual = str(analysis.get(field, ""))
for term in terms:
if actual.lower() == term.lower():
return True, f"input.analysis.{field}={actual}"
return False, f"input.analysis.{field}={actual}, expected one of {terms}"
return False, f"no input perceived event in trace"
# has tool_call TOOL_NAME — checks if Thinker called a specific function tool
m = re.match(r'has\s+tool_call\s+(\w+)', check)
if m:
tool_name = m.group(1)
for t in trace:
# Check machine_created/destroyed/etc events that are emitted by UI node
if t.get("event") in ("machine_created", "machine_destroyed", "machine_reset",
"machine_state_added") and tool_name in t.get("event", ""):
return True, f"found machine event for '{tool_name}'"
# Check for the tool name in the event data
if t.get("event") == "machine_created" and tool_name == "create_machine":
return True, f"found create_machine via machine_created event"
if t.get("event") == "machine_state_added" and tool_name == "add_state":
return True, f"found add_state via machine_state_added event"
if t.get("event") == "machine_reset" and tool_name == "reset_machine":
return True, f"found reset_machine via machine_reset event"
if t.get("event") == "machine_destroyed" and tool_name == "destroy_machine":
return True, f"found destroy_machine via machine_destroyed event"
if t.get("event") == "machine_data_updated" and tool_name == "update_machine":
return True, f"found update_machine via machine_data_updated event"
if t.get("event") == "machine_transitioned" and tool_name == "transition_machine":
return True, f"found transition_machine via machine_transitioned event"
if t.get("event") == "pa_retry" and tool_name == "pa_retry":
return True, f"found pa_retry event"
return False, f"no tool_call '{tool_name}' in trace"
# machine_created id="NAV" — checks for specific machine creation
m = re.match(r'machine_created\s+id="(\w+)"', check)
if m:
expected_id = m.group(1)
for t in trace:
if t.get("event") == "machine_created" and t.get("id") == expected_id:
return True, f"machine '{expected_id}' created"
return False, f"no machine_created event with id='{expected_id}'"
# has EVENT_NAME or EVENT_NAME2 ...
m = re.match(r'has\s+([\w\s]+(?:\s+or\s+\w+)*)', check)
if m and not re.match(r'has\s+tool_call\s+\w+', check):
names = [n.strip() for n in re.split(r'\s+or\s+', m.group(1))]
for t in trace:
if t.get("event") in names:
return True, f"found event '{t.get('event')}'"
return False, f"no '{' or '.join(names)}' event in trace"
# no EVENT_NAME
m = re.match(r'no\s+(\w+)', check)
if m:
event_name = m.group(1)
for t in trace:
if t.get("event") == event_name:
return False, f"found unexpected event '{event_name}'"
return True, f"no '{event_name}' event (as expected)"
return False, f"unknown trace check: {check}"
# --- Runner ---
@dataclass
class StepResult:
step: str
check: str
status: str # PASS, FAIL, SKIP
detail: str = ""
class AssayTestRunner:
def __init__(self, on_result=None):
self.client = AssayClient()
self._on_result = on_result # callback(result_dict) per check
def run(self, testcase: dict) -> list[dict]:
results = []
for step in testcase["steps"]:
step_results = self._run_step(step)
results.extend(step_results)
self.client.close()
return results
def _add(self, results: list, result: dict):
results.append(result)
if self._on_result:
self._on_result(result)
def _run_step(self, step: dict) -> list[dict]:
results = []
step_name = step["name"]
for cmd in step["commands"]:
if cmd["type"] == "clear":
self.client.clear()
self._add(results, {"step": step_name, "check": "clear", "status": "PASS", "detail": "cleared"})
elif cmd["type"] == "send":
try:
self.client.send(cmd["text"], dashboard=cmd.get("dashboard"))
self._add(results, {"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
self._add(results, {"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "FAIL",
"detail": str(e)})
elif cmd["type"] == "action":
try:
self.client.send_action(cmd["action"])
self._add(results, {"step": step_name, "check": f"action: {cmd['action']}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
self._add(results, {"step": step_name, "check": f"action: {cmd['action']}", "status": "FAIL",
"detail": str(e)})
elif cmd["type"] == "action_match":
# Find first button matching any pattern
patterns = cmd["patterns"]
matched = None
for pattern in patterns:
pat = pattern.lower()
for a in self.client.last_buttons:
action_str = a.get("action", "") or ""
label_str = a.get("label", "") or ""
if pat in action_str.lower() or pat in label_str.lower():
matched = a.get("action") or a.get("label", "")
break
if matched:
break
if matched:
try:
self.client.send_action(matched)
self._add(results, {"step": step_name, "check": f"action: {matched}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
self._add(results, {"step": step_name, "check": f"action: {matched}", "status": "FAIL",
"detail": str(e)})
else:
self._add(results, {"step": step_name, "check": f"action matching '{' or '.join(patterns)}'", "status": "FAIL",
"detail": f"no action matching '{' or '.join(patterns)}' in {[a.get('action') or a.get('label') for a in self.client.last_actions]}"})
elif cmd["type"] == "expect_response":
passed, detail = check_response(self.client.last_response, cmd["check"])
self._add(results, {"step": step_name, "check": f"response: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_actions":
passed, detail = check_actions(self.client.last_actions, cmd["check"])
self._add(results, {"step": step_name, "check": f"actions: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_state":
self.client.get_state()
passed, detail = check_state(self.client.last_memo, cmd["check"])
self._add(results, {"step": step_name, "check": f"state: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_trace":
passed, detail = check_trace(self.client.last_trace, cmd["check"])
self._add(results, {"step": step_name, "check": f"trace: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
return results
# --- Live status push ---
def _push_status(event: str, **kwargs):
"""Push test status to the API for frontend display."""
try:
httpx.post(f"{API}/test/status", json={"event": event, **kwargs},
headers=HEADERS, timeout=5)
except Exception:
pass # Don't fail tests if push fails
# --- Standalone runner ---
def run_standalone(paths: list[Path] = None):
if not paths:
paths = sorted(Path("testcases").glob("*.md"))
# Count total steps across all testcases for frontend progress
all_tcs = [parse_testcase(p) for p in paths]
total_steps = sum(len(s["commands"]) for tc in all_tcs for s in tc["steps"])
first_suite = True
all_results = {}
for tc in all_tcs:
path = tc["file"]
print(f"\n{'='*60}")
print(f" {tc['name']}")
print(f"{'='*60}")
if first_suite:
_push_status("suite_start", suite=tc["name"], count=total_steps)
first_suite = False
else:
_push_status("suite_start", suite=tc["name"])
suite_name = tc["name"]
def _on_result(r):
icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP"
print(f" {icon} [{r['step']}] {r['check']}")
if r["detail"]:
print(f" {r['detail']}")
_push_status("step_result", suite=suite_name, result=r)
runner = AssayTestRunner(on_result=_on_result)
results = runner.run(tc)
all_results[tc["name"]] = results
passed = sum(1 for r in results if r["status"] == "PASS")
failed = sum(1 for r in results if r["status"] == "FAIL")
print(f"\n {passed} passed, {failed} failed")
_push_status("suite_end", suite=tc["name"], passed=passed, failed=failed)
# Summary
print(f"\n{'='*60}")
total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values())
total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values())
print(f" TOTAL: {total_pass} passed, {total_fail} failed")
print(f"{'='*60}")
# Write results JSON for web UI
output = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"testcases": {name: results for name, results in all_results.items()},
"summary": {"passed": total_pass, "failed": total_fail},
}
results_path = Path("testcases/results.json")
results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"\n Results written to {results_path}")
return total_fail == 0
if __name__ == "__main__":
paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None
success = run_standalone(paths)
sys.exit(0 if success else 1)