"""Runtime: wires all nodes together into a processing pipeline.""" import asyncio import json import logging import time from pathlib import Path from typing import Callable from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan from .process import ProcessManager from .engine import load_graph, instantiate_nodes, list_graphs, get_graph_for_cytoscape from .frame_engine import FrameEngine log = logging.getLogger("runtime") TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl" # Default graph — can be switched at runtime _active_graph_name = "v1-current" class OutputSink: """Collects output. Optionally streams to attached WebSocket.""" def __init__(self): self.ws = None self.response: str = "" self.controls: list = [] self.done: bool = False def attach(self, ws): self.ws = ws def detach(self): self.ws = None def reset(self): self.response = "" self.controls = [] self.done = False async def send_delta(self, text: str): self.response += text if self.ws: try: await self.ws.send_text(json.dumps({"type": "delta", "content": text})) except Exception: pass async def send_controls(self, controls: list): self.controls = controls if self.ws: try: await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) except Exception: pass async def send_hud(self, data: dict): if self.ws: try: await self.ws.send_text(json.dumps({"type": "hud", **data})) except Exception: pass async def send_done(self): self.done = True if self.ws: try: await self.ws.send_text(json.dumps({"type": "done"})) except Exception: pass class Runtime: def __init__(self, user_claims: dict = None, origin: str = "", broadcast: Callable = None, graph_name: str = None): self.sink = OutputSink() self.history: list[dict] = [] self.MAX_HISTORY = 40 self._broadcast = broadcast or (lambda e: None) # Load graph and instantiate nodes gname = graph_name or _active_graph_name self.graph = load_graph(gname) self.process_manager = ProcessManager(send_hud=self._send_hud) nodes = instantiate_nodes(self.graph, send_hud=self._send_hud, process_manager=self.process_manager) # Bind nodes by role (pipeline code references these) self.input_node = nodes["input"] self.thinker = nodes.get("thinker") # v1/v2/v3 self.output_node = nodes["output"] self.ui_node = nodes["ui"] self.memorizer = nodes["memorizer"] self.director = nodes.get("director") # v1/v2/v3, None in v4 self.sensor = nodes["sensor"] self.interpreter = nodes.get("interpreter") # v2+ only # Detect graph type self.is_v2 = self.director is not None and hasattr(self.director, "decide") self.use_frames = self.graph.get("engine") == "frames" self.sensor.start( get_memo_state=lambda: self.memorizer.state, get_server_controls=lambda: self.ui_node.current_controls, ) claims = user_claims or {} log.info(f"[runtime] user_claims: {claims}") self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown" log.info(f"[runtime] resolved identity: {self.identity}") self.channel = origin or "unknown" self.memorizer.state["user_name"] = self.identity self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" # Frame engine (for v3+ graphs) if self.use_frames: self.frame_engine = FrameEngine( graph=self.graph, nodes=nodes, sink=self.sink, history=self.history, send_hud=self._send_hud, sensor=self.sensor, memorizer=self.memorizer, ui_node=self.ui_node, identity=self.identity, channel=self.channel, broadcast=self._broadcast) log.info(f"[runtime] using FrameEngine for graph '{gname}'") def attach_ws(self, ws): """Attach a WebSocket for real-time streaming.""" self.sink.attach(ws) log.info("[runtime] WS attached") def detach_ws(self): """Detach WebSocket. Runtime keeps running.""" self.sink.detach() log.info("[runtime] WS detached") def update_identity(self, user_claims: dict, origin: str = ""): """Update identity from WS auth claims.""" claims = user_claims or {} self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or self.identity if origin: self.channel = origin self.memorizer.state["user_name"] = self.identity self.memorizer.state["situation"] = f"authenticated on {self.channel}" log.info(f"[runtime] identity updated: {self.identity}") async def _send_hud(self, data: dict): await self.sink.send_hud(data) trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} try: with open(TRACE_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") except Exception as e: log.error(f"trace write error: {e}") self._broadcast(trace_entry) async def _stream_text(self, text: str): """Stream pre-formed text to the client as deltas.""" self.sink.reset() chunk_size = 12 for i in range(0, len(text), chunk_size): await self.sink.send_delta(text[i:i + chunk_size]) await self.sink.send_done() async def _run_output_and_ui(self, thought, mem_ctx): """Run Output and UI nodes in parallel. Returns (response_text, controls).""" self.sink.reset() output_task = asyncio.create_task( self.output_node.process(thought, self.history, self.sink, memory_context=mem_ctx)) ui_task = asyncio.create_task( self.ui_node.process(thought, self.history, memory_context=mem_ctx)) response, controls = await asyncio.gather(output_task, ui_task) if controls: await self.sink.send_controls(controls) return response async def handle_action(self, action: str, data: dict = None): """Handle a structured UI action (button click etc.).""" self.sensor.note_user_activity() # Try machine transition first (go: target — no LLM needed) handled, transition_result = self.ui_node.try_machine_transition(action) if handled: await self._send_hud({"node": "ui", "event": "machine_transition", "action": action, "detail": transition_result}) # Re-render all controls (machines + state + buttons) controls = self.ui_node.get_machine_controls() # Include non-machine buttons and labels for ctrl in self.ui_node.current_controls: if not ctrl.get("machine_id"): controls.append(ctrl) self.ui_node.current_controls = controls await self.sink.send_controls(controls) await self._send_hud({"node": "ui", "event": "controls", "controls": controls}) await self._stream_text(transition_result) self.history.append({"role": "user", "content": f"[clicked {action}]"}) self.history.append({"role": "assistant", "content": transition_result}) return # Try local UI action next (inc, dec, toggle — no LLM needed) result, controls = await self.ui_node.process_local_action(action, data) if result is not None: # Local action handled — send controls update + short response if controls: await self.sink.send_controls(controls) await self._stream_text(result) self.history.append({"role": "user", "content": f"[clicked {action}]"}) self.history.append({"role": "assistant", "content": result}) return # Complex action — needs Thinker reasoning action_desc = f"ACTION: {action}" if data: action_desc += f" | data: {json.dumps(data)}" self.history.append({"role": "user", "content": action_desc}) sensor_lines = self.sensor.get_context_lines() director_line = self.director.get_context_line() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state) mem_ctx += f"\n\n{director_line}" command = Command( analysis=InputAnalysis(intent="action", topic=action, complexity="simple"), source_text=action_desc) if self.is_v2: plan = await self.director.decide(command, self.history, memory_context=mem_ctx) thought = await self.thinker.process(command, plan, self.history, memory_context=mem_ctx) if self.interpreter and thought.tool_used and thought.tool_output: interpreted = await self.interpreter.interpret( thought.tool_used, thought.tool_output, action_desc) thought.response = interpreted.summary else: thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if not self.is_v2: await self.director.update(self.history, self.memorizer.state) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:] def _format_dashboard(self, dashboard: list) -> str: """Format dashboard controls into a context string for Thinker. Compares browser-reported state against server-side controls to detect mismatches.""" server_controls = self.ui_node.current_controls server_buttons = [str(c.get("label", "")) for c in server_controls if isinstance(c, dict) and c.get("type") == "button"] browser_buttons = [str(c.get("label", "")) for c in dashboard if isinstance(c, dict) and c.get("type") == "button"] if dashboard else [] lines = [] # Mismatch detection (S3* audit) if server_buttons and not browser_buttons: lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.") lines.append(f" Expected buttons: {', '.join(server_buttons)}") lines.append(" Controls failed to render or were lost. You MUST re-emit them in ACTIONS.") elif server_buttons and sorted(server_buttons) != sorted(browser_buttons): lines.append(f"WARNING: Dashboard mismatch.") lines.append(f" Server sent: {', '.join(server_buttons)}") lines.append(f" Browser shows: {', '.join(browser_buttons) or 'nothing'}") lines.append(" Re-emit correct controls in ACTIONS if needed.") if not dashboard: lines.append("Dashboard: empty (user sees nothing)") else: lines.append("Dashboard (what user currently sees):") for ctrl in dashboard: ctype = ctrl.get("type", "unknown") if ctype == "button": lines.append(f" - Button: {ctrl.get('label', '?')}") elif ctype == "label": lines.append(f" - Label: {ctrl.get('text', '?')} = {ctrl.get('value', '?')}") elif ctype == "table": cols = ctrl.get("columns", []) rows = len(ctrl.get("data", [])) lines.append(f" - Table: {', '.join(cols)} ({rows} rows)") else: lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}") return "\n".join(lines) async def handle_message(self, text: str, dashboard: list = None): # Frame engine: delegate entirely if self.use_frames: result = await self.frame_engine.process_message(text, dashboard) return result # Detect ACTION: prefix from API/test runner if text.startswith("ACTION:"): parts = text.split("|", 1) action = parts[0].replace("ACTION:", "").strip() data = None if len(parts) > 1: try: data = json.loads(parts[1].replace("data:", "").strip()) except (json.JSONDecodeError, Exception): pass return await self.handle_action(action, data) envelope = Envelope( text=text, user_id="bob", session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), ) self.sensor.note_user_activity() if dashboard is not None: self.sensor.update_browser_dashboard(dashboard) self.history.append({"role": "user", "content": text}) # Check Sensor flags (idle return, workspace mismatch) sensor_flags = self.sensor.consume_flags() sensor_lines = self.sensor.get_context_lines() director_line = self.director.get_context_line() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state) mem_ctx += f"\n\n{director_line}" machine_summary = self.ui_node.get_machine_summary() if machine_summary: mem_ctx += f"\n\n{machine_summary}" if dashboard is not None: mem_ctx += f"\n\n{self._format_dashboard(dashboard)}" # Inject sensor flags into context if sensor_flags: flag_lines = ["Sensor flags:"] for f in sensor_flags: if f["type"] == "idle_return": flag_lines.append(f" - User returned after {f['away_duration']} away. Welcome them back briefly, mention what's on their dashboard.") elif f["type"] == "workspace_mismatch": flag_lines.append(f" - Workspace mismatch detected: {f['detail']}. Check if controls need re-emitting.") mem_ctx += "\n\n" + "\n".join(flag_lines) command = await self.input_node.process( envelope, self.history, memory_context=mem_ctx, identity=self.identity, channel=self.channel) # Reflex path: trivial social messages skip Thinker entirely if command.analysis.intent == "social" and command.analysis.complexity == "trivial": await self._send_hud({"node": "runtime", "event": "reflex_path", "detail": f"{command.analysis.intent}/{command.analysis.complexity}"}) thought = ThoughtResult(response=command.source_text, actions=[]) response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if not self.is_v2: await self.director.update(self.history, self.memorizer.state) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:] return if self.is_v2: # v2 flow: Director decides, Thinker executes, Interpreter reads results plan = await self.director.decide(command, self.history, memory_context=mem_ctx) thought = await self.thinker.process(command, plan, self.history, memory_context=mem_ctx) # Interpreter: factual summary of tool results (no hallucination) if self.interpreter and thought.tool_used and thought.tool_output: interpreted = await self.interpreter.interpret( thought.tool_used, thought.tool_output, text) # Replace thinker's response with interpreter's factual summary thought.response = interpreted.summary else: # v1 flow: optional Director pre-planning for complex requests is_complex = command.analysis.complexity == "complex" is_data_request = (command.analysis.intent in ("request", "action") and any(k in text.lower() for k in ["daten", "data", "database", "db", "tabelle", "table", "query", "abfrage", "untersuche", "investigate", "explore", "analyse", "analyze", "umsatz", "revenue", "billing", "abrechnung", "customer", "kunde", "geraete", "device", "objekt", "object", "how many", "wieviele", "welche"])) needs_planning = is_complex or (is_data_request and len(text.split()) > 8) if needs_planning: plan = await self.director.plan(self.history, self.memorizer.state, text) if plan: director_line = self.director.get_context_line() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state) mem_ctx += f"\n\n{director_line}" if machine_summary: mem_ctx += f"\n\n{machine_summary}" if dashboard is not None: mem_ctx += f"\n\n{self._format_dashboard(dashboard)}" thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) self.director.current_plan = "" # Output (voice) and UI (screen) run in parallel response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if not self.is_v2: await self.director.update(self.history, self.memorizer.state) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:]