diff --git a/agent/api.py b/agent/api.py
index 24fbefa..eeb100f 100644
--- a/agent/api.py
+++ b/agent/api.py
@@ -277,7 +277,7 @@ def register_routes(app):
"note": "New sessions will use this graph. Existing session unchanged."}
# --- Test status (real-time) ---
- _test_status = {"running": False, "current": "", "results": [], "last_green": None, "last_red": None}
+ _test_status = {"running": False, "current": "", "results": [], "last_green": None, "last_red": None, "total_expected": 0}
@app.post("/api/test/status")
async def post_test_status(body: dict, user=Depends(require_auth)):
@@ -286,7 +286,12 @@ def register_routes(app):
if event == "suite_start":
_test_status["running"] = True
_test_status["current"] = body.get("suite", "")
- _test_status["results"] = []
+ if body.get("count"):
+ # First suite_start with count resets everything
+ _test_status["results"] = []
+ _test_status["total_expected"] = body["count"]
+ _test_status["last_green"] = None
+ _test_status["last_red"] = None
elif event == "step_result":
result = body.get("result", {})
_test_status["results"].append(result)
@@ -300,9 +305,10 @@ def register_routes(app):
_test_status["current"] = ""
# Broadcast to frontend via SSE + WS
_broadcast_sse({"type": "test_status", **_test_status})
- if _active_runtime:
+ runtime = _ensure_runtime()
+ if runtime.sink.ws:
try:
- await _active_runtime.sink.send_hud({"type": "test_status", **_test_status})
+ await runtime.sink.ws.send_text(json.dumps({"type": "test_status", **_test_status}))
except Exception:
pass
return {"ok": True}
diff --git a/static/app.js b/static/app.js
index b6d9d5c..6060145 100644
--- a/static/app.js
+++ b/static/app.js
@@ -575,22 +575,27 @@ function updateTestStatus(data) {
const results = data.results || [];
const pass = results.filter(r => r.status === 'PASS').length;
const fail = results.filter(r => r.status === 'FAIL').length;
- const total = results.length;
+ const done = results.length;
+ const expected = data.total_expected || done;
+
+ const totalMs = results.reduce((s, r) => s + (r.elapsed_ms || 0), 0);
+ const durStr = totalMs > 1000 ? `${(totalMs / 1000).toFixed(1)}s` : `${totalMs}ms`;
if (data.running) {
const current = data.current || '';
- el.innerHTML = `TESTING`
- + `${pass}/${total}`
- + (fail ? `${fail}F` : '')
- + `${esc(current)}`;
- } else if (total > 0) {
- const lastGreen = data.last_green;
- const lastRed = data.last_red;
- let parts = [`TESTS`,
- `${pass}P`,
- fail ? `${fail}F` : ''];
- if (lastRed) parts.push(`last red: ${esc((lastRed.step || '') + ' ' + (lastRed.check || ''))}`);
- el.innerHTML = parts.filter(Boolean).join(' ');
+ const lastMs = results.length ? results[results.length - 1].elapsed_ms || 0 : 0;
+ const lastStr = lastMs > 1000 ? `${(lastMs / 1000).toFixed(1)}s` : `${lastMs}ms`;
+ el.innerHTML = `TESTING `
+ + `${done}/${expected}`
+ + (fail ? ` ${fail}F` : '')
+ + ` ${durStr}`
+ + ` ${esc(current)}`
+ + ` ${lastStr}`;
+ } else if (done > 0) {
+      const allGreen = fail === 0;
+      el.innerHTML = `${pass}/${expected}`
+        + (allGreen ? ' all green' : ` ${fail} failed`)
+ + ` ${durStr}`;
}
}
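
With these templates the status badge reads, for example (counts and timings illustrative):

```text
TESTING 12/34 1F 8.3s ThinkerNode v1 640ms    while running
32/34 2 failed 41.2s                          finished, with failures
34/34 all green 41.2s                         finished, clean
```
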
diff --git a/test_nodes/harness.py b/test_nodes/harness.py
index 19e34db..93a7858 100644
--- a/test_nodes/harness.py
+++ b/test_nodes/harness.py
@@ -2,11 +2,14 @@
import asyncio
import json
+import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
+import httpx
+
# Add parent to path so we can import agent
sys.path.insert(0, str(Path(__file__).parent.parent))
@@ -93,30 +96,71 @@ def run_async(coro):
class NodeTestRunner:
- """Collects and runs node-level tests."""
- def __init__(self):
+ """Collects and runs node-level tests. Optionally reports to frontend."""
+
+    def __init__(self, report_url: str | None = None, token: str | None = None):
self.results: list[NodeTestResult] = []
+ self.report_url = report_url or os.environ.get("COG_TEST_URL")
+ self.token = token or os.environ.get("COG_TEST_TOKEN", "")
+ self._suite = ""
+
+ def _report(self, event: str, **data):
+ """POST test status to frontend. Fire-and-forget, never blocks tests."""
+ if not self.report_url:
+ return
+ try:
+ httpx.post(
+ f"{self.report_url}/api/test/status",
+ json={"event": event, **data},
+ headers={"Authorization": f"Bearer {self.token}"},
+ timeout=3,
+ )
+ except Exception:
+ pass
+
+ def start_suite(self, name: str, count: int = 0):
+ """Call before a group of tests."""
+ self._suite = name
+ self._report("suite_start", suite=name, count=count)
+
+ def end_suite(self):
+ """Call after a group of tests."""
+ self._report("suite_end")
+ self._suite = ""
def test(self, name: str, coro):
"""Run a single async test, catch and record result."""
+ full_name = f"{self._suite}: {name}" if self._suite else name
t0 = time.time()
try:
run_async(coro)
elapsed = int((time.time() - t0) * 1000)
- self.results.append(NodeTestResult(name=name, passed=True, elapsed_ms=elapsed))
+ self.results.append(NodeTestResult(name=full_name, passed=True, elapsed_ms=elapsed))
print(f" OK {name} ({elapsed}ms)")
+ self._report("step_result", result={
+ "step": full_name, "check": name, "status": "PASS",
+ "elapsed_ms": elapsed,
+ })
except AssertionError as e:
elapsed = int((time.time() - t0) * 1000)
- self.results.append(NodeTestResult(name=name, passed=False,
+ self.results.append(NodeTestResult(name=full_name, passed=False,
detail=str(e), elapsed_ms=elapsed))
print(f" FAIL {name} ({elapsed}ms)")
print(f" {e}")
+ self._report("step_result", result={
+ "step": full_name, "check": name, "status": "FAIL",
+ "detail": str(e)[:200], "elapsed_ms": elapsed,
+ })
except Exception as e:
elapsed = int((time.time() - t0) * 1000)
- self.results.append(NodeTestResult(name=name, passed=False,
+ self.results.append(NodeTestResult(name=full_name, passed=False,
detail=f"ERROR: {e}", elapsed_ms=elapsed))
print(f" ERR {name} ({elapsed}ms)")
print(f" {e}")
+ self._report("step_result", result={
+ "step": full_name, "check": name, "status": "FAIL",
+ "detail": f"ERROR: {str(e)[:200]}", "elapsed_ms": elapsed,
+ })
def summary(self) -> tuple[int, int]:
passed = sum(1 for r in self.results if r.passed)
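
A minimal usage sketch of the extended runner, assuming `test_nodes` is on the import path and a frontend exposing `/api/test/status` is reachable; the URL, token, and `demo_test` coroutine are stand-ins, not real project values:

```python
from test_nodes.harness import NodeTestRunner

async def demo_test():
    # stand-in for a real node-level check
    assert 1 + 1 == 2

# report_url/token fall back to COG_TEST_URL / COG_TEST_TOKEN when omitted.
runner = NodeTestRunner(report_url="http://localhost:8000", token="dev-token")
runner.start_suite("Demo", count=1)            # first suite_start carries the total
runner.test("arithmetic sanity", demo_test())  # pass the coroutine object itself
runner.end_suite()
passed, failed = runner.summary()              # assuming the (passed, failed) tuple
```
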
diff --git a/test_nodes/run_all.py b/test_nodes/run_all.py
index 7d6ef26..7d54a0a 100644
--- a/test_nodes/run_all.py
+++ b/test_nodes/run_all.py
@@ -1,4 +1,9 @@
-"""Run all node-level unit tests."""
+"""Run all node-level unit tests.
+
+Usage:
+ python run_all.py # local only
+    python run_all.py --report https://cog.loop42.de   # + push (token via --token or COG_TEST_TOKEN)
+"""
import sys
import time
@@ -19,14 +24,31 @@ import test_director_v2
import test_thinker_v2
import test_interpreter_v1
-runner = NodeTestRunner()
+# Count total tests by inspecting test modules
+_test_modules = [test_input_v1, test_thinker_v1, test_memorizer_v1, test_director_v1,
+ test_director_v2, test_thinker_v2, test_interpreter_v1]
+TOTAL_TESTS = sum(1 for mod in _test_modules for name in dir(mod) if name.startswith("test_") and callable(getattr(mod, name)))
+
+# Parse --report URL and --token from CLI
+report_url = None
+token = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g"
+for i, arg in enumerate(sys.argv[1:], 1):
+    if arg == "--report" and i + 1 < len(sys.argv):
+        report_url = sys.argv[i + 1]
+    elif arg == "--token" and i + 1 < len(sys.argv):
+        token = sys.argv[i + 1]
+
+runner = NodeTestRunner(report_url=report_url, token=token)
t0 = time.time()
print("\n" + "=" * 60)
print(" Node-Level Unit Tests")
+if report_url:
+ print(f" Reporting to: {report_url}")
print("=" * 60)
# Input v1
+runner.start_suite("InputNode v1", count=TOTAL_TESTS)
print("\n--- InputNode v1 ---")
runner.test("greeting is social+trivial", test_input_v1.test_greeting_is_social_trivial())
runner.test("german detected", test_input_v1.test_german_detected())
@@ -34,8 +56,10 @@ runner.test("request classified", test_input_v1.test_request_classified())
runner.test("frustrated tone", test_input_v1.test_frustrated_tone())
runner.test("emits perceived HUD", test_input_v1.test_emits_perceived_hud())
runner.test("source text preserved", test_input_v1.test_source_text_preserved())
+runner.end_suite()
# Thinker v1
+runner.start_suite("ThinkerNode v1")
print("\n--- ThinkerNode v1 ---")
runner.test("simple response", test_thinker_v1.test_simple_response())
runner.test("no code in response", test_thinker_v1.test_no_code_in_response())
@@ -43,24 +67,30 @@ runner.test("emits tool calls for buttons", test_thinker_v1.test_emits_tool_call
runner.test("query_db called for DB question", test_thinker_v1.test_query_db_called())
runner.test("S3* audit mechanism", test_thinker_v1.test_s3_audit_code_without_tools())
runner.test("decided HUD emitted", test_thinker_v1.test_decided_hud_emitted())
+runner.end_suite()
# Memorizer v1
+runner.start_suite("MemorizerNode v1")
print("\n--- MemorizerNode v1 ---")
runner.test("extracts mood", test_memorizer_v1.test_extracts_mood())
runner.test("extracts language", test_memorizer_v1.test_extracts_language())
runner.test("facts preserved across updates", test_memorizer_v1.test_facts_preserved_across_updates())
runner.test("topic tracked", test_memorizer_v1.test_topic_tracked())
runner.test("emits updated HUD", test_memorizer_v1.test_emits_updated_hud())
+runner.end_suite()
# Director v1
+runner.start_suite("DirectorNode v1")
print("\n--- DirectorNode v1 ---")
runner.test("detects casual mode", test_director_v1.test_detects_casual_mode())
runner.test("detects frustrated style", test_director_v1.test_detects_frustrated_style())
runner.test("produces plan for complex request", test_director_v1.test_produces_plan_for_complex_request())
runner.test("directive has required fields", test_director_v1.test_directive_has_required_fields())
runner.test("context line includes plan", test_director_v1.test_context_line_includes_plan())
+runner.end_suite()
# Director v2
+runner.start_suite("DirectorNode v2")
print("\n--- DirectorNode v2 ---")
runner.test("returns DirectorPlan", test_director_v2.test_returns_director_plan())
runner.test("direct response for simple", test_director_v2.test_direct_response_for_simple())
@@ -69,8 +99,10 @@ runner.test("emits HUD events", test_director_v2.test_emits_hud_events())
runner.test("still updates style directive", test_director_v2.test_still_updates_style_directive())
runner.test("history included in context", test_director_v2.test_history_included_in_context())
runner.test("bad JSON returns fallback", test_director_v2.test_bad_json_returns_fallback())
+runner.end_suite()
# Thinker v2
+runner.start_suite("ThinkerNode v2")
print("\n--- ThinkerNode v2 ---")
runner.test("executes emit_actions", test_thinker_v2.test_executes_emit_actions())
runner.test("executes set_state", test_thinker_v2.test_executes_set_state())
@@ -80,8 +112,10 @@ runner.test("no autonomous tool calls", test_thinker_v2.test_no_autonomous_tool_
runner.test("multi tool sequence", test_thinker_v2.test_multi_tool_sequence())
runner.test("emits HUD per tool", test_thinker_v2.test_emits_hud_per_tool())
runner.test("create_machine tool", test_thinker_v2.test_create_machine_tool())
+runner.end_suite()
# Interpreter v1
+runner.start_suite("InterpreterNode v1")
print("\n--- InterpreterNode v1 ---")
runner.test("summarizes DB result", test_interpreter_v1.test_summarizes_db_result())
runner.test("handles empty result", test_interpreter_v1.test_handles_empty_result())
@@ -90,6 +124,7 @@ runner.test("no hallucination guard", test_interpreter_v1.test_no_hallucination_
runner.test("emits HUD", test_interpreter_v1.test_emits_hud())
runner.test("bad JSON fallback", test_interpreter_v1.test_bad_json_fallback())
runner.test("python tool output", test_interpreter_v1.test_python_tool_output())
+runner.end_suite()
# Summary
elapsed = time.time() - t0
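
With the token no longer hardcoded in run_all.py, a reporting run supplies it via --token or the environment (URL as in the module docstring; the token value is whatever the frontend's bearer auth expects):

```text
COG_TEST_TOKEN=... python run_all.py --report https://cog.loop42.de
python run_all.py --report https://cog.loop42.de --token <token>
```
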