- Memorizer tracks user_expectation (conversational/delegated/waiting_input/observing) - Output node adjusts phrasing per expectation - PA retry loop: reformulates job on expert failure (all retries exhausted or tool skip) - Machine state in PA context: get_machine_summary includes current state, buttons, stored data - Expert writes to machine state via update_machine + transition_machine - Expanded baked schema coverage - Awareness panel shows color-coded expectation state - Dashboard and workspace component updates Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
126 lines
6.0 KiB
Python
"""Memorizer Node: S2 — shared state / coordination."""
|
|
|
|
import json
|
|
import logging
|
|
|
|
from .base import Node
|
|
from ..llm import llm_call
|
|
|
|
log = logging.getLogger("runtime")
|
|
|
|
|
|
class MemorizerNode(Node):
    """Memorizer node (S2): distills the conversation into shared state.

    After each exchange it asks a small LLM to rewrite a JSON state dict
    (user name, mood, topic, facts, user_expectation, ...) that the Input
    and Output nodes read via :meth:`get_context_block`.
    """

    name = "memorizer"
    # A small/fast model is sufficient for state distillation.
    model = "google/gemini-2.0-flash-001"
    max_context_tokens = 3000

    DISTILL_SYSTEM = """You are the Memorizer node of a cognitive agent runtime.
After each exchange you update the shared state that Input and Output nodes read.

Given the conversation so far, output a JSON object with these fields:
- user_name: string — how the user identifies themselves (null if unknown)
- user_mood: string — current emotional tone (neutral, happy, frustrated, playful, etc.)
- topic: string — what the conversation is about right now
- topic_history: list of strings — previous topics in this session
- situation: string — social/physical context if mentioned (e.g. "at a pub with alice", "private dev session")
- language: string — primary language being used (en, de, mixed)
- style_hint: string — how Output should talk (casual, formal, technical, poetic, etc.)
- facts: list of strings — important facts learned about the user. NEVER drop facts from the existing list unless they are proven wrong. Always include all existing facts plus any new ones.
- user_expectation: string — what the user expects the agent to do next. One of:
"conversational" — default. User is chatting, asking questions, browsing. Normal back-and-forth.
"delegated" — user gave an imperative task ("build X", "do Y", "create Z"). They expect autonomous progress, not clarifying questions.
"waiting_input" — agent asked a question or presented choices. User's next message is likely an answer.
"observing" — user returned after being idle, or is reviewing a large output. Brief responses, wait for explicit engagement.
Cues:
- Imperative verbs + task scope ("build", "create", "do", "find") → delegated
- Agent ended with "Moment..." / thinking message but user hasn't seen full results yet → delegated (task still in progress)
- Short follow-ups like "und?", "ja?", "weiter?", "and?", "so?", "result?", "ergebnis?" → waiting_input (user is waiting for the agent to deliver)
- Agent ended with a question ("Sollen wir...?", "Gibt es...?") → waiting_input
- User said "ok/thanks/bye/danke" after output → observing
- Everything else → conversational
IMPORTANT: If the agent just delivered partial results or said "Moment..." and the user sends a short nudge, that is ALWAYS waiting_input, never conversational.

Output ONLY valid JSON. No explanation, no markdown fences."""

    def __init__(self, send_hud):
        """Initialize with neutral defaults for every state field.

        Args:
            send_hud: HUD callback forwarded to the ``Node`` base class.
        """
        super().__init__(send_hud)
        self.state: dict = {
            "user_name": None,
            "user_mood": "neutral",
            "topic": None,
            "topic_history": [],
            "situation": "localhost test runtime, private dev session",
            "language": "en",
            "style_hint": "casual, technical",
            "facts": [],
            "user_expectation": "conversational",
        }

    def get_context_block(self, sensor_lines: list[str] | None = None, ui_state: dict | None = None) -> str:
        """Render the shared state (plus optional UI state) as a text block.

        Args:
            sensor_lines: Optional sensor readout lines. Not mutated.
            ui_state: Optional mapping of workspace UI keys to values.

        Returns:
            A newline-joined context block for other nodes' prompts.
        """
        # Copy: the previous implementation appended to the caller's list in place.
        lines = list(sensor_lines) if sensor_lines else ["Sensors: (none)"]
        lines.append("")
        lines.append("Shared memory (from Memorizer):")
        for key, value in self.state.items():
            if value:  # skip unset/empty fields to keep the block compact
                lines.append(f"- {key}: {value}")
        if ui_state:
            lines.append("")
            lines.append("UI state (visible to user in workspace):")
            for key, value in ui_state.items():
                lines.append(f"- {key} = {value}")
        return "\n".join(lines)

    @staticmethod
    def _strip_fences(raw: str) -> str:
        """Strip a surrounding markdown code fence from an LLM reply, if any."""
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (may carry a language tag, e.g. ```json).
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    async def update(self, history: list[dict]):
        """Distill ``history`` into a new shared-state dict via the LLM.

        Emits HUD events throughout; on any parse/merge failure the previous
        state is kept and an "error" HUD event is emitted.

        Args:
            history: Chat messages (role/content dicts); only the last 10 are sent.
        """
        if len(history) < 2:
            # Nothing to distill yet; just echo the current state to the HUD.
            await self.hud("updated", state=self.state)
            return

        await self.hud("thinking", detail="updating shared state")

        messages = [
            {"role": "system", "content": self.DISTILL_SYSTEM},
            {"role": "system", "content": f"Current state: {json.dumps(self.state)}"},
        ]
        messages.extend(history[-10:])  # recent window only
        messages.append({"role": "user", "content": "Update the shared state based on this conversation. Output JSON only."})
        messages = self.trim_context(messages)

        await self.hud("context", messages=messages, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)

        raw = await llm_call(self.model, messages)
        log.info(f"[memorizer] raw: {raw[:200]}")

        text = self._strip_fences(raw)

        try:
            new_state = json.loads(text)
            # Fact retention: keep old facts in order, append genuinely new
            # ones (case-insensitive dedupe), cap at 30.
            old_facts = self.state.get("facts") or []
            new_facts = new_state.get("facts") or []  # guard: model may emit null
            merged = list(old_facts)
            seen = {f.lower() for f in old_facts}
            for fact in new_facts:
                key = fact.lower()
                if key not in seen:
                    merged.append(fact)
                    seen.add(key)  # also dedupes repeats within new_facts
            new_state["facts"] = merged[-30:]
            # Track topic changes in a bounded (5-entry) history.
            if self.state.get("topic") and self.state["topic"] != new_state.get("topic"):
                hist = new_state.get("topic_history") or []  # guard: model may emit null
                if self.state["topic"] not in hist:
                    hist.append(self.state["topic"])
                new_state["topic_history"] = hist[-5:]
            # Merge over the previous state so keys the model omitted are kept.
            self.state = {**self.state, **new_state}
            log.info(f"[memorizer] updated state: {self.state}")
            await self.hud("updated", state=self.state)
        except Exception as e:  # JSON errors or bad types from the model
            log.error(f"[memorizer] update error: {e}, raw: {text[:200]}")
            await self.hud("error", detail=f"Update failed: {e}")
            await self.hud("updated", state=self.state)