File size: 4,260 Bytes
a1a5a09 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | """Chat logging module with local JSONL storage and optional HuggingFace upload."""
import os
import json
import threading
import atexit
from queue import Queue
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
try:
    from huggingface_hub import HfApi
except ImportError:
    # Optional dependency: without it the uploader thread is never started
    # and _upload_to_huggingface() is a no-op, so logs stay local only.
    HF_AVAILABLE = False
else:
    HF_AVAILABLE = True
# Configuration, read once at import time.
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO_ID = os.environ.get("HF_DATASET_REPO")
LOCAL_LOG_DIR = Path("logs")  # relative path: resolved against the process CWD
LOCAL_LOG_FILE = LOCAL_LOG_DIR.joinpath("chat_logs.jsonl")
UPLOAD_INTERVAL_SECONDS = 5 * 60

# Create the log directory eagerly so the first append cannot fail on it.
LOCAL_LOG_DIR.mkdir(exist_ok=True)

# State shared between the public API and the background threads.
_log_queue: Queue = Queue()          # entries waiting to be written to disk
_shutdown_event = threading.Event()  # set once the logger is shutting down
_upload_lock = threading.Lock()      # serializes access around uploads
_worker_thread_started = False       # prevents starting the threads twice
def log_chat(
    session_id: str,
    model_name: str,
    thought: str,
    thought_number: int,
    total_thoughts: int,
    metadata: Optional[dict] = None
) -> None:
    """Record one thought entry in the local JSONL log.

    Normally the entry is only queued; the background worker serializes and
    writes it, so this call does not wait on disk I/O. Once shutdown has
    begun (the worker is no longer consuming the queue) the entry is written
    synchronously instead of being stranded in the queue.

    Args:
        session_id, model_name, thought, thought_number, total_thoughts:
            Stored verbatim under the key of the same name.
        metadata: Optional extra data stored under ``"metadata"``; a falsy
            value is stored as ``{}``. A dict is shallow-copied at call time.
    """
    if isinstance(metadata, dict):
        # Snapshot now: serialization happens later on the worker thread, and
        # a caller mutating its dict meanwhile would alter (or, via "dict
        # changed size during iteration", drop) the logged entry.
        metadata = dict(metadata)
    entry = {
        "session_id": session_id,
        "model_name": model_name,
        "thought": thought,
        "thought_number": thought_number,
        "total_thoughts": total_thoughts,
        "metadata": metadata or {},
        # Naive local time (no UTC offset); format kept for compatibility.
        "timestamp": datetime.now().isoformat()
    }
    if _shutdown_event.is_set():
        # Late entries (e.g. from other atexit handlers or after an explicit
        # shutdown_logger() call) would never be consumed from the queue.
        _write_to_jsonl(entry)
        return
    _log_queue.put(entry)
def _write_to_jsonl(entry: dict) -> None:
    """Append ``entry`` as a single JSON line to ``LOCAL_LOG_FILE``.

    Any error (unserializable entry, I/O failure) is reported with ``print``
    and otherwise swallowed, so logging never raises into the caller.
    """
    try:
        # Serialize before taking the lock to keep the critical section short.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        # The uploader reads, uploads and then empties this file while holding
        # _upload_lock; appending under the same lock prevents a line written
        # mid-upload from being discarded without ever being uploaded.
        with _upload_lock:
            with open(LOCAL_LOG_FILE, "a", encoding="utf-8") as f:
                f.write(line)
    except Exception as e:
        print(f"Error writing to JSONL: {e}")
def _worker_thread():
    """Consume ``_log_queue`` and append each entry to the local JSONL file.

    Keeps running until shutdown has been requested *and* the queue is
    empty, so entries queued before shutdown are still written. Every
    successful ``get`` is matched by exactly one ``task_done`` so that
    ``_log_queue.join()`` can complete.
    """
    from queue import Empty

    while not (_shutdown_event.is_set() and _log_queue.empty()):
        try:
            entry = _log_queue.get(timeout=1.0)
        except Empty:
            continue  # nothing queued; re-check the exit condition
        try:
            _write_to_jsonl(entry)
        except Exception:
            # Deliberate best effort: _write_to_jsonl already reports errors,
            # and anything escaping it (e.g. print failing on a closed
            # stdout) must not kill the only consumer of the queue.
            pass
        finally:
            _log_queue.task_done()
