import json
import logging
import os
import random
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4

import requests
from datasets import Dataset, load_dataset
from huggingface_hub import CommitScheduler, HfApi
| |
|
class ChatLogger:
    """Persist chat Q&A records as local JSONL files that a
    ``huggingface_hub.CommitScheduler`` periodically commits to a dataset
    repository on the Hub.

    Each logger instance writes to its own uniquely named JSONL file inside
    the scheduler's watched folder; writes are serialized through the
    scheduler's lock so they never race with an in-progress commit.
    """

    # Fixed key order for serialized records, so every JSONL line has a
    # consistent column layout.
    _FIELD_ORDER = (
        "record_id",
        "session_id",
        "time",
        "client_location",
        "question",
        "answer",
        "retrieved_content",
        "feedback",
    )

    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations.

        Args:
            scheduler: A ``CommitScheduler`` (or compatible object) exposing
                ``folder_path`` (directory watched for upload) and ``lock``
                (mutex guarding file writes).

        Raises:
            ValueError: If ``scheduler`` is falsy.
            OSError: If the dataset directory cannot be created.
        """
        if not scheduler:
            raise ValueError("Scheduler is required")

        self.scheduler = scheduler
        self.json_dataset_dir = Path(scheduler.folder_path)

        try:
            self.json_dataset_dir.mkdir(parents=True, exist_ok=True)
            logging.info(f"Using dataset directory at: {self.json_dataset_dir}")
        except Exception as e:
            logging.error(f"Error creating dataset directory: {str(e)}")
            raise

        # Unique per-instance file name avoids clobbering logs from other
        # processes/replicas sharing the same dataset folder.
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"
        logging.info(f"Log file will be created at: {self.logs_path}")

    def get_client_ip(self, request=None):
        """Best-effort extraction of the originating client IP address.

        Prefers the first entry of the ``X-Forwarded-For`` header (the
        original client when behind a reverse proxy) over the direct socket
        peer address.

        Args:
            request: A request object exposing ``client.host`` and
                ``headers`` (e.g. a Starlette/FastAPI ``Request``), or None.

        Returns:
            The detected IP string, or ``"127.0.0.1"`` when no request is
            given or detection fails.
        """
        try:
            if request is None:
                # Fix: previously fell through and implicitly returned None.
                return "127.0.0.1"

            ip = request.client.host

            forwarded_for = request.headers.get('X-Forwarded-For')
            if forwarded_for:
                # First hop is the original client; later entries are proxies.
                ip = forwarded_for.split(',')[0].strip()

            logging.debug(f"Client IP detected: {ip}")
            return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
            return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Look up coarse geolocation for *ip_address* via ipapi.co.

        The returned coordinates are jittered by up to ~0.01 degrees,
        presumably to coarsen the stored location for privacy — confirm
        with the data-retention policy.

        Args:
            ip_address: Dotted-quad / IPv6 string to look up.

        Returns:
            dict with ``city``/``region``/``country``/``latitude``/
            ``longitude`` keys, or None on rate limiting, HTTP error, or
            network failure.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()

                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)

                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning(f"Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: Status code {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed in IP lookup: {str(e)}")
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Build a structured log record for one chat exchange.

        Args:
            query: The user's question.
            answer: The generated answer.
            retrieved_content: Retrieved context; a non-list value is
                wrapped into a single-element list.
            feedback: Optional user feedback payload.
            request: Optional request object used for IP/geolocation.

        Returns:
            dict with the fields listed in ``_FIELD_ORDER``.
        """
        # Aware UTC datetime yields the same epoch value as the previous
        # naive call, without depending on the host's local timezone rules.
        timestamp = datetime.now(timezone.utc).timestamp()

        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None

        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }

        return log_entry

    def cleanup_local_files(self):
        """Delete stale local JSON/JSONL files after upload.

        The currently active log file (``self.logs_path``) is always kept:
        deleting it would destroy records appended since the scheduler's
        last commit. All errors are logged, never raised — cleanup is
        best-effort.
        """
        try:
            for file in self.json_dataset_dir.glob("*.json*"):
                # Bug fix: never delete the file we are still appending to —
                # the old glob matched it and wiped fresh, uncommitted logs.
                if file == self.logs_path:
                    continue
                try:
                    file.unlink()
                    logging.info(f"Deleted local file: {file}")
                except Exception as e:
                    logging.error(f"Error deleting file {file}: {e}")

            # Drop the directory only when nothing at all remains in it.
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Append *log_entry* as one JSONL line to the local log file.

        The write is performed under the scheduler's lock so it cannot race
        with an in-flight commit; keys are emitted in ``_FIELD_ORDER``.

        Args:
            log_entry: Record produced by :meth:`create_log_entry`.

        Returns:
            True on success, False on any failure (error is logged).
        """
        try:
            self.logs_path.parent.mkdir(parents=True, exist_ok=True)

            ordered_logs = {k: log_entry.get(k) for k in self._FIELD_ORDER if k in log_entry}

            with self.scheduler.lock:
                # Explicit UTF-8 keeps output consistent across platforms
                # (the default encoding is locale-dependent on Windows).
                with open(self.logs_path, 'a', encoding='utf-8') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info(f"Log entry saved to {self.logs_path}")

            # Best-effort removal of stale files from previous sessions;
            # the active log file is preserved (see cleanup_local_files).
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {str(e)}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a log record and persist it locally for later upload.

        Returns:
            True if the record was written, False otherwise.
        """
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        logging.info("Logging results completed")

        return self.save_local(log_entry)
| |
|