"""
Cog Runtime Test Runner — parses testcases/*.md and executes against live API.

Usage:
    pytest testcases/ -v                                   # run all testcases
    pytest testcases/counter_state.md -v                   # run one testcase
    python runtime_test.py                                 # standalone run all
    python runtime_test.py testcases/pub_conversation.md   # standalone run one

Environment:
    COG_TEST_API    overrides the API base URL
    COG_TEST_TOKEN  overrides the bearer token
"""

import json
import os
import re
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path

import httpx

# Fix Windows console encoding
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

API = os.environ.get("COG_TEST_API") or "https://cog.loop42.de/api"
# SECURITY NOTE(review): a credential is committed in source. It is kept only
# as a backward-compatible default; prefer COG_TEST_TOKEN and rotate this one.
TOKEN = (
    os.environ.get("COG_TEST_TOKEN")
    or "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g"
)
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}


def _split_terms(raw: str) -> list[str]:
    """Split an ``a" or "b`` alternatives string into bare, unquoted terms."""
    return [term.strip().strip('"') for term in raw.split(" or ")]


# --- Markdown parser ---

def parse_testcase(path: Path) -> dict:
    """Parse a testcase markdown file into structured steps.

    Recognised structure:
      * ``# Title``      -> testcase name (first one wins)
      * ``## Setup``     -> optional setup step, placed first if it has commands
      * ``### N. Name``  -> a step (the leading ``N.`` is stripped)
      * ``- command``    -> a command inside the current step / setup

    Returns:
        ``{"name": str, "steps": [{"name": str, "commands": [dict]}], "file": str}``
    """
    text = path.read_text(encoding="utf-8")
    steps: list[dict] = []
    tc = {"name": "", "steps": steps, "file": str(path)}
    current_step = None
    in_setup = False

    def flush_pending() -> None:
        """Store the step being built (if any) and leave setup mode."""
        nonlocal current_step, in_setup
        if current_step is not None:
            if in_setup:
                # An empty Setup section is dropped rather than reported.
                if current_step["commands"]:
                    steps.insert(0, current_step)
            else:
                steps.append(current_step)
        current_step = None
        in_setup = False

    for line in text.split("\n"):
        line_stripped = line.strip()

        # Title
        if line_stripped.startswith("# ") and not tc["name"]:
            tc["name"] = line_stripped[2:].strip()
            continue

        # Setup section (flush first so a step in progress is not lost)
        if line_stripped == "## Setup":
            flush_pending()
            in_setup = True
            current_step = {"name": "Setup", "commands": []}
            continue

        # Any other "## " header ends the setup section
        if line_stripped.startswith("## ") and in_setup:
            flush_pending()

        # Step header
        if line_stripped.startswith("### "):
            flush_pending()
            step_text = line_stripped[4:].strip()
            m = re.match(r"\d+\.\s*(.*)", step_text)
            current_step = {"name": m.group(1) if m else step_text, "commands": []}
            continue

        # Commands within a step or setup
        if current_step is not None and line_stripped.startswith("- "):
            cmd = _parse_command(line_stripped[2:].strip())
            if cmd:
                current_step["commands"].append(cmd)

    if current_step:
        steps.append(current_step)

    return tc


# Prefix -> command type for the "expect_*" family, which all share one shape.
_EXPECT_PREFIXES = (
    ("expect_response:", "expect_response"),  # expect_response: contains "foo"
    ("expect_actions:", "expect_actions"),    # expect_actions: length >= 2
    ("expect_state:", "expect_state"),        # expect_state: field contains/is "value"
    ("expect_trace:", "expect_trace"),        # expect_trace: has reflex_path / no thinker
)


