v0.8.0: refactor agent.py into modular package

Split 1161-line monolith into agent/ package:
auth, llm, types, process, runtime, api, and
nodes/ (base, sensor, input, output, thinker, memorizer).
No logic changes — pure structural split.
uvicorn agent:app entrypoint unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nico 2026-03-28 01:36:41 +01:00
parent 20363a1f2f
commit 7458b2ea35
15 changed files with 1172 additions and 1160 deletions

1160
agent.py

File diff suppressed because it is too large Load Diff

37
agent/__init__.py Normal file
View File

@ -0,0 +1,37 @@
"""Cognitive Agent Runtime — modular package.
uvicorn entrypoint: agent:app
"""
import logging
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / ".env")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S")
from fastapi import FastAPI
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from .api import register_routes
STATIC_DIR = Path(__file__).parent.parent / "static"
app = FastAPI(title="cog")
# Register all API + WS routes
register_routes(app)
# Serve index.html explicitly, then static assets
@app.get("/")
async def index():
return FileResponse(STATIC_DIR / "index.html")
@app.get("/callback")
async def callback():
"""OIDC callback — serves the same SPA, JS handles the code exchange."""
return FileResponse(STATIC_DIR / "index.html")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

188
agent/api.py Normal file
View File

@ -0,0 +1,188 @@
"""API endpoints, SSE, polling."""
import asyncio
import hashlib
import json
import logging
from asyncio import Queue
from pathlib import Path
from fastapi import Depends, HTTPException, Query, WebSocket, WebSocketDisconnect
from starlette.responses import StreamingResponse
import httpx
from .auth import AUTH_ENABLED, ZITADEL_ISSUER, _validate_token, require_auth
from .runtime import Runtime, TRACE_FILE
log = logging.getLogger("runtime")
# Active runtime reference (set by WS endpoint)
_active_runtime: Runtime | None = None
# SSE subscribers
_sse_subscribers: list[Queue] = []
def _broadcast_sse(event: dict):
"""Push an event to all SSE subscribers."""
for q in _sse_subscribers:
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
def _state_hash() -> str:
if not _active_runtime:
return "no_session"
raw = json.dumps({
"mem": _active_runtime.memorizer.state,
"hlen": len(_active_runtime.history),
}, sort_keys=True)
return hashlib.md5(raw.encode()).hexdigest()[:12]
def register_routes(app):
    """Register all API routes on the FastAPI app.

    Routes close over module-level state (_active_runtime, _sse_subscribers)
    rather than app.state — exactly one chat session exists per process.
    """
    @app.get("/health")
    async def health():
        # Liveness probe — intentionally unauthenticated.
        return {"status": "ok"}

    @app.get("/auth/config")
    async def auth_config():
        # Local re-import so the endpoint reads the current module values.
        from .auth import ZITADEL_ISSUER, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID, AUTH_ENABLED
        return {
            "enabled": AUTH_ENABLED,
            "issuer": ZITADEL_ISSUER,
            "clientId": ZITADEL_CLIENT_ID,
            "projectId": ZITADEL_PROJECT_ID,
        }

    @app.websocket("/ws")
    async def ws_endpoint(ws: WebSocket, token: str | None = Query(None),
                          access_token: str | None = Query(None)):
        """Main chat WebSocket: one Runtime per connection.

        NOTE(review): when AUTH_ENABLED is true but no `token` query param
        is supplied, the connection proceeds as anonymous — confirm this
        fall-through is intentional and not an auth bypass.
        """
        global _active_runtime
        user_claims = {"sub": "anonymous"}
        if AUTH_ENABLED and token:
            try:
                user_claims = await _validate_token(token)
                # ID-token claims may lack profile fields; enrich from the
                # userinfo endpoint using the separate access token.
                if not user_claims.get("name") and access_token:
                    async with httpx.AsyncClient() as client:
                        resp = await client.get(f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
                                                headers={"Authorization": f"Bearer {access_token}"})
                        if resp.status_code == 200:
                            info = resp.json()
                            log.info(f"[auth] userinfo enrichment: {info}")
                            user_claims["name"] = info.get("name")
                            user_claims["preferred_username"] = info.get("preferred_username")
                            user_claims["email"] = info.get("email")
            except HTTPException:
                await ws.close(code=4001, reason="Invalid token")
                return
        origin = ws.headers.get("origin", ws.headers.get("host", ""))
        await ws.accept()
        runtime = Runtime(ws, user_claims=user_claims, origin=origin, broadcast=_broadcast_sse)
        # Last-connected client wins the shared module-level session slot.
        _active_runtime = runtime
        try:
            while True:
                data = await ws.receive_text()
                msg = json.loads(data)
                if msg.get("type") == "action":
                    # UI control click: forwarded to the runtime as a
                    # bracketed pseudo-message.
                    action_text = f"[user clicked: {msg.get('action', 'unknown')}]"
                    if msg.get("data"):
                        action_text += f" data: {json.dumps(msg['data'])}"
                    await runtime.handle_message(action_text)
                elif msg.get("type") == "cancel_process":
                    runtime.process_manager.cancel(msg.get("pid", 0))
                else:
                    await runtime.handle_message(msg.get("text", ""))
        except WebSocketDisconnect:
            runtime.sensor.stop()
            # Only clear the slot if no newer connection replaced us.
            if _active_runtime is runtime:
                _active_runtime = None

    @app.get("/api/events")
    async def sse_events(user=Depends(require_auth)):
        """Server-sent-events stream mirroring runtime events."""
        q: Queue = Queue(maxsize=100)
        _sse_subscribers.append(q)

        async def generate():
            try:
                while True:
                    event = await q.get()
                    yield f"data: {json.dumps(event)}\n\n"
            except asyncio.CancelledError:
                pass
            finally:
                # Drop our queue once the client disconnects.
                _sse_subscribers.remove(q)
        return StreamingResponse(generate(), media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

    @app.get("/api/poll")
    async def poll(since: str = "", user=Depends(require_auth)):
        """Cheap polling endpoint: clients pass back the last hash they saw."""
        h = _state_hash()
        if since and since == h:
            return {"changed": False, "hash": h}
        return {
            "changed": True,
            "hash": h,
            "state": _active_runtime.memorizer.state if _active_runtime else None,
            "history_len": len(_active_runtime.history) if _active_runtime else 0,
            "last_messages": _active_runtime.history[-3:] if _active_runtime else [],
        }

    @app.post("/api/send")
    async def api_send(body: dict, user=Depends(require_auth)):
        """Inject a message into the active WS session (REST side-channel)."""
        if not _active_runtime:
            raise HTTPException(status_code=409, detail="No active session -- someone must be connected via WS first")
        text = body.get("text", "").strip()
        if not text:
            raise HTTPException(status_code=400, detail="Missing 'text' field")
        await _active_runtime.handle_message(text)
        return {
            "status": "ok",
            "response": _active_runtime.history[-1]["content"] if _active_runtime.history else "",
            "memorizer": _active_runtime.memorizer.state,
        }

    @app.post("/api/clear")
    async def api_clear(user=Depends(require_auth)):
        # Clears conversation history only; memorizer state is kept.
        if not _active_runtime:
            raise HTTPException(status_code=409, detail="No active session")
        _active_runtime.history.clear()
        return {"status": "cleared"}

    @app.get("/api/state")
    async def get_state(user=Depends(require_auth)):
        """Snapshot of the memorizer state and history length."""
        if not _active_runtime:
            return {"status": "no_session"}
        return {
            "status": "active",
            "memorizer": _active_runtime.memorizer.state,
            "history_len": len(_active_runtime.history),
        }

    @app.get("/api/history")
    async def get_history(last: int = 10, user=Depends(require_auth)):
        """Return the last `last` chat messages."""
        if not _active_runtime:
            return {"status": "no_session", "messages": []}
        return {
            "status": "active",
            "messages": _active_runtime.history[-last:],
        }

    @app.get("/api/trace")
    async def get_trace(last: int = 30, user=Depends(require_auth)):
        """Return the last `last` parsed entries of the JSONL trace file."""
        if not TRACE_FILE.exists():
            return {"lines": []}
        lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
        parsed = []
        for line in lines[-last:]:
            try:
                parsed.append(json.loads(line))
            except json.JSONDecodeError:
                # Skip partially-written / corrupt trace lines.
                pass
        return {"lines": parsed}

92
agent/auth.py Normal file
View File

@ -0,0 +1,92 @@
"""OIDC auth: Zitadel token validation, FastAPI dependencies."""
import json
import logging
import os
import time
import httpx
from fastapi import Depends, HTTPException, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
log = logging.getLogger("runtime")

# Zitadel OIDC configuration. Defaults point at the dev instance; override via env.
ZITADEL_ISSUER = os.environ.get("ZITADEL_ISSUER", "https://auth.loop42.de")
ZITADEL_CLIENT_ID = os.environ.get("ZITADEL_CLIENT_ID", "365996029172056091")
ZITADEL_PROJECT_ID = os.environ.get("ZITADEL_PROJECT_ID", "365995955654230043")
# Auth is opt-in: set AUTH_ENABLED=true to enforce token validation.
AUTH_ENABLED = os.environ.get("AUTH_ENABLED", "false").lower() == "true"
# Comma-separated static machine-to-machine tokens (empty entries dropped).
SERVICE_TOKENS = set(filter(None, os.environ.get("SERVICE_TOKENS", "").split(",")))
_jwks_cache: dict = {"keys": [], "fetched_at": 0}
async def _get_jwks():
if time.time() - _jwks_cache["fetched_at"] < 3600:
return _jwks_cache["keys"]
async with httpx.AsyncClient() as client:
resp = await client.get(f"{ZITADEL_ISSUER}/oauth/v2/keys")
_jwks_cache["keys"] = resp.json()["keys"]
_jwks_cache["fetched_at"] = time.time()
return _jwks_cache["keys"]
async def _validate_token(token: str) -> dict:
    """Validate token: check service tokens, then JWT, then userinfo.

    Resolution order:
      1. static service tokens from SERVICE_TOKENS (machine-to-machine),
      2. local RS256 signature verification against the cached JWKS,
      3. fallback: send the (possibly opaque) token to the userinfo endpoint.

    Raises HTTPException(401) when all three fail.
    """
    import base64
    if token in SERVICE_TOKENS:
        return {"sub": "titan", "username": "titan", "source": "service_token"}
    try:
        parts = token.split(".")
        if len(parts) == 3:
            # Looks like a JWT — verify the signature locally.
            keys = await _get_jwks()
            # Re-pad the base64url header segment before decoding.
            header_b64 = parts[0] + "=" * (4 - len(parts[0]) % 4)
            header = json.loads(base64.urlsafe_b64decode(header_b64))
            kid = header.get("kid")
            key = next((k for k in keys if k["kid"] == kid), None)
            if key:
                # PyJWT imported lazily so the module loads without it when
                # auth is disabled.
                import jwt as pyjwt
                from jwt import PyJWK
                jwk_obj = PyJWK(key)
                claims = pyjwt.decode(
                    token, jwk_obj.key, algorithms=["RS256"],
                    issuer=ZITADEL_ISSUER, options={"verify_aud": False},
                )
                return claims
    except Exception:
        # NOTE(review): this swallows ALL JWT errors (including invalid
        # signatures) and falls through to the userinfo check below —
        # confirm that trust model is intentional.
        pass
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
            headers={"Authorization": f"Bearer {token}"},
        )
        if resp.status_code == 200:
            info = resp.json()
            # NOTE(review): logs the full userinfo payload (PII) at INFO level.
            log.info(f"[auth] userinfo response: {info}")
            return {"sub": info.get("sub"), "preferred_username": info.get("preferred_username"),
                    "email": info.get("email"), "name": info.get("name"), "source": "userinfo"}
    raise HTTPException(status_code=401, detail="Invalid token")
