From 7458b2ea35c88515129bb0a4bfbdccf656b17312 Mon Sep 17 00:00:00 2001 From: Nico Date: Sat, 28 Mar 2026 01:36:41 +0100 Subject: [PATCH] v0.8.0: refactor agent.py into modular package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split 1161-line monolith into agent/ package: auth, llm, types, process, runtime, api, and nodes/ (base, sensor, input, output, thinker, memorizer). No logic changes — pure structural split. uvicorn agent:app entrypoint unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- agent.py | 1160 -------------------------------------- agent/__init__.py | 37 ++ agent/api.py | 188 ++++++ agent/auth.py | 92 +++ agent/llm.py | 76 +++ agent/nodes/__init__.py | 9 + agent/nodes/base.py | 31 + agent/nodes/input.py | 48 ++ agent/nodes/memorizer.py | 99 ++++ agent/nodes/output.py | 63 +++ agent/nodes/sensor.py | 132 +++++ agent/nodes/thinker.py | 166 ++++++ agent/process.py | 104 ++++ agent/runtime.py | 98 ++++ agent/types.py | 29 + 15 files changed, 1172 insertions(+), 1160 deletions(-) delete mode 100644 agent.py create mode 100644 agent/__init__.py create mode 100644 agent/api.py create mode 100644 agent/auth.py create mode 100644 agent/llm.py create mode 100644 agent/nodes/__init__.py create mode 100644 agent/nodes/base.py create mode 100644 agent/nodes/input.py create mode 100644 agent/nodes/memorizer.py create mode 100644 agent/nodes/output.py create mode 100644 agent/nodes/sensor.py create mode 100644 agent/nodes/thinker.py create mode 100644 agent/process.py create mode 100644 agent/runtime.py create mode 100644 agent/types.py diff --git a/agent.py b/agent.py deleted file mode 100644 index f415bc6..0000000 --- a/agent.py +++ /dev/null @@ -1,1160 +0,0 @@ -""" -Cognitive Agent Runtime — Phase A.2: Three-node graph (Input → Output + Memorizer). -Input decides WHAT to do. Output executes and streams. -Memorizer holds shared state (S2 — coordination). 
-""" - -import asyncio -import json -import os -import time -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any - -import httpx -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, Query -from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer -from fastapi.staticfiles import StaticFiles - -from dotenv import load_dotenv -load_dotenv(Path(__file__).parent / ".env") - -# --- Config --- - -API_KEY = os.environ["OPENROUTER_API_KEY"] -OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" - -# --- Auth (Zitadel OIDC) --- - -ZITADEL_ISSUER = os.environ.get("ZITADEL_ISSUER", "https://auth.loop42.de") -ZITADEL_CLIENT_ID = os.environ.get("ZITADEL_CLIENT_ID", "365996029172056091") -ZITADEL_PROJECT_ID = os.environ.get("ZITADEL_PROJECT_ID", "365995955654230043") -AUTH_ENABLED = os.environ.get("AUTH_ENABLED", "false").lower() == "true" -SERVICE_TOKENS = set(filter(None, os.environ.get("SERVICE_TOKENS", "").split(","))) - -_jwks_cache: dict = {"keys": [], "fetched_at": 0} - -async def _get_jwks(): - if time.time() - _jwks_cache["fetched_at"] < 3600: - return _jwks_cache["keys"] - async with httpx.AsyncClient() as client: - resp = await client.get(f"{ZITADEL_ISSUER}/oauth/v2/keys") - _jwks_cache["keys"] = resp.json()["keys"] - _jwks_cache["fetched_at"] = time.time() - return _jwks_cache["keys"] - -async def _validate_token(token: str) -> dict: - """Validate token: check service tokens, then JWT, then introspection.""" - import base64 - - # Check static service tokens (for machine accounts like titan) - if token in SERVICE_TOKENS: - return {"sub": "titan", "username": "titan", "source": "service_token"} - - # Try JWT validation first - try: - parts = token.split(".") - if len(parts) == 3: - keys = await _get_jwks() - header_b64 = parts[0] + "=" * (4 - len(parts[0]) % 4) - header = json.loads(base64.urlsafe_b64decode(header_b64)) - kid = header.get("kid") - key = next((k for k in keys 
if k["kid"] == kid), None) - if key: - import jwt as pyjwt - from jwt import PyJWK - jwk_obj = PyJWK(key) - claims = pyjwt.decode( - token, jwk_obj.key, algorithms=["RS256"], - issuer=ZITADEL_ISSUER, options={"verify_aud": False}, - ) - return claims - except Exception: - pass - - # Fall back to introspection (for opaque access tokens) - # Zitadel requires client_id + client_secret or JWT profile for introspection - # For a public SPA client, use the project's API app instead - # Simplest: check via userinfo endpoint with the token - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{ZITADEL_ISSUER}/oidc/v1/userinfo", - headers={"Authorization": f"Bearer {token}"}, - ) - if resp.status_code == 200: - info = resp.json() - log.info(f"[auth] userinfo response: {info}") - return {"sub": info.get("sub"), "preferred_username": info.get("preferred_username"), - "email": info.get("email"), "name": info.get("name"), "source": "userinfo"} - - raise HTTPException(status_code=401, detail="Invalid token") - -_bearer = HTTPBearer(auto_error=False) - -async def require_auth(credentials: HTTPAuthorizationCredentials | None = Depends(_bearer)): - """Dependency: require valid JWT when AUTH_ENABLED.""" - if not AUTH_ENABLED: - return {"sub": "anonymous"} - if not credentials: - raise HTTPException(status_code=401, detail="Missing token") - return await _validate_token(credentials.credentials) - -async def ws_auth(token: str | None = Query(None)) -> dict: - """Validate WebSocket token from query param.""" - if not AUTH_ENABLED: - return {"sub": "anonymous"} - if not token: - return None # Will reject in ws_endpoint - return await _validate_token(token) - -# --- LLM helper --- - -import logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S") -log = logging.getLogger("runtime") - - -async def llm_call(model: str, messages: list[dict], stream: bool = False) -> Any: - """Single LLM call via OpenRouter. 
Returns full text or (client, response) for streaming.""" - headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"} - body = {"model": model, "messages": messages, "stream": stream} - - client = httpx.AsyncClient(timeout=60) - if stream: - resp = await client.send(client.build_request("POST", OPENROUTER_URL, headers=headers, json=body), stream=True) - return client, resp # caller owns cleanup - - resp = await client.post(OPENROUTER_URL, headers=headers, json=body) - await client.aclose() - data = resp.json() - if "choices" not in data: - log.error(f"LLM error: {data}") - return f"[LLM error: {data.get('error', {}).get('message', 'unknown')}]" - return data["choices"][0]["message"]["content"] - - -# --- Message types --- - -@dataclass -class Envelope: - """What flows between nodes.""" - text: str - user_id: str = "anon" - session_id: str = "" - timestamp: str = "" - - -@dataclass -class Command: - """Input node's perception — describes what was heard.""" - instruction: str # natural language perception - source_text: str # original user message - metadata: dict = field(default_factory=dict) - - -@dataclass -class ThoughtResult: - """Thinker node's output — either a direct answer or tool results.""" - response: str # what to tell the user (direct or post-tool) - tool_used: str = "" # which tool was called (empty if none) - tool_output: str = "" # raw tool output - controls: list = field(default_factory=list) # UI controls to render - - -# --- Process Manager (observable tool execution) --- - -class Process: - """A single observable tool execution.""" - _next_id = 0 - - def __init__(self, tool: str, code: str, send_hud): - Process._next_id += 1 - self.pid = Process._next_id - self.tool = tool - self.code = code - self.send_hud = send_hud - self.status = "pending" # pending, running, done, failed, cancelled - self.output_lines: list[str] = [] - self.exit_code: int | None = None - self.started_at: float = 0 - self.ended_at: float = 0 - 
self._subprocess: subprocess.Popen | None = None - - async def hud(self, event: str, **data): - await self.send_hud({"node": "process", "event": event, "pid": self.pid, - "tool": self.tool, "status": self.status, **data}) - - def run_sync(self) -> str: - """Execute the tool synchronously. Returns output.""" - self.status = "running" - self.started_at = time.time() - try: - with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f: - f.write(self.code) - f.flush() - self._subprocess = subprocess.Popen( - ['python3', f.name], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True, cwd=tempfile.gettempdir() - ) - stdout, stderr = self._subprocess.communicate(timeout=10) - self.exit_code = self._subprocess.returncode - if stdout: - self.output_lines.extend(stdout.strip().split("\n")) - if self.exit_code != 0 and stderr: - self.output_lines.append(f"[stderr: {stderr.strip()}]") - self.status = "done" if self.exit_code == 0 else "failed" - except subprocess.TimeoutExpired: - if self._subprocess: - self._subprocess.kill() - self.output_lines.append("[error: timed out after 10s]") - self.status = "failed" - self.exit_code = -1 - except Exception as e: - self.output_lines.append(f"[error: {e}]") - self.status = "failed" - self.exit_code = -1 - finally: - self.ended_at = time.time() - return "\n".join(self.output_lines) or "[no output]" - - def cancel(self): - if self._subprocess and self.status == "running": - self._subprocess.kill() - self.status = "cancelled" - self.ended_at = time.time() - self.output_lines.append("[cancelled by user]") - - -class ProcessManager: - """Manages all tool executions as observable processes.""" - - def __init__(self, send_hud): - self.send_hud = send_hud - self.processes: dict[int, Process] = {} - - async def execute(self, tool: str, code: str) -> Process: - """Create and run a process. 
Returns the completed Process.""" - proc = Process(tool, code, self.send_hud) - self.processes[proc.pid] = proc - - await proc.hud("process_start", code=code[:200]) - - # Run in executor to avoid blocking the event loop - loop = asyncio.get_event_loop() - output = await loop.run_in_executor(None, proc.run_sync) - - elapsed = round(proc.ended_at - proc.started_at, 2) - await proc.hud("process_done", exit_code=proc.exit_code, - output=output[:500], elapsed=elapsed) - return proc - - def cancel(self, pid: int) -> bool: - proc = self.processes.get(pid) - if proc: - proc.cancel() - return True - return False - - def get_status(self) -> list[dict]: - return [{"pid": p.pid, "tool": p.tool, "status": p.status, - "elapsed": round((p.ended_at or time.time()) - p.started_at, 2) if p.started_at else 0} - for p in self.processes.values()] - - -# --- Base Node --- - -def estimate_tokens(text: str) -> int: - """Rough token estimate: 1 token ≈ 4 chars.""" - return len(text) // 4 - - -def fit_context(messages: list[dict], max_tokens: int, protect_last: int = 4) -> list[dict]: - """Trim oldest messages (after system prompt) to fit token budget. 
- Always keeps: system prompt(s) at start + last `protect_last` messages.""" - if not messages: - return messages - - # Split into system prefix, middle (trimmable), and protected tail - system_msgs = [] - rest = [] - for m in messages: - if not rest and m["role"] == "system": - system_msgs.append(m) - else: - rest.append(m) - - protected = rest[-protect_last:] if len(rest) > protect_last else rest - middle = rest[:-protect_last] if len(rest) > protect_last else [] - - # Count fixed tokens (system + protected tail) - fixed_tokens = sum(estimate_tokens(m["content"]) for m in system_msgs + protected) - - if fixed_tokens >= max_tokens: - # Even fixed content exceeds budget — truncate protected messages - result = system_msgs + protected - total = sum(estimate_tokens(m["content"]) for m in result) - while total > max_tokens and len(result) > 2: - removed = result.pop(1) # remove oldest non-system - total -= estimate_tokens(removed["content"]) - return result - - # Fill remaining budget with middle messages (newest first) - remaining = max_tokens - fixed_tokens - kept_middle = [] - for m in reversed(middle): - t = estimate_tokens(m["content"]) - if remaining - t < 0: - break - kept_middle.insert(0, m) - remaining -= t - - return system_msgs + kept_middle + protected - - -class Node: - name: str = "node" - model: str | None = None - max_context_tokens: int = 4000 # default budget per node - - def __init__(self, send_hud): - self.send_hud = send_hud # async callable to emit hud events to frontend - self.last_context_tokens = 0 - self.context_fill_pct = 0 - - async def hud(self, event: str, **data): - await self.send_hud({"node": self.name, "event": event, **data}) - - def trim_context(self, messages: list[dict]) -> list[dict]: - """Fit messages within this node's token budget.""" - before = len(messages) - result = fit_context(messages, self.max_context_tokens) - self.last_context_tokens = sum(estimate_tokens(m["content"]) for m in result) - self.context_fill_pct = 
int(100 * self.last_context_tokens / self.max_context_tokens) - if before != len(result): - log.info(f"[{self.name}] context trimmed: {before} → {len(result)} msgs, {self.context_fill_pct}% fill") - return result - - -# --- Sensor Node (ticks independently, produces context for other nodes) --- - -from datetime import datetime, timezone, timedelta - -BERLIN = timezone(timedelta(hours=2)) # CEST - - -class SensorNode(Node): - name = "sensor" - - def __init__(self, send_hud): - super().__init__(send_hud) - self.tick_count = 0 - self.running = False - self._task: asyncio.Task | None = None - self.interval = 5 # seconds - # Current sensor readings — each is {value, changed_at, prev} - self.readings: dict[str, dict] = {} - self._last_user_activity: float = time.time() - # Snapshot of memorizer state for change detection - self._prev_memo_state: dict = {} - - def _now(self) -> datetime: - return datetime.now(BERLIN) - - def _read_clock(self) -> dict: - """Clock sensor — updates when minute changes.""" - now = self._now() - current = now.strftime("%H:%M") - prev = self.readings.get("clock", {}).get("value") - if current != prev: - return {"value": current, "detail": now.strftime("%Y-%m-%d %H:%M:%S %A"), "changed_at": time.time()} - return {} # no change - - def _read_idle(self) -> dict: - """Idle sensor — time since last user message.""" - idle_s = time.time() - self._last_user_activity - # Only update on threshold crossings: 30s, 1m, 5m, 10m, 30m - thresholds = [30, 60, 300, 600, 1800] - prev_idle = self.readings.get("idle", {}).get("_raw", 0) - for t in thresholds: - if prev_idle < t <= idle_s: - if idle_s < 60: - label = f"{int(idle_s)}s" - else: - label = f"{int(idle_s // 60)}m{int(idle_s % 60)}s" - return {"value": label, "_raw": idle_s, "changed_at": time.time()} - # Update raw but don't flag as changed - if "idle" in self.readings: - self.readings["idle"]["_raw"] = idle_s - return {} - - def _read_memo_changes(self, memo_state: dict) -> dict: - """Detect memorizer 
state changes.""" - changes = [] - for k, v in memo_state.items(): - prev = self._prev_memo_state.get(k) - if v != prev and prev is not None: - changes.append(f"{k}: {prev} -> {v}") - self._prev_memo_state = dict(memo_state) - if changes: - return {"value": "; ".join(changes), "changed_at": time.time()} - return {} - - def note_user_activity(self): - """Called when user sends a message.""" - self._last_user_activity = time.time() - # Reset idle sensor - self.readings["idle"] = {"value": "active", "_raw": 0, "changed_at": time.time()} - - async def tick(self, memo_state: dict): - """One tick — read all sensors, emit deltas.""" - self.tick_count += 1 - deltas = {} - - # Read each sensor - for name, reader in [("clock", self._read_clock), - ("idle", self._read_idle)]: - update = reader() - if update: - self.readings[name] = {**self.readings.get(name, {}), **update} - deltas[name] = update.get("value") or update.get("detail") - - # Memo changes - memo_update = self._read_memo_changes(memo_state) - if memo_update: - self.readings["memo_delta"] = memo_update - deltas["memo_delta"] = memo_update["value"] - - # Only emit HUD if something changed - if deltas: - await self.hud("tick", tick=self.tick_count, deltas=deltas) - - async def _loop(self, get_memo_state): - """Background tick loop.""" - self.running = True - await self.hud("started", interval=self.interval) - try: - while self.running: - await asyncio.sleep(self.interval) - try: - await self.tick(get_memo_state()) - except Exception as e: - log.error(f"[sensor] tick error: {e}") - except asyncio.CancelledError: - pass - finally: - self.running = False - await self.hud("stopped") - - def start(self, get_memo_state): - """Start the background tick loop.""" - if self._task and not self._task.done(): - return - self._task = asyncio.create_task(self._loop(get_memo_state)) - - def stop(self): - """Stop the tick loop.""" - self.running = False - if self._task: - self._task.cancel() - - def get_context_lines(self) -> 
list[str]: - """Render current sensor readings for injection into prompts.""" - if not self.readings: - return ["Sensors: (no sensor node running)"] - lines = [f"Sensors (tick #{self.tick_count}, {self.interval}s interval):"] - for name, r in self.readings.items(): - if name.startswith("_"): - continue - val = r.get("value", "?") - detail = r.get("detail") - age = time.time() - r.get("changed_at", time.time()) - if age < 10: - age_str = "just now" - elif age < 60: - age_str = f"{int(age)}s ago" - else: - age_str = f"{int(age // 60)}m ago" - line = f"- {name}: {detail or val} [{age_str}]" - lines.append(line) - return lines - - -# --- Input Node --- - -class InputNode(Node): - name = "input" - model = "google/gemini-2.0-flash-001" - max_context_tokens = 2000 # small budget — perception only - - SYSTEM = """You are the Input node — the ear of this cognitive runtime. - -Listener context: -- Authenticated user: {identity} -- Channel: {channel} (Chrome browser on Nico's Windows PC, in his room at home) -- Physical: private space, Nico lives with Tina — she may use this session too -- Security: single-user account, shared physical space — other voices are trusted household - -Your job: describe what you heard. Who spoke, what they want, what tone, what context matters. -ONE sentence. No content, no response — just your perception of what came through. 
- -{memory_context}""" - - async def process(self, envelope: Envelope, history: list[dict], memory_context: str = "", - identity: str = "unknown", channel: str = "unknown") -> Command: - await self.hud("thinking", detail="deciding how to respond") - log.info(f"[input] user said: {envelope.text}") - - messages = [ - {"role": "system", "content": self.SYSTEM.format( - memory_context=memory_context, identity=identity, channel=channel)}, - ] - for msg in history[-8:]: - messages.append(msg) - messages = self.trim_context(messages) - - await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) - instruction = await llm_call(self.model, messages) - log.info(f"[input] → command: {instruction}") - await self.hud("perceived", instruction=instruction) - return Command(instruction=instruction, source_text=envelope.text) - - -# --- Output Node --- - -class OutputNode(Node): - name = "output" - model = "google/gemini-2.0-flash-001" - max_context_tokens = 4000 # larger — needs history for continuity - - SYSTEM = """You are the Output node — the voice of this cognitive runtime. -The Input node sends you its perception of what the user said. This is internal context for you — never repeat or echo it. -You respond to the USER, not to the Input node. Use the perception to understand intent, then act on it. -Be natural. Be concise. If the user asks you to do something, do it — don't describe what you're about to do. 
- -{memory_context}""" - - async def process(self, command: Command, history: list[dict], ws: WebSocket, memory_context: str = "") -> str: - await self.hud("streaming") - - messages = [ - {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)}, - ] - for msg in history[-20:]: - messages.append(msg) - messages.append({"role": "system", "content": f"Input perception: {command.instruction}"}) - messages = self.trim_context(messages) - - await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) - - # Stream response - client, resp = await llm_call(self.model, messages, stream=True) - full_response = "" - try: - async for line in resp.aiter_lines(): - if not line.startswith("data: "): - continue - payload = line[6:] - if payload == "[DONE]": - break - chunk = json.loads(payload) - delta = chunk["choices"][0].get("delta", {}) - token = delta.get("content", "") - if token: - full_response += token - await ws.send_text(json.dumps({"type": "delta", "content": token})) - finally: - await resp.aclose() - await client.aclose() - - log.info(f"[output] response: {full_response[:100]}...") - await ws.send_text(json.dumps({"type": "done"})) - await self.hud("done") - return full_response - - -# --- Thinker Node (S3 — control, reasoning, tool use) --- - -import subprocess -import tempfile - -class ThinkerNode(Node): - name = "thinker" - model = "google/gemini-2.5-flash-preview" - max_context_tokens = 4000 - - SYSTEM = """You are the Thinker node — the brain of this cognitive runtime. -You receive a perception of what the user said. Decide: answer directly, use a tool, or show UI controls. - -TOOLS — write a ```python code block and it WILL be executed. Use print() for output. -- For math, databases, file ops, any computation: write python. NEVER describe code — write it. -- For simple conversation: respond directly as text. 
- -UI CONTROLS — to show interactive elements, include a JSON block: -```controls -[ - {{"type": "table", "data": [...], "columns": ["id", "name", "email"]}}, - {{"type": "button", "label": "Add Customer", "action": "add_customer"}}, - {{"type": "button", "label": "Refresh", "action": "refresh_customers"}} -] -``` -Controls render in the chat. User clicks flow back as actions you can handle. - -You can combine text + code + controls in one response. - -{memory_context}""" - - - def _parse_tool_call(self, response: str) -> tuple[str, str] | None: - """Parse tool calls. Supports TOOL: format and auto-detects python code blocks.""" - text = response.strip() - - # Explicit TOOL: format - if text.startswith("TOOL:"): - lines = text.split("\n") - tool_name = lines[0].replace("TOOL:", "").strip() - code_lines = [] - in_code = False - for line in lines[1:]: - if line.strip().startswith("```") and not in_code: - in_code = True - continue - elif line.strip().startswith("```") and in_code: - break - elif in_code: - code_lines.append(line) - elif line.strip().startswith("CODE:"): - continue - return (tool_name, "\n".join(code_lines)) if code_lines else None - - # Auto-detect: code blocks get executed as python - # Catches ```python, ```py, ```sql, ```sqlite, or bare ``` with code-like content - import re - block_match = re.search(r'```(?:python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL) - if block_match: - code = block_match.group(1).strip() - if code and len(code.split("\n")) > 0: - # If it's SQL, wrap it in a python sqlite3 script - if "```sql" in text or "```sqlite" in text or ("SELECT" in code.upper() and "CREATE" in code.upper()): - wrapped = f'''import sqlite3 -conn = sqlite3.connect("/tmp/cog_db.sqlite") -cursor = conn.cursor() -for stmt in """{code}""".split(";"): - stmt = stmt.strip() - if stmt: - cursor.execute(stmt) -conn.commit() -cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") -tables = cursor.fetchall() -for t in tables: 
- cursor.execute(f"SELECT * FROM {{t[0]}}") - rows = cursor.fetchall() - cols = [d[0] for d in cursor.description] - print(f"Table: {{t[0]}}") - print(" | ".join(cols)) - for row in rows: - print(" | ".join(str(c) for c in row)) -conn.close()''' - return ("python", wrapped) - return ("python", code) - - return None - - def __init__(self, send_hud, process_manager: ProcessManager = None): - super().__init__(send_hud) - self.pm = process_manager - - def _parse_controls(self, response: str) -> list[dict]: - """Extract ```controls JSON blocks from response.""" - controls = [] - if "```controls" not in response: - return controls - parts = response.split("```controls") - for part in parts[1:]: - end = part.find("```") - if end != -1: - try: - controls.extend(json.loads(part[:end].strip())) - except json.JSONDecodeError: - pass - return controls - - def _strip_blocks(self, response: str) -> str: - """Remove code and control blocks, return plain text.""" - import re - text = re.sub(r'```(?:python|py|controls).*?```', '', response, flags=re.DOTALL) - return text.strip() - - async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult: - await self.hud("thinking", detail="reasoning about response") - - messages = [ - {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)}, - ] - for msg in history[-12:]: - messages.append(msg) - messages.append({"role": "system", "content": f"Input perception: {command.instruction}"}) - messages = self.trim_context(messages) - - await self.hud("context", messages=messages, tokens=self.last_context_tokens, - max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) - - response = await llm_call(self.model, messages) - log.info(f"[thinker] response: {response[:200]}") - - # Parse UI controls - controls = self._parse_controls(response) - if controls: - await self.hud("controls", controls=controls) - - # Check if Thinker wants to use a tool - tool_call = 
self._parse_tool_call(response) - if tool_call: - tool_name, code = tool_call - - if self.pm and tool_name == "python": - proc = await self.pm.execute(tool_name, code) - tool_output = "\n".join(proc.output_lines) - else: - tool_output = f"[unknown tool: {tool_name}]" - - log.info(f"[thinker] tool output: {tool_output[:200]}") - - # Second LLM call: interpret tool output + optionally add controls - messages.append({"role": "assistant", "content": response}) - messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"}) - messages.append({"role": "user", "content": "Respond to the user based on the tool output. If showing data, include a ```controls block with a table. Be natural and concise."}) - messages = self.trim_context(messages) - final = await llm_call(self.model, messages) - - # Parse controls from the follow-up too - more_controls = self._parse_controls(final) - if more_controls: - controls.extend(more_controls) - await self.hud("controls", controls=more_controls) - - clean_text = self._strip_blocks(final) - await self.hud("decided", instruction=clean_text[:200]) - return ThoughtResult(response=clean_text, tool_used=tool_name, - tool_output=tool_output, controls=controls) - - # No tool needed — pass through - clean_text = self._strip_blocks(response) or response - await self.hud("decided", instruction="direct response (no tools)") - return ThoughtResult(response=clean_text, controls=controls) - - -# --- Memorizer Node (S2 — shared state / coordination) --- - -class MemorizerNode(Node): - name = "memorizer" - model = "google/gemini-2.0-flash-001" - max_context_tokens = 3000 # needs enough history to distill - - DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime. -After each exchange you update the shared state that Input and Output nodes read. 
- -Given the conversation so far, output a JSON object with these fields: -- user_name: string — how the user identifies themselves (null if unknown) -- user_mood: string — current emotional tone (neutral, happy, frustrated, playful, etc.) -- topic: string — what the conversation is about right now -- topic_history: list of strings — previous topics in this session -- situation: string — social/physical context if mentioned (e.g. "at a pub with tina", "private dev session") -- language: string — primary language being used (en, de, mixed) -- style_hint: string — how Output should talk (casual, formal, technical, poetic, etc.) -- facts: list of strings — important facts learned about the user - -Output ONLY valid JSON. No explanation, no markdown fences.""" - - def __init__(self, send_hud): - super().__init__(send_hud) - # The shared state — starts empty, grows over conversation - self.state: dict = { - "user_name": None, - "user_mood": "neutral", - "topic": None, - "topic_history": [], - "situation": "localhost test runtime, private dev session", - "language": "en", - "style_hint": "casual, technical", - "facts": [], - } - - def get_context_block(self, sensor_lines: list[str] = None) -> str: - """Returns a formatted string for injection into Input/Output system prompts.""" - lines = sensor_lines or ["Sensors: (none)"] - lines.append("") - lines.append("Shared memory (from Memorizer):") - for k, v in self.state.items(): - if v: - lines.append(f"- {k}: {v}") - return "\n".join(lines) - - async def update(self, history: list[dict]): - """Distill conversation into updated shared state. 
Called after each exchange.""" - if len(history) < 2: - await self.hud("updated", state=self.state) # emit default state - return - - await self.hud("thinking", detail="updating shared state") - - messages = [ - {"role": "system", "content": self.DISTILL_SYSTEM}, - {"role": "system", "content": f"Current state: {json.dumps(self.state)}"}, - ] - for msg in history[-10:]: - messages.append(msg) - messages.append({"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."}) - messages = self.trim_context(messages) - - await self.hud("context", messages=messages, tokens=self.last_context_tokens, max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) - - raw = await llm_call(self.model, messages) - log.info(f"[memorizer] raw: {raw[:200]}") - - # Parse JSON from response (strip markdown fences if present) - text = raw.strip() - if text.startswith("```"): - text = text.split("\n", 1)[1] if "\n" in text else text[3:] - if text.endswith("```"): - text = text[:-3] - text = text.strip() - - try: - new_state = json.loads(text) - # Merge: keep old facts, add new ones - old_facts = set(self.state.get("facts", [])) - new_facts = set(new_state.get("facts", [])) - new_state["facts"] = list(old_facts | new_facts)[-20:] # cap at 20 facts - # Preserve topic history - if self.state.get("topic") and self.state["topic"] != new_state.get("topic"): - hist = new_state.get("topic_history", []) - if self.state["topic"] not in hist: - hist.append(self.state["topic"]) - new_state["topic_history"] = hist[-5:] # keep last 5 - self.state = new_state - log.info(f"[memorizer] updated state: {self.state}") - await self.hud("updated", state=self.state) - except (json.JSONDecodeError, Exception) as e: - log.error(f"[memorizer] update error: {e}, raw: {text[:200]}") - await self.hud("error", detail=f"Update failed: {e}") - # Still emit current state so frontend shows something - await self.hud("updated", state=self.state) - - -# --- Runtime (wires 
nodes together) --- - -TRACE_FILE = Path(__file__).parent / "trace.jsonl" - - -class Runtime: - def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = ""): - self.ws = ws - self.history: list[dict] = [] - self.MAX_HISTORY = 40 # sliding window — oldest messages drop off - self.input_node = InputNode(send_hud=self._send_hud) - self.process_manager = ProcessManager(send_hud=self._send_hud) - self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager) - self.output_node = OutputNode(send_hud=self._send_hud) - self.memorizer = MemorizerNode(send_hud=self._send_hud) - self.sensor = SensorNode(send_hud=self._send_hud) - # Start sensor tick loop - self.sensor.start(get_memo_state=lambda: self.memorizer.state) - # Verified identity from auth — Input and Memorizer use this - claims = user_claims or {} - log.info(f"[runtime] user_claims: {claims}") - self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown" - log.info(f"[runtime] resolved identity: {self.identity}") - self.channel = origin or "unknown" - # Seed memorizer with verified info - self.memorizer.state["user_name"] = self.identity - self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" - - async def _send_hud(self, data: dict): - # Send to frontend - await self.ws.send_text(json.dumps({"type": "hud", **data})) - # Append to trace file + broadcast to SSE subscribers - trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} - try: - with open(TRACE_FILE, "a", encoding="utf-8") as f: - f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") - # Rotate trace file at 1000 lines - if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: - lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") - TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") - except Exception as e: - log.error(f"trace 
write error: {e}") - _broadcast_sse(trace_entry) - - async def handle_message(self, text: str): - envelope = Envelope( - text=text, - user_id="nico", - session_id="test", - timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), - ) - - # Note user activity for idle sensor - self.sensor.note_user_activity() - - # Append user message to history FIRST — both nodes see it - self.history.append({"role": "user", "content": text}) - - # Get shared memory + sensor context for both nodes - sensor_lines = self.sensor.get_context_lines() - mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) - - # Input node perceives (with memory context + identity + channel) - command = await self.input_node.process( - envelope, self.history, memory_context=mem_ctx, - identity=self.identity, channel=self.channel) - - # Thinker node reasons + optionally uses tools - thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) - - # If Thinker used a tool, inject its result into the command for Output - if thought.tool_used: - # Thinker already formulated the response — Output just streams it - command = Command( - instruction=f"Thinker used {thought.tool_used} and says: {thought.response}", - source_text=command.source_text - ) - else: - # Thinker answered directly — Output streams that - command = Command( - instruction=f"Thinker says: {thought.response}", - source_text=command.source_text - ) - - # Output node streams the response - response = await self.output_node.process(command, self.history, self.ws, memory_context=mem_ctx) - self.history.append({"role": "assistant", "content": response}) - - # Send UI controls if Thinker produced any - if thought.controls: - await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls})) - - # Memorizer updates shared state after each exchange - await self.memorizer.update(self.history) - - # Sliding window — trim oldest messages, keep context in memorizer - if len(self.history) > 
self.MAX_HISTORY: - self.history = self.history[-self.MAX_HISTORY:] - - -# --- App --- - -STATIC_DIR = Path(__file__).parent / "static" - -app = FastAPI(title="cog") - -# Keep a reference to the active runtime for API access -_active_runtime: Runtime | None = None - - -@app.get("/health") -async def health(): - return {"status": "ok"} - - -@app.get("/auth/config") -async def auth_config(): - """Public: auth config for frontend OIDC flow.""" - return { - "enabled": AUTH_ENABLED, - "issuer": ZITADEL_ISSUER, - "clientId": ZITADEL_CLIENT_ID, - "projectId": ZITADEL_PROJECT_ID, - } - - -@app.websocket("/ws") -async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), access_token: str | None = Query(None)): - global _active_runtime - # Validate auth if enabled - user_claims = {"sub": "anonymous"} - if AUTH_ENABLED and token: - try: - user_claims = await _validate_token(token) - # If id_token lacks name, enrich from userinfo with access_token - if not user_claims.get("name") and access_token: - async with httpx.AsyncClient() as client: - resp = await client.get(f"{ZITADEL_ISSUER}/oidc/v1/userinfo", - headers={"Authorization": f"Bearer {access_token}"}) - if resp.status_code == 200: - info = resp.json() - log.info(f"[auth] userinfo enrichment: {info}") - user_claims["name"] = info.get("name") - user_claims["preferred_username"] = info.get("preferred_username") - user_claims["email"] = info.get("email") - except HTTPException: - await ws.close(code=4001, reason="Invalid token") - return - origin = ws.headers.get("origin", ws.headers.get("host", "")) - await ws.accept() - runtime = Runtime(ws, user_claims=user_claims, origin=origin) - _active_runtime = runtime - try: - while True: - data = await ws.receive_text() - msg = json.loads(data) - if msg.get("type") == "action": - # User clicked a UI control - action_text = f"[user clicked: {msg.get('action', 'unknown')}]" - if msg.get("data"): - action_text += f" data: {json.dumps(msg['data'])}" - await 
runtime.handle_message(action_text) - elif msg.get("type") == "cancel_process": - runtime.process_manager.cancel(msg.get("pid", 0)) - else: - await runtime.handle_message(msg.get("text", "")) - except WebSocketDisconnect: - runtime.sensor.stop() - if _active_runtime is runtime: - _active_runtime = None - - -# --- API endpoints (for Claude to inspect runtime state) --- - -import hashlib -from asyncio import Queue -from starlette.responses import StreamingResponse - -# SSE subscribers (for titan/service accounts to watch live) -_sse_subscribers: list[Queue] = [] - -def _broadcast_sse(event: dict): - """Push an event to all SSE subscribers.""" - for q in _sse_subscribers: - try: - q.put_nowait(event) - except asyncio.QueueFull: - pass # drop if subscriber is too slow - -def _state_hash() -> str: - """Hash of current runtime state — cheap way to detect changes.""" - if not _active_runtime: - return "no_session" - raw = json.dumps({ - "mem": _active_runtime.memorizer.state, - "hlen": len(_active_runtime.history), - }, sort_keys=True) - return hashlib.md5(raw.encode()).hexdigest()[:12] - - -@app.get("/api/events") -async def sse_events(user=Depends(require_auth)): - """SSE stream of runtime events (trace, state changes).""" - q: Queue = Queue(maxsize=100) - _sse_subscribers.append(q) - - async def generate(): - try: - while True: - event = await q.get() - yield f"data: {json.dumps(event)}\n\n" - except asyncio.CancelledError: - pass - finally: - _sse_subscribers.remove(q) - - return StreamingResponse(generate(), media_type="text/event-stream", - headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) - - -@app.get("/api/poll") -async def poll(since: str = "", user=Depends(require_auth)): - """Returns current hash. If 'since' matches, returns {changed: false}. 
Cheap polling.""" - h = _state_hash() - if since and since == h: - return {"changed": False, "hash": h} - return { - "changed": True, - "hash": h, - "state": _active_runtime.memorizer.state if _active_runtime else None, - "history_len": len(_active_runtime.history) if _active_runtime else 0, - "last_messages": _active_runtime.history[-3:] if _active_runtime else [], - } - -@app.post("/api/send") -async def api_send(body: dict, user=Depends(require_auth)): - """Send a message as if the user typed it. Requires auth. Returns the response.""" - if not _active_runtime: - raise HTTPException(status_code=409, detail="No active session — someone must be connected via WS first") - text = body.get("text", "").strip() - if not text: - raise HTTPException(status_code=400, detail="Missing 'text' field") - await _active_runtime.handle_message(text) - return { - "status": "ok", - "response": _active_runtime.history[-1]["content"] if _active_runtime.history else "", - "memorizer": _active_runtime.memorizer.state, - } - - -@app.post("/api/clear") -async def api_clear(user=Depends(require_auth)): - """Clear conversation history.""" - if not _active_runtime: - raise HTTPException(status_code=409, detail="No active session") - _active_runtime.history.clear() - return {"status": "cleared"} - - -@app.get("/api/state") -async def get_state(user=Depends(require_auth)): - """Current memorizer state + history length.""" - if not _active_runtime: - return {"status": "no_session"} - return { - "status": "active", - "memorizer": _active_runtime.memorizer.state, - "history_len": len(_active_runtime.history), - } - - -@app.get("/api/history") -async def get_history(last: int = 10, user=Depends(require_auth)): - """Recent conversation history.""" - if not _active_runtime: - return {"status": "no_session", "messages": []} - return { - "status": "active", - "messages": _active_runtime.history[-last:], - } - - -@app.get("/api/trace") -async def get_trace(last: int = 30, user=Depends(require_auth)): 
- """Recent trace lines from trace.jsonl.""" - if not TRACE_FILE.exists(): - return {"lines": []} - lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") - parsed = [] - for line in lines[-last:]: - try: - parsed.append(json.loads(line)) - except json.JSONDecodeError: - pass - return {"lines": parsed} - - -# Serve index.html explicitly, then static assets -from fastapi.responses import FileResponse - -@app.get("/") -async def index(): - return FileResponse(STATIC_DIR / "index.html") - -@app.get("/callback") -async def callback(): - """OIDC callback — serves the same SPA, JS handles the code exchange.""" - return FileResponse(STATIC_DIR / "index.html") - -app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") - - -if __name__ == "__main__": - import uvicorn - uvicorn.run("agent:app", host="0.0.0.0", port=8000, reload=True) diff --git a/agent/__init__.py b/agent/__init__.py new file mode 100644 index 0000000..3141200 --- /dev/null +++ b/agent/__init__.py @@ -0,0 +1,37 @@ +"""Cognitive Agent Runtime — modular package. 
+ +uvicorn entrypoint: agent:app +""" + +import logging +from pathlib import Path + +from dotenv import load_dotenv +load_dotenv(Path(__file__).parent.parent / ".env") + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S") + +from fastapi import FastAPI +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles + +from .api import register_routes + +STATIC_DIR = Path(__file__).parent.parent / "static" + +app = FastAPI(title="cog") + +# Register all API + WS routes +register_routes(app) + +# Serve index.html explicitly, then static assets +@app.get("/") +async def index(): + return FileResponse(STATIC_DIR / "index.html") + +@app.get("/callback") +async def callback(): + """OIDC callback — serves the same SPA, JS handles the code exchange.""" + return FileResponse(STATIC_DIR / "index.html") + +app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") diff --git a/agent/api.py b/agent/api.py new file mode 100644 index 0000000..8b3746d --- /dev/null +++ b/agent/api.py @@ -0,0 +1,188 @@ +"""API endpoints, SSE, polling.""" + +import asyncio +import hashlib +import json +import logging +from asyncio import Queue +from pathlib import Path + +from fastapi import Depends, HTTPException, Query, WebSocket, WebSocketDisconnect +from starlette.responses import StreamingResponse + +import httpx + +from .auth import AUTH_ENABLED, ZITADEL_ISSUER, _validate_token, require_auth +from .runtime import Runtime, TRACE_FILE + +log = logging.getLogger("runtime") + +# Active runtime reference (set by WS endpoint) +_active_runtime: Runtime | None = None + +# SSE subscribers +_sse_subscribers: list[Queue] = [] + + +def _broadcast_sse(event: dict): + """Push an event to all SSE subscribers.""" + for q in _sse_subscribers: + try: + q.put_nowait(event) + except asyncio.QueueFull: + pass + + +def _state_hash() -> str: + if not _active_runtime: + return "no_session" + raw = json.dumps({ + "mem": 
_active_runtime.memorizer.state, + "hlen": len(_active_runtime.history), + }, sort_keys=True) + return hashlib.md5(raw.encode()).hexdigest()[:12] + + +def register_routes(app): + """Register all API routes on the FastAPI app.""" + + @app.get("/health") + async def health(): + return {"status": "ok"} + + @app.get("/auth/config") + async def auth_config(): + from .auth import ZITADEL_ISSUER, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID, AUTH_ENABLED + return { + "enabled": AUTH_ENABLED, + "issuer": ZITADEL_ISSUER, + "clientId": ZITADEL_CLIENT_ID, + "projectId": ZITADEL_PROJECT_ID, + } + + @app.websocket("/ws") + async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), + access_token: str | None = Query(None)): + global _active_runtime + user_claims = {"sub": "anonymous"} + if AUTH_ENABLED and token: + try: + user_claims = await _validate_token(token) + if not user_claims.get("name") and access_token: + async with httpx.AsyncClient() as client: + resp = await client.get(f"{ZITADEL_ISSUER}/oidc/v1/userinfo", + headers={"Authorization": f"Bearer {access_token}"}) + if resp.status_code == 200: + info = resp.json() + log.info(f"[auth] userinfo enrichment: {info}") + user_claims["name"] = info.get("name") + user_claims["preferred_username"] = info.get("preferred_username") + user_claims["email"] = info.get("email") + except HTTPException: + await ws.close(code=4001, reason="Invalid token") + return + origin = ws.headers.get("origin", ws.headers.get("host", "")) + await ws.accept() + runtime = Runtime(ws, user_claims=user_claims, origin=origin, broadcast=_broadcast_sse) + _active_runtime = runtime + try: + while True: + data = await ws.receive_text() + msg = json.loads(data) + if msg.get("type") == "action": + action_text = f"[user clicked: {msg.get('action', 'unknown')}]" + if msg.get("data"): + action_text += f" data: {json.dumps(msg['data'])}" + await runtime.handle_message(action_text) + elif msg.get("type") == "cancel_process": + 
runtime.process_manager.cancel(msg.get("pid", 0)) + else: + await runtime.handle_message(msg.get("text", "")) + except WebSocketDisconnect: + runtime.sensor.stop() + if _active_runtime is runtime: + _active_runtime = None + + @app.get("/api/events") + async def sse_events(user=Depends(require_auth)): + q: Queue = Queue(maxsize=100) + _sse_subscribers.append(q) + + async def generate(): + try: + while True: + event = await q.get() + yield f"data: {json.dumps(event)}\n\n" + except asyncio.CancelledError: + pass + finally: + _sse_subscribers.remove(q) + + return StreamingResponse(generate(), media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) + + @app.get("/api/poll") + async def poll(since: str = "", user=Depends(require_auth)): + h = _state_hash() + if since and since == h: + return {"changed": False, "hash": h} + return { + "changed": True, + "hash": h, + "state": _active_runtime.memorizer.state if _active_runtime else None, + "history_len": len(_active_runtime.history) if _active_runtime else 0, + "last_messages": _active_runtime.history[-3:] if _active_runtime else [], + } + + @app.post("/api/send") + async def api_send(body: dict, user=Depends(require_auth)): + if not _active_runtime: + raise HTTPException(status_code=409, detail="No active session -- someone must be connected via WS first") + text = body.get("text", "").strip() + if not text: + raise HTTPException(status_code=400, detail="Missing 'text' field") + await _active_runtime.handle_message(text) + return { + "status": "ok", + "response": _active_runtime.history[-1]["content"] if _active_runtime.history else "", + "memorizer": _active_runtime.memorizer.state, + } + + @app.post("/api/clear") + async def api_clear(user=Depends(require_auth)): + if not _active_runtime: + raise HTTPException(status_code=409, detail="No active session") + _active_runtime.history.clear() + return {"status": "cleared"} + + @app.get("/api/state") + async def 
get_state(user=Depends(require_auth)): + if not _active_runtime: + return {"status": "no_session"} + return { + "status": "active", + "memorizer": _active_runtime.memorizer.state, + "history_len": len(_active_runtime.history), + } + + @app.get("/api/history") + async def get_history(last: int = 10, user=Depends(require_auth)): + if not _active_runtime: + return {"status": "no_session", "messages": []} + return { + "status": "active", + "messages": _active_runtime.history[-last:], + } + + @app.get("/api/trace") + async def get_trace(last: int = 30, user=Depends(require_auth)): + if not TRACE_FILE.exists(): + return {"lines": []} + lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") + parsed = [] + for line in lines[-last:]: + try: + parsed.append(json.loads(line)) + except json.JSONDecodeError: + pass + return {"lines": parsed} diff --git a/agent/auth.py b/agent/auth.py new file mode 100644 index 0000000..a6ee32e --- /dev/null +++ b/agent/auth.py @@ -0,0 +1,92 @@ +"""OIDC auth: Zitadel token validation, FastAPI dependencies.""" + +import json +import logging +import os +import time + +import httpx +from fastapi import Depends, HTTPException, Query +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +log = logging.getLogger("runtime") + +ZITADEL_ISSUER = os.environ.get("ZITADEL_ISSUER", "https://auth.loop42.de") +ZITADEL_CLIENT_ID = os.environ.get("ZITADEL_CLIENT_ID", "365996029172056091") +ZITADEL_PROJECT_ID = os.environ.get("ZITADEL_PROJECT_ID", "365995955654230043") +AUTH_ENABLED = os.environ.get("AUTH_ENABLED", "false").lower() == "true" +SERVICE_TOKENS = set(filter(None, os.environ.get("SERVICE_TOKENS", "").split(","))) + +_jwks_cache: dict = {"keys": [], "fetched_at": 0} + + +async def _get_jwks(): + if time.time() - _jwks_cache["fetched_at"] < 3600: + return _jwks_cache["keys"] + async with httpx.AsyncClient() as client: + resp = await client.get(f"{ZITADEL_ISSUER}/oauth/v2/keys") + _jwks_cache["keys"] = resp.json()["keys"] 
+ _jwks_cache["fetched_at"] = time.time() + return _jwks_cache["keys"] + + +async def _validate_token(token: str) -> dict: + """Validate token: check service tokens, then JWT, then userinfo.""" + import base64 + + if token in SERVICE_TOKENS: + return {"sub": "titan", "username": "titan", "source": "service_token"} + + try: + parts = token.split(".") + if len(parts) == 3: + keys = await _get_jwks() + header_b64 = parts[0] + "=" * (4 - len(parts[0]) % 4) + header = json.loads(base64.urlsafe_b64decode(header_b64)) + kid = header.get("kid") + key = next((k for k in keys if k["kid"] == kid), None) + if key: + import jwt as pyjwt + from jwt import PyJWK + jwk_obj = PyJWK(key) + claims = pyjwt.decode( + token, jwk_obj.key, algorithms=["RS256"], + issuer=ZITADEL_ISSUER, options={"verify_aud": False}, + ) + return claims + except Exception: + pass + + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{ZITADEL_ISSUER}/oidc/v1/userinfo", + headers={"Authorization": f"Bearer {token}"}, + ) + if resp.status_code == 200: + info = resp.json() + log.info(f"[auth] userinfo response: {info}") + return {"sub": info.get("sub"), "preferred_username": info.get("preferred_username"), + "email": info.get("email"), "name": info.get("name"), "source": "userinfo"} + + raise HTTPException(status_code=401, detail="Invalid token") + + +_bearer = HTTPBearer(auto_error=False) + + +async def require_auth(credentials: HTTPAuthorizationCredentials | None = Depends(_bearer)): + """Dependency: require valid JWT when AUTH_ENABLED.""" + if not AUTH_ENABLED: + return {"sub": "anonymous"} + if not credentials: + raise HTTPException(status_code=401, detail="Missing token") + return await _validate_token(credentials.credentials) + + +async def ws_auth(token: str | None = Query(None)) -> dict: + """Validate WebSocket token from query param.""" + if not AUTH_ENABLED: + return {"sub": "anonymous"} + if not token: + return None + return await _validate_token(token) diff --git 
a/agent/llm.py b/agent/llm.py new file mode 100644 index 0000000..c0858fa --- /dev/null +++ b/agent/llm.py @@ -0,0 +1,76 @@ +"""LLM helper: OpenRouter calls, token estimation, context fitting.""" + +import json +import logging +import os +from typing import Any + +import httpx + +log = logging.getLogger("runtime") + +API_KEY = os.environ.get("OPENROUTER_API_KEY", "") +OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" + + +async def llm_call(model: str, messages: list[dict], stream: bool = False) -> Any: + """Single LLM call via OpenRouter. Returns full text or (client, response) for streaming.""" + headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"} + body = {"model": model, "messages": messages, "stream": stream} + + client = httpx.AsyncClient(timeout=60) + if stream: + resp = await client.send(client.build_request("POST", OPENROUTER_URL, headers=headers, json=body), stream=True) + return client, resp + + resp = await client.post(OPENROUTER_URL, headers=headers, json=body) + await client.aclose() + data = resp.json() + if "choices" not in data: + log.error(f"LLM error: {data}") + return f"[LLM error: {data.get('error', {}).get('message', 'unknown')}]" + return data["choices"][0]["message"]["content"] + + +def estimate_tokens(text: str) -> int: + """Rough token estimate: 1 token ~ 4 chars.""" + return len(text) // 4 + + +def fit_context(messages: list[dict], max_tokens: int, protect_last: int = 4) -> list[dict]: + """Trim oldest messages (after system prompt) to fit token budget. 
+ Always keeps: system prompt(s) at start + last `protect_last` messages.""" + if not messages: + return messages + + system_msgs = [] + rest = [] + for m in messages: + if not rest and m["role"] == "system": + system_msgs.append(m) + else: + rest.append(m) + + protected = rest[-protect_last:] if len(rest) > protect_last else rest + middle = rest[:-protect_last] if len(rest) > protect_last else [] + + fixed_tokens = sum(estimate_tokens(m["content"]) for m in system_msgs + protected) + + if fixed_tokens >= max_tokens: + result = system_msgs + protected + total = sum(estimate_tokens(m["content"]) for m in result) + while total > max_tokens and len(result) > 2: + removed = result.pop(1) + total -= estimate_tokens(removed["content"]) + return result + + remaining = max_tokens - fixed_tokens + kept_middle = [] + for m in reversed(middle): + t = estimate_tokens(m["content"]) + if remaining - t < 0: + break + kept_middle.insert(0, m) + remaining -= t + + return system_msgs + kept_middle + protected diff --git a/agent/nodes/__init__.py b/agent/nodes/__init__.py new file mode 100644 index 0000000..b5890e7 --- /dev/null +++ b/agent/nodes/__init__.py @@ -0,0 +1,9 @@ +"""Node modules.""" + +from .sensor import SensorNode +from .input import InputNode +from .output import OutputNode +from .thinker import ThinkerNode +from .memorizer import MemorizerNode + +__all__ = ["SensorNode", "InputNode", "OutputNode", "ThinkerNode", "MemorizerNode"] diff --git a/agent/nodes/base.py b/agent/nodes/base.py new file mode 100644 index 0000000..c7e57c1 --- /dev/null +++ b/agent/nodes/base.py @@ -0,0 +1,31 @@ +"""Base Node class with context management.""" + +import logging + +from ..llm import estimate_tokens, fit_context + +log = logging.getLogger("runtime") + + +class Node: + name: str = "node" + model: str | None = None + max_context_tokens: int = 4000 + + def __init__(self, send_hud): + self.send_hud = send_hud + self.last_context_tokens = 0 + self.context_fill_pct = 0 + + async def 
hud(self, event: str, **data): + await self.send_hud({"node": self.name, "event": event, **data}) + + def trim_context(self, messages: list[dict]) -> list[dict]: + """Fit messages within this node's token budget.""" + before = len(messages) + result = fit_context(messages, self.max_context_tokens) + self.last_context_tokens = sum(estimate_tokens(m["content"]) for m in result) + self.context_fill_pct = int(100 * self.last_context_tokens / self.max_context_tokens) + if before != len(result): + log.info(f"[{self.name}] context trimmed: {before} -> {len(result)} msgs, {self.context_fill_pct}% fill") + return result diff --git a/agent/nodes/input.py b/agent/nodes/input.py new file mode 100644 index 0000000..b3b978b --- /dev/null +++ b/agent/nodes/input.py @@ -0,0 +1,48 @@ +"""Input Node: perceives what the user said.""" + +import logging + +from .base import Node +from ..llm import llm_call +from ..types import Envelope, Command + +log = logging.getLogger("runtime") + + +class InputNode(Node): + name = "input" + model = "google/gemini-2.0-flash-001" + max_context_tokens = 2000 + + SYSTEM = """You are the Input node — the ear of this cognitive runtime. + +Listener context: +- Authenticated user: {identity} +- Channel: {channel} (Chrome browser on Nico's Windows PC, in his room at home) +- Physical: private space, Nico lives with Tina — she may use this session too +- Security: single-user account, shared physical space — other voices are trusted household + +Your job: describe what you heard. Who spoke, what they want, what tone, what context matters. +ONE sentence. No content, no response — just your perception of what came through. 
+ +{memory_context}""" + + async def process(self, envelope: Envelope, history: list[dict], memory_context: str = "", + identity: str = "unknown", channel: str = "unknown") -> Command: + await self.hud("thinking", detail="deciding how to respond") + log.info(f"[input] user said: {envelope.text}") + + messages = [ + {"role": "system", "content": self.SYSTEM.format( + memory_context=memory_context, identity=identity, channel=channel)}, + ] + for msg in history[-8:]: + messages.append(msg) + messages = self.trim_context(messages) + + await self.hud("context", messages=messages, tokens=self.last_context_tokens, + max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) + instruction = await llm_call(self.model, messages) + log.info(f"[input] -> command: {instruction}") + await self.hud("perceived", instruction=instruction) + return Command(instruction=instruction, source_text=envelope.text) diff --git a/agent/nodes/memorizer.py b/agent/nodes/memorizer.py new file mode 100644 index 0000000..04763d0 --- /dev/null +++ b/agent/nodes/memorizer.py @@ -0,0 +1,99 @@ +"""Memorizer Node: S2 — shared state / coordination.""" + +import json +import logging + +from .base import Node +from ..llm import llm_call + +log = logging.getLogger("runtime") + + +class MemorizerNode(Node): + name = "memorizer" + model = "google/gemini-2.0-flash-001" + max_context_tokens = 3000 + + DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime. +After each exchange you update the shared state that Input and Output nodes read. + +Given the conversation so far, output a JSON object with these fields: +- user_name: string — how the user identifies themselves (null if unknown) +- user_mood: string — current emotional tone (neutral, happy, frustrated, playful, etc.) +- topic: string — what the conversation is about right now +- topic_history: list of strings — previous topics in this session +- situation: string — social/physical context if mentioned (e.g. 
"at a pub with tina", "private dev session") +- language: string — primary language being used (en, de, mixed) +- style_hint: string — how Output should talk (casual, formal, technical, poetic, etc.) +- facts: list of strings — important facts learned about the user + +Output ONLY valid JSON. No explanation, no markdown fences.""" + + def __init__(self, send_hud): + super().__init__(send_hud) + self.state: dict = { + "user_name": None, + "user_mood": "neutral", + "topic": None, + "topic_history": [], + "situation": "localhost test runtime, private dev session", + "language": "en", + "style_hint": "casual, technical", + "facts": [], + } + + def get_context_block(self, sensor_lines: list[str] = None) -> str: + lines = sensor_lines or ["Sensors: (none)"] + lines.append("") + lines.append("Shared memory (from Memorizer):") + for k, v in self.state.items(): + if v: + lines.append(f"- {k}: {v}") + return "\n".join(lines) + + async def update(self, history: list[dict]): + if len(history) < 2: + await self.hud("updated", state=self.state) + return + + await self.hud("thinking", detail="updating shared state") + + messages = [ + {"role": "system", "content": self.DISTILL_SYSTEM}, + {"role": "system", "content": f"Current state: {json.dumps(self.state)}"}, + ] + for msg in history[-10:]: + messages.append(msg) + messages.append({"role": "user", "content": "Update the shared state based on this conversation. 
Output JSON only."}) + messages = self.trim_context(messages) + + await self.hud("context", messages=messages, tokens=self.last_context_tokens, + max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) + + raw = await llm_call(self.model, messages) + log.info(f"[memorizer] raw: {raw[:200]}") + + text = raw.strip() + if text.startswith("```"): + text = text.split("\n", 1)[1] if "\n" in text else text[3:] + if text.endswith("```"): + text = text[:-3] + text = text.strip() + + try: + new_state = json.loads(text) + old_facts = set(self.state.get("facts", [])) + new_facts = set(new_state.get("facts", [])) + new_state["facts"] = list(old_facts | new_facts)[-20:] + if self.state.get("topic") and self.state["topic"] != new_state.get("topic"): + hist = new_state.get("topic_history", []) + if self.state["topic"] not in hist: + hist.append(self.state["topic"]) + new_state["topic_history"] = hist[-5:] + self.state = new_state + log.info(f"[memorizer] updated state: {self.state}") + await self.hud("updated", state=self.state) + except (json.JSONDecodeError, Exception) as e: + log.error(f"[memorizer] update error: {e}, raw: {text[:200]}") + await self.hud("error", detail=f"Update failed: {e}") + await self.hud("updated", state=self.state) diff --git a/agent/nodes/output.py b/agent/nodes/output.py new file mode 100644 index 0000000..35e659e --- /dev/null +++ b/agent/nodes/output.py @@ -0,0 +1,63 @@ +"""Output Node: streams natural response to the user.""" + +import json +import logging + +from fastapi import WebSocket + +from .base import Node +from ..llm import llm_call +from ..types import Command + +log = logging.getLogger("runtime") + + +class OutputNode(Node): + name = "output" + model = "google/gemini-2.0-flash-001" + max_context_tokens = 4000 + + SYSTEM = """You are the Output node — the voice of this cognitive runtime. +The Input node sends you its perception of what the user said. This is internal context for you — never repeat or echo it. 
+You respond to the USER, not to the Input node. Use the perception to understand intent, then act on it. +Be natural. Be concise. If the user asks you to do something, do it — don't describe what you're about to do. + +{memory_context}""" + + async def process(self, command: Command, history: list[dict], ws: WebSocket, memory_context: str = "") -> str: + await self.hud("streaming") + + messages = [ + {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)}, + ] + for msg in history[-20:]: + messages.append(msg) + messages.append({"role": "system", "content": f"Input perception: {command.instruction}"}) + messages = self.trim_context(messages) + + await self.hud("context", messages=messages, tokens=self.last_context_tokens, + max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct) + + client, resp = await llm_call(self.model, messages, stream=True) + full_response = "" + try: + async for line in resp.aiter_lines(): + if not line.startswith("data: "): + continue + payload = line[6:] + if payload == "[DONE]": + break + chunk = json.loads(payload) + delta = chunk["choices"][0].get("delta", {}) + token = delta.get("content", "") + if token: + full_response += token + await ws.send_text(json.dumps({"type": "delta", "content": token})) + finally: + await resp.aclose() + await client.aclose() + + log.info(f"[output] response: {full_response[:100]}...") + await ws.send_text(json.dumps({"type": "done"})) + await self.hud("done") + return full_response diff --git a/agent/nodes/sensor.py b/agent/nodes/sensor.py new file mode 100644 index 0000000..6ac65b0 --- /dev/null +++ b/agent/nodes/sensor.py @@ -0,0 +1,132 @@ +"""Sensor Node: ticks independently, produces context for other nodes.""" + +import asyncio +import logging +import time +from datetime import datetime, timezone, timedelta + +from .base import Node + +log = logging.getLogger("runtime") + +BERLIN = timezone(timedelta(hours=2)) # CEST + + +class SensorNode(Node): + name = "sensor" 
    def __init__(self, send_hud):
        """Set up tick bookkeeping and per-sensor reading state."""
        super().__init__(send_hud)
        self.tick_count = 0  # total ticks since the loop started
        self.running = False  # loop flag; cleared by stop()
        self._task: asyncio.Task | None = None  # background tick loop task
        self.interval = 5  # seconds between ticks
        self.readings: dict[str, dict] = {}  # latest reading per sensor name
        self._last_user_activity: float = time.time()
        self._prev_memo_state: dict = {}  # memo snapshot used for change detection

    def _now(self) -> datetime:
        """Current wall-clock time in the module's Berlin offset.

        NOTE(review): BERLIN is a *fixed* UTC+2 offset (CEST) — during winter
        (CET, UTC+1) this is one hour off. Consider zoneinfo "Europe/Berlin".
        """
        return datetime.now(BERLIN)

    def _read_clock(self) -> dict:
        """Emit a clock reading only when the displayed HH:MM value changes."""
        now = self._now()
        current = now.strftime("%H:%M")
        prev = self.readings.get("clock", {}).get("value")
        if current != prev:
            return {"value": current, "detail": now.strftime("%Y-%m-%d %H:%M:%S %A"), "changed_at": time.time()}
        return {}  # empty dict means "no delta this tick"

    def _read_idle(self) -> dict:
        """Report user idleness, but only when an idle threshold is newly crossed."""
        idle_s = time.time() - self._last_user_activity
        thresholds = [30, 60, 300, 600, 1800]  # seconds
        prev_idle = self.readings.get("idle", {}).get("_raw", 0)
        for t in thresholds:
            # Fire once per threshold: previous raw was below it, current is at/above.
            if prev_idle < t <= idle_s:
                if idle_s < 60:
                    label = f"{int(idle_s)}s"
                else:
                    label = f"{int(idle_s // 60)}m{int(idle_s % 60)}s"
                return {"value": label, "_raw": idle_s, "changed_at": time.time()}
        if "idle" in self.readings:
            # Keep the raw counter current even when nothing fired, so the next
            # call compares against the latest idle duration.
            self.readings["idle"]["_raw"] = idle_s
        return {}

    def _read_memo_changes(self, memo_state: dict) -> dict:
        """Diff the memorizer state against the previous tick's snapshot."""
        changes = []
        for k, v in memo_state.items():
            prev = self._prev_memo_state.get(k)
            # Only report transitions, not first-time appearances (prev is None).
            if v != prev and prev is not None:
                changes.append(f"{k}: {prev} -> {v}")
        self._prev_memo_state = dict(memo_state)
        if changes:
            return {"value": "; ".join(changes), "changed_at": time.time()}
        return {}

    def note_user_activity(self):
        """Reset the idle timer; called by the runtime on every user message."""
        self._last_user_activity = time.time()
        self.readings["idle"] = {"value": "active", "_raw": 0, "changed_at": time.time()}

    async def tick(self, memo_state: dict):
        """One sensor sweep: collect deltas and emit a HUD event if any changed."""
        self.tick_count += 1
        deltas = {}

        for name, reader in [("clock", self._read_clock),
                             ("idle", self._read_idle)]:
            update = reader()
            if update:
                # Merge into the stored reading so detail/changed_at persist.
                self.readings[name] = {**self.readings.get(name, {}), **update}
                deltas[name] = update.get("value") or update.get("detail")

        memo_update = self._read_memo_changes(memo_state)
        if memo_update:
            self.readings["memo_delta"] = memo_update
            deltas["memo_delta"] = memo_update["value"]

        if deltas:
            await self.hud("tick", tick=self.tick_count, deltas=deltas)

    async def _loop(self, get_memo_state):
        """Background ticker: runs until stop() or task cancellation."""
        self.running = True
        await self.hud("started", interval=self.interval)
        try:
            while self.running:
                await asyncio.sleep(self.interval)
                try:
                    await self.tick(get_memo_state())
                except Exception as e:
                    # A failing sensor must not kill the loop.
                    log.error(f"[sensor] tick error: {e}")
        except asyncio.CancelledError:
            pass  # normal shutdown path via stop()
        finally:
            self.running = False
            await self.hud("stopped")

    def start(self, get_memo_state):
        """Start the tick loop; idempotent while a task is still alive."""
        if self._task and not self._task.done():
            return
        self._task = asyncio.create_task(self._loop(get_memo_state))

    def stop(self):
        """Signal the loop to exit and cancel the pending sleep."""
        self.running = False
        if self._task:
            self._task.cancel()

    def get_context_lines(self) -> list[str]:
        """Render current readings as prompt-context lines for other nodes."""
        if not self.readings:
            return ["Sensors: (no sensor node running)"]
        lines = [f"Sensors (tick #{self.tick_count}, {self.interval}s interval):"]
        for name, r in self.readings.items():
            if name.startswith("_"):
                continue
            val = r.get("value", "?")
            detail = r.get("detail")
            # Humanize staleness of the reading.
            age = time.time() - r.get("changed_at", time.time())
            if age < 10:
                age_str = "just now"
            elif age < 60:
                age_str = f"{int(age)}s ago"
            else:
                age_str = f"{int(age // 60)}m ago"
            line = f"- {name}: {detail or val} [{age_str}]"
            lines.append(line)
        return lines
    # Prompt template for the reasoning call; {memory_context} is filled per turn.
    # NOTE: literal braces in the controls example are escaped as {{ }} for .format().
    SYSTEM = """You are the Thinker node — the brain of this cognitive runtime.
You receive a perception of what the user said. Decide: answer directly, use a tool, or show UI controls.

TOOLS — write a ```python code block and it WILL be executed. Use print() for output.
- For math, databases, file ops, any computation: write python. NEVER describe code — write it.
- For simple conversation: respond directly as text.

UI CONTROLS — to show interactive elements, include a JSON block:
```controls
[
  {{"type": "table", "data": [...], "columns": ["id", "name", "email"]}},
  {{"type": "button", "label": "Add Customer", "action": "add_customer"}},
  {{"type": "button", "label": "Refresh", "action": "refresh_customers"}}
]
```
Controls render in the chat. User clicks flow back as actions you can handle.

You can combine text + code + controls in one response.

{memory_context}"""

    def __init__(self, send_hud, process_manager: ProcessManager | None = None):
        """Wire in the ProcessManager that executes parsed tool code.

        Without one, tool calls are reported back as "[unknown tool: ...]".
        """
        super().__init__(send_hud)
        self.pm = process_manager

    def _parse_tool_call(self, response: str) -> tuple[str, str] | None:
        """Parse tool calls. Supports TOOL: format and auto-detects python code blocks."""
        text = response.strip()

        # Explicit legacy format: "TOOL: <name>" followed by a fenced code block.
        if text.startswith("TOOL:"):
            lines = text.split("\n")
            tool_name = lines[0].replace("TOOL:", "").strip()
            code_lines = []
            in_code = False
            for line in lines[1:]:
                if line.strip().startswith("```") and not in_code:
                    in_code = True
                    continue
                elif line.strip().startswith("```") and in_code:
                    break  # closing fence ends the code block
                elif in_code:
                    code_lines.append(line)
                elif line.strip().startswith("CODE:"):
                    continue  # legacy marker line, skipped
            return (tool_name, "\n".join(code_lines)) if code_lines else None

        # Otherwise auto-detect the first fenced block of a known language.
        block_match = re.search(r'```(?:python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL)
        if block_match:
            code = block_match.group(1).strip()
            if code and len(code.split("\n")) > 0:
                # SQL-looking blocks are wrapped in a python sqlite3 driver that
                # executes each statement, then dumps every table for inspection.
                # NOTE(review): a {code} payload containing triple quotes would
                # break this f-string wrapper — confirm upstream sanitization.
                if "```sql" in text or "```sqlite" in text or ("SELECT" in code.upper() and "CREATE" in code.upper()):
                    wrapped = f'''import sqlite3
conn = sqlite3.connect("/tmp/cog_db.sqlite")
cursor = conn.cursor()
for stmt in """{code}""".split(";"):
    stmt = stmt.strip()
    if stmt:
        cursor.execute(stmt)
conn.commit()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for t in tables:
    cursor.execute(f"SELECT * FROM {{t[0]}}")
    rows = cursor.fetchall()
    cols = [d[0] for d in cursor.description]
    print(f"Table: {{t[0]}}")
    print(" | ".join(cols))
    for row in rows:
        print(" | ".join(str(c) for c in row))
conn.close()'''
                    return ("python", wrapped)
                return ("python", code)

        return None

    def _parse_controls(self, response: str) -> list[dict]:
        """Extract ```controls JSON blocks from response."""
        controls = []
        if "```controls" not in response:
            return controls
        parts = response.split("```controls")
        for part in parts[1:]:
            end = part.find("```")
            if end != -1:
                try:
                    controls.extend(json.loads(part[:end].strip()))
                except json.JSONDecodeError:
                    pass  # malformed block: ignore rather than fail the turn
        return controls

    def _strip_blocks(self, response: str) -> str:
        """Remove code and control blocks, return plain text."""
        # NOTE(review): only python/py/controls fences are stripped — sql/bash
        # fences (which _parse_tool_call executes) remain in the "clean" text;
        # confirm this is intended.
        text = re.sub(r'```(?:python|py|controls).*?```', '', response, flags=re.DOTALL)
        return text.strip()
"""Process Manager: observable tool execution via subprocess."""

import asyncio
import os
import subprocess
import tempfile
import time


class Process:
    """A single observable tool execution.

    Lifecycle: pending -> running -> done | failed | cancelled. Stdout lines
    (plus tagged stderr/error lines) accumulate in ``output_lines``;
    ``exit_code`` is the subprocess return code, or -1 on timeout/internal error.
    """
    _next_id = 0  # class-wide pid counter, monotonic per interpreter

    def __init__(self, tool: str, code: str, send_hud):
        Process._next_id += 1
        self.pid = Process._next_id
        self.tool = tool
        self.code = code
        self.send_hud = send_hud
        self.status = "pending"
        self.output_lines: list[str] = []
        self.exit_code: int | None = None
        self.started_at: float = 0
        self.ended_at: float = 0
        self._subprocess: subprocess.Popen | None = None

    async def hud(self, event: str, **data):
        """Emit a HUD event tagged with this process's identity and status."""
        await self.send_hud({"node": "process", "event": event, "pid": self.pid,
                             "tool": self.tool, "status": self.status, **data})

    def run_sync(self) -> str:
        """Execute the tool synchronously. Returns output.

        Runs the code in a ``python3`` subprocess with a 10s timeout. Blocking:
        event-loop callers should dispatch via an executor (ProcessManager.execute).
        """
        self.status = "running"
        self.started_at = time.time()
        script_path = None
        try:
            # Write the code to a temp script; close it *before* spawning so the
            # child reads a fully flushed file (also required on Windows).
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
                f.write(self.code)
                script_path = f.name
            self._subprocess = subprocess.Popen(
                ['python3', script_path],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                text=True, cwd=tempfile.gettempdir()
            )
            stdout, stderr = self._subprocess.communicate(timeout=10)
            self.exit_code = self._subprocess.returncode
            if stdout:
                self.output_lines.extend(stdout.strip().split("\n"))
            # stderr is surfaced only on failure, to keep successful output clean.
            if self.exit_code != 0 and stderr:
                self.output_lines.append(f"[stderr: {stderr.strip()}]")
            self.status = "done" if self.exit_code == 0 else "failed"
        except subprocess.TimeoutExpired:
            if self._subprocess:
                self._subprocess.kill()
                # BUGFIX: reap the killed child; without this the process
                # lingers as a zombie and its pipes leak.
                self._subprocess.communicate()
            self.output_lines.append("[error: timed out after 10s]")
            self.status = "failed"
            self.exit_code = -1
        except Exception as e:
            self.output_lines.append(f"[error: {e}]")
            self.status = "failed"
            self.exit_code = -1
        finally:
            self.ended_at = time.time()
            # BUGFIX: delete=False previously leaked one temp .py file per run.
            if script_path:
                try:
                    os.unlink(script_path)
                except OSError:
                    pass  # best effort — never mask the real result
        return "\n".join(self.output_lines) or "[no output]"

    def cancel(self):
        """Kill a running subprocess and mark this process cancelled."""
        if self._subprocess and self.status == "running":
            self._subprocess.kill()
            self.status = "cancelled"
            self.ended_at = time.time()
            self.output_lines.append("[cancelled by user]")


class ProcessManager:
    """Manages all tool executions as observable processes."""

    def __init__(self, send_hud):
        self.send_hud = send_hud
        self.processes: dict[int, Process] = {}  # pid -> Process; never pruned

    async def execute(self, tool: str, code: str) -> Process:
        """Create and run a process. Returns the completed Process."""
        proc = Process(tool, code, self.send_hud)
        self.processes[proc.pid] = proc

        await proc.hud("process_start", code=code[:200])

        # BUGFIX: get_event_loop() is deprecated inside coroutines (3.10+);
        # get_running_loop() is the correct call from async code. The blocking
        # run_sync goes to the default thread-pool executor.
        loop = asyncio.get_running_loop()
        output = await loop.run_in_executor(None, proc.run_sync)

        elapsed = round(proc.ended_at - proc.started_at, 2)
        await proc.hud("process_done", exit_code=proc.exit_code,
                       output=output[:500], elapsed=elapsed)
        return proc

    def cancel(self, pid: int) -> bool:
        """Cancel by pid. Returns False when the pid is unknown."""
        proc = self.processes.get(pid)
        if proc:
            proc.cancel()
            return True
        return False

    def get_status(self) -> list[dict]:
        """Snapshot of all processes for the HUD."""
        return [{"pid": p.pid, "tool": p.tool, "status": p.status,
                 "elapsed": round((p.ended_at or time.time()) - p.started_at, 2) if p.started_at else 0}
                for p in self.processes.values()]
SensorNode(send_hud=self._send_hud) + self.sensor.start(get_memo_state=lambda: self.memorizer.state) + + claims = user_claims or {} + log.info(f"[runtime] user_claims: {claims}") + self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown" + log.info(f"[runtime] resolved identity: {self.identity}") + self.channel = origin or "unknown" + + self.memorizer.state["user_name"] = self.identity + self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session" + + async def _send_hud(self, data: dict): + await self.ws.send_text(json.dumps({"type": "hud", **data})) + trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data} + try: + with open(TRACE_FILE, "a", encoding="utf-8") as f: + f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") + if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: + lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") + TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") + except Exception as e: + log.error(f"trace write error: {e}") + self._broadcast(trace_entry) + + async def handle_message(self, text: str): + envelope = Envelope( + text=text, + user_id="nico", + session_id="test", + timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), + ) + + self.sensor.note_user_activity() + self.history.append({"role": "user", "content": text}) + + sensor_lines = self.sensor.get_context_lines() + mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) + + command = await self.input_node.process( + envelope, self.history, memory_context=mem_ctx, + identity=self.identity, channel=self.channel) + + thought = await self.thinker.process(command, self.history, memory_context=mem_ctx) + + if thought.tool_used: + command = Command( + instruction=f"Thinker used {thought.tool_used} and says: {thought.response}", + source_text=command.source_text + ) + else: + command = Command( + 
"""Message types flowing between nodes."""

from dataclasses import dataclass, field


@dataclass
class Envelope:
    """What flows between nodes."""
    text: str  # raw user utterance
    user_id: str = "anon"
    session_id: str = ""
    timestamp: str = ""  # human-readable, assigned by the runtime


@dataclass
class Command:
    """Input node's perception — describes what was heard."""
    instruction: str  # natural-language perception passed to downstream nodes
    source_text: str  # the original user text the perception was derived from
    metadata: dict = field(default_factory=dict)  # per-message extras (unused fields ok)


@dataclass
class ThoughtResult:
    """Thinker node's output — either a direct answer or tool results."""
    response: str  # clean text with code/control blocks stripped
    tool_used: str = ""  # e.g. "python"; empty when no tool ran
    tool_output: str = ""  # captured stdout/stderr summary from the tool run
    controls: list = field(default_factory=list)  # UI control dicts parsed from ```controls blocks