RED->GREEN->REFACTOR cycle: - UI node has state store (key-value), action bindings (op/var), and local action handlers (inc/dec/set/toggle — no LLM round-trip) - Thinker self-model: knows its environment, that ACTIONS create real buttons, that UI handles state locally. Emits var/op payload for stateful actions. - Thinker's context includes UI state so it can report current values - /api/clear resets UI state, bindings, and controls - Test runner: action_match for fuzzy action names, persistent actions across steps, _stream_text restored - Counter test: 16/16 passed (create, read, inc, inc, dec, verify) - Pub test: 20/20 passed (conversation, language switch, tool use, mood) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
402 lines
15 KiB
Python
402 lines
15 KiB
Python
"""
|
|
Cog Runtime Test Runner — parses testcases/*.md and executes against live API.
|
|
|
|
Usage:
|
|
pytest testcases/ -v # run all testcases
|
|
pytest testcases/counter_state.md -v # run one testcase
|
|
python runtime_test.py # standalone run all
|
|
python runtime_test.py testcases/pub_conversation.md # standalone run one
|
|
"""
|
|
|
|
import httpx
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
# Fix Windows console encoding: emit UTF-8 and substitute anything that still
# cannot be encoded instead of raising while printing results.
# stdout may have been replaced by an object that has no reconfigure()
# (for example an in-memory stream used to capture output), so probe for the
# method rather than letting the import fail with AttributeError.
if sys.platform == "win32" and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
|
|
API = "https://cog.loop42.de/api"
|
|
TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g"
|
|
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
|
|
|
|
|
|
# --- Markdown parser ---
|
|
|
|
def parse_testcase(path: Path) -> dict:
    """Parse a testcase markdown file into structured steps.

    Lines are matched after stripping surrounding whitespace:

    * ``# Title``     - the first such line becomes the testcase name
    * ``## Setup``    - opens the setup section
    * ``### N. Name`` - opens a step; the leading ``N.`` is optional
    * ``- command``   - a command for the step or setup section currently open

    Args:
        path: Location of the markdown testcase (read as UTF-8).

    Returns:
        ``{"name": str, "steps": [...], "file": str}`` where every step is
        ``{"name": str, "commands": [dict, ...]}``. A setup section closed by
        a later ``##``/``###`` heading is moved to the front of ``steps`` if
        it has commands and dropped otherwise. Whatever section is still open
        at end of file is appended as-is.
    """
    tc = {"name": "", "steps": [], "file": str(path)}
    current_step = None
    in_setup = False

    for raw_line in path.read_text(encoding="utf-8").split("\n"):
        line = raw_line.strip()

        # Title: only the first "# " heading counts.
        if line.startswith("# ") and not tc["name"]:
            tc["name"] = line[2:].strip()
            continue

        opens_setup = line == "## Setup"
        opens_step = line.startswith("### ")

        # Any "##"/"###" heading ends an open setup section. The flag is
        # cleared unconditionally: leaving it set after an empty Setup made
        # the following step be treated as setup and silently dropped when it
        # had no commands.
        if in_setup and (opens_step or line.startswith("## ")):
            if current_step and current_step["commands"]:
                tc["steps"].insert(0, current_step)
            in_setup = False
            current_step = None

        if opens_setup or opens_step:
            # Flush the step that was being collected so a new heading never
            # overwrites (and loses) it.
            if current_step:
                tc["steps"].append(current_step)
            if opens_setup:
                in_setup = True
                current_step = {"name": "Setup", "commands": []}
            else:
                step_text = line[4:].strip()
                numbered = re.match(r"\d+\.\s*(.*)", step_text)
                step_name = numbered.group(1) if numbered else step_text
                current_step = {"name": step_name, "commands": []}
            continue

        # Commands within a step or setup; unrecognised bullets are ignored.
        if current_step and line.startswith("- "):
            cmd = _parse_command(line[2:].strip())
            if cmd:
                current_step["commands"].append(cmd)

    if current_step:
        tc["steps"].append(current_step)

    return tc
