Architecture: - director_v2: always-on brain, produces DirectorPlan with tool_sequence - thinker_v2: pure executor, runs tools from DirectorPlan - interpreter_v1: factual result summarizer, no hallucination - v2_director_drives graph: Input -> Director -> Thinker -> Output Infrastructure: - Split into 3 pods: cog-frontend (nginx), cog-runtime (FastAPI), cog-mcp (SSE proxy) - MCP survives runtime restarts (separate pod, proxies via HTTP) - Async send pipeline: /api/send/check -> /api/send -> /api/result with progress - Zero-downtime rolling updates (maxUnavailable: 0) - Dynamic graph visualization (fetched from API, not hardcoded) Tests: 22 new mocked unit tests (director_v2: 7, thinker_v2: 8, interpreter_v1: 7) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
189 lines
7.8 KiB
Python
"""Unit tests for DirectorNode v2 — always-on brain, drives thinker."""
|
|
|
|
import json
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
from harness import HudCapture, make_command, make_history, NodeTestRunner
|
|
|
|
|
|
# ---- helpers ----
|
|
|
|
def mock_llm_json(obj):
|
|
"""Return an AsyncMock that returns JSON string (no tools)."""
|
|
async def _call(model, messages, **kw):
|
|
if kw.get("tools"):
|
|
return json.dumps(obj), []
|
|
return json.dumps(obj)
|
|
return _call
|
|
|
|
|
|
def make_director():
|
|
from agent.nodes.director_v2 import DirectorV2Node
|
|
hud = HudCapture()
|
|
node = DirectorV2Node(send_hud=hud)
|
|
return node, hud
|
|
|
|
|
|
# ---- tests ----
|
|
|
|
async def test_returns_director_plan():
|
|
"""Director v2 should return a DirectorPlan, not just a style directive."""
|
|
from agent.types import DirectorPlan
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="request", topic="database query",
|
|
text="how many customers are there?", complexity="complex")
|
|
mock_response = {
|
|
"goal": "count customers",
|
|
"steps": ["query_db('SELECT COUNT(*) FROM kunden')"],
|
|
"present_as": "summary",
|
|
"tool_sequence": [{"tool": "query_db", "args": {"query": "SELECT COUNT(*) FROM kunden", "database": "eras2_production"}}],
|
|
"reasoning": "simple count query",
|
|
"response_hint": "",
|
|
}
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=mock_llm_json(mock_response)):
|
|
plan = await node.decide(cmd, [], memory_context="")
|
|
assert isinstance(plan, DirectorPlan), f"got {type(plan)}"
|
|
assert plan.goal == "count customers"
|
|
assert len(plan.tool_sequence) == 1
|
|
assert plan.tool_sequence[0]["tool"] == "query_db"
|
|
|
|
|
|
async def test_direct_response_for_simple():
|
|
"""Simple questions should get response_hint, no tool_sequence."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="question", topic="greeting", text="hey how are you?",
|
|
complexity="trivial")
|
|
mock_response = {
|
|
"goal": "respond to greeting",
|
|
"steps": [],
|
|
"present_as": "summary",
|
|
"tool_sequence": [],
|
|
"reasoning": "social greeting, no tools needed",
|
|
"response_hint": "Respond warmly to the greeting",
|
|
}
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=mock_llm_json(mock_response)):
|
|
plan = await node.decide(cmd, [], memory_context="")
|
|
assert plan.is_direct_response, "should be direct response"
|
|
assert not plan.has_tools, "should have no tools"
|
|
assert plan.response_hint
|
|
|
|
|
|
async def test_multi_step_plan():
|
|
"""Complex requests should produce multi-step tool_sequence."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="request", topic="customer devices",
|
|
text="show customers with most devices", complexity="complex")
|
|
mock_response = {
|
|
"goal": "find customers with most devices",
|
|
"steps": [
|
|
"Step 1: query_db to count devices per customer",
|
|
"Step 2: present top 10 as table",
|
|
],
|
|
"present_as": "table",
|
|
"tool_sequence": [
|
|
{"tool": "query_db", "args": {"query": "SELECT k.name, COUNT(g.id) as cnt FROM kunden k JOIN geraete g ON g.kunden_id = k.id GROUP BY k.id ORDER BY cnt DESC LIMIT 10", "database": "eras2_production"}},
|
|
{"tool": "emit_display", "args": {"items": [{"type": "text", "label": "Top customers by device count"}]}},
|
|
],
|
|
"reasoning": "join kunden and geraete, aggregate, sort",
|
|
"response_hint": "",
|
|
}
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=mock_llm_json(mock_response)):
|
|
plan = await node.decide(cmd, [], memory_context="")
|
|
assert plan.has_tools
|
|
assert len(plan.tool_sequence) == 2
|
|
assert plan.present_as == "table"
|
|
|
|
|
|
async def test_emits_hud_events():
|
|
"""Director v2 should emit thinking + decided HUD events."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="question", text="hello")
|
|
mock_response = {
|
|
"goal": "greet", "steps": [], "present_as": "summary",
|
|
"tool_sequence": [], "reasoning": "simple", "response_hint": "say hi",
|
|
}
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=mock_llm_json(mock_response)):
|
|
await node.decide(cmd, [], memory_context="")
|
|
assert hud.has("thinking"), f"missing thinking: {[e['event'] for e in hud.events]}"
|
|
assert hud.has("decided"), f"missing decided: {[e['event'] for e in hud.events]}"
|
|
|
|
|
|
async def test_still_updates_style_directive():
|
|
"""Director v2 should still maintain mode/style for Output node."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="request", tone="frustrated",
|
|
text="nothing works", complexity="simple")
|
|
mock_response = {
|
|
"goal": "help debug",
|
|
"steps": [],
|
|
"present_as": "summary",
|
|
"tool_sequence": [],
|
|
"reasoning": "user frustrated, be patient",
|
|
"response_hint": "Acknowledge frustration, offer to help step by step",
|
|
"mode": "debugging",
|
|
"style": "patient and structured",
|
|
}
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=mock_llm_json(mock_response)):
|
|
plan = await node.decide(cmd, [], memory_context="")
|
|
assert node.directive["mode"] == "debugging"
|
|
assert "patient" in node.directive["style"].lower()
|
|
|
|
|
|
async def test_history_included_in_context():
|
|
"""Director should use conversation history for context."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="request", text="now show the details")
|
|
history = make_history([
|
|
("user", "show me customers"),
|
|
("assistant", "Here are the top customers..."),
|
|
])
|
|
mock_response = {
|
|
"goal": "show details", "steps": [], "present_as": "summary",
|
|
"tool_sequence": [{"tool": "query_db", "args": {"query": "SELECT * FROM kunden LIMIT 5", "database": "eras2_production"}}],
|
|
"reasoning": "follow-up from customer list", "response_hint": "",
|
|
}
|
|
captured_messages = []
|
|
|
|
async def capture_llm(model, messages, **kw):
|
|
captured_messages.extend(messages)
|
|
if kw.get("tools"):
|
|
return json.dumps(mock_response), []
|
|
return json.dumps(mock_response)
|
|
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=capture_llm):
|
|
await node.decide(cmd, history, memory_context="")
|
|
# History messages should appear in the LLM context
|
|
contents = [m["content"] for m in captured_messages]
|
|
assert any("show me customers" in c for c in contents), "history not in context"
|
|
|
|
|
|
async def test_bad_json_returns_fallback():
|
|
"""If LLM returns garbage, Director should return a safe fallback plan."""
|
|
node, hud = make_director()
|
|
cmd = make_command(intent="question", text="hello")
|
|
|
|
async def bad_llm(model, messages, **kw):
|
|
if kw.get("tools"):
|
|
return "not json at all {{{", []
|
|
return "not json at all {{{"
|
|
|
|
with patch("agent.nodes.director_v2.llm_call", side_effect=bad_llm):
|
|
plan = await node.decide(cmd, [], memory_context="")
|
|
# Should not crash — should return a fallback
|
|
assert plan.is_direct_response, "fallback should be direct response"
|
|
assert plan.response_hint, "fallback should have response_hint"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
runner = NodeTestRunner()
|
|
print("\n=== DirectorNode v2 ===")
|
|
runner.test("returns DirectorPlan", test_returns_director_plan())
|
|
runner.test("direct response for simple", test_direct_response_for_simple())
|
|
runner.test("multi-step plan", test_multi_step_plan())
|
|
runner.test("emits HUD events", test_emits_hud_events())
|
|
runner.test("still updates style directive", test_still_updates_style_directive())
|
|
runner.test("history included in context", test_history_included_in_context())
|
|
runner.test("bad JSON returns fallback", test_bad_json_returns_fallback())
|
|
p, f = runner.summary()
|
|
print(f"\n {p} passed, {f} failed")
|