"""
Cog Runtime Test Runner — parses testcases/*.md and executes against live API.

Usage:
    pytest testcases/ -v                                    # run all testcases
    pytest testcases/counter_state.md -v                    # run one testcase
    python runtime_test.py                                  # standalone run all
    python runtime_test.py testcases/pub_conversation.md    # standalone run one
"""

import httpx
import json
import os
import re
import sys
import time

# Fix Windows console encoding
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

from pathlib import Path
from dataclasses import dataclass, field

# The endpoint and token can be overridden through the environment; the
# literals below are only fallbacks so existing invocations keep working.
# NOTE(review): a bearer token is committed in source here — it should be
# rotated and supplied via COG_TOKEN instead of living in the repository.
API = os.environ.get("COG_API", "https://cog.loop42.de/api")
TOKEN = os.environ.get(
    "COG_TOKEN",
    "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g",
)
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}


# --- Markdown parser ---

def parse_testcase(path: Path) -> dict:
    """Parse a testcase markdown file into structured steps.

    Recognised structure:
      * the first ``# `` line is the testcase name,
      * ``## Setup`` opens a setup section whose commands become a step named
        "Setup" placed in front of all other steps (only if it has commands),
      * ``### N. name`` opens a step (the leading ``N.`` is stripped),
      * ``- command`` lines inside a step/setup are parsed by
        ``_parse_command``; unrecognised commands are ignored.

    Returns a dict with keys ``name``, ``steps`` (list of
    ``{"name", "commands"}`` dicts) and ``file`` (the path as a string).
    """
    text = path.read_text(encoding="utf-8")
    tc = {"name": "", "steps": [], "file": str(path)}
    current_step = None
    in_setup = False

    for line in text.split("\n"):
        line_stripped = line.strip()

        # Title
        if line_stripped.startswith("# ") and not tc["name"]:
            tc["name"] = line_stripped[2:].strip()
            continue

        # Setup section
        if line_stripped == "## Setup":
            # Do not lose a regular step that was still being collected.
            if current_step is not None and not in_setup:
                tc["steps"].append(current_step)
            in_setup = True
            current_step = {"name": "Setup", "commands": []}
            continue

        # End setup on the next "## " heading
        if line_stripped.startswith("## ") and in_setup:
            if current_step and current_step["commands"]:
                tc["steps"].insert(0, current_step)
            in_setup = False
            current_step = None
            continue

        # Step header
        if line_stripped.startswith("### "):
            if in_setup:
                # A step header also ends the setup section. The flag is
                # cleared even when setup had no commands; otherwise the
                # steps that follow would be treated as part of setup and an
                # empty one would be dropped.
                if current_step and current_step["commands"]:
                    tc["steps"].insert(0, current_step)
                in_setup = False
            elif current_step is not None:
                tc["steps"].append(current_step)
            step_text = line_stripped[4:].strip()
            m = re.match(r"\d+\.\s*(.*)", step_text)
            current_step = {"name": m.group(1) if m else step_text, "commands": []}
            continue

        # Commands within a step or setup
        if current_step is not None and line_stripped.startswith("- "):
            cmd = _parse_command(line_stripped[2:].strip())
            if cmd:
                current_step["commands"].append(cmd)

    if current_step:
        tc["steps"].append(current_step)

    return tc


# Commands of the form "<kind>: <check>" that carry nothing but a check string.
_CHECK_COMMANDS = ("expect_response", "expect_actions", "expect_state", "expect_trace")


def _parse_command(text: str) -> dict | None:
    """Parse a single command line like 'send: hello' or 'expect_response: contains foo'.

    Returns a command dict with a ``type`` key, or ``None`` when the text is
    not a recognised command.
    """
    # send: message |dashboard| [json]
    # send: message (no dashboard)
    if text.startswith("send:"):
        val = text[5:].strip()
        if "|dashboard|" in val:
            msg_text, raw_dashboard = val.split("|dashboard|", 1)
            try:
                dashboard = json.loads(raw_dashboard.strip())
            except ValueError:
                # Malformed dashboard JSON falls back to an empty dashboard
                # (json.JSONDecodeError is a ValueError).
                dashboard = []
            return {"type": "send", "text": msg_text.strip(), "dashboard": dashboard}
        return {"type": "send", "text": val}

    # action: action_name OR action: first matching "pattern"
    if text.startswith("action:"):
        val = text[7:].strip()
        m = re.match(r'first matching "(.+)"', val)
        if m:
            return {"type": "action_match", "pattern": m.group(1)}
        return {"type": "action", "action": val}

    # expect_response: contains "foo"
    # expect_actions: length >= 2
    # expect_state: field contains/is "value"
    # expect_trace: input.analysis.intent is "social" / has reflex_path / no thinker
    for kind in _CHECK_COMMANDS:
        prefix = kind + ":"
        if text.startswith(prefix):
            return {"type": kind, "check": text[len(prefix):].strip()}

    # clear history
    if text == "clear history":
        return {"type": "clear"}

    return None