def _parse_command(text: str) -> dict | None:
    """Parse a single command line like 'send: hello' or 'expect_response: contains foo'.

    Returns a command dict with a ``"type"`` key, or ``None`` when the line is
    not a recognised command.
    """
    # send: message |dashboard| [json]
    # send: message (no dashboard)
    if text.startswith("send:"):
        val = text[5:].strip()
        if "|dashboard|" in val:
            msg_text, _, raw_dashboard = val.partition("|dashboard|")
            try:
                dashboard = json.loads(raw_dashboard.strip())
            except ValueError:  # json.JSONDecodeError is a ValueError
                dashboard = []
            return {"type": "send", "text": msg_text.strip(), "dashboard": dashboard}
        return {"type": "send", "text": val}

    # action: action_name OR action: first matching "pattern" or "pattern2"
    if text.startswith("action:"):
        val = text[7:].strip()
        m = re.match(r'first matching (.+)', val)
        if m:
            # Support: first matching "+1" or "inc" or "plus"
            return {"type": "action_match", "patterns": _split_terms(m.group(1))}
        return {"type": "action", "action": val}

    # clear history
    if text == "clear history":
        return {"type": "clear"}

    for prefix, cmd_type in _EXPECT_PREFIXES:
        if text.startswith(prefix):
            return {"type": cmd_type, "check": text[len(prefix):].strip()}

    return None


# --- API client ---

class CogClient:
    """Thin synchronous HTTP client that remembers the latest server state.

    Attributes updated by the calls below:
      last_response  text of the most recent response
      last_memo      most recent ``memorizer`` dict
      last_actions   controls from the most recent non-empty ``controls`` event
      last_buttons   the subset of ``last_actions`` whose type is ``button``
      last_trace     trace lines fetched after the most recent send
    """

    def __init__(self):
        self.client = httpx.Client(timeout=90)
        self.last_response = ""
        self.last_memo = {}
        self.last_actions = []
        self.last_buttons = []
        self.last_trace = []

    def clear(self):
        """Ask the server to clear its history."""
        self.client.post(f"{API}/clear", headers=HEADERS)
        time.sleep(0.3)

    def send(self, text: str, dashboard: list | None = None) -> dict:
        """Send a message and return the final response payload.

        If the server answers ``queued``, ``/result`` is polled (120 x 0.5s)
        until the matching message id reports ``done`` or ``error``.
        NOTE: if polling runs out, the original ``queued`` payload is returned
        and ``last_response`` ends up empty — no exception is raised.
        """
        body = {"text": text}
        if dashboard is not None:
            body["dashboard"] = dashboard
        r = self.client.post(f"{API}/send", json=body, headers=HEADERS)
        d = r.json()

        # Async send: poll for result, match on message ID
        if d.get("status") == "queued":
            msg_id = d.get("id", "")
            for _ in range(120):
                time.sleep(0.5)
                pd = self.client.get(f"{API}/result", headers=HEADERS).json()
                if pd.get("id") == msg_id and pd.get("status") in ("done", "error"):
                    d = pd
                    break

        resp = d.get("response", "")
        self.last_response = resp if isinstance(resp, str) else str(resp)
        memo = d.get("memorizer", {})
        self.last_memo = memo if isinstance(memo, dict) else {}
        time.sleep(0.5)
        self._fetch_trace()
        return d

    def send_action(self, action: str) -> dict:
        """Trigger a UI action by sending it through /api/send."""
        # Actions go through /api/send as ACTION: format (since we can't use WS from test)
        return self.send(f"ACTION: {action}")

    def _fetch_trace(self):
        """Refresh ``last_trace`` and pick up controls from the newest controls event."""
        r = self.client.get(f"{API}/trace?last=40", headers=HEADERS)
        self.last_trace = r.json().get("lines", [])
        # Extract controls from the most recent controls HUD event
        for t in reversed(self.last_trace):
            if t.get("event") == "controls":
                new_controls = t.get("controls", [])
                # An empty controls event leaves the previous controls in place.
                if new_controls:
                    self.last_actions = new_controls
                    self.last_buttons = [
                        c for c in new_controls
                        if isinstance(c, dict) and c.get("type") == "button"
                    ]
                break

    def get_state(self) -> dict:
        """Fetch ``/state`` and return (and remember) its ``memorizer`` dict."""
        r = self.client.get(f"{API}/state", headers=HEADERS)
        memo = r.json().get("memorizer", {})
        # Same guard as send(): assertions call .get() on this value.
        self.last_memo = memo if isinstance(memo, dict) else {}
        return self.last_memo

    def close(self):
        """Close the underlying HTTP client."""
        self.client.close()


# --- Assertion engine ---