|
|
|
|
|
|
def _parse_command(text: str) -> dict | None:
|
|
"""Parse a single command line like 'send: hello' or 'expect_response: contains foo'."""
|
|
# send: message
|
|
if text.startswith("send:"):
|
|
return {"type": "send", "text": text[5:].strip()}
|
|
|
|
# action: action_name OR action: first matching "pattern"
|
|
if text.startswith("action:"):
|
|
val = text[7:].strip()
|
|
m = re.match(r'first matching "(.+)"', val)
|
|
if m:
|
|
return {"type": "action_match", "pattern": m.group(1)}
|
|
return {"type": "action", "action": val}
|
|
|
|
# expect_response: contains "foo"
|
|
if text.startswith("expect_response:"):
|
|
return {"type": "expect_response", "check": text[16:].strip()}
|
|
|
|
# expect_actions: length >= 2
|
|
if text.startswith("expect_actions:"):
|
|
return {"type": "expect_actions", "check": text[15:].strip()}
|
|
|
|
# expect_state: field contains/is "value"
|
|
if text.startswith("expect_state:"):
|
|
return {"type": "expect_state", "check": text[13:].strip()}
|
|
|
|
# clear history
|
|
if text == "clear history":
|
|
return {"type": "clear"}
|
|
|
|
return None
|
|
|
|
|
|
# --- API client ---
|
|
|
|
class CogClient:
    """Thin synchronous client for the HTTP API under test.

    The outcome of the most recent calls is cached for the assertions:

    * ``last_response`` - ``response`` field of the latest ``/send`` reply
    * ``last_memo``     - ``memorizer`` field of the latest ``/send`` or
      ``/state`` reply
    * ``last_actions``  - button controls of the newest ``controls`` trace
      event that contained at least one button
    * ``last_trace``    - ``lines`` of the latest ``/trace`` reply
    """

    def __init__(self):
        self.client = httpx.Client(timeout=30)
        self.last_response = ""
        self.last_memo = {}
        self.last_actions = []
        self.last_trace = []

    def clear(self):
        """POST to ``/clear`` and pause briefly before returning."""
        self.client.post(f"{API}/clear", headers=HEADERS)
        time.sleep(0.3)

    def send(self, text: str) -> dict:
        """Send ``text`` to ``/send``, cache the reply and refresh the trace.

        Returns:
            The decoded JSON reply.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
        """
        r = self.client.post(f"{API}/send", json={"text": text}, headers=HEADERS)
        # Without this an error reply that happens to carry a JSON body would
        # be recorded as an empty, successful response.
        r.raise_for_status()
        d = r.json()
        self.last_response = d.get("response", "")
        self.last_memo = d.get("memorizer", {})
        time.sleep(0.5)
        self._fetch_trace()
        return d

    def send_action(self, action: str) -> dict:
        """Trigger a UI action by sending it as an ``ACTION: <name>`` message."""
        # Actions go through /api/send as ACTION: format (since we can't use WS from test)
        return self.send(f"ACTION: {action}")

    def _fetch_trace(self):
        """Fetch the last 10 trace lines and update ``last_actions`` from them."""
        r = self.client.get(f"{API}/trace?last=10", headers=HEADERS)
        self.last_trace = r.json().get("lines", [])
        # Keep the buttons of the newest non-empty "controls" event. When the
        # trace holds none, the previously seen buttons stay in place, which
        # is what lets actions persist across steps.
        for t in self.last_trace:
            if t.get("event") == "controls":
                new_actions = [c for c in t.get("controls", []) if c.get("type") == "button"]
                if new_actions:
                    self.last_actions = new_actions

    def get_state(self) -> dict:
        """Fetch ``/state`` and return (and cache) its ``memorizer`` field."""
        r = self.client.get(f"{API}/state", headers=HEADERS)
        d = r.json()
        self.last_memo = d.get("memorizer", {})
        return self.last_memo

    def close(self):
        """Close the underlying HTTP client."""
        self.client.close()