# Shared bearer-token extractor; auto_error=False so a missing header yields
# None instead of an immediate 403, letting us decide below.
_bearer = HTTPBearer(auto_error=False)


async def require_auth(credentials: HTTPAuthorizationCredentials | None = Depends(_bearer)):
    """FastAPI dependency: resolve the caller's identity claims.

    With auth disabled every caller is anonymous; otherwise a bearer token
    is mandatory and must validate.
    """
    if AUTH_ENABLED:
        if not credentials:
            raise HTTPException(status_code=401, detail="Missing token")
        return await _validate_token(credentials.credentials)
    return {"sub": "anonymous"}
async def ws_auth(token: str | None = Query(None)) -> dict | None:
    """Validate a WebSocket token passed as a query parameter.

    Returns anonymous claims when auth is disabled, ``None`` when no token
    was supplied (the caller decides whether to reject the socket), or the
    validated claims. Propagates HTTPException from _validate_token on a
    bad token.

    Fix: return annotation was `dict`, but the no-token path returns None —
    now declared `dict | None`.
    """
    if not AUTH_ENABLED:
        return {"sub": "anonymous"}
    if not token:
        # No token at all: signal "unauthenticated" instead of raising, so
        # the WS handler can pick its own close code.
        return None
    return await _validate_token(token)

