Nico 1000411eb2 v0.15.0: Frame engine (v3), PA + Expert architecture (v4-eras), live test streaming
Frame Engine (v3-framed):
- Tick-based deterministic pipeline: frames advance on completion, not timers
- FrameRecord/FrameTrace dataclasses for structured per-message tracing
- /api/frames endpoint: queryable frame trace history (last 20 messages)
- frame_trace HUD event with full pipeline visibility
- Reflex=2F, Director=4F, Director+Interpreter=5F deterministic frame counts

Expert Architecture (v4-eras):
- PA node (pa_v1): routes to domain experts, holds user context
- ExpertNode base: stateless executor with plan+execute two-LLM-call pattern
- ErasExpertNode: eras2_production DB specialist with DESCRIBE-first discipline
- Schema caching: DESCRIBE results reused across queries within session
- Progress streaming: PA streams thinking message, expert streams per-tool progress
- PARouting type for structured routing decisions

UI Controls Split:
- Separate thinker_controls from machine controls (current_controls is now a property)
- Machine buttons persist across Thinker responses
- Machine state parser handles both dict and list formats from Director
- Normalized button format with go/payload field mapping

WebSocket Architecture:
- /ws/test: dedicated debug socket for test runner progress
- /ws/trace: dedicated debug socket for HUD/frame trace events
- /ws (chat): cleaned up, only deltas/controls/done/cleared
- WS survives graph switch (re-attaches to new runtime)
- Pipeline result reset on clear

Test Infrastructure:
- Live test streaming: on_result callback fires per check during execution
- Frontend polling fallback (500ms) for proxy-buffered WS
- frame_trace-first trace assertion (fixes stale perceived event bug)
- action_match supports "or" patterns and multi-pattern matching
- Trace window increased to 40 events
- Graph-agnostic assertions (has X or Y)

Test Suites:
- smoketest.md: 12 steps covering all categories (~2min)
- fast.md: 10 quick checks (~1min)
- fast_v4.md: 10 v4-eras specific checks
- expert_eras.md: eras domain tests (routing, DB, schema, errors)
- expert_progress.md: progress streaming tests

Other:
- Shared db.py extracted from thinker_v2 (reused by experts)
- InputNode prompt: few-shot examples, history as context summary
- Director prompt: full tool signatures for add_state/reset_machine/destroy_machine
- nginx no-cache headers for static files during development
- Cache-busted static file references

Scores: v3 smoketest 39/40, v4-eras fast 28/28, expert_eras 23/23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:10:31 +02:00

484 lines
18 KiB
Python

"""API endpoints, SSE, polling."""
import asyncio
import hashlib
import json
import logging
from asyncio import Queue
from pathlib import Path
from fastapi import Depends, HTTPException, Query, WebSocket, WebSocketDisconnect
from starlette.responses import StreamingResponse
import httpx
from .auth import AUTH_ENABLED, ZITADEL_ISSUER, _validate_token, require_auth
from .runtime import Runtime, TRACE_FILE
log = logging.getLogger("runtime")

# Active runtime reference. Created lazily by _ensure_runtime() (inside
# register_routes) and replaced wholesale on /api/graph/switch.
_active_runtime: Runtime | None = None

# SSE subscribers: one bounded Queue per open /api/events stream.
_sse_subscribers: list[Queue] = []

# Dedicated WS channels (debug sockets).
_test_ws_clients: list[WebSocket] = []   # /ws/test subscribers
_trace_ws_clients: list[WebSocket] = []  # /ws/trace subscribers
async def _broadcast_test(event: dict):
    """Fan *event* out to every /ws/test client, pruning sockets that fail."""
    payload = json.dumps(event)
    log.info(f"[ws/test] broadcasting to {len(_test_ws_clients)} clients")
    stale = []
    for ws in _test_ws_clients:
        try:
            await ws.send_text(payload)
        except Exception as e:
            # A failed send means the socket is gone; remember it for removal.
            log.error(f"[ws/test] send failed: {e}")
            stale.append(ws)
    for ws in stale:
        _test_ws_clients.remove(ws)
async def _broadcast_trace(event: dict):
    """Fan *event* out to every /ws/trace client, silently dropping dead sockets."""
    payload = json.dumps(event)
    broken = []
    for client in _trace_ws_clients:
        try:
            await client.send_text(payload)
        except Exception:
            # Unlike /ws/test this channel is quiet: just mark for removal.
            broken.append(client)
    for client in broken:
        _trace_ws_clients.remove(client)
