sequentialthinking-mcp / chat_logger.py
DreamyDetective's picture
Upload folder using huggingface_hub
a1a5a09 verified
"""Chat logging module with local JSONL storage and optional HuggingFace upload."""
import os
import json
import threading
import atexit
from queue import Queue
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
# huggingface_hub is an optional dependency: record whether it could be
# imported so the upload code can be skipped when it is missing.
try:
    from huggingface_hub import HfApi
except ImportError:
    HF_AVAILABLE = False
else:
    HF_AVAILABLE = True
# --- Configuration ---------------------------------------------------------
# Hub credentials and the target dataset are read from the environment once,
# at import time; either value is None when the variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_REPO_ID = os.getenv("HF_DATASET_REPO")
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_FILE = LOCAL_LOG_DIR.joinpath("chat_logs.jsonl")
UPLOAD_INTERVAL_SECONDS = 5 * 60  # five minutes between periodic uploads

# Create the log directory on import (no-op when it already exists).
LOCAL_LOG_DIR.mkdir(exist_ok=True)

# --- Shared state ----------------------------------------------------------
# Entries produced by log_chat() wait here until the worker writes them out.
_log_queue: Queue = Queue()
# Set once shutdown has been requested; the background loops poll it.
_shutdown_event = threading.Event()
# Held by the uploader while it uploads and resets the local log file.
_upload_lock = threading.Lock()
# Guards against starting the background threads more than once.
_worker_thread_started = False
def log_chat(
    session_id: str,
    model_name: str,
    thought: str,
    thought_number: int,
    total_thoughts: int,
    metadata: Optional[dict] = None
) -> None:
    """Enqueue one thought record for asynchronous logging.

    The call only places a dict on the module-level queue and returns; it
    performs no file I/O itself.

    Args:
        session_id: Identifier of the session the thought belongs to.
        model_name: Name of the model that produced the thought.
        thought: The thought text.
        thought_number: Position of this thought in the sequence.
        total_thoughts: Total number of thoughts reported for the sequence.
        metadata: Optional extra fields; a falsy value is stored as ``{}``.
    """
    extra = metadata if metadata else {}
    record = {
        "session_id": session_id,
        "model_name": model_name,
        "thought": thought,
        "thought_number": thought_number,
        "total_thoughts": total_thoughts,
        "metadata": extra,
        # Naive local time, captured when the entry is queued (not written).
        "timestamp": datetime.now().isoformat(),
    }
    _log_queue.put(record)
def _write_to_jsonl(entry: dict) -> None:
    """Append a log entry to the local JSONL file as one line.

    Logging is best-effort: any failure (serialisation or file I/O) is
    printed and swallowed so that a bad entry cannot stop the caller.

    Args:
        entry: JSON-serialisable mapping to record.
    """
    try:
        # Serialise first so the lock below is held only for the file append.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        # The uploader holds _upload_lock while it uploads and then unlinks
        # the file. Appending under the same lock prevents an entry from
        # landing in the file after it was read for upload but before it is
        # deleted, which would silently lose that entry.
        with _upload_lock:
            with open(LOCAL_LOG_FILE, "a", encoding="utf-8") as f:
                f.write(line)
    except Exception as e:
        print(f"Error writing to JSONL: {e}")
def _worker_thread():
    """Background worker that writes queued log entries to disk.

    Runs until shutdown has been requested *and* the queue is empty. Exiting
    on the shutdown flag alone would strand queued entries, and a subsequent
    ``_log_queue.join()`` would then wait forever for ``task_done()`` calls
    that never come.
    """
    # Local import: the module's import block only brings in ``Queue``.
    from queue import Empty

    while not (_shutdown_event.is_set() and _log_queue.empty()):
        try:
            # The timeout lets the loop re-check the shutdown flag regularly.
            entry = _log_queue.get(timeout=1.0)
        except Empty:
            continue
        try:
            _write_to_jsonl(entry)
        except Exception as e:
            # Keep the worker alive; logging is best-effort.
            print(f"Error processing log entry: {e}")
        finally:
            # Always balance the get() so Queue.join() can return.
            _log_queue.task_done()
def _upload_to_huggingface() -> bool:
    """Upload the local JSONL file to the configured HuggingFace dataset.

    The upload is skipped when ``huggingface_hub`` is not importable, when
    ``HF_TOKEN`` or ``DATASET_REPO_ID`` is unset, or when the local log file
    is missing or empty. After a successful upload the local file is replaced
    by an empty one.

    Returns:
        True if the file was uploaded; False if the upload was skipped or
        failed. Upload failures are printed, not raised.
    """
    if not (HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID):
        return False

    with _upload_lock:
        # A successful upload leaves an empty file behind, so checking only
        # for existence would push a zero-byte file (and create a commit) on
        # every later run that has no new entries.
        try:
            has_entries = LOCAL_LOG_FILE.stat().st_size > 0
        except OSError:
            has_entries = False
        if not has_entries:
            return False

        try:
            api = HfApi(token=HF_TOKEN)

            # Ensure the dataset repo exists.
            # NOTE(review): any repo_info failure, not only "not found",
            # falls through to create_repo; an error there is reported by
            # the outer handler.
            try:
                api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
            except Exception:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True
                )

            # Each upload goes to its own timestamped path in the repo.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            api.upload_file(
                path_or_fileobj=str(LOCAL_LOG_FILE),
                path_in_repo=f"logs/chat_logs_{timestamp}.jsonl",
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Chat logs upload {timestamp}"
            )

            # Reset the local file only after the upload succeeded.
            LOCAL_LOG_FILE.unlink()
            LOCAL_LOG_FILE.touch()
            return True
        except Exception as e:
            print(f"HuggingFace upload failed: {e}")
            return False
def _upload_timer_thread():
    """Run an upload once per interval until shutdown is requested."""
    # Event.wait() returns False when the timeout elapses and True once the
    # event is set, so the body runs after each full interval and the loop
    # ends as soon as shutdown is signalled.
    while not _shutdown_event.wait(timeout=UPLOAD_INTERVAL_SECONDS):
        _upload_to_huggingface()
def shutdown_logger():
    """Stop the logger: signal shutdown, flush the queue, upload once more.

    NOTE(review): Queue.join() returns only after every queued entry has
    been marked done, so this call blocks for as long as entries remain
    unprocessed.
    """
    # Tell the background loops to stop.
    _shutdown_event.set()
    # Block until every queued entry has been marked as done.
    _log_queue.join()
    # Push whatever is on disk one last time (result intentionally ignored).
    _upload_to_huggingface()
def _ensure_threads_started():
    """Start the background threads on the first call; later calls do nothing.

    Always starts the queue worker. The periodic uploader is started only
    when huggingface_hub is importable and both HF_TOKEN and DATASET_REPO_ID
    are set. Also registers shutdown_logger to run at interpreter exit.
    """
    global _worker_thread_started
    if _worker_thread_started:
        return
    _worker_thread_started = True

    # Daemon threads, so they never keep the interpreter alive on their own.
    writer = threading.Thread(target=_worker_thread, daemon=True)
    writer.start()

    uploads_configured = HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID
    if uploads_configured:
        uploader = threading.Thread(target=_upload_timer_thread, daemon=True)
        uploader.start()

    atexit.register(shutdown_logger)
# Import-time side effect: start the background threads as soon as the module
# is loaded, so entries queued by log_chat() have a worker to write them out.
_ensure_threads_started()