agent-runtime/agent/frame_engine.py
Nico faeb9d3254 v0.15.7: Fix action routing for v4, WS error handling, stable nodes panel
Action routing:
- Button clicks now route through PA→Expert in v4 (was missing has_pa check)
- Previously crashed with KeyError on missing thinker node

WS error handling:
- Exceptions in WS handler caught and logged, not crash
- Frontend receives error HUD event instead of disconnect
- Prevents 1006 reconnect loops on action errors

Nodes panel:
- Fixed pipeline order (no re-sorting on events)
- Deduplicated node names (pa_v1→pa, expert_eras→eras)
- Normalized names in state tracker

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 19:16:15 +02:00

635 lines
28 KiB
Python

"""Frame Engine: tick-based deterministic pipeline execution.
Replaces the imperative handle_message() with a frame-stepping model:
- Each frame dispatches all nodes that have pending input
- Frames advance on completion (not on a timer)
- 0ms when idle, engine awaits external input
- Deterministic ordering: reflex=2 frames, thinker=3-4, interpreter=5
Works with any graph definition (v1, v2, v3). Node implementations unchanged.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan, PARouting
log = logging.getLogger("runtime")
# --- Frame Trace ---
@dataclass
class FrameRecord:
    """Execution record for a single frame (one node dispatch).

    Created by FrameEngine._begin_frame and completed by _end_frame;
    serialized into the trace dict exposed via HUD events and the API.
    """
    frame: int  # 1-based frame number within the current message trace
    node: str  # which node ran ("input", "director", ...)
    started: float = 0.0  # time.monotonic() at frame start
    ended: float = 0.0  # time.monotonic() at frame end
    duration_ms: float = 0.0  # (ended - started) * 1000
    input_summary: str = ""  # what the node received
    output_summary: str = ""  # what the node produced
    route: str = ""  # where output goes next ("director", "output+ui", ...)
    condition: str = ""  # if a condition was evaluated ("reflex=True", ...)
    error: str = ""  # if the node failed
@dataclass
class FrameTrace:
    """Complete trace of one message through the pipeline."""
    message: str = ""
    graph: str = ""
    total_frames: int = 0
    total_ms: float = 0.0
    started: float = 0.0  # time.monotonic() when the trace began
    path: str = ""  # "reflex", "director", "director+interpreter", ...
    frames: list = field(default_factory=list)  # FrameRecord instances, in order

    def to_dict(self) -> dict:
        """Serialize the trace for HUD/API output, truncating long summaries."""
        def frame_entry(rec) -> dict:
            # One serialized frame; summaries are capped at 200 chars.
            return {
                "frame": rec.frame,
                "node": rec.node,
                "duration_ms": round(rec.duration_ms, 1),
                "input": rec.input_summary[:200],
                "output": rec.output_summary[:200],
                "route": rec.route,
                "condition": rec.condition,
                "error": rec.error,
            }

        result = {
            "message": self.message[:100],
            "graph": self.graph,
            "total_frames": self.total_frames,
            "total_ms": round(self.total_ms, 1),
            "path": self.path,
        }
        result["frames"] = [frame_entry(rec) for rec in self.frames]
        return result
class FrameEngine:
"""Tick-based engine that steps through graph nodes frame by frame."""
def __init__(self, graph: dict, nodes: dict, sink, history: list,
             send_hud, sensor, memorizer, ui_node, identity: str = "unknown",
             channel: str = "unknown", broadcast=None):
    """Wire the engine to a graph definition and its node implementations.

    Args:
        graph: graph definition dict; reads "conditions" and "edges".
        nodes: role -> node instance map ("input", "pa", "director",
            "thinker", "interpreter", "expert_<name>", ...).
        sink: streaming output sink (send_delta/send_controls/reset/...).
        history: shared chat history list, mutated in place.
        send_hud: async callable used to emit HUD events.
        sensor: activity/flag tracker consulted when building context.
        memorizer: memory node providing the context block and state.
        ui_node: UI node owning controls/machines.
        identity, channel: identifiers passed through to nodes.
        broadcast: optional event broadcaster; defaults to a no-op.
    """
    self.graph = graph
    self.nodes = nodes
    self.sink = sink
    self.history = history
    self._send_hud = send_hud
    self.sensor = sensor
    self.memorizer = memorizer
    self.ui_node = ui_node
    self.identity = identity
    self.channel = channel
    # NOTE(review): the no-op default is synchronous — confirm call sites
    # do not await self._broadcast unconditionally.
    self._broadcast = broadcast or (lambda e: None)
    self.frame = 0  # current frame number within the active trace
    self.bus = {}
    self.conditions = graph.get("conditions", {})
    # Only "data" edges are kept; other edge types are ignored here.
    self.edges = [e for e in graph.get("edges", []) if e.get("type") == "data"]
    # Capability flags use duck-typed checks so older graph versions keep working.
    self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide")
    self.has_interpreter = "interpreter" in nodes
    self.has_pa = "pa" in nodes and hasattr(nodes.get("pa"), "route")
    # Discover available experts in this graph
    self._experts = {}
    for role, node in nodes.items():
        if role.startswith("expert_"):
            expert_name = role[7:]  # strip "expert_" prefix: "expert_eras" → "eras"
            self._experts[expert_name] = node
    if self.has_pa and self._experts:
        # Tell the PA which experts it is allowed to route to.
        nodes["pa"].set_available_experts(list(self._experts.keys()))
        log.info(f"[frame] PA with experts: {list(self._experts.keys())}")
    # Frame trace — last message's complete trace, queryable via API
    self.last_trace: FrameTrace = FrameTrace()
    # History of recent traces (last 20 messages)
    self.trace_history: list[dict] = []
    self.MAX_TRACE_HISTORY = 20
# --- Frame lifecycle helpers ---
def _begin_frame(self, frame_num: int, node: str, input_summary: str = "") -> FrameRecord:
    """Open frame `frame_num` for `node` and append its record to the trace.

    The returned record is completed later by _end_frame().
    """
    self.frame = frame_num
    record = FrameRecord(frame=frame_num, node=node, input_summary=input_summary)
    record.started = time.monotonic()
    self.last_trace.frames.append(record)
    return record
def _end_frame(self, rec: FrameRecord, output_summary: str = "",
               route: str = "", condition: str = "", error: str = ""):
    """Complete a frame record with output, routing and timing.

    Args:
        rec: the record returned by _begin_frame().
        output_summary: short description of what the node produced.
        route: where the output goes next ("" is logged as "done").
        condition: condition-evaluation note, if one was checked.
        error: failure description, stored on the record.

    Bug fix: the expert-not-found path in _run_expert_pipeline calls
    _end_frame(..., error=...); previously this parameter did not exist,
    so that path raised TypeError instead of recording the error.
    """
    rec.ended = time.monotonic()
    rec.duration_ms = (rec.ended - rec.started) * 1000
    rec.output_summary = output_summary
    rec.route = route
    rec.condition = condition
    if error:
        rec.error = error
    log.info(f"[frame] F{rec.frame} {rec.node} "
             f"{rec.duration_ms:.0f}ms -> {route or 'done'}")
def _begin_trace(self, text: str) -> FrameTrace:
    """Reset the frame counter and start a fresh trace for one message."""
    self.frame = 0
    self.last_trace = FrameTrace(
        message=text,
        graph=self.graph.get("name", "unknown"),
        started=time.monotonic(),
    )
    return self.last_trace
def _end_trace(self, path: str):
    """Finalize the active trace, append it to history, and log a summary."""
    trace = self.last_trace
    trace.total_frames = self.frame
    trace.total_ms = (time.monotonic() - trace.started) * 1000
    trace.path = path
    # Store the serialized form; keep only the most recent entries.
    self.trace_history.append(trace.to_dict())
    if len(self.trace_history) > self.MAX_TRACE_HISTORY:
        self.trace_history = self.trace_history[-self.MAX_TRACE_HISTORY:]
    log.info(f"[frame] trace: {path} {trace.total_frames}F {trace.total_ms:.0f}ms")
async def _emit_trace_hud(self):
"""Emit the completed frame trace as a single HUD event."""
t = self.last_trace
await self._send_hud({
"node": "frame_engine",
"event": "frame_trace",
"trace": t.to_dict(),
})
# --- Main entry point ---
async def process_message(self, text: str, dashboard: list = None) -> dict:
    """Process one user message through the frame pipeline.

    Dispatch: reflex short-circuit for trivial social messages, otherwise
    PA → Director → Thinker, whichever the graph provides (checked in
    that order of capability).

    Args:
        text: raw user input; "ACTION:"-prefixed messages are button clicks.
        dashboard: optional browser-side control list, folded into context.

    Returns:
        dict with keys: response, controls, memorizer, frames, trace.
    """
    self._begin_trace(text)
    # Button clicks bypass the input node entirely.
    if text.startswith("ACTION:"):
        return await self._handle_action(text, dashboard)
    # Setup: envelope + sensor/dashboard bookkeeping before Frame 1.
    # NOTE(review): session_id is hard-coded to "test" — confirm intended.
    envelope = Envelope(
        text=text, user_id=self.identity,
        session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
    )
    self.sensor.note_user_activity()
    if dashboard is not None:
        self.sensor.update_browser_dashboard(dashboard)
    self.history.append({"role": "user", "content": text})
    # --- Frame 1: Input analysis ---
    mem_ctx = self._build_context(dashboard)
    rec = self._begin_frame(1, "input", input_summary=text[:100])
    command = await self.nodes["input"].process(
        envelope, self.history, memory_context=mem_ctx,
        identity=self.identity, channel=self.channel)
    a = command.analysis
    cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}"
    # Reflex condition: trivial social messages answer without planning.
    is_reflex = self._check_condition("reflex", command=command)
    if is_reflex:
        self._end_frame(rec, output_summary=cmd_summary,
                        route="output (reflex)", condition="reflex=True")
        await self._send_hud({"node": "runtime", "event": "reflex_path",
                              "detail": f"{a.intent}/{a.complexity}"})
        return await self._run_reflex(command, mem_ctx)
    else:
        next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker")
        # Fix: was a pointless f-string literal with no placeholders (F541).
        self._end_frame(rec, output_summary=cmd_summary,
                        route=next_node, condition="reflex=False")
    # --- Frame 2+: full pipeline, most capable role first ---
    if self.has_pa:
        return await self._run_expert_pipeline(command, mem_ctx, dashboard)
    elif self.has_director:
        return await self._run_director_pipeline(command, mem_ctx, dashboard)
    else:
        return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
# --- Pipeline variants ---
async def _run_reflex(self, command: Command, mem_ctx: str) -> dict:
    """Reflex: Input(F1) → Output(F2).

    Skips PA/director/thinker; the command's source_text is passed
    straight to the output+ui stage as the response.
    """
    rec = self._begin_frame(2, "output", input_summary="reflex passthrough")
    # NOTE(review): response seeds from command.source_text — presumably the
    # input node has already normalized it; confirm it is not raw user text.
    thought = ThoughtResult(response=command.source_text, actions=[])
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    self._end_frame(rec, output_summary=f"response[{len(response)}]")
    self._end_trace("reflex")
    await self._emit_trace_hud()
    return self._make_result(response)
async def _run_expert_pipeline(self, command: Command, mem_ctx: str,
                               dashboard: list = None) -> dict:
    """Expert pipeline: Input(F1) → PA(F2) → Expert(F3) → [Interpreter(F4)] → Output.

    The PA either answers directly (expert == "none") or names an expert
    and a job; the expert executes; if a tool produced output and an
    interpreter exists, the raw output is summarized before rendering.
    """
    a = command.analysis
    # Frame 2: PA routes the request to an expert (or handles it itself).
    rec = self._begin_frame(2, "pa",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    routing = await self.nodes["pa"].route(
        command, self.history, memory_context=mem_ctx,
        identity=self.identity, channel=self.channel)
    route_summary = f"expert={routing.expert} job={routing.job[:60]}"
    self._end_frame(rec, output_summary=route_summary,
                    route=f"expert_{routing.expert}" if routing.expert != "none" else "output")
    # Stream the PA's thinking message to the user while the expert works.
    if routing.thinking_message:
        await self.sink.send_delta(routing.thinking_message + "\n\n")
    # Direct PA response (no expert needed): render the hint via output+ui.
    if routing.expert == "none":
        rec = self._begin_frame(3, "output+ui",
                                input_summary=f"pa_direct: {routing.response_hint[:80]}")
        thought = ThoughtResult(response=routing.response_hint, actions=[])
        response = await self._run_output_and_ui(thought, mem_ctx)
        self.history.append({"role": "assistant", "content": response})
        await self.memorizer.update(self.history)
        self._trim_history()
        self._end_frame(rec, output_summary=f"response[{len(response)}]")
        self._end_trace("pa_direct")
        await self._emit_trace_hud()
        return self._make_result(response)
    # Frame 3: Expert executes the routed job.
    expert = self._experts.get(routing.expert)
    if not expert:
        # PA named an expert this graph doesn't have — degrade to an error reply.
        log.error(f"[frame] expert '{routing.expert}' not found")
        thought = ThoughtResult(response=f"Expert '{routing.expert}' not available.")
        rec = self._begin_frame(3, "output+ui", input_summary="expert_not_found")
        response = await self._run_output_and_ui(thought, mem_ctx)
        self.history.append({"role": "assistant", "content": response})
        # NOTE(review): this relies on _end_frame accepting an `error` kwarg.
        self._end_frame(rec, output_summary="error", error=f"expert '{routing.expert}' not found")
        self._end_trace("expert_error")
        await self._emit_trace_hud()
        return self._make_result(response)
    rec = self._begin_frame(3, f"expert_{routing.expert}",
                            input_summary=f"job: {routing.job[:80]}")
    # Temporarily wrap the expert's HUD so tool progress streams to the user.
    original_hud = expert.send_hud
    expert.send_hud = self._make_progress_wrapper(original_hud, routing.language)
    try:
        thought = await expert.execute(routing.job, routing.language)
    finally:
        # Always restore the original HUD, even if the expert raised.
        expert.send_hud = original_hud
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # Frame 4 (conditional): interpreter summarizes raw tool output.
    if self.has_interpreter and has_tool:
        self._end_frame(rec, output_summary=thought_summary,
                        route="interpreter", condition="has_tool_output=True")
        rec = self._begin_frame(4, "interpreter",
                                input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
        interpreted = await self.nodes["interpreter"].interpret(
            thought.tool_used, thought.tool_output, routing.job)
        thought.response = interpreted.summary
        self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui")
        rec = self._begin_frame(5, "output+ui",
                                input_summary=f"interpreted: {interpreted.summary[:80]}")
        path = "expert+interpreter"
    else:
        self._end_frame(rec, output_summary=thought_summary,
                        route="output+ui",
                        condition="has_tool_output=False" if not has_tool else "")
        rec = self._begin_frame(4, "output+ui",
                                input_summary=f"response: {thought.response[:80]}")
        path = "expert"
    # Clear streamed progress text, then render the final response + controls.
    self.sink.reset()
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
def _make_progress_wrapper(self, original_hud, language: str):
"""Wrap an expert's send_hud to also stream progress deltas to the user."""
sink = self.sink
progress_map = {
"tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...",
"emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...",
"create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...",
"_default": "Verarbeite..." if language == "de" else "Processing..."},
"tool_result": {"_default": ""}, # silent on result
"planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."},
}
async def wrapper(data: dict):
await original_hud(data)
event = data.get("event", "")
if event in progress_map:
tool = data.get("tool", "_default")
msg = progress_map[event].get(tool, progress_map[event].get("_default", ""))
if msg:
await sink.send_delta(msg + "\n")
return wrapper
async def _run_director_pipeline(self, command: Command, mem_ctx: str,
                                 dashboard: list = None) -> dict:
    """Director: Input(F1) → Director(F2) → Thinker(F3) → [Interpreter(F4)] → Output."""
    a = command.analysis
    # Frame 2: Director produces a plan (goal, tool sequence, response hint).
    rec = self._begin_frame(2, "director",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    plan = await self.nodes["director"].decide(command, self.history, memory_context=mem_ctx)
    plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}"
    self._end_frame(rec, output_summary=plan_summary, route="thinker")
    # Frame 3: Thinker executes the plan (may call tools / emit machine ops).
    rec = self._begin_frame(3, "thinker",
                            input_summary=plan_summary[:100])
    thought = await self.nodes["thinker"].process(
        command, plan, self.history, memory_context=mem_ctx)
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)} machines={len(thought.machine_ops)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # Interpreter only runs when a tool actually produced output.
    if self.has_interpreter and has_tool:
        self._end_frame(rec, output_summary=thought_summary,
                        route="interpreter", condition="has_tool_output=True")
        # Frame 4: Interpreter summarizes the raw tool output.
        rec = self._begin_frame(4, "interpreter",
                                input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
        interpreted = await self.nodes["interpreter"].interpret(
            thought.tool_used, thought.tool_output, command.source_text)
        thought.response = interpreted.summary
        interp_summary = f"summary[{len(interpreted.summary)}] facts={interpreted.key_facts}"
        self._end_frame(rec, output_summary=interp_summary, route="output+ui")
        # Frame 5: Output
        rec = self._begin_frame(5, "output+ui",
                                input_summary=f"interpreted: {interpreted.summary[:80]}")
        path = "director+interpreter"
    else:
        self._end_frame(rec, output_summary=thought_summary,
                        route="output+ui",
                        condition="has_tool_output=False" if not has_tool else "")
        # Frame 4: Output
        rec = self._begin_frame(4, "output+ui",
                                input_summary=f"response: {thought.response[:80]}")
        path = "director"
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
async def _run_thinker_pipeline(self, command: Command, mem_ctx: str,
                                dashboard: list = None) -> dict:
    """v1: Input(F1) → Thinker(F2) → Output(F3).

    A v1-style director (one with plan()/update() hooks but no decide())
    may pre-plan complex or data-heavy requests and record observations
    after the response.
    """
    a = command.analysis
    # Frame 2: Thinker
    rec = self._begin_frame(2, "thinker",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    director = self.nodes.get("director")
    if director and hasattr(director, "plan"):
        is_complex = command.analysis.complexity == "complex"
        text = command.source_text
        # Heuristic: request/action intents containing data-ish keywords
        # (German + English) count as data requests.
        is_data_request = (command.analysis.intent in ("request", "action")
                           and any(k in text.lower()
                                   for k in ["daten", "data", "database", "db", "tabelle", "table",
                                             "query", "abfrage", "untersuche", "investigate",
                                             "analyse", "analyze", "customer", "kunde"]))
        if is_complex or (is_data_request and len(text.split()) > 8):
            await director.plan(self.history, self.memorizer.state, text)
            # Rebuild context so the fresh plan is visible to the thinker.
            mem_ctx = self._build_context(dashboard)
    thought = await self.nodes["thinker"].process(command, self.history, memory_context=mem_ctx)
    if director and hasattr(director, "current_plan"):
        # One-shot plan: clear it so it doesn't leak into later messages.
        director.current_plan = ""
    thought_summary = f"response[{len(thought.response)}] tool={thought.tool_used or 'none'}"
    self._end_frame(rec, output_summary=thought_summary, route="output+ui")
    # Frame 3: Output
    rec = self._begin_frame(3, "output+ui",
                            input_summary=f"response: {thought.response[:80]}")
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    if director and hasattr(director, "update"):
        await director.update(self.history, self.memorizer.state)
    self._trim_history()
    self._end_frame(rec, output_summary=f"response[{len(response)}]")
    self._end_trace("thinker")
    await self._emit_trace_hud()
    return self._make_result(response)
async def _handle_action(self, text: str, dashboard: list = None) -> dict:
    """Handle ACTION: messages (button clicks).

    Resolution order, cheapest first:
      1. Machine transition via the UI node (no LLM).
      2. Local UI action handler (no LLM).
      3. Full pipeline (PA/Director/Thinker), same as a typed message.

    Message format: "ACTION:<name>" optionally followed by "|data:<json>".
    """
    parts = text.split("|", 1)
    action = parts[0].replace("ACTION:", "").strip()
    data = None
    if len(parts) > 1:
        try:
            data = json.loads(parts[1].replace("data:", "").strip())
        except json.JSONDecodeError:
            # Fix: was `except (json.JSONDecodeError, Exception)` — the broad
            # Exception made the tuple pointless; json.loads on a str raises
            # only JSONDecodeError. Proceed without data rather than failing.
            log.debug(f"[frame] unparseable action data: {parts[1][:80]}")
    self.sensor.note_user_activity()
    # Frame 1: Try machine transition (no LLM)
    rec = self._begin_frame(1, "ui", input_summary=f"action={action}")
    handled, transition_result = self.ui_node.try_machine_transition(action)
    if handled:
        await self._send_hud({"node": "ui", "event": "machine_transition",
                              "action": action, "detail": transition_result})
        # Merge machine controls with any non-machine controls already shown.
        controls = self.ui_node.get_machine_controls()
        for ctrl in self.ui_node.current_controls:
            if not ctrl.get("machine_id"):
                controls.append(ctrl)
        self.ui_node.current_controls = controls
        await self.sink.send_controls(controls)
        await self._send_hud({"node": "ui", "event": "controls", "controls": controls})
        self.sink.reset()
        # Stream the result in small chunks so the frontend renders progressively.
        for i in range(0, len(transition_result), 12):
            await self.sink.send_delta(transition_result[i:i + 12])
        await self.sink.send_done()
        self.history.append({"role": "user", "content": f"[clicked {action}]"})
        self.history.append({"role": "assistant", "content": transition_result})
        self._end_frame(rec, output_summary=f"machine_transition: {transition_result[:80]}")
        self._end_trace("action_machine")
        await self._emit_trace_hud()
        return self._make_result(transition_result)
    # Try local UI action
    result, controls = await self.ui_node.process_local_action(action, data)
    if result is not None:
        if controls:
            await self.sink.send_controls(controls)
        self.sink.reset()
        for i in range(0, len(result), 12):
            await self.sink.send_delta(result[i:i + 12])
        await self.sink.send_done()
        self.history.append({"role": "user", "content": f"[clicked {action}]"})
        self.history.append({"role": "assistant", "content": result})
        self._end_frame(rec, output_summary=f"local_action: {result[:80]}")
        self._end_trace("action_local")
        await self._emit_trace_hud()
        return self._make_result(result)
    # Complex action — needs full pipeline
    self._end_frame(rec, output_summary="no local handler", route="pa/director/thinker")
    action_desc = f"ACTION: {action}"
    if data:
        action_desc += f" | data: {json.dumps(data)}"
    self.history.append({"role": "user", "content": action_desc})
    mem_ctx = self._build_context(dashboard)
    # Synthesize a Command so the pipelines treat the click like analyzed input.
    command = Command(
        analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
        source_text=action_desc)
    if self.has_pa:
        return await self._run_expert_pipeline(command, mem_ctx, dashboard)
    elif self.has_director:
        return await self._run_director_pipeline(command, mem_ctx, dashboard)
    else:
        return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
# --- Helpers ---
def _build_context(self, dashboard: list = None) -> str:
"""Build the full context string for nodes."""
sensor_lines = self.sensor.get_context_lines()
director = self.nodes.get("director")
director_line = director.get_context_line() if director else ""
mem_ctx = self.memorizer.get_context_block(
sensor_lines=sensor_lines, ui_state=self.ui_node.state)
if director_line:
mem_ctx += f"\n\n{director_line}"
machine_summary = self.ui_node.get_machine_summary()
if machine_summary:
mem_ctx += f"\n\n{machine_summary}"
if dashboard is not None:
mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
sensor_flags = self.sensor.consume_flags()
if sensor_flags:
flag_lines = ["Sensor flags:"]
for f in sensor_flags:
if f["type"] == "idle_return":
flag_lines.append(f" - User returned after {f['away_duration']} away.")
elif f["type"] == "workspace_mismatch":
flag_lines.append(f" - Workspace mismatch: {f['detail']}")
mem_ctx += "\n\n" + "\n".join(flag_lines)
return mem_ctx
def _format_dashboard(self, dashboard: list) -> str:
"""Format dashboard controls into context string."""
server_controls = self.ui_node.current_controls
server_buttons = [str(c.get("label", "")) for c in server_controls if isinstance(c, dict) and c.get("type") == "button"]
browser_buttons = [str(c.get("label", "")) for c in dashboard if isinstance(c, dict) and c.get("type") == "button"] if dashboard else []
lines = []
if server_buttons and not browser_buttons:
lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.")
lines.append(f" Expected: {', '.join(server_buttons)}")
lines.append(" Controls failed to render. You MUST re-emit them in ACTIONS.")
elif server_buttons and sorted(server_buttons) != sorted(browser_buttons):
lines.append("WARNING: Dashboard mismatch.")
lines.append(f" Server: {', '.join(server_buttons)}")
lines.append(f" Browser: {', '.join(browser_buttons) or 'nothing'}")
if not dashboard:
lines.append("Dashboard: empty")
else:
lines.append("Dashboard (user sees):")
for ctrl in dashboard:
ctype = ctrl.get("type", "unknown")
if ctype == "button":
lines.append(f" - Button: {ctrl.get('label', '?')}")
elif ctype == "table":
lines.append(f" - Table: {len(ctrl.get('data', []))} rows")
else:
lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
return "\n".join(lines)
async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str:
"""Run Output and UI nodes in parallel. Returns response text."""
self.sink.reset()
output_task = asyncio.create_task(
self.nodes["output"].process(thought, self.history, self.sink, memory_context=mem_ctx))
ui_task = asyncio.create_task(
self.ui_node.process(thought, self.history, memory_context=mem_ctx))
response, controls = await asyncio.gather(output_task, ui_task)
if controls:
await self.sink.send_controls(controls)
return response
def _check_condition(self, name: str, command: Command = None,
thought: ThoughtResult = None) -> bool:
"""Evaluate a named condition."""
if name == "reflex" and command:
return (command.analysis.intent == "social"
and command.analysis.complexity == "trivial")
if name == "has_tool_output" and thought:
return bool(thought.tool_used and thought.tool_output)
return False
def _make_result(self, response: str) -> dict:
"""Build the result dict returned to callers."""
return {
"response": response,
"controls": self.ui_node.current_controls,
"memorizer": self.memorizer.state,
"frames": self.frame,
"trace": self.last_trace.to_dict(),
}
def _trim_history(self):
    """Cap shared history at the 40 most recent entries.

    Uses in-place slice assignment so other holders of the same list
    (the engine was constructed with a shared history) see the trim.
    """
    if len(self.history) > 40:
        self.history[:] = self.history[-40:]