76
agent/llm.py Normal file
View File

@ -0,0 +1,76 @@
"""LLM helper: OpenRouter calls, token estimation, context fitting."""
import json
import logging
import os
from typing import Any
import httpx
log = logging.getLogger("runtime")

# OpenRouter credentials + endpoint; key comes from the environment/.env.
API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
async def llm_call(model: str, messages: list[dict], stream: bool = False) -> Any:
    """Single LLM call via OpenRouter. Returns full text or (client, response) for streaming.

    Non-streaming: returns the response text, or an "[LLM error: ...]"
    string when the API reports an error. Streaming: returns the open
    (client, response) pair — the caller must close both.

    Fix over the previous version: the AsyncClient is now closed on every
    error path (it leaked whenever the request raised).
    """
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    body = {"model": model, "messages": messages, "stream": stream}
    client = httpx.AsyncClient(timeout=60)
    if stream:
        try:
            request = client.build_request("POST", OPENROUTER_URL, headers=headers, json=body)
            resp = await client.send(request, stream=True)
        except Exception:
            # Don't leak the client if the connection itself fails.
            await client.aclose()
            raise
        return client, resp
    try:
        resp = await client.post(OPENROUTER_URL, headers=headers, json=body)
    finally:
        await client.aclose()
    data = resp.json()
    if "choices" not in data:
        log.error(f"LLM error: {data}")
        return f"[LLM error: {data.get('error', {}).get('message', 'unknown')}]"
    return data["choices"][0]["message"]["content"]
def estimate_tokens(text: str) -> int:
    """Cheap token-count heuristic: roughly 4 characters per token."""
    quarter, _ = divmod(len(text), 4)
    return quarter


def fit_context(messages: list[dict], max_tokens: int, protect_last: int = 4) -> list[dict]:
    """Trim oldest messages (after system prompt) to fit token budget.

    Always keeps the leading run of system prompts plus the final
    `protect_last` messages; middle messages are dropped oldest-first.
    If even the protected set overflows, messages after the first are
    shed oldest-first down to a floor of two.
    """
    if not messages:
        return messages
    # Split off the leading run of system prompts.
    split = 0
    while split < len(messages) and messages[split]["role"] == "system":
        split += 1
    system_msgs = messages[:split]
    rest = messages[split:]
    if len(rest) > protect_last:
        middle, protected = rest[:-protect_last], rest[-protect_last:]
    else:
        middle, protected = [], rest
    budget_used = sum(estimate_tokens(m["content"]) for m in system_msgs)
    budget_used += sum(estimate_tokens(m["content"]) for m in protected)
    if budget_used >= max_tokens:
        # Even the always-keep set is too big: shed from position 1 until
        # it fits or only two messages remain.
        survivors = system_msgs + protected
        total = sum(estimate_tokens(m["content"]) for m in survivors)
        while total > max_tokens and len(survivors) > 2:
            total -= estimate_tokens(survivors.pop(1)["content"])
        return survivors
    budget_left = max_tokens - budget_used
    kept: list[dict] = []
    # Walk the middle newest-first, keeping whatever still fits.
    for msg in reversed(middle):
        cost = estimate_tokens(msg["content"])
        if cost > budget_left:
            break
        kept.append(msg)
        budget_left -= cost
    kept.reverse()
    return system_msgs + kept + protected

9
agent/nodes/__init__.py Normal file
View File

@ -0,0 +1,9 @@
"""Node modules."""
from .sensor import SensorNode
from .input import InputNode
from .output import OutputNode
from .thinker import ThinkerNode
from .memorizer import MemorizerNode
__all__ = ["SensorNode", "InputNode", "OutputNode", "ThinkerNode", "MemorizerNode"]

31
agent/nodes/base.py Normal file
View File

@ -0,0 +1,31 @@
"""Base Node class with context management."""
import logging
from ..llm import estimate_tokens, fit_context
log = logging.getLogger("runtime")
class Node:
    """Common base for pipeline nodes: HUD event emission + context budgeting."""

    name: str = "node"
    model: str | None = None
    max_context_tokens: int = 4000

    def __init__(self, send_hud):
        # Async callback used to push HUD events to the frontend.
        self.send_hud = send_hud
        # Stats recorded by the most recent trim_context() call.
        self.last_context_tokens = 0
        self.context_fill_pct = 0

    async def hud(self, event: str, **data):
        """Emit a HUD event tagged with this node's name."""
        payload = {"node": self.name, "event": event}
        payload.update(data)
        await self.send_hud(payload)

    def trim_context(self, messages: list[dict]) -> list[dict]:
        """Fit messages within this node's token budget and record fill stats."""
        original_count = len(messages)
        trimmed = fit_context(messages, self.max_context_tokens)
        used = sum(estimate_tokens(m["content"]) for m in trimmed)
        self.last_context_tokens = used
        self.context_fill_pct = int(100 * used / self.max_context_tokens)
        if original_count != len(trimmed):
            log.info(
                f"[{self.name}] context trimmed: {original_count} -> "
                f"{len(trimmed)} msgs, {self.context_fill_pct}% fill"
            )
        return trimmed

