import os
import json
import sqlite3
import requests
import datetime
import time
import csv
import io
from flask import Flask, render_template, request, jsonify, g
from werkzeug.utils import secure_filename
from dotenv import load_dotenv

# Load env
load_dotenv()

app = Flask(__name__, instance_relative_config=True)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-eval-matrix')
app.config['DATABASE'] = os.path.join(app.instance_path, 'eval_matrix.db')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB Max Upload

# Ensure instance folder exists
try:
    os.makedirs(app.instance_path)
except OSError:
    pass

# SiliconFlow Config
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi")
SILICONFLOW_BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"
# Using Qwen 2.5 7B Instruct as the default judge/worker
DEFAULT_MODEL = "Qwen/Qwen2.5-7B-Instruct"


# --- Database ---
def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(app.config['DATABASE'])
        g.db.row_factory = sqlite3.Row
    return g.db


@app.teardown_appcontext
def close_db(error):
    db = g.pop('db', None)
    if db is not None:
        db.close()


def init_db():
    db = get_db()
    # Test Sets (Collections of cases)
    db.execute('''
        CREATE TABLE IF NOT EXISTS test_sets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Test Cases (Individual prompts)
    db.execute('''
        CREATE TABLE IF NOT EXISTS test_cases (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_set_id INTEGER NOT NULL,
            prompt TEXT NOT NULL,
            expected_output TEXT,
            criteria TEXT,  -- e.g. "Must be polite", "No JSON errors"
            FOREIGN KEY (test_set_id) REFERENCES test_sets (id)
        )
    ''')
    # Evaluation Runs (A batch execution)
    db.execute('''
        CREATE TABLE IF NOT EXISTS eval_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_set_id INTEGER NOT NULL,
            model_name TEXT NOT NULL,
            status TEXT DEFAULT 'pending',  -- pending, running, completed, failed
            avg_score REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (test_set_id) REFERENCES test_sets (id)
        )
    ''')
    # Evaluation Results (Individual case results)
    db.execute('''
        CREATE TABLE IF NOT EXISTS eval_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            case_id INTEGER NOT NULL,
            model_output TEXT,
            judge_score INTEGER,  -- 1-10
            judge_reasoning TEXT,
            latency_ms INTEGER,
            FOREIGN KEY (run_id) REFERENCES eval_runs (id),
            FOREIGN KEY (case_id) REFERENCES test_cases (id)
        )
    ''')
    db.commit()


# --- AI Integration ---
def call_llm(model, prompt, system_prompt="You are a helpful assistant."):
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.7
    }
    start_time = time.time()
    try:
        response = requests.post(SILICONFLOW_BASE_URL, json=payload, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        content = data['choices'][0]['message']['content']
        latency = int((time.time() - start_time) * 1000)
        return content, latency
    except Exception as e:
        print(f"LLM Call Error: {e}")
        return f"Error: {str(e)}", 0
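
# Illustrative usage of call_llm (not part of the request flow; requires a valid
# SILICONFLOW_API_KEY to return a real completion):
#
#   content, latency = call_llm(DEFAULT_MODEL, "Say hello in one word.")
#   # content -> the model's reply text; latency -> round-trip time in milliseconds.
#   # On any request or parsing failure it returns ("Error: <message>", 0) instead of raising.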

def judge_output(prompt, expected, criteria, output):
    """
    Uses an LLM to judge the quality of the output based on criteria.
    Returns: score (1-10), reasoning
    """
    judge_system = """You are an expert AI evaluator.
Your task is to evaluate the quality of an AI model's response based on the User Prompt, Expected Output (optional), and specific Criteria.
Return your evaluation in valid JSON format ONLY:
{ "score": <integer 1-10>, "reasoning": "<brief explanation>" }
"""
    judge_prompt = f"""
[User Prompt]: {prompt}
[Expected Output/Guidance]: {expected if expected else "N/A"}
[Evaluation Criteria]: {criteria if criteria else "General helpfulness and accuracy."}
[Model Response]: {output}

Evaluate the Model Response.
"""
    content, _ = call_llm(DEFAULT_MODEL, judge_prompt, judge_system)

    # Try to parse JSON
    try:
        # Clean markdown code blocks if present
        clean_content = content.replace("```json", "").replace("```", "").strip()
        result = json.loads(clean_content)
        return result.get("score", 0), result.get("reasoning", "No reasoning provided.")
    except Exception:
        return 0, f"Failed to parse judge output: {content}"


# --- Routes ---
@app.route('/')
def index():
    return render_template('index.html')


@app.route('/api/init', methods=['POST'])
def init_data():
    init_db()
    # Add demo data if empty
    db = get_db()
    cur = db.execute('SELECT count(*) FROM test_sets')
    if cur.fetchone()[0] == 0:
        cur = db.execute('INSERT INTO test_sets (name, description) VALUES (?, ?)',
                         ('Demo: Customer Support', 'Evaluation for customer service bot responses'))
        set_id = cur.lastrowid
        cases = [
            (set_id, "I want a refund.", "Polite refusal or process explanation", "Empathy, Clarity"),
            (set_id, "How do I reset my password?", "Step-by-step guide", "Accuracy, Conciseness"),
            (set_id, "Are you human?", "Clarify AI identity", "Transparency")
        ]
        db.executemany('INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)', cases)
        db.commit()
        return jsonify({"status": "initialized with demo data"})
    return jsonify({"status": "initialized"})


# Test Sets CRUD
@app.route('/api/test_sets', methods=['GET', 'POST'])
def handle_test_sets():
    db = get_db()
    if request.method == 'POST':
        data = request.json
        db.execute('INSERT INTO test_sets (name, description) VALUES (?, ?)',
                   (data['name'], data.get('description', '')))
        db.commit()
        return jsonify({"status": "success"})
    cur = db.execute('SELECT * FROM test_sets ORDER BY created_at DESC')
    return jsonify([dict(row) for row in cur.fetchall()])


@app.route('/api/test_sets/<int:id>/cases', methods=['GET', 'POST'])
def handle_test_cases(id):
    db = get_db()
    if request.method == 'POST':
        data = request.json
        db.execute('INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)',
                   (id, data['prompt'], data.get('expected_output', ''), data.get('criteria', '')))
        db.commit()
        return jsonify({"status": "success"})
    cur = db.execute('SELECT * FROM test_cases WHERE test_set_id = ?', (id,))
    return jsonify([dict(row) for row in cur.fetchall()])


@app.route('/api/test_sets/<int:id>/import', methods=['POST'])
def import_test_cases(id):
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    if file:
        filename = secure_filename(file.filename)
        db = get_db()
        count = 0
        try:
            # Parse File
            if filename.endswith('.csv'):
                stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
                csv_input = csv.DictReader(stream)
                # Check headers
                if 'prompt' not in csv_input.fieldnames:
                    return jsonify({"error": "CSV must have a 'prompt' column"}), 400
                cases = []
                for row in csv_input:
                    cases.append((
                        id,
                        row.get('prompt'),
                        row.get('expected_output', ''),
                        row.get('criteria', '')
                    ))
                if cases:
                    db.executemany('INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)', cases)
                    db.commit()
                    count = len(cases)
            elif filename.endswith('.json'):
                data = json.load(file)
                if not isinstance(data, list):
                    return jsonify({"error": "JSON must be a list of objects"}), 400
                cases = []
                for item in data:
                    if 'prompt' in item:
                        cases.append((
                            id,
                            item.get('prompt'),
                            item.get('expected_output', ''),
                            item.get('criteria', '')
                        ))
                if cases:
                    db.executemany('INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)', cases)
                    db.commit()
                    count = len(cases)
            else:
                return jsonify({"error": "Unsupported file type. Use .csv or .json"}), 400
        except Exception as e:
            return jsonify({"error": str(e)}), 500
    return jsonify({"status": "success", "count": count})
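
# Illustrative import file shapes accepted by /api/test_sets/<int:id>/import
# (the column/field names match the parser above; the values are made-up examples):
#
#   cases.csv:
#       prompt,expected_output,criteria
#       "How do I cancel my order?","Explain the cancellation steps","Accuracy, Empathy"
#
#   cases.json:
#       [{"prompt": "How do I cancel my order?",
#         "expected_output": "Explain the cancellation steps",
#         "criteria": "Accuracy, Empathy"}]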

# Evaluation Execution
@app.route('/api/run_eval', methods=['POST'])
def run_eval():
    data = request.json
    test_set_id = data['test_set_id']
    model_name = data.get('model_name', DEFAULT_MODEL)
    db = get_db()

    # Create Run Record
    cur = db.execute('INSERT INTO eval_runs (test_set_id, model_name, status) VALUES (?, ?, ?)',
                     (test_set_id, model_name, 'running'))
    run_id = cur.lastrowid
    db.commit()

    # Fetch cases
    cases = db.execute('SELECT * FROM test_cases WHERE test_set_id = ?', (test_set_id,)).fetchall()

    # Process (in a real app this should be an async/background task using Celery/RQ;
    # here we run it synchronously for simplicity)
    total_score = 0
    count = 0
    for case in cases:
        # 1. Generate Output
        output, latency = call_llm(model_name, case['prompt'])
        # 2. Judge Output
        score, reasoning = judge_output(case['prompt'], case['expected_output'], case['criteria'], output)
        # 3. Save Result
        db.execute('''
            INSERT INTO eval_results (run_id, case_id, model_output, judge_score, judge_reasoning, latency_ms)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (run_id, case['id'], output, score, reasoning, latency))
        total_score += score
        count += 1
        db.commit()  # Commit incrementally

    avg_score = total_score / count if count > 0 else 0
    db.execute('UPDATE eval_runs SET status = ?, avg_score = ? WHERE id = ?',
               ('completed', avg_score, run_id))
    db.commit()

    return jsonify({"status": "completed", "run_id": run_id, "avg_score": avg_score})


@app.route('/api/runs', methods=['GET'])
def get_runs():
    db = get_db()
    cur = db.execute('''
        SELECT r.*, t.name as test_set_name FROM eval_runs r
        JOIN test_sets t ON r.test_set_id = t.id
        ORDER BY r.created_at DESC
    ''')
    return jsonify([dict(row) for row in cur.fetchall()])


@app.route('/api/runs/<int:id>', methods=['GET'])
def get_run_details(id):
    db = get_db()
    run = db.execute('SELECT * FROM eval_runs WHERE id = ?', (id,)).fetchone()
    results = db.execute('''
        SELECT r.*, c.prompt, c.expected_output, c.criteria FROM eval_results r
        JOIN test_cases c ON r.case_id = c.id
        WHERE r.run_id = ?
    ''', (id,)).fetchall()
    return jsonify({
        "run": dict(run),
        "results": [dict(row) for row in results]
    })


@app.errorhandler(413)
def request_entity_too_large(error):
    return jsonify({"error": "File too large"}), 413


@app.errorhandler(500)
def internal_error(error):
    return jsonify({"error": "Internal Server Error"}), 500


@app.errorhandler(404)
def not_found(error):
    return jsonify({"error": "Not Found"}), 404


if __name__ == '__main__':
    with app.app_context():
        init_db()
    app.run(host='0.0.0.0', port=7860, debug=True)
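
# Illustrative end-to-end usage once the server is running on port 7860
# (endpoint paths and JSON fields are the ones defined above; the IDs are example values):
#
#   curl -X POST http://localhost:7860/api/init
#   curl -X POST http://localhost:7860/api/run_eval \
#        -H "Content-Type: application/json" \
#        -d '{"test_set_id": 1, "model_name": "Qwen/Qwen2.5-7B-Instruct"}'
#   curl http://localhost:7860/api/runs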