"""Frame Engine: tick-based deterministic pipeline execution. Replaces the imperative handle_message() with a frame-stepping model: - Each frame dispatches all nodes that have pending input - Frames advance on completion (not on a timer) - 0ms when idle, engine awaits external input - Deterministic ordering: reflex=2 frames, thinker=3-4, interpreter=5 Works with any graph definition (v1, v2, v3). Node implementations unchanged. """ import asyncio import json import logging import time from dataclasses import dataclass, field, asdict from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan, PARouting log = logging.getLogger("runtime") # --- Frame Trace --- @dataclass class FrameRecord: """One frame's execution record.""" frame: int node: str # which node ran ("input", "director", ...) started: float = 0.0 # time.monotonic() ended: float = 0.0 duration_ms: float = 0.0 input_summary: str = "" # what the node received output_summary: str = "" # what the node produced route: str = "" # where output goes next ("director", "output+ui", ...) condition: str = "" # if a condition was evaluated ("reflex=True", ...) error: str = "" # if the node failed @dataclass class FrameTrace: """Complete trace of one message through the pipeline.""" message: str = "" graph: str = "" total_frames: int = 0 total_ms: float = 0.0 started: float = 0.0 path: str = "" # "reflex", "director", "director+interpreter" frames: list = field(default_factory=list) # list of FrameRecord def to_dict(self) -> dict: return { "message": self.message[:100], "graph": self.graph, "total_frames": self.total_frames, "total_ms": round(self.total_ms, 1), "path": self.path, "frames": [ { "frame": f.frame, "node": f.node, "duration_ms": round(f.duration_ms, 1), "input": f.input_summary[:200], "output": f.output_summary[:200], "route": f.route, "condition": f.condition, "error": f.error, } for f in self.frames ], } class FrameEngine: """Tick-based engine that steps through graph nodes frame by frame.""" def __init__(self, graph: dict, nodes: dict, sink, history: list, send_hud, sensor, memorizer, ui_node, identity: str = "unknown", channel: str = "unknown", broadcast=None): self.graph = graph self.nodes = nodes self.sink = sink self.history = history self._send_hud = send_hud self.sensor = sensor self.memorizer = memorizer self.ui_node = ui_node self.identity = identity self.channel = channel self._broadcast = broadcast or (lambda e: None) self.frame = 0 self.bus = {} self.conditions = graph.get("conditions", {}) self.edges = [e for e in graph.get("edges", []) if e.get("type") == "data"] self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide") self.has_interpreter = "interpreter" in nodes self.has_pa = "pa" in nodes and hasattr(nodes.get("pa"), "route") # Discover available experts in this graph self._experts = {} for role, node in nodes.items(): if role.startswith("expert_"): expert_name = role[7:] # "expert_eras" → "eras" self._experts[expert_name] = node if self.has_pa and self._experts: nodes["pa"].set_available_experts(list(self._experts.keys())) log.info(f"[frame] PA with experts: {list(self._experts.keys())}") # Frame trace — last message's complete trace, queryable via API self.last_trace: FrameTrace = FrameTrace() # History of recent traces (last 20 messages) self.trace_history: list[dict] = [] self.MAX_TRACE_HISTORY = 20 # --- Frame lifecycle helpers --- def _begin_frame(self, frame_num: int, node: str, input_summary: str = "") -> FrameRecord: """Start a new frame. Returns the record to fill in.""" self.frame = frame_num rec = FrameRecord( frame=frame_num, node=node, started=time.monotonic(), input_summary=input_summary, ) self.last_trace.frames.append(rec) return rec def _end_frame(self, rec: FrameRecord, output_summary: str = "", route: str = "", condition: str = ""): """Complete a frame record with output and timing.""" rec.ended = time.monotonic() rec.duration_ms = (rec.ended - rec.started) * 1000 rec.output_summary = output_summary rec.route = route rec.condition = condition log.info(f"[frame] F{rec.frame} {rec.node} " f"{rec.duration_ms:.0f}ms -> {route or 'done'}") def _begin_trace(self, text: str) -> FrameTrace: """Start a new message trace.""" trace = FrameTrace( message=text, graph=self.graph.get("name", "unknown"), started=time.monotonic(), ) self.last_trace = trace self.frame = 0 return trace def _end_trace(self, path: str): """Finalize the trace and emit as HUD event.""" t = self.last_trace t.total_frames = self.frame t.total_ms = (time.monotonic() - t.started) * 1000 t.path = path # Store in history self.trace_history.append(t.to_dict()) if len(self.trace_history) > self.MAX_TRACE_HISTORY: self.trace_history = self.trace_history[-self.MAX_TRACE_HISTORY:] log.info(f"[frame] trace: {path} {t.total_frames}F {t.total_ms:.0f}ms") async def _emit_trace_hud(self): """Emit the completed frame trace as a single HUD event.""" t = self.last_trace await self._send_hud({ "node": "frame_engine", "event": "frame_trace", "trace": t.to_dict(), }) # --- Main entry point --- async def process_message(self, text: str, dashboard: list = None) -> dict: """Process a message through the frame pipeline. Returns {response, controls, memorizer, frames, trace}.""" self._begin_trace(text) # Handle ACTION: prefix if text.startswith("ACTION:"): return await self._handle_action(text, dashboard) # Setup envelope = Envelope( text=text, user_id=self.identity, session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), ) self.sensor.note_user_activity() if dashboard is not None: self.sensor.update_browser_dashboard(dashboard) self.history.append({"role": "user", "content": text}) # --- Frame 1: Input --- mem_ctx = self._build_context(dashboard) rec = self._begin_frame(1, "input", input_summary=text[:100]) command = await self.nodes["input"].process( envelope, self.history, memory_context=mem_ctx, identity=self.identity, channel=self.channel) a = command.analysis cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}" # Check reflex condition is_reflex = self._check_condition("reflex", command=command) if is_reflex: self._end_frame(rec, output_summary=cmd_summary, route="output (reflex)", condition="reflex=True") await self._send_hud({"node": "runtime", "event": "reflex_path", "detail": f"{a.intent}/{a.complexity}"}) return await self._run_reflex(command, mem_ctx) else: next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker") self._end_frame(rec, output_summary=cmd_summary, route=next_node, condition=f"reflex=False") # --- Frame 2+: Pipeline --- if self.has_pa: return await self._run_expert_pipeline(command, mem_ctx, dashboard) elif self.has_director: return await self._run_director_pipeline(command, mem_ctx, dashboard) else: return await self._run_thinker_pipeline(command, mem_ctx, dashboard) # --- Pipeline variants --- async def _run_reflex(self, command: Command, mem_ctx: str) -> dict: """Reflex: Input(F1) → Output(F2).""" rec = self._begin_frame(2, "output", input_summary="reflex passthrough") thought = ThoughtResult(response=command.source_text, actions=[]) response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) self._trim_history() self._end_frame(rec, output_summary=f"response[{len(response)}]") self._end_trace("reflex") await self._emit_trace_hud() return self._make_result(response) async def _run_expert_pipeline(self, command: Command, mem_ctx: str, dashboard: list = None) -> dict: """Expert pipeline: Input(F1) → PA(F2) → Expert(F3) → [Interpreter(F4)] → Output.""" a = command.analysis # Frame 2: PA routes rec = self._begin_frame(2, "pa", input_summary=f"intent={a.intent} topic={a.topic}") routing = await self.nodes["pa"].route( command, self.history, memory_context=mem_ctx, identity=self.identity, channel=self.channel) route_summary = f"expert={routing.expert} job={routing.job[:60]}" self._end_frame(rec, output_summary=route_summary, route=f"expert_{routing.expert}" if routing.expert != "none" else "output") # Stream thinking message to user if routing.thinking_message: await self.sink.send_delta(routing.thinking_message + "\n\n") # Direct PA response (no expert needed) if routing.expert == "none": rec = self._begin_frame(3, "output+ui", input_summary=f"pa_direct: {routing.response_hint[:80]}") thought = ThoughtResult(response=routing.response_hint, actions=[]) response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) self._trim_history() self._end_frame(rec, output_summary=f"response[{len(response)}]") self._end_trace("pa_direct") await self._emit_trace_hud() return self._make_result(response) # Frame 3: Expert executes expert = self._experts.get(routing.expert) if not expert: log.error(f"[frame] expert '{routing.expert}' not found") thought = ThoughtResult(response=f"Expert '{routing.expert}' not available.") rec = self._begin_frame(3, "output+ui", input_summary="expert_not_found") response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) self._end_frame(rec, output_summary="error", error=f"expert '{routing.expert}' not found") self._end_trace("expert_error") await self._emit_trace_hud() return self._make_result(response) rec = self._begin_frame(3, f"expert_{routing.expert}", input_summary=f"job: {routing.job[:80]}") # Wrap expert's HUD to stream progress to user original_hud = expert.send_hud expert.send_hud = self._make_progress_wrapper(original_hud, routing.language) try: thought = await expert.execute(routing.job, routing.language) finally: expert.send_hud = original_hud thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " f"actions={len(thought.actions)}") has_tool = bool(thought.tool_used and thought.tool_output) # Interpreter (conditional) if self.has_interpreter and has_tool: self._end_frame(rec, output_summary=thought_summary, route="interpreter", condition="has_tool_output=True") rec = self._begin_frame(4, "interpreter", input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]") interpreted = await self.nodes["interpreter"].interpret( thought.tool_used, thought.tool_output, routing.job) thought.response = interpreted.summary self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui") rec = self._begin_frame(5, "output+ui", input_summary=f"interpreted: {interpreted.summary[:80]}") path = "expert+interpreter" else: self._end_frame(rec, output_summary=thought_summary, route="output+ui", condition="has_tool_output=False" if not has_tool else "") rec = self._begin_frame(4, "output+ui", input_summary=f"response: {thought.response[:80]}") path = "expert" # Clear progress text, render final response self.sink.reset() response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) self._trim_history() controls_count = len(self.ui_node.current_controls) self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}") self._end_trace(path) await self._emit_trace_hud() return self._make_result(response) def _make_progress_wrapper(self, original_hud, language: str): """Wrap an expert's send_hud to also stream progress deltas to the user.""" sink = self.sink progress_map = { "tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...", "emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...", "create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...", "_default": "Verarbeite..." if language == "de" else "Processing..."}, "tool_result": {"_default": ""}, # silent on result "planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."}, } async def wrapper(data: dict): await original_hud(data) event = data.get("event", "") if event in progress_map: tool = data.get("tool", "_default") msg = progress_map[event].get(tool, progress_map[event].get("_default", "")) if msg: await sink.send_delta(msg + "\n") return wrapper async def _run_director_pipeline(self, command: Command, mem_ctx: str, dashboard: list = None) -> dict: """Director: Input(F1) → Director(F2) → Thinker(F3) → [Interpreter(F4)] → Output.""" a = command.analysis # Frame 2: Director rec = self._begin_frame(2, "director", input_summary=f"intent={a.intent} topic={a.topic}") plan = await self.nodes["director"].decide(command, self.history, memory_context=mem_ctx) plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}" self._end_frame(rec, output_summary=plan_summary, route="thinker") # Frame 3: Thinker rec = self._begin_frame(3, "thinker", input_summary=plan_summary[:100]) thought = await self.nodes["thinker"].process( command, plan, self.history, memory_context=mem_ctx) thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " f"actions={len(thought.actions)} machines={len(thought.machine_ops)}") has_tool = bool(thought.tool_used and thought.tool_output) # Check interpreter condition if self.has_interpreter and has_tool: self._end_frame(rec, output_summary=thought_summary, route="interpreter", condition="has_tool_output=True") # Frame 4: Interpreter rec = self._begin_frame(4, "interpreter", input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]") interpreted = await self.nodes["interpreter"].interpret( thought.tool_used, thought.tool_output, command.source_text) thought.response = interpreted.summary interp_summary = f"summary[{len(interpreted.summary)}] facts={interpreted.key_facts}" self._end_frame(rec, output_summary=interp_summary, route="output+ui") # Frame 5: Output rec = self._begin_frame(5, "output+ui", input_summary=f"interpreted: {interpreted.summary[:80]}") path = "director+interpreter" else: self._end_frame(rec, output_summary=thought_summary, route="output+ui", condition="has_tool_output=False" if not has_tool else "") # Frame 4: Output rec = self._begin_frame(4, "output+ui", input_summary=f"response: {thought.response[:80]}") path = "director" response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) self._trim_history() controls_count = len(self.ui_node.current_controls) self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}") self._end_trace(path) await self._emit_trace_hud() return self._make_result(response) async def _run_thinker_pipeline(self, command: Command, mem_ctx: str, dashboard: list = None) -> dict: """v1: Input(F1) → Thinker(F2) → Output(F3).""" a = command.analysis # Frame 2: Thinker rec = self._begin_frame(2, "thinker", input_summary=f"intent={a.intent} topic={a.topic}") director = self.nodes.get("director") if director and hasattr(director, "plan"): is_complex = command.analysis.complexity == "complex" text = command.source_text is_data_request = (command.analysis.intent in ("request", "action") and any(k in text.lower() for k in ["daten", "data", "database", "db", "tabelle", "table", "query", "abfrage", "untersuche", "investigate", "analyse", "analyze", "customer", "kunde"])) if is_complex or (is_data_request and len(text.split()) > 8): await director.plan(self.history, self.memorizer.state, text) mem_ctx = self._build_context(dashboard) thought = await self.nodes["thinker"].process(command, self.history, memory_context=mem_ctx) if director and hasattr(director, "current_plan"): director.current_plan = "" thought_summary = f"response[{len(thought.response)}] tool={thought.tool_used or 'none'}" self._end_frame(rec, output_summary=thought_summary, route="output+ui") # Frame 3: Output rec = self._begin_frame(3, "output+ui", input_summary=f"response: {thought.response[:80]}") response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if director and hasattr(director, "update"): await director.update(self.history, self.memorizer.state) self._trim_history() self._end_frame(rec, output_summary=f"response[{len(response)}]") self._end_trace("thinker") await self._emit_trace_hud() return self._make_result(response) async def _handle_action(self, text: str, dashboard: list = None) -> dict: """Handle ACTION: messages (button clicks).""" parts = text.split("|", 1) action = parts[0].replace("ACTION:", "").strip() data = None if len(parts) > 1: try: data = json.loads(parts[1].replace("data:", "").strip()) except (json.JSONDecodeError, Exception): pass self.sensor.note_user_activity() # Frame 1: Try machine transition (no LLM) rec = self._begin_frame(1, "ui", input_summary=f"action={action}") handled, transition_result = self.ui_node.try_machine_transition(action) if handled: await self._send_hud({"node": "ui", "event": "machine_transition", "action": action, "detail": transition_result}) controls = self.ui_node.get_machine_controls() for ctrl in self.ui_node.current_controls: if not ctrl.get("machine_id"): controls.append(ctrl) self.ui_node.current_controls = controls await self.sink.send_controls(controls) await self._send_hud({"node": "ui", "event": "controls", "controls": controls}) self.sink.reset() for i in range(0, len(transition_result), 12): await self.sink.send_delta(transition_result[i:i + 12]) await self.sink.send_done() self.history.append({"role": "user", "content": f"[clicked {action}]"}) self.history.append({"role": "assistant", "content": transition_result}) self._end_frame(rec, output_summary=f"machine_transition: {transition_result[:80]}") self._end_trace("action_machine") await self._emit_trace_hud() return self._make_result(transition_result) # Try local UI action result, controls = await self.ui_node.process_local_action(action, data) if result is not None: if controls: await self.sink.send_controls(controls) self.sink.reset() for i in range(0, len(result), 12): await self.sink.send_delta(result[i:i + 12]) await self.sink.send_done() self.history.append({"role": "user", "content": f"[clicked {action}]"}) self.history.append({"role": "assistant", "content": result}) self._end_frame(rec, output_summary=f"local_action: {result[:80]}") self._end_trace("action_local") await self._emit_trace_hud() return self._make_result(result) # Complex action — needs full pipeline self._end_frame(rec, output_summary="no local handler", route="director/thinker") action_desc = f"ACTION: {action}" if data: action_desc += f" | data: {json.dumps(data)}" self.history.append({"role": "user", "content": action_desc}) mem_ctx = self._build_context(dashboard) command = Command( analysis=InputAnalysis(intent="action", topic=action, complexity="simple"), source_text=action_desc) if self.has_director: return await self._run_director_pipeline(command, mem_ctx, dashboard) else: return await self._run_thinker_pipeline(command, mem_ctx, dashboard) # --- Helpers --- def _build_context(self, dashboard: list = None) -> str: """Build the full context string for nodes.""" sensor_lines = self.sensor.get_context_lines() director = self.nodes.get("director") director_line = director.get_context_line() if director else "" mem_ctx = self.memorizer.get_context_block( sensor_lines=sensor_lines, ui_state=self.ui_node.state) if director_line: mem_ctx += f"\n\n{director_line}" machine_summary = self.ui_node.get_machine_summary() if machine_summary: mem_ctx += f"\n\n{machine_summary}" if dashboard is not None: mem_ctx += f"\n\n{self._format_dashboard(dashboard)}" sensor_flags = self.sensor.consume_flags() if sensor_flags: flag_lines = ["Sensor flags:"] for f in sensor_flags: if f["type"] == "idle_return": flag_lines.append(f" - User returned after {f['away_duration']} away.") elif f["type"] == "workspace_mismatch": flag_lines.append(f" - Workspace mismatch: {f['detail']}") mem_ctx += "\n\n" + "\n".join(flag_lines) return mem_ctx def _format_dashboard(self, dashboard: list) -> str: """Format dashboard controls into context string.""" server_controls = self.ui_node.current_controls server_buttons = [str(c.get("label", "")) for c in server_controls if isinstance(c, dict) and c.get("type") == "button"] browser_buttons = [str(c.get("label", "")) for c in dashboard if isinstance(c, dict) and c.get("type") == "button"] if dashboard else [] lines = [] if server_buttons and not browser_buttons: lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.") lines.append(f" Expected: {', '.join(server_buttons)}") lines.append(" Controls failed to render. You MUST re-emit them in ACTIONS.") elif server_buttons and sorted(server_buttons) != sorted(browser_buttons): lines.append("WARNING: Dashboard mismatch.") lines.append(f" Server: {', '.join(server_buttons)}") lines.append(f" Browser: {', '.join(browser_buttons) or 'nothing'}") if not dashboard: lines.append("Dashboard: empty") else: lines.append("Dashboard (user sees):") for ctrl in dashboard: ctype = ctrl.get("type", "unknown") if ctype == "button": lines.append(f" - Button: {ctrl.get('label', '?')}") elif ctype == "table": lines.append(f" - Table: {len(ctrl.get('data', []))} rows") else: lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}") return "\n".join(lines) async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str: """Run Output and UI nodes in parallel. Returns response text.""" self.sink.reset() output_task = asyncio.create_task( self.nodes["output"].process(thought, self.history, self.sink, memory_context=mem_ctx)) ui_task = asyncio.create_task( self.ui_node.process(thought, self.history, memory_context=mem_ctx)) response, controls = await asyncio.gather(output_task, ui_task) if controls: await self.sink.send_controls(controls) return response def _check_condition(self, name: str, command: Command = None, thought: ThoughtResult = None) -> bool: """Evaluate a named condition.""" if name == "reflex" and command: return (command.analysis.intent == "social" and command.analysis.complexity == "trivial") if name == "has_tool_output" and thought: return bool(thought.tool_used and thought.tool_output) return False def _make_result(self, response: str) -> dict: """Build the result dict returned to callers.""" return { "response": response, "controls": self.ui_node.current_controls, "memorizer": self.memorizer.state, "frames": self.frame, "trace": self.last_trace.to_dict(), } def _trim_history(self): if len(self.history) > 40: self.history[:] = self.history[-40:]