Nico 925fff731f v0.17.0: User expectation tracking, PA retry loop, machine state in PA context
- Memorizer tracks user_expectation (conversational/delegated/waiting_input/observing)
- Output node adjusts phrasing per expectation
- PA retry loop: reformulates job on expert failure (all retries exhausted or tool skip)
- Machine state in PA context: get_machine_summary includes current state, buttons, stored data
- Expert writes to machine state via update_machine + transition_machine
- Expanded baked schema coverage
- Awareness panel shows color-coded expectation state
- Dashboard and workspace component updates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 19:03:07 +02:00

131 lines
5.3 KiB
Python

"""Input Node: structured analyst — classifies user input."""
import json
import logging
from .base import Node
from ..llm import llm_call
from ..types import Envelope, Command, InputAnalysis
log = logging.getLogger("runtime")
class InputNode(Node):
name = "input"
model = "google/gemini-2.0-flash-001"
max_context_tokens = 2000
SYSTEM = """You are the Input node — classify ONLY the current message.
Listener: {identity} on {channel}
Return ONLY valid JSON. No markdown, no explanation.
Schema:
{
"who": "name or unknown",
"language": "en | de | mixed",
"intent": "question | request | social | action | feedback",
"topic": "short topic string",
"tone": "casual | frustrated | playful | urgent",
"complexity": "trivial | simple | complex",
"context": "brief note or empty"
}
Rules:
- Classify the CURRENT message only. Previous messages are context, not the target.
- language: detect from the CURRENT message text, not the conversation language.
"Wie spaet ist es?" = de. "hello" = en. "Hallo, how are you" = mixed.
- intent: what does THIS message ask for?
social = greetings, thanks, goodbye, ok, bye, cool
question = asking for info (what, how, when, why, wieviel, was, wie)
request = asking to create/build/do something
action = clicking a button or UI trigger
feedback = commenting on results, correcting, satisfaction/dissatisfaction
- complexity: how much reasoning does THIS message need?
trivial = one-word social (hi, ok, thanks, bye)
simple = clear single-step
complex = multi-step, ambiguous, deep reasoning
- tone: emotional register of THIS message
frustrated = complaints, anger, "broken", "nothing works", "sick of"
urgent = time pressure, critical
playful = jokes, teasing
casual = neutral
Examples:
"hi there!" -> {"language":"en","intent":"social","tone":"casual","complexity":"trivial"}
"Wie spaet ist es?" -> {"language":"de","intent":"question","tone":"casual","complexity":"simple"}
"this is broken, nothing works" -> {"language":"en","intent":"feedback","tone":"frustrated","complexity":"simple"}
"create two buttons" -> {"language":"en","intent":"request","tone":"casual","complexity":"simple"}
"ok thanks bye" -> {"language":"en","intent":"social","tone":"casual","complexity":"trivial"}
{memory_context}"""
async def process(self, envelope: Envelope, history: list[dict], memory_context: str = "",
identity: str = "unknown", channel: str = "unknown") -> Command:
await self.hud("thinking", detail="analyzing input")
log.info(f"[input] user said: {envelope.text}")
# Build context summary from recent history (not raw chat messages)
history_summary = ""
recent = history[-8:]
if recent:
lines = []
for msg in recent:
role = msg.get("role", "?")
content = msg.get("content", "")[:80]
lines.append(f" {role}: {content}")
history_summary = "Recent conversation:\n" + "\n".join(lines)
messages = [
{"role": "system", "content": self.SYSTEM.replace(
"{memory_context}", memory_context).replace(
"{identity}", identity).replace("{channel}", channel)},
]
if history_summary:
messages.append({"role": "user", "content": history_summary})
messages.append({"role": "assistant", "content": "OK, I have the context. Send the message to classify."})
messages.append({"role": "user", "content": f"Classify: {envelope.text}"})
messages = self.trim_context(messages)
await self.hud("context", messages=messages, tokens=self.last_context_tokens,
max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)
raw = await llm_call(self.model, messages)
log.info(f"[input] raw: {raw[:300]}")
analysis = self._parse_analysis(raw, identity)
log.info(f"[input] analysis: {analysis}")
await self.hud("perceived", analysis=self._to_dict(analysis))
return Command(analysis=analysis, source_text=envelope.text)
def _parse_analysis(self, raw: str, identity: str = "unknown") -> InputAnalysis:
"""Parse LLM JSON response into InputAnalysis, with fallback defaults."""
text = raw.strip()
# Strip markdown fences if present
if text.startswith("```"):
text = text.split("\n", 1)[1] if "\n" in text else text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
try:
data = json.loads(text)
return InputAnalysis(
who=data.get("who", identity) or identity,
language=data.get("language", "en"),
intent=data.get("intent", "request"),
topic=data.get("topic", ""),
tone=data.get("tone", "casual"),
complexity=data.get("complexity", "simple"),
context=data.get("context", ""),
)
except (json.JSONDecodeError, Exception) as e:
log.error(f"[input] JSON parse failed: {e}, raw: {text[:200]}")
# Fallback: best-effort from raw text
return InputAnalysis(who=identity, topic=text[:50])
@staticmethod
def _to_dict(analysis: InputAnalysis) -> dict:
from dataclasses import asdict
return asdict(analysis)