agent-runtime/agent/nodes/memorizer.py
Nico 7458b2ea35 v0.8.0: refactor agent.py into modular package
Split 1161-line monolith into agent/ package:
auth, llm, types, process, runtime, api, and
nodes/ (base, sensor, input, output, thinker, memorizer).
No logic changes — pure structural split.
uvicorn agent:app entrypoint unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 01:36:41 +01:00

100 lines
4.0 KiB
Python

"""Memorizer Node: S2 — shared state / coordination."""
import json
import logging
from .base import Node
from ..llm import llm_call
log = logging.getLogger("runtime")
class MemorizerNode(Node):
name = "memorizer"
model = "google/gemini-2.0-flash-001"
max_context_tokens = 3000
DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime.
After each exchange you update the shared state that Input and Output nodes read.
Given the conversation so far, output a JSON object with these fields:
- user_name: string — how the user identifies themselves (null if unknown)
- user_mood: string — current emotional tone (neutral, happy, frustrated, playful, etc.)
- topic: string — what the conversation is about right now
- topic_history: list of strings — previous topics in this session
- situation: string — social/physical context if mentioned (e.g. "at a pub with tina", "private dev session")
- language: string — primary language being used (en, de, mixed)
- style_hint: string — how Output should talk (casual, formal, technical, poetic, etc.)
- facts: list of strings — important facts learned about the user
Output ONLY valid JSON. No explanation, no markdown fences."""
def __init__(self, send_hud):
super().__init__(send_hud)
self.state: dict = {
"user_name": None,
"user_mood": "neutral",
"topic": None,
"topic_history": [],
"situation": "localhost test runtime, private dev session",
"language": "en",
"style_hint": "casual, technical",
"facts": [],
}
def get_context_block(self, sensor_lines: list[str] = None) -> str:
lines = sensor_lines or ["Sensors: (none)"]
lines.append("")
lines.append("Shared memory (from Memorizer):")
for k, v in self.state.items():
if v:
lines.append(f"- {k}: {v}")
return "\n".join(lines)
async def update(self, history: list[dict]):
if len(history) < 2:
await self.hud("updated", state=self.state)
return
await self.hud("thinking", detail="updating shared state")
messages = [
{"role": "system", "content": self.DISTILL_SYSTEM},
{"role": "system", "content": f"Current state: {json.dumps(self.state)}"},
]
for msg in history[-10:]:
messages.append(msg)
messages.append({"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens,
max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
raw = await llm_call(self.model, messages)
log.info(f"[memorizer] raw: {raw[:200]}")
text = raw.strip()
if text.startswith("```"):
text = text.split("\n", 1)[1] if "\n" in text else text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
try:
new_state = json.loads(text)
old_facts = set(self.state.get("facts", []))
new_facts = set(new_state.get("facts", []))
new_state["facts"] = list(old_facts | new_facts)[-20:]
if self.state.get("topic") and self.state["topic"] != new_state.get("topic"):
hist = new_state.get("topic_history", [])
if self.state["topic"] not in hist:
hist.append(self.state["topic"])
new_state["topic_history"] = hist[-5:]
self.state = new_state
log.info(f"[memorizer] updated state: {self.state}")
await self.hud("updated", state=self.state)
except (json.JSONDecodeError, Exception) as e:
log.error(f"[memorizer] update error: {e}, raw: {text[:200]}")
await self.hud("error", detail=f"Update failed: {e}")
await self.hud("updated", state=self.state)