|
|
|
|
|
|
# --- Assertion engine ---
|
|
|
|
def check_response(response: str, check: str) -> tuple[bool, str]:
    """Evaluate a response assertion.

    Supported checks:

    * ``contains "foo" or "bar"`` - case-insensitive substring, any term
    * ``matches <regex>``         - ``re.search`` against the response
    * ``length > N``              - response is longer than N characters

    Args:
        response: Text returned by the API.
        check: Assertion expression from the testcase.

    Returns:
        ``(passed, detail)``; an unrecognised or malformed check fails with
        an explanatory detail instead of raising.
    """
    # contains "foo" or "bar"
    contains = re.match(r'contains\s+"?(.+?)"?\s*$', check)
    if contains:
        terms = [t.strip().strip('"') for t in contains.group(1).split(" or ")]
        haystack = response.lower()
        for term in terms:
            if term.lower() in haystack:
                return True, f"found '{term}'"
        return False, f"none of {terms} found in: {response[:100]}"

    # matches regex
    matches = re.match(r'matches\s+(.+)', check)
    if matches:
        pattern = matches.group(1).strip()
        try:
            found = re.search(pattern, response)
        except re.error as exc:
            # A typo in a testcase pattern should fail that one check rather
            # than abort the whole run.
            return False, f"invalid regex /{pattern}/: {exc}"
        if found:
            return True, f"matched /{pattern}/"
        return False, f"/{pattern}/ not found in: {response[:100]}"

    # length > N
    longer = re.match(r'length\s*>\s*(\d+)', check)
    if longer:
        expected = int(longer.group(1))
        if len(response) > expected:
            return True, f"length {len(response)} > {expected}"
        return False, f"length {len(response)} <= {expected}"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
def check_actions(actions: list, check: str) -> tuple[bool, str]:
    """Evaluate an actions assertion.

    Supported checks:

    * ``length >= N``                        - at least N actions are present
    * ``any action contains "foo" or "bar"`` - case-insensitive substring
      search over the JSON text of each action, any term

    Args:
        actions: Action/control dicts collected from the API.
        check: Assertion expression from the testcase.

    Returns:
        ``(passed, detail)``; an unrecognised check fails.
    """
    # length >= N
    at_least = re.match(r'length\s*>=\s*(\d+)', check)
    if at_least:
        expected = int(at_least.group(1))
        if len(actions) >= expected:
            return True, f"{len(actions)} actions >= {expected}"
        return False, f"{len(actions)} actions < {expected}"

    # any action contains "foo" or "bar"
    contains = re.match(r'any action contains\s+"?(.+?)"?\s*$', check)
    if contains:
        terms = [t.strip().strip('"') for t in contains.group(1).split(" or ")]
        # ensure_ascii=False keeps non-ASCII text literal; the default would
        # turn e.g. an umlaut into a \uXXXX escape that no term could match.
        action_strs = [json.dumps(a, ensure_ascii=False).lower() for a in actions]
        for term in terms:
            needle = term.lower()
            if any(needle in s for s in action_strs):
                return True, f"found '{term}' in actions"
        return False, f"none of {terms} found in {len(actions)} actions"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
