agent-runtime/runtime_test.py
Nico 5f447dfd53 v0.14.0: v2 Director-drives architecture + 3-pod K8s split
Architecture:
- director_v2: always-on brain, produces DirectorPlan with tool_sequence
- thinker_v2: pure executor, runs tools from DirectorPlan
- interpreter_v1: factual result summarizer, no hallucination
- v2_director_drives graph: Input -> Director -> Thinker -> Output

Infrastructure:
- Split into 3 pods: cog-frontend (nginx), cog-runtime (FastAPI), cog-mcp (SSE proxy)
- MCP survives runtime restarts (separate pod, proxies via HTTP)
- Async send pipeline: /api/send/check -> /api/send -> /api/result with progress
- Zero-downtime rolling updates (maxUnavailable: 0)
- Dynamic graph visualization (fetched from API, not hardcoded)

Tests: 22 new mocked unit tests (director_v2: 7, thinker_v2: 8, interpreter_v1: 7)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 04:17:44 +02:00

528 lines
20 KiB
Python

"""
Cog Runtime Test Runner — parses testcases/*.md and executes against live API.
Usage:
pytest testcases/ -v # run all testcases
pytest testcases/counter_state.md -v # run one testcase
python runtime_test.py # standalone run all
python runtime_test.py testcases/pub_conversation.md # standalone run one
"""
import httpx
import json
import os
import re
import sys
import time
# Fix Windows console encoding
if sys.platform == "win32":
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
from pathlib import Path
from dataclasses import dataclass, field
API = "https://cog.loop42.de/api"
TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g"
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
# --- Markdown parser ---
def parse_testcase(path: Path) -> dict:
"""Parse a testcase markdown file into structured steps."""
text = path.read_text(encoding="utf-8")
lines = text.split("\n")
tc = {"name": "", "steps": [], "file": str(path)}
current_step = None
in_setup = False
for line in lines:
line_stripped = line.strip()
# Title
if line_stripped.startswith("# ") and not tc["name"]:
tc["name"] = line_stripped[2:].strip()
continue
# Setup section
if line_stripped == "## Setup":
in_setup = True
current_step = {"name": "Setup", "commands": []}
continue
# End setup on next ## or ###
if line_stripped.startswith("## ") and in_setup:
if current_step and current_step["commands"]:
tc["steps"].insert(0, current_step)
in_setup = False
current_step = None
# Step header
if line_stripped.startswith("### "):
if in_setup and current_step and current_step["commands"]:
tc["steps"].insert(0, current_step)
in_setup = False
elif current_step and not in_setup:
tc["steps"].append(current_step)
step_text = line_stripped[4:].strip()
m = re.match(r"\d+\.\s*(.*)", step_text)
current_step = {"name": m.group(1) if m else step_text, "commands": []}
continue
# Commands within a step or setup
if (current_step or in_setup) and line_stripped.startswith("- "):
cmd_text = line_stripped[2:].strip()
cmd = _parse_command(cmd_text)
if cmd and current_step:
current_step["commands"].append(cmd)
if current_step:
tc["steps"].append(current_step)
return tc
def _parse_command(text: str) -> dict | None:
"""Parse a single command line like 'send: hello' or 'expect_response: contains foo'."""
# send: message |dashboard| [json]
# send: message (no dashboard)
if text.startswith("send:"):
val = text[5:].strip()
if "|dashboard|" in val:
parts = val.split("|dashboard|", 1)
msg_text = parts[0].strip()
try:
dashboard = json.loads(parts[1].strip())
except (json.JSONDecodeError, Exception):
dashboard = []
return {"type": "send", "text": msg_text, "dashboard": dashboard}
return {"type": "send", "text": val}
# action: action_name OR action: first matching "pattern"
if text.startswith("action:"):
val = text[7:].strip()
m = re.match(r'first matching "(.+)"', val)
if m:
return {"type": "action_match", "pattern": m.group(1)}
return {"type": "action", "action": val}
# expect_response: contains "foo"
if text.startswith("expect_response:"):
return {"type": "expect_response", "check": text[16:].strip()}
# expect_actions: length >= 2
if text.startswith("expect_actions:"):
return {"type": "expect_actions", "check": text[15:].strip()}
# expect_state: field contains/is "value"
if text.startswith("expect_state:"):
return {"type": "expect_state", "check": text[13:].strip()}
# clear history
if text == "clear history":
return {"type": "clear"}
# expect_trace: input.analysis.intent is "social"
# expect_trace: has reflex_path
# expect_trace: no thinker
if text.startswith("expect_trace:"):
return {"type": "expect_trace", "check": text[13:].strip()}
return None
# --- API client ---
class CogClient:
def __init__(self):
self.client = httpx.Client(timeout=90)
self.last_response = ""
self.last_memo = {}
self.last_actions = []
self.last_buttons = []
self.last_trace = []
def clear(self):
self.client.post(f"{API}/clear", headers=HEADERS)
time.sleep(0.3)
def send(self, text: str, dashboard: list = None) -> dict:
body = {"text": text}
if dashboard is not None:
body["dashboard"] = dashboard
r = self.client.post(f"{API}/send", json=body, headers=HEADERS)
d = r.json()
self.last_response = d.get("response", "")
self.last_memo = d.get("memorizer", {})
time.sleep(0.5)
self._fetch_trace()
return d
def send_action(self, action: str) -> dict:
# Actions go through /api/send as ACTION: format (since we can't use WS from test)
return self.send(f"ACTION: {action}")
def _fetch_trace(self):
r = self.client.get(f"{API}/trace?last=20", headers=HEADERS)
self.last_trace = r.json().get("lines", [])
# Extract all controls from trace (buttons, tables, labels, displays)
for t in self.last_trace:
if t.get("event") == "controls":
new_controls = t.get("controls", [])
if new_controls:
self.last_actions = new_controls
self.last_buttons = [c for c in new_controls if c.get("type") == "button"]
def get_state(self) -> dict:
r = self.client.get(f"{API}/state", headers=HEADERS)
d = r.json()
self.last_memo = d.get("memorizer", {})
return self.last_memo
def close(self):
self.client.close()
# --- Assertion engine ---
def check_response(response: str, check: str) -> tuple[bool, str]:
"""Evaluate a response assertion. Returns (passed, detail)."""
# contains "foo" or "bar"
m = re.match(r'contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
for term in terms:
if term.lower() in response.lower():
return True, f"found '{term}'"
return False, f"none of {terms} found in: {response[:100]}"
# matches regex
m = re.match(r'matches\s+(.+)', check)
if m:
pattern = m.group(1).strip()
if re.search(pattern, response):
return True, f"matched /{pattern}/"
return False, f"/{pattern}/ not found in: {response[:100]}"
# not contains "foo" or "bar"
m = re.match(r'not contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
for term in terms:
if term.lower() in response.lower():
return False, f"found '{term}' but expected NOT to"
return True, f"none of {terms} found (as expected)"
# length > N
m = re.match(r'length\s*>\s*(\d+)', check)
if m:
expected = int(m.group(1))
if len(response) > expected:
return True, f"length {len(response)} > {expected}"
return False, f"length {len(response)} <= {expected}"
return False, f"unknown check: {check}"
def check_actions(actions: list, check: str) -> tuple[bool, str]:
"""Evaluate an actions assertion."""
# length >= N
m = re.match(r'length\s*>=\s*(\d+)', check)
if m:
expected = int(m.group(1))
if len(actions) >= expected:
return True, f"{len(actions)} actions >= {expected}"
return False, f"{len(actions)} actions < {expected}"
# has table
if check.strip() == "has table":
for a in actions:
if isinstance(a, dict) and a.get("type") == "table":
cols = a.get("columns", [])
rows = len(a.get("data", []))
return True, f"table found: {len(cols)} cols, {rows} rows"
return False, f"no table in {len(actions)} controls"
# any action contains "foo" or "bar" — searches buttons only
m = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
buttons = [a for a in actions if isinstance(a, dict) and a.get("type") == "button"]
action_strs = [json.dumps(a).lower() for a in buttons]
for term in terms:
if any(term.lower() in s for s in action_strs):
return True, f"found '{term}' in actions"
return False, f"none of {terms} found in {len(buttons)} buttons"
return False, f"unknown check: {check}"
def check_state(memo: dict, check: str) -> tuple[bool, str]:
"""Evaluate a memorizer state assertion."""
# field contains "value" or "value2"
m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
if m:
field, values_str = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in values_str.split(" or ")]
actual = memo.get(field, "")
if isinstance(actual, list):
actual_str = " ".join(str(x) for x in actual)
else:
actual_str = str(actual)
for term in terms:
if term.lower() in actual_str.lower():
return True, f"{field}={actual_str[:50]} contains '{term}'"
return False, f"{field}={actual_str[:50]} doesn't contain any of {terms}"
# field is "value" or "value2"
m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
if m:
field, values_str = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in values_str.split(" or ")]
actual = str(memo.get(field, ""))
for term in terms:
if actual.lower() == term.lower():
return True, f"{field}={actual}"
return False, f"{field}={actual} not in {terms}"
# facts any contains "value"
m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
if m:
terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
facts = memo.get("facts", [])
facts_str = " ".join(facts).lower()
for term in terms:
if term.lower() in facts_str:
return True, f"found '{term}' in facts"
return False, f"none of {terms} found in facts: {facts}"
return False, f"unknown check: {check}"
def check_trace(trace: list, check: str) -> tuple[bool, str]:
"""Evaluate a trace assertion. Checks HUD events from last request."""
# input.analysis.FIELD is "VALUE"
m = re.match(r'input\.analysis\.(\w+)\s+is\s+"?(.+?)"?\s*$', check)
if m:
field, expected = m.group(1), m.group(2)
terms = [t.strip().strip('"') for t in expected.split(" or ")]
for t in trace:
if t.get("node") == "input" and t.get("event") == "perceived":
analysis = t.get("analysis", {})
actual = str(analysis.get(field, ""))
for term in terms:
if actual.lower() == term.lower():
return True, f"input.analysis.{field}={actual}"
return False, f"input.analysis.{field}={actual}, expected one of {terms}"
return False, f"no input perceived event in trace"
# has tool_call TOOL_NAME — checks if Thinker called a specific function tool
m = re.match(r'has\s+tool_call\s+(\w+)', check)
if m:
tool_name = m.group(1)
for t in trace:
# Check machine_created/destroyed/etc events that are emitted by UI node
if t.get("event") in ("machine_created", "machine_destroyed", "machine_reset",
"machine_state_added") and tool_name in t.get("event", ""):
return True, f"found machine event for '{tool_name}'"
# Check for the tool name in the event data
if t.get("event") == "machine_created" and tool_name == "create_machine":
return True, f"found create_machine via machine_created event"
if t.get("event") == "machine_state_added" and tool_name == "add_state":
return True, f"found add_state via machine_state_added event"
if t.get("event") == "machine_reset" and tool_name == "reset_machine":
return True, f"found reset_machine via machine_reset event"
if t.get("event") == "machine_destroyed" and tool_name == "destroy_machine":
return True, f"found destroy_machine via machine_destroyed event"
return False, f"no tool_call '{tool_name}' in trace"
# machine_created id="NAV" — checks for specific machine creation
m = re.match(r'machine_created\s+id="(\w+)"', check)
if m:
expected_id = m.group(1)
for t in trace:
if t.get("event") == "machine_created" and t.get("id") == expected_id:
return True, f"machine '{expected_id}' created"
return False, f"no machine_created event with id='{expected_id}'"
# has EVENT_NAME
m = re.match(r'has\s+(\w+)', check)
if m:
event_name = m.group(1)
for t in trace:
if t.get("event") == event_name:
return True, f"found event '{event_name}'"
return False, f"no '{event_name}' event in trace"
# no EVENT_NAME
m = re.match(r'no\s+(\w+)', check)
if m:
event_name = m.group(1)
for t in trace:
if t.get("event") == event_name:
return False, f"found unexpected event '{event_name}'"
return True, f"no '{event_name}' event (as expected)"
return False, f"unknown trace check: {check}"
# --- Runner ---
@dataclass
class StepResult:
step: str
check: str
status: str # PASS, FAIL, SKIP
detail: str = ""
class CogTestRunner:
def __init__(self):
self.client = CogClient()
def run(self, testcase: dict) -> list[dict]:
results = []
for step in testcase["steps"]:
step_results = self._run_step(step)
results.extend(step_results)
self.client.close()
return results
def _run_step(self, step: dict) -> list[dict]:
results = []
step_name = step["name"]
for cmd in step["commands"]:
if cmd["type"] == "clear":
self.client.clear()
results.append({"step": step_name, "check": "clear", "status": "PASS", "detail": "cleared"})
elif cmd["type"] == "send":
try:
self.client.send(cmd["text"], dashboard=cmd.get("dashboard"))
results.append({"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
results.append({"step": step_name, "check": f"send: {cmd['text'][:40]}", "status": "FAIL",
"detail": str(e)})
elif cmd["type"] == "action":
try:
self.client.send_action(cmd["action"])
results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
results.append({"step": step_name, "check": f"action: {cmd['action']}", "status": "FAIL",
"detail": str(e)})
elif cmd["type"] == "action_match":
# Find first button matching pattern
pattern = cmd["pattern"].lower()
matched = None
for a in self.client.last_buttons:
if pattern in a.get("action", "").lower() or pattern in a.get("label", "").lower():
matched = a["action"]
break
if matched:
try:
self.client.send_action(matched)
results.append({"step": step_name, "check": f"action: {matched}", "status": "PASS",
"detail": f"response: {self.client.last_response[:80]}"})
except Exception as e:
results.append({"step": step_name, "check": f"action: {matched}", "status": "FAIL",
"detail": str(e)})
else:
results.append({"step": step_name, "check": f"action matching '{pattern}'", "status": "FAIL",
"detail": f"no action matching '{pattern}' in {[a.get('action') for a in self.client.last_actions]}"})
elif cmd["type"] == "expect_response":
passed, detail = check_response(self.client.last_response, cmd["check"])
results.append({"step": step_name, "check": f"response: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_actions":
passed, detail = check_actions(self.client.last_actions, cmd["check"])
results.append({"step": step_name, "check": f"actions: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_state":
self.client.get_state()
passed, detail = check_state(self.client.last_memo, cmd["check"])
results.append({"step": step_name, "check": f"state: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
elif cmd["type"] == "expect_trace":
passed, detail = check_trace(self.client.last_trace, cmd["check"])
results.append({"step": step_name, "check": f"trace: {cmd['check']}",
"status": "PASS" if passed else "FAIL", "detail": detail})
return results
# --- Live status push ---
def _push_status(event: str, **kwargs):
"""Push test status to the API for frontend display."""
try:
httpx.post(f"{API}/test/status", json={"event": event, **kwargs},
headers=HEADERS, timeout=5)
except Exception:
pass # Don't fail tests if push fails
# --- Standalone runner ---
def run_standalone(paths: list[Path] = None):
if not paths:
paths = sorted(Path("testcases").glob("*.md"))
all_results = {}
for path in paths:
tc = parse_testcase(path)
print(f"\n{'='*60}")
print(f" {tc['name']}")
print(f"{'='*60}")
_push_status("suite_start", suite=tc["name"])
runner = CogTestRunner()
results = runner.run(tc)
all_results[tc["name"]] = results
for r in results:
icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP"
print(f" {icon} [{r['step']}] {r['check']}")
if r["detail"]:
print(f" {r['detail']}")
_push_status("step_result", suite=tc["name"], result=r)
passed = sum(1 for r in results if r["status"] == "PASS")
failed = sum(1 for r in results if r["status"] == "FAIL")
print(f"\n {passed} passed, {failed} failed")
_push_status("suite_end", suite=tc["name"], passed=passed, failed=failed)
# Summary
print(f"\n{'='*60}")
total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values())
total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values())
print(f" TOTAL: {total_pass} passed, {total_fail} failed")
print(f"{'='*60}")
# Write results JSON for web UI
output = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"testcases": {name: results for name, results in all_results.items()},
"summary": {"passed": total_pass, "failed": total_fail},
}
results_path = Path("testcases/results.json")
results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"\n Results written to {results_path}")
return total_fail == 0
if __name__ == "__main__":
paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None
success = run_standalone(paths)
sys.exit(0 if success else 1)