agent-runtime/agent.py
Nico 20363a1f2f v0.7.2: UI controls + ProcessManager + Thinker upgrade (WIP)
- ProcessManager: observable tool execution with start/stop/status
- UI controls protocol: buttons, tables, process cards
- Frontend renders controls in chat, clicks route back as actions
- Thinker upgraded to gemini-2.5-flash-preview
- Auto-detect SQL/python/tool_code blocks for execution
- SQL blocks auto-wrapped in Python sqlite3 script
- WIP: tool execution path needs tuning, controls not yet triggered

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 01:16:26 +01:00

1161 lines
45 KiB
Python

"""
Cognitive Agent Runtime — Phase A.2: Three-node graph (Input → Output + Memorizer).
Input decides WHAT to do. Output executes and streams.
Memorizer holds shared state (S2 — coordination).
"""
import asyncio
import json
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import httpx
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
# --- Config ---
API_KEY = os.environ["OPENROUTER_API_KEY"]
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# --- Auth (Zitadel OIDC) ---
ZITADEL_ISSUER = os.environ.get("ZITADEL_ISSUER", "https://auth.loop42.de")
ZITADEL_CLIENT_ID = os.environ.get("ZITADEL_CLIENT_ID", "365996029172056091")
ZITADEL_PROJECT_ID = os.environ.get("ZITADEL_PROJECT_ID", "365995955654230043")
AUTH_ENABLED = os.environ.get("AUTH_ENABLED", "false").lower() == "true"
SERVICE_TOKENS = set(filter(None, os.environ.get("SERVICE_TOKENS", "").split(",")))
_jwks_cache: dict = {"keys": [], "fetched_at": 0}
async def _get_jwks():
if time.time() - _jwks_cache["fetched_at"] < 3600:
return _jwks_cache["keys"]
async with httpx.AsyncClient() as client:
resp = await client.get(f"{ZITADEL_ISSUER}/oauth/v2/keys")
_jwks_cache["keys"] = resp.json()["keys"]
_jwks_cache["fetched_at"] = time.time()
return _jwks_cache["keys"]
async def _validate_token(token: str) -> dict:
"""Validate token: check service tokens, then JWT, then introspection."""
import base64
# Check static service tokens (for machine accounts like titan)
if token in SERVICE_TOKENS:
return {"sub": "titan", "username": "titan", "source": "service_token"}
# Try JWT validation first
try:
parts = token.split(".")
if len(parts) == 3:
keys = await _get_jwks()
header_b64 = parts[0] + "=" * (4 - len(parts[0]) % 4)
header = json.loads(base64.urlsafe_b64decode(header_b64))
kid = header.get("kid")
key = next((k for k in keys if k["kid"] == kid), None)
if key:
import jwt as pyjwt
from jwt import PyJWK
jwk_obj = PyJWK(key)
claims = pyjwt.decode(
token, jwk_obj.key, algorithms=["RS256"],
issuer=ZITADEL_ISSUER, options={"verify_aud": False},
)
return claims
except Exception:
pass
# Fall back to introspection (for opaque access tokens)
# Zitadel requires client_id + client_secret or JWT profile for introspection
# For a public SPA client, use the project's API app instead
# Simplest: check via userinfo endpoint with the token
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
headers={"Authorization": f"Bearer {token}"},
)
if resp.status_code == 200:
info = resp.json()
log.info(f"[auth] userinfo response: {info}")
return {"sub": info.get("sub"), "preferred_username": info.get("preferred_username"),
"email": info.get("email"), "name": info.get("name"), "source": "userinfo"}
raise HTTPException(status_code=401, detail="Invalid token")
_bearer = HTTPBearer(auto_error=False)
async def require_auth(credentials: HTTPAuthorizationCredentials | None = Depends(_bearer)):
"""Dependency: require valid JWT when AUTH_ENABLED."""
if not AUTH_ENABLED:
return {"sub": "anonymous"}
if not credentials:
raise HTTPException(status_code=401, detail="Missing token")
return await _validate_token(credentials.credentials)
async def ws_auth(token: str | None = Query(None)) -> dict:
"""Validate WebSocket token from query param."""
if not AUTH_ENABLED:
return {"sub": "anonymous"}
if not token:
return None # Will reject in ws_endpoint
return await _validate_token(token)
# --- LLM helper ---
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S")
log = logging.getLogger("runtime")
async def llm_call(model: str, messages: list[dict], stream: bool = False) -> Any:
"""Single LLM call via OpenRouter. Returns full text or (client, response) for streaming."""
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
body = {"model": model, "messages": messages, "stream": stream}
client = httpx.AsyncClient(timeout=60)
if stream:
resp = await client.send(client.build_request("POST", OPENROUTER_URL, headers=headers, json=body), stream=True)
return client, resp # caller owns cleanup
resp = await client.post(OPENROUTER_URL, headers=headers, json=body)
await client.aclose()
data = resp.json()
if "choices" not in data:
log.error(f"LLM error: {data}")
return f"[LLM error: {data.get('error', {}).get('message', 'unknown')}]"
return data["choices"][0]["message"]["content"]
# --- Message types ---
@dataclass
class Envelope:
"""What flows between nodes."""
text: str
user_id: str = "anon"
session_id: str = ""
timestamp: str = ""
@dataclass
class Command:
"""Input node's perception — describes what was heard."""
instruction: str # natural language perception
source_text: str # original user message
metadata: dict = field(default_factory=dict)
@dataclass
class ThoughtResult:
"""Thinker node's output — either a direct answer or tool results."""
response: str # what to tell the user (direct or post-tool)
tool_used: str = "" # which tool was called (empty if none)
tool_output: str = "" # raw tool output
controls: list = field(default_factory=list) # UI controls to render
# --- Process Manager (observable tool execution) ---
class Process:
"""A single observable tool execution."""
_next_id = 0
def __init__(self, tool: str, code: str, send_hud):
Process._next_id += 1
self.pid = Process._next_id
self.tool = tool
self.code = code
self.send_hud = send_hud
self.status = "pending" # pending, running, done, failed, cancelled
self.output_lines: list[str] = []
self.exit_code: int | None = None
self.started_at: float = 0
self.ended_at: float = 0
self._subprocess: subprocess.Popen | None = None
async def hud(self, event: str, **data):
await self.send_hud({"node": "process", "event": event, "pid": self.pid,
"tool": self.tool, "status": self.status, **data})
def run_sync(self) -> str:
"""Execute the tool synchronously. Returns output."""
self.status = "running"
self.started_at = time.time()
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
f.write(self.code)
f.flush()
self._subprocess = subprocess.Popen(
['python3', f.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, cwd=tempfile.gettempdir()
)
stdout, stderr = self._subprocess.communicate(timeout=10)
self.exit_code = self._subprocess.returncode
if stdout:
self.output_lines.extend(stdout.strip().split("\n"))
if self.exit_code != 0 and stderr:
self.output_lines.append(f"[stderr: {stderr.strip()}]")
self.status = "done" if self.exit_code == 0 else "failed"
except subprocess.TimeoutExpired:
if self._subprocess:
self._subprocess.kill()
self.output_lines.append("[error: timed out after 10s]")
self.status = "failed"
self.exit_code = -1
except Exception as e:
self.output_lines.append(f"[error: {e}]")
self.status = "failed"
self.exit_code = -1
finally:
self.ended_at = time.time()
return "\n".join(self.output_lines) or "[no output]"
def cancel(self):
if self._subprocess and self.status == "running":
self._subprocess.kill()
self.status = "cancelled"
self.ended_at = time.time()
self.output_lines.append("[cancelled by user]")
class ProcessManager:
"""Manages all tool executions as observable processes."""
def __init__(self, send_hud):
self.send_hud = send_hud
self.processes: dict[int, Process] = {}
async def execute(self, tool: str, code: str) -> Process:
"""Create and run a process. Returns the completed Process."""
proc = Process(tool, code, self.send_hud)
self.processes[proc.pid] = proc
await proc.hud("process_start", code=code[:200])
# Run in executor to avoid blocking the event loop
loop = asyncio.get_event_loop()
output = await loop.run_in_executor(None, proc.run_sync)
elapsed = round(proc.ended_at - proc.started_at, 2)
await proc.hud("process_done", exit_code=proc.exit_code,
output=output[:500], elapsed=elapsed)
return proc
def cancel(self, pid: int) -> bool:
proc = self.processes.get(pid)
if proc:
proc.cancel()
return True
return False
def get_status(self) -> list[dict]:
return [{"pid": p.pid, "tool": p.tool, "status": p.status,
"elapsed": round((p.ended_at or time.time()) - p.started_at, 2) if p.started_at else 0}
for p in self.processes.values()]
# --- Base Node ---
def estimate_tokens(text: str) -> int:
"""Rough token estimate: 1 token ≈ 4 chars."""
return len(text) // 4
def fit_context(messages: list[dict], max_tokens: int, protect_last: int = 4) -> list[dict]:
"""Trim oldest messages (after system prompt) to fit token budget.
Always keeps: system prompt(s) at start + last `protect_last` messages."""
if not messages:
return messages
# Split into system prefix, middle (trimmable), and protected tail
system_msgs = []
rest = []
for m in messages:
if not rest and m["role"] == "system":
system_msgs.append(m)
else:
rest.append(m)
protected = rest[-protect_last:] if len(rest) > protect_last else rest
middle = rest[:-protect_last] if len(rest) > protect_last else []
# Count fixed tokens (system + protected tail)
fixed_tokens = sum(estimate_tokens(m["content"]) for m in system_msgs + protected)
if fixed_tokens >= max_tokens:
# Even fixed content exceeds budget — truncate protected messages
result = system_msgs + protected
total = sum(estimate_tokens(m["content"]) for m in result)
while total > max_tokens and len(result) > 2:
removed = result.pop(1) # remove oldest non-system
total -= estimate_tokens(removed["content"])
return result
# Fill remaining budget with middle messages (newest first)
remaining = max_tokens - fixed_tokens
kept_middle = []
for m in reversed(middle):
t = estimate_tokens(m["content"])
if remaining - t < 0:
break
kept_middle.insert(0, m)
remaining -= t
return system_msgs + kept_middle + protected
class Node:
name: str = "node"
model: str | None = None
max_context_tokens: int = 4000 # default budget per node
def __init__(self, send_hud):
self.send_hud = send_hud # async callable to emit hud events to frontend
self.last_context_tokens = 0
self.context_fill_pct = 0
async def hud(self, event: str, **data):
await self.send_hud({"node": self.name, "event": event, **data})
def trim_context(self, messages: list[dict]) -> list[dict]:
"""Fit messages within this node's token budget."""
before = len(messages)
result = fit_context(messages, self.max_context_tokens)
self.last_context_tokens = sum(estimate_tokens(m["content"]) for m in result)
self.context_fill_pct = int(100 * self.last_context_tokens / self.max_context_tokens)
if before != len(result):
log.info(f"[{self.name}] context trimmed: {before}{len(result)} msgs, {self.context_fill_pct}% fill")
return result
# --- Sensor Node (ticks independently, produces context for other nodes) ---
from datetime import datetime, timezone, timedelta
BERLIN = timezone(timedelta(hours=2)) # CEST
class SensorNode(Node):
name = "sensor"
def __init__(self, send_hud):
super().__init__(send_hud)
self.tick_count = 0
self.running = False
self._task: asyncio.Task | None = None
self.interval = 5 # seconds
# Current sensor readings — each is {value, changed_at, prev}
self.readings: dict[str, dict] = {}
self._last_user_activity: float = time.time()
# Snapshot of memorizer state for change detection
self._prev_memo_state: dict = {}
def _now(self) -> datetime:
return datetime.now(BERLIN)
def _read_clock(self) -> dict:
"""Clock sensor — updates when minute changes."""
now = self._now()
current = now.strftime("%H:%M")
prev = self.readings.get("clock", {}).get("value")
if current != prev:
return {"value": current, "detail": now.strftime("%Y-%m-%d %H:%M:%S %A"), "changed_at": time.time()}
return {} # no change
def _read_idle(self) -> dict:
"""Idle sensor — time since last user message."""
idle_s = time.time() - self._last_user_activity
# Only update on threshold crossings: 30s, 1m, 5m, 10m, 30m
thresholds = [30, 60, 300, 600, 1800]
prev_idle = self.readings.get("idle", {}).get("_raw", 0)
for t in thresholds:
if prev_idle < t <= idle_s:
if idle_s < 60:
label = f"{int(idle_s)}s"
else:
label = f"{int(idle_s // 60)}m{int(idle_s % 60)}s"
return {"value": label, "_raw": idle_s, "changed_at": time.time()}
# Update raw but don't flag as changed
if "idle" in self.readings:
self.readings["idle"]["_raw"] = idle_s
return {}
def _read_memo_changes(self, memo_state: dict) -> dict:
"""Detect memorizer state changes."""
changes = []
for k, v in memo_state.items():
prev = self._prev_memo_state.get(k)
if v != prev and prev is not None:
changes.append(f"{k}: {prev} -> {v}")
self._prev_memo_state = dict(memo_state)
if changes:
return {"value": "; ".join(changes), "changed_at": time.time()}
return {}
def note_user_activity(self):
"""Called when user sends a message."""
self._last_user_activity = time.time()
# Reset idle sensor
self.readings["idle"] = {"value": "active", "_raw": 0, "changed_at": time.time()}
async def tick(self, memo_state: dict):
"""One tick — read all sensors, emit deltas."""
self.tick_count += 1
deltas = {}
# Read each sensor
for name, reader in [("clock", self._read_clock),
("idle", self._read_idle)]:
update = reader()
if update:
self.readings[name] = {**self.readings.get(name, {}), **update}
deltas[name] = update.get("value") or update.get("detail")
# Memo changes
memo_update = self._read_memo_changes(memo_state)
if memo_update:
self.readings["memo_delta"] = memo_update
deltas["memo_delta"] = memo_update["value"]
# Only emit HUD if something changed
if deltas:
await self.hud("tick", tick=self.tick_count, deltas=deltas)
async def _loop(self, get_memo_state):
"""Background tick loop."""
self.running = True
await self.hud("started", interval=self.interval)
try:
while self.running:
await asyncio.sleep(self.interval)
try:
await self.tick(get_memo_state())
except Exception as e:
log.error(f"[sensor] tick error: {e}")
except asyncio.CancelledError:
pass
finally:
self.running = False
await self.hud("stopped")
def start(self, get_memo_state):
"""Start the background tick loop."""
if self._task and not self._task.done():
return
self._task = asyncio.create_task(self._loop(get_memo_state))
def stop(self):
"""Stop the tick loop."""
self.running = False
if self._task:
self._task.cancel()
def get_context_lines(self) -> list[str]:
"""Render current sensor readings for injection into prompts."""
if not self.readings:
return ["Sensors: (no sensor node running)"]
lines = [f"Sensors (tick #{self.tick_count}, {self.interval}s interval):"]
for name, r in self.readings.items():
if name.startswith("_"):
continue
val = r.get("value", "?")
detail = r.get("detail")
age = time.time() - r.get("changed_at", time.time())
if age < 10:
age_str = "just now"
elif age < 60:
age_str = f"{int(age)}s ago"
else:
age_str = f"{int(age // 60)}m ago"
line = f"- {name}: {detail or val} [{age_str}]"
lines.append(line)
return lines
# --- Input Node ---
class InputNode(Node):
name = "input"
model = "google/gemini-2.0-flash-001"
max_context_tokens = 2000 # small budget — perception only
SYSTEM = """You are the Input node — the ear of this cognitive runtime.
Listener context:
- Authenticated user: {identity}
- Channel: {channel} (Chrome browser on Nico's Windows PC, in his room at home)
- Physical: private space, Nico lives with Tina — she may use this session too
- Security: single-user account, shared physical space — other voices are trusted household
Your job: describe what you heard. Who spoke, what they want, what tone, what context matters.
ONE sentence. No content, no response — just your perception of what came through.
{memory_context}"""
async def process(self, envelope: Envelope, history: list[dict], memory_context: str = "",
identity: str = "unknown", channel: str = "unknown") -> Command:
await self.hud("thinking", detail="deciding how to respond")
log.info(f"[input] user said: {envelope.text}")
messages = [
{"role": "system", "content": self.SYSTEM.format(
memory_context=memory_context, identity=identity, channel=channel)},
]
for msg in history[-8:]:
messages.append(msg)
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
instruction = await llm_call(self.model, messages)
log.info(f"[input] → command: {instruction}")
await self.hud("perceived", instruction=instruction)
return Command(instruction=instruction, source_text=envelope.text)
# --- Output Node ---
class OutputNode(Node):
name = "output"
model = "google/gemini-2.0-flash-001"
max_context_tokens = 4000 # larger — needs history for continuity
SYSTEM = """You are the Output node — the voice of this cognitive runtime.
The Input node sends you its perception of what the user said. This is internal context for you — never repeat or echo it.
You respond to the USER, not to the Input node. Use the perception to understand intent, then act on it.
Be natural. Be concise. If the user asks you to do something, do it — don't describe what you're about to do.
{memory_context}"""
async def process(self, command: Command, history: list[dict], ws: WebSocket, memory_context: str = "") -> str:
await self.hud("streaming")
messages = [
{"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)},
]
for msg in history[-20:]:
messages.append(msg)
messages.append({"role": "system", "content": f"Input perception: {command.instruction}"})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
# Stream response
client, resp = await llm_call(self.model, messages, stream=True)
full_response = ""
try:
async for line in resp.aiter_lines():
if not line.startswith("data: "):
continue
payload = line[6:]
if payload == "[DONE]":
break
chunk = json.loads(payload)
delta = chunk["choices"][0].get("delta", {})
token = delta.get("content", "")
if token:
full_response += token
await ws.send_text(json.dumps({"type": "delta", "content": token}))
finally:
await resp.aclose()
await client.aclose()
log.info(f"[output] response: {full_response[:100]}...")
await ws.send_text(json.dumps({"type": "done"}))
await self.hud("done")
return full_response
# --- Thinker Node (S3 — control, reasoning, tool use) ---
import subprocess
import tempfile
class ThinkerNode(Node):
name = "thinker"
model = "google/gemini-2.5-flash-preview"
max_context_tokens = 4000
SYSTEM = """You are the Thinker node — the brain of this cognitive runtime.
You receive a perception of what the user said. Decide: answer directly, use a tool, or show UI controls.
TOOLS — write a ```python code block and it WILL be executed. Use print() for output.
- For math, databases, file ops, any computation: write python. NEVER describe code — write it.
- For simple conversation: respond directly as text.
UI CONTROLS — to show interactive elements, include a JSON block:
```controls
[
{{"type": "table", "data": [...], "columns": ["id", "name", "email"]}},
{{"type": "button", "label": "Add Customer", "action": "add_customer"}},
{{"type": "button", "label": "Refresh", "action": "refresh_customers"}}
]
```
Controls render in the chat. User clicks flow back as actions you can handle.
You can combine text + code + controls in one response.
{memory_context}"""
def _parse_tool_call(self, response: str) -> tuple[str, str] | None:
"""Parse tool calls. Supports TOOL: format and auto-detects python code blocks."""
text = response.strip()
# Explicit TOOL: format
if text.startswith("TOOL:"):
lines = text.split("\n")
tool_name = lines[0].replace("TOOL:", "").strip()
code_lines = []
in_code = False
for line in lines[1:]:
if line.strip().startswith("```") and not in_code:
in_code = True
continue
elif line.strip().startswith("```") and in_code:
break
elif in_code:
code_lines.append(line)
elif line.strip().startswith("CODE:"):
continue
return (tool_name, "\n".join(code_lines)) if code_lines else None
# Auto-detect: code blocks get executed as python
# Catches ```python, ```py, ```sql, ```sqlite, or bare ``` with code-like content
import re
block_match = re.search(r'```(?:python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL)
if block_match:
code = block_match.group(1).strip()
if code and len(code.split("\n")) > 0:
# If it's SQL, wrap it in a python sqlite3 script
if "```sql" in text or "```sqlite" in text or ("SELECT" in code.upper() and "CREATE" in code.upper()):
wrapped = f'''import sqlite3
conn = sqlite3.connect("/tmp/cog_db.sqlite")
cursor = conn.cursor()
for stmt in """{code}""".split(";"):
stmt = stmt.strip()
if stmt:
cursor.execute(stmt)
conn.commit()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for t in tables:
cursor.execute(f"SELECT * FROM {{t[0]}}")
rows = cursor.fetchall()
cols = [d[0] for d in cursor.description]
print(f"Table: {{t[0]}}")
print(" | ".join(cols))
for row in rows:
print(" | ".join(str(c) for c in row))
conn.close()'''
return ("python", wrapped)
return ("python", code)
return None
def __init__(self, send_hud, process_manager: ProcessManager = None):
super().__init__(send_hud)
self.pm = process_manager
def _parse_controls(self, response: str) -> list[dict]:
"""Extract ```controls JSON blocks from response."""
controls = []
if "```controls" not in response:
return controls
parts = response.split("```controls")
for part in parts[1:]:
end = part.find("```")
if end != -1:
try:
controls.extend(json.loads(part[:end].strip()))
except json.JSONDecodeError:
pass
return controls
def _strip_blocks(self, response: str) -> str:
"""Remove code and control blocks, return plain text."""
import re
text = re.sub(r'```(?:python|py|controls).*?```', '', response, flags=re.DOTALL)
return text.strip()
async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult:
await self.hud("thinking", detail="reasoning about response")
messages = [
{"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)},
]
for msg in history[-12:]:
messages.append(msg)
messages.append({"role": "system", "content": f"Input perception: {command.instruction}"})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens,
max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
response = await llm_call(self.model, messages)
log.info(f"[thinker] response: {response[:200]}")
# Parse UI controls
controls = self._parse_controls(response)
if controls:
await self.hud("controls", controls=controls)
# Check if Thinker wants to use a tool
tool_call = self._parse_tool_call(response)
if tool_call:
tool_name, code = tool_call
if self.pm and tool_name == "python":
proc = await self.pm.execute(tool_name, code)
tool_output = "\n".join(proc.output_lines)
else:
tool_output = f"[unknown tool: {tool_name}]"
log.info(f"[thinker] tool output: {tool_output[:200]}")
# Second LLM call: interpret tool output + optionally add controls
messages.append({"role": "assistant", "content": response})
messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"})
messages.append({"role": "user", "content": "Respond to the user based on the tool output. If showing data, include a ```controls block with a table. Be natural and concise."})
messages = self.trim_context(messages)
final = await llm_call(self.model, messages)
# Parse controls from the follow-up too
more_controls = self._parse_controls(final)
if more_controls:
controls.extend(more_controls)
await self.hud("controls", controls=more_controls)
clean_text = self._strip_blocks(final)
await self.hud("decided", instruction=clean_text[:200])
return ThoughtResult(response=clean_text, tool_used=tool_name,
tool_output=tool_output, controls=controls)
# No tool needed — pass through
clean_text = self._strip_blocks(response) or response
await self.hud("decided", instruction="direct response (no tools)")
return ThoughtResult(response=clean_text, controls=controls)
# --- Memorizer Node (S2 — shared state / coordination) ---
class MemorizerNode(Node):
name = "memorizer"
model = "google/gemini-2.0-flash-001"
max_context_tokens = 3000 # needs enough history to distill
DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime.
After each exchange you update the shared state that Input and Output nodes read.
Given the conversation so far, output a JSON object with these fields:
- user_name: string — how the user identifies themselves (null if unknown)
- user_mood: string — current emotional tone (neutral, happy, frustrated, playful, etc.)
- topic: string — what the conversation is about right now
- topic_history: list of strings — previous topics in this session
- situation: string — social/physical context if mentioned (e.g. "at a pub with tina", "private dev session")
- language: string — primary language being used (en, de, mixed)
- style_hint: string — how Output should talk (casual, formal, technical, poetic, etc.)
- facts: list of strings — important facts learned about the user
Output ONLY valid JSON. No explanation, no markdown fences."""
def __init__(self, send_hud):
super().__init__(send_hud)
# The shared state — starts empty, grows over conversation
self.state: dict = {
"user_name": None,
"user_mood": "neutral",
"topic": None,
"topic_history": [],
"situation": "localhost test runtime, private dev session",
"language": "en",
"style_hint": "casual, technical",
"facts": [],
}
def get_context_block(self, sensor_lines: list[str] = None) -> str:
"""Returns a formatted string for injection into Input/Output system prompts."""
lines = sensor_lines or ["Sensors: (none)"]
lines.append("")
lines.append("Shared memory (from Memorizer):")
for k, v in self.state.items():
if v:
lines.append(f"- {k}: {v}")
return "\n".join(lines)
async def update(self, history: list[dict]):
"""Distill conversation into updated shared state. Called after each exchange."""
if len(history) < 2:
await self.hud("updated", state=self.state) # emit default state
return
await self.hud("thinking", detail="updating shared state")
messages = [
{"role": "system", "content": self.DISTILL_SYSTEM},
{"role": "system", "content": f"Current state: {json.dumps(self.state)}"},
]
for msg in history[-10:]:
messages.append(msg)
messages.append({"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
raw = await llm_call(self.model, messages)
log.info(f"[memorizer] raw: {raw[:200]}")
# Parse JSON from response (strip markdown fences if present)
text = raw.strip()
if text.startswith("```"):
text = text.split("\n", 1)[1] if "\n" in text else text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
try:
new_state = json.loads(text)
# Merge: keep old facts, add new ones
old_facts = set(self.state.get("facts", []))
new_facts = set(new_state.get("facts", []))
new_state["facts"] = list(old_facts | new_facts)[-20:] # cap at 20 facts
# Preserve topic history
if self.state.get("topic") and self.state["topic"] != new_state.get("topic"):
hist = new_state.get("topic_history", [])
if self.state["topic"] not in hist:
hist.append(self.state["topic"])
new_state["topic_history"] = hist[-5:] # keep last 5
self.state = new_state
log.info(f"[memorizer] updated state: {self.state}")
await self.hud("updated", state=self.state)
except (json.JSONDecodeError, Exception) as e:
log.error(f"[memorizer] update error: {e}, raw: {text[:200]}")
await self.hud("error", detail=f"Update failed: {e}")
# Still emit current state so frontend shows something
await self.hud("updated", state=self.state)
# --- Runtime (wires nodes together) ---
TRACE_FILE = Path(__file__).parent / "trace.jsonl"
class Runtime:
def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = ""):
self.ws = ws
self.history: list[dict] = []
self.MAX_HISTORY = 40 # sliding window — oldest messages drop off
self.input_node = InputNode(send_hud=self._send_hud)
self.process_manager = ProcessManager(send_hud=self._send_hud)
self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager)
self.output_node = OutputNode(send_hud=self._send_hud)
self.memorizer = MemorizerNode(send_hud=self._send_hud)
self.sensor = SensorNode(send_hud=self._send_hud)
# Start sensor tick loop
self.sensor.start(get_memo_state=lambda: self.memorizer.state)
# Verified identity from auth — Input and Memorizer use this
claims = user_claims or {}
log.info(f"[runtime] user_claims: {claims}")
self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown"
log.info(f"[runtime] resolved identity: {self.identity}")
self.channel = origin or "unknown"
# Seed memorizer with verified info
self.memorizer.state["user_name"] = self.identity
self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session"
async def _send_hud(self, data: dict):
# Send to frontend
await self.ws.send_text(json.dumps({"type": "hud", **data}))
# Append to trace file + broadcast to SSE subscribers
trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data}
try:
with open(TRACE_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n")
# Rotate trace file at 1000 lines
if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000:
lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8")
except Exception as e:
log.error(f"trace write error: {e}")
_broadcast_sse(trace_entry)
async def handle_message(self, text: str):
envelope = Envelope(
text=text,
user_id="nico",
session_id="test",
timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
)
# Note user activity for idle sensor
self.sensor.note_user_activity()
# Append user message to history FIRST — both nodes see it
self.history.append({"role": "user", "content": text})
# Get shared memory + sensor context for both nodes
sensor_lines = self.sensor.get_context_lines()
mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines)
# Input node perceives (with memory context + identity + channel)
command = await self.input_node.process(
envelope, self.history, memory_context=mem_ctx,
identity=self.identity, channel=self.channel)
# Thinker node reasons + optionally uses tools
thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
# If Thinker used a tool, inject its result into the command for Output
if thought.tool_used:
# Thinker already formulated the response — Output just streams it
command = Command(
instruction=f"Thinker used {thought.tool_used} and says: {thought.response}",
source_text=command.source_text
)
else:
# Thinker answered directly — Output streams that
command = Command(
instruction=f"Thinker says: {thought.response}",
source_text=command.source_text
)
# Output node streams the response
response = await self.output_node.process(command, self.history, self.ws, memory_context=mem_ctx)
self.history.append({"role": "assistant", "content": response})
# Send UI controls if Thinker produced any
if thought.controls:
await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls}))
# Memorizer updates shared state after each exchange
await self.memorizer.update(self.history)
# Sliding window — trim oldest messages, keep context in memorizer
if len(self.history) > self.MAX_HISTORY:
self.history = self.history[-self.MAX_HISTORY:]
# --- App ---
STATIC_DIR = Path(__file__).parent / "static"
app = FastAPI(title="cog")
# Keep a reference to the active runtime for API access
_active_runtime: Runtime | None = None
@app.get("/health")
async def health():
return {"status": "ok"}
@app.get("/auth/config")
async def auth_config():
"""Public: auth config for frontend OIDC flow."""
return {
"enabled": AUTH_ENABLED,
"issuer": ZITADEL_ISSUER,
"clientId": ZITADEL_CLIENT_ID,
"projectId": ZITADEL_PROJECT_ID,
}
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), access_token: str | None = Query(None)):
global _active_runtime
# Validate auth if enabled
user_claims = {"sub": "anonymous"}
if AUTH_ENABLED and token:
try:
user_claims = await _validate_token(token)
# If id_token lacks name, enrich from userinfo with access_token
if not user_claims.get("name") and access_token:
async with httpx.AsyncClient() as client:
resp = await client.get(f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
headers={"Authorization": f"Bearer {access_token}"})
if resp.status_code == 200:
info = resp.json()
log.info(f"[auth] userinfo enrichment: {info}")
user_claims["name"] = info.get("name")
user_claims["preferred_username"] = info.get("preferred_username")
user_claims["email"] = info.get("email")
except HTTPException:
await ws.close(code=4001, reason="Invalid token")
return
origin = ws.headers.get("origin", ws.headers.get("host", ""))
await ws.accept()
runtime = Runtime(ws, user_claims=user_claims, origin=origin)
_active_runtime = runtime
try:
while True:
data = await ws.receive_text()
msg = json.loads(data)
if msg.get("type") == "action":
# User clicked a UI control
action_text = f"[user clicked: {msg.get('action', 'unknown')}]"
if msg.get("data"):
action_text += f" data: {json.dumps(msg['data'])}"
await runtime.handle_message(action_text)
elif msg.get("type") == "cancel_process":
runtime.process_manager.cancel(msg.get("pid", 0))
else:
await runtime.handle_message(msg.get("text", ""))
except WebSocketDisconnect:
runtime.sensor.stop()
if _active_runtime is runtime:
_active_runtime = None
# --- API endpoints (for Claude to inspect runtime state) ---
import hashlib
from asyncio import Queue
from starlette.responses import StreamingResponse
# SSE subscribers (for titan/service accounts to watch live)
_sse_subscribers: list[Queue] = []
def _broadcast_sse(event: dict):
"""Push an event to all SSE subscribers."""
for q in _sse_subscribers:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass # drop if subscriber is too slow
def _state_hash() -> str:
"""Hash of current runtime state — cheap way to detect changes."""
if not _active_runtime:
return "no_session"
raw = json.dumps({
"mem": _active_runtime.memorizer.state,
"hlen": len(_active_runtime.history),
}, sort_keys=True)
return hashlib.md5(raw.encode()).hexdigest()[:12]
@app.get("/api/events")
async def sse_events(user=Depends(require_auth)):
"""SSE stream of runtime events (trace, state changes)."""
q: Queue = Queue(maxsize=100)
_sse_subscribers.append(q)
async def generate():
try:
while True:
event = await q.get()
yield f"data: {json.dumps(event)}\n\n"
except asyncio.CancelledError:
pass
finally:
_sse_subscribers.remove(q)
return StreamingResponse(generate(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
@app.get("/api/poll")
async def poll(since: str = "", user=Depends(require_auth)):
"""Returns current hash. If 'since' matches, returns {changed: false}. Cheap polling."""
h = _state_hash()
if since and since == h:
return {"changed": False, "hash": h}
return {
"changed": True,
"hash": h,
"state": _active_runtime.memorizer.state if _active_runtime else None,
"history_len": len(_active_runtime.history) if _active_runtime else 0,
"last_messages": _active_runtime.history[-3:] if _active_runtime else [],
}
@app.post("/api/send")
async def api_send(body: dict, user=Depends(require_auth)):
"""Send a message as if the user typed it. Requires auth. Returns the response."""
if not _active_runtime:
raise HTTPException(status_code=409, detail="No active session — someone must be connected via WS first")
text = body.get("text", "").strip()
if not text:
raise HTTPException(status_code=400, detail="Missing 'text' field")
await _active_runtime.handle_message(text)
return {
"status": "ok",
"response": _active_runtime.history[-1]["content"] if _active_runtime.history else "",
"memorizer": _active_runtime.memorizer.state,
}
@app.post("/api/clear")
async def api_clear(user=Depends(require_auth)):
"""Clear conversation history."""
if not _active_runtime:
raise HTTPException(status_code=409, detail="No active session")
_active_runtime.history.clear()
return {"status": "cleared"}
@app.get("/api/state")
async def get_state(user=Depends(require_auth)):
"""Current memorizer state + history length."""
if not _active_runtime:
return {"status": "no_session"}
return {
"status": "active",
"memorizer": _active_runtime.memorizer.state,
"history_len": len(_active_runtime.history),
}
@app.get("/api/history")
async def get_history(last: int = 10, user=Depends(require_auth)):
"""Recent conversation history."""
if not _active_runtime:
return {"status": "no_session", "messages": []}
return {
"status": "active",
"messages": _active_runtime.history[-last:],
}
@app.get("/api/trace")
async def get_trace(last: int = 30, user=Depends(require_auth)):
"""Recent trace lines from trace.jsonl."""
if not TRACE_FILE.exists():
return {"lines": []}
lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
parsed = []
for line in lines[-last:]:
try:
parsed.append(json.loads(line))
except json.JSONDecodeError:
pass
return {"lines": parsed}
# Serve index.html explicitly, then static assets
from fastapi.responses import FileResponse
@app.get("/")
async def index():
return FileResponse(STATIC_DIR / "index.html")
@app.get("/callback")
async def callback():
"""OIDC callback — serves the same SPA, JS handles the code exchange."""
return FileResponse(STATIC_DIR / "index.html")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run("agent:app", host="0.0.0.0", port=8000, reload=True)