"""Runtime: wires all nodes together into a processing pipeline.""" import json import logging import time from pathlib import Path from typing import Callable from fastapi import WebSocket from .types import Envelope, Command from .process import ProcessManager from .nodes import SensorNode, InputNode, OutputNode, ThinkerNode, MemorizerNode log = logging.getLogger("runtime") TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl" class Runtime: def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = "", broadcast: Callable = None): self.ws = ws self.history: list[dict] = [] self.MAX_HISTORY = 40 self._broadcast = broadcast or (lambda e: None) self.input_node = InputNode(send_hud=self._send_hud) self.process_manager = ProcessManager(send_hud=self._send_hud) self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager) self.output_node = OutputNode(send_hud=self._send_hud) self.memorizer = MemorizerNode(send_hud=self._send_hud) self.sensor = SensorNode(send_hud=self._send_hud) self.sensor.start(get_memo_state=lambda: self.memorizer.state) claims = user_claims or {} log.info(f"[runtime] user_claims: {claims}") self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown" log.info(f"[runtime] resolved identity: {self.identity}") self.channel = origin or "unknown" self.memorizer.state["user_name"] = self.identity self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" async def _send_hud(self, data: dict): await self.ws.send_text(json.dumps({"type": "hud", **data})) trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} try: with open(TRACE_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") except Exception as e: log.error(f"trace write error: {e}") self._broadcast(trace_entry) async def _stream_text(self, text: str): """Stream pre-formed text to the client as deltas, simulating LLM output.""" # Send in chunks to feel natural chunk_size = 12 for i in range(0, len(text), chunk_size): chunk = text[i:i + chunk_size] await self.ws.send_text(json.dumps({"type": "delta", "content": chunk})) await self.ws.send_text(json.dumps({"type": "done"})) async def handle_action(self, action: str, data: dict = None): """Handle a structured UI action (button click etc.).""" # Format as a structured message that Thinker can parse action_desc = f"ACTION: {action}" if data: action_desc += f" | data: {json.dumps(data)}" # Add to history as a system-level event, not user speech self.history.append({"role": "user", "content": action_desc}) self.sensor.note_user_activity() sensor_lines = self.sensor.get_context_lines() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) # Skip Input node — this isn't speech to perceive, go straight to Thinker command = Command(instruction=f"User clicked UI button: {action}", source_text=action_desc) thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) if thought.controls: await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls})) response = await self.output_node.process(thought, self.history, self.ws, memory_context=mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:] async def handle_message(self, text: str): envelope = Envelope( text=text, user_id="nico", session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), ) self.sensor.note_user_activity() self.history.append({"role": "user", "content": text}) sensor_lines = self.sensor.get_context_lines() mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) command = await self.input_node.process( envelope, self.history, memory_context=mem_ctx, identity=self.identity, channel=self.channel) thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) # Send controls inline (before response text) if thought.controls: await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls})) # Output renders Thinker's reasoning into device-appropriate response response = await self.output_node.process(thought, self.history, self.ws, memory_context=mem_ctx) self.history.append({"role": "assistant", "content": response}) await self.memorizer.update(self.history) if len(self.history) > self.MAX_HISTORY: self.history = self.history[-self.MAX_HISTORY:]