agent-runtime/test_nodes/test_thinker_v2.py
Nico 5f447dfd53 v0.14.0: v2 Director-drives architecture + 3-pod K8s split
Architecture:
- director_v2: always-on brain, produces DirectorPlan with tool_sequence
- thinker_v2: pure executor, runs tools from DirectorPlan
- interpreter_v1: factual result summarizer, no hallucination
- v2_director_drives graph: Input -> Director -> Thinker -> Output

Infrastructure:
- Split into 3 pods: cog-frontend (nginx), cog-runtime (FastAPI), cog-mcp (SSE proxy)
- MCP survives runtime restarts (separate pod, proxies via HTTP)
- Async send pipeline: /api/send/check -> /api/send -> /api/result with progress
- Zero-downtime rolling updates (maxUnavailable: 0)
- Dynamic graph visualization (fetched from API, not hardcoded)

Tests: 22 new mocked unit tests (director_v2: 7, thinker_v2: 8, interpreter_v1: 7)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 04:17:44 +02:00

229 lines
8.5 KiB
Python

"""Unit tests for ThinkerNode v2 — pure executor, no autonomous reasoning."""
import json
from unittest.mock import AsyncMock, patch
from harness import HudCapture, make_command, make_history, NodeTestRunner
from agent.types import DirectorPlan, ThoughtResult
from agent.process import ProcessManager
# ---- helpers ----
def make_thinker():
    """Build a fresh ThinkerV2Node wired to a capturing HUD and a real ProcessManager.

    Returns:
        (node, hud) — the node under test and the HudCapture recording its events.
    """
    # Import inside the helper so collecting this module never pulls in the node.
    from agent.nodes.thinker_v2 import ThinkerV2Node

    captured_hud = HudCapture()
    manager = ProcessManager(send_hud=captured_hud)
    return ThinkerV2Node(send_hud=captured_hud, process_manager=manager), captured_hud
def plan_with_tools(tools, goal="test", response_hint=""):
    """Build a DirectorPlan whose tool_sequence is exactly *tools*.

    One human-readable step is synthesized per tool entry.
    """
    step_descriptions = [f"call {entry['tool']}" for entry in tools]
    return DirectorPlan(
        goal=goal,
        steps=step_descriptions,
        present_as="summary",
        tool_sequence=tools,
        reasoning="test",
        response_hint=response_hint,
    )
def plan_direct(hint="Just say hello"):
    """Build a tool-free DirectorPlan: the Thinker should answer directly."""
    return DirectorPlan(
        goal="respond",
        steps=[],
        present_as="summary",
        tool_sequence=[],  # empty sequence == direct-response mode
        reasoning="direct",
        response_hint=hint,
    )
# ---- tests ----
async def test_executes_emit_actions():
    """Thinker v2 should execute emit_actions from Director's tool_sequence."""
    node, hud = make_thinker()
    buttons = [
        {"label": "Red", "action": "pick_red"},
        {"label": "Blue", "action": "pick_blue"},
    ]
    plan = plan_with_tools([{"tool": "emit_actions", "args": {"actions": buttons}}])
    cmd = make_command(text="create buttons")

    # The LLM only supplies the closing text; the tool work comes from the plan.
    async def mock_llm(model, messages, **kw):
        text = "I created two buttons for you."
        return (text, []) if kw.get("tools") else text

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=mock_llm):
        result = await node.process(cmd, plan, [], memory_context="")

    assert isinstance(result, ThoughtResult)
    assert len(result.actions) == 2
    labels = [entry["label"] for entry in result.actions]
    assert "Red" in labels
    assert "Blue" in labels
async def test_executes_set_state():
    """Thinker v2 should execute set_state from Director's plan."""
    node, hud = make_thinker()
    plan = plan_with_tools([
        {"tool": "set_state", "args": {"key": "mode", "value": "building"}},
    ])
    cmd = make_command(text="set mode")

    async def fake_llm(model, messages, **kw):
        reply = "Mode set to building."
        if kw.get("tools"):
            return reply, []
        return reply

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=fake_llm):
        result = await node.process(cmd, plan, [], memory_context="")

    # The key/value pair from the plan must land in the result's state updates.
    assert result.state_updates.get("mode") == "building"
async def test_executes_query_db():
    """Thinker v2 should execute query_db and store result for interpreter."""
    node, hud = make_thinker()
    plan = plan_with_tools([
        {"tool": "query_db", "args": {"query": "SELECT COUNT(*) as cnt FROM kunden", "database": "eras2_production"}},
    ])
    cmd = make_command(text="count customers")
    fake_rows = "cnt\n693"

    async def canned_llm(model, messages, **kw):
        answer = "There are 693 customers."
        return (answer, []) if kw.get("tools") else answer

    # Stub the actual DB round-trip; we only care that its output is captured.
    with patch.object(node, "_run_db_query", return_value=fake_rows):
        with patch("agent.nodes.thinker_v2.llm_call", side_effect=canned_llm):
            result = await node.process(cmd, plan, [], memory_context="")

    assert result.tool_used == "query_db"
    assert result.tool_output == fake_rows
