From 6f4d26ab82ee9b74ba6aa9d5045efbe4fbdcf8e1 Mon Sep 17 00:00:00 2001 From: Nico Date: Sun, 29 Mar 2026 04:36:28 +0200 Subject: [PATCH] =?UTF-8?q?v0.14.1:=20Decouple=20Runtime=20from=20WebSocke?= =?UTF-8?q?t=20=E2=80=94=20persistent=20server-side=20runtime?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - OutputSink: collects output, optionally streams to attached WS - Runtime no longer requires WebSocket — works headless for MCP - WS connects/disconnects via attach_ws()/detach_ws(), runtime persists - /api/send/check + /api/send (async) + /api/result (poll with progress) - Graph switch destroys old runtime, next request creates new one - Director v2 model: claude-opus-4 (was claude-sonnet-4, reserved) Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/api.py | 76 ++++++++++++++++----------- agent/nodes/director_v2.py | 2 +- agent/nodes/output_v1.py | 10 ++-- agent/runtime.py | 105 ++++++++++++++++++++++++++++++------- 4 files changed, 136 insertions(+), 57 deletions(-) diff --git a/agent/api.py b/agent/api.py index d616a19..24fbefa 100644 --- a/agent/api.py +++ b/agent/api.py @@ -72,10 +72,18 @@ def register_routes(app): "projectId": ZITADEL_PROJECT_ID, } + def _ensure_runtime(user_claims=None, origin=""): + """Get or create the persistent runtime.""" + global _active_runtime + if _active_runtime is None: + _active_runtime = Runtime(user_claims=user_claims, origin=origin, + broadcast=_broadcast_sse) + log.info("[api] created persistent runtime") + return _active_runtime + @app.websocket("/ws") async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), access_token: str | None = Query(None)): - global _active_runtime user_claims = {"sub": "anonymous"} if AUTH_ENABLED and token: try: @@ -95,8 +103,12 @@ def register_routes(app): return origin = ws.headers.get("origin", ws.headers.get("host", "")) await ws.accept() - runtime = Runtime(ws, user_claims=user_claims, origin=origin, broadcast=_broadcast_sse) - _active_runtime = runtime + + # Get or create runtime, attach WS + runtime = _ensure_runtime(user_claims=user_claims, origin=origin) + runtime.update_identity(user_claims, origin) + runtime.attach_ws(ws) + try: while True: data = await ws.receive_text() @@ -108,9 +120,8 @@ def register_routes(app): else: await runtime.handle_message(msg.get("text", ""), dashboard=msg.get("dashboard")) except WebSocketDisconnect: - runtime.sensor.stop() - if _active_runtime is runtime: - _active_runtime = None + runtime.detach_ws() + log.info("[api] WS disconnected — runtime stays alive") @app.get("/api/events") async def sse_events(user=Depends(require_auth)): @@ -138,7 +149,7 @@ def register_routes(app): return { "changed": True, "hash": h, - "state": _active_runtime.memorizer.state if _active_runtime else None, + "state": _active_runtime.memorizer.state if _active_runtime else {}, "history_len": len(_active_runtime.history) if _active_runtime else 0, "last_messages": _active_runtime.history[-3:] if _active_runtime else [], } @@ -147,23 +158,22 @@ def register_routes(app): async def api_send_check(user=Depends(require_auth)): """Validate runtime is ready to accept a message. Fast, no LLM calls.""" global _pipeline_task - if not _active_runtime: - return {"ready": False, "reason": "no_session", "detail": "No WS connection -- someone must be connected via browser first"} + runtime = _ensure_runtime() if _pipeline_task and not _pipeline_task.done(): return {"ready": False, "reason": "busy", "detail": "Pipeline already running"} return { "ready": True, - "graph": _active_runtime.graph.get("name", "unknown"), - "identity": _active_runtime.identity, - "history_len": len(_active_runtime.history), + "graph": runtime.graph.get("name", "unknown"), + "identity": runtime.identity, + "history_len": len(runtime.history), + "ws_connected": runtime.sink.ws is not None, } @app.post("/api/send") async def api_send(body: dict, user=Depends(require_auth)): """Queue a message for async processing. Returns immediately with a message ID.""" global _pipeline_task, _pipeline_result, _pipeline_id - if not _active_runtime: - raise HTTPException(status_code=409, detail="No active session -- someone must be connected via WS first") + runtime = _ensure_runtime() if _pipeline_task and not _pipeline_task.done(): raise HTTPException(status_code=409, detail="Pipeline already running") text = body.get("text", "").strip() @@ -180,13 +190,13 @@ def register_routes(app): global _pipeline_result try: _pipeline_result["stage"] = "input" - await _active_runtime.handle_message(text, dashboard=dashboard) + await runtime.handle_message(text, dashboard=dashboard) _pipeline_result = { "status": "done", "id": msg_id, "stage": "done", - "response": _active_runtime.history[-1]["content"] if _active_runtime.history else "", - "memorizer": _active_runtime.memorizer.state, + "response": runtime.history[-1]["content"] if runtime.history else "", + "memorizer": runtime.memorizer.state, } except Exception as e: log.error(f"[api] pipeline error: {e}") @@ -207,31 +217,30 @@ def register_routes(app): @app.post("/api/clear") async def api_clear(user=Depends(require_auth)): - if not _active_runtime: - raise HTTPException(status_code=409, detail="No active session") - _active_runtime.history.clear() - _active_runtime.ui_node.state.clear() - _active_runtime.ui_node.bindings.clear() - _active_runtime.ui_node.current_controls.clear() + runtime = _ensure_runtime() + runtime.history.clear() + runtime.ui_node.state.clear() + runtime.ui_node.bindings.clear() + runtime.ui_node.current_controls.clear() + runtime.ui_node.machines.clear() return {"status": "cleared"} @app.get("/api/state") async def get_state(user=Depends(require_auth)): - if not _active_runtime: - return {"status": "no_session"} + runtime = _ensure_runtime() return { "status": "active", - "memorizer": _active_runtime.memorizer.state, - "history_len": len(_active_runtime.history), + "memorizer": runtime.memorizer.state, + "history_len": len(runtime.history), + "ws_connected": runtime.sink.ws is not None, } @app.get("/api/history") async def get_history(last: int = 10, user=Depends(require_auth)): - if not _active_runtime: - return {"status": "no_session", "messages": []} + runtime = _ensure_runtime() return { "status": "active", - "messages": _active_runtime.history[-last:], + "messages": runtime.history[-last:], } @app.get("/api/graph/active") @@ -254,11 +263,16 @@ def register_routes(app): @app.post("/api/graph/switch") async def switch_graph(body: dict, user=Depends(require_auth)): + global _active_runtime from .engine import load_graph import agent.runtime as rt name = body.get("name", "") graph = load_graph(name) # validates it exists rt._active_graph_name = name + # Kill old runtime, next request creates new one with new graph + if _active_runtime: + _active_runtime.sensor.stop() + _active_runtime = None return {"status": "ok", "name": graph["name"], "note": "New sessions will use this graph. Existing session unchanged."} @@ -288,7 +302,7 @@ def register_routes(app): _broadcast_sse({"type": "test_status", **_test_status}) if _active_runtime: try: - await _active_runtime.ws.send_text(json.dumps({"type": "test_status", **_test_status})) + await _active_runtime.sink.send_hud({"type": "test_status", **_test_status}) except Exception: pass return {"ok": True} diff --git a/agent/nodes/director_v2.py b/agent/nodes/director_v2.py index 56e9475..702d468 100644 --- a/agent/nodes/director_v2.py +++ b/agent/nodes/director_v2.py @@ -12,7 +12,7 @@ log = logging.getLogger("runtime") class DirectorV2Node(Node): name = "director_v2" - model = "anthropic/claude-sonnet-4" + model = "anthropic/claude-opus-4" max_context_tokens = 4000 SYSTEM = """You are the Director — the brain of this cognitive agent runtime. diff --git a/agent/nodes/output_v1.py b/agent/nodes/output_v1.py index 29a6d43..b89874b 100644 --- a/agent/nodes/output_v1.py +++ b/agent/nodes/output_v1.py @@ -3,8 +3,6 @@ import json import logging -from fastapi import WebSocket - from .base import Node from ..llm import llm_call from ..types import Command, ThoughtResult @@ -39,7 +37,8 @@ YOUR JOB: Transform the Thinker's reasoning into a natural, human-readable text {memory_context}""" async def process(self, thought: ThoughtResult, history: list[dict], - ws: WebSocket, memory_context: str = "") -> str: + sink, memory_context: str = "") -> str: + """Render Thinker's output. Streams via sink (OutputSink).""" await self.hud("streaming") messages = [ @@ -52,7 +51,6 @@ YOUR JOB: Transform the Thinker's reasoning into a natural, human-readable text thinker_ctx = f"Thinker response: {thought.response}" if thought.tool_used: if thought.tool_used == "query_db" and thought.tool_output and not thought.tool_output.startswith("Error"): - # DB results render as table in workspace — just tell Output the summary row_count = max(0, thought.tool_output.count("\n")) thinker_ctx += f"\n\nTool: query_db returned {row_count} rows (shown as table in workspace). Do NOT repeat the data. Just give a brief summary or insight." else: @@ -80,12 +78,12 @@ YOUR JOB: Transform the Thinker's reasoning into a natural, human-readable text token = delta.get("content", "") if token: full_response += token - await ws.send_text(json.dumps({"type": "delta", "content": token})) + await sink.send_delta(token) finally: await resp.aclose() await client.aclose() log.info(f"[output] response: {full_response[:100]}...") - await ws.send_text(json.dumps({"type": "done"})) + await sink.send_done() await self.hud("done") return full_response diff --git a/agent/runtime.py b/agent/runtime.py index 0906c46..ac583b3 100644 --- a/agent/runtime.py +++ b/agent/runtime.py @@ -7,8 +7,6 @@ import time from pathlib import Path from typing import Callable -from fastapi import WebSocket - from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan from .process import ProcessManager from .engine import load_graph, instantiate_nodes, list_graphs, get_graph_for_cytoscape @@ -21,10 +19,62 @@ TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl" _active_graph_name = "v1-current" -class Runtime: - def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = "", - broadcast: Callable = None, graph_name: str = None): +class OutputSink: + """Collects output. Optionally streams to attached WebSocket.""" + + def __init__(self): + self.ws = None + self.response: str = "" + self.controls: list = [] + self.done: bool = False + + def attach(self, ws): self.ws = ws + + def detach(self): + self.ws = None + + def reset(self): + self.response = "" + self.controls = [] + self.done = False + + async def send_delta(self, text: str): + self.response += text + if self.ws: + try: + await self.ws.send_text(json.dumps({"type": "delta", "content": text})) + except Exception: + pass + + async def send_controls(self, controls: list): + self.controls = controls + if self.ws: + try: + await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) + except Exception: + pass + + async def send_hud(self, data: dict): + if self.ws: + try: + await self.ws.send_text(json.dumps({"type": "hud", **data})) + except Exception: + pass + + async def send_done(self): + self.done = True + if self.ws: + try: + await self.ws.send_text(json.dumps({"type": "done"})) + except Exception: + pass + + +class Runtime: + def __init__(self, user_claims: dict = None, origin: str = "", + broadcast: Callable = None, graph_name: str = None): + self.sink = OutputSink() self.history: list[dict] = [] self.MAX_HISTORY = 40 self._broadcast = broadcast or (lambda e: None) @@ -62,8 +112,28 @@ class Runtime: self.memorizer.state["user_name"] = self.identity self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" + def attach_ws(self, ws): + """Attach a WebSocket for real-time streaming.""" + self.sink.attach(ws) + log.info("[runtime] WS attached") + + def detach_ws(self): + """Detach WebSocket. Runtime keeps running.""" + self.sink.detach() + log.info("[runtime] WS detached") + + def update_identity(self, user_claims: dict, origin: str = ""): + """Update identity from WS auth claims.""" + claims = user_claims or {} + self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or self.identity + if origin: + self.channel = origin + self.memorizer.state["user_name"] = self.identity + self.memorizer.state["situation"] = f"authenticated on {self.channel}" + log.info(f"[runtime] identity updated: {self.identity}") + async def _send_hud(self, data: dict): - await self.ws.send_text(json.dumps({"type": "hud", **data})) + await self.sink.send_hud(data) trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} try: with open(TRACE_FILE, "a", encoding="utf-8") as f: @@ -77,27 +147,24 @@ class Runtime: async def _stream_text(self, text: str): """Stream pre-formed text to the client as deltas.""" - try: - chunk_size = 12 - for i in range(0, len(text), chunk_size): - await self.ws.send_text(json.dumps({"type": "delta", "content": text[i:i + chunk_size]})) - await self.ws.send_text(json.dumps({"type": "done"})) - except Exception: - pass # WS may not be connected (e.g. API-only calls) + self.sink.reset() + chunk_size = 12 + for i in range(0, len(text), chunk_size): + await self.sink.send_delta(text[i:i + chunk_size]) + await self.sink.send_done() async def _run_output_and_ui(self, thought, mem_ctx): """Run Output and UI nodes in parallel. Returns (response_text, controls).""" + self.sink.reset() output_task = asyncio.create_task( - self.output_node.process(thought, self.history, self.ws, memory_context=mem_ctx)) + self.output_node.process(thought, self.history, self.sink, memory_context=mem_ctx)) ui_task = asyncio.create_task( self.ui_node.process(thought, self.history, memory_context=mem_ctx)) - # Output streams to WS, UI returns controls — both run concurrently response, controls = await asyncio.gather(output_task, ui_task) - # Send controls after Output starts streaming (UI may finish first or after) if controls: - await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) + await self.sink.send_controls(controls) return response @@ -117,7 +184,7 @@ class Runtime: if not ctrl.get("machine_id"): controls.append(ctrl) self.ui_node.current_controls = controls - await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) + await self.sink.send_controls(controls) await self._send_hud({"node": "ui", "event": "controls", "controls": controls}) await self._stream_text(transition_result) self.history.append({"role": "user", "content": f"[clicked {action}]"}) @@ -129,7 +196,7 @@ class Runtime: if result is not None: # Local action handled — send controls update + short response if controls: - await self.ws.send_text(json.dumps({"type": "controls", "controls": controls})) + await self.sink.send_controls(controls) await self._stream_text(result) self.history.append({"role": "user", "content": f"[clicked {action}]"}) self.history.append({"role": "assistant", "content": result})