""" Cog Runtime Test Runner — parses testcases/*.md and executes against live API. Usage: pytest testcases/ -v # run all testcases pytest testcases/counter_state.md -v # run one testcase python runtime_test.py # standalone run all python runtime_test.py testcases/pub_conversation.md # standalone run one """ import httpx import json import os import re import sys import time # Fix Windows console encoding if sys.platform == "win32": sys.stdout.reconfigure(encoding="utf-8", errors="replace") from pathlib import Path from dataclasses import dataclass, field API = "https://cog.loop42.de/api" TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g" HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"} # --- Markdown parser --- def parse_testcase(path: Path) -> dict: """Parse a testcase markdown file into structured steps.""" text = path.read_text(encoding="utf-8") lines = text.split("\n") tc = {"name": "", "steps": [], "file": str(path)} current_step = None in_setup = False for line in lines: line_stripped = line.strip() # Title if line_stripped.startswith("# ") and not tc["name"]: tc["name"] = line_stripped[2:].strip() continue # Setup section if line_stripped == "## Setup": in_setup = True current_step = {"name": "Setup", "commands": []} continue # End setup on next ## or ### if line_stripped.startswith("## ") and in_setup: if current_step and current_step["commands"]: tc["steps"].insert(0, current_step) in_setup = False current_step = None # Step header if line_stripped.startswith("### "): if in_setup and current_step and current_step["commands"]: tc["steps"].insert(0, current_step) in_setup = False elif current_step and not in_setup: tc["steps"].append(current_step) step_text = line_stripped[4:].strip() m = re.match(r"\d+\.\s*(.*)", step_text) current_step = {"name": m.group(1) if m else step_text, "commands": []} continue # Commands within a step or setup if (current_step or in_setup) and line_stripped.startswith("- "): cmd_text = line_stripped[2:].strip() cmd = _parse_command(cmd_text) if cmd and current_step: current_step["commands"].append(cmd) if current_step: tc["steps"].append(current_step) return tc def _parse_command(text: str) -> dict | None: """Parse a single command line like 'send: hello' or 'expect_response: contains foo'.""" # send: message if text.startswith("send:"): return {"type": "send", "text": text[5:].strip()} # action: action_name if text.startswith("action:"): return {"type": "action", "action": text[7:].strip()} # expect_response: contains "foo" if text.startswith("expect_response:"): return {"type": "expect_response", "check": text[16:].strip()} # expect_actions: length >= 2 if text.startswith("expect_actions:"): return {"type": "expect_actions", "check": text[15:].strip()} # expect_state: field contains/is "value" if text.startswith("expect_state:"): return {"type": "expect_state", "check": text[13:].strip()} # clear history if text == "clear history": return {"type": "clear"} return None # --- API client --- class CogClient: def __init__(self): self.client = httpx.Client(timeout=30) self.last_response = "" self.last_memo = {} self.last_actions = [] self.last_trace = [] def clear(self): self.client.post(f"{API}/clear", headers=HEADERS) time.sleep(0.3) def send(self, text: str) -> dict: r = self.client.post(f"{API}/send", json={"text": text}, headers=HEADERS) d = r.json() self.last_response = d.get("response", "") self.last_memo = d.get("memorizer", {}) time.sleep(0.5) self._fetch_trace() return d def send_action(self, action: str) -> dict: # Actions go through /api/send as ACTION: format (since we can't use WS from test) return self.send(f"ACTION: {action}") def _fetch_trace(self): r = self.client.get(f"{API}/trace?last=10", headers=HEADERS) self.last_trace = r.json().get("lines", []) # Extract actions from trace self.last_actions = [] for t in self.last_trace: if t.get("event") == "controls": for ctrl in t.get("controls", []): if ctrl.get("type") == "button": self.last_actions.append(ctrl) def get_state(self) -> dict: r = self.client.get(f"{API}/state", headers=HEADERS) d = r.json() self.last_memo = d.get("memorizer", {}) return self.last_memo def close(self): self.client.close() # --- Assertion engine --- def check_response(response: str, check: str) -> tuple[bool, str]: """Evaluate a response assertion. Returns (passed, detail).""" # contains "foo" or "bar" m = re.match(r'contains\s+"?(.+?)"?\s*$', check) if m: terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] for term in terms: if term.lower() in response.lower(): return True, f"found '{term}'" return False, f"none of {terms} found in: {response[:100]}" # matches regex m = re.match(r'matches\s+(.+)', check) if m: pattern = m.group(1).strip() if re.search(pattern, response): return True, f"matched /{pattern}/" return False, f"/{pattern}/ not found in: {response[:100]}" # length > N m = re.match(r'length\s*>\s*(\d+)', check) if m: expected = int(m.group(1)) if len(response) > expected: return True, f"length {len(response)} > {expected}" return False, f"length {len(response)} <= {expected}" return False, f"unknown check: {check}" def check_actions(actions: list, check: str) -> tuple[bool, str]: """Evaluate an actions assertion.""" # length >= N m = re.match(r'length\s*>=\s*(\d+)', check) if m: expected = int(m.group(1)) if len(actions) >= expected: return True, f"{len(actions)} actions >= {expected}" return False, f"{len(actions)} actions < {expected}" # any action contains "foo" or "bar" m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check) if m: terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] action_strs = [json.dumps(a).lower() for a in actions] for term in terms: if any(term.lower() in s for s in action_strs): return True, f"found '{term}' in actions" return False, f"none of {terms} found in {len(actions)} actions" return False, f"unknown check: {check}" def check_state(memo: dict, check: str) -> tuple[bool, str]: """Evaluate a memorizer state assertion.""" # field contains "value" or "value2" m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check) if m: field, values_str = m.group(1), m.group(2) terms = [t.strip().strip('"') for t in values_str.split(" or ")] actual = memo.get(field, "") if isinstance(actual, list): actual_str = " ".join(str(x) for x in actual) else: actual_str = str(actual) for term in terms: if term.lower() in actual_str.lower(): return True, f"{field}={actual_str[:50]} contains '{term}'" return False, f"{field}={actual_str[:50]} doesn't contain any of {terms}" # field is "value" or "value2" m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check) if m: field, values_str = m.group(1), m.group(2) terms = [t.strip().strip('"') for t in values_str.split(" or ")] actual = str(memo.get(field, "")) for term in terms: if actual.lower() == term.lower(): return True, f"{field}={actual}" return False, f"{field}={actual} not in {terms}" # facts any contains "value" m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check) if m: terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] facts = memo.get("facts", []) facts_str = " ".join(facts).lower() for term in terms: if term.lower() in facts_str: return True, f"found '{term}' in facts" return False, f"none of {terms} found in facts: {facts}" return False, f"unknown check: {check}" # --- Runner --- @dataclass class StepResult: step: str check: str status: str # PASS, FAIL, SKIP detail: str = "" class CogTestRunner: def __init__(self): self.client = CogClient() def run(self, testcase: dict) -> list[dict]: results = [] for step in testcase["steps"]: step_results = self._run_step(step) results.extend(step_results) self.client.close() return results def _run_step(self, step: dict) -> list[dict]: results = [] step_name = step["name"] for cmd in step["commands"]: if cmd["type"] == "clear": self.client.clear() results.append({"step": step_name, "check": "clear", "status": "PASS", "detail": "cleared"}) elif cmd["type"] == "send": try: self.client.send(cmd["text"]) results.append({"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "PASS", "detail": f"response: {self.client.last_response[:80]}"}) except Exception as e: results.append({"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "FAIL", "detail": str(e)}) elif cmd["type"] == "action": try: self.client.send_action(cmd["action"]) results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "PASS", "detail": f"response: {self.client.last_response[:80]}"}) except Exception as e: results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "FAIL", "detail": str(e)}) elif cmd["type"] == "expect_response": passed, detail = check_response(self.client.last_response, cmd["check"]) results.append({"step": step_name, "check": f"response: {cmd['check']}", "status": "PASS" if passed else "FAIL", "detail": detail}) elif cmd["type"] == "expect_actions": passed, detail = check_actions(self.client.last_actions, cmd["check"]) results.append({"step": step_name, "check": f"actions: {cmd['check']}", "status": "PASS" if passed else "FAIL", "detail": detail}) elif cmd["type"] == "expect_state": self.client.get_state() passed, detail = check_state(self.client.last_memo, cmd["check"]) results.append({"step": step_name, "check": f"state: {cmd['check']}", "status": "PASS" if passed else "FAIL", "detail": detail}) return results # --- Standalone runner --- def run_standalone(paths: list[Path] = None): if not paths: paths = sorted(Path("testcases").glob("*.md")) all_results = {} for path in paths: tc = parse_testcase(path) print(f"\n{'='*60}") print(f" {tc['name']}") print(f"{'='*60}") runner = CogTestRunner() results = runner.run(tc) all_results[tc["name"]] = results for r in results: icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP" print(f" {icon} [{r['step']}] {r['check']}") if r["detail"]: print(f" {r['detail']}") passed = sum(1 for r in results if r["status"] == "PASS") failed = sum(1 for r in results if r["status"] == "FAIL") print(f"\n {passed} passed, {failed} failed") # Summary print(f"\n{'='*60}") total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values()) total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values()) print(f" TOTAL: {total_pass} passed, {total_fail} failed") print(f"{'='*60}") # Write results JSON for web UI output = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "testcases": {name: results for name, results in all_results.items()}, "summary": {"passed": total_pass, "failed": total_fail}, } results_path = Path("testcases/results.json") results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8") print(f"\n Results written to {results_path}") return total_fail == 0 if __name__ == "__main__": paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None success = run_standalone(paths) sys.exit(0 if success else 1)