def check_state(memo: dict, check: str) -> tuple[bool, str]:
    """Evaluate a memorizer state assertion.

    Supported checks (all case-insensitive, any term may match):

    * ``<field> contains "a" or "b"`` - substring of the field; a list value
      is joined with spaces first
    * ``<field> is "a" or "b"``       - the stringified field equals a term
    * ``facts any contains "a"``      - substring of the joined ``facts`` list

    Args:
        memo: Memorizer state dict returned by the API.
        check: Assertion expression from the testcase.

    Returns:
        ``(passed, detail)``; an unrecognised check fails.
    """
    # field contains "value" or "value2"
    m = re.match(r'(\w+)\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        # "key", not "field": avoids shadowing the dataclasses.field import.
        key, values_str = m.group(1), m.group(2)
        terms = [t.strip().strip('"') for t in values_str.split(" or ")]
        actual = memo.get(key, "")
        if isinstance(actual, list):
            actual_str = " ".join(str(x) for x in actual)
        else:
            actual_str = str(actual)
        haystack = actual_str.lower()
        for term in terms:
            if term.lower() in haystack:
                return True, f"{key}={actual_str[:50]} contains '{term}'"
        return False, f"{key}={actual_str[:50]} doesn't contain any of {terms}"

    # field is "value" or "value2"
    m = re.match(r'(\w+)\s+is\s+"?(.+?)"?\s*$', check)
    if m:
        key, values_str = m.group(1), m.group(2)
        terms = [t.strip().strip('"') for t in values_str.split(" or ")]
        actual = str(memo.get(key, ""))
        for term in terms:
            if actual.lower() == term.lower():
                return True, f"{key}={actual}"
        return False, f"{key}={actual} not in {terms}"

    # facts any contains "value"
    m = re.match(r'facts\s+any\s+contains\s+"?(.+?)"?\s*$', check)
    if m:
        terms = [t.strip().strip('"') for t in m.group(1).split(" or ")]
        # Tolerate a null/missing list and non-string entries, the same way
        # the "contains" branch stringifies list items before joining.
        facts = memo.get("facts") or []
        facts_str = " ".join(str(fact) for fact in facts).lower()
        for term in terms:
            if term.lower() in facts_str:
                return True, f"found '{term}' in facts"
        return False, f"none of {terms} found in facts: {facts}"

    return False, f"unknown check: {check}"
|
|
|
|
|
|
# --- Runner ---
|
|
|
|
@dataclass
class StepResult:
    """Outcome of a single command or assertion.

    The field names match the keys of the result dicts built by the runner
    (``step``, ``check``, ``status``, ``detail``); the class itself is not
    instantiated in the code visible here.
    """

    # Name of the step the command belongs to.
    step: str
    # Short label of what was executed or asserted.
    check: str
    # One of PASS, FAIL, SKIP.
    status: str
    # Optional human-readable explanation.
    detail: str = field(default="")
|
|
|
|
|
|
class CogTestRunner:
    """Execute the steps of a parsed testcase through a ``CogClient``.

    Every command produces one result dict with the keys ``step``, ``check``,
    ``status`` ("PASS" or "FAIL") and ``detail``. Commands of an unknown type
    are ignored. A runner is single-use: ``run`` closes its client.
    """

    def __init__(self):
        self.client = CogClient()

    def run(self, testcase: dict) -> list[dict]:
        """Run every step of ``testcase`` in order and return all results.

        The client is closed even if a step raises.
        """
        results = []
        try:
            for step in testcase["steps"]:
                results.extend(self._run_step(step))
        finally:
            self.client.close()
        return results

    def _call(self, step_name: str, label: str, call, *args, ok_detail=None) -> dict:
        """Invoke one client call and report it as PASS, or FAIL with the error text.

        ``ok_detail`` overrides the default success detail, which is the first
        80 characters of the client's last response.
        """
        try:
            call(*args)
            detail = ok_detail
            if detail is None:
                detail = f"response: {self.client.last_response[:80]}"
        except Exception as e:  # runner boundary: any failure becomes a FAIL row
            return {"step": step_name, "check": label, "status": "FAIL", "detail": str(e)}
        return {"step": step_name, "check": label, "status": "PASS", "detail": detail}

    def _find_action(self, pattern: str):
        """Return the first sendable action whose name or label contains ``pattern``.

        ``pattern`` must already be lower-cased. Controls without an action
        name are skipped because there would be nothing to send. Returns None
        when nothing matches.
        """
        for control in self.client.last_actions:
            name = control.get("action") or ""
            label = control.get("label") or ""
            if name and (pattern in name.lower() or pattern in label.lower()):
                return name
        return None

    def _run_step(self, step: dict) -> list[dict]:
        """Execute the commands of one step and return one result per command."""
        results = []
        step_name = step["name"]

        for cmd in step["commands"]:
            kind = cmd["type"]

            if kind == "clear":
                results.append(self._call(step_name, "clear", self.client.clear, ok_detail="cleared"))

            elif kind == "send":
                results.append(self._call(step_name, f"send: {cmd['text'][:40]}",
                                          self.client.send, cmd["text"]))

            elif kind == "action":
                results.append(self._call(step_name, f"action: {cmd['action']}",
                                          self.client.send_action, cmd["action"]))

            elif kind == "action_match":
                # Find first action matching pattern in last_actions
                pattern = cmd["pattern"].lower()
                matched = self._find_action(pattern)
                if matched:
                    results.append(self._call(step_name, f"action: {matched}",
                                              self.client.send_action, matched))
                else:
                    results.append({"step": step_name, "check": f"action matching '{pattern}'", "status": "FAIL",
                                    "detail": f"no action matching '{pattern}' in {[a.get('action') for a in self.client.last_actions]}"})

            elif kind == "expect_response":
                passed, detail = check_response(self.client.last_response, cmd["check"])
                results.append({"step": step_name, "check": f"response: {cmd['check']}",
                                "status": "PASS" if passed else "FAIL", "detail": detail})

            elif kind == "expect_actions":
                passed, detail = check_actions(self.client.last_actions, cmd["check"])
                results.append({"step": step_name, "check": f"actions: {cmd['check']}",
                                "status": "PASS" if passed else "FAIL", "detail": detail})

            elif kind == "expect_state":
                label = f"state: {cmd['check']}"
                try:
                    self.client.get_state()
                except Exception as e:  # same policy as send: report, keep running
                    results.append({"step": step_name, "check": label, "status": "FAIL", "detail": str(e)})
                    continue
                passed, detail = check_state(self.client.last_memo, cmd["check"])
                results.append({"step": step_name, "check": label,
                                "status": "PASS" if passed else "FAIL", "detail": detail})

        return results
|
|
|
|
|
|
# --- Standalone runner ---
|
|
|
|
def run_standalone(paths: list[Path] | None = None) -> bool:
    """Run testcases, print a report and write ``testcases/results.json``.

    Args:
        paths: Testcase markdown files to run. When empty or None, every
            ``*.md`` file in ``testcases/`` is run in sorted order.

    Returns:
        True when no check failed, False otherwise.
    """
    if not paths:
        paths = sorted(Path("testcases").glob("*.md"))

    all_results = {}
    for path in paths:
        tc = parse_testcase(path)
        print(f"\n{'='*60}")
        print(f" {tc['name']}")
        print(f"{'='*60}")

        runner = CogTestRunner()
        results = runner.run(tc)

        # Two files with the same title must not overwrite each other: the
        # lost entry would vanish from the totals and from the exit status.
        key = tc["name"]
        if key in all_results:
            key = f"{key} ({path})"
        all_results[key] = results

        for r in results:
            icon = "OK" if r["status"] == "PASS" else "FAIL" if r["status"] == "FAIL" else "SKIP"
            print(f" {icon} [{r['step']}] {r['check']}")
            if r["detail"]:
                print(f" {r['detail']}")

        passed = sum(1 for r in results if r["status"] == "PASS")
        failed = sum(1 for r in results if r["status"] == "FAIL")
        print(f"\n {passed} passed, {failed} failed")

    # Summary
    print(f"\n{'='*60}")
    total_pass = sum(sum(1 for r in results if r["status"] == "PASS") for results in all_results.values())
    total_fail = sum(sum(1 for r in results if r["status"] == "FAIL") for results in all_results.values())
    print(f" TOTAL: {total_pass} passed, {total_fail} failed")
    print(f"{'='*60}")

    # Write results JSON for web UI
    output = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "testcases": dict(all_results),
        "summary": {"passed": total_pass, "failed": total_fail},
    }
    results_path = Path("testcases/results.json")
    # Explicit paths may be run from a directory without testcases/; create it
    # so a finished run is not lost to FileNotFoundError at the very end.
    results_path.parent.mkdir(parents=True, exist_ok=True)
    results_path.write_text(json.dumps(output, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\n Results written to {results_path}")

    return total_fail == 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
paths = [Path(p) for p in sys.argv[1:]] if len(sys.argv) > 1 else None
|
|
success = run_standalone(paths)
|
|
sys.exit(0 if success else 1)
|