| """Chat logging module with local JSONL storage and optional HuggingFace upload.""" |
|
|
| import os |
| import json |
| import threading |
| import atexit |
| from queue import Queue |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
# Optional dependency: uploads are only possible when huggingface_hub imports.
try:
    from huggingface_hub import HfApi
except ImportError:
    HF_AVAILABLE = False
else:
    HF_AVAILABLE = True


# Upload credentials and target dataset come from the environment; both must
# be set (together with huggingface_hub) for uploads to happen.
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_REPO_ID = os.getenv("HF_DATASET_REPO")

# Local storage lives under a relative "logs" directory, i.e. relative to the
# process working directory.
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_FILE = Path(LOCAL_LOG_DIR, "chat_logs.jsonl")

# Seconds between periodic upload attempts.
UPLOAD_INTERVAL_SECONDS = 300

# Create the log directory at import time so writers never hit a missing dir.
LOCAL_LOG_DIR.mkdir(exist_ok=True)


# Shared state between callers, the worker thread, and the upload thread.
_log_queue: Queue = Queue()             # pending log entries (dicts)
_shutdown_event = threading.Event()     # set once shutdown has been requested
_upload_lock = threading.Lock()         # serializes access during uploads
_worker_thread_started = False          # guards against starting threads twice
|
|
|
|
def log_chat(
    session_id: str,
    model_name: str,
    thought: str,
    thought_number: int,
    total_thoughts: int,
    metadata: Optional[dict] = None
) -> None:
    """Record one thought without blocking the caller.

    The entry is only placed on ``_log_queue``; the background worker thread
    writes it to the local JSONL file later.

    Args:
        session_id: Session identifier stored with the entry.
        model_name: Model name stored with the entry.
        thought: The thought text to log.
        thought_number: Thought index as supplied by the caller.
        total_thoughts: Total thought count as supplied by the caller.
        metadata: Extra fields; any falsy value (``None``, ``{}``) is stored
            as ``{}``.

    The ``timestamp`` field is local time from ``datetime.now()`` in ISO 8601
    format, without timezone information.

    NOTE(review): entries logged after ``shutdown_logger()`` has stopped the
    worker thread may never be written.
    """
    entry = dict(
        session_id=session_id,
        model_name=model_name,
        thought=thought,
        thought_number=thought_number,
        total_thoughts=total_thoughts,
        metadata=metadata if metadata else {},
        timestamp=datetime.now().isoformat(),
    )
    _log_queue.put(entry)
|
|
|
|
def _write_to_jsonl(entry: dict) -> None:
    """Append ``entry`` as a single JSON line to ``LOCAL_LOG_FILE``.

    The append happens while holding ``_upload_lock``. ``_upload_to_huggingface``
    uploads the file and then clears it under that same lock. Without the lock,
    a line appended between the upload and the clear would be discarded without
    ever being uploaded.

    Errors (including non-JSON-serializable entries) are printed and swallowed,
    so logging stays best-effort and never raises into the worker thread.
    """
    try:
        # Serialize before taking the lock to keep the critical section short.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        with _upload_lock:
            with open(LOCAL_LOG_FILE, "a", encoding="utf-8") as f:
                f.write(line)
    except Exception as e:
        print(f"Error writing to JSONL: {e}")
|
|
|
|
def _worker_thread():
    """Write queued log entries to the local JSONL file until shutdown.

    After ``_shutdown_event`` is set, the loop keeps running until the queue is
    empty, so every entry already queued is written and marked done. Exiting
    early would leave items unacknowledged, and ``_log_queue.join()`` in
    ``shutdown_logger`` would then block forever.
    """
    from queue import Empty

    while not (_shutdown_event.is_set() and _log_queue.empty()):
        try:
            entry = _log_queue.get(timeout=1.0)
        except Empty:
            # Timed out with nothing to do; re-check the shutdown condition.
            continue
        try:
            _write_to_jsonl(entry)
        except Exception as e:
            # Keep the worker alive: one bad entry must not stop all logging.
            print(f"Error processing log entry: {e}")
        finally:
            # Always acknowledge the item so Queue.join() can complete.
            _log_queue.task_done()