# Async message pipeline state: a single in-flight message at a time.
# _pipeline_result is polled by /api/result and updated by _broadcast_sse.
_pipeline_task: asyncio.Task | None = None
_pipeline_result: dict = {"status": "idle"}
_pipeline_id: int = 0
def _broadcast_sse(event: dict):
    """Push an event to all SSE subscribers + /ws/trace + update pipeline progress.

    Synchronous and best-effort: a full SSE queue drops the event (slow
    consumer must not stall the pipeline), and the /ws/trace fan-out is
    scheduled as a fire-and-forget task on the running loop.
    """
    for q in _sse_subscribers:
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            pass  # drop for this subscriber rather than block
    # Push to /ws/trace subscribers (fire-and-forget).
    if _trace_ws_clients:
        try:
            # Fix: get_running_loop() replaces the deprecated
            # get_event_loop()-from-sync-context pattern. It raises
            # RuntimeError when no loop is running, which the handler
            # below already covers (e.g. during startup).
            # NOTE(review): the task reference is not retained, so an
            # aggressively-collected task could be dropped mid-flight —
            # acceptable for a best-effort debug channel.
            asyncio.get_running_loop().create_task(_broadcast_trace(event))
        except RuntimeError:
            pass  # no event loop (startup)
    # Mirror HUD progress events into the polled pipeline result.
    if _pipeline_result.get("status") == "running":
        node = event.get("node", "")
        evt = event.get("event", "")
        if node and evt in ("thinking", "perceived", "decided", "streaming", "tool_call", "interpreted", "updated"):
            _pipeline_result["stage"] = node
            _pipeline_result["event"] = evt
def _state_hash() -> str:
    """Short fingerprint of memorizer state + history length, for /api/poll."""
    rt = _active_runtime
    if not rt:
        return "no_session"
    snapshot = {
        "mem": rt.memorizer.state,
        "hlen": len(rt.history),
    }
    # sort_keys keeps the serialization (and thus the hash) deterministic.
    digest = hashlib.md5(json.dumps(snapshot, sort_keys=True).encode())
    return digest.hexdigest()[:12]
