Spaces:
Running
Running
| import os | |
| import json | |
| import sqlite3 | |
| import requests | |
| import datetime | |
| import time | |
| import csv | |
| import io | |
| from flask import Flask, render_template, request, jsonify, g | |
| from werkzeug.utils import secure_filename | |
| from dotenv import load_dotenv | |
# Load environment variables (python-dotenv) before any os.getenv() lookup below.
load_dotenv()

# Application object; configuration is resolved relative to the instance folder.
app = Flask(__name__, instance_relative_config=True)
app.config.update(
    # NOTE(review): the fallback is a fixed development value; set SECRET_KEY
    # in the environment for any real deployment.
    SECRET_KEY=os.getenv('SECRET_KEY', 'dev-secret-key-eval-matrix'),
    DATABASE=os.path.join(app.instance_path, 'eval_matrix.db'),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB Max Upload
)

# Make sure the instance folder (which holds the SQLite file) is present.
# Any OSError - including "already exists" - is deliberately ignored.
try:
    os.makedirs(app.instance_path)
except OSError:
    pass
# SiliconFlow Config
# The API key comes from the environment only: a credential must never be
# hard-coded as the fallback value. When the variable is unset the key is an
# empty string, so the Authorization header carries an empty bearer token.
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "")
# Chat-completions endpoint that every LLM request is posted to.
SILICONFLOW_BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"
# Using Qwen 2.5 7B Instruct as the default judge/worker
DEFAULT_MODEL = "Qwen/Qwen2.5-7B-Instruct"
| # --- Database --- | |
def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    A new connection is opened against ``app.config['DATABASE']`` and given
    ``sqlite3.Row`` as its row factory, so fetched rows can be read by column
    name and converted with ``dict(row)``.
    """
    if 'db' in g:
        return g.db

    connection = sqlite3.connect(app.config['DATABASE'])
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection
def close_db(error):
    """Close and forget the connection stored on ``g``, if there is one.

    ``error`` is accepted but not used.
    NOTE(review): the signature matches a Flask teardown callback, but the
    registration is not visible in this view - confirm it is registered.
    """
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()
def init_db():
    """Create the four application tables if they do not exist yet, then commit.

    Tables: ``test_sets`` -> ``test_cases`` (prompts belonging to a set) and
    ``eval_runs`` -> ``eval_results`` (per-case results of one run).
    """
    schema_statements = (
        # Test Sets (Collections of cases)
        '''
        CREATE TABLE IF NOT EXISTS test_sets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Test Cases (Individual prompts)
        '''
        CREATE TABLE IF NOT EXISTS test_cases (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_set_id INTEGER NOT NULL,
            prompt TEXT NOT NULL,
            expected_output TEXT,
            criteria TEXT, -- e.g. "Must be polite", "No JSON errors"
            FOREIGN KEY (test_set_id) REFERENCES test_sets (id)
        )
        ''',
        # Evaluation Runs (A batch execution)
        '''
        CREATE TABLE IF NOT EXISTS eval_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_set_id INTEGER NOT NULL,
            model_name TEXT NOT NULL,
            status TEXT DEFAULT 'pending', -- pending, running, completed, failed
            avg_score REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (test_set_id) REFERENCES test_sets (id)
        )
        ''',
        # Evaluation Results (Individual case results)
        '''
        CREATE TABLE IF NOT EXISTS eval_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            case_id INTEGER NOT NULL,
            model_output TEXT,
            judge_score INTEGER, -- 1-10
            judge_reasoning TEXT,
            latency_ms INTEGER,
            FOREIGN KEY (run_id) REFERENCES eval_runs (id),
            FOREIGN KEY (case_id) REFERENCES test_cases (id)
        )
        ''',
    )

    connection = get_db()
    # Order matters only for readability: parents are declared before children.
    for statement in schema_statements:
        connection.execute(statement)
    connection.commit()
| # --- AI Integration --- | |
def call_llm(model, prompt, system_prompt="You are a helpful assistant.",
             temperature=0.7, timeout=60):
    """Send one chat-completion request and return the reply with its latency.

    Args:
        model: Model identifier placed in the request payload.
        prompt: Content of the user message.
        system_prompt: Content of the system message.
        temperature: Sampling temperature sent to the API. Defaults to the
            previously hard-coded 0.7; pass a lower value for more repeatable
            output.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        ``(content, latency_ms)`` on success. On any request or response-shape
        failure the function does not raise; it prints the error and returns
        ``("Error: <message>", 0)``.
    """
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ],
        "temperature": temperature,
    }
    start_time = time.time()
    try:
        response = requests.post(
            SILICONFLOW_BASE_URL, json=payload, headers=headers, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
        content = data['choices'][0]['message']['content']
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as e:
        # Covers transport/HTTP errors, a non-JSON body, and a JSON body that
        # lacks the expected choices[0].message.content structure.
        print(f"LLM Call Error: {e}")
        return f"Error: {str(e)}", 0
    # Latency spans the request plus response decoding, in milliseconds.
    latency = int((time.time() - start_time) * 1000)
    return content, latency
def judge_output(prompt, expected, criteria, output):
    """Ask the default model to grade ``output`` and return ``(score, reasoning)``.

    Args:
        prompt: The user prompt the evaluated model was given.
        expected: Expected output or guidance; "N/A" is sent when falsy.
        criteria: Evaluation criteria; a generic default is sent when falsy.
        output: The model response being graded.

    Returns:
        ``(score, reasoning)``. ``score`` is always an int in 0-10; 0 means the
        judge reply could not be parsed or carried no usable score.
        ``reasoning`` is a string: the judge's explanation, or a
        "Failed to parse judge output: ..." message holding the raw reply.
    """
    judge_system = """You are an expert AI evaluator.