def check_response(response: str, check: str) -> tuple[bool, str]:
    """Evaluate a response assertion. Returns (passed, detail).

    Supported checks: ``contains "a" or "b"``, ``matches REGEX``,
    ``not contains "a" or "b"``, ``length > N``.
    """
    haystack = response.lower()

    # contains "foo" or "bar"
    m = re.match(r'contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        for term in terms:
            if term.lower() in haystack:
                return True, f"found '{term}'"
        return False, f"none of {terms} found in: {response[:100]}"

    # matches regex
    m = re.match(r'matches\s+(.+)', check)
    if m:
        pattern = m.group(1).strip()
        try:
            found = re.search(pattern, response)
        except re.error as exc:
            # A typo in the testcase should fail the check, not abort the run.
            return False, f"invalid regex /{pattern}/: {exc}"
        if found:
            return True, f"matched /{pattern}/"
        return False, f"/{pattern}/ not found in: {response[:100]}"

    # not contains "foo" or "bar"
    m = re.match(r'not contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        for term in terms:
            if term.lower() in haystack:
                return False, f"found '{term}' but expected NOT to"
        return True, f"none of {terms} found (as expected)"

    # length > N
    m = re.match(r'length\s*>\s*(\d+)', check)
    if m:
        expected = int(m.group(1))
        if len(response) > expected:
            return True, f"length {len(response)} > {expected}"
        return False, f"length {len(response)} <= {expected}"

    return False, f"unknown check: {check}"


def check_actions(actions: list, check: str) -> tuple[bool, str]:
    """Evaluate an actions assertion. Returns (passed, detail).

    Supported checks: ``length >= N``, ``has TYPE [or [has] TYPE2 ...]``,
    ``any action contains "a" or "b"`` (buttons only).
    """
    # length >= N
    m = re.match(r'length\s*>=\s*(\d+)', check)
    if m:
        expected = int(m.group(1))
        if len(actions) >= expected:
            return True, f"{len(actions)} actions >= {expected}"
        return False, f"{len(actions)} actions < {expected}"

    # has TYPE, has TYPE1 or TYPE2, has TYPE1 or has TYPE2
    m = re.match(r'has\s+(.+)', check)
    if m:
        types = [
            t.strip().replace("has ", "")
            for t in re.split(r'\s+or\s+(?:has\s+)?', m.group(1))
        ]
        for a in actions:
            if not isinstance(a, dict) or a.get("type") not in types:
                continue
            atype = a.get("type")
            if atype == "table":
                return True, f"table found: {len(a.get('columns', []))} cols, {len(a.get('data', []))} rows"
            if atype == "card":
                return True, f"card found: {a.get('title', '?')}, {len(a.get('fields', []))} fields"
            if atype == "list":
                return True, f"list found: {a.get('title', '?')}, {len(a.get('items', []))} items"
            return True, f"{atype} found"
        wanted = " or ".join(types)
        present = [a.get("type", "?") for a in actions if isinstance(a, dict)]
        return False, f"no {wanted} in {len(actions)} controls ({present})"

    # any action contains "foo" or "bar" — searches buttons only
    m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        buttons = [a for a in actions if isinstance(a, dict) and a.get("type") == "button"]
        action_strs = [json.dumps(a).lower() for a in buttons]
        for term in terms:
            needle = term.lower()
            if any(needle in s for s in action_strs):
                return True, f"found '{term}' in actions"
        return False, f"none of {terms} found in {len(buttons)} buttons"

    return False, f"unknown check: {check}"


def check_state(memo: dict, check: str) -> tuple[bool, str]:
    """Evaluate a memorizer state assertion. Returns (passed, detail).

    Supported checks: ``FIELD contains "a" or "b"``, ``FIELD is "a" or "b"``,
    ``facts any contains "a" or "b"``.
    """
    # field contains "value" or "value2"
    m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        field_name, values_str = m.group(1), m.group(2)
        terms = _split_terms(values_str)
        actual = memo.get(field_name, "")
        if isinstance(actual, list):
            actual_str = " ".join(str(x) for x in actual)
        else:
            actual_str = str(actual)
        actual_lower = actual_str.lower()
        for term in terms:
            if term.lower() in actual_lower:
                return True, f"{field_name}={actual_str[:50]} contains '{term}'"
        return False, f"{field_name}={actual_str[:50]} doesn't contain any of {terms}"

    # field is "value" or "value2"
    m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        field_name, values_str = m.group(1), m.group(2)
        terms = _split_terms(values_str)
        actual = str(memo.get(field_name, ""))
        for term in terms:
            if actual.lower() == term.lower():
                return True, f"{field_name}={actual}"
        return False, f"{field_name}={actual} not in {terms}"

    # facts any contains "value"
    m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = _split_terms(m.group(1))
        facts = memo.get("facts") or []
        if isinstance(facts, str):
            facts = [facts]
        # str() so a null / non-string fact cannot crash the join.
        facts_str = " ".join(str(f) for f in facts).lower()
        for term in terms:
            if term.lower() in facts_str:
                return True, f"found '{term}' in facts"
        return False, f"none of {terms} found in facts: {facts}"

    return False, f"unknown check: {check}"


# Events that represent state-machine tool calls.
_MACHINE_EVENTS = (
    "machine_created", "machine_destroyed", "machine_reset", "machine_state_added",
)
# Tool name -> the event that shows the tool was called.
_TOOL_CALL_EVENTS = {
    "create_machine": "machine_created",
    "add_state": "machine_state_added",
    "reset_machine": "machine_reset",
    "destroy_machine": "machine_destroyed",
}


def _check_input_analysis(trace: list, field_name: str, terms: list[str]) -> tuple[bool, str]:
    """Compare ``input.analysis.<field_name>`` from the trace against *terms*."""
    # Method 1: parse from LAST frame_trace event (v3 frame engine, most reliable)
    for t in reversed(trace):
        if t.get("event") == "frame_trace" and t.get("trace"):
            for fr in t["trace"].get("frames", []):
                if fr.get("node") == "input" and fr.get("output"):
                    # Output is scanned as whitespace-separated key=value parts.
                    for part in fr["output"].split():
                        if "=" not in part:
                            continue
                        k, v = part.split("=", 1)
                        if k == field_name:
                            for term in terms:
                                if v.lower() == term.lower():
                                    return True, f"input.analysis.{field_name}={v} (from frame_trace)"
                            return False, f"input.analysis.{field_name}={v}, expected one of {terms}"
            break  # only check the most recent frame_trace

    # Method 2: fallback to input node's "perceived" HUD event (v1/v2)
    for t in reversed(trace):
        if t.get("node") == "input" and t.get("event") == "perceived":
            analysis = t.get("analysis", {})
            actual = str(analysis.get(field_name, ""))
            for term in terms:
                if actual.lower() == term.lower():
                    return True, f"input.analysis.{field_name}={actual}"
            return False, f"input.analysis.{field_name}={actual}, expected one of {terms}"
    return False, "no input perceived event in trace"


def check_trace(trace: list, check: str) -> tuple[bool, str]:
    """Evaluate a trace assertion. Checks HUD events from last request.

    Supported checks: ``input.analysis.FIELD is "a" or "b"``,
    ``has tool_call TOOL``, ``machine_created id="ID"``,
    ``has EVENT [or EVENT2 ...]``, ``no EVENT``.
    Returns (passed, detail).
    """
    # input.analysis.FIELD is "VALUE"
    m = re.match(r'input\.analysis\.(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        return _check_input_analysis(trace, m.group(1), _split_terms(m.group(2)))

    # has tool_call TOOL_NAME — checks if Thinker called a specific function tool
    m = re.match(r'has\s+tool_call\s+(\w+)', check)
    if m:
        tool_name = m.group(1)
        expected_event = _TOOL_CALL_EVENTS.get(tool_name)
        for t in trace:
            event = t.get("event")
            # Check machine_created/destroyed/etc events that are emitted by UI node
            if event in _MACHINE_EVENTS and tool_name in event:
                return True, f"found machine event for '{tool_name}'"
            # Check for the tool name in the event data
            if expected_event is not None and event == expected_event:
                return True, f"found {tool_name} via {expected_event} event"
        return False, f"no tool_call '{tool_name}' in trace"

    # machine_created id="NAV" — checks for specific machine creation
    m = re.match(r'machine_created\s+id="(\w+)"', check)
    if m:
        expected_id = m.group(1)
        for t in trace:
            if t.get("event") == "machine_created" and t.get("id") == expected_id:
                return True, f"machine '{expected_id}' created"
        return False, f"no machine_created event with id='{expected_id}'"

    # has EVENT_NAME or EVENT_NAME2 ... ("has tool_call" already returned above)
    m = re.match(r'has\s+([\w\s]+(?:\s+or\s+\w+)*)', check)
    if m:
        names = [n.strip() for n in re.split(r'\s+or\s+', m.group(1))]
        for t in trace:
            if t.get("event") in names:
                return True, f"found event '{t.get('event')}'"
        wanted = " or ".join(names)
        return False, f"no '{wanted}' event in trace"

    # no EVENT_NAME
    m = re.match(r'no\s+(\w+)', check)
    if m:
        event_name = m.group(1)
        for t in trace:
            if t.get("event") == event_name:
                return False, f"found unexpected event '{event_name}'"
        return True, f"no '{event_name}' event (as expected)"

    return False, f"unknown trace check: {check}"


# --- Runner ---

@dataclass
class StepResult:
    step: str
    check: str
    status: str  # PASS, FAIL, SKIP
    detail: str = ""


class CogTestRunner:
    """Executes a parsed testcase against the API and collects result dicts.

    Each result is ``{"step", "check", "status", "detail"}``; ``on_result`` (if
    given) is called with every result as soon as it is produced.
    """

    def __init__(self, on_result=None):
        self.client = CogClient()
        self._on_result = on_result  # callback(result_dict) per check

    def run(self, testcase: dict) -> list[dict]:
        """Run all steps of *testcase*; the HTTP client is always closed afterwards."""
        results = []
        try:
            for step in testcase["steps"]:
                results.extend(self._run_step(step))
        finally:
            self.client.close()
        return results

    def _add(self, results: list, result: dict):
        """Append *result* and notify the callback."""
        results.append(result)
        if self._on_result:
            self._on_result(result)

    def _record(self, results: list, step: str, check: str, passed: bool, detail: str):
        """Build a result dict from a pass/fail flag and record it."""
        self._add(results, {
            "step": step,
            "check": check,
            "status": "PASS" if passed else "FAIL",
            "detail": detail,
        })

    def _attempt(self, results: list, step: str, check: str, call, ok_detail=None):
        """Run a network call; record PASS, or FAIL with the exception text.

        ``ok_detail`` overrides the default PASS detail (the response preview).
        """
        try:
            call()
        except Exception as e:  # any transport/decoding error fails this check only
            self._record(results, step, check, False, str(e))
            return
        detail = ok_detail if ok_detail is not None else f"response: {self.client.last_response[:80]}"
        self._record(results, step, check, True, detail)

    def _find_button_action(self, patterns: list):
        """Return the action (or label) of the first button matching any pattern."""
        for pattern in patterns:
            pat = pattern.lower()
            for a in self.client.last_buttons:
                action_str = a.get("action", "") or ""
                label_str = a.get("label", "") or ""
                if pat in action_str.lower() or pat in label_str.lower():
                    found = a.get("action") or a.get("label", "")
                    if found:
                        return found
                    break  # matched but empty: move on to the next pattern
        return None

    def _run_step(self, step: dict) -> list[dict]:
        """Execute every command of one step and return its results."""
        results = []
        step_name = step["name"]
        client = self.client

        for cmd in step["commands"]:
            kind = cmd["type"]

            if kind == "clear":
                self._attempt(results, step_name, "clear", client.clear, ok_detail="cleared")

            elif kind == "send":
                self._attempt(
                    results, step_name, f"send: {cmd['text'][:40]}",
                    lambda: client.send(cmd["text"], dashboard=cmd.get("dashboard")),
                )

            elif kind == "action":
                self._attempt(
                    results, step_name, f"action: {cmd['action']}",
                    lambda: client.send_action(cmd["action"]),
                )

            elif kind == "action_match":
                # Find first button matching any pattern
                patterns = cmd["patterns"]
                matched = self._find_button_action(patterns)
                if matched:
                    self._attempt(
                        results, step_name, f"action: {matched}",
                        lambda: client.send_action(matched),
                    )
                else:
                    wanted = " or ".join(patterns)
                    # Controls may contain non-dict entries; skip those.
                    available = [
                        a.get("action") or a.get("label")
                        for a in client.last_actions if isinstance(a, dict)
                    ]
                    self._record(
                        results, step_name, f"action matching '{wanted}'", False,
                        f"no action matching '{wanted}' in {available}",
                    )

            elif kind == "expect_response":
                passed, detail = check_response(client.last_response, cmd["check"])
                self._record(results, step_name, f"response: {cmd['check']}", passed, detail)

            elif kind == "expect_actions":
                passed, detail = check_actions(client.last_actions, cmd["check"])
                self._record(results, step_name, f"actions: {cmd['check']}", passed, detail)

            elif kind == "expect_state":
                try:
                    client.get_state()
                except Exception as e:
                    self._record(results, step_name, f"state: {cmd['check']}", False, str(e))
                    continue
                passed, detail = check_state(client.last_memo, cmd["check"])
                self._record(results, step_name, f"state: {cmd['check']}", passed, detail)

            elif kind == "expect_trace":
                passed, detail = check_trace(client.last_trace, cmd["check"])
                self._record(results, step_name, f"trace: {cmd['check']}", passed, detail)

        return results


# --- Live status push ---

def _push_status(event: str, **kwargs):
    """Push test status to the API for frontend display (best effort)."""
    try:
        httpx.post(f"{API}/test/status", json={"event": event, **kwargs}, headers=HEADERS, timeout=5)
    except Exception:
        pass  # Don't fail tests if push fails


# --- Standalone runner ---

def _count(results: list, status: str) -> int:
    """Number of results with the given status."""
    return sum(1 for r in results if r["status"] == status)


def run_standalone(paths: list[Path] | None = None):
    """Run the given testcase files (default: testcases/*.md) and report.

    Prints progress, pushes live status, writes ``testcases/results.json`` and
    returns True when no check failed.
    """
    if not paths:
        paths = sorted(Path("testcases").glob("*.md"))

    # Count total steps across all testcases for frontend progress
    all_tcs = [parse_testcase(p) for p in paths]
    total_steps = sum(len(s["commands"]) for tc in all_tcs for s in tc["steps"])

    first_suite = True
    all_results = {}
    for tc in all_tcs:
        suite_name = tc["name"]
        print(f"\n{'='*60}")
        print(f" {suite_name}")
        print(f"{'='*60}")

        if first_suite:
            _push_status("suite_start", suite=suite_name, count=total_steps)
            first_suite = False
        else:
            _push_status("suite_start", suite=suite_name)

        def _on_result(r, suite_name=suite_name):
            icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP"
            print(f" {icon} [{r['step']}] {r['check']}")
            if r["detail"]:
                print(f" {r['detail']}")
            _push_status("step_result", suite=suite_name, result=r)

        runner = CogTestRunner(on_result=_on_result)
        results = runner.run(tc)

        # Testcases sharing a title (or lacking one) must not overwrite each
        # other, or failures could vanish from the totals and the exit code.
        key = suite_name
        if key in all_results:
            key = f"{suite_name} [{tc['file']}]"
        base, suffix = key, 2
        while key in all_results:
            key = f"{base} #{suffix}"
            suffix += 1
        all_results[key] = results

        passed = _count(results, "PASS")
        failed = _count(results, "FAIL")
        print(f"\n {passed} passed, {failed} failed")
        _push_status("suite_end", suite=suite_name, passed=passed, failed=failed)

    # Summary
    print(f"\n{'='*60}")
    total_pass = sum(_count(results, "PASS") for results in all_results.values())
    total_fail = sum(_count(results, "FAIL") for results in all_results.values())
    print(f" TOTAL: {total_pass} passed, {total_fail} failed")
    print(f"{'='*60}")

    # Write results JSON for web UI
    output = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "testcases": dict(all_results),
        "summary": {"passed": total_pass, "failed": total_fail},
    }
    results_path = Path("testcases/results.json")
    results_path.parent.mkdir(parents=True, exist_ok=True)
    results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\n Results written to {results_path}")

    return total_fail == 0


if __name__ == "__main__":
    paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None
    success = run_standalone(paths)
    sys.exit(0 if success else 1)