Spaces:
Running
Running
| #!/usr/bin/env python | |
| # repair_conversation_ids.py | |
| """ | |
| Script to restore empty conversation_ids in chat history files. | |
| One-time operation with hardcoded paths. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import codecs | |
| import datetime | |
| import logging | |
| import tempfile | |
| from huggingface_hub import HfApi | |
| from dotenv import load_dotenv | |
| import time | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
# Load environment variables from a local .env file (HUGGINGFACE_TOKEN lives there)
load_dotenv()

# PATHS AND PARAMETERS CONFIGURATION
# =============================
# Modify these values according to your configuration
CHAT_HISTORY_PATH = './chat_history'  # Path to local chat history files
DATASET_ID = 'Rulga/status-law-knowledge-base'  # HuggingFace dataset ID
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # HuggingFace API access token

# Fail fast: every remote operation below needs an authenticated client.
if not HF_TOKEN:
    raise ValueError("HUGGINGFACE_TOKEN not found in environment variables")

# Paths inside the dataset repository (always forward-slash separated)
DATASET_CHAT_HISTORY_PATH = "chat_history"
DATASET_VECTOR_STORE_PATH = "vector_store"
DATASET_FINE_TUNED_PATH = "fine_tuned_models"
DATASET_ANNOTATIONS_PATH = "annotations"
DATASET_ERROR_LOGS_PATH = "error_logs"
DATASET_PREFERENCES_PATH = "preferences/user_preferences.json"

# If True, script won't make actual changes (test mode)
DRY_RUN = False
# If True, script will update only local files
LOCAL_ONLY = False

# Temporary directory for downloaded files
TEMP_DIR = tempfile.mkdtemp()

# Make stdout UTF-8 capable BEFORE logging captures it.  (Previously the
# stream was re-wrapped AFTER logging.StreamHandler(sys.stdout) was created,
# so the handler kept writing to the old, possibly non-UTF-8 stream and the
# wrap had no effect on log output.)
if hasattr(sys.stdout, "reconfigure"):
    # Python 3.7+: reconfigure in place, no re-wrapping needed.
    sys.stdout.reconfigure(encoding='utf-8')
elif hasattr(sys.stdout, "buffer"):
    # Fallback for exotic streams without reconfigure().
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer)

# Logging configuration: log file + console, both UTF-8
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("repair_conversation_ids.log", encoding='utf-8'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
def safe_api_call(func, *args, **kwargs):
    """Call *func(*args, **kwargs)*, retrying on HuggingFace rate-limit errors.

    The original version only re-raised with a comment "let retry handle it",
    but the tenacity decorator was never applied, so no retry ever happened.
    This version actually retries 429 responses with exponential backoff.

    Args:
        func: callable to invoke (typically an HfApi method).
        *args, **kwargs: forwarded to *func* unchanged.

    Returns:
        Whatever *func* returns.

    Raises:
        Exception: the last error, if all attempts fail or the error is not
        a rate-limit error.
    """
    max_attempts = 5
    delay = 2.0  # seconds; doubled after each rate-limited attempt
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            is_rate_limit = "429 Client Error: Too Many Requests" in str(e)
            if is_rate_limit and attempt < max_attempts:
                logger.warning("Rate limit hit, waiting before retry...")
                time.sleep(delay)
                delay *= 2  # exponential backoff
                continue
            raise  # non-rate-limit error, or retries exhausted
def repair_conversation_ids():
    """Restore missing conversation_ids in chat history files stored directly
    in the HuggingFace dataset.

    Scans the dataset's chat_history folder for JSON files whose basename
    starts with 'None_', generates a deterministic conversation id from the
    record's timestamp plus the filename, then (unless DRY_RUN):
      1. copies the original file into an archive/ subfolder,
      2. uploads the repaired content under the corrected name,
      3. deletes the original — only after both uploads succeeded.

    Returns:
        int: number of files repaired; 0 on a top-level failure.
    """
    try:
        api = HfApi(token=HF_TOKEN)

        # List all files in the dataset, with retry on rate limits
        files = safe_api_call(
            api.list_repo_files,
            repo_id=DATASET_ID,
            repo_type="dataset"
        )

        chat_files = [
            f for f in files
            if f.startswith(DATASET_CHAT_HISTORY_PATH)
            and f.endswith('.json')
            and os.path.basename(f).startswith('None_')
        ]

        logger.info(f"Found {len(chat_files)} files with 'None_' prefix in dataset")

        repaired_count = 0
        skipped_count = 0
        error_count = 0

        for file_path in chat_files:
            try:
                # Throttle to stay under the HF API rate limit
                time.sleep(2)  # 2 seconds between files

                # Download file content with retry; returns a local path
                local_path = safe_api_call(
                    api.hf_hub_download,
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=file_path,
                    local_dir=TEMP_DIR,
                    local_dir_use_symlinks=False
                )

                with open(local_path, 'r', encoding='utf-8') as f:
                    chat_data = json.load(f)

                # Don't clobber a record that already carries a usable id
                # (also makes the 'skipped' counter meaningful — it was
                # previously reported but never incremented).
                existing_id = chat_data.get('conversation_id')
                if existing_id and existing_id != 'None':
                    skipped_count += 1
                    continue

                # Generate new ID based on the record timestamp; fall back to
                # "now" if the timestamp is absent or malformed.
                timestamp_str = chat_data.get('timestamp', '')
                try:
                    timestamp_dt = datetime.datetime.fromisoformat(timestamp_str)
                    time_part = timestamp_dt.strftime('%Y%m%d%H%M%S')
                except (ValueError, TypeError):
                    time_part = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

                filename = os.path.basename(file_path)
                filename_part = os.path.splitext(filename)[0].replace('None_', '')
                if len(filename_part) > 10:
                    filename_part = filename_part[:10]  # keep ids short

                new_id = f"conv_{time_part}_{filename_part}"
                chat_data['conversation_id'] = new_id

                if not DRY_RUN:
                    # Repo paths must always use forward slashes, so never
                    # build them with os.path.join (backslashes on Windows
                    # would corrupt path_in_repo).
                    new_filename = filename.replace('None_', '')
                    repo_dir = os.path.dirname(file_path).replace(os.sep, '/')
                    new_path = f"{repo_dir}/{new_filename}" if repo_dir else new_filename

                    # Archive destination for the original file.
                    # NOTE(review): the extracted source had a garbled field
                    # here; reconstructed as {filename} — confirm intent.
                    archive_timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
                    archive_filename = f"archive/None_{archive_timestamp}_{filename}"
                    archive_path = f"{DATASET_CHAT_HISTORY_PATH}/{archive_filename}"

                    # Best-effort: make sure the archive directory exists;
                    # failure here usually just means it already does.
                    try:
                        api.upload_file(
                            path_or_fileobj=b"",
                            path_in_repo=f"{DATASET_CHAT_HISTORY_PATH}/archive/.gitkeep",
                            repo_id=DATASET_ID,
                            repo_type="dataset"
                        )
                    except Exception:
                        pass  # directory might already exist

                    # 1) Copy the original file into the archive (with retry)
                    safe_api_call(
                        api.upload_file,
                        path_or_fileobj=local_path,
                        path_in_repo=archive_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )

                    # 2) Upload the repaired content under the new name
                    json_content = json.dumps(chat_data, ensure_ascii=False, indent=2)
                    safe_api_call(
                        api.upload_file,
                        path_or_fileobj=json_content.encode('utf-8'),
                        path_in_repo=new_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )

                    # 3) Only after both uploads succeeded, delete the original
                    safe_api_call(
                        api.delete_file,
                        path_in_repo=file_path,
                        repo_id=DATASET_ID,
                        repo_type="dataset"
                    )

                    logger.info(
                        f"Repaired: {filename} -> {new_filename} "
                        f"(archived as {archive_filename}) - New ID: {new_id}"
                    )

                # Counted in DRY_RUN too, so the "would have repaired" report works.
                repaired_count += 1

            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                error_count += 1
                continue  # skip to next file on error

        logger.info(
            f"Repair completed: {repaired_count} files repaired, "
            f"{skipped_count} skipped, {error_count} errors"
        )
        return repaired_count

    except Exception as e:
        logger.error(f"Error accessing dataset: {str(e)}")
        return 0
if __name__ == "__main__":
    # Log the active configuration before doing any work, so the log file
    # records exactly which settings this run used.
    logger.info("=== CONFIGURATION ===")
    logger.info(f"Chat history path: {CHAT_HISTORY_PATH}")
    logger.info(f"Dataset ID: {DATASET_ID}")
    logger.info(f"Test mode: {'Yes' if DRY_RUN else 'No'}")
    logger.info(f"Local only: {'Yes' if LOCAL_ONLY else 'No'}")
    logger.info("==================")

    # Run the repair and report the outcome.
    repaired = repair_conversation_ids()
    summary = (
        f"TEST MODE: Would have repaired {repaired} files"
        if DRY_RUN
        else f"Successfully repaired {repaired} files"
    )
    logger.info(summary)