Your task is to evaluate the quality of an AI model's response based on the User Prompt, Expected Output (optional), and specific Criteria.
Return your evaluation in valid JSON format ONLY:
{
"score": <integer 1-10>,
"reasoning": "<short explanation>"
}
"""
    judge_prompt = f"""
[User Prompt]: {prompt}
[Expected Output/Guidance]: {expected if expected else "N/A"}
[Evaluation Criteria]: {criteria if criteria else "General helpfulness and accuracy."}
[Model Response]: {output}
Evaluate the Model Response.
"""
    content, _ = call_llm(DEFAULT_MODEL, judge_prompt, judge_system)

    failure = (0, f"Failed to parse judge output: {content}")
    try:
        # Clean markdown code blocks if present
        cleaned = content.replace("```json", "").replace("```", "").strip()
        # Models often wrap the JSON in prose; keep only the outermost object.
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start != -1 and end > start:
            cleaned = cleaned[start:end + 1]
        result = json.loads(cleaned)
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: content is not a string; ValueError: bad JSON.
        return failure
    if not isinstance(result, dict):
        return failure

    reasoning = result.get("reasoning")
    if reasoning is None:
        reasoning = "No reasoning provided."
    return _coerce_judge_score(result.get("score", 0)), str(reasoning)


def _coerce_judge_score(raw):
    """Return ``raw`` as an int clamped to 0-10, or 0 when it is not numeric."""
    if isinstance(raw, bool):
        return 0
    try:
        score = round(float(raw))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric text, None, containers, NaN or infinity.
        return 0
    return max(0, min(10, score))
| # --- Routes --- | |
def index():
    """Render and return the ``index.html`` template."""
    page = render_template('index.html')
    return page
def init_data():
    """Create the schema and, when no test set exists yet, insert a demo set.

    Returns a JSON body whose ``status`` is "initialized with demo data" when
    the demo set and its three cases were inserted, otherwise "initialized".
    """
    init_db()
    connection = get_db()

    existing_sets = connection.execute('SELECT count(*) FROM test_sets').fetchone()[0]
    if existing_sets != 0:
        return jsonify({"status": "initialized"})

    # Add demo data if empty
    inserted = connection.execute(
        'INSERT INTO test_sets (name, description) VALUES (?, ?)',
        ('Demo: Customer Support', 'Evaluation for customer service bot responses'),
    )
    set_id = inserted.lastrowid
    demo_rows = (
        ("I want a refund.", "Polite refusal or process explanation", "Empathy, Clarity"),
        ("How do I reset my password?", "Step-by-step guide", "Accuracy, Conciseness"),
        ("Are you human?", "Clarify AI identity", "Transparency"),
    )
    connection.executemany(
        'INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)',
        [(set_id, *row) for row in demo_rows],
    )
    connection.commit()
    return jsonify({"status": "initialized with demo data"})
| # Test Sets CRUD | |
def handle_test_sets():
    """List all test sets, or create one when the request method is POST.

    POST expects a JSON object with a non-empty ``name`` and an optional
    ``description``; it returns ``{"status": "success"}``. A missing,
    non-object or nameless body yields a 400 with an ``error`` message rather
    than an unhandled exception.

    Any other method returns every test set, newest first, as a JSON list.
    """
    db = get_db()
    if request.method == 'POST':
        # silent=True gives None for a missing/invalid JSON body instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data.get('name'):
            return jsonify({"error": "JSON body must include a 'name'"}), 400
        db.execute(
            'INSERT INTO test_sets (name, description) VALUES (?, ?)',
            (data['name'], data.get('description', '')),
        )
        db.commit()
        return jsonify({"status": "success"})

    cur = db.execute('SELECT * FROM test_sets ORDER BY created_at DESC')
    return jsonify([dict(row) for row in cur.fetchall()])
def handle_test_cases(id):
    """List the cases of test set ``id``, or add one when the method is POST.

    Args:
        id: Identifier of the test set (name kept for caller compatibility
            even though it shadows the builtin).

    POST expects a JSON object with a non-empty ``prompt`` and optional
    ``expected_output`` / ``criteria``; it returns ``{"status": "success"}``.
    A missing, non-object or prompt-less body yields a 400 with an ``error``
    message rather than an unhandled exception.

    Any other method returns the set's cases as a JSON list.
    """
    db = get_db()
    if request.method == 'POST':
        # silent=True gives None for a missing/invalid JSON body instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data.get('prompt'):
            return jsonify({"error": "JSON body must include a 'prompt'"}), 400
        db.execute(
            'INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)',
            (id, data['prompt'], data.get('expected_output', ''), data.get('criteria', '')),
        )
        db.commit()
        return jsonify({"status": "success"})

    cur = db.execute('SELECT * FROM test_cases WHERE test_set_id = ?', (id,))
    return jsonify([dict(row) for row in cur.fetchall()])
def import_test_cases(id):
    """Bulk-import test cases for test set ``id`` from an uploaded CSV or JSON file.

    The upload is read from the ``file`` form field. CSV files need a
    ``prompt`` header column; JSON files must be a list of objects. The
    ``expected_output`` and ``criteria`` fields are optional. Records without a
    usable prompt are skipped.

    Args:
        id: Identifier of the test set the cases are attached to.

    Returns:
        ``{"status": "success", "count": <inserted>}`` on success;
        a 400 with an ``error`` message for a missing file, an unsupported
        extension or unparseable content; a 500 with an ``error`` message if
        the database insert fails.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    # Compare extensions case-insensitively so "DATA.CSV" is accepted too.
    filename = secure_filename(file.filename).lower()
    if filename.endswith('.csv'):
        parse = _parse_csv_cases
    elif filename.endswith('.json'):
        parse = _parse_json_cases
    else:
        return jsonify({"error": "Unsupported file type. Use .csv or .json"}), 400

    try:
        cases = parse(file.stream.read(), id)
    except (ValueError, csv.Error) as exc:
        # ValueError also covers UnicodeDecodeError and json.JSONDecodeError:
        # all of these mean the client sent a bad file, not a server fault.
        return jsonify({"error": str(exc)}), 400

    if cases:
        db = get_db()
        try:
            db.executemany(
                'INSERT INTO test_cases (test_set_id, prompt, expected_output, criteria) VALUES (?, ?, ?, ?)',
                cases,
            )
            db.commit()
        except sqlite3.Error as exc:
            db.rollback()
            return jsonify({"error": str(exc)}), 500
    return jsonify({"status": "success", "count": len(cases)})


