#!/usr/bin/env python3
"""
Upload Consolidated Gold Tables to HuggingFace

Uploads the consolidated gold parquet files (21 files) to HuggingFace Datasets.
Works with the new consolidated structure where state data is combined
with a 'state' column instead of separate state directories.

Usage:
    python scripts/huggingface/upload_consolidated_gold.py
    python scripts/huggingface/upload_consolidated_gold.py --private
    python scripts/huggingface/upload_consolidated_gold.py --file bills_bills.parquet
"""

import os
import sys
from pathlib import Path
from datetime import datetime
from typing import Optional

import pandas as pd
from huggingface_hub import HfApi, create_repo
from datasets import Dataset
from loguru import logger
from dotenv import load_dotenv
import argparse

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Load environment variables
load_dotenv()

# Configuration
HUGGINGFACE_TOKEN = os.getenv('HUGGINGFACE_TOKEN')
HF_ORGANIZATION = os.getenv('HF_ORGANIZATION', 'CommunityOne')
HF_DATASET_PREFIX = os.getenv('HF_DATASET_PREFIX', 'one')

# Path to consolidated gold directory (relative to the current working directory)
GOLD_DIR = Path("data/gold")

# Files above this size (in MB) are skipped when --skip-large is passed
LARGE_FILE_THRESHOLD_MB = 100


def get_dataset_name(file_stem: str) -> str:
    """
    Convert file stem to HuggingFace dataset name.

    A small table of special cases shortens redundant names; every other
    stem simply has its underscores replaced with hyphens.

    Examples:
        bills_bills -> bills
        nonprofits_organizations -> nonprofits-organizations
        event_documents -> event-documents

    Args:
        file_stem: Parquet file name without its extension

    Returns:
        Dataset name suffix (without organization or prefix)
    """
    # Special cases - simplify some names
    simplifications = {
        'bills_bills': 'bills',
        'bills_bill_actions': 'bill-actions',
        'bills_bill_sponsorships': 'bill-sponsorships',
        'contacts_officials': 'officials',
        'contacts_local_officials': 'local-officials',
        'event_participants': 'event-participants',
        'event_documents': 'event-documents',
    }

    if file_stem in simplifications:
        return simplifications[file_stem]

    # Default: replace underscores with hyphens
    return file_stem.replace('_', '-')


def upload_parquet_to_hf(
    file_path: Path,
    api: HfApi,
    private: bool = False,
    max_rows: Optional[int] = None
) -> dict:
    """
    Upload a single parquet file to HuggingFace.

    The file is read into a DataFrame, converted to a `datasets.Dataset`
    and pushed to `{HF_ORGANIZATION}/{HF_DATASET_PREFIX}-{dataset_name}`.
    Failures are caught and reported in the returned dict rather than raised.

    Args:
        file_path: Path to parquet file
        api: HuggingFace API instance (accepted for interface compatibility;
            this function calls the module-level hub helpers with the token)
        private: Whether to make the dataset private
        max_rows: Optional limit on rows to upload (for testing large files)

    Returns:
        Dict with upload results. Always contains "file"; contains "error"
        on failure, otherwise "repo_id", "url", "records",
        "original_records", "size_mb" and "columns".
    """
    if not file_path.exists():
        logger.warning(f"⚠️ Skipping {file_path.name} - file not found")
        # "file" is included so the failure summary in main() can report it
        return {"file": file_path.name, "error": "File not found"}

    # Get dataset name
    dataset_name = get_dataset_name(file_path.stem)
    repo_id = f"{HF_ORGANIZATION}/{HF_DATASET_PREFIX}-{dataset_name}"

    # Get file size
    size_mb = file_path.stat().st_size / (1024 * 1024)

    logger.info(f"📤 Uploading {file_path.name} ({size_mb:.1f} MB) to {repo_id}...")

    try:
        # Load parquet file
        df = pd.read_parquet(file_path)
        original_rows = len(df)

        # Limit rows if requested (for testing)
        if max_rows and len(df) > max_rows:
            logger.info(f" Limiting to {max_rows:,} rows (testing mode)")
            df = df.head(max_rows)

        logger.info(f" Loaded {len(df):,} records with {len(df.columns)} columns")
        logger.debug(f" Columns: {', '.join(df.columns.tolist()[:10])}...")

        # Check if state column exists (for consolidated state files)
        if 'state' in df.columns:
            # Drop nulls and stringify: a missing or non-string state value
            # must not abort the upload just because of this log line
            states = sorted(str(s) for s in df['state'].dropna().unique())
            logger.info(f" States: {', '.join(states)}")

        # Create HuggingFace dataset
        dataset = Dataset.from_pandas(df)

        # Create repo if it doesn't exist. Best effort: a failure here is
        # surfaced as a warning and the push below is still attempted.
        try:
            create_repo(
                repo_id=repo_id,
                repo_type="dataset",
                private=private,
                exist_ok=True,
                token=HUGGINGFACE_TOKEN
            )
            logger.debug(f" Created/verified repo: {repo_id}")
        except Exception as e:
            logger.warning(f" Repo handling: {e}")

        # Push to hub
        dataset.push_to_hub(
            repo_id=repo_id,
            private=private,
            commit_message=f"Upload consolidated gold table - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            token=HUGGINGFACE_TOKEN
        )

        url = f"https://huggingface.co/datasets/{repo_id}"
        logger.success(f" ✅ Uploaded {len(df):,} records to {url}")

        return {
            "file": file_path.name,
            "repo_id": repo_id,
            "url": url,
            "records": len(df),
            "original_records": original_rows,
            "size_mb": size_mb,
            "columns": len(df.columns)
        }

    except Exception as e:
        # Top-level boundary for one file: log with traceback and report the
        # failure so the remaining files are still processed by the caller.
        logger.exception(f" ❌ Upload failed: {e}")
        return {
            "file": file_path.name,
            "error": str(e)
        }