def _upload_to_huggingface() -> bool:
    """Upload the local JSONL log to the configured HuggingFace dataset.

    The file is uploaded as ``logs/chat_logs_<YYYYmmdd_HHMMSS>.jsonl`` (the
    dataset repo is created as private if ``repo_info`` fails) and emptied
    after a successful upload. Nothing is uploaded when HuggingFace is not
    configured or the local file is missing or empty.

    Returns:
        True if a file was uploaded; False otherwise, including on error
        (errors are reported with ``print``).
    """
    if not HF_AVAILABLE or not HF_TOKEN or not DATASET_REPO_ID:
        return False
    with _upload_lock:
        try:
            # The file is left empty after every successful upload; without
            # this check each interval (and each shutdown) would push an
            # empty snapshot as a new commit.
            if not LOCAL_LOG_FILE.exists() or LOCAL_LOG_FILE.stat().st_size == 0:
                return False
            api = HfApi(token=HF_TOKEN)
            # Ensure repo exists
            try:
                api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
            except Exception:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True
                )
            # Upload with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            api.upload_file(
                path_or_fileobj=str(LOCAL_LOG_FILE),
                path_in_repo=f"logs/chat_logs_{timestamp}.jsonl",
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Chat logs upload {timestamp}"
            )
            # Empty the local file in one step once it has been uploaded.
            # NOTE: writers must append while holding _upload_lock, otherwise
            # lines appended during the upload are discarded here.
            LOCAL_LOG_FILE.write_text("", encoding="utf-8")
            return True
        except Exception as e:
            print(f"HuggingFace upload failed: {e}")
            return False
def _upload_timer_thread():
    """Upload the local log every ``UPLOAD_INTERVAL_SECONDS`` until shutdown.

    ``Event.wait`` serves as an interruptible sleep, so the loop wakes up as
    soon as shutdown is signalled instead of sleeping out the interval.
    """
    while not _shutdown_event.is_set():
        _shutdown_event.wait(timeout=UPLOAD_INTERVAL_SECONDS)
        if _shutdown_event.is_set():
            break  # shutdown_logger() performs the final upload itself
        _upload_to_huggingface()
def shutdown_logger():
    """Flush queued log entries, stop the background threads, then upload.

    Registered with ``atexit``. It may also be called explicitly; a repeated
    call skips the queue join and only flushes leftovers and uploads.
    """
    from queue import Empty

    if not _shutdown_event.is_set():
        # Wait for the worker to drain the queue *before* signalling shutdown.
        # Signalling first lets the worker exit with entries still queued,
        # after which join() would block forever.
        _log_queue.join()
        _shutdown_event.set()
    # Write anything enqueued after the worker stopped consuming (e.g. between
    # the join above and set(), or after an earlier shutdown call).
    while True:
        try:
            entry = _log_queue.get_nowait()
        except Empty:
            break
        try:
            _write_to_jsonl(entry)
        finally:
            _log_queue.task_done()
    _upload_to_huggingface()  # Final upload
def _ensure_threads_started():
    """Launch the background worker (and the uploader, if configured) once.

    Also registers ``shutdown_logger`` with ``atexit`` on the first call.
    """
    global _worker_thread_started
    if _worker_thread_started:
        return
    _worker_thread_started = True

    targets = [_worker_thread]
    if HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID:
        targets.append(_upload_timer_thread)
    for target in targets:
        threading.Thread(target=target, daemon=True).start()

    atexit.register(shutdown_logger)


# Importing this module starts the background threads as a side effect.
_ensure_threads_started()
|