Nico a2bc6347fc v0.13.0: Graph engine, versioned nodes, S3* audit, DB tools, Cytoscape
Architecture:
- Graph engine (engine.py) loads graph definitions, instantiates nodes
- Versioned nodes: input_v1, thinker_v1, output_v1, memorizer_v1, director_v1
- NODE_REGISTRY for dynamic node lookup by name
- Graph API: /api/graph/active, /api/graph/list, /api/graph/switch
- Graph definition: graphs/v1_current.py (7 nodes, 13 edges, 3 edge types)
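
The registry-based lookup listed above could work roughly as follows. This is a minimal sketch, not the code in engine.py: only the name NODE_REGISTRY and the versioned node names come from the commit message. The register decorator, the placeholder node classes, the instantiate_nodes helper, and the shape of the graph dict are all assumptions made for illustration.

```python
from typing import Callable

# Name taken from the commit message; its real structure is an assumption.
NODE_REGISTRY: dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    """Hypothetical decorator that records a node class under a versioned name."""
    def decorator(cls: type) -> type:
        NODE_REGISTRY[name] = cls
        return cls
    return decorator


@register("input_v1")
class InputNodeV1:
    """Placeholder; the real input_v1 node is not shown in the source."""


@register("thinker_v1")
class ThinkerNodeV1:
    """Placeholder; the real thinker_v1 node is not shown in the source."""


def instantiate_nodes(graph: dict) -> dict[str, object]:
    """Create one node instance per role in a (hypothetical) graph definition."""
    nodes: dict[str, object] = {}
    for role, impl in graph["nodes"].items():
        if impl not in NODE_REGISTRY:
            raise KeyError(f"unknown node implementation: {impl}")
        nodes[role] = NODE_REGISTRY[impl]()
    return nodes


# Assumed graph shape: role name -> registered implementation name.
graph = {"nodes": {"input": "input_v1", "thinker": "thinker_v1"}}
nodes = instantiate_nodes(graph)
```

Keying the registry by versioned name means a graph definition can move a single role to a newer node version by changing one string, without touching the other nodes.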

S3* Audit system:
- Workspace mismatch detection (server vs browser controls)
- Code-without-tools retry (Thinker wrote code but no tool calls)
- Intent-without-action retry (request intent but Thinker only produced text)
- Dashboard feedback: browser sends workspace state on every message
- Sensor continuous comparison on 5s tick
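
The two retry conditions above can be pictured as a small check over the Thinker's output. The sketch below is an assumption about how such a check might be shaped, not the audit code from this commit: the Thought class, the audit function, the reason strings, and the use of a code fence as the signal for "wrote code" are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional

FENCE = "`" * 3  # Markdown code-fence marker, built this way to keep the example readable


@dataclass
class Thought:
    """Hypothetical stand-in for the Thinker's output."""
    text: str
    tool_calls: list[dict] = field(default_factory=list)


def audit(thought: Thought, intent: str) -> Optional[str]:
    """Return a retry reason, or None if the turn looks consistent."""
    if thought.tool_calls:
        return None  # the Thinker acted, nothing to retry
    if FENCE in thought.text:
        return "code_without_tools"  # code was written but never executed via a tool
    if intent == "request":
        return "intent_without_action"  # user asked for something, only text came back
    return None


reply = Thought(text=f"Here you go:\n{FENCE}python\nprint(1)\n{FENCE}")
reason = audit(reply, intent="request")
```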

State machines:
- create_machine / add_state / reset_machine / destroy_machine via function calling
- Local transitions (go:) resolve without LLM round-trip
- Button persistence across turns
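
A local transition can be handled entirely in process, which is what lets it skip the LLM. The following is a sketch under stated assumptions: the commit message only says that go: transitions resolve locally, so the Machine class, its fields, the handle_action helper, and the exact "go:<state>" action format are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Machine:
    """Hypothetical in-memory state machine; field names are assumptions."""
    initial: str
    states: dict[str, list[str]] = field(default_factory=dict)  # state -> button actions
    current: str = ""

    def __post_init__(self) -> None:
        # Start in the initial state unless a current state was given explicitly.
        self.current = self.current or self.initial


def handle_action(machine: Machine, action: str) -> bool:
    """Apply a 'go:<state>' action locally.

    Returns True if the transition was resolved here, False if the action
    is not a known local transition and would need the normal pipeline.
    """
    if not action.startswith("go:"):
        return False
    target = action[len("go:"):]
    if target not in machine.states:
        return False
    machine.current = target
    return True


machine = Machine(
    initial="menu",
    states={"menu": ["go:settings"], "settings": ["go:menu"]},
)
handled = handle_action(machine, "go:settings")
```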

Database tools:
- query_db tool via pymysql to MariaDB K3s pod (eras2_production)
- Table rendering in workspace (tab-separated parsing)
- Director pre-planning with Opus for complex data requests
- Error retry with corrected SQL
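
Tab-separated parsing for table rendering might look like the sketch below. It assumes that the query_db result arrives as text with a header row first and one row per line; that format, and the parse_tsv helper itself, are assumptions rather than details taken from the commit.

```python
def parse_tsv(raw: str) -> tuple[list[str], list[list[str]]]:
    """Split tab-separated text into a header and data rows.

    Blank lines are skipped. An empty input yields an empty header and no rows.
    """
    lines = [line for line in raw.splitlines() if line.strip()]
    if not lines:
        return [], []
    header = lines[0].split("\t")
    rows = [line.split("\t") for line in lines[1:]]
    return header, rows


# Hypothetical tool output, used only to exercise the parser.
raw_result = "id\tname\n1\tAda\n2\tGrace\n"
header, rows = parse_tsv(raw_result)
```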

Frontend:
- Cytoscape.js pipeline graph with real-time node animations
- Overlay scrollbars (CSS-only, no reflow)
- Tool call/result trace events
- S3* audit events in trace

Testing:
- 167 integration tests (11 test suites)
- 22 node-level unit tests (test_nodes/)
- Three test levels: node unit, graph integration, scenario

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 00:18:45 +01:00


"""Shared test harness for node-level tests."""

import asyncio
import json
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path

# Add the parent directory to sys.path so the agent package can be imported
sys.path.insert(0, str(Path(__file__).parent.parent))

from agent.types import Envelope, Command, InputAnalysis, ThoughtResult


class HudCapture:
    """Mock send_hud that captures all HUD events for inspection."""

    def __init__(self):
        self.events: list[dict] = []

    async def __call__(self, data: dict):
        self.events.append(data)

    def find(self, event: str) -> list[dict]:
        return [e for e in self.events if e.get("event") == event]

    def has(self, event: str) -> bool:
        return any(e.get("event") == event for e in self.events)

    def last(self) -> dict:
        return self.events[-1] if self.events else {}

    def clear(self):
        self.events.clear()


class MockWebSocket:
    """Mock WebSocket that captures sent messages."""

    def __init__(self):
        self.sent: list[str] = []
        self.readyState = 1

    async def send_text(self, text: str):
        self.sent.append(text)

    def get_messages(self) -> list[dict]:
        return [json.loads(s) for s in self.sent]

    def get_deltas(self) -> str:
        """Reconstruct streamed text from delta messages."""
        # Compare the parsed "type" field instead of matching on the raw JSON
        # string, so the result does not depend on serializer whitespace.
        return "".join(
            msg.get("content", "")
            for msg in self.get_messages()
            if msg.get("type") == "delta"
        )


def make_envelope(text: str, user_id: str = "bob") -> Envelope:
    return Envelope(text=text, user_id=user_id, session_id="test",
                    timestamp=time.strftime("%Y-%m-%d %H:%M:%S"))


def make_command(intent: str = "request", topic: str = "", text: str = "",
                 complexity: str = "simple", tone: str = "casual",
                 language: str = "en", who: str = "bob") -> Command:
    return Command(
        analysis=InputAnalysis(
            who=who, language=language, intent=intent,
            topic=topic, tone=tone, complexity=complexity,
        ),
        # Fall back to the topic when no explicit source text is given
        source_text=text or topic,
    )


def make_history(messages: list[tuple[str, str]] | None = None) -> list[dict]:
    """Create history from (role, content) tuples."""
    if not messages:
        return []
    return [{"role": r, "content": c} for r, c in messages]
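

@dataclass
class NodeTestResult:
    name: str
    passed: bool
    detail: str = ""
    elapsed_ms: int = 0


def run_async(coro):
    """Run an async function synchronously."""
    # Newer Python versions raise RuntimeError from get_event_loop() when no
    # loop has been set, so create and register one in that case.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)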


class NodeTestRunner:
    """Collects and runs node-level tests."""

    def __init__(self):
        self.results: list[NodeTestResult] = []

    def test(self, name: str, coro):
        """Run a single async test, catch and record result."""
        t0 = time.time()
        try:
            run_async(coro)
            elapsed = int((time.time() - t0) * 1000)
            self.results.append(NodeTestResult(name=name, passed=True, elapsed_ms=elapsed))
            print(f" OK {name} ({elapsed}ms)")
        except AssertionError as e:
            elapsed = int((time.time() - t0) * 1000)
            self.results.append(NodeTestResult(name=name, passed=False,
                                               detail=str(e), elapsed_ms=elapsed))
            print(f" FAIL {name} ({elapsed}ms)")
            print(f" {e}")
        except Exception as e:
            elapsed = int((time.time() - t0) * 1000)
            self.results.append(NodeTestResult(name=name, passed=False,
                                               detail=f"ERROR: {e}", elapsed_ms=elapsed))
            print(f" ERR {name} ({elapsed}ms)")
            print(f" {e}")

    def summary(self) -> tuple[int, int]:
        passed = sum(1 for r in self.results if r.passed)
        failed = len(self.results) - passed
        return passed, failed
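
As an illustration of how a node-level test might use this harness, the sketch below passes a HudCapture to a node in place of send_hud and asserts on the captured events. It is self-contained so it can run without the agent package: HudCapture is a trimmed copy of the class above, and echo_node together with its "node_start" event is a hypothetical node invented for this example.

```python
import asyncio


class HudCapture:
    """Trimmed copy of the harness class so this sketch runs on its own."""

    def __init__(self):
        self.events: list[dict] = []

    async def __call__(self, data: dict):
        self.events.append(data)

    def has(self, event: str) -> bool:
        return any(e.get("event") == event for e in self.events)


async def echo_node(text: str, send_hud) -> str:
    """Hypothetical node: emits one HUD event, then returns upper-cased text."""
    await send_hud({"event": "node_start", "node": "echo"})
    return text.upper()


async def check_echo(hud: HudCapture) -> str:
    """Test body: run the node and assert on both its result and its HUD events."""
    result = await echo_node("hi", hud)
    assert result == "HI"
    assert hud.has("node_start")
    return result


hud = HudCapture()
result = asyncio.run(check_echo(hud))
```

With the full harness, the same coroutine would be handed to the runner instead, for example runner.test("echo emits start", check_echo(HudCapture())), so that the outcome and timing are recorded in runner.results.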