async def test_direct_response_no_tools():
    """When plan has no tools (direct response), Thinker should just produce text."""
    node, hud = make_thinker()
    plan = plan_direct("Respond warmly to the greeting")
    cmd = make_command(intent="social", text="hey!")

    async def greeting_llm(model, messages, **kw):
        greeting = "Hey there! How's it going?"
        if kw.get("tools"):
            return greeting, []
        return greeting

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=greeting_llm):
        result = await node.process(cmd, plan, [], memory_context="")

    assert result.response
    # Direct mode: nothing executed, nothing emitted.
    assert not result.tool_used
    assert not result.actions
async def test_no_autonomous_tool_calls():
    """Thinker v2 must NOT make tool calls the Director didn't ask for."""
    node, hud = make_thinker()
    plan = plan_direct("Just greet the user")
    cmd = make_command(intent="social", text="hello")

    # The mocked LLM attempts an unauthorized emit_actions call alongside its
    # text reply — the executor must drop it because the plan has no tools.
    async def sneaky_llm(model, messages, **kw):
        if kw.get("tools"):
            return "Hello!", [{"function": {"name": "emit_actions", "arguments": '{"actions": [{"label": "Hack", "action": "hack"}]}'}}]
        return "Hello!"

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=sneaky_llm):
        result = await node.process(cmd, plan, [], memory_context="")

    assert not result.actions, f"unauthorized actions: {result.actions}"
async def test_multi_tool_sequence():
    """Thinker should execute tools in order from Director's sequence."""
    node, hud = make_thinker()
    sequence = [
        {"tool": "set_state", "args": {"key": "status", "value": "querying"}},
        {"tool": "query_db", "args": {"query": "SHOW TABLES", "database": "eras2_production"}},
        {"tool": "set_state", "args": {"key": "status", "value": "done"}},
    ]
    plan = plan_with_tools(sequence)
    cmd = make_command(text="explore database")

    async def table_llm(model, messages, **kw):
        summary = "Found 2 tables."
        return (summary, []) if kw.get("tools") else summary

    with patch.object(node, "_run_db_query", return_value="Tables_in_eras2_production\nkunden\nobjekte"):
        with patch("agent.nodes.thinker_v2.llm_call", side_effect=table_llm):
            result = await node.process(cmd, plan, [], memory_context="")

    # Ordered execution: the second set_state overwrites the first for "status".
    assert result.state_updates.get("status") == "done"
    assert result.tool_used == "query_db"
async def test_emits_hud_per_tool():
    """Each tool execution should emit a HUD event."""
    node, hud = make_thinker()
    plan = plan_with_tools([
        {"tool": "set_state", "args": {"key": "x", "value": 1}},
        {"tool": "emit_actions", "args": {"actions": [{"label": "Go", "action": "go"}]}},
    ])
    cmd = make_command(text="test")

    async def quiet_llm(model, messages, **kw):
        done = "Done."
        if kw.get("tools"):
            return done, []
        return done

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=quiet_llm):
        await node.process(cmd, plan, [], memory_context="")

    # Two tools ran, so at least two tool_exec HUD events must have fired.
    tool_events = hud.find("tool_exec")
    assert len(tool_events) >= 2, f"expected 2+ tool_exec events, got {len(tool_events)}"
async def test_create_machine_tool():
    """Thinker v2 should handle create_machine from Director."""
    node, hud = make_thinker()
    machine_args = {
        "id": "nav", "initial": "home",
        "states": [
            {"name": "home", "buttons": [{"label": "Go", "action": "go", "go": "detail"}], "content": ["Welcome"]},
            {"name": "detail", "buttons": [{"label": "Back", "action": "back", "go": "home"}], "content": ["Detail"]},
        ],
    }
    plan = plan_with_tools([{"tool": "create_machine", "args": machine_args}])
    cmd = make_command(text="create nav")

    async def nav_llm(model, messages, **kw):
        confirmation = "Navigation created."
        return (confirmation, []) if kw.get("tools") else confirmation

    with patch("agent.nodes.thinker_v2.llm_call", side_effect=nav_llm):
        result = await node.process(cmd, plan, [], memory_context="")

    # Exactly one machine op, of kind "create", carrying the plan's id.
    assert len(result.machine_ops) == 1
    first_op = result.machine_ops[0]
    assert first_op["op"] == "create"
    assert first_op["id"] == "nav"
if __name__ == "__main__":
    runner = NodeTestRunner()
    print("\n=== ThinkerNode v2 ===")
    # Table-driven registration: label -> coroutine function, run in order.
    cases = [
        ("executes emit_actions", test_executes_emit_actions),
        ("executes set_state", test_executes_set_state),
        ("executes query_db", test_executes_query_db),
        ("direct response no tools", test_direct_response_no_tools),
        ("no autonomous tool calls", test_no_autonomous_tool_calls),
        ("multi tool sequence", test_multi_tool_sequence),
        ("emits HUD per tool", test_emits_hud_per_tool),
        ("create_machine tool", test_create_machine_tool),
    ]
    for label, case in cases:
        runner.test(label, case())
    p, f = runner.summary()
    print(f"\n {p} passed, {f} failed")