def _build_case(test_set_id, record):
    """Return an insertable case tuple, or None when ``record`` has no prompt."""
    prompt = record.get('prompt')
    if prompt is None or prompt == '':
        return None
    return (
        test_set_id,
        prompt,
        record.get('expected_output', ''),
        record.get('criteria', ''),
    )


def _parse_csv_cases(raw, test_set_id):
    """Parse CSV bytes into case tuples; raise ValueError without a 'prompt' column."""
    # utf-8-sig drops the byte-order mark some spreadsheet exports prepend,
    # which would otherwise turn the first header into "\ufeffprompt".
    stream = io.StringIO(raw.decode("utf-8-sig"), newline=None)
    reader = csv.DictReader(stream)
    # fieldnames is None for an empty file.
    if 'prompt' not in (reader.fieldnames or ()):
        raise ValueError("CSV must have a 'prompt' column")
    built = (_build_case(test_set_id, row) for row in reader)
    return [case for case in built if case is not None]


def _parse_json_cases(raw, test_set_id):
    """Parse JSON bytes into case tuples; raise ValueError unless it is a list."""
    data = json.loads(raw)
    if not isinstance(data, list):
        raise ValueError("JSON must be a list of objects")
    # Only objects can carry a prompt; `'prompt' in item` on a string would be
    # a substring test, so other item types are ignored explicitly.
    built = (_build_case(test_set_id, item) for item in data if isinstance(item, dict))
    return [case for case in built if case is not None]