|
|
|
|
def _upload_to_huggingface() -> bool:
    """Upload the local JSONL log to the configured HuggingFace dataset repo.

    Does nothing and returns False in these cases:
    - huggingface_hub is not installed;
    - HF_TOKEN or HF_DATASET_REPO is unset;
    - the local log file is missing or empty.

    Otherwise it does the following:
    - creates the dataset repo (private) if ``repo_info`` fails;
    - uploads the file as ``logs/chat_logs_<YYYYmmdd_HHMMSS>.jsonl`` in the repo;
    - empties the local file, so the next upload contains only newer entries.

    Returns:
        True if a file was uploaded, False otherwise. Upload errors are printed
        and reported as False rather than raised.
    """
    if not HF_AVAILABLE or not HF_TOKEN or not DATASET_REPO_ID:
        return False

    # Serialize uploads so the periodic timer and shutdown never upload/clear
    # the file at the same time.
    with _upload_lock:
        # The file is left empty after every successful upload; skipping it
        # avoids pushing an empty file (and commit) every interval.
        if not LOCAL_LOG_FILE.exists() or LOCAL_LOG_FILE.stat().st_size == 0:
            return False

        try:
            api = HfApi(token=HF_TOKEN)

            # Create the dataset repo on first use. Any repo_info failure (not
            # only "not found") triggers a create attempt; if that also fails,
            # the outer handler reports it.
            try:
                api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
            except Exception:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True
                )

            # Each upload becomes a new timestamped file in the repo.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            api.upload_file(
                path_or_fileobj=str(LOCAL_LOG_FILE),
                path_in_repo=f"logs/chat_logs_{timestamp}.jsonl",
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Chat logs upload {timestamp}"
            )

            # Truncate in place instead of delete + recreate. A writer that
            # already has the file open in append mode keeps writing to the
            # live file rather than an unlinked one. Unlinking an open file
            # also fails on Windows.
            LOCAL_LOG_FILE.write_text("", encoding="utf-8")
            return True
        except Exception as e:
            print(f"HuggingFace upload failed: {e}")
            return False
|
|
|
|
def _upload_timer_thread():
    """Call ``_upload_to_huggingface`` every ``UPLOAD_INTERVAL_SECONDS`` until shutdown.

    It waits on ``_shutdown_event`` instead of sleeping, so setting the event
    wakes the thread immediately. Once shutdown is signalled, no further upload
    is attempted here; ``shutdown_logger`` performs the final one.
    """
    stopping = _shutdown_event.is_set
    while True:
        if stopping():
            return
        _shutdown_event.wait(timeout=UPLOAD_INTERVAL_SECONDS)
        if stopping():
            return
        _upload_to_huggingface()
|
|
|
|
def shutdown_logger():
    """Flush pending log entries, signal shutdown, then do a final upload.

    Registered with ``atexit`` by ``_ensure_threads_started``.

    NOTE(review): ``_log_queue.join()`` assumes the worker thread is running
    (it is started at import time); if it has died, this call blocks.
    """
    # Wait for every queued entry to be written *before* signalling shutdown.
    # The worker loop may stop as soon as ``_shutdown_event`` is set, and
    # setting it first could leave queued entries unacknowledged, making
    # join() block forever.
    _log_queue.join()
    _shutdown_event.set()
    # Upload whatever is left in the local file.
    _upload_to_huggingface()
|
|
|
|
def _ensure_threads_started():
    """Start the background threads once and register the exit hook.

    The JSONL worker thread always starts. The periodic upload thread starts
    only when huggingface_hub is importable and both HF_TOKEN and
    HF_DATASET_REPO are set. Both threads are daemons; ``shutdown_logger`` is
    registered with ``atexit`` to flush pending entries and upload before exit.
    """
    global _worker_thread_started
    if _worker_thread_started:
        return
    _worker_thread_started = True

    def _spawn(target):
        threading.Thread(target=target, daemon=True).start()

    _spawn(_worker_thread)
    if HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID:
        _spawn(_upload_timer_thread)
    atexit.register(shutdown_logger)


# Importing this module starts the logging threads as a side effect.
_ensure_threads_started()
|
|