File size: 11,376 Bytes
3bfe1a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3be8f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bfe1a4
 
 
 
 
 
 
 
 
 
d3be8f6
 
 
 
 
 
 
3bfe1a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""Background evaluation job manager for RAG evaluation."""
import os
import json
import threading
import time
from datetime import datetime
from typing import Dict, Optional, List, Any
import traceback

# Lifecycle states stored in each job's "status" field.
STATUS_PENDING = "pending"      # job record created; worker thread not started yet
STATUS_RUNNING = "running"      # background thread is executing the evaluation
STATUS_COMPLETED = "completed"  # finished successfully; "results" is populated
STATUS_FAILED = "failed"        # raised an exception; "error"/"error_trace" populated

# Directory (relative to the process CWD) holding one JSON status file per job.
JOBS_DIR = "./evaluation_jobs"


def ensure_jobs_dir():
    """Create the job-status directory if it does not already exist.

    Safe to call repeatedly; exist_ok suppresses the error when the
    directory is already present.
    """
    os.makedirs(JOBS_DIR, exist_ok=True)


def get_job_file_path(job_id: str) -> str:
    """Return the filesystem path of the JSON status file for *job_id*.

    Also ensures the jobs directory exists so callers can open the
    returned path for writing immediately.
    """
    ensure_jobs_dir()
    filename = f"{job_id}.json"
    return os.path.join(JOBS_DIR, filename)


def save_job_status(job_id: str, status: Dict):
    """Atomically persist *status* as JSON to the job's status file.

    The status file is polled by other threads while the background worker
    updates it, so a plain open-and-write can expose an empty or truncated
    file mid-write (load_job_status even contains cleanup code for that
    case). Writing to a temporary sibling and renaming it over the target
    makes the update atomic: readers see either the old complete file or
    the new complete file, never a partial one.

    Args:
        job_id: Identifier of the job; determines the target filename.
        status: JSON-serializable job state dict (non-serializable values
            are stringified via default=str).
    """
    filepath = get_job_file_path(job_id)
    tmp_path = f"{filepath}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(status, f, indent=2, default=str)
    # os.replace is atomic on both POSIX and Windows.
    os.replace(tmp_path, filepath)


def load_job_status(job_id: str) -> Optional[Dict]:
    """Load a job's status dict from its JSON file.

    Returns None when the file does not exist, is empty, or contains
    invalid JSON. Empty or corrupted files are deleted so they do not
    fail again on every subsequent poll.

    The original code caught ``(json.JSONDecodeError, Exception)`` — the
    second member subsumes the first — and used a bare ``except:`` for
    cleanup, which also swallows SystemExit/KeyboardInterrupt. Both are
    narrowed to the errors that can actually occur here.
    """
    filepath = get_job_file_path(job_id)
    if not os.path.exists(filepath):
        return None
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read().strip()
        if not content:
            # Empty file (e.g. an interrupted write): remove and report missing.
            os.remove(filepath)
            return None
        return json.loads(content)
    except (json.JSONDecodeError, OSError) as e:
        print(f"[Background] Error loading job {job_id}: {e}")
        # Remove the corrupted file; ignore failure if it is already gone.
        try:
            os.remove(filepath)
        except OSError:
            pass
        return None


def list_jobs() -> List[Dict]:
    """Return the status dict of every stored job, newest first.

    Unreadable job files are skipped (load_job_status handles corrupt
    files itself; the try/except here is a second line of defense).
    """
    ensure_jobs_dir()
    jobs = []
    for entry in os.listdir(JOBS_DIR):
        if not entry.endswith('.json'):
            continue
        job_id = entry[:-len('.json')]
        try:
            job = load_job_status(job_id)
        except Exception as e:
            print(f"[Background] Error loading job {job_id}: {e}")
            continue
        if job:
            jobs.append(job)
    # Newest jobs first, by ISO timestamp string.
    return sorted(jobs, key=lambda j: j.get('created_at', ''), reverse=True)


def create_job(
    job_id: str,
    collection_name: str,
    dataset_name: str,
    num_samples: int,
    method: str,
    llm_model: str,
    embedding_model: str,
    llm_provider: str = "groq"
) -> Dict:
    """Create, persist, and return a new evaluation job record.

    The record starts in the pending state with zero progress; the
    background worker fills in timestamps, logs, results, and errors
    as it runs.
    """
    created = datetime.now().isoformat()
    job_status = {
        "job_id": job_id,
        "collection_name": collection_name,
        "dataset_name": dataset_name,
        "num_samples": num_samples,
        "method": method,
        "llm_model": llm_model,
        "embedding_model": embedding_model,
        "llm_provider": llm_provider,
        "status": STATUS_PENDING,
        "progress": 0,
        "current_step": "Initializing...",
        "logs": [],
        "created_at": created,
        "started_at": None,
        "completed_at": None,
        "results": None,
        "error": None,
    }
    save_job_status(job_id, job_status)
    return job_status


def update_job_progress(job_id: str, progress: int, current_step: str, log_message: Optional[str] = None):
    """Update a job's progress percentage, step label, and optional log line.

    Silently does nothing when the job file is missing or unreadable
    (load_job_status returns None in those cases).

    Args:
        job_id: Identifier of the job to update.
        progress: Completion percentage (0-100).
        current_step: Short human-readable description of the current step.
        log_message: When given, appended to the job's log with a timestamp.
    """
    status = load_job_status(job_id)
    if not status:
        return
    status["progress"] = progress
    status["current_step"] = current_step
    if log_message:
        # setdefault guards against status files written before the
        # "logs" key existed (or hand-edited files missing it).
        status.setdefault("logs", []).append({
            "time": datetime.now().isoformat(),
            "message": log_message
        })
    save_job_status(job_id, status)


def run_background_evaluation(
    job_id: str,
    rag_pipeline,
    vector_store,
    dataset_name: str,
    num_samples: int,
    method: str,
    llm_model: str,
    embedding_model: str,
    llm_provider: str,
    groq_api_key: Optional[str] = None,
    groq_api_keys: Optional[List[str]] = None,
    ollama_host: Optional[str] = None
) -> None:
    """Run evaluation in background thread.

    Full pipeline for one job: mark it running, load test data, query the
    RAG pipeline per sample, run the chosen evaluator over the collected
    test cases, write results to a timestamped JSON file, and mark the job
    completed. Any exception marks the job failed with the traceback
    recorded. Progress milestones: 5% load, 10% loaded, 15% LLM init,
    20-60% sample processing, 60% evaluation, 95% saving, 100% done.

    Args:
        job_id: Job whose status file is updated throughout.
        rag_pipeline: Project object exposing .query(question, n_results)
            and .llm (assumed interface — confirm against the pipeline class).
        vector_store: Project object; only read via getattr for optional
            chunking metadata (chunking_strategy/chunk_size/chunk_overlap).
        dataset_name: RAGBench dataset to draw test samples from.
        num_samples: Number of test samples to evaluate.
        method: Evaluation method; "gpt_labeling"/"hybrid" require a
            dedicated eval LLM client, otherwise rag_pipeline.llm is reused.
        llm_model, embedding_model, llm_provider: Model/provider selection.
        groq_api_key, groq_api_keys, ollama_host: Provider credentials/host;
            ollama_host falls back to settings.ollama_host.
    """
    try:
        # Update status to running
        status = load_job_status(job_id)
        if not status:
            print(f"[Background] Error: Could not load job status for {job_id}")
            return
            
        status["status"] = STATUS_RUNNING
        status["started_at"] = datetime.now().isoformat()
        status["logs"].append({
            "time": datetime.now().isoformat(),
            "message": f"⏱️ Evaluation started"
        })
        save_job_status(job_id, status)
        
        # Import required modules (deferred so the module imports cheaply
        # and these heavy deps load only inside the worker thread)
        from dataset_loader import RAGBenchLoader
        from config import settings
        from llm_client import create_llm_client
        
        update_job_progress(job_id, 5, "Loading test data...", "πŸ“₯ Loading test data...")
        
        # Load test data
        loader = RAGBenchLoader()
        test_data = loader.get_test_data(dataset_name, num_samples)
        
        update_job_progress(job_id, 10, f"Loaded {len(test_data)} samples", f"βœ… Loaded {len(test_data)} test samples")
        
        # Create LLM client for evaluation if needed; the same method check
        # guards the use of eval_llm_client further down, so it is never
        # referenced unbound.
        if method in ["gpt_labeling", "hybrid"]:
            update_job_progress(job_id, 15, "Initializing LLM client...", f"πŸ€– Initializing {llm_provider.upper()} LLM...")
            
            eval_llm_client = create_llm_client(
                provider=llm_provider,
                api_key=groq_api_key,
                api_keys=groq_api_keys,
                model_name=llm_model,
                ollama_host=ollama_host or settings.ollama_host,
                max_rpm=settings.groq_rpm_limit,
                rate_limit_delay=settings.rate_limit_delay,
                max_retries=settings.max_retries,
                retry_delay=settings.retry_delay
            )
        
        # Process samples: query the RAG system once per test question and
        # collect (query, response, retrieved docs, ground truth) tuples.
        test_cases = []
        total_samples = len(test_data)
        
        update_job_progress(job_id, 20, "Processing samples...", "πŸ” Starting sample processing...")
        
        for i, sample in enumerate(test_data):
            # Map sample index onto the 20-60% progress window.
            progress = 20 + int((i / total_samples) * 40)  # 20-60% for processing
            
            # Query the RAG system
            result = rag_pipeline.query(sample["question"], n_results=5)
            
            test_cases.append({
                "query": sample["question"],
                "response": result["response"],
                "retrieved_documents": [doc["document"] for doc in result["retrieved_documents"]],
                "ground_truth": sample.get("answer", "")
            })
            
            # Persist progress every 5 samples (and at the end) to limit
            # status-file writes.
            if (i + 1) % 5 == 0 or (i + 1) == total_samples:
                update_job_progress(
                    job_id, 
                    progress, 
                    f"Processed {i+1}/{total_samples} samples",
                    f"  βœ“ Processed {i + 1}/{total_samples} samples"
                )
        
        update_job_progress(job_id, 60, "Running evaluation...", f"πŸ“Š Running {method} evaluation...")
        
        # Get chunking metadata (optional attributes on the vector store;
        # None when the store does not expose them)
        chunking_strategy = getattr(vector_store, 'chunking_strategy', None)
        chunk_size = getattr(vector_store, 'chunk_size', None)
        chunk_overlap = getattr(vector_store, 'chunk_overlap', None)
        
        # Run evaluation using advanced evaluator
        from advanced_rag_evaluator import AdvancedRAGEvaluator
        
        evaluator = AdvancedRAGEvaluator(
            llm_client=eval_llm_client if method in ["gpt_labeling", "hybrid"] else rag_pipeline.llm,
            embedding_model=embedding_model,
            chunking_strategy=chunking_strategy,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        results = evaluator.evaluate_batch(test_cases)
        
        update_job_progress(job_id, 95, "Saving results...", "πŸ’Ύ Saving results...")
        
        # Save results to file (named <collection>_<timestamp>_evaluation.json
        # in the current working directory)
        status = load_job_status(job_id)
        collection_name = status.get("collection_name", "unknown") if status else "unknown"
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        results_filename = f"{collection_name}_{timestamp}_evaluation.json"
        
        save_data = {
            "collection_name": collection_name,
            "timestamp": datetime.now().isoformat(),
            "evaluation_method": method,
            "num_samples": num_samples,
            "embedding_model": embedding_model,
            "llm_model": llm_model,
            "results": results
        }
        
        with open(results_filename, 'w', encoding='utf-8') as f:
            json.dump(save_data, f, indent=2, default=str)
        
        # Update job as completed. NOTE(review): if the status file vanished
        # here, load_job_status returns None and the subscript below raises;
        # the outer except then records the job as failed — confirm that is
        # the intended behavior.
        status = load_job_status(job_id)
        status["status"] = STATUS_COMPLETED
        status["progress"] = 100
        status["current_step"] = "Completed!"
        status["completed_at"] = datetime.now().isoformat()
        status["results"] = results
        status["results_file"] = results_filename
        status["logs"].append({
            "time": datetime.now().isoformat(),
            "message": f"βœ… Evaluation completed! Results saved to {results_filename}"
        })
        save_job_status(job_id, status)
        
    except Exception as e:
        # Update job as failed, preserving the message and full traceback
        # in the status file for later inspection.
        error_msg = str(e)
        error_trace = traceback.format_exc()
        
        status = load_job_status(job_id)
        if status:
            status["status"] = STATUS_FAILED
            status["error"] = error_msg
            status["error_trace"] = error_trace
            status["completed_at"] = datetime.now().isoformat()
            status["logs"].append({
                "time": datetime.now().isoformat(),
                "message": f"❌ Error: {error_msg}"
            })
            save_job_status(job_id, status)


def start_background_job(
    job_id: str,
    rag_pipeline,
    vector_store,
    dataset_name: str,
    num_samples: int,
    method: str,
    llm_model: str,
    embedding_model: str,
    llm_provider: str,
    groq_api_key: Optional[str] = None,
    groq_api_keys: Optional[List[str]] = None,
    ollama_host: Optional[str] = None
) -> threading.Thread:
    """Launch run_background_evaluation on a daemon thread and return it.

    The thread is a daemon so it will not block interpreter shutdown;
    job state survives in the status file either way.
    """
    worker = threading.Thread(
        target=run_background_evaluation,
        kwargs={
            "job_id": job_id,
            "rag_pipeline": rag_pipeline,
            "vector_store": vector_store,
            "dataset_name": dataset_name,
            "num_samples": num_samples,
            "method": method,
            "llm_model": llm_model,
            "embedding_model": embedding_model,
            "llm_provider": llm_provider,
            "groq_api_key": groq_api_key,
            "groq_api_keys": groq_api_keys,
            "ollama_host": ollama_host,
        },
        daemon=True,
    )
    worker.start()
    return worker


def delete_job(job_id: str) -> bool:
    """Delete a job's status file.

    Returns True if the file was removed, False if it did not exist.
    Uses EAFP (try remove, catch FileNotFoundError) instead of the
    original exists()+remove() pair, which could raise if another
    thread or the cleanup routine deleted the file between the two calls.
    """
    filepath = get_job_file_path(job_id)
    try:
        os.remove(filepath)
        return True
    except FileNotFoundError:
        return False


def cleanup_old_jobs(max_age_hours: int = 24):
    """Delete job status files last modified more than *max_age_hours* ago.

    Age is judged by file mtime against the current wall-clock time.
    """
    ensure_jobs_dir()
    max_age_seconds = max_age_hours * 3600
    cutoff = datetime.now().timestamp() - max_age_seconds

    candidates = [
        os.path.join(JOBS_DIR, name)
        for name in os.listdir(JOBS_DIR)
        if name.endswith('.json')
    ]
    for path in candidates:
        if os.path.getmtime(path) < cutoff:
            os.remove(path)