def main() -> None:
    """Upload all consolidated gold tables to HuggingFace.

    Parses command-line options, validates that a token is configured,
    uploads either the single requested file or every parquet file in
    GOLD_DIR, and logs a summary. Exits with status 1 when the token is
    missing or no input file can be found.
    """
    parser = argparse.ArgumentParser(description="Upload consolidated gold tables to HuggingFace")
    parser.add_argument("--private", action="store_true", help="Make datasets private")
    parser.add_argument("--file", help="Upload only this specific file")
    parser.add_argument("--max-rows", type=int, help="Limit rows per file (for testing)")
    parser.add_argument("--skip-large", action="store_true", help="Skip files larger than 100MB")
    args = parser.parse_args()

    # Validate token
    if not HUGGINGFACE_TOKEN:
        logger.error("❌ HUGGINGFACE_TOKEN not set")
        logger.error(" Set it in .env file or export HUGGINGFACE_TOKEN=your_token")
        sys.exit(1)

    logger.info("=" * 70)
    logger.info("UPLOADING CONSOLIDATED GOLD TABLES TO HUGGINGFACE")
    logger.info("=" * 70)
    logger.info(f"Organization: {HF_ORGANIZATION}")
    logger.info(f"Prefix: {HF_DATASET_PREFIX}")
    logger.info(f"Private: {args.private}")
    logger.info(f"Gold directory: {GOLD_DIR}")
    logger.info("=" * 70)

    # Initialize HuggingFace API
    api = HfApi(token=HUGGINGFACE_TOKEN)

    # Get list of parquet files
    if args.file:
        # Upload specific file
        parquet_files = [GOLD_DIR / args.file]
        if not parquet_files[0].exists():
            logger.error(f"❌ File not found: {parquet_files[0]}")
            sys.exit(1)
    else:
        # Upload all parquet files
        parquet_files = sorted(GOLD_DIR.glob("*.parquet"))

    if not parquet_files:
        logger.error(f"❌ No parquet files found in {GOLD_DIR}")
        sys.exit(1)

    logger.info(f"\nFound {len(parquet_files)} parquet files to upload\n")

    # Upload each file
    results = []
    skipped = []

    for i, file_path in enumerate(parquet_files, 1):
        # Check file size
        size_mb = file_path.stat().st_size / (1024 * 1024)

        if args.skip_large and size_mb > LARGE_FILE_THRESHOLD_MB:
            logger.info(f"⏭️ Skipping {file_path.name} ({size_mb:.1f} MB) - too large")
            skipped.append(file_path.name)
            continue

        logger.info(f"\n[{i}/{len(parquet_files)}] Processing {file_path.name}")
        logger.info("-" * 70)

        result = upload_parquet_to_hf(
            file_path,
            api,
            private=args.private,
            max_rows=args.max_rows
        )
        results.append(result)

    # Summary
    logger.info("")
    logger.info("=" * 70)
    logger.info("UPLOAD SUMMARY")
    logger.info("=" * 70)

    successful = [r for r in results if "error" not in r]
    failed = [r for r in results if "error" in r]

    if successful:
        logger.info(f"\n✅ Successfully uploaded {len(successful)} datasets:\n")
        for r in successful:
            logger.info(f" • {r['file']}: {r['records']:,} records → {r['url']}")

    if failed:
        logger.info(f"\n❌ Failed uploads ({len(failed)}):\n")
        for r in failed:
            logger.error(f" • {r['file']}: {r['error']}")

    if skipped:
        logger.info(f"\n⏭️ Skipped {len(skipped)} large files:\n")
        for f in skipped:
            logger.info(f" • {f}")

    # Final stats
    total_records = sum(r.get('records', 0) for r in successful)
    total_size_mb = sum(r.get('size_mb', 0) for r in successful)

    logger.info("")
    logger.info(f"Total records uploaded: {total_records:,}")
    logger.info(f"Total data size: {total_size_mb:.1f} MB")
    logger.info("")
    logger.info("Next steps:")
    # Use the configured organization rather than a hard-coded name so the
    # link matches where the datasets were actually pushed
    logger.info(f"1. View datasets at https://huggingface.co/{HF_ORGANIZATION}")
    logger.info("2. Test loading: from datasets import load_dataset")
    logger.info(f"3. Example: ds = load_dataset('{HF_ORGANIZATION}/{HF_DATASET_PREFIX}-bills')")


if __name__ == "__main__":
    main()