| # Evaluation Execution | |
def run_eval():
    """Run every case of a test set through a model, judge it, and store results.

    Expects a JSON object with ``test_set_id`` and an optional ``model_name``
    (defaults to ``DEFAULT_MODEL``). A run row is created with status
    ``running``; each case is generated, judged and saved with its own commit;
    finally the run is marked ``completed`` with its average score (0 when the
    set has no cases).

    Returns:
        ``{"status": "completed", "run_id": ..., "avg_score": ...}`` on success,
        or a 400 with an ``error`` message when ``test_set_id`` is missing.

    Raises:
        Any exception raised while processing is re-raised after the run has
        been marked ``failed``, so a run never stays stuck in ``running``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or data.get('test_set_id') is None:
        return jsonify({"error": "JSON body must include 'test_set_id'"}), 400
    test_set_id = data['test_set_id']
    model_name = data.get('model_name', DEFAULT_MODEL)

    db = get_db()
    # Create Run Record
    cur = db.execute(
        'INSERT INTO eval_runs (test_set_id, model_name, status) VALUES (?, ?, ?)',
        (test_set_id, model_name, 'running'),
    )
    run_id = cur.lastrowid
    db.commit()

    try:
        # Fetch cases
        cases = db.execute(
            'SELECT * FROM test_cases WHERE test_set_id = ?', (test_set_id,)
        ).fetchall()

        # Processing is synchronous: the request blocks until every case has
        # been generated and judged. A background worker would suit long runs.
        total_score = 0
        for case in cases:
            # 1. Generate Output
            output, latency = call_llm(model_name, case['prompt'])
            # 2. Judge Output
            score, reasoning = judge_output(
                case['prompt'], case['expected_output'], case['criteria'], output
            )
            # 3. Save Result
            db.execute('''
                INSERT INTO eval_results (run_id, case_id, model_output, judge_score, judge_reasoning, latency_ms)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (run_id, case['id'], output, score, reasoning, latency))
            total_score += score
            db.commit()  # Commit incrementally so finished cases survive a later failure

        avg_score = total_score / len(cases) if cases else 0
        db.execute(
            'UPDATE eval_runs SET status = ?, avg_score = ? WHERE id = ?',
            ('completed', avg_score, run_id),
        )
        db.commit()
    except Exception:
        # Request boundary: record the failure, then let the error propagate.
        db.rollback()
        db.execute('UPDATE eval_runs SET status = ? WHERE id = ?', ('failed', run_id))
        db.commit()
        raise

    return jsonify({"status": "completed", "run_id": run_id, "avg_score": avg_score})
def get_runs():
    """Return every evaluation run, newest first, as a JSON list.

    Each entry holds the ``eval_runs`` columns plus ``test_set_name``, taken
    from the joined ``test_sets`` row.
    """
    query = '''
        SELECT r.*, t.name as test_set_name
        FROM eval_runs r
        JOIN test_sets t ON r.test_set_id = t.id
        ORDER BY r.created_at DESC
    '''
    rows = get_db().execute(query).fetchall()
    return jsonify(list(map(dict, rows)))
def get_run_details(id):
    """Return one evaluation run together with its per-case results.

    Args:
        id: Identifier of the run (name kept for caller compatibility even
            though it shadows the builtin).

    Returns:
        ``{"run": {...}, "results": [...]}`` where every result row also
        carries the prompt, expected output and criteria of its test case;
        or a 404 with ``{"error": "Not Found"}`` when no such run exists
        (previously ``dict(None)`` raised and surfaced as a server error).
    """
    db = get_db()
    run = db.execute('SELECT * FROM eval_runs WHERE id = ?', (id,)).fetchone()
    if run is None:
        return jsonify({"error": "Not Found"}), 404

    results = db.execute('''
        SELECT r.*, c.prompt, c.expected_output, c.criteria
        FROM eval_results r
        JOIN test_cases c ON r.case_id = c.id
        WHERE r.run_id = ?
    ''', (id,)).fetchall()
    return jsonify({
        "run": dict(run),
        "results": [dict(row) for row in results],
    })
def request_entity_too_large(error):
    """Return a JSON "File too large" body with HTTP status 413.

    ``error`` is accepted but not used.
    """
    body = jsonify(error="File too large")
    return body, 413
def internal_error(error):
    """Return a JSON "Internal Server Error" body with HTTP status 500.

    ``error`` is accepted but not used.
    """
    body = jsonify(error="Internal Server Error")
    return body, 500
def not_found(error):
    """Return a JSON "Not Found" body with HTTP status 404.

    ``error`` is accepted but not used.
    """
    body = jsonify(error="Not Found")
    return body, 404
if __name__ == '__main__':
    # Make sure the schema exists before the server starts accepting requests.
    with app.app_context():
        init_db()
    # The server binds to every interface, so the interactive debugger must not
    # be on by default: anyone who can reach it can execute code. Debug mode is
    # opt-in through the FLASK_DEBUG environment variable.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)