48
agent/nodes/input.py Normal file
View File

@ -0,0 +1,48 @@
"""Input Node: perceives what the user said."""
import logging
from .base import Node
from ..llm import llm_call
from ..types import Envelope, Command
log = logging.getLogger("runtime")
class InputNode(Node):
    """Perception stage: turns the raw user utterance into an instruction."""

    name = "input"
    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 2000

    SYSTEM = """You are the Input node — the ear of this cognitive runtime.
Listener context:
- Authenticated user: {identity}
- Channel: {channel} (Chrome browser on Nico's Windows PC, in his room at home)
- Physical: private space, Nico lives with Tina she may use this session too
- Security: single-user account, shared physical space other voices are trusted household
Your job: describe what you heard. Who spoke, what they want, what tone, what context matters.
ONE sentence. No content, no response just your perception of what came through.
{memory_context}"""

    async def process(self, envelope: Envelope, history: list[dict], memory_context: str = "",
                      identity: str = "unknown", channel: str = "unknown") -> Command:
        """Perceive the incoming envelope; emit a Command for the next stages."""
        await self.hud("thinking", detail="deciding how to respond")
        log.info(f"[input] user said: {envelope.text}")
        system_prompt = self.SYSTEM.format(
            memory_context=memory_context, identity=identity, channel=channel)
        convo = [{"role": "system", "content": system_prompt}, *history[-8:]]
        convo = self.trim_context(convo)
        await self.hud("context", messages=convo, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
        instruction = await llm_call(self.model, convo)
        log.info(f"[input] -> command: {instruction}")
        await self.hud("perceived", instruction=instruction)
        return Command(instruction=instruction, source_text=envelope.text)

99
agent/nodes/memorizer.py Normal file
View File

@ -0,0 +1,99 @@
"""Memorizer Node: S2 — shared state / coordination."""
import json
import logging
from .base import Node
from ..llm import llm_call
log = logging.getLogger("runtime")
class MemorizerNode(Node):
    """S2 — shared state / coordination.

    Maintains a small JSON state dict (user, mood, topic, facts, ...) that
    the other nodes embed into their prompts, and refreshes it after each
    exchange by asking the LLM to re-distill the conversation.
    """

    name = "memorizer"
    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 3000

    DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime.
After each exchange you update the shared state that Input and Output nodes read.
Given the conversation so far, output a JSON object with these fields:
- user_name: string how the user identifies themselves (null if unknown)
- user_mood: string current emotional tone (neutral, happy, frustrated, playful, etc.)
- topic: string what the conversation is about right now
- topic_history: list of strings previous topics in this session
- situation: string social/physical context if mentioned (e.g. "at a pub with tina", "private dev session")
- language: string primary language being used (en, de, mixed)
- style_hint: string how Output should talk (casual, formal, technical, poetic, etc.)
- facts: list of strings important facts learned about the user
Output ONLY valid JSON. No explanation, no markdown fences."""

    def __init__(self, send_hud):
        super().__init__(send_hud)
        # Shared state read by the other nodes; seeded with sane defaults.
        self.state: dict = {
            "user_name": None,
            "user_mood": "neutral",
            "topic": None,
            "topic_history": [],
            "situation": "localhost test runtime, private dev session",
            "language": "en",
            "style_hint": "casual, technical",
            "facts": [],
        }

    def get_context_block(self, sensor_lines: list[str] | None = None) -> str:
        """Render sensor lines plus the current shared state as a prompt block.

        Fix: operates on a copy — the previous version appended to the
        caller's `sensor_lines` list in place.
        """
        lines = list(sensor_lines) if sensor_lines else ["Sensors: (none)"]
        lines.append("")
        lines.append("Shared memory (from Memorizer):")
        for k, v in self.state.items():
            if v:
                lines.append(f"- {k}: {v}")
        return "\n".join(lines)

    @staticmethod
    def _strip_fences(raw: str) -> str:
        """Remove a leading/trailing markdown code fence from an LLM reply."""
        text = raw.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    async def update(self, history: list[dict]):
        """Distill the recent conversation into a fresh shared state.

        On any failure (bad JSON, LLM error) the previous state is kept.
        """
        if len(history) < 2:
            # Not enough signal yet; just re-announce the current state.
            await self.hud("updated", state=self.state)
            return
        await self.hud("thinking", detail="updating shared state")
        messages = [
            {"role": "system", "content": self.DISTILL_SYSTEM},
            {"role": "system", "content": f"Current state: {json.dumps(self.state)}"},
            *history[-10:],
            {"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."},
        ]
        messages = self.trim_context(messages)
        await self.hud("context", messages=messages, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
        raw = await llm_call(self.model, messages)
        log.info(f"[memorizer] raw: {raw[:200]}")
        text = self._strip_fences(raw)
        try:
            new_state = json.loads(text)
            # Merge facts deduped in insertion order (old first), capped at 20.
            # Fix: the old set-union merge had nondeterministic ordering, so
            # the [-20:] cap dropped arbitrary facts.
            merged = dict.fromkeys(self.state.get("facts", []))
            merged.update(dict.fromkeys(new_state.get("facts", [])))
            new_state["facts"] = list(merged)[-20:]
            if self.state.get("topic") and self.state["topic"] != new_state.get("topic"):
                hist = new_state.get("topic_history", [])
                if self.state["topic"] not in hist:
                    hist.append(self.state["topic"])
                new_state["topic_history"] = hist[-5:]
            self.state = new_state
            log.info(f"[memorizer] updated state: {self.state}")
            await self.hud("updated", state=self.state)
        except Exception as e:
            # Fix: `except (json.JSONDecodeError, Exception)` was redundant —
            # Exception already covers JSONDecodeError.
            log.error(f"[memorizer] update error: {e}, raw: {text[:200]}")
            await self.hud("error", detail=f"Update failed: {e}")
            await self.hud("updated", state=self.state)

63
agent/nodes/output.py Normal file
View File

@ -0,0 +1,63 @@
"""Output Node: streams natural response to the user."""
import json
import logging
from fastapi import WebSocket
from .base import Node
from ..llm import llm_call
from ..types import Command
log = logging.getLogger("runtime")
class OutputNode(Node):
    """Streams the final natural-language reply to the user over the WebSocket."""

    name = "output"
    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 4000

    SYSTEM = """You are the Output node — the voice of this cognitive runtime.
The Input node sends you its perception of what the user said. This is internal context for you never repeat or echo it.
You respond to the USER, not to the Input node. Use the perception to understand intent, then act on it.
Be natural. Be concise. If the user asks you to do something, do it don't describe what you're about to do.
{memory_context}"""

    async def process(self, command: Command, history: list[dict], ws: WebSocket, memory_context: str = "") -> str:
        """Stream a reply token-by-token to `ws`; returns the full text."""
        await self.hud("streaming")
        messages = [
            {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)},
        ]
        for msg in history[-20:]:
            messages.append(msg)
        messages.append({"role": "system", "content": f"Input perception: {command.instruction}"})
        messages = self.trim_context(messages)
        await self.hud("context", messages=messages, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
        # Streaming call returns the open httpx client + response; we are
        # responsible for closing both (done in the finally below).
        client, resp = await llm_call(self.model, messages, stream=True)
        full_response = ""
        try:
            # OpenRouter streams OpenAI-style SSE: lines of "data: {json}".
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[6:]
                if payload == "[DONE]":
                    break
                chunk = json.loads(payload)
                delta = chunk["choices"][0].get("delta", {})
                token = delta.get("content", "")
                if token:
                    full_response += token
                    await ws.send_text(json.dumps({"type": "delta", "content": token}))
        finally:
            await resp.aclose()
            await client.aclose()
        log.info(f"[output] response: {full_response[:100]}...")
        # Tell the frontend the stream is complete.
        await ws.send_text(json.dumps({"type": "done"}))
        await self.hud("done")
        return full_response

132
agent/nodes/sensor.py Normal file
View File

@ -0,0 +1,132 @@
"""Sensor Node: ticks independently, produces context for other nodes."""
import asyncio
import logging
import time
from datetime import datetime, timezone, timedelta
from .base import Node
log = logging.getLogger("runtime")
BERLIN = timezone(timedelta(hours=2))  # CEST — NOTE(review): fixed UTC+2, ignores DST transitions; confirm zoneinfo isn't wanted

class SensorNode(Node):
    """Ticks on a background task and produces context lines (clock,
    user-idle time, memorizer-state deltas) for the other nodes."""

    name = "sensor"

    def __init__(self, send_hud):
        super().__init__(send_hud)
        self.tick_count = 0
        self.running = False
        self._task: asyncio.Task | None = None
        # Seconds between ticks.
        self.interval = 5
        # Latest reading per sensor name: {"value": ..., "detail": ..., "changed_at": ...}.
        self.readings: dict[str, dict] = {}
        # Wall-clock time of the last user message (drives the idle sensor).
        self._last_user_activity: float = time.time()
        # Snapshot of memorizer state from the previous tick, for diffing.
        self._prev_memo_state: dict = {}

    def _now(self) -> datetime:
        return datetime.now(BERLIN)

    def _read_clock(self) -> dict:
        """Emit a reading only when the HH:MM display value changes."""
        now = self._now()
        current = now.strftime("%H:%M")
        prev = self.readings.get("clock", {}).get("value")
        if current != prev:
            return {"value": current, "detail": now.strftime("%Y-%m-%d %H:%M:%S %A"), "changed_at": time.time()}
        return {}

    def _read_idle(self) -> dict:
        """Emit a reading when idle time crosses one of the thresholds."""
        idle_s = time.time() - self._last_user_activity
        thresholds = [30, 60, 300, 600, 1800]
        prev_idle = self.readings.get("idle", {}).get("_raw", 0)
        for t in thresholds:
            # Fire exactly once per threshold crossed since the last tick.
            if prev_idle < t <= idle_s:
                if idle_s < 60:
                    label = f"{int(idle_s)}s"
                else:
                    label = f"{int(idle_s // 60)}m{int(idle_s % 60)}s"
                return {"value": label, "_raw": idle_s, "changed_at": time.time()}
        if "idle" in self.readings:
            # Keep the raw counter fresh even when no threshold fired.
            self.readings["idle"]["_raw"] = idle_s
        return {}

    def _read_memo_changes(self, memo_state: dict) -> dict:
        """Diff the memorizer state against the previous tick's snapshot."""
        changes = []
        for k, v in memo_state.items():
            prev = self._prev_memo_state.get(k)
            # prev is None both for "new key" and "was null" — only report
            # changes to keys that previously held a value.
            if v != prev and prev is not None:
                changes.append(f"{k}: {prev} -> {v}")
        self._prev_memo_state = dict(memo_state)
        if changes:
            return {"value": "; ".join(changes), "changed_at": time.time()}
        return {}

    def note_user_activity(self):
        """Reset the idle timer (called whenever the user sends something)."""
        self._last_user_activity = time.time()
        self.readings["idle"] = {"value": "active", "_raw": 0, "changed_at": time.time()}

    async def tick(self, memo_state: dict):
        """Run all readers once; HUD-report only the deltas."""
        self.tick_count += 1
        deltas = {}
        for name, reader in [("clock", self._read_clock),
                             ("idle", self._read_idle)]:
            update = reader()
            if update:
                self.readings[name] = {**self.readings.get(name, {}), **update}
                deltas[name] = update.get("value") or update.get("detail")
        memo_update = self._read_memo_changes(memo_state)
        if memo_update:
            self.readings["memo_delta"] = memo_update
            deltas["memo_delta"] = memo_update["value"]
        if deltas:
            await self.hud("tick", tick=self.tick_count, deltas=deltas)

    async def _loop(self, get_memo_state):
        """Background task body: tick every `interval` seconds until stopped."""
        self.running = True
        await self.hud("started", interval=self.interval)
        try:
            while self.running:
                await asyncio.sleep(self.interval)
                try:
                    await self.tick(get_memo_state())
                except Exception as e:
                    # A failing reader must not kill the loop.
                    log.error(f"[sensor] tick error: {e}")
        except asyncio.CancelledError:
            pass
        finally:
            self.running = False
            await self.hud("stopped")

    def start(self, get_memo_state):
        """Start the background loop (no-op if already running)."""
        if self._task and not self._task.done():
            return
        self._task = asyncio.create_task(self._loop(get_memo_state))

    def stop(self):
        """Stop the loop and cancel the background task."""
        self.running = False
        if self._task:
            self._task.cancel()

    def get_context_lines(self) -> list[str]:
        """Format current readings as prompt lines with human-readable ages."""
        if not self.readings:
            return ["Sensors: (no sensor node running)"]
        lines = [f"Sensors (tick #{self.tick_count}, {self.interval}s interval):"]
        for name, r in self.readings.items():
            if name.startswith("_"):
                continue
            val = r.get("value", "?")
            detail = r.get("detail")
            age = time.time() - r.get("changed_at", time.time())
            if age < 10:
                age_str = "just now"
            elif age < 60:
                age_str = f"{int(age)}s ago"
            else:
                age_str = f"{int(age // 60)}m ago"
            line = f"- {name}: {detail or val} [{age_str}]"
            lines.append(line)
        return lines

166
agent/nodes/thinker.py Normal file
View File

@ -0,0 +1,166 @@
"""Thinker Node: S3 — control, reasoning, tool use."""
import json
import logging
import re
from .base import Node
from ..llm import llm_call
from ..process import ProcessManager
from ..types import Command, ThoughtResult
log = logging.getLogger("runtime")
class ThinkerNode(Node):
    """S3 — control layer: decides between a direct reply, tool execution
    (python via ProcessManager) and UI control blocks."""

    name = "thinker"
    model = "google/gemini-2.5-flash-preview"
    max_context_tokens = 4000

    SYSTEM = """You are the Thinker node — the brain of this cognitive runtime.
You receive a perception of what the user said. Decide: answer directly, use a tool, or show UI controls.
TOOLS write a ```python code block and it WILL be executed. Use print() for output.
- For math, databases, file ops, any computation: write python. NEVER describe code write it.
- For simple conversation: respond directly as text.
UI CONTROLS to show interactive elements, include a JSON block:
```controls
[
  {{"type": "table", "data": [...], "columns": ["id", "name", "email"]}},
  {{"type": "button", "label": "Add Customer", "action": "add_customer"}},
  {{"type": "button", "label": "Refresh", "action": "refresh_customers"}}
]
```
Controls render in the chat. User clicks flow back as actions you can handle.
You can combine text + code + controls in one response.
{memory_context}"""

    def __init__(self, send_hud, process_manager: ProcessManager = None):
        super().__init__(send_hud)
        # ProcessManager for running generated python; None disables tools.
        self.pm = process_manager

    def _parse_tool_call(self, response: str) -> tuple[str, str] | None:
        """Parse tool calls. Supports TOOL: format and auto-detects python code blocks.

        Returns (tool_name, code) or None when the response contains no
        executable block.
        """
        text = response.strip()
        if text.startswith("TOOL:"):
            # Explicit "TOOL: name" header followed by a fenced code block.
            lines = text.split("\n")
            tool_name = lines[0].replace("TOOL:", "").strip()
            code_lines = []
            in_code = False
            for line in lines[1:]:
                if line.strip().startswith("```") and not in_code:
                    in_code = True
                    continue
                elif line.strip().startswith("```") and in_code:
                    break
                elif in_code:
                    code_lines.append(line)
                elif line.strip().startswith("CODE:"):
                    continue
            return (tool_name, "\n".join(code_lines)) if code_lines else None
        # Fallback: treat the first fenced code block as executable python.
        block_match = re.search(r'```(?:python|py|sql|sqlite|sh|bash|tool_code)?\s*\n(.*?)```', text, re.DOTALL)
        if block_match:
            code = block_match.group(1).strip()
            if code and len(code.split("\n")) > 0:
                # SQL-looking blocks get wrapped in a sqlite3 runner script
                # that executes the statements then dumps every table.
                # ({{...}} are f-string escapes: the generated script keeps
                # single-brace f-strings of its own.)
                if "```sql" in text or "```sqlite" in text or ("SELECT" in code.upper() and "CREATE" in code.upper()):
                    wrapped = f'''import sqlite3
conn = sqlite3.connect("/tmp/cog_db.sqlite")
cursor = conn.cursor()
for stmt in """{code}""".split(";"):
    stmt = stmt.strip()
    if stmt:
        cursor.execute(stmt)
conn.commit()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for t in tables:
    cursor.execute(f"SELECT * FROM {{t[0]}}")
    rows = cursor.fetchall()
    cols = [d[0] for d in cursor.description]
    print(f"Table: {{t[0]}}")
    print(" | ".join(cols))
    for row in rows:
        print(" | ".join(str(c) for c in row))
conn.close()'''
                    return ("python", wrapped)
                return ("python", code)
        return None

    def _parse_controls(self, response: str) -> list[dict]:
        """Extract ```controls JSON blocks from response."""
        controls = []
        if "```controls" not in response:
            return controls
        parts = response.split("```controls")
        for part in parts[1:]:
            end = part.find("```")
            if end != -1:
                try:
                    controls.extend(json.loads(part[:end].strip()))
                except json.JSONDecodeError:
                    # Malformed controls JSON is dropped silently.
                    pass
        return controls

    def _strip_blocks(self, response: str) -> str:
        """Remove code and control blocks, return plain text."""
        text = re.sub(r'```(?:python|py|controls).*?```', '', response, flags=re.DOTALL)
        return text.strip()

    async def process(self, command: Command, history: list[dict], memory_context: str = "") -> ThoughtResult:
        """One reasoning step: LLM call, optional tool round-trip, controls."""
        await self.hud("thinking", detail="reasoning about response")
        messages = [
            {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)},
        ]
        for msg in history[-12:]:
            messages.append(msg)
        messages.append({"role": "system", "content": f"Input perception: {command.instruction}"})
        messages = self.trim_context(messages)
        await self.hud("context", messages=messages, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
        response = await llm_call(self.model, messages)
        log.info(f"[thinker] response: {response[:200]}")
        controls = self._parse_controls(response)
        if controls:
            await self.hud("controls", controls=controls)
        tool_call = self._parse_tool_call(response)
        if tool_call:
            tool_name, code = tool_call
            if self.pm and tool_name == "python":
                proc = await self.pm.execute(tool_name, code)
                tool_output = "\n".join(proc.output_lines)
            else:
                tool_output = f"[unknown tool: {tool_name}]"
            log.info(f"[thinker] tool output: {tool_output[:200]}")
            # Second LLM pass: let the model phrase a reply around the output.
            messages.append({"role": "assistant", "content": response})
            messages.append({"role": "system", "content": f"Tool output:\n{tool_output}"})
            messages.append({"role": "user", "content": "Respond to the user based on the tool output. If showing data, include a ```controls block with a table. Be natural and concise."})
            messages = self.trim_context(messages)
            final = await llm_call(self.model, messages)
            more_controls = self._parse_controls(final)
            if more_controls:
                controls.extend(more_controls)
                await self.hud("controls", controls=more_controls)
            clean_text = self._strip_blocks(final)
            await self.hud("decided", instruction=clean_text[:200])
            return ThoughtResult(response=clean_text, tool_used=tool_name,
                                 tool_output=tool_output, controls=controls)
        clean_text = self._strip_blocks(response) or response
        await self.hud("decided", instruction="direct response (no tools)")
        return ThoughtResult(response=clean_text, controls=controls)

104
agent/process.py Normal file
View File

@ -0,0 +1,104 @@
"""Process Manager: observable tool execution via subprocess."""
import asyncio
import os
import subprocess
import sys
import tempfile
import time
class Process:
    """A single observable tool execution."""

    # Monotonically increasing id shared across all Process instances.
    _next_id = 0

    def __init__(self, tool: str, code: str, send_hud):
        Process._next_id += 1
        self.pid = Process._next_id
        self.tool = tool
        self.code = code
        self.send_hud = send_hud
        self.status = "pending"  # pending -> running -> done/failed/cancelled
        self.output_lines: list[str] = []
        self.exit_code: int | None = None
        self.started_at: float = 0
        self.ended_at: float = 0
        self._subprocess: subprocess.Popen | None = None

    async def hud(self, event: str, **data):
        """Emit a HUD event describing this process."""
        await self.send_hud({"node": "process", "event": event, "pid": self.pid,
                             "tool": self.tool, "status": self.status, **data})

    def run_sync(self) -> str:
        """Execute the tool synchronously. Returns output.

        Writes the code to a temp file and runs it with the current
        interpreter, capturing stdout/stderr with a 10s timeout.

        Fixes over the previous version: the temp script is always removed
        (it leaked before), the file is closed before spawning, a killed
        child is reaped, and `sys.executable` replaces the hardcoded
        'python3' so venvs/Windows work.
        """
        self.status = "running"
        self.started_at = time.time()
        script_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
                f.write(self.code)
                script_path = f.name
            self._subprocess = subprocess.Popen(
                [sys.executable, script_path],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                text=True, cwd=tempfile.gettempdir()
            )
            stdout, stderr = self._subprocess.communicate(timeout=10)
            self.exit_code = self._subprocess.returncode
            if stdout:
                self.output_lines.extend(stdout.strip().split("\n"))
            if self.exit_code != 0 and stderr:
                self.output_lines.append(f"[stderr: {stderr.strip()}]")
            self.status = "done" if self.exit_code == 0 else "failed"
        except subprocess.TimeoutExpired:
            if self._subprocess:
                self._subprocess.kill()
                # Reap the killed child so it doesn't linger as a zombie.
                self._subprocess.communicate()
            self.output_lines.append("[error: timed out after 10s]")
            self.status = "failed"
            self.exit_code = -1
        except Exception as e:
            self.output_lines.append(f"[error: {e}]")
            self.status = "failed"
            self.exit_code = -1
        finally:
            self.ended_at = time.time()
            # Best-effort cleanup of the temp script.
            if script_path is not None:
                try:
                    os.unlink(script_path)
                except OSError:
                    pass
        return "\n".join(self.output_lines) or "[no output]"

    def cancel(self):
        """Kill a running subprocess and mark this process cancelled."""
        if self._subprocess and self.status == "running":
            self._subprocess.kill()
            self.status = "cancelled"
            self.ended_at = time.time()
            self.output_lines.append("[cancelled by user]")
class ProcessManager:
    """Manages all tool executions as observable processes.

    Keeps every Process (live or finished) in `self.processes`, keyed by
    pid, so the HUD can list and cancel them.
    """

    def __init__(self, send_hud):
        self.send_hud = send_hud  # async HUD event sink, handed to each Process
        self.processes: dict[int, Process] = {}

    async def execute(self, tool: str, code: str) -> Process:
        """Create and run a process. Returns the completed Process.

        The blocking run_sync() is offloaded to the default executor so the
        event loop stays responsive while the tool runs.
        """
        proc = Process(tool, code, self.send_hud)
        self.processes[proc.pid] = proc
        await proc.hud("process_start", code=code[:200])
        # Fix: get_event_loop() inside a coroutine is deprecated since 3.10;
        # get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()
        output = await loop.run_in_executor(None, proc.run_sync)
        elapsed = round(proc.ended_at - proc.started_at, 2)
        await proc.hud("process_done", exit_code=proc.exit_code,
                       output=output[:500], elapsed=elapsed)
        return proc

    def cancel(self, pid: int) -> bool:
        """Cancel the process with `pid`. Returns True if it was known."""
        proc = self.processes.get(pid)
        if proc:
            proc.cancel()
            return True
        return False

    def get_status(self) -> list[dict]:
        """Snapshot of all processes: pid, tool, status, elapsed seconds."""
        return [{"pid": p.pid, "tool": p.tool, "status": p.status,
                 "elapsed": round((p.ended_at or time.time()) - p.started_at, 2) if p.started_at else 0}
                for p in self.processes.values()]

98
agent/runtime.py Normal file
View File

@ -0,0 +1,98 @@
"""Runtime: wires all nodes together into a processing pipeline."""
import json
import logging
import time
from pathlib import Path
from typing import Callable
from fastapi import WebSocket
from .types import Envelope, Command
from .process import ProcessManager
from .nodes import SensorNode, InputNode, OutputNode, ThinkerNode, MemorizerNode
log = logging.getLogger("runtime")
TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl"
class Runtime:
    """Per-connection pipeline: wires all nodes together.

    Owns one WebSocket plus the node instances (sensor, input, thinker,
    output, memorizer), the shared ProcessManager, and a rolling chat
    history capped at MAX_HISTORY entries.
    """
    def __init__(self, ws: WebSocket, user_claims: dict | None = None, origin: str = "",
                 broadcast: Callable | None = None):
        self.ws = ws
        # Rolling chat history of {"role", "content"} dicts; trimmed after each turn.
        self.history: list[dict] = []
        self.MAX_HISTORY = 40
        # No-op fallback so _send_hud can call the broadcaster unconditionally.
        self._broadcast = broadcast or (lambda e: None)
        self.input_node = InputNode(send_hud=self._send_hud)
        self.process_manager = ProcessManager(send_hud=self._send_hud)
        self.thinker = ThinkerNode(send_hud=self._send_hud, process_manager=self.process_manager)
        self.output_node = OutputNode(send_hud=self._send_hud)
        self.memorizer = MemorizerNode(send_hud=self._send_hud)
        self.sensor = SensorNode(send_hud=self._send_hud)
        self.sensor.start(get_memo_state=lambda: self.memorizer.state)
        claims = user_claims or {}
        log.info(f"[runtime] user_claims: {claims}")
        # Prefer the human-readable OIDC claims, in decreasing specificity.
        self.identity = claims.get("name") or claims.get("preferred_username") or claims.get("username") or "unknown"
        log.info(f"[runtime] resolved identity: {self.identity}")
        self.channel = origin or "unknown"
        # Seed the memorizer's state so prompts know who/where the user is.
        self.memorizer.state["user_name"] = self.identity
        self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session"
    async def _send_hud(self, data: dict):
        """Send a HUD event to this client, append it to trace.jsonl, and
        fan it out to the trace broadcaster.

        Trace file is rotated in place: once it exceeds ~500 KB it is
        rewritten keeping only the last 500 lines.
        """
        await self.ws.send_text(json.dumps({"type": "hud", **data}))
        # Timestamp with millisecond suffix, e.g. "2026-03-28 01:36:41.123".
        trace_entry = {"ts": time.strftime("%Y-%m-%d %H:%M:%S.") + f"{time.time() % 1:.3f}"[2:], **data}
        try:
            with open(TRACE_FILE, "a", encoding="utf-8") as f:
                f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n")
            if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000:
                lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
                TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8")
        except Exception as e:
            # Tracing is best-effort: never let it break the HUD stream.
            log.error(f"trace write error: {e}")
        self._broadcast(trace_entry)
    async def handle_message(self, text: str):
        """Run one user message through the full pipeline:
        sensor context -> input -> thinker (optional tool) -> output,
        then update the memorizer and trim history.
        """
        # NOTE(review): user_id/session_id are hard-coded placeholders —
        # presumably these should come from claims/session; confirm.
        envelope = Envelope(
            text=text,
            user_id="nico",
            session_id="test",
            timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        )
        self.sensor.note_user_activity()
        self.history.append({"role": "user", "content": text})
        sensor_lines = self.sensor.get_context_lines()
        mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines)
        command = await self.input_node.process(
            envelope, self.history, memory_context=mem_ctx,
            identity=self.identity, channel=self.channel)
        thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
        # Re-wrap the thinker's result as a Command for the output node,
        # preserving the original source text either way.
        if thought.tool_used:
            command = Command(
                instruction=f"Thinker used {thought.tool_used} and says: {thought.response}",
                source_text=command.source_text
            )
        else:
            command = Command(
                instruction=f"Thinker says: {thought.response}",
                source_text=command.source_text
            )
        response = await self.output_node.process(command, self.history, self.ws, memory_context=mem_ctx)
        self.history.append({"role": "assistant", "content": response})
        if thought.controls:
            await self.ws.send_text(json.dumps({"type": "controls", "controls": thought.controls}))
        await self.memorizer.update(self.history)
        if len(self.history) > self.MAX_HISTORY:
            self.history = self.history[-self.MAX_HISTORY:]

29
agent/types.py Normal file
View File

@ -0,0 +1,29 @@
"""Message types flowing between nodes."""
from dataclasses import dataclass, field
@dataclass
class Envelope:
    """What flows between nodes."""
    # Raw user message text.
    text: str
    # Sender identifier; "anon" when unauthenticated.
    user_id: str = "anon"
    # Conversation/session identifier; empty when not tracked.
    session_id: str = ""
    # Wall-clock time string, "%Y-%m-%d %H:%M:%S" format (set by the caller).
    timestamp: str = ""
@dataclass
class Command:
    """Input node's perception — describes what was heard."""
    # Natural-language instruction derived from the user's message.
    instruction: str
    # The verbatim user text the instruction was derived from.
    source_text: str
    # Optional extra context attached by the producing node.
    metadata: dict = field(default_factory=dict)
@dataclass
class ThoughtResult:
    """Thinker node's output — either a direct answer or tool results."""
    # The thinker's textual answer.
    response: str
    # Name of the tool invoked, or empty string when none was used.
    tool_used: str = ""
    # Raw output captured from the tool run (empty if no tool).
    tool_output: str = ""
    # UI control descriptors to push to the client, if any.
    controls: list = field(default_factory=list)