def register_routes(app):
    """Register all API routes on the FastAPI app.

    All handlers close over module-level state (_active_runtime, _pipeline_*,
    the WS client lists) so the runtime survives socket reconnects and graph
    switches.
    """

    @app.get("/health")
    async def health():
        # Liveness probe; no auth, no side effects.
        return {"status": "ok"}

    @app.get("/auth/config")
    async def auth_config():
        """Public OIDC settings the frontend needs before login."""
        from .auth import ZITADEL_ISSUER, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID, AUTH_ENABLED
        return {
            "enabled": AUTH_ENABLED,
            "issuer": ZITADEL_ISSUER,
            "clientId": ZITADEL_CLIENT_ID,
            "projectId": ZITADEL_PROJECT_ID,
        }

    def _ensure_runtime(user_claims=None, origin=""):
        """Get or create the persistent runtime."""
        global _active_runtime
        if _active_runtime is None:
            _active_runtime = Runtime(user_claims=user_claims, origin=origin,
                                      broadcast=_broadcast_sse)
            log.info("[api] created persistent runtime")
        return _active_runtime

    @app.websocket("/ws")
    async def ws_endpoint(ws: WebSocket, token: str | None = Query(None),
                          access_token: str | None = Query(None)):
        """Main chat WebSocket: validate token, attach socket to the runtime."""
        user_claims = {"sub": "anonymous"}
        if AUTH_ENABLED and token:
            try:
                user_claims = await _validate_token(token)
                # Enrich claims from the userinfo endpoint when the ID token
                # lacks a display name and an access token was supplied.
                if not user_claims.get("name") and access_token:
                    async with httpx.AsyncClient() as client:
                        resp = await client.get(f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
                                                headers={"Authorization": f"Bearer {access_token}"})
                        if resp.status_code == 200:
                            info = resp.json()
                            log.info(f"[auth] userinfo enrichment: {info}")
                            user_claims["name"] = info.get("name")
                            user_claims["preferred_username"] = info.get("preferred_username")
                            user_claims["email"] = info.get("email")
            except HTTPException:
                await ws.close(code=4001, reason="Invalid token")
                return
        origin = ws.headers.get("origin", ws.headers.get("host", ""))
        await ws.accept()
        # Get or create runtime, attach WS
        runtime = _ensure_runtime(user_claims=user_claims, origin=origin)
        runtime.update_identity(user_claims, origin)
        runtime.attach_ws(ws)
        try:
            while True:
                data = await ws.receive_text()
                msg = json.loads(data)
                # Always use current runtime (may change after graph switch)
                rt = _active_runtime or runtime
                if msg.get("type") == "action":
                    await rt.handle_action(msg.get("action", "unknown"), msg.get("data"))
                elif msg.get("type") == "cancel_process":
                    rt.process_manager.cancel(msg.get("pid", 0))
                else:
                    await rt.handle_message(msg.get("text", ""), dashboard=msg.get("dashboard"))
        except WebSocketDisconnect:
            # The runtime deliberately outlives the socket; only detach.
            if _active_runtime:
                _active_runtime.detach_ws()
            log.info("[api] WS disconnected — runtime stays alive")

    async def _auth_debug_ws(ws: WebSocket, token: str | None) -> bool:
        """Validate token for debug WS. Returns True if auth OK.

        Closes the (already accepted) socket with code 4001 on failure.
        """
        if not AUTH_ENABLED:
            return True
        if not token:
            await ws.close(code=4001, reason="Missing token")
            return False
        try:
            await _validate_token(token)
            return True
        except HTTPException:
            await ws.close(code=4001, reason="Invalid token")
            return False

    @app.websocket("/ws/test")
    async def ws_test(ws: WebSocket, token: str | None = Query(None)):
        """Dedicated WS for test runner progress. Debug only, auth required."""
        # Accept first so a close code can be delivered on auth failure.
        await ws.accept()
        if not await _auth_debug_ws(ws, token):
            return
        _test_ws_clients.append(ws)
        log.info(f"[api] /ws/test connected ({len(_test_ws_clients)} clients)")
        try:
            while True:
                # Receive-only loop: clients don't send meaningful data,
                # this just keeps the connection (and disconnect detection) alive.
                await ws.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            if ws in _test_ws_clients:
                _test_ws_clients.remove(ws)
            log.info(f"[api] /ws/test disconnected ({len(_test_ws_clients)} clients)")

    @app.websocket("/ws/trace")
    async def ws_trace(ws: WebSocket, token: str | None = Query(None)):
        """Dedicated WS for HUD/frame trace events. Debug only, auth required."""
        await ws.accept()
        if not await _auth_debug_ws(ws, token):
            return
        _trace_ws_clients.append(ws)
        log.info(f"[api] /ws/trace connected ({len(_trace_ws_clients)} clients)")
        try:
            while True:
                await ws.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            if ws in _trace_ws_clients:
                _trace_ws_clients.remove(ws)
            log.info(f"[api] /ws/trace disconnected ({len(_trace_ws_clients)} clients)")

    @app.get("/api/events")
    async def sse_events(user=Depends(require_auth)):
        """SSE stream of HUD/pipeline events; one bounded queue per subscriber."""
        q: Queue = Queue(maxsize=100)
        _sse_subscribers.append(q)

        async def generate():
            try:
                while True:
                    event = await q.get()
                    yield f"data: {json.dumps(event)}\n\n"
            except asyncio.CancelledError:
                pass
            finally:
                # Unsubscribe when the client disconnects or the stream ends.
                _sse_subscribers.remove(q)
        # X-Accel-Buffering: no — stops nginx from buffering the stream.
        return StreamingResponse(generate(), media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

    @app.get("/api/poll")
    async def poll(since: str = "", user=Depends(require_auth)):
        """Cheap change-detection poll: returns full state only when the hash moved."""
        h = _state_hash()
        if since and since == h:
            return {"changed": False, "hash": h}
        return {
            "changed": True,
            "hash": h,
            "state": _active_runtime.memorizer.state if _active_runtime else {},
            "history_len": len(_active_runtime.history) if _active_runtime else 0,
            "last_messages": _active_runtime.history[-3:] if _active_runtime else [],
        }

    @app.post("/api/send/check")
    async def api_send_check(user=Depends(require_auth)):
        """Validate runtime is ready to accept a message. Fast, no LLM calls."""
        global _pipeline_task
        runtime = _ensure_runtime()
        if _pipeline_task and not _pipeline_task.done():
            return {"ready": False, "reason": "busy", "detail": "Pipeline already running"}
        return {
            "ready": True,
            "graph": runtime.graph.get("name", "unknown"),
            "identity": runtime.identity,
            "history_len": len(runtime.history),
            "ws_connected": runtime.sink.ws is not None,
        }

    @app.post("/api/send")
    async def api_send(body: dict, user=Depends(require_auth)):
        """Queue a message for async processing. Returns immediately with a message ID.

        Only one pipeline may run at a time (409 otherwise); progress is
        polled via /api/result and enriched by _broadcast_sse.
        """
        global _pipeline_task, _pipeline_result, _pipeline_id
        runtime = _ensure_runtime()
        if _pipeline_task and not _pipeline_task.done():
            raise HTTPException(status_code=409, detail="Pipeline already running")
        text = body.get("text", "").strip()
        if not text:
            raise HTTPException(status_code=400, detail="Missing 'text' field")
        _pipeline_id += 1
        msg_id = f"msg_{_pipeline_id}"
        dashboard = body.get("dashboard")
        _pipeline_result = {"status": "running", "id": msg_id, "stage": "queued", "text": text}

        async def _run_pipeline():
            # Background task: runs the full message pipeline and publishes
            # the terminal result into _pipeline_result.
            global _pipeline_result
            try:
                _pipeline_result["stage"] = "input"
                result = await runtime.handle_message(text, dashboard=dashboard)
                # Frame engine returns a dict with response; imperative pipeline uses history
                if isinstance(result, dict) and "response" in result:
                    response = result["response"]
                    log.info(f"[api] frame engine response[{len(response)}]: {response[:80]}")
                else:
                    response = runtime.history[-1]["content"] if runtime.history else ""
                    log.info(f"[api] history response[{len(response)}]: {response[:80]}")
                _pipeline_result = {
                    "status": "done",
                    "id": msg_id,
                    "stage": "done",
                    "response": response,
                    "memorizer": runtime.memorizer.state,
                }
            except Exception as e:
                import traceback
                log.error(f"[api] pipeline error: {e}\n{traceback.format_exc()}")
                _pipeline_result = {
                    "status": "error",
                    "id": msg_id,
                    "stage": "error",
                    "detail": str(e),
                }
        _pipeline_task = asyncio.create_task(_run_pipeline())
        return {"status": "queued", "id": msg_id}

    @app.get("/api/result")
    async def api_result(user=Depends(require_auth)):
        """Poll for the current pipeline result."""
        return _pipeline_result

    @app.get("/api/frames")
    async def api_frames(user=Depends(require_auth), last: int = 5):
        """Get frame traces from the frame engine. Returns last N message traces."""
        runtime = _ensure_runtime()
        if hasattr(runtime, 'frame_engine'):
            engine = runtime.frame_engine
            traces = engine.trace_history[-last:]
            return {
                "graph": engine.graph.get("name", "unknown"),
                "engine": "frames",
                "traces": traces,
                # NOTE(review): assumes engine.last_trace is never None when the
                # attribute exists — confirm against FrameEngine initialization.
                "last_trace": engine.last_trace.to_dict() if engine.last_trace.message else None,
            }
        return {"engine": "imperative", "traces": [], "detail": "Frame engine not active"}

    @app.post("/api/clear")
    async def api_clear(user=Depends(require_auth)):
        """Reset conversation: history, UI state, memorizer, and pipeline result."""
        global _pipeline_result
        runtime = _ensure_runtime()
        runtime.history.clear()
        runtime.ui_node.state.clear()
        runtime.ui_node.bindings.clear()
        runtime.ui_node.thinker_controls.clear()
        runtime.ui_node.machines.clear()
        # Rebuild memorizer state from scratch, keeping identity and situation.
        runtime.memorizer.state = {
            "user_name": runtime.identity,
            "user_mood": "neutral",
            "topic": None,
            "topic_history": [],
            "situation": runtime.memorizer.state.get("situation", ""),
            "language": "en",
            "style_hint": "casual, technical",
            "facts": [],
        }
        _pipeline_result = {"status": "idle", "id": "", "stage": "cleared"}
        # Notify frontend via WS
        if runtime.sink.ws:
            try:
                await runtime.sink.ws.send_text(json.dumps({"type": "cleared"}))
            except Exception:
                pass  # best-effort notification; socket may be half-closed
        return {"status": "cleared"}

    @app.get("/api/state")
    async def get_state(user=Depends(require_auth)):
        """Snapshot of runtime state for dashboards/debugging."""
        runtime = _ensure_runtime()
        return {
            "status": "active",
            "memorizer": runtime.memorizer.state,
            "history_len": len(runtime.history),
            "ws_connected": runtime.sink.ws is not None,
        }

    @app.get("/api/history")
    async def get_history(last: int = 10, user=Depends(require_auth)):
        """Return the last *last* chat messages."""
        runtime = _ensure_runtime()
        return {
            "status": "active",
            "messages": runtime.history[-last:],
        }

    @app.get("/api/graph/active")
    async def get_active_graph():
        """Describe the currently active graph (incl. Cytoscape layout)."""
        from .engine import load_graph, get_graph_for_cytoscape
        from .runtime import _active_graph_name
        graph = load_graph(_active_graph_name)
        return {
            "name": graph["name"],
            "description": graph["description"],
            "nodes": graph["nodes"],
            "edges": graph["edges"],
            "cytoscape": get_graph_for_cytoscape(graph),
        }

    @app.get("/api/graph/list")
    async def get_graph_list():
        """List all available graphs."""
        from .engine import list_graphs
        return {"graphs": list_graphs()}

    @app.post("/api/graph/switch")
    async def switch_graph(body: dict, user=Depends(require_auth)):
        """Swap the active graph, preserving the chat WS across the switch."""
        global _active_runtime
        from .engine import load_graph
        import agent.runtime as rt
        name = body.get("name", "")
        graph = load_graph(name)  # validates it exists
        rt._active_graph_name = name
        # Preserve WS connection across graph switch
        old_ws = None
        old_claims = {}
        old_origin = ""
        if _active_runtime:
            old_ws = _active_runtime.sink.ws
            old_claims = {"name": _active_runtime.identity}
            old_origin = _active_runtime.channel
            _active_runtime.sensor.stop()
            _active_runtime = None
        # Create new runtime with new graph
        new_runtime = _ensure_runtime(user_claims=old_claims, origin=old_origin)
        # Re-attach WS if it was connected
        if old_ws:
            new_runtime.attach_ws(old_ws)
            log.info(f"[api] re-attached WS after graph switch to '{name}'")
        return {"status": "ok", "name": graph["name"],
                "note": "Graph switched. WS re-attached."}

    # --- Test status (real-time) ---
    # Mutated in place by post_test_status (no rebinding, so no `global` needed).
    _test_status = {"running": False, "current": "", "results": [], "last_green": None, "last_red": None, "total_expected": 0}

    @app.post("/api/test/status")
    async def post_test_status(body: dict, user=Depends(require_auth)):
        """Receive test status updates from the test runner."""
        event = body.get("event", "")
        if event == "suite_start":
            _test_status["running"] = True
            _test_status["current"] = body.get("suite", "")
            if body.get("count"):
                # First suite_start with count resets everything
                _test_status["results"] = []
                _test_status["total_expected"] = body["count"]
                _test_status["last_green"] = None
                _test_status["last_red"] = None
        elif event == "step_result":
            result = body.get("result", {})
            _test_status["results"].append(result)
            _test_status["current"] = f"{result.get('step', '')}{result.get('check', '')}"
            if result.get("status") == "FAIL":
                _test_status["last_red"] = result
            elif result.get("status") == "PASS":
                _test_status["last_green"] = result
        elif event == "suite_end":
            _test_status["running"] = False
            _test_status["current"] = ""
        # Broadcast to /ws/test subscribers — must await to ensure delivery before response
        await _broadcast_test({"type": "test_status", **_test_status})
        # Also SSE for backward compat
        _broadcast_sse({"type": "test_status", **_test_status})
        return {"ok": True}

    @app.get("/api/test/status")
    async def get_test_status(user=Depends(require_auth)):
        """Snapshot of the live test-runner status."""
        return _test_status

    @app.get("/api/tests")
    async def get_tests():
        """Latest test results from runtime_test.py."""
        results_path = Path(__file__).parent.parent / "testcases" / "results.json"
        if not results_path.exists():
            return {}
        return json.loads(results_path.read_text(encoding="utf-8"))

    @app.get("/api/trace")
    async def get_trace(last: int = 30, user=Depends(require_auth)):
        """Tail the last *last* entries of the JSONL trace file."""
        if not TRACE_FILE.exists():
            return {"lines": []}
        lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
        parsed = []
        for line in lines[-last:]:
            try:
                parsed.append(json.loads(line))
            except json.JSONDecodeError:
                pass  # skip partially-written / corrupt trace lines
        return {"lines": parsed}