"""Runtime: wires all nodes together into a processing pipeline.""" import asyncio import json import logging import time from pathlib import Path from typing import Callable from fastapi import WebSocket from .types import Envelope, Command from .process import ProcessManager from .nodes import SensorNode, InputNode, OutputNode, ThinkerNode, MemorizerNode, UINode log = logging.getLogger("runtime") TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl" class Runtime: def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = "", broadcast: Callable = None): self.ws = ws self.history: list[dict] = [] self.MAX_HISTORY = 40 self._broadcast = broadcast or (lambda e: None) self.input_node = InputNode(send_hud=self._send_hud) self.process_manager = ProcessManager(send_hud=self._send_hud) self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager) self.output_node = OutputNode(send_hud=self._send_hud) self.ui_node = UINode(send_hud=self._send_hud) self.memorizer = MemorizerNode(send_hud=self._send_hud) self.sensor = SensorNode(send_hud=self._send_hud) self.sensor.start(get_memo_state=lambda: self.memorizer.state) claims = user_claims or {} log.info(f"[runtime] user_claims: {claims}") self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown" log.info(f"[runtime] resolved identity: {self.identity}") self.channel = origin or "unknown" self.memorizer.state["user_name"] = self.identity self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" async def _send_hud(self, data: dict): await self.ws.send_text(json.dumps({"type": "hud", **data})) trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} try: with open(TRACE_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") except Exception as e: log.error(f"trace write error: {e}") self._broadcast(trace_entry) async def _run_output_and_ui(self, thought, mem_ctx): """Run Output and UI nodes in parallel. Returns (response_text, controls).""" output_task = asyncio.create_task( self.output_node.process(thought, self.history, self.ws, memory_context=mem_ctx)) ui_task = asyncio.create_task( self.ui_node.process(thought, self.history, memory_context=mem_ctx)) # Output streams to WS, UI returns controls — both run concurrently response, controls = await asyncio.gather(output_task, ui_task) # Send controls after Output starts streaming (UI may finish first or after) if controls: await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) return response async def handle_action(self, action: str, data: dict = None): """Handle a structured UI action (button click etc.).""" action_desc = f"ACTION: {action}" if data: action_desc += f" | data: {json.dumps(data)}" self.history.append({"role": "user", "content": action_desc}) self.sensor.note_user_activity() sensor_lines = self.sensor.get_context_lines() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) # Skip Input — this isn't speech, go straight to Thinker command = Command(instruction=f"User clicked UI button: {action}", source_text=action_desc) thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:] async def handle_message(self, text: str): envelope = Envelope( text=text, user_id="nico", session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), ) self.sensor.note_user_activity() self.history.append({"role": "user", "content": text}) sensor_lines = self.sensor.get_context_lines() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) command = await self.input_node.process( envelope, self.history, memory_context=mem_ctx, identity=self.identity, channel=self.channel) thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) # Output (voice) and UI (screen) run in parallel response = await self._run_output_and_ui(thought, mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:]