#!/usr/bin/env python3
# repair_conversation_ids.py
"""
One-time script to restore missing conversation_ids in chat history files
stored in the HuggingFace dataset. It targets files whose names start with
'None_'; paths and behaviour flags are hardcoded below.
"""
import os
import sys
import json
import datetime
import logging
import shutil
import tempfile
import time

from huggingface_hub import HfApi
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential
# Load environment variables
load_dotenv()
# PATHS AND PARAMETERS CONFIGURATION
# =============================
# Modify these values according to your configuration
CHAT_HISTORY_PATH = './chat_history' # Local chat history path (only logged; the repair runs against the dataset)
DATASET_ID = 'Rulga/status-law-knowledge-base' # HuggingFace dataset ID
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN") # HuggingFace API access token
if not HF_TOKEN:
raise ValueError("HUGGINGFACE_TOKEN not found in environment variables")
# Dataset paths
DATASET_CHAT_HISTORY_PATH = "chat_history"
DATASET_VECTOR_STORE_PATH = "vector_store"
DATASET_FINE_TUNED_PATH = "fine_tuned_models"
DATASET_ANNOTATIONS_PATH = "annotations"
DATASET_ERROR_LOGS_PATH = "error_logs"
DATASET_PREFERENCES_PATH = "preferences/user_preferences.json"
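# Note: only DATASET_CHAT_HISTORY_PATH is used below; the other dataset
# paths are assumed to mirror the main app's configuration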
# If True, script won't make actual changes (test mode)
DRY_RUN = False
# If True, script will update only local files
# NOTE: currently only logged; the repair below always runs against the dataset
LOCAL_ONLY = False
# Add temporary directory for downloads
TEMP_DIR = tempfile.mkdtemp()
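# (cleaned up with shutil.rmtree in the __main__ block once the repair finishes)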
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("repair_conversation_ids.log", encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Force UTF-8 output so chat content logs cleanly on any console; reconfigure
# mutates the existing stream, so the StreamHandler above picks it up too
sys.stdout.reconfigure(encoding='utf-8')
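# Retry policy for Hub API calls: up to 3 attempts, with exponential backoff
# clamped between 60 and 180 seconds (useful when rate-limited)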
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=60, min=60, max=180)
)
def safe_api_call(func, *args, **kwargs):
"""Wrapper for API calls with retry logic"""
try:
return func(*args, **kwargs)
except Exception as e:
if "429 Client Error: Too Many Requests" in str(e):
logger.warning("Rate limit hit, waiting before retry...")
raise # Let retry handle it
raise # Other errors
def repair_conversation_ids():
"""
Restore conversation_ids in chat history files directly in HuggingFace dataset
"""
try:
api = HfApi(token=HF_TOKEN)
# List all files with retry
files = safe_api_call(
api.list_repo_files,
repo_id=DATASET_ID,
repo_type="dataset"
)
chat_files = [f for f in files
if f.startswith(DATASET_CHAT_HISTORY_PATH) and
f.endswith('.json') and
os.path.basename(f).startswith('None_')]
logger.info(f"Found {len(chat_files)} files with 'None_' prefix in dataset")
repaired_count = 0
skipped_count = 0
error_count = 0
for file_path in chat_files:
try:
                # Throttle: short pause between files to avoid rate limits
                time.sleep(2)
                # Download the file to the temp dir with retry
                # (local_dir_use_symlinks is deprecated in recent
                # huggingface_hub releases, where real files are the default)
                local_path = safe_api_call(
                    api.hf_hub_download,
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=file_path,
                    local_dir=TEMP_DIR,
                    local_dir_use_symlinks=False
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    chat_data = json.load(f)
                # Skip files that already carry a conversation_id
                if chat_data.get('conversation_id'):
                    logger.info(f"Skipped (already has conversation_id): {file_path}")
                    skipped_count += 1
                    continue
# Generate new ID based on timestamp and file details
timestamp_str = chat_data.get('timestamp', '')
try:
timestamp_dt = datetime.datetime.fromisoformat(timestamp_str)
time_part = timestamp_dt.strftime('%Y%m%d%H%M%S')
except (ValueError, TypeError):
time_part = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
filename = os.path.basename(file_path)
filename_part = os.path.splitext(filename)[0].replace('None_', '')
if len(filename_part) > 10:
filename_part = filename_part[:10]
new_id = f"conv_{time_part}_{filename_part}"
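                # e.g. conv_20240131120000_a1b2c3d4e5 (hypothetical example:
                # 14-digit timestamp plus up to 10 chars of the old filename)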
chat_data['conversation_id'] = new_id
                # Build the new filename without the 'None_' prefix; repo
                # paths always use '/', so avoid os.path.join here
                new_filename = filename.replace('None_', '', 1)
                new_path = f"{os.path.dirname(file_path)}/{new_filename}"
                # Archive name for the original file
                archive_timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
                archive_filename = f"archive/None_{archive_timestamp}_{filename}"
                archive_path = f"{DATASET_CHAT_HISTORY_PATH}/{archive_filename}"

                if DRY_RUN:
                    logger.info(f"DRY RUN: would repair {filename} -> {new_filename} (archive as {archive_filename}) - New ID: {new_id}")
                    repaired_count += 1
                    continue

                # First copy the old file to the archive with retry; the Hub
                # creates parent directories implicitly, so no .gitkeep or
                # explicit mkdir is needed
                safe_api_call(
                    api.upload_file,
                    path_or_fileobj=local_path,
                    path_in_repo=archive_path,
                    repo_id=DATASET_ID,
                    repo_type="dataset"
                )
                # Upload updated content with retry
                json_content = json.dumps(chat_data, ensure_ascii=False, indent=2)
                safe_api_call(
                    api.upload_file,
                    path_or_fileobj=json_content.encode('utf-8'),
                    path_in_repo=new_path,
                    repo_id=DATASET_ID,
                    repo_type="dataset"
                )
                # Only after both uploads succeed, delete the original with retry
                safe_api_call(
                    api.delete_file,
                    path_in_repo=file_path,
                    repo_id=DATASET_ID,
                    repo_type="dataset"
                )
                logger.info(f"Repaired: {filename} -> {new_filename} (archived as {archive_filename}) - New ID: {new_id}")
                repaired_count += 1
except Exception as e:
logger.error(f"Error processing {file_path}: {str(e)}")
error_count += 1
continue # Skip to next file on error
logger.info(f"Repair completed: {repaired_count} files repaired, {skipped_count} skipped, {error_count} errors")
return repaired_count
except Exception as e:
logger.error(f"Error accessing dataset: {str(e)}")
return 0
if __name__ == "__main__":
# Display configuration information
logger.info("=== CONFIGURATION ===")
logger.info(f"Chat history path: {CHAT_HISTORY_PATH}")
logger.info(f"Dataset ID: {DATASET_ID}")
logger.info(f"Test mode: {'Yes' if DRY_RUN else 'No'}")
logger.info(f"Local only: {'Yes' if LOCAL_ONLY else 'No'}")
logger.info("==================")
    # Start repair process; always remove the temp download dir afterwards
    try:
        repaired = repair_conversation_ids()
    finally:
        shutil.rmtree(TEMP_DIR, ignore_errors=True)
    if DRY_RUN:
        logger.info(f"TEST MODE: Would have repaired {repaired} files")
    else:
        logger.info(f"Successfully repaired {repaired} files")
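# Usage (assumes HUGGINGFACE_TOKEN is set in .env or the environment):
#   python repair_conversation_ids.py
# Set DRY_RUN = True above first to log intended changes without touching
# the dataset.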