Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Upload Consolidated Gold Tables to HuggingFace | |
| Uploads the consolidated gold parquet files (21 files) to HuggingFace Datasets. | |
| Works with the new consolidated structure where state data is combined with | |
| a 'state' column instead of separate state directories. | |
| Usage: | |
| python scripts/huggingface/upload_consolidated_gold.py | |
| python scripts/huggingface/upload_consolidated_gold.py --private | |
| python scripts/huggingface/upload_consolidated_gold.py --file bills_bills.parquet | |
| """ | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime | |
| import pandas as pd | |
| from huggingface_hub import HfApi, create_repo | |
| from datasets import Dataset | |
| from loguru import logger | |
| from dotenv import load_dotenv | |
| import argparse | |
| # Add project root to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent)) | |
| # Load environment variables | |
| load_dotenv() | |
| # Configuration | |
| HUGGINGFACE_TOKEN = os.getenv('HUGGINGFACE_TOKEN') | |
| HF_ORGANIZATION = os.getenv('HF_ORGANIZATION', 'CommunityOne') | |
| HF_DATASET_PREFIX = os.getenv('HF_DATASET_PREFIX', 'one') | |
| # Path to consolidated gold directory | |
| GOLD_DIR = Path("data/gold") | |
| def get_dataset_name(file_stem: str) -> str: | |
| """ | |
| Convert file stem to HuggingFace dataset name. | |
| Examples: | |
| bills_bills -> bills | |
| nonprofits_organizations -> nonprofits-organizations | |
| event_documents -> event-documents | |
| """ | |
| # Special cases - simplify some names | |
| simplifications = { | |
| 'bills_bills': 'bills', | |
| 'bills_bill_actions': 'bill-actions', | |
| 'bills_bill_sponsorships': 'bill-sponsorships', | |
| 'contacts_officials': 'officials', | |
| 'contacts_local_officials': 'local-officials', | |
| 'event_participants': 'event-participants', | |
| 'event_documents': 'event-documents', | |
| } | |
| if file_stem in simplifications: | |
| return simplifications[file_stem] | |
| # Default: replace underscores with hyphens | |
| return file_stem.replace('_', '-') | |
| def upload_parquet_to_hf( | |
| file_path: Path, | |
| api: HfApi, | |
| private: bool = False, | |
| max_rows: int = None | |
| ) -> dict: | |
| """ | |
| Upload a single parquet file to HuggingFace. | |
| Args: | |
| file_path: Path to parquet file | |
| api: HuggingFace API instance | |
| private: Whether to make the dataset private | |
| max_rows: Optional limit on rows to upload (for testing large files) | |
| Returns: | |
| Dict with upload results | |
| """ | |
| if not file_path.exists(): | |
| logger.warning(f"⚠️ Skipping {file_path.name} - file not found") | |
| return {"error": "File not found"} | |
| # Get dataset name | |
| dataset_name = get_dataset_name(file_path.stem) | |
| repo_id = f"{HF_ORGANIZATION}/{HF_DATASET_PREFIX}-{dataset_name}" | |
| # Get file size | |
| size_mb = file_path.stat().st_size / (1024 * 1024) | |
| logger.info(f"📤 Uploading {file_path.name} ({size_mb:.1f} MB) to {repo_id}...") | |
| try: | |
| # Load parquet file | |
| df = pd.read_parquet(file_path) | |
| original_rows = len(df) | |
| # Limit rows if requested (for testing) | |
| if max_rows and len(df) > max_rows: | |
| logger.info(f" Limiting to {max_rows:,} rows (testing mode)") | |
| df = df.head(max_rows) | |
| logger.info(f" Loaded {len(df):,} records with {len(df.columns)} columns") | |
| logger.debug(f" Columns: {', '.join(df.columns.tolist()[:10])}...") | |
| # Check if state column exists (for consolidated state files) | |
| if 'state' in df.columns: | |
| states = df['state'].unique() | |
| logger.info(f" States: {', '.join(sorted(states))}") | |
| # Create HuggingFace dataset | |
| dataset = Dataset.from_pandas(df) | |
| # Create repo if it doesn't exist | |
| try: | |
| create_repo( | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| private=private, | |
| exist_ok=True, | |
| token=HUGGINGFACE_TOKEN | |
| ) | |
| logger.debug(f" Created/verified repo: {repo_id}") | |
| except Exception as e: | |
| logger.debug(f" Repo handling: {e}") | |
| # Push to hub | |
| dataset.push_to_hub( | |
| repo_id=repo_id, | |
| private=private, | |
| commit_message=f"Upload consolidated gold table - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", | |
| token=HUGGINGFACE_TOKEN | |
| ) | |
| url = f"https://huggingface.co/datasets/{repo_id}" | |
| logger.success(f" ✅ Uploaded {len(df):,} records to {url}") | |
| return { | |
| "file": file_path.name, | |
| "repo_id": repo_id, | |
| "url": url, | |
| "records": len(df), | |
| "original_records": original_rows, | |
| "size_mb": size_mb, | |
| "columns": len(df.columns) | |
| } | |
| except Exception as e: | |
| logger.error(f" ❌ Upload failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return { | |
| "file": file_path.name, | |
| "error": str(e) | |
| } | |
| def main(): | |
| """Upload all consolidated gold tables to HuggingFace.""" | |
| parser = argparse.ArgumentParser(description="Upload consolidated gold tables to HuggingFace") | |
| parser.add_argument("--private", action="store_true", help="Make datasets private") | |
| parser.add_argument("--file", help="Upload only this specific file") | |
| parser.add_argument("--max-rows", type=int, help="Limit rows per file (for testing)") | |
| parser.add_argument("--skip-large", action="store_true", help="Skip files larger than 100MB") | |
| args = parser.parse_args() | |
| # Validate token | |
| if not HUGGINGFACE_TOKEN: | |
| logger.error("❌ HUGGINGFACE_TOKEN not set") | |
| logger.error(" Set it in .env file or export HUGGINGFACE_TOKEN=your_token") | |
| sys.exit(1) | |
| logger.info("=" * 70) | |
| logger.info("UPLOADING CONSOLIDATED GOLD TABLES TO HUGGINGFACE") | |
| logger.info("=" * 70) | |
| logger.info(f"Organization: {HF_ORGANIZATION}") | |
| logger.info(f"Prefix: {HF_DATASET_PREFIX}") | |
| logger.info(f"Private: {args.private}") | |
| logger.info(f"Gold directory: {GOLD_DIR}") | |
| logger.info("=" * 70) | |
| # Initialize HuggingFace API | |
| api = HfApi(token=HUGGINGFACE_TOKEN) | |
| # Get list of parquet files | |
| if args.file: | |
| # Upload specific file | |
| parquet_files = [GOLD_DIR / args.file] | |
| if not parquet_files[0].exists(): | |
| logger.error(f"❌ File not found: {parquet_files[0]}") | |
| sys.exit(1) | |
| else: | |
| # Upload all parquet files | |
| parquet_files = sorted(GOLD_DIR.glob("*.parquet")) | |
| if not parquet_files: | |
| logger.error(f"❌ No parquet files found in {GOLD_DIR}") | |
| sys.exit(1) | |
| logger.info(f"\nFound {len(parquet_files)} parquet files to upload\n") | |
| # Upload each file | |
| results = [] | |
| skipped = [] | |
| for i, file_path in enumerate(parquet_files, 1): | |
| # Check file size | |
| size_mb = file_path.stat().st_size / (1024 * 1024) | |
| if args.skip_large and size_mb > 100: | |
| logger.info(f"⏭️ Skipping {file_path.name} ({size_mb:.1f} MB) - too large") | |
| skipped.append(file_path.name) | |
| continue | |
| logger.info(f"\n[{i}/{len(parquet_files)}] Processing {file_path.name}") | |
| logger.info("-" * 70) | |
| result = upload_parquet_to_hf( | |
| file_path, | |
| api, | |
| private=args.private, | |
| max_rows=args.max_rows | |
| ) | |
| results.append(result) | |
| # Summary | |
| logger.info("") | |
| logger.info("=" * 70) | |
| logger.info("UPLOAD SUMMARY") | |
| logger.info("=" * 70) | |
| successful = [r for r in results if "error" not in r] | |
| failed = [r for r in results if "error" in r] | |
| if successful: | |
| logger.info(f"\n✅ Successfully uploaded {len(successful)} datasets:\n") | |
| for r in successful: | |
| logger.info(f" • {r['file']}: {r['records']:,} records → {r['url']}") | |
| if failed: | |
| logger.info(f"\n❌ Failed uploads ({len(failed)}):\n") | |
| for r in failed: | |
| logger.error(f" • {r['file']}: {r['error']}") | |
| if skipped: | |
| logger.info(f"\n⏭️ Skipped {len(skipped)} large files:\n") | |
| for f in skipped: | |
| logger.info(f" • {f}") | |
| # Final stats | |
| total_records = sum(r.get('records', 0) for r in successful) | |
| total_size_mb = sum(r.get('size_mb', 0) for r in successful) | |
| logger.info("") | |
| logger.info(f"Total records uploaded: {total_records:,}") | |
| logger.info(f"Total data size: {total_size_mb:.1f} MB") | |
| logger.info("") | |
| logger.info("Next steps:") | |
| logger.info("1. View datasets at https://huggingface.co/CommunityOne") | |
| logger.info("2. Test loading: from datasets import load_dataset") | |
| logger.info(f"3. Example: ds = load_dataset('{HF_ORGANIZATION}/{HF_DATASET_PREFIX}-bills')") | |
| if __name__ == "__main__": | |
| main() | |