"""
Integration test for AgLLM models.

Tests each model by calling initialize_qa_chain() from app.py directly,
exercising the same code path as the Gradio UI.

Usage:
    python test_app_models.py                        # Test all models
    python test_app_models.py --model "GPT-4"        # Test a single model
    python test_app_models.py --model "Gemini-2.5 Pro"

Exit code 0 = at least one model was tested and every tested model passed.
Exit code 1 = any model failed, or no models could be tested at all.
Models with missing API keys are skipped (skips are not failures).
"""

import os
import sys
import json
import time
import argparse

# Ensure working directory is project root (app.py uses relative paths)
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Load .env before anything else
from dotenv import load_dotenv
load_dotenv()

# Which API key each model needs
REQUIRED_KEYS = {
    "GPT-4": "OPENAI_API_KEY",
    "GPT-4.1 Mini": "OPENAI_API_KEY",
    "Llama-4 Maverick": "OPENROUTER_API_KEY",
    "Llama-4 Scout": "OPENROUTER_API_KEY",
    "Gemini-2.5 Pro": "OPENROUTER_API_KEY",
    "Claude Opus 4.6": "ANTHROPIC_API_KEY",
    "Claude Sonnet 4.6": "ANTHROPIC_API_KEY",
}

TEST_QUESTION = "What is the common name of {species}? Answer briefly."
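# Illustrative only: TEST_QUESTION.format(species="Zea mays") would produce
# "What is the common name of Zea mays? Answer briefly." The species name here
# is a made-up example; the real value comes from app.species_list in main().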


def get_active_models(app_module):
    """Extract model labels from initialize_qa_chain's if/elif branches."""
    import inspect
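    # Assumed shape of the branches inside app.initialize_qa_chain (illustrative,
    # not verbatim; the real labels come from app.py):
    #     if model_name=="GPT-4":
    #         llm = ...
    #     elif model_name=="Claude Sonnet 4.6":
    #         llm = ...
    # Note: the string match below assumes no spaces around '=='.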
    source = inspect.getsource(app_module.initialize_qa_chain)
    models = []
    for line in source.splitlines():
        line = line.strip()
        if line.startswith('if model_name==') or line.startswith('elif model_name=='):
            label = line.split('==')[1].strip().rstrip(':').strip('"').strip("'")
            models.append(label)
    return models


def test_model(app_module, model_label, species):
    """Test a model through the RAG path."""
    qa_chain, _availability_msg = app_module.initialize_qa_chain(
        specie_selector=species,
        application_mode="Researcher",
        model_name=model_label,
        region="Midwest USA",
    )
    question = TEST_QUESTION.format(species=species)
    result = qa_chain({"question": question, "chat_history": []})
    answer = result["answer"]
    if not answer or not answer.strip():
        raise ValueError("Empty response from RAG chain")
    return answer.strip()


def run_single_test(app_module, model_label, species):
    """Run a single test. Returns a result dict."""
    required_key = REQUIRED_KEYS.get(model_label)
    if required_key and not os.getenv(required_key):
        return {
            "model": model_label,
            "status": "SKIP",
            "reason": f"Missing {required_key}",
        }

    start = time.time()
    try:
        response = test_model(app_module, model_label, species)
        elapsed = time.time() - start
        return {
            "model": model_label,
            "status": "PASS",
            "response": response[:200],
            "latency_seconds": round(elapsed, 2),
        }
    except Exception as e:
        elapsed = time.time() - start
        return {
            "model": model_label,
            "status": "FAIL",
            "error": str(e)[:300],
            "latency_seconds": round(elapsed, 2),
        }


def print_result(r):
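    """Print a one-line summary for a result dict.

    Example output (illustrative values, not from a real run):
      PASS  GPT-4                      (3.2s)  "Maize, commonly called corn."
      SKIP  Claude Opus 4.6            Missing ANTHROPIC_API_KEY
      FAIL  Llama-4 Scout              (1.1s)  AuthenticationError: ...
    """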
    status = r["status"]
    label = r["model"]
    if status == "PASS":
        preview = r["response"][:80].replace("\n", " ")
        print(f"  PASS  {label:25s}  ({r['latency_seconds']:.1f}s)  \"{preview}\"")
    elif status == "SKIP":
        print(f"  SKIP  {label:25s}  {r['reason']}")
    else:
        print(f"  FAIL  {label:25s}  ({r['latency_seconds']:.1f}s)  {r['error'][:100]}")


def main():
    parser = argparse.ArgumentParser(description="AgLLM integration test")
    parser.add_argument("--model", type=str, help="Test a single model by label")
    args = parser.parse_args()

    # Pre-flight check
    if not os.getenv("OPENAI_API_KEY"):
        print("FATAL: OPENAI_API_KEY is required (used for embeddings). Set it and retry.")
        sys.exit(1)

    print("\n  Loading app.py (vector DBs, embeddings, data files)...")
    import app as app_module
    print("  App loaded successfully.\n")

    # Use first species in the DB for testing
    species = app_module.species_list[0]

    # Determine which models to test
    if args.model:
        models_to_test = [args.model]
    else:
        models_to_test = get_active_models(app_module)

    print(f"{'='*75}")
    print(f"  AgLLM Integration Test")
    print(f"  Species: {species}")
    print(f"  Models:  {', '.join(models_to_test)}")
    print(f"{'='*75}\n")

    results = []
    tested = 0
    passed = 0
    skipped = 0
    failed = 0

    for model_label in models_to_test:
        r = run_single_test(app_module, model_label, species)
        results.append(r)
        print_result(r)

        if r["status"] == "PASS":
            tested += 1
            passed += 1
        elif r["status"] == "SKIP":
            skipped += 1
        else:
            tested += 1
            failed += 1

    print(f"\n{'='*75}")
    print(f"  Results: {passed}/{tested} passed, {failed} failed, {skipped} skipped")
    print(f"{'='*75}\n")

    # Write JSON results
    output = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "species_tested": species,
        "tested": tested,
        "passed": passed,
        "failed": failed,
        "skipped": skipped,
        "results": results,
    }
    output_file = "test_app_models_results.json"
    with open(output_file, "w") as f:
        json.dump(output, f, indent=2)
    print(f"  Results written to {output_file}\n")

    # Exit non-zero if anything failed, or if nothing was actually tested (e.g. every model skipped).
    sys.exit(0 if failed == 0 and tested > 0 else 1)


if __name__ == "__main__":
    main()