From da92109550ba30369f1bc78a1cbf48d2279aa4ba Mon Sep 17 00:00:00 2001 From: Nico Date: Sun, 29 Mar 2026 05:21:14 +0200 Subject: [PATCH] =?UTF-8?q?v0.14.3:=20Integration=20test=20runner=20?= =?UTF-8?q?=E2=80=94=20real=20pipeline,=20both=20graphs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - send_and_wait: POST /api/send + poll /api/result with timeout - 5 test cases: greeting, german, DB count, buttons, show tables - Clears state between tests for predictability - --graph both: runs v1 + v2 back to back - Reports live to frontend via /api/test/status - Both graphs 5/5 green Co-Authored-By: Claude Opus 4.6 (1M context) --- test_nodes/run_integration.py | 206 ++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 test_nodes/run_integration.py diff --git a/test_nodes/run_integration.py b/test_nodes/run_integration.py new file mode 100644 index 0000000..26adf38 --- /dev/null +++ b/test_nodes/run_integration.py @@ -0,0 +1,206 @@ +"""Integration tests — send real messages through the live runtime pipeline. + +Usage: + python run_integration.py --url https://cog.loop42.de --graph v1-current + python run_integration.py --url https://cog.loop42.de --graph v2-director-drives + python run_integration.py --url https://cog.loop42.de --graph both +""" + +import json +import os +import sys +import time +from pathlib import Path + +# Fix Windows console encoding for emoji +if sys.platform == "win32": + os.environ.setdefault("PYTHONIOENCODING", "utf-8") + try: + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + sys.stderr.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + +import httpx + +sys.path.insert(0, str(Path(__file__).parent.parent)) +sys.path.insert(0, str(Path(__file__).parent)) + +from harness import NodeTestRunner + +# --- Config --- + +DEFAULT_URL = "https://cog.loop42.de" +TOKEN = "7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g" +HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"} + + +def api(method, path, body=None, url=DEFAULT_URL): + """HTTP request to runtime API.""" + full = f"{url}{path}" + if method == "GET": + r = httpx.get(full, headers=HEADERS, timeout=30) + else: + r = httpx.post(full, headers=HEADERS, json=body or {}, timeout=30) + return r.json() if r.status_code == 200 else {"error": r.text} + + +def send_and_wait(text, url=DEFAULT_URL, timeout=30, clear=True): + """Send message, poll for result. Returns (response, trace).""" + if clear: + api("POST", "/api/clear", url=url) + time.sleep(0.5) + check = api("POST", "/api/send/check", url=url) + if not check.get("ready"): + return None, f"not ready: {check}" + + # Send + result = api("POST", "/api/send", {"text": text}, url=url) + if "error" in result: + return None, f"send error: {result['error']}" + + # Poll + for _ in range(timeout * 2): + time.sleep(0.5) + r = api("GET", "/api/result", url=url) + if r.get("status") == "done": + # Get trace + trace = api("GET", "/api/trace?last=20", url=url) + return r.get("response", ""), trace.get("lines", []) + if r.get("status") == "error": + return None, f"pipeline error: {r.get('detail')}" + return None, "timeout" + + +def switch_graph(name, url=DEFAULT_URL): + """Switch graph and clear session.""" + api("POST", "/api/graph/switch", {"name": name}, url=url) + # Wait for new runtime to initialize + time.sleep(2) + api("POST", "/api/clear", url=url) + time.sleep(1) + + +def trace_has_node(trace, node_name): + """Check if any trace event mentions a node.""" + return any(e.get("node") == node_name for e in trace) + + +def trace_has_event(trace, node_name, event_name): + """Check if a specific node+event combo exists in trace.""" + return any(e.get("node") == node_name and e.get("event") == event_name for e in trace) + + +# --- Test Cases --- + +TESTS = [ + { + "name": "greeting (reflex path)", + "send": "hey!", + "check": lambda resp, trace: ( + resp is not None and len(resp) > 0, + "got a response" if resp else "no response" + ), + }, + { + "name": "german language", + "send": "Wie viele Kunden gibt es?", + "check": lambda resp, trace: ( + resp is not None and len(resp) > 0, + "got a response" if resp else "no response" + ), + }, + { + "name": "count customers (DB query)", + "send": "how many customers are in the database?", + "check": lambda resp, trace: ( + resp is not None and "693" in resp, + f"found 693" if resp and "693" in resp else f"expected 693 in: {(resp or '')[:100]}" + ), + }, + { + "name": "create buttons", + "send": "create two buttons: Start and Stop", + "check": lambda resp, trace: ( + resp is not None and len(resp) > 0, + "got a response" if resp else "no response" + ), + }, + { + "name": "show tables", + "send": "welche Tabellen gibt es in der Datenbank?", + "check": lambda resp, trace: ( + resp is not None and any(k in resp.lower() for k in ["kunden", "table", "tabelle", "artikel", "objekte", "ableseart"]), + "mentions tables" if resp and any(k in resp.lower() for k in ["kunden", "table", "tabelle", "artikel", "objekte", "ableseart"]) else f"no table mention: {(resp or '')[:100]}" + ), + }, +] + + +def run_tests(graph_name, url, runner): + """Run all integration tests on a specific graph.""" + print(f"\n Switching to {graph_name}...") + switch_graph(graph_name, url) + + # Verify graph + check = api("POST", "/api/send/check", url=url) + actual_graph = check.get("graph", "unknown") + print(f" Active graph: {actual_graph}") + + runner.start_suite(f"Integration: {graph_name}", count=len(TESTS)) + + for test in TESTS: + name = test["name"] + print(f" > {name}: sending '{test['send'][:40]}'...") + + t0 = time.time() + resp, trace = send_and_wait(test["send"], url) + elapsed = int((time.time() - t0) * 1000) + + if resp is None: + # Error + runner.results.append(type('R', (), {'name': f"{graph_name}: {name}", 'passed': False, 'detail': str(trace), 'elapsed_ms': elapsed})()) + print(f" FAIL ({elapsed}ms): {trace}") + runner._report("step_result", result={ + "step": f"{graph_name}: {name}", "check": name, "status": "FAIL", + "detail": str(trace)[:200], "elapsed_ms": elapsed, + }) + else: + passed, detail = test["check"](resp, trace) + result_obj = type('R', (), {'name': f"{graph_name}: {name}", 'passed': passed, 'detail': detail, 'elapsed_ms': elapsed})() + runner.results.append(result_obj) + status = "PASS" if passed else "FAIL" + print(f" {status} ({elapsed}ms): {detail}") + runner._report("step_result", result={ + "step": f"{graph_name}: {name}", "check": name, "status": status, + "detail": detail[:200], "elapsed_ms": elapsed, + }) + + runner.end_suite() + + +# --- Main --- + +if __name__ == "__main__": + url = DEFAULT_URL + graph = "v1-current" + + for i, arg in enumerate(sys.argv[1:], 1): + if arg == "--url" and i < len(sys.argv): + url = sys.argv[i + 1] + elif arg == "--graph" and i < len(sys.argv): + graph = sys.argv[i + 1] + + runner = NodeTestRunner(report_url=url, token=TOKEN) + + if graph == "both": + for g in ["v1-current", "v2-director-drives"]: + run_tests(g, url, runner) + else: + run_tests(graph, url, runner) + + p, f = runner.summary() + print(f"\n{'=' * 60}") + print(f" INTEGRATION: {p} passed, {f} failed") + print(f"{'=' * 60}") + sys.exit(0 if f == 0 else 1)