- Memorizer tracks user_expectation (conversational/delegated/waiting_input/observing) - Output node adjusts phrasing per expectation - PA retry loop: reformulates job on expert failure (all retries exhausted or tool skip) - Machine state in PA context: get_machine_summary includes current state, buttons, stored data - Expert writes to machine state via update_machine + transition_machine - Expanded baked schema coverage - Awareness panel shows color-coded expectation state - Dashboard and workspace component updates Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
247 lines
12 KiB
Python
247 lines
12 KiB
Python
"""Personal Assistant Node: routes to domain experts, holds user context."""
|
|
|
|
import json
|
|
import logging
|
|
|
|
from .base import Node
|
|
from ..llm import llm_call
|
|
from ..types import Command, PARouting
|
|
|
|
# Shared logger for the runtime package; the "runtime" name groups all node
# logs under one handler configuration (configured elsewhere).
log = logging.getLogger("runtime")
|
|
|
|
|
|
class PANode(Node):
    """Personal Assistant node.

    Sits between the user and the domain experts: it reads the conversation,
    decides whether a message is social chat (answered directly) or a task
    (routed to an expert as a fully self-contained job description), and
    reformulates jobs when an expert fails.
    """

    name = "pa_v1"
    model = "anthropic/claude-haiku-4.5"
    max_context_tokens = 4000

    # System prompt template. Placeholders ({identity}, {channel}, {experts},
    # {expert_names}, {memory_context}) are filled by _render_system via
    # str.replace — NOT str.format — because the template and memory_context
    # contain literal JSON braces.
    SYSTEM = """You are the Personal Assistant (PA) — the user's companion in this cognitive runtime.
You manage the user's dashboard and route work to domain experts.

Listener: {identity} on {channel}

Available experts:
{experts}

Experts have these tools:
- query_db — SQL queries on their domain database
- emit_actions — create buttons on the dashboard
- create_machine / add_state / reset_machine / destroy_machine — interactive UI components
- update_machine(id, data) — update wizard data fields on existing machine
- transition_machine(id, target) — move machine to a specific state
- set_state — persistent key-value store
- emit_display — formatted data display

YOUR JOB:
1. Understand what the user wants
2. Route to the expert for ANY task that needs tools (DB, UI, buttons, machines, counters, reports)
3. Only respond directly for social chat (greetings, thanks, bye, small talk)

Output ONLY valid JSON:
{
"expert": "{expert_names} | none",
"job": "Self-contained task. Include ALL context — the expert has NO conversation history. Describe what to query, what UI to build, what the user expects to see.",
"thinking_message": "Short message for user while expert works, in their language",
"response_hint": "If expert=none, your direct response to the user.",
"language": "de | en | mixed"
}

Rules:
- expert=none ONLY for social chat (hi, thanks, bye, how are you)
- ANY request to create, build, show, query, investigate, count, list, describe, summarize → route to expert
- The job MUST be fully self-contained. The expert has NO history.
- Include relevant facts from memory AND conversation context in the job.
- For summaries/reports: include the key topics, findings, and actions from the conversation in the job so the expert can write a proper summary.
- thinking_message: natural, in user's language. e.g. "Moment, ich schaue nach..."
- If the user mentions data, tables, customers, devices, buttons, counters → expert
- When unsure which expert: pick the one whose domain matches best
- MACHINE STATE: If there are active machines/wizards listed in the context below, ALWAYS include the machine's current state and stored data in the job. The expert needs this to continue the workflow. Example: "Machine 'angebot_wizard' is on step 'select_age', data: {bundesland: Bayern}. User asks: ..."
- If the user asks about their wizard/workflow progress and the info is already visible in the context, respond directly (expert=none) using the machine state from context. Only route to expert if the user needs data queried or tools called.
- For update_machine / transition_machine requests: route to expert with the machine ID and operation details in the job.

USER EXPECTATION (from memorizer):
- If user_expectation is "delegated": formulate comprehensive, autonomous jobs. Do NOT include clarifying questions in the job. Tell the expert to proceed and report results.
- If user_expectation is "waiting_input": the user is waiting for results or nudging ("und?", "ja?", "weiter?"). Look at conversation history to find what they were waiting for and re-formulate that job. If they answered a question you asked, extract their answer and fold it into context.
- If user_expectation is "observing": only route to expert if the user explicitly asks for something. Otherwise respond directly with brief acknowledgment.
- If user_expectation is "conversational": normal routing behavior.
- CONTINUATION: When user sends a very short message (1-3 words like "und?", "weiter", "ja") after partial/incomplete results, treat it as "continue the previous task". Include the original question and any partial results in the job.

{memory_context}"""

    # Human-readable domain descriptions shown to the routing LLM.
    EXPERT_DESCRIPTIONS = {
        "eras": "eras — Heizkostenabrechnung (German heating cost billing). Users are Hausverwaltungen managing Kunden, Objekte (buildings), Nutzeinheiten (apartments), Geraete (meters), Verbraeuche (readings), Abrechnungen (billings), Auftraege (work orders). Hierarchy: Kunde > Objekte > Nutzeinheiten > Geraete > Verbraeuche. Database: eras2_production. Can also build dashboard UI.",
        "plankiste": "plankiste — Kita planning domain. Database: plankiste_test (children, care schedules, offers, pricing). Can build dashboard UI for education workflows and generate Angebote.",
    }

    def __init__(self, send_hud):
        super().__init__(send_hud)
        # Mutable routing directive; style is re-derived from tone on each route().
        self.directive: dict = {"mode": "assistant", "style": "helpful and concise"}
        self._available_experts: list[str] = []

    def set_available_experts(self, experts: list[str]):
        """Called by frame engine to tell PA which experts are in this graph."""
        self._available_experts = experts

    def get_context_line(self) -> str:
        """One-line summary of the PA's current directive for shared context."""
        d = self.directive
        return f"PA: {d['mode']} mode. {d['style']}."

    # ------------------------------------------------------------------ #
    # internal helpers (shared by route and route_retry)                 #
    # ------------------------------------------------------------------ #

    def _describe_experts(self, with_fallback: bool = False) -> list[str]:
        """Bullet lines describing each available expert for the prompt.

        With with_fallback=True, an explicit "(no experts available)" line is
        emitted when the list is empty (used by route; route_retry omits it).
        """
        lines = []
        for name in self._available_experts:
            desc = self.EXPERT_DESCRIPTIONS.get(name, f"{name} — domain expert")
            lines.append(f"- {desc}")
        if with_fallback and not lines:
            lines.append("- (no experts available — handle everything directly)")
        return lines

    def _render_system(self, memory_context: str, identity: str, channel: str,
                       expert_lines: list[str]) -> str:
        """Fill SYSTEM placeholders.

        Manual str.replace substitution is deliberate: .format() would break
        on the literal curly braces in the template and in memory_context.
        """
        expert_names = " | ".join(self._available_experts) if self._available_experts else "none"
        content = self.SYSTEM
        content = content.replace("{memory_context}", memory_context)
        content = content.replace("{identity}", identity)
        content = content.replace("{channel}", channel)
        content = content.replace("{experts}", "\n".join(expert_lines))
        content = content.replace("{expert_names}", expert_names)
        return content

    @staticmethod
    def _history_messages(history: list[dict], limit: int, ack: str) -> list[dict]:
        """Summarize the last `limit` history entries as a user/assistant pair.

        Each entry is truncated to 200 chars; `ack` is the canned assistant
        acknowledgment appended after the summary. Returns [] for no history.
        """
        recent = history[-limit:]
        if not recent:
            return []
        lines = []
        for msg in recent:
            role = msg.get("role", "?")
            content = msg.get("content", "")[:200]
            lines.append(f" {role}: {content}")
        return [
            {"role": "user", "content": "Recent conversation:\n" + "\n".join(lines)},
            {"role": "assistant", "content": ack},
        ]

    # ------------------------------------------------------------------ #
    # routing                                                            #
    # ------------------------------------------------------------------ #

    async def route(self, command: Command, history: list[dict],
                    memory_context: str = "", identity: str = "unknown",
                    channel: str = "unknown") -> PARouting:
        """Decide which expert handles this request."""
        await self.hud("thinking", detail="routing request")

        expert_lines = self._describe_experts(with_fallback=True)
        system_content = self._render_system(memory_context, identity, channel, expert_lines)

        messages = [{"role": "system", "content": system_content}]
        # PA sees (a summary of) the full recent context.
        messages += self._history_messages(
            history, 16,
            "OK, I have the context. I will include relevant details in the job description.")

        a = command.analysis
        messages.append({"role": "user",
                         "content": f"Route this message (intent={a.intent}, lang={a.language}, tone={a.tone}):\n{command.source_text}"})
        messages = self.trim_context(messages)

        await self.hud("context", messages=messages, tokens=self.last_context_tokens,
                       max_tokens=self.max_context_tokens, fill_pct=self.context_fill_pct)

        raw = await llm_call(self.model, messages)
        log.info(f"[pa] raw: {raw[:300]}")

        routing = self._parse_routing(raw, command)
        await self.hud("routed", expert=routing.expert, job=(routing.job or "")[:100],
                       direct=routing.expert == "none")

        # Update directive style based on tone
        tone = command.analysis.tone
        if tone == "frustrated":
            self.directive["style"] = "patient and empathetic"
        elif tone == "playful":
            self.directive["style"] = "light and fun"
        else:
            self.directive["style"] = "helpful and concise"

        return routing

    async def route_retry(self, command: Command, history: list[dict],
                          memory_context: str = "", identity: str = "unknown",
                          channel: str = "unknown", original_job: str = "",
                          errors: list | None = None) -> PARouting:
        """Re-route after expert failure. PA reformulates with error context."""
        await self.hud("thinking", detail="reformulating after expert failure")

        # Keep only the last three errors so the retry prompt stays small.
        error_lines = []
        for err in (errors or [])[-3:]:
            error_lines.append(f"- Query: {err.get('query', '?')[:100]}")
            error_lines.append(f" Error: {err.get('error', '?')[:100]}")
            if err.get("describe"):
                error_lines.append(f" Schema: {err['describe'][:200]}")
        joined_errors = "\n".join(error_lines)

        retry_prompt = f"""The expert FAILED the previous job. You must reformulate.

ORIGINAL JOB: {original_job}

ERRORS:
{joined_errors}

REFORMULATE the job with a DIFFERENT approach:
- If the query was too complex (JOINs, window functions), break it into simpler steps
- If columns were wrong, use the DESCRIBE info above to fix them
- If the table structure is unclear, tell the expert to first explore with SELECT * LIMIT 5
- Think about what data the user actually needs and find a simpler path to it

Output the same JSON format as before. The job MUST be different from the original."""

        expert_lines = self._describe_experts()
        system_content = self._render_system(memory_context, identity, channel, expert_lines)

        messages = [{"role": "system", "content": system_content}]
        messages += self._history_messages(history, 8, "OK, I have the context.")
        messages.append({"role": "user", "content": retry_prompt})
        messages = self.trim_context(messages)

        raw = await llm_call(self.model, messages)
        log.info(f"[pa] retry raw: {raw[:300]}")

        routing = self._parse_routing(raw, command)
        await self.hud("routed", expert=routing.expert, job=(routing.job or "")[:100],
                       direct=routing.expert == "none", retry=True)
        return routing

    def _parse_routing(self, raw: str, command: Command) -> PARouting:
        """Parse LLM JSON into PARouting with fallback."""
        text = raw.strip()
        # Strip a markdown code fence (``` or ```json) if the model emitted one.
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        try:
            data = json.loads(text)
            expert = data.get("expert", "none")
            # Validate expert is available
            if expert != "none" and expert not in self._available_experts:
                log.warning(f"[pa] expert '{expert}' not available, falling back to none")
                expert = "none"
            return PARouting(
                expert=expert,
                job=data.get("job") or "",
                thinking_message=data.get("thinking_message") or "",
                response_hint=data.get("response_hint") or "",
                language=data.get("language") or command.analysis.language,
            )
        except Exception as e:
            # Malformed JSON or an unexpected payload shape (e.g. a non-dict):
            # degrade to a direct "none" routing so the turn still completes.
            log.error(f"[pa] parse failed: {e}, raw: {text[:200]}")
            return PARouting(
                expert="none",
                response_hint=command.source_text,
                language=command.analysis.language,
            )