"""
Module for evaluation and annotation of bot responses
"""

import json
import datetime
from typing import List, Dict, Any, Tuple, Optional
import io
import logging

from huggingface_hub import HfApi

from config.settings import (
    DATASET_ID,
    HF_TOKEN,
    DATASET_CHAT_HISTORY_PATH,
    DATASET_ANNOTATIONS_PATH
)

logger = logging.getLogger(__name__)

class ChatEvaluator:
    def __init__(self, hf_token: Optional[str] = None, dataset_id: Optional[str] = None):
        """
        Initialize chat evaluator with lazy loading
        
        Args:
            hf_token: Hugging Face token
            dataset_id: Dataset ID on Hugging Face
        """
        self.hf_token = hf_token or HF_TOKEN
        self.dataset_id = dataset_id or DATASET_ID
        self.api = HfApi(token=self.hf_token)
        
        # Using paths from settings
        self.chat_history_path = DATASET_CHAT_HISTORY_PATH
        self.annotations_path = DATASET_ANNOTATIONS_PATH
        
        # Cache for chat histories and QA pairs
        self._chat_histories = None
        self._qa_pairs = None
        self._annotations = None
        
        # Ensure directories exist in dataset
        try:
            self._ensure_dataset_structure()
        except Exception as e:
            logger.error(f"Failed to ensure dataset structure: {e}")

    def _ensure_dataset_structure(self):
        """Ensure required directories exist in dataset"""
        try:
            files = self.api.list_repo_files(self.dataset_id, repo_type="dataset")
            
            # list_repo_files returns file paths only, so a directory "exists"
            # when at least one file lives under its prefix
            def dir_has_files(path: str) -> bool:
                return any(f.startswith(f"{path}/") for f in files)
            
            # Check and create chat history directory
            if not dir_has_files(self.chat_history_path):
                self.api.upload_file(
                    path_or_fileobj=io.BytesIO(b""),
                    path_in_repo=f"{self.chat_history_path}/.gitkeep",
                    repo_id=self.dataset_id,
                    repo_type="dataset"
                )
            
            # Check and create annotations directory
            if not dir_has_files(self.annotations_path):
                self.api.upload_file(
                    path_or_fileobj=io.BytesIO(b""),
                    path_in_repo=f"{self.annotations_path}/.gitkeep",
                    repo_id=self.dataset_id,
                    repo_type="dataset"
                )
        except Exception as e:
            logger.error(f"Error ensuring dataset structure: {e}")
            raise

    def reset_cache(self):
        """
        Reset the cache to force reload of data
        """
        self._chat_histories = None
        self._qa_pairs = None
        self._annotations = None
        logger.info("Chat evaluator cache has been reset")

    def get_chat_history(self, force_reload=False) -> List[Dict[str, Any]]:
        """
        Get all chat histories from the dataset
        
        Args:
            force_reload: If True, ignore cache and reload from dataset
        """
        # Return cached data if available and not forcing reload
        if self._chat_histories is not None and not force_reload:
            logger.debug("Returning cached chat histories")
            return self._chat_histories
        
        try:
            # List all files in the dataset repository
            files = self.api.list_repo_files(self.dataset_id, repo_type="dataset")
            
            # Keep only JSON files under the chat history directory
            chat_path = f"{self.chat_history_path}/"
            chat_files = [f for f in files if f.startswith(chat_path) and f.endswith('.json')]
            logger.debug(f"Found {len(chat_files)} chat files")

            histories = []
            for file in chat_files:
                try:
                    # Download and parse each chat file
                    content = self.api.hf_hub_download(
                        repo_id=self.dataset_id,
                        filename=file,
                        repo_type="dataset"
                    )
                    with open(content, 'r', encoding='utf-8') as f:
                        chat_data = json.load(f)
                        if isinstance(chat_data, dict) and 'history' in chat_data:
                            histories.append(chat_data)
                        else:
                            logger.warning(f"Invalid chat history format in {file}")
                except Exception as e:
                    logger.error(f"Error processing chat file {file}: {e}")
                    continue

            # Cache the results
            self._chat_histories = histories
            return histories

        except Exception as e:
            logger.error(f"Failed to get chat histories: {e}")
            return []
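    
    # Expected shape of each chat history file, inferred from the parsing
    # logic above (extra fields are ignored):
    # {
    #   "conversation_id": "...",
    #   "timestamp": "...",
    #   "history": [
    #     {"role": "user", "content": "...", "timestamp": "..."},
    #     {"role": "assistant", "content": "...", "timestamp": "..."}
    #   ]
    # }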

    def extract_qa_pairs(self, histories: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Extract question-answer pairs from chat histories
        """
        qa_pairs = []
        
        for history in histories:
            messages = history.get('history', [])
            current_question = None
            
            for msg in messages:
                if msg.get('role') == 'user':
                    current_question = msg.get('content')
                elif msg.get('role') == 'assistant' and current_question:
                    qa_pairs.append({
                        'conversation_id': history.get('conversation_id'),
                        'question': current_question,
                        'answer': msg.get('content'),
                        'timestamp': history.get('timestamp')
                    })
                    current_question = None

        logger.debug(f"Extracted {len(qa_pairs)} QA pairs")
        return qa_pairs

    def get_qa_pairs_for_evaluation(self, limit: int = 50, force_reload=False) -> List[Dict[str, Any]]:
        """
        Extract question-answer pairs for evaluation
        
        Args:
            limit: Maximum number of pairs to return
            force_reload: If True, force reload from dataset
            
        Returns:
            List of QA pairs with metadata
        """
        # Return cached data if available and not forcing reload
        if self._qa_pairs is not None and not force_reload:
            logger.debug("Returning cached QA pairs")
            return self._qa_pairs[:limit]  # Respect the limit parameter
        
        chat_data = self.get_chat_history(force_reload=force_reload)
        qa_pairs = []
        
        logger.debug(f"Processing {len(chat_data)} chat histories")
        
        for chat in chat_data:
            conversation_id = chat.get("conversation_id", "unknown")
            timestamp = chat.get("timestamp", "")
            messages = chat.get("history", [])
            
            # Find user-assistant pairs in messages
            for i in range(len(messages) - 1):
                if (messages[i].get("role") == "user" and 
                    messages[i+1].get("role") == "assistant"):
                    question = messages[i].get("content", "").strip()
                    answer = messages[i+1].get("content", "").strip()
                    
                    # Only include non-empty pairs
                    if question and answer:
                        qa_pairs.append({
                            "conversation_id": conversation_id,
                            "timestamp": timestamp,
                            "question": question,
                            "original_answer": answer,
                            "question_timestamp": messages[i].get("timestamp", ""),
                            "answer_timestamp": messages[i+1].get("timestamp", "")
                        })
        
        # Cache the results
        self._qa_pairs = qa_pairs
        
        logger.debug(f"Extracted {len(qa_pairs)} QA pairs")
        # Return up to the limit
        return qa_pairs[:limit]
    
    def get_evaluation_status(self, force_reload=False) -> Dict[str, int]:
        """
        Get status of evaluated QA pairs
        
        Args:
            force_reload: If True, force reload from dataset
            
        Returns:
            Dictionary with counts of evaluated and unevaluated QA pairs
        """
        all_pairs = self.get_qa_pairs_for_evaluation(limit=1000, force_reload=force_reload)  # Get a large sample
        evaluated_pairs = self.get_annotations(force_reload=force_reload)
        
        # Count distinct evaluated conversation IDs
        evaluated_ids = {item.get("conversation_id") for item in evaluated_pairs}
        
        return {
            "total_qa_pairs": len(all_pairs),
            "evaluated_pairs": len(evaluated_pairs),
            # Clamped at zero: a conversation can hold several annotated pairs
            "unevaluated_pairs": max(0, len(all_pairs) - len(evaluated_pairs)),
            "evaluated_conversations": len(evaluated_ids)
        }
    
    def save_annotation(self, 
                       conversation_id: str,
                       question: str,
                       original_answer: str,
                       improved_answer: str,
                       ratings: Dict[str, int],
                       notes: str = "") -> Tuple[bool, str]:
        """
        Save evaluation annotation
        """
        try:
            # Create annotation object
            annotation = {
                "conversation_id": conversation_id,
                "timestamp": datetime.datetime.now().isoformat(),
                "question": question,
                "original_answer": original_answer,
                "improved_answer": improved_answer,
                "ratings": ratings,
                "notes": notes
            }
            
            # Create filename with conversation_id
            filename = f"{self.annotations_path}/annotation_{conversation_id}.json"
            
            # Convert to JSON bytes
            json_content = json.dumps(annotation, ensure_ascii=False, indent=2).encode('utf-8')
            
            # Upload to dataset using bytes buffer
            self.api.upload_file(
                path_or_fileobj=io.BytesIO(json_content),
                path_in_repo=filename,
                repo_id=self.dataset_id,
                repo_type="dataset"
            )
            
            # Reset annotations cache
            self._annotations = None
            
            return True, "Annotation saved successfully"
            
        except Exception as e:
            logger.error(f"Error saving annotation: {e}")
            return False, f"Failed to save annotation: {str(e)}"
    
    def get_annotations(self, force_reload=False) -> List[Dict[str, Any]]:
        """
        Get all saved annotations from dataset
        
        Args:
            force_reload: If True, force reload from dataset
        """
        # Return cached data if available and not forcing reload
        if self._annotations is not None and not force_reload:
            logger.debug("Returning cached annotations")
            return self._annotations
        
        try:
            annotations = []
            files = self.api.list_repo_files(self.dataset_id, repo_type="dataset")
            
            for file in files:
                if file.startswith(f"{self.annotations_path}/annotation_") and file.endswith(".json"):
                    try:
                        # Download and parse annotation file
                        content = self.api.hf_hub_download(
                            repo_id=self.dataset_id,
                            filename=file,
                            repo_type="dataset"
                        )
                        with open(content, 'r', encoding='utf-8') as f:
                            annotation = json.load(f)
                            annotations.append(annotation)
                    except Exception as e:
                        logger.error(f"Error loading annotation {file}: {e}")
            
            # Sort by timestamp (newest first)
            annotations.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
            
            # Cache the results
            self._annotations = annotations
            
            return annotations
            
        except Exception as e:
            logger.error(f"Error getting annotations: {e}")
            return []
    
    def get_annotation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
        """
        Get specific annotation by conversation ID
        """
        try:
            # First check if annotations are loaded
            if self._annotations is not None:
                for annotation in self._annotations:
                    if annotation.get("conversation_id") == conversation_id:
                        return annotation
            
            # If not found in cache, try direct file access
            filename = f"{self.annotations_path}/annotation_{conversation_id}.json"
            try:
                content = self.api.hf_hub_download(
                    repo_id=self.dataset_id,
                    filename=filename,
                    repo_type="dataset"
                )
                
                with open(content, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                # A download failure here usually just means no annotation
                # exists yet for this conversation
                logger.warning(f"No annotation loaded for {conversation_id}: {e}")
                return None
                
        except Exception as e:
            logger.error(f"Error getting annotation: {e}")
            return None
    
    def export_training_data(self, output_file: str, min_rating: int = 4) -> Tuple[bool, str]:
        """
        Export high-quality annotated data for fine-tuning
        
        Args:
            output_file: Path to output file
            min_rating: Minimum average rating to include in training data
            
        Returns:
            (success, message)
        """
        annotations = self.get_annotations()
        
        if not annotations:
            return False, "No annotations available for export"
        
        try:
            # Filter annotations by quality
            high_quality_examples = []
            
            for annotation in annotations:
                ratings = annotation.get("ratings", {})
                
                # Calculate average rating
                if ratings:
                    avg_rating = sum(ratings.values()) / len(ratings)
                    
                    # Include only high-quality examples
                    if avg_rating >= min_rating:
                        high_quality_examples.append({
                            "messages": [
                                {"role": "user", "content": annotation.get("question", "")},
                                {"role": "assistant", "content": annotation.get("improved_answer", "")}
                            ]
                        })
            
            if not high_quality_examples:
                return False, f"No examples meet the minimum quality threshold of {min_rating}"
            
            # Save to JSONL format
            with open(output_file, "w", encoding="utf-8") as f:
                for example in high_quality_examples:
                    f.write(json.dumps(example, ensure_ascii=False) + "\n")
            
            return True, f"Successfully exported {len(high_quality_examples)} high-quality examples for training"
        except Exception as e:
            return False, f"Error exporting training data: {str(e)}"
    
    def generate_evaluation_report(self) -> Dict[str, Any]:
        """
        Generate evaluation summary report
        
        Returns:
            Dictionary with evaluation metrics
        """
        annotations = self.get_annotations()
        
        if not annotations:
            return {
                "total_evaluations": 0,
                "message": "No evaluations available"
            }
        
        # Initialize metrics
        criteria = set()
        for annotation in annotations:
            criteria.update(annotation.get("ratings", {}).keys())
        
        metrics = {
            "total_evaluations": len(annotations),
            "criteria_averages": {},
            "overall_average": 0,
            "improvement_rate": 0  # Percentage of answers that were improved
        }
        
        # Calculate averages for each criterion
        for criterion in criteria:
            values = [a.get("ratings", {}).get(criterion, 0) for a in annotations if criterion in a.get("ratings", {})]
            if values:
                metrics["criteria_averages"][criterion] = sum(values) / len(values)
        
        # Calculate overall average
        all_ratings = []
        for annotation in annotations:
            all_ratings.extend(annotation.get("ratings", {}).values())
        
        if all_ratings:
            metrics["overall_average"] = sum(all_ratings) / len(all_ratings)
        
        # Calculate improvement rate
        improved_count = sum(1 for a in annotations if a.get("original_answer") != a.get("improved_answer"))
        metrics["improvement_rate"] = (improved_count / len(annotations)) * 100
        
        return metrics
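

# --- Usage sketch (illustrative only) ---
# A minimal end-to-end walkthrough, assuming config.settings provides a valid
# HF_TOKEN and DATASET_ID and the dataset already holds chat history JSON
# files; the rating criterion names below are hypothetical.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    evaluator = ChatEvaluator()
    
    # How many QA pairs are awaiting evaluation?
    print(evaluator.get_evaluation_status(force_reload=True))
    
    # Annotate the first extracted pair with example ratings
    pairs = evaluator.get_qa_pairs_for_evaluation(limit=5)
    if pairs:
        pair = pairs[0]
        ok, msg = evaluator.save_annotation(
            conversation_id=pair["conversation_id"],
            question=pair["question"],
            original_answer=pair["original_answer"],
            improved_answer=pair["original_answer"],  # left unchanged here
            ratings={"accuracy": 5, "clarity": 4},    # hypothetical criteria
            notes="Acceptable as-is"
        )
        print(msg)
    
    # Export examples with an average rating of at least 4 as JSONL
    ok, msg = evaluator.export_training_data("training_data.jsonl", min_rating=4)
    print(msg)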