diff --git a/agent/__init__.py b/agent/__init__.py
index af08a39..f9ca176 100644
--- a/agent/__init__.py
+++ b/agent/__init__.py
@@ -32,6 +32,10 @@ async def index():
     resp.headers["Cache-Control"] = "no-cache"
     return resp
 
+@app.get("/tests")
+async def tests_page():
+    return FileResponse(STATIC_DIR / "tests.html")
+
 @app.get("/callback")
 async def callback():
     """OIDC callback — serves the same SPA, JS handles the code exchange."""
diff --git a/agent/api.py b/agent/api.py
index ad1d84f..a8ff66f 100644
--- a/agent/api.py
+++ b/agent/api.py
@@ -171,6 +171,14 @@ def register_routes(app):
             "messages": _active_runtime.history[-last:],
         }
 
+    @app.get("/api/tests")
+    async def get_tests(user=Depends(require_auth)):
+        """Latest test results from runtime_test.py."""
+        results_path = Path(__file__).parent.parent / "testcases" / "results.json"
+        if not results_path.exists():
+            return {}
+        return json.loads(results_path.read_text(encoding="utf-8"))
+
     @app.get("/api/trace")
     async def get_trace(last: int = 30, user=Depends(require_auth)):
         if not TRACE_FILE.exists():
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..5d74c00
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,45 @@
+"""Pytest configuration: collect testcases/*.md as test items."""
+
+import pytest
+from pathlib import Path
+from runtime_test import parse_testcase, CogTestRunner
+
+
+def pytest_collect_file(parent, file_path):
+    if file_path.suffix == ".md" and file_path.parent.name == "testcases":
+        return TestCaseFile.from_parent(parent, path=file_path)
+
+
+class TestCaseFile(pytest.File):
+    def collect(self):
+        tc = parse_testcase(self.path)
+        yield TestCaseItem.from_parent(self, name=tc["name"], testcase=tc)
+
+
+class TestCaseItem(pytest.Item):
+    def __init__(self, name, parent, testcase):
+        super().__init__(name, parent)
+        self.testcase = testcase
+
+    def runtest(self):
+        runner = CogTestRunner()
+        results = runner.run(self.testcase)
+        # Collect failures
+        failures = [r for r in results if r["status"] == "FAIL"]
+        if 
failures:
+            msg = "\n".join(f"Step {r['step']}: {r['check']} — {r['detail']}" for r in failures)
+            raise TestCaseFailure(msg, results)
+
+    def repr_failure(self, excinfo, style=None):
+        if isinstance(excinfo.value, TestCaseFailure):
+            return f"\n{excinfo.value.args[0]}"
+        return super().repr_failure(excinfo, style)
+
+    def reportinfo(self):
+        return self.path, 0, f"testcase: {self.name}"
+
+
+class TestCaseFailure(Exception):
+    def __init__(self, msg, results):
+        super().__init__(msg)
+        self.results = results
diff --git a/runtime_test.py b/runtime_test.py
new file mode 100644
index 0000000..09a91d6
--- /dev/null
+++ b/runtime_test.py
@@ -0,0 +1,378 @@
+"""
+Cog Runtime Test Runner — parses testcases/*.md and executes against live API.
+
+Usage:
+    pytest testcases/ -v                                   # run all testcases
+    pytest testcases/counter_state.md -v                   # run one testcase
+    python runtime_test.py                                 # standalone run all
+    python runtime_test.py testcases/pub_conversation.md   # standalone run one
+"""
+
+import httpx
+import json
+import os
+import re
+import sys
+import time
+
+# Fix Windows console encoding
+if sys.platform == "win32":
+    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
+from pathlib import Path
+from dataclasses import dataclass, field
+
+API = "https://cog.loop42.de/api"
+TOKEN = os.environ.get("COG_TEST_TOKEN", "")  # was a committed bearer token; rotate the leaked secret
+HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
+
+
+# --- Markdown parser ---
+
+def parse_testcase(path: Path) -> dict:
+    """Parse a testcase markdown file into structured steps."""
+    text = path.read_text(encoding="utf-8")
+    lines = text.split("\n")
+
+    tc = {"name": "", "steps": [], "file": str(path)}
+    current_step = None
+
+    in_setup = False
+    for line in lines:
+        line_stripped = line.strip()
+
+        # Title
+        if line_stripped.startswith("# ") and not tc["name"]:
+            tc["name"] = line_stripped[2:].strip()
+            continue
+
+        # Setup section
+        if line_stripped == "## Setup":
+            in_setup = 
True + current_step = {"name": "Setup", "commands": []} + continue + + # End setup on next ## or ### + if line_stripped.startswith("## ") and in_setup: + if current_step and current_step["commands"]: + tc["steps"].insert(0, current_step) + in_setup = False + current_step = None + + # Step header + if line_stripped.startswith("### "): + if in_setup and current_step and current_step["commands"]: + tc["steps"].insert(0, current_step) + in_setup = False + elif current_step and not in_setup: + tc["steps"].append(current_step) + step_text = line_stripped[4:].strip() + m = re.match(r"\d+\.\s*(.*)", step_text) + current_step = {"name": m.group(1) if m else step_text, "commands": []} + continue + + # Commands within a step or setup + if (current_step or in_setup) and line_stripped.startswith("- "): + cmd_text = line_stripped[2:].strip() + cmd = _parse_command(cmd_text) + if cmd and current_step: + current_step["commands"].append(cmd) + + if current_step: + tc["steps"].append(current_step) + + return tc + + +def _parse_command(text: str) -> dict | None: + """Parse a single command line like 'send: hello' or 'expect_response: contains foo'.""" + # send: message + if text.startswith("send:"): + return {"type": "send", "text": text[5:].strip()} + + # action: action_name + if text.startswith("action:"): + return {"type": "action", "action": text[7:].strip()} + + # expect_response: contains "foo" + if text.startswith("expect_response:"): + return {"type": "expect_response", "check": text[16:].strip()} + + # expect_actions: length >= 2 + if text.startswith("expect_actions:"): + return {"type": "expect_actions", "check": text[15:].strip()} + + # expect_state: field contains/is "value" + if text.startswith("expect_state:"): + return {"type": "expect_state", "check": text[13:].strip()} + + # clear history + if text == "clear history": + return {"type": "clear"} + + return None + + +# --- API client --- + +class CogClient: + def __init__(self): + self.client = httpx.Client(timeout=30) 
+ self.last_response = "" + self.last_memo = {} + self.last_actions = [] + self.last_trace = [] + + def clear(self): + self.client.post(f"{API}/clear", headers=HEADERS) + time.sleep(0.3) + + def send(self, text: str) -> dict: + r = self.client.post(f"{API}/send", json={"text": text}, headers=HEADERS) + d = r.json() + self.last_response = d.get("response", "") + self.last_memo = d.get("memorizer", {}) + time.sleep(0.5) + self._fetch_trace() + return d + + def send_action(self, action: str) -> dict: + # Actions go through /api/send as ACTION: format (since we can't use WS from test) + return self.send(f"ACTION: {action}") + + def _fetch_trace(self): + r = self.client.get(f"{API}/trace?last=10", headers=HEADERS) + self.last_trace = r.json().get("lines", []) + # Extract actions from trace + self.last_actions = [] + for t in self.last_trace: + if t.get("event") == "controls": + for ctrl in t.get("controls", []): + if ctrl.get("type") == "button": + self.last_actions.append(ctrl) + + def get_state(self) -> dict: + r = self.client.get(f"{API}/state", headers=HEADERS) + d = r.json() + self.last_memo = d.get("memorizer", {}) + return self.last_memo + + def close(self): + self.client.close() + + +# --- Assertion engine --- + +def check_response(response: str, check: str) -> tuple[bool, str]: + """Evaluate a response assertion. 
Returns (passed, detail).""" + # contains "foo" or "bar" + m = re.match(r'contains\s+"?(.+?)"?\s*$', check) + if m: + terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] + for term in terms: + if term.lower() in response.lower(): + return True, f"found '{term}'" + return False, f"none of {terms} found in: {response[:100]}" + + # matches regex + m = re.match(r'matches\s+(.+)', check) + if m: + pattern = m.group(1).strip() + if re.search(pattern, response): + return True, f"matched /{pattern}/" + return False, f"/{pattern}/ not found in: {response[:100]}" + + # length > N + m = re.match(r'length\s*>\s*(\d+)', check) + if m: + expected = int(m.group(1)) + if len(response) > expected: + return True, f"length {len(response)} > {expected}" + return False, f"length {len(response)} <= {expected}" + + return False, f"unknown check: {check}" + + +def check_actions(actions: list, check: str) -> tuple[bool, str]: + """Evaluate an actions assertion.""" + # length >= N + m = re.match(r'length\s*>=\s*(\d+)', check) + if m: + expected = int(m.group(1)) + if len(actions) >= expected: + return True, f"{len(actions)} actions >= {expected}" + return False, f"{len(actions)} actions < {expected}" + + # any action contains "foo" or "bar" + m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check) + if m: + terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] + action_strs = [json.dumps(a).lower() for a in actions] + for term in terms: + if any(term.lower() in s for s in action_strs): + return True, f"found '{term}' in actions" + return False, f"none of {terms} found in {len(actions)} actions" + + return False, f"unknown check: {check}" + + +def check_state(memo: dict, check: str) -> tuple[bool, str]: + """Evaluate a memorizer state assertion.""" + # field contains "value" or "value2" + m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check) + if m: + field, values_str = m.group(1), m.group(2) + terms = [t.strip().strip('"') for t in values_str.split(" or ")] + 
actual = memo.get(field, "") + if isinstance(actual, list): + actual_str = " ".join(str(x) for x in actual) + else: + actual_str = str(actual) + for term in terms: + if term.lower() in actual_str.lower(): + return True, f"{field}={actual_str[:50]} contains '{term}'" + return False, f"{field}={actual_str[:50]} doesn't contain any of {terms}" + + # field is "value" or "value2" + m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check) + if m: + field, values_str = m.group(1), m.group(2) + terms = [t.strip().strip('"') for t in values_str.split(" or ")] + actual = str(memo.get(field, "")) + for term in terms: + if actual.lower() == term.lower(): + return True, f"{field}={actual}" + return False, f"{field}={actual} not in {terms}" + + # facts any contains "value" + m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check) + if m: + terms = [t.strip().strip('"') for t in m.group(1).split(" or ")] + facts = memo.get("facts", []) + facts_str = " ".join(facts).lower() + for term in terms: + if term.lower() in facts_str: + return True, f"found '{term}' in facts" + return False, f"none of {terms} found in facts: {facts}" + + return False, f"unknown check: {check}" + + +# --- Runner --- + +@dataclass +class StepResult: + step: str + check: str + status: str # PASS, FAIL, SKIP + detail: str = "" + + +class CogTestRunner: + def __init__(self): + self.client = CogClient() + + def run(self, testcase: dict) -> list[dict]: + results = [] + for step in testcase["steps"]: + step_results = self._run_step(step) + results.extend(step_results) + self.client.close() + return results + + def _run_step(self, step: dict) -> list[dict]: + results = [] + step_name = step["name"] + + for cmd in step["commands"]: + if cmd["type"] == "clear": + self.client.clear() + results.append({"step": step_name, "check": "clear", "status": "PASS", "detail": "cleared"}) + + elif cmd["type"] == "send": + try: + self.client.send(cmd["text"]) + results.append({"step": step_name, "check": f"send: 
{cmd['text'][:40]}", "status": "PASS", + "detail": f"response: {self.client.last_response[:80]}"}) + except Exception as e: + results.append({"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "FAIL", + "detail": str(e)}) + + elif cmd["type"] == "action": + try: + self.client.send_action(cmd["action"]) + results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "PASS", + "detail": f"response: {self.client.last_response[:80]}"}) + except Exception as e: + results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "FAIL", + "detail": str(e)}) + + elif cmd["type"] == "expect_response": + passed, detail = check_response(self.client.last_response, cmd["check"]) + results.append({"step": step_name, "check": f"response: {cmd['check']}", + "status": "PASS" if passed else "FAIL", "detail": detail}) + + elif cmd["type"] == "expect_actions": + passed, detail = check_actions(self.client.last_actions, cmd["check"]) + results.append({"step": step_name, "check": f"actions: {cmd['check']}", + "status": "PASS" if passed else "FAIL", "detail": detail}) + + elif cmd["type"] == "expect_state": + self.client.get_state() + passed, detail = check_state(self.client.last_memo, cmd["check"]) + results.append({"step": step_name, "check": f"state: {cmd['check']}", + "status": "PASS" if passed else "FAIL", "detail": detail}) + + return results + + +# --- Standalone runner --- + +def run_standalone(paths: list[Path] = None): + if not paths: + paths = sorted(Path("testcases").glob("*.md")) + + all_results = {} + for path in paths: + tc = parse_testcase(path) + print(f"\n{'='*60}") + print(f" {tc['name']}") + print(f"{'='*60}") + + runner = CogTestRunner() + results = runner.run(tc) + all_results[tc["name"]] = results + + for r in results: + icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP" + print(f" {icon} [{r['step']}] {r['check']}") + if r["detail"]: + print(f" {r['detail']}") + + passed = sum(1 
for r in results if r["status"] == "PASS") + failed = sum(1 for r in results if r["status"] == "FAIL") + print(f"\n {passed} passed, {failed} failed") + + # Summary + print(f"\n{'='*60}") + total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values()) + total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values()) + print(f" TOTAL: {total_pass} passed, {total_fail} failed") + print(f"{'='*60}") + + # Write results JSON for web UI + output = { + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "testcases": {name: results for name, results in all_results.items()}, + "summary": {"passed": total_pass, "failed": total_fail}, + } + results_path = Path("testcases/results.json") + results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8") + print(f"\n Results written to {results_path}") + + return total_fail == 0 + + +if __name__ == "__main__": + paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None + success = run_standalone(paths) + sys.exit(0 if success else 1) diff --git a/static/style.css b/static/style.css index fec2579..9b52303 100644 --- a/static/style.css +++ b/static/style.css @@ -18,7 +18,7 @@ body { font-family: system-ui, sans-serif; background: #0a0a0a; color: #e0e0e0; #meter-sensor .nm-label { color: #60a5fa; } .nm-bar { flex: 1; height: 6px; background: #1a1a1a; border-radius: 3px; overflow: hidden; } .nm-fill { height: 100%; width: 0%; border-radius: 3px; transition: width 0.3s, background-color 0.3s; background: #333; } -.nm-text { font-size: 0.6rem; color: #555; min-width: 5rem; text-align: right; font-family: monospace; } +.nm-text { font-size: 0.6rem; color: #555; min-width: 5rem; text-align: right; font-family: monospace; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } /* Three-column layout: chat | awareness | trace */ #main { flex: 1; display: grid; grid-template-columns: 1fr 1fr 2fr; gap: 1px; 
background: #222; overflow: hidden; min-height: 0; } diff --git a/static/tests.html b/static/tests.html new file mode 100644 index 0000000..173f105 --- /dev/null +++ b/static/tests.html @@ -0,0 +1,52 @@ + + + + + +cog — tests + + + +

cog — test results

+
Loading...
+ + + diff --git a/testcases/counter_state.md b/testcases/counter_state.md new file mode 100644 index 0000000..a083d8e --- /dev/null +++ b/testcases/counter_state.md @@ -0,0 +1,38 @@ +# Counter State + +Tests that Thinker can instruct UI to create stateful controls, +and that UI handles local actions without round-tripping to Thinker. + +## Setup +- clear history + +## Steps + +### 1. Create counter +- send: create a counter starting at 0 with increment and decrement buttons +- expect_response: contains "counter" or "count" +- expect_actions: length >= 2 +- expect_actions: any action contains "increment" or "inc" +- expect_actions: any action contains "decrement" or "dec" + +### 2. Check state +- expect_state: topic contains "counter" or "count" or "button" + +### 3. Ask for current value +- send: what is the current count? +- expect_response: contains "0" + +### 4. Increment +- action: increment +- expect_response: contains "1" + +### 5. Increment again +- action: increment +- expect_response: contains "2" + +### 6. Decrement +- action: decrement +- expect_response: contains "1" + +### 7. Verify memorizer tracks it +- expect_state: topic contains "count" diff --git a/testcases/pub_conversation.md b/testcases/pub_conversation.md new file mode 100644 index 0000000..015670a --- /dev/null +++ b/testcases/pub_conversation.md @@ -0,0 +1,42 @@ +# Pub Conversation + +Tests multi-turn conversation with context tracking, language switching, +and memorizer state updates across a social scenario. + +## Setup +- clear history + +## Steps + +### 1. Set the scene +- send: Hey, Tina and I are heading to the pub tonight +- expect_response: length > 10 +- expect_state: situation contains "pub" or "Tina" + +### 2. Language switch to German +- send: Wir sind jetzt im Biergarten angekommen +- expect_response: length > 10 +- expect_state: language is "de" or "mixed" + +### 3. Context awareness +- send: Was sollen wir bestellen? 
+- expect_response: length > 10 +- expect_state: topic contains "bestell" or "order" or "pub" or "Biergarten" + +### 4. Tina speaks +- send: Tina says: I'll have a Hefeweizen please +- expect_response: length > 10 +- expect_state: facts any contains "Tina" or "Hefeweizen" + +### 5. Ask for time (tool use) +- send: wie spaet ist es eigentlich? +- expect_response: matches \d{1,2}:\d{2} + +### 6. Back to English +- send: Let's switch to English, what was the last thing Tina said? +- expect_state: language is "en" or "mixed" +- expect_response: contains "Tina" or "Hefeweizen" + +### 7. Mood check +- send: This is really fun! +- expect_state: user_mood is "happy" or "playful" or "excited" diff --git a/testcases/results.json b/testcases/results.json new file mode 100644 index 0000000..5ac4142 --- /dev/null +++ b/testcases/results.json @@ -0,0 +1,131 @@ +{ + "timestamp": "2026-03-28 15:34:02", + "testcases": { + "Pub Conversation": [ + { + "step": "Setup", + "check": "clear", + "status": "PASS", + "detail": "cleared" + }, + { + "step": "Set the scene", + "check": "send: Hey, Tina and I are heading to the pub t", + "status": "PASS", + "detail": "response: Das ist toll! Was trinkt ihr beide heute Abend?\n" + }, + { + "step": "Set the scene", + "check": "response: length > 10", + "status": "PASS", + "detail": "length 48 > 10" + }, + { + "step": "Set the scene", + "check": "state: situation contains \"pub\" or \"Tina\"", + "status": "PASS", + "detail": "situation=at a pub with tina, authenticated on https://cog.l contains 'pub'" + }, + { + "step": "Language switch to German", + "check": "send: Wir sind jetzt im Biergarten angekommen", + "status": "PASS", + "detail": "response: Super, genießt euer Biergarten-Erlebnis! Und was ist mit Tina? 
Trinkt sie auch e" + }, + { + "step": "Language switch to German", + "check": "response: length > 10", + "status": "PASS", + "detail": "length 95 > 10" + }, + { + "step": "Language switch to German", + "check": "state: language is \"de\" or \"mixed\"", + "status": "PASS", + "detail": "language=mixed" + }, + { + "step": "Context awareness", + "check": "send: Was sollen wir bestellen?", + "status": "PASS", + "detail": "response: Kommt drauf an, worauf ihr Lust habt! Im Biergarten sind Klassiker wie **Helles*" + }, + { + "step": "Context awareness", + "check": "response: length > 10", + "status": "PASS", + "detail": "length 255 > 10" + }, + { + "step": "Context awareness", + "check": "state: topic contains \"bestell\" or \"order\" or \"pub\" or \"Biergarten\"", + "status": "PASS", + "detail": "topic=ordering drinks contains 'order'" + }, + { + "step": "Tina speaks", + "check": "send: Tina says: I'll have a Hefeweizen please", + "status": "PASS", + "detail": "response: Ah, Tina bleibt ihren Vorlieben treu! Eine gute Wahl. Und für dich, Nico? Suchst" + }, + { + "step": "Tina speaks", + "check": "response: length > 10", + "status": "PASS", + "detail": "length 148 > 10" + }, + { + "step": "Tina speaks", + "check": "state: facts any contains \"Tina\" or \"Hefeweizen\"", + "status": "PASS", + "detail": "found 'Tina' in facts" + }, + { + "step": "Ask for time (tool use)", + "check": "send: wie spaet ist es eigentlich?", + "status": "PASS", + "detail": "response: Du hast mich noch gar nicht danach gefragt. Es ist kurz vor halb 4. Also perfekt" + }, + { + "step": "Ask for time (tool use)", + "check": "response: matches \\d{1,2}:\\d{2}", + "status": "FAIL", + "detail": "/\\d{1,2}:\\d{2}/ not found in: Du hast mich noch gar nicht danach gefragt. Es ist kurz vor halb 4. Also perfekt, um den Feierabend " + }, + { + "step": "Back to English", + "check": "send: Let's switch to English, what was the la", + "status": "PASS", + "detail": "response: Okay, switching to English! 
😉 The last thing Tina said was: \"I'll have a Hefewei" + }, + { + "step": "Back to English", + "check": "state: language is \"en\" or \"mixed\"", + "status": "PASS", + "detail": "language=mixed" + }, + { + "step": "Back to English", + "check": "response: contains \"Tina\" or \"Hefeweizen\"", + "status": "PASS", + "detail": "found 'Tina'" + }, + { + "step": "Mood check", + "check": "send: This is really fun!", + "status": "PASS", + "detail": "response: Indeed! Glad you're having fun. It's always a pleasure chatting with you, Nico. " + }, + { + "step": "Mood check", + "check": "state: user_mood is \"happy\" or \"playful\" or \"excited\"", + "status": "PASS", + "detail": "user_mood=happy" + } + ] + }, + "summary": { + "passed": 19, + "failed": 1 + } +} \ No newline at end of file