From ab661775efed4e1361c86a80d347ac31be713268 Mon Sep 17 00:00:00 2001 From: Nico Date: Sat, 28 Mar 2026 00:42:02 +0100 Subject: [PATCH] v0.5.4: sensor node, perceiver model, context budgets, API send - SensorNode: 5s tick loop with delta-only emissions (clock, idle, memo changes) - Input reframed as perceiver (describes what it heard, not commands) - Output reframed as voice (acts on perception, never echoes it) - Per-node token budgets: Input 2K, Output 4K, Memorizer 3K - fit_context() trims oldest messages to stay within budget - History sliding window: 40 messages max - Facts capped at 20, trace file rotates at 500KB - /api/send + /api/clear endpoints for programmatic testing - test_cog.py test suite - Listener context: physical/social/security awareness Co-Authored-By: Claude Opus 4.6 (1M context) --- agent.py | 286 +++++++++++++++++++++++++++++++++++++++++++--- static/index.html | 4 +- test_cog.py | 36 ++++++ 3 files changed, 306 insertions(+), 20 deletions(-) create mode 100644 test_cog.py diff --git a/agent.py b/agent.py index 15057d4..2c6ca34 100644 --- a/agent.py +++ b/agent.py @@ -155,22 +155,227 @@ class Command: # --- Base Node --- +def estimate_tokens(text: str) -> int: + """Rough token estimate: 1 token ≈ 4 chars.""" + return len(text) // 4 + + +def fit_context(messages: list[dict], max_tokens: int, protect_last: int = 4) -> list[dict]: + """Trim oldest messages (after system prompt) to fit token budget. 
+ Always keeps: system prompt(s) at start + last `protect_last` messages.""" + if not messages: + return messages + + # Split into system prefix, middle (trimmable), and protected tail + system_msgs = [] + rest = [] + for m in messages: + if not rest and m["role"] == "system": + system_msgs.append(m) + else: + rest.append(m) + + protected = rest[-protect_last:] if len(rest) > protect_last else rest + middle = rest[:-protect_last] if len(rest) > protect_last else [] + + # Count fixed tokens (system + protected tail) + fixed_tokens = sum(estimate_tokens(m["content"]) for m in system_msgs + protected) + + if fixed_tokens >= max_tokens: + # Even fixed content exceeds budget — truncate protected messages + result = system_msgs + protected + total = sum(estimate_tokens(m["content"]) for m in result) + while total > max_tokens and len(result) > 2: + removed = result.pop(1) # remove oldest non-system + total -= estimate_tokens(removed["content"]) + return result + + # Fill remaining budget with middle messages (newest first) + remaining = max_tokens - fixed_tokens + kept_middle = [] + for m in reversed(middle): + t = estimate_tokens(m["content"]) + if remaining - t < 0: + break + kept_middle.insert(0, m) + remaining -= t + + return system_msgs + kept_middle + protected + + class Node: name: str = "node" model: str | None = None + max_context_tokens: int = 4000 # default budget per node def __init__(self, send_hud): self.send_hud = send_hud # async callable to emit hud events to frontend + self.last_context_tokens = 0 async def hud(self, event: str, **data): await self.send_hud({"node": self.name, "event": event, **data}) + def trim_context(self, messages: list[dict]) -> list[dict]: + """Fit messages within this node's token budget.""" + before = len(messages) + result = fit_context(messages, self.max_context_tokens) + self.last_context_tokens = sum(estimate_tokens(m["content"]) for m in result) + self.context_fill_pct = int(100 * self.last_context_tokens / 
self.max_context_tokens) + if before != len(result): + log.info(f"[{self.name}] context trimmed: {before} → {len(result)} msgs, {self.context_fill_pct}% fill") + return result + + +# --- Sensor Node (ticks independently, produces context for other nodes) --- + +from datetime import datetime, timezone, timedelta + +BERLIN = timezone(timedelta(hours=2)) # CEST + + +class SensorNode(Node): + name = "sensor" + + def __init__(self, send_hud): + super().__init__(send_hud) + self.tick_count = 0 + self.running = False + self._task: asyncio.Task | None = None + self.interval = 5 # seconds + # Current sensor readings — each is {value, changed_at, prev} + self.readings: dict[str, dict] = {} + self._last_user_activity: float = time.time() + # Snapshot of memorizer state for change detection + self._prev_memo_state: dict = {} + + def _now(self) -> datetime: + return datetime.now(BERLIN) + + def _read_clock(self) -> dict: + """Clock sensor — updates when minute changes.""" + now = self._now() + current = now.strftime("%H:%M") + prev = self.readings.get("clock", {}).get("value") + if current != prev: + return {"value": current, "detail": now.strftime("%Y-%m-%d %H:%M:%S %A"), "changed_at": time.time()} + return {} # no change + + def _read_idle(self) -> dict: + """Idle sensor — time since last user message.""" + idle_s = time.time() - self._last_user_activity + # Only update on threshold crossings: 30s, 1m, 5m, 10m, 30m + thresholds = [30, 60, 300, 600, 1800] + prev_idle = self.readings.get("idle", {}).get("_raw", 0) + for t in thresholds: + if prev_idle < t <= idle_s: + if idle_s < 60: + label = f"{int(idle_s)}s" + else: + label = f"{int(idle_s // 60)}m{int(idle_s % 60)}s" + return {"value": label, "_raw": idle_s, "changed_at": time.time()} + # Update raw but don't flag as changed + if "idle" in self.readings: + self.readings["idle"]["_raw"] = idle_s + return {} + + def _read_memo_changes(self, memo_state: dict) -> dict: + """Detect memorizer state changes.""" + changes = [] + 
for k, v in memo_state.items(): + prev = self._prev_memo_state.get(k) + if v != prev and prev is not None: + changes.append(f"{k}: {prev} -> {v}") + self._prev_memo_state = dict(memo_state) + if changes: + return {"value": "; ".join(changes), "changed_at": time.time()} + return {} + + def note_user_activity(self): + """Called when user sends a message.""" + self._last_user_activity = time.time() + # Reset idle sensor + self.readings["idle"] = {"value": "active", "_raw": 0, "changed_at": time.time()} + + async def tick(self, memo_state: dict): + """One tick — read all sensors, emit deltas.""" + self.tick_count += 1 + deltas = {} + + # Read each sensor + for name, reader in [("clock", self._read_clock), + ("idle", self._read_idle)]: + update = reader() + if update: + self.readings[name] = {**self.readings.get(name, {}), **update} + deltas[name] = update.get("value") or update.get("detail") + + # Memo changes + memo_update = self._read_memo_changes(memo_state) + if memo_update: + self.readings["memo_delta"] = memo_update + deltas["memo_delta"] = memo_update["value"] + + # Only emit HUD if something changed + if deltas: + await self.hud("tick", tick=self.tick_count, deltas=deltas) + + async def _loop(self, get_memo_state): + """Background tick loop.""" + self.running = True + await self.hud("started", interval=self.interval) + try: + while self.running: + await asyncio.sleep(self.interval) + try: + await self.tick(get_memo_state()) + except Exception as e: + log.error(f"[sensor] tick error: {e}") + except asyncio.CancelledError: + pass + finally: + self.running = False + await self.hud("stopped") + + def start(self, get_memo_state): + """Start the background tick loop.""" + if self._task and not self._task.done(): + return + self._task = asyncio.create_task(self._loop(get_memo_state)) + + def stop(self): + """Stop the tick loop.""" + self.running = False + if self._task: + self._task.cancel() + + def get_context_lines(self) -> list[str]: + """Render current sensor 
readings for injection into prompts.""" + if not self.readings: + return ["Sensors: (no sensor node running)"] + lines = [f"Sensors (tick #{self.tick_count}, {self.interval}s interval):"] + for name, r in self.readings.items(): + if name.startswith("_"): + continue + val = r.get("value", "?") + detail = r.get("detail") + age = time.time() - r.get("changed_at", time.time()) + if age < 10: + age_str = "just now" + elif age < 60: + age_str = f"{int(age)}s ago" + else: + age_str = f"{int(age // 60)}m ago" + line = f"- {name}: {detail or val} [{age_str}]" + lines.append(line) + return lines + # --- Input Node --- class InputNode(Node): name = "input" model = "google/gemini-2.0-flash-001" + max_context_tokens = 2000 # small budget — perception only SYSTEM = """You are the Input node — the ear of this cognitive runtime. @@ -180,8 +385,8 @@ Listener context: - Physical: private space, Nico lives with Tina — she may use this session too - Security: single-user account, shared physical space — other voices are trusted household -You hear what comes through this channel. Emit ONE instruction sentence telling Output how to respond. -No content, just the command. +Your job: describe what you heard. Who spoke, what they want, what tone, what context matters. +ONE sentence. No content, no response — just your perception of what came through. {memory_context}""" @@ -194,14 +399,14 @@ No content, just the command. 
{"role": "system", "content": self.SYSTEM.format( memory_context=memory_context, identity=identity, channel=channel)}, ] - # History already includes current user message — don't add it again for msg in history[-8:]: messages.append(msg) + messages = self.trim_context(messages) await self.hud("context", messages=messages) instruction = await llm_call(self.model, messages) log.info(f"[input] → command: {instruction}") - await self.hud("decided", instruction=instruction) + await self.hud("perceived", instruction=instruction) return Command(instruction=instruction, source_text=envelope.text) @@ -210,11 +415,12 @@ No content, just the command. class OutputNode(Node): name = "output" model = "google/gemini-2.0-flash-001" + max_context_tokens = 4000 # larger — needs history for continuity - SYSTEM = """You are the Output node of a cognitive agent runtime. -You receive a command from the Input node telling you HOW to respond, plus the user's original message. -Follow the command's tone and intent. Be natural, don't mention the command or the runtime architecture. -Be concise. + SYSTEM = """You are the Output node — the voice of this cognitive runtime. +The Input node sends you its perception of what the user said. This is internal context for you — never repeat or echo it. +You respond to the USER, not to the Input node. Use the perception to understand intent, then act on it. +Be natural. Be concise. If the user asks you to do something, do it — don't describe what you're about to do. {memory_context}""" @@ -224,11 +430,10 @@ Be concise. 
messages = [ {"role": "system", "content": self.SYSTEM.format(memory_context=memory_context)}, ] - # Conversation history for continuity (already includes current user message) for msg in history[-20:]: messages.append(msg) - # Inject command as system guidance after the user message - messages.append({"role": "system", "content": f"Input node command: {command.instruction}"}) + messages.append({"role": "system", "content": f"Input perception: {command.instruction}"}) + messages = self.trim_context(messages) await self.hud("context", messages=messages) @@ -263,6 +468,7 @@ Be concise. class MemorizerNode(Node): name = "memorizer" model = "google/gemini-2.0-flash-001" + max_context_tokens = 3000 # needs enough history to distill DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime. After each exchange you update the shared state that Input and Output nodes read. @@ -293,9 +499,11 @@ Output ONLY valid JSON. No explanation, no markdown fences.""" "facts": [], } - def get_context_block(self) -> str: + def get_context_block(self, sensor_lines: list[str] = None) -> str: """Returns a formatted string for injection into Input/Output system prompts.""" - lines = ["Shared memory (from Memorizer):"] + lines = sensor_lines or ["Sensors: (none)"] + lines.append("") + lines.append("Shared memory (from Memorizer):") for k, v in self.state.items(): if v: lines.append(f"- {k}: {v}") @@ -313,10 +521,10 @@ Output ONLY valid JSON. No explanation, no markdown fences.""" {"role": "system", "content": self.DISTILL_SYSTEM}, {"role": "system", "content": f"Current state: {json.dumps(self.state)}"}, ] - # Last few exchanges for distillation for msg in history[-10:]: messages.append(msg) messages.append({"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."}) + messages = self.trim_context(messages) await self.hud("context", messages=messages) @@ -336,7 +544,7 @@ Output ONLY valid JSON. 
No explanation, no markdown fences.""" # Merge: keep old facts, add new ones old_facts = set(self.state.get("facts", [])) new_facts = set(new_state.get("facts", [])) - new_state["facts"] = list(old_facts | new_facts) + new_state["facts"] = list(old_facts | new_facts)[-20:] # cap at 20 facts # Preserve topic history if self.state.get("topic") and self.state["topic"] != new_state.get("topic"): hist = new_state.get("topic_history", []) @@ -362,9 +570,13 @@ class Runtime: def __init__(self, ws: WebSocket, user_claims: dict = None, origin: str = ""): self.ws = ws self.history: list[dict] = [] + self.MAX_HISTORY = 40 # sliding window — oldest messages drop off self.input_node = InputNode(send_hud=self._send_hud) self.output_node = OutputNode(send_hud=self._send_hud) self.memorizer = MemorizerNode(send_hud=self._send_hud) + self.sensor = SensorNode(send_hud=self._send_hud) + # Start sensor tick loop + self.sensor.start(get_memo_state=lambda: self.memorizer.state) # Verified identity from auth — Input and Memorizer use this claims = user_claims or {} log.info(f"[runtime] user_claims: {claims}") @@ -383,6 +595,10 @@ class Runtime: try: with open(TRACE_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n") + # Rotate trace file: when it grows past ~500KB, keep only the last 500 lines + if TRACE_FILE.exists() and TRACE_FILE.stat().st_size > 500_000: + lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n") + TRACE_FILE.write_text("\n".join(lines[-500:]) + "\n", encoding="utf-8") except Exception as e: log.error(f"trace write error: {e}") _broadcast_sse(trace_entry) @@ -395,11 +611,15 @@ class Runtime: timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), ) + # Note user activity for idle sensor + self.sensor.note_user_activity() + # Append user message to history FIRST — both nodes see it self.history.append({"role": "user", "content": text}) - # Get shared memory context for both nodes - mem_ctx = self.memorizer.get_context_block() + # Get shared memory + sensor context for 
both nodes + sensor_lines = self.sensor.get_context_lines() + mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines) # Input node decides (with memory context + identity + channel) command = await self.input_node.process( @@ -413,12 +633,16 @@ class Runtime: # Memorizer updates shared state after each exchange await self.memorizer.update(self.history) + # Sliding window — trim oldest messages, keep context in memorizer + if len(self.history) > self.MAX_HISTORY: + self.history = self.history[-self.MAX_HISTORY:] + # --- App --- STATIC_DIR = Path(__file__).parent / "static" -app = FastAPI(title="Cognitive Agent Runtime") +app = FastAPI(title="cog") # Keep a reference to the active runtime for API access _active_runtime: Runtime | None = None @@ -472,6 +696,7 @@ async def ws_endpoint(ws: WebSocket, token: str | None = Query(None), access_tok msg = json.loads(data) await runtime.handle_message(msg["text"]) except WebSocketDisconnect: + runtime.sensor.stop() if _active_runtime is runtime: _active_runtime = None @@ -538,6 +763,31 @@ async def poll(since: str = "", user=Depends(require_auth)): "last_messages": _active_runtime.history[-3:] if _active_runtime else [], } +@app.post("/api/send") +async def api_send(body: dict, user=Depends(require_auth)): + """Send a message as if the user typed it. Requires auth. 
Returns the response.""" + if not _active_runtime: + raise HTTPException(status_code=409, detail="No active session — someone must be connected via WS first") + text = body.get("text", "").strip() + if not text: + raise HTTPException(status_code=400, detail="Missing 'text' field") + await _active_runtime.handle_message(text) + return { + "status": "ok", + "response": _active_runtime.history[-1]["content"] if _active_runtime.history else "", + "memorizer": _active_runtime.memorizer.state, + } + + +@app.post("/api/clear") +async def api_clear(user=Depends(require_auth)): + """Clear conversation history.""" + if not _active_runtime: + raise HTTPException(status_code=409, detail="No active session") + _active_runtime.history.clear() + return {"status": "cleared"} + + @app.get("/api/state") async def get_state(user=Depends(require_auth)): """Current memorizer state + history length.""" diff --git a/static/index.html b/static/index.html index 63e0a42..42a776e 100644 --- a/static/index.html +++ b/static/index.html @@ -3,13 +3,13 @@ -Cognitive Agent Runtime +cog
-

Cognitive Agent Runtime

+

cog

disconnected
diff --git a/test_cog.py b/test_cog.py new file mode 100644 index 0000000..ce304df --- /dev/null +++ b/test_cog.py @@ -0,0 +1,36 @@ +"""Test script for cog runtime API. Run with: .venv/Scripts/python.exe test_cog.py""" +import httpx, sys, time + +API = "https://cog.loop42.de/api" +TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g" +HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"} + +def send(text): + r = httpx.post(f"{API}/send", json={"text": text}, headers=HEADERS, timeout=30) + d = r.json() + return d.get("response", "").strip(), d.get("memorizer", {}) + +def clear(): + httpx.post(f"{API}/clear", headers=HEADERS, timeout=10) + +tests = [ + ("hello!", None), + ("hey tina hier!", None), + ("wir gehen gleich in den pub", None), + ("nico back - schreib mir ein haiku", None), + ("auf deutsch, mit unseren namen und deinem, dark future tech theme", None), + ("wie spaet ist es?", None), +] + +clear() +print("=== COG TEST RUN ===\n") + +for i, (msg, _) in enumerate(tests, 1): + print(f"--- {i}. USER: {msg}") + resp, memo = send(msg) + print(f" COG: {resp}") + print(f" MEMO: name={memo.get('user_name')} mood={memo.get('user_mood')} topic={memo.get('topic')}") + print() + time.sleep(0.5) + +print("=== DONE ===")