File size: 4,260 Bytes
a1a5a09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Chat logging module with local JSONL storage and optional HuggingFace upload."""

import atexit
import json
import os
import threading
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Optional

try:
    from huggingface_hub import HfApi
    HF_AVAILABLE = True
except ImportError:
    HF_AVAILABLE = False

# Configuration
# HF_TOKEN and DATASET_REPO_ID are read from the environment once, at import
# time. Either is None when its variable is unset, and uploads are skipped
# unless both are set (and huggingface_hub imported successfully).
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO_ID = os.environ.get("HF_DATASET_REPO")
# Relative path: resolved against the process's current working directory.
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_FILE = LOCAL_LOG_DIR / "chat_logs.jsonl"
UPLOAD_INTERVAL_SECONDS = 300  # 5 minutes

# Ensure log directory exists (import-time side effect; parents are not
# created, which is fine for the single-level default path above).
LOCAL_LOG_DIR.mkdir(exist_ok=True)

# Thread-safe queue for async logging: log_chat() produces entries, the
# background worker consumes them and appends them to LOCAL_LOG_FILE.
_log_queue: Queue = Queue()
# Set by shutdown_logger() to tell the background threads to stop.
_shutdown_event = threading.Event()
# Held by _upload_to_huggingface() for its whole upload-and-clear sequence so
# that two uploads never run at the same time.
_upload_lock = threading.Lock()
# Flipped to True by _ensure_threads_started() so the background threads and
# the atexit hook are only set up once.
_worker_thread_started = False


def log_chat(
    session_id: str,
    model_name: str,
    thought: str,
    thought_number: int,
    total_thoughts: int,
    metadata: Optional[dict] = None
) -> None:
    """Enqueue one thought record for asynchronous logging.

    The call only places the record on the module's queue and returns; the
    record is written to disk later by whichever thread consumes the queue.

    Args:
        session_id: Identifier of the chat session the thought belongs to.
        model_name: Name of the model that produced the thought.
        thought: The thought text itself.
        thought_number: Position of this thought within the sequence.
        total_thoughts: Number of thoughts expected in the sequence.
        metadata: Optional extra fields; stored as an empty dict when omitted
            or empty.
    """
    extra = metadata if metadata else {}
    # The timestamp is the local, timezone-naive time at which the record was
    # queued (not the time it is eventually written).
    record = {
        "session_id": session_id,
        "model_name": model_name,
        "thought": thought,
        "thought_number": thought_number,
        "total_thoughts": total_thoughts,
        "metadata": extra,
        "timestamp": datetime.now().isoformat(),
    }
    _log_queue.put(record)


def _write_to_jsonl(entry: dict) -> None:
    """Append a log entry to the local JSONL file.

    The entry is serialized as a single JSON line (non-ASCII characters are
    kept as-is) and appended to ``LOCAL_LOG_FILE``.

    The append is performed while holding ``_upload_lock``. The uploader holds
    the same lock while it uploads the file and then clears it, so without the
    lock a line appended in the middle of an upload would be wiped by the
    clear step without ever having been uploaded.

    Failures (unserializable entry, I/O error) are reported on stdout and
    otherwise ignored: logging is best-effort and never raises.

    Args:
        entry: JSON-serializable mapping to record.
    """
    try:
        # Serialize first so the lock is held only for the actual file append.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        with _upload_lock:
            with open(LOCAL_LOG_FILE, "a", encoding="utf-8") as f:
                f.write(line)
    except Exception as e:
        print(f"Error writing to JSONL: {e}")


def _worker_thread():
    """Background worker that drains the log queue into the JSONL file.

    Runs until shutdown has been requested AND the queue is empty. Checking
    only the shutdown flag would leave already-queued entries unprocessed;
    they would be lost, and any caller waiting on ``_log_queue.join()`` would
    block forever because ``task_done()`` is never called for them.
    """
    while True:
        try:
            # The timeout lets the loop notice a shutdown request while idle.
            entry = _log_queue.get(timeout=1.0)
        except Empty:
            if _shutdown_event.is_set():
                break
            continue

        try:
            _write_to_jsonl(entry)
        except Exception as e:
            # Best-effort logging: report and keep the worker alive.
            print(f"Error processing log entry: {e}")
        finally:
            # Always balance the get() so that join() can return.
            _log_queue.task_done()


def _upload_to_huggingface() -> bool:
    """Upload the local JSONL file to the HuggingFace dataset, then clear it.

    Does nothing unless ``huggingface_hub`` is importable and both
    ``HF_TOKEN`` and ``DATASET_REPO_ID`` are set. The whole sequence runs
    under ``_upload_lock`` so that concurrent callers cannot upload at the
    same time.

    The file is uploaded under a timestamped name
    (``logs/chat_logs_<YYYYmmdd_HHMMSS>.jsonl``) and truncated locally once
    the upload succeeded. A missing or empty log file is skipped: the local
    file is left empty after every successful upload, so without this check
    each later call would push an empty file as a new commit.

    Returns:
        True if a file was uploaded; False if uploading is not configured,
        there was nothing to upload, or the upload failed (the error is
        printed, never raised).
    """
    if not HF_AVAILABLE or not HF_TOKEN or not DATASET_REPO_ID:
        return False

    with _upload_lock:
        try:
            if LOCAL_LOG_FILE.stat().st_size == 0:
                return False  # Nothing new since the last upload
        except OSError:
            # Missing (or unreadable) log file: nothing to upload.
            return False

        try:
            api = HfApi(token=HF_TOKEN)

            # Ensure repo exists; any failure of the lookup is treated as
            # "repo missing" and answered with an attempt to create it.
            try:
                api.repo_info(repo_id=DATASET_REPO_ID, repo_type="dataset")
            except Exception:
                api.create_repo(
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    private=True
                )

            # Upload with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            api.upload_file(
                path_or_fileobj=str(LOCAL_LOG_FILE),
                path_in_repo=f"logs/chat_logs_{timestamp}.jsonl",
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                commit_message=f"Chat logs upload {timestamp}"
            )

            # Clear the local file after a successful upload. Truncating in
            # place (rather than unlink + touch) never leaves a moment where
            # the file does not exist.
            with open(LOCAL_LOG_FILE, "w", encoding="utf-8"):
                pass
            return True
        except Exception as e:
            print(f"HuggingFace upload failed: {e}")
            return False


def _upload_timer_thread():
    """Run an upload every ``UPLOAD_INTERVAL_SECONDS`` until shutdown.

    Each cycle sleeps on the shutdown event rather than on a plain timer, so
    a shutdown request wakes the thread immediately and ends the loop without
    triggering another upload.
    """
    # Event.wait() returns False when the timeout elapsed with the event
    # still clear (time for the next upload) and True once the event is set.
    while not _shutdown_event.wait(timeout=UPLOAD_INTERVAL_SECONDS):
        _upload_to_huggingface()


def shutdown_logger():
    """Gracefully shut down the logger and upload remaining logs.

    Steps:
      1. Signal the background threads to stop.
      2. Write every entry still sitting in the queue from the calling
         thread. The background worker may stop consuming as soon as the
         shutdown event is set, so waiting on the queue alone could block
         forever with entries left unprocessed.
      3. Wait for any entry another thread is still in the middle of writing.
      4. Attempt a final upload of the local log file.
    """
    _shutdown_event.set()

    # Drain whatever is left; safe to do concurrently with another consumer
    # because each get() hands an entry to exactly one thread.
    while True:
        try:
            entry = _log_queue.get_nowait()
        except Empty:
            break
        try:
            _write_to_jsonl(entry)
        finally:
            _log_queue.task_done()

    _log_queue.join()  # Wait for in-flight entries taken by other threads
    _upload_to_huggingface()  # Final upload


def _ensure_threads_started():
    """Start the background machinery once; later calls are no-ops.

    Starts the queue-draining worker as a daemon thread, starts the periodic
    upload daemon thread only when HuggingFace uploads are configured, and
    registers ``shutdown_logger`` to run at interpreter exit.
    """
    global _worker_thread_started
    if _worker_thread_started:
        return
    # Mark as started before spawning anything so a repeated call cannot
    # start a second set of threads.
    _worker_thread_started = True

    writer = threading.Thread(target=_worker_thread, daemon=True)
    writer.start()

    uploads_configured = HF_AVAILABLE and HF_TOKEN and DATASET_REPO_ID
    if uploads_configured:
        uploader = threading.Thread(target=_upload_timer_thread, daemon=True)
        uploader.start()

    atexit.register(shutdown_logger)


# Ensure threads start when module is imported. This is an import-time side
# effect: importing the module spawns the background worker thread (plus the
# upload timer thread when uploads are configured) and registers the atexit
# shutdown hook.
_ensure_threads_started()