agent-runtime/agent/runtime.py
Nico 231f81bc52 v0.8.2: fix pipeline — skip Output for tools, process HUD, inline controls, structured actions
- Thinker tool results stream directly to user, skipping Output node (halves latency)
- ProcessManager process_start/process_done events render as live cards in chat
- UI controls sent before response text, not after
- Button clicks route to handle_action(), skip Input, go straight to Thinker
- Fix Thinker model: gemini-2.5-flash-preview -> gemini-2.5-flash (old ID expired)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 01:43:07 +01:00

137 lines
5.8 KiB
Python

"""Runtime: wires all nodes together into a processing pipeline."""
import json
import logging
import time
from pathlib import Path
from typing import Callable
from fastapi import WebSocket
from .types import Envelope, Command
from .process import ProcessManager
from .nodes import SensorNode, InputNode, OutputNode, ThinkerNode, MemorizerNode
# Module-level logger for the runtime pipeline.
log = logging.getLogger("runtime")
# Rolling JSONL trace of HUD events, written next to the package directory.
TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl"
class Runtime:
def __init__(self, ws: WebSocket, user_claims: dict | None = None, origin: str = "",
             broadcast: Callable | None = None):
    """Wire all pipeline nodes together for one WebSocket session.

    Args:
        ws: Client WebSocket used for HUD events, deltas and controls.
        user_claims: Decoded auth-token claims; used to resolve the identity.
        origin: Origin of the connection; doubles as the channel name.
        broadcast: Optional callback invoked with every trace entry.
    """
    self.ws = ws
    self.history: list[dict] = []
    # Rolling conversation window; trimmed after each turn.
    self.MAX_HISTORY = 40
    # No-op fallback so _send_hud can call the broadcast unconditionally.
    self._broadcast = broadcast or (lambda e: None)
    # All nodes report through the same HUD channel.
    self.input_node = InputNode(send_hud=self._send_hud)
    self.process_manager = ProcessManager(send_hud=self._send_hud)
    self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager)
    self.output_node = OutputNode(send_hud=self._send_hud)
    self.memorizer = MemorizerNode(send_hud=self._send_hud)
    self.sensor = SensorNode(send_hud=self._send_hud)
    # Lambda defers attribute access so the sensor always reads current state.
    self.sensor.start(get_memo_state=lambda: self.memorizer.state)
    claims = user_claims or {}
    log.info(f"[runtime] user_claims: {claims}")
    # First non-empty claim wins; "unknown" covers anonymous sessions.
    self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown"
    log.info(f"[runtime] resolved identity: {self.identity}")
    self.channel = origin or "unknown"
    # Seed memory so the Thinker knows who it is talking to and from where.
    self.memorizer.state["user_name"] = self.identity
    self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session"
async def _send_hud(self, data: dict):
    """Push a HUD event to the client, append it to the trace file, broadcast it.

    The WebSocket send happens first; trace-file failures are logged and never
    block delivery to the user. The broadcast callback always runs.
    """
    await self.ws.send_text(json.dumps({"type": "hud", **data}))
    # Single clock read so the strftime seconds and the millisecond suffix
    # cannot disagree when we cross a second boundary (the original read the
    # clock twice: once inside strftime, once for the fraction).
    now = time.time()
    millis = f"{now % 1:.3f}"[2:]
    trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.", time.localtime(now)) + millis, **data}
    try:
        with open(TRACE_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n")
        # Rotate: keep only the newest 500 lines once the file passes ~500 KB.
        # (No exists() check needed — we just appended to the file above.)
        if TRACE_FILE.stat().st_size > 500_000:
            lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
            TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8")
    except Exception as e:
        # Best-effort tracing: disk trouble must never break the session.
        log.error(f"trace write error: {e}")
    self._broadcast(trace_entry)
async def _stream_text(self, text: str):
    """Send pre-formed text to the client as small deltas, then a done marker.

    Chunking the payload makes canned responses look like live LLM output.
    """
    step = 12  # characters per delta frame
    sent = 0
    while sent < len(text):
        piece = text[sent:sent + step]
        await self.ws.send_text(json.dumps({"type": "delta", "content": piece}))
        sent += step
    await self.ws.send_text(json.dumps({"type": "done"}))
async def handle_action(self, action: str, data: dict = None):
    """Handle a structured UI action (button click etc.)."""
    # Render the action as one compact line the Thinker can parse.
    parts = [f"ACTION: {action}"]
    if data:
        parts.append(f"data: {json.dumps(data)}")
    action_desc = " | ".join(parts)
    # Recorded as a user-role event, not as spoken input.
    self.history.append({"role": "user", "content": action_desc})
    self.sensor.note_user_activity()
    mem_ctx = self.memorizer.get_context_block(
        sensor_lines=self.sensor.get_context_lines())
    # A click is not speech to perceive — bypass the Input node and hand a
    # synthetic command straight to the Thinker.
    command = Command(instruction=f"User clicked UI button: {action}", source_text=action_desc)
    thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
    # Controls go out before the response text so the UI renders them inline.
    if thought.controls:
        await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls}))
    await self._stream_text(thought.response)
    self.history.append({"role": "assistant", "content": thought.response})
    await self.memorizer.update(self.history)
    # Bound the rolling history window.
    if len(self.history) > self.MAX_HISTORY:
        self.history = self.history[-self.MAX_HISTORY:]
async def handle_message(self, text: str):
    """Run one user utterance through the full pipeline.

    Flow: Sensor -> Input -> Thinker -> (direct stream | Output) -> Memorizer.
    Tool-backed answers stream straight from the Thinker, skipping the Output
    node to save a model round trip.
    """
    envelope = Envelope(
        text=text,
        # Fix: was hard-coded to "nico" — use the identity resolved from the
        # auth claims in __init__ (it is already passed to input_node below).
        user_id=self.identity,
        session_id="test",  # TODO: derive a real per-connection session id
        timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
    )
    self.sensor.note_user_activity()
    self.history.append({"role": "user", "content": text})
    sensor_lines = self.sensor.get_context_lines()
    mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines)
    command = await self.input_node.process(
        envelope, self.history, memory_context=mem_ctx,
        identity=self.identity, channel=self.channel)
    thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
    # Send controls inline (before response text) so the UI renders them first.
    if thought.controls:
        await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls}))
    if thought.tool_used:
        # Thinker already formulated the response from tool output — stream
        # it directly and skip the Output node entirely.
        await self._stream_text(thought.response)
        response = thought.response
    else:
        # Pure conversation — the Output node adds personality and streams.
        command = Command(
            instruction=f"Thinker says: {thought.response}",
            source_text=command.source_text
        )
        response = await self.output_node.process(command, self.history, self.ws, memory_context=mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    # Bound the rolling history window.
    if len(self.history) > self.MAX_HISTORY:
        self.history = self.history[-self.MAX_HISTORY:]