# --- API client ---

class CogClient:
    """HTTP client for the Cog API that remembers the most recent results.

    After each ``send`` the instance holds the last response text, memorizer
    state, trace lines and the controls/buttons found in the trace, so the
    assertion helpers can inspect them.
    """

    # Polling budget for queued (async) sends: attempts x interval seconds.
    POLL_ATTEMPTS = 120
    POLL_INTERVAL = 0.5

    def __init__(self):
        self.client = httpx.Client(timeout=90)
        self.last_response = ""
        self.last_memo = {}
        self.last_actions = []
        self.last_buttons = []
        self.last_trace = []

    def clear(self):
        """POST to /clear and pause briefly before the next request."""
        self.client.post(f"{API}/clear", headers=HEADERS)
        time.sleep(0.3)

    def send(self, text: str, dashboard: list | None = None) -> dict:
        """Send a message and return the final result payload.

        If the API answers with status "queued", /result is polled until an
        entry with the same id reports "done" or "error".

        Raises:
            TimeoutError: the queued message produced no result within the
                polling budget.
        """
        body = {"text": text}
        if dashboard is not None:
            body["dashboard"] = dashboard

        # Reset first so a failed send cannot leave the previous response
        # behind for later expect_response checks to match against.
        self.last_response = ""

        r = self.client.post(f"{API}/send", json=body, headers=HEADERS)
        d = r.json()

        # Async send: poll for result, match on message ID
        if d.get("status") == "queued":
            msg_id = d.get("id", "")
            for _ in range(self.POLL_ATTEMPTS):
                time.sleep(self.POLL_INTERVAL)
                pd = self.client.get(f"{API}/result", headers=HEADERS).json()
                if pd.get("id") == msg_id and pd.get("status") in ("done", "error"):
                    d = pd
                    break
            else:
                raise TimeoutError(
                    f"no result for message {msg_id!r} after {self.POLL_ATTEMPTS} polls"
                )

        # "or" also covers explicit JSON nulls, which .get() defaults do not.
        self.last_response = d.get("response") or ""
        self.last_memo = d.get("memorizer") or {}
        time.sleep(0.5)
        self._fetch_trace()
        return d

    def send_action(self, action: str) -> dict:
        """Trigger a UI action by sending it as an 'ACTION: ...' message."""
        # Actions go through /api/send as ACTION: format (since we can't use WS from test)
        return self.send(f"ACTION: {action}")

    def _fetch_trace(self):
        """Fetch the last 20 trace lines and refresh the known controls."""
        r = self.client.get(f"{API}/trace?last=20", headers=HEADERS)
        self.last_trace = r.json().get("lines", [])
        # Extract all controls from trace (buttons, tables, labels, displays).
        # Empty control lists are skipped, so earlier controls stay in place.
        for t in self.last_trace:
            if t.get("event") == "controls":
                new_controls = t.get("controls", [])
                if new_controls:
                    self.last_actions = new_controls
                    self.last_buttons = [c for c in new_controls if c.get("type") == "button"]

    def get_state(self) -> dict:
        """Fetch /state and return (and remember) its memorizer section."""
        r = self.client.get(f"{API}/state", headers=HEADERS)
        d = r.json()
        self.last_memo = d.get("memorizer") or {}
        return self.last_memo

    def close(self):
        """Close the underlying HTTP client."""
        self.client.close()


# --- Assertion engine ---

def _split_terms(raw: str) -> list[str]:
    """Split 'foo" or "bar' style alternatives into bare terms."""
    return [t.strip().strip('"') for t in raw.split(" or ")]


def check_response(response: str, check: str) -> tuple[bool, str]:
    """Evaluate a response assertion. Returns (passed, detail).

    Supported checks: ``contains "a" or "b"``, ``matches REGEX``,
    ``not contains "a" or "b"`` and ``length > N``. Substring checks are
    case-insensitive; the regex check is not.
    """
    response = response or ""

    # contains "foo" or "bar"
    m = re.match(r'contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        haystack = response.lower()
        for term in terms:
            if term.lower() in haystack:
                return True, f"found '{term}'"
        return False, f"none of {terms} found in: {response[:100]}"

    # matches regex
    m = re.match(r'matches\s+(.+)', check)
    if m:
        pattern = m.group(1).strip()
        try:
            found = re.search(pattern, response)
        except re.error as e:
            # A broken pattern in a testcase is a failed check, not a crash.
            return False, f"invalid regex /{pattern}/: {e}"
        if found:
            return True, f"matched /{pattern}/"
        return False, f"/{pattern}/ not found in: {response[:100]}"

    # not contains "foo" or "bar"
    m = re.match(r'not contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        haystack = response.lower()
        for term in terms:
            if term.lower() in haystack:
                return False, f"found '{term}' but expected NOT to"
        return True, f"none of {terms} found (as expected)"

    # length > N
    m = re.match(r'length\s*>\s*(\d+)', check)
    if m:
        expected = int(m.group(1))
        if len(response) > expected:
            return True, f"length {len(response)} > {expected}"
        return False, f"length {len(response)} <= {expected}"

    return False, f"unknown check: {check}"


def check_actions(actions: list, check: str) -> tuple[bool, str]:
    """Evaluate an actions assertion. Returns (passed, detail).

    Supported checks: ``length >= N``, ``has table`` and
    ``any action contains "a" or "b"`` (the latter looks at buttons only).
    """
    # length >= N
    m = re.match(r'length\s*>=\s*(\d+)', check)
    if m:
        expected = int(m.group(1))
        if len(actions) >= expected:
            return True, f"{len(actions)} actions >= {expected}"
        return False, f"{len(actions)} actions < {expected}"

    # has table
    if check.strip() == "has table":
        for a in actions:
            if isinstance(a, dict) and a.get("type") == "table":
                cols = a.get("columns", [])
                rows = len(a.get("data", []))
                return True, f"table found: {len(cols)} cols, {rows} rows"
        return False, f"no table in {len(actions)} controls"

    # any action contains "foo" or "bar" — searches buttons only
    m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        buttons = [a for a in actions if isinstance(a, dict) and a.get("type") == "button"]
        # Match against the serialized button so any field can satisfy the term.
        action_strs = [json.dumps(a).lower() for a in buttons]
        for term in terms:
            if any(term.lower() in s for s in action_strs):
                return True, f"found '{term}' in actions"
        return False, f"none of {terms} found in {len(buttons)} buttons"

    return False, f"unknown check: {check}"


def check_state(memo: dict, check: str) -> tuple[bool, str]:
    """Evaluate a memorizer state assertion. Returns (passed, detail).

    Supported checks: ``FIELD contains "a" or "b"``, ``FIELD is "a" or "b"``
    and ``facts any contains "a" or "b"``. Comparisons are case-insensitive.
    """
    memo = memo or {}

    # field contains "value" or "value2"
    m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        key, terms = m.group(1), _split_terms(m.group(2))
        actual = memo.get(key, "")
        if isinstance(actual, list):
            actual_str = " ".join(str(x) for x in actual)
        else:
            actual_str = str(actual)
        for term in terms:
            if term.lower() in actual_str.lower():
                return True, f"{key}={actual_str[:50]} contains '{term}'"
        return False, f"{key}={actual_str[:50]} doesn't contain any of {terms}"

    # field is "value" or "value2"
    m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        key, terms = m.group(1), _split_terms(m.group(2))
        actual = str(memo.get(key, ""))
        for term in terms:
            if actual.lower() == term.lower():
                return True, f"{key}={actual}"
        return False, f"{key}={actual} not in {terms}"

    # facts any contains "value"
    m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        facts = memo.get("facts") or []
        # str() so non-string facts cannot break the join.
        facts_str = " ".join(str(f) for f in facts).lower()
        for term in terms:
            if term.lower() in facts_str:
                return True, f"found '{term}' in facts"
        return False, f"none of {terms} found in facts: {facts}"

    return False, f"unknown check: {check}"


# Trace events that count as "machine" events for `has tool_call` checks.
_MACHINE_EVENTS = ("machine_created", "machine_destroyed", "machine_reset", "machine_state_added")

# Tool name -> the trace event that shows the tool was called.
_TOOL_EVENTS = {
    "create_machine": "machine_created",
    "add_state": "machine_state_added",
    "reset_machine": "machine_reset",
    "destroy_machine": "machine_destroyed",
}


def check_trace(trace: list, check: str) -> tuple[bool, str]:
    """Evaluate a trace assertion against the fetched trace lines.

    Supported checks: ``input.analysis.FIELD is "a" or "b"``,
    ``has tool_call TOOL``, ``machine_created id="ID"``, ``has EVENT`` and
    ``no EVENT``. Returns (passed, detail).
    """
    # input.analysis.FIELD is "VALUE"
    m = re.match(r'input\.analysis\.(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        key, terms = m.group(1), _split_terms(m.group(2))
        for t in trace:
            if t.get("node") == "input" and t.get("event") == "perceived":
                # Only the first perceived event in the trace is evaluated.
                analysis = t.get("analysis") or {}
                actual = str(analysis.get(key, ""))
                for term in terms:
                    if actual.lower() == term.lower():
                        return True, f"input.analysis.{key}={actual}"
                return False, f"input.analysis.{key}={actual}, expected one of {terms}"
        return False, "no input perceived event in trace"

    # has tool_call TOOL_NAME — checks if Thinker called a specific function tool
    m = re.match(r'has\s+tool_call\s+(\w+)', check)
    if m:
        tool_name = m.group(1)
        expected_event = _TOOL_EVENTS.get(tool_name)
        for t in trace:
            event = t.get("event")
            # Machine events emitted by the UI node whose name contains the tool name
            if event in _MACHINE_EVENTS and tool_name in event:
                return True, f"found machine event for '{tool_name}'"
            # Known tool -> event mapping (guarded so an unknown tool never
            # matches a trace line that simply has no event)
            if expected_event is not None and event == expected_event:
                return True, f"found {tool_name} via {expected_event} event"
        return False, f"no tool_call '{tool_name}' in trace"

    # machine_created id="NAV" — checks for specific machine creation
    m = re.match(r'machine_created\s+id="(\w+)"', check)
    if m:
        expected_id = m.group(1)
        for t in trace:
            if t.get("event") == "machine_created" and t.get("id") == expected_id:
                return True, f"machine '{expected_id}' created"
        return False, f"no machine_created event with id='{expected_id}'"

    # has EVENT_NAME
    m = re.match(r'has\s+(\w+)', check)
    if m:
        event_name = m.group(1)
        if any(t.get("event") == event_name for t in trace):
            return True, f"found event '{event_name}'"
        return False, f"no '{event_name}' event in trace"

    # no EVENT_NAME
    m = re.match(r'no\s+(\w+)', check)
    if m:
        event_name = m.group(1)
        if any(t.get("event") == event_name for t in trace):
            return False, f"found unexpected event '{event_name}'"
        return True, f"no '{event_name}' event (as expected)"

    return False, f"unknown trace check: {check}"


# --- Runner ---

@dataclass
class StepResult:
    step: str
    check: str
    status: str  # PASS, FAIL, SKIP
    detail: str = ""


class CogTestRunner:
    """Executes a parsed testcase against the API through a CogClient."""

    def __init__(self):
        self.client = CogClient()

    def run(self, testcase: dict) -> list[dict]:
        """Run every step of ``testcase`` and return the flat result list.

        Each result is a dict with ``step``, ``check``, ``status`` and
        ``detail``. The client is closed afterwards, even if a step raises,
        so a runner instance is good for one testcase.
        """
        results = []
        try:
            for step in testcase["steps"]:
                results.extend(self._run_step(step))
        finally:
            self.client.close()
        return results

    def _run_step(self, step: dict) -> list[dict]:
        """Execute the commands of one step; return one result per command.

        Commands with an unrecognised type are skipped without a result.
        """
        results: list[dict] = []
        step_name = step["name"]

        def record(check: str, passed: bool, detail: str) -> None:
            results.append({
                "step": step_name,
                "check": check,
                "status": "PASS" if passed else "FAIL",
                "detail": detail,
            })

        def attempt(check: str, call, *args, **kwargs) -> None:
            # Any failure talking to the API is a failed check, not a crash.
            try:
                call(*args, **kwargs)
            except Exception as e:
                record(check, False, str(e))
            else:
                record(check, True, f"response: {str(self.client.last_response)[:80]}")

        for cmd in step["commands"]:
            kind = cmd["type"]

            if kind == "clear":
                try:
                    self.client.clear()
                except Exception as e:
                    record("clear", False, str(e))
                else:
                    record("clear", True, "cleared")

            elif kind == "send":
                attempt(f"send: {cmd['text'][:40]}", self.client.send,
                        cmd["text"], dashboard=cmd.get("dashboard"))

            elif kind == "action":
                attempt(f"action: {cmd['action']}", self.client.send_action, cmd["action"])

            elif kind == "action_match":
                # Find first button matching pattern (in its action or label).
                # Buttons without an action cannot be triggered and are skipped.
                pattern = cmd["pattern"].lower()
                matched = None
                for button in self.client.last_buttons:
                    action = str(button.get("action") or "")
                    label = str(button.get("label") or "")
                    if action and (pattern in action.lower() or pattern in label.lower()):
                        matched = action
                        break
                if matched:
                    attempt(f"action: {matched}", self.client.send_action, matched)
                else:
                    record(
                        f"action matching '{pattern}'",
                        False,
                        f"no action matching '{pattern}' in {[a.get('action') for a in self.client.last_actions]}",
                    )

            elif kind == "expect_response":
                passed, detail = check_response(self.client.last_response, cmd["check"])
                record(f"response: {cmd['check']}", passed, detail)

            elif kind == "expect_actions":
                passed, detail = check_actions(self.client.last_actions, cmd["check"])
                record(f"actions: {cmd['check']}", passed, detail)

            elif kind == "expect_state":
                # State is fetched fresh for every state assertion.
                try:
                    self.client.get_state()
                except Exception as e:
                    record(f"state: {cmd['check']}", False, str(e))
                else:
                    passed, detail = check_state(self.client.last_memo, cmd["check"])
                    record(f"state: {cmd['check']}", passed, detail)

            elif kind == "expect_trace":
                passed, detail = check_trace(self.client.last_trace, cmd["check"])
                record(f"trace: {cmd['check']}", passed, detail)

        return results


# --- Live status push ---

def _push_status(event: str, **kwargs):
    """Push test status to the API for frontend display (best effort)."""
    try:
        httpx.post(f"{API}/test/status", json={"event": event, **kwargs}, headers=HEADERS, timeout=5)
    except Exception:
        pass  # Don't fail tests if push fails
Standalone runner --- def run_standalone(paths: list[Path] = None): if not paths: paths = sorted(Path("testcases").glob("*.md")) # Count total steps across all testcases for frontend progress all_tcs = [parse_testcase(p) for p in paths] total_steps = sum(len(s["commands"]) for tc in all_tcs for s in tc["steps"]) first_suite = True all_results = {} for tc in all_tcs: path = tc["file"] print(f"\n{'='*60}") print(f" {tc['name']}") print(f"{'='*60}") if first_suite: _push_status("suite_start", suite=tc["name"], count=total_steps) first_suite = False else: _push_status("suite_start", suite=tc["name"]) runner = CogTestRunner() results = runner.run(tc) all_results[tc["name"]] = results for r in results: icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP" print(f" {icon} [{r['step']}] {r['check']}") if r["detail"]: print(f" {r['detail']}") _push_status("step_result", suite=tc["name"], result=r) passed = sum(1 for r in results if r["status"] == "PASS") failed = sum(1 for r in results if r["status"] == "FAIL") print(f"\n {passed} passed, {failed} failed") _push_status("suite_end", suite=tc["name"], passed=passed, failed=failed) # Summary print(f"\n{'='*60}") total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values()) total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values()) print(f" TOTAL: {total_pass} passed, {total_fail} failed") print(f"{'='*60}") # Write results JSON for web UI output = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "testcases": {name: results for name, results in all_results.items()}, "summary": {"passed": total_pass, "failed": total_fail}, } results_path = Path("testcases/results.json") results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8") print(f"\n Results written to {results_path}") return total_fail == 0 if __name__ == "__main__": paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) 
> 1 else None success = run_standalone(paths) sys.exit(0 if success else 1)