"""Chat logging module with local JSONL storage and optional HuggingFace upload.

Entries passed to :func:`log_chat` are placed on an in-memory queue and
appended to ``logs/chat_logs.jsonl`` by a background daemon thread.  When
``huggingface_hub`` is installed and both ``HF_TOKEN`` and ``HF_DATASET_REPO``
are set in the environment, a second daemon thread uploads the accumulated
log to that dataset repo every ``UPLOAD_INTERVAL_SECONDS`` seconds.

Importing this module creates the log directory, starts the background
threads and registers :func:`shutdown_logger` with ``atexit``.
"""

import os
import json
import threading
import atexit
from queue import Empty, Queue
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

try:
    from huggingface_hub import HfApi
    HF_AVAILABLE = True
except ImportError:
    HF_AVAILABLE = False

# Configuration
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO_ID = os.environ.get("HF_DATASET_REPO")
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_FILE = LOCAL_LOG_DIR / "chat_logs.jsonl"
UPLOAD_INTERVAL_SECONDS = 300  # 5 minutes

# Ensure log directory exists
LOCAL_LOG_DIR.mkdir(exist_ok=True)

# Thread-safe queue for async logging
_log_queue: Queue = Queue()
_shutdown_event = threading.Event()
# Serialises whole upload attempts (timer thread vs. final upload at shutdown).
_upload_lock = threading.Lock()
# Guards every read/append/rewrite of LOCAL_LOG_FILE so that the uploader
# never trims the file while another thread is appending to it.
_file_lock = threading.Lock()
_worker_thread_started = False


def log_chat(
    session_id: str,
    model_name: str,
    thought: str,
    thought_number: int,
    total_thoughts: int,
    metadata: Optional[dict] = None,
) -> None:
    """Queue a thought log entry for async processing.

    The entry is stamped with the current local time (ISO 8601) and handed to
    the background worker; this call does not touch the disk itself.

    Args:
        session_id: Identifier of the chat session.
        model_name: Name of the model that produced the thought.
        thought: The thought text.
        thought_number: Index of this thought.
        total_thoughts: Total number of thoughts expected.
        metadata: Optional extra data; stored as ``{}`` when omitted or empty.
    """
    _log_queue.put({
        "session_id": session_id,
        "model_name": model_name,
        "thought": thought,
        "thought_number": thought_number,
        "total_thoughts": total_thoughts,
        "metadata": metadata or {},
        "timestamp": datetime.now().isoformat(),
    })


def _write_to_jsonl(entry: dict) -> None:
    """Append a log entry to the local JSONL file.

    Failures (unserialisable entry, I/O error) are printed and swallowed:
    logging is best-effort and must never take down the caller.
    """
    try:
        # Serialise before taking the lock so the critical section is I/O only.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        with _file_lock, open(LOCAL_LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception as e:
        print(f"Error writing to JSONL: {e}")


def _worker_thread() -> None:
    """Background worker that processes the log queue.

    Polls with a timeout so the shutdown flag is re-checked at least once a
    second.  Entries still queued when the flag is set are flushed by
    ``shutdown_logger``.
    """
    while not _shutdown_event.is_set():
        try:
            entry = _log_queue.get(timeout=1.0)
        except Empty:
            continue
        try:
            _write_to_jsonl(entry)
        finally:
            # Always balance get() so Queue.join() cannot block forever.
            _log_queue.task_done()


def _drain_queue() -> None:
    """Write every entry currently in the queue from the calling thread."""
    while True:
        try:
            entry = _log_queue.get_nowait()
        except Empty:
            return
        try:
            _write_to_jsonl(entry)
        finally:
            _log_queue.task_done()


def _read_local_log() -> bytes:
    """Return the current contents of the local log (empty if it is missing)."""
    with _file_lock:
        try:
            return LOCAL_LOG_FILE.read_bytes()
        except FileNotFoundError:
            return b""


def _discard_uploaded_prefix(uploaded_size: int) -> None:
    """Drop the first ``uploaded_size`` bytes of the local log.

    The log is append-only, so anything beyond the uploaded prefix was written
    while the upload was in flight and is kept for the next upload.
    """
    with _file_lock:
        try:
            current = LOCAL_LOG_FILE.read_bytes()
        except FileNotFoundError:
            current = b""
        LOCAL_LOG_FILE.write_bytes(current[uploaded_size:])


def _upload_to_huggingface() -> bool:
    """Upload local JSONL to HuggingFace dataset (if configured).

    Returns:
        True if a non-empty log was uploaded and the uploaded entries were
        removed from the local file; False if uploading is not configured,
        there was nothing to upload, or the upload failed (the local file is
        left untouched in that case).
    """
    if not HF_AVAILABLE or not HF_TOKEN or not DATASET_REPO_ID:
        return False

    with _upload_lock:
        try:
            # Upload a snapshot rather than the live path, so we know exactly
            # which bytes were sent and can keep entries appended meanwhile.
            payload = _read_local_log()
            if not payload:
                # Nothing to send; avoids creating an empty commit each cycle.
                return False

            api = HfApi(token=HF_TOKEN)

            # Ensure repo exists
            try:
                api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
            except Exception:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True,
                )

            # Upload with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            api.upload_file(
                path_or_fileobj=payload,
                path_in_repo=f"logs/chat_logs_{timestamp}.jsonl",
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Chat logs upload {timestamp}",
            )

            # Clear only what was uploaded; the file itself keeps existing.
            _discard_uploaded_prefix(len(payload))
            return True
        except Exception as e:
            print(f"HuggingFace upload failed: {e}")
            return False


def _upload_timer_thread() -> None:
    """Periodic upload thread.

    ``Event.wait`` returns True as soon as shutdown is requested, which ends
    the loop without a further upload (``shutdown_logger`` does the final one).
    """
    while not _shutdown_event.wait(timeout=UPLOAD_INTERVAL_SECONDS):
        _upload_to_huggingface()


def shutdown_logger() -> None:
    """Gracefully shutdown logger and upload remaining logs.

    Safe to call more than once.  The worker stops consuming once the
    shutdown flag is set, so the remaining entries are flushed here, in the
    calling thread; otherwise ``Queue.join`` would wait forever on them.
    """
    _shutdown_event.set()
    _drain_queue()
    _log_queue.join()  # Wait for an entry the worker may still be writing
    _upload_to_huggingface()  # Final upload


def _ensure_threads_started() -> None:
    """Start background threads if not already started."""
    global _worker_thread_started
    if not _worker_thread_started:
        _worker_thread_started = True
        threading.Thread(target=_worker_thread, daemon=True).start()
        if HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID:
            threading.Thread(target=_upload_timer_thread, daemon=True).start()
        atexit.register(shutdown_logger)


# Ensure threads start when module is imported
_ensure_threads_started()