# Exported from HuggingFace Space (Ashraf Al-Kassem, commit afe106b):
# fix: correct HF dataset card task_categories (remove unofficial 'conversational')
"""
export_to_hf.py — LeadPilot V2

Exports static seed data and all user-generated data to the
HuggingFace dataset repo: ashrafkassem/leadpilot-data

Usage:
    cd backend/
    python scripts/export_to_hf.py [--mode all|static|dynamic] [--dry-run]
    python scripts/export_to_hf.py --mode static --hf-token hf_xxx
    python scripts/export_to_hf.py --dry-run

Env vars (can also be passed as CLI flags):
    DATABASE_URL    — SQLite/Postgres URL (from app config)
    HF_TOKEN        — HuggingFace write token (required for live export)
    HF_DATASET_REPO — Dataset repo ID (default: ashrafkassem/leadpilot-data)
"""
| import sys | |
| import os | |
| import asyncio | |
| import argparse | |
| import hashlib | |
| import json | |
| import io | |
| import logging | |
| from datetime import datetime, timezone | |
| # ββ path bootstrap (same pattern as create_test_user.py) βββββββββββββββββββββ | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| parent_dir = os.path.dirname(current_dir) | |
| sys.path.insert(0, parent_dir) | |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession | |
| from sqlmodel import select | |
| from app.core.config import settings | |
| from app.models.models import ( | |
| Workspace, Contact, Conversation, Message, | |
| Flow, ExecutionInstance, AdminAuditLog, | |
| ) | |
| from app.core.seed import SEED_TEMPLATES, DEFAULT_PLANS | |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") | |
| logger = logging.getLogger(__name__) | |
# ── PII pseudonymization helpers ──────────────────────────────────────────────
def _hash(value: str, length: int = 16) -> str:
    """Deterministic, non-reversible SHA-256 digest truncated to `length` hex chars."""
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
    return digest[:length]
def _redact_contact(row: dict) -> dict:
    """Replace PII fields in a contact dict with deterministic hashes (in place)."""
    # Name-like fields get short 8-char hashes; identifiers get 12 chars.
    for key in ("first_name", "last_name", "display_name"):
        value = row.get(key)
        if value:
            row[key] = _hash(value, 8)
    for key in ("external_id", "zoho_lead_id"):
        value = row.get(key)
        if value:
            row[key] = _hash(value, 12)
    # additional_metadata may contain raw lead form data (emails, phones)
    row["additional_metadata"] = {}
    return row
def _redact_message(row: dict) -> dict:
    """Blank out message content, keeping only its length as a proxy metric (in place)."""
    original = row.get("content") or ""
    row["content_length"] = len(original)
    row["content"] = "[REDACTED]"
    # additional_metadata may contain media URLs or phone numbers
    row["additional_metadata"] = {}
    return row
def _redact_audit_log(row: dict) -> dict:
    """Strip network identifiers and hash actor/entity IDs (in place)."""
    # IP, UA and error text may embed PII or secrets — drop them entirely.
    row["ip_address"] = None
    row["user_agent"] = None
    row["error_message"] = None
    for key in ("actor_user_id", "entity_id"):
        if row.get(key):
            row[key] = _hash(row[key], 12)
    return row
# ── Static builders (no DB required) ──────────────────────────────────────────
def build_templates_jsonl() -> str:
    """Build JSONL for automation templates directly from SEED_TEMPLATES constant."""

    def _variable_row(v: dict) -> dict:
        # Normalize one template variable definition to its exported shape.
        return {
            "key": v["key"],
            "label": v["label"],
            "var_type": v.get("var_type", "text"),
            "required": v.get("required", True),
            "description": v.get("description"),
            "default_value": v.get("default_value"),
        }

    rows = (
        {
            "slug": t["slug"],
            "name": t["name"],
            "description": t["description"],
            "category": t["category"],
            "platforms": t["platforms"],
            "industry_tags": t.get("industry_tags", []),
            "required_integrations": t.get("required_integrations", []),
            "is_featured": t.get("is_featured", False),
            "variables": [_variable_row(v) for v in t.get("variables", [])],
            "builder_graph_json": t["builder_graph_json"],
        }
        for t in SEED_TEMPLATES
    )
    return "\n".join(json.dumps(row, default=str) for row in rows)
def build_plans_jsonl() -> str:
    """Build JSONL for plan tiers + entitlements from DEFAULT_PLANS constant."""
    out = []
    # DEFAULT_PLANS is a sequence of 5-tuples; entitlements maps module_key -> limit.
    for name, display_name, sort_order, description, entitlements in DEFAULT_PLANS:
        entitlement_rows = [
            {"module_key": module_key, "hard_limit": hard_limit}
            for module_key, hard_limit in entitlements.items()
        ]
        out.append(json.dumps({
            "name": name,
            "display_name": display_name,
            "sort_order": sort_order,
            "description": description,
            "entitlements": entitlement_rows,
        }, default=str))
    return "\n".join(out)
# ── Dynamic builders (async DB, paginated) ────────────────────────────────────
async def build_workspaces_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all Workspace rows as JSONL (id, subscription_tier, created_at).

    Pages through the table in `batch_size` chunks. Results are ordered by
    primary key: OFFSET pagination without an ORDER BY is nondeterministic
    (the DB may return pages in arbitrary order, skipping/duplicating rows).
    """
    lines = []
    offset = 0
    while True:
        stmt = select(Workspace).order_by(Workspace.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for ws in rows:
            lines.append(json.dumps({
                "id": str(ws.id),
                "subscription_tier": ws.subscription_tier,
                "created_at": ws.created_at.isoformat(),
            }, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_contacts_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all Contact rows as pseudonymized JSONL (see `_redact_contact`).

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(Contact).order_by(Contact.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for c in rows:
            row = _redact_contact({
                "id": str(c.id),
                "workspace_id": str(c.workspace_id),
                "external_id": c.external_id,
                "first_name": c.first_name,
                "last_name": c.last_name,
                "display_name": c.display_name,
                "zoho_lead_id": c.zoho_lead_id,
                "zoho_last_synced_at": c.zoho_last_synced_at.isoformat() if c.zoho_last_synced_at else None,
                "additional_metadata": dict(c.additional_metadata or {}),
                "created_at": c.created_at.isoformat(),
            })
            lines.append(json.dumps(row, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_conversations_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all Conversation rows as JSONL (ids, status, timestamps).

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(Conversation).order_by(Conversation.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for conv in rows:
            # status may be an Enum (use .value) or already a plain string
            status = conv.status.value if hasattr(conv.status, "value") else conv.status
            lines.append(json.dumps({
                "id": str(conv.id),
                "workspace_id": str(conv.workspace_id),
                "contact_id": str(conv.contact_id),
                "status": status,
                "last_message_at": conv.last_message_at.isoformat() if conv.last_message_at else None,
                "created_at": conv.created_at.isoformat(),
            }, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_messages_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all Message rows as redacted JSONL (see `_redact_message`).

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(Message).order_by(Message.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for msg in rows:
            # delivery_status may be an Enum (use .value) or already a plain string
            delivery = msg.delivery_status.value if hasattr(msg.delivery_status, "value") else msg.delivery_status
            row = _redact_message({
                "id": str(msg.id),
                "workspace_id": str(msg.workspace_id),
                "conversation_id": str(msg.conversation_id),
                "direction": msg.direction,
                "content": msg.content,
                "platform": msg.platform,
                "delivery_status": delivery,
                "attempt_count": msg.attempt_count,
                "sent_at": msg.sent_at.isoformat() if msg.sent_at else None,
                "delivered_at": msg.delivered_at.isoformat() if msg.delivered_at else None,
                "read_at": msg.read_at.isoformat() if msg.read_at else None,
                "additional_metadata": dict(msg.additional_metadata or {}),
                "created_at": msg.created_at.isoformat(),
            })
            lines.append(json.dumps(row, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_flows_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all Flow rows as JSONL with `name` pseudonymized.

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(Flow).order_by(Flow.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for flow in rows:
            # status may be an Enum (use .value) or already a plain string
            status = flow.status.value if hasattr(flow.status, "value") else flow.status
            lines.append(json.dumps({
                "id": str(flow.id),
                "workspace_id": str(flow.workspace_id),
                "name": _hash(flow.name, 8),  # pseudonymize (may contain business name)
                "status": status,
                "source_template_id": str(flow.source_template_id) if flow.source_template_id else None,
                "created_at": flow.created_at.isoformat(),
                "updated_at": flow.updated_at.isoformat(),
            }, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_executions_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all ExecutionInstance rows as JSONL (status, timing).

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(ExecutionInstance).order_by(ExecutionInstance.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for ex in rows:
            # status may be an Enum (use .value) or already a plain string
            status = ex.status.value if hasattr(ex.status, "value") else ex.status
            lines.append(json.dumps({
                "id": str(ex.id),
                "workspace_id": str(ex.workspace_id),
                "flow_version_id": str(ex.flow_version_id),
                "contact_id": str(ex.contact_id) if ex.contact_id else None,
                "status": status,
                "abort_reason": ex.abort_reason,
                "created_at": ex.created_at.isoformat(),
                "updated_at": ex.updated_at.isoformat(),
            }, default=str))
        offset += batch_size
    return "\n".join(lines)
async def build_audit_logs_jsonl(session: AsyncSession, batch_size: int) -> str:
    """Export all AdminAuditLog rows as redacted JSONL (see `_redact_audit_log`).

    Ordered by primary key so OFFSET pagination is deterministic — without an
    ORDER BY the DB may skip/duplicate rows across pages.
    """
    lines = []
    offset = 0
    while True:
        stmt = select(AdminAuditLog).order_by(AdminAuditLog.id).offset(offset).limit(batch_size)
        result = await session.execute(stmt)
        rows = result.scalars().all()
        if not rows:
            break
        for log in rows:
            row = _redact_audit_log({
                "id": str(log.id),
                "workspace_id": str(log.workspace_id) if log.workspace_id else None,
                "action": log.action,
                "entity_type": log.entity_type,
                "entity_id": log.entity_id,
                "actor_type": log.actor_type,
                "actor_user_id": str(log.actor_user_id) if log.actor_user_id else None,
                "outcome": log.outcome,
                "ip_address": log.ip_address,
                "user_agent": log.user_agent,
                "request_path": log.request_path,
                "request_method": log.request_method,
                "error_message": log.error_message,
                "created_at": log.created_at.isoformat(),
            })
            lines.append(json.dumps(row, default=str))
        offset += batch_size
    return "\n".join(lines)
# ── Dataset card ──────────────────────────────────────────────────────────────
# README.md for the HF dataset repo: YAML front-matter + markdown body.
# NOTE: the previous revision contained mojibake ("β" where "—"/"→" was
# intended) from a bad encoding round-trip; the published card text is
# repaired here.
DATASET_README = """\
---
license: mit
task_categories:
- text-generation
- text-classification
language:
- en
- ar
tags:
- lead-generation
- automation
- meta
- crm
- saas
pretty_name: LeadPilot Automation Dataset
size_categories:
- 1K<n<10K
---

# LeadPilot Automation Dataset

Companion dataset for the [LeadPilot HuggingFace Space](https://huggingface.co/spaces/ashrafkassem/LeadPilot).

LeadPilot is a multi-tenant SaaS automation platform for lead capture via WhatsApp and Meta
(Instagram DM, Messenger, Lead Ads), with Zoho CRM sync.

## Dataset Contents

### static/

Pre-seeded catalog data — does not change at runtime.

| File | Rows | Description |
|---|---|---|
| `static/templates.jsonl` | 12 | Automation flow templates (slug, name, category, platforms, builder_graph_json, variables) |
| `static/plans.jsonl` | 3 | Subscription plans (free/growth/enterprise) with module entitlements |

### dynamic/

User-generated operational data, exported daily at 02:00 UTC from the live app.
All PII fields are pseudonymized using SHA-256 truncated hashes before export.

| File | Description |
|---|---|
| `dynamic/workspaces.jsonl` | Workspace records (id, subscription_tier, created_at) |
| `dynamic/contacts.jsonl` | Pseudonymized contacts (hashed name, hashed external_id/zoho_id) |
| `dynamic/conversations.jsonl` | Conversation threads (ids, status, timestamps) |
| `dynamic/messages.jsonl` | Messages (content replaced with [REDACTED], content_length kept) |
| `dynamic/flows.jsonl` | Automation flows (name hashed, status, source_template_id) |
| `dynamic/executions.jsonl` | Flow execution instances (status, timing) |
| `dynamic/audit_logs.jsonl` | Admin audit trail (IPs/UAs stripped, actor/entity ids hashed) |
| `dynamic/meta.json` | Export metadata (exported_at, mode) |

## PII Handling

All personally identifiable information is removed or pseudonymized before export:

- `Contact.first_name`, `last_name`, `display_name` → SHA-256[:8] hex
- `Contact.external_id` (WhatsApp phone / Meta PSID) → SHA-256[:12] hex
- `Contact.zoho_lead_id` → SHA-256[:12] hex
- `Contact.additional_metadata` → replaced with `{}`
- `Message.content` → replaced with `"[REDACTED]"`, `content_length` field added
- `Message.additional_metadata` → replaced with `{}`
- `Flow.name` → SHA-256[:8] hex (may contain business names)
- `AdminAuditLog.ip_address`, `user_agent`, `error_message` → `null`
- `AdminAuditLog.actor_user_id`, `entity_id` → SHA-256[:12] hex

## Export Schedule

Updated daily at 02:00 UTC via Celery Beat.
Last export timestamp is in `dynamic/meta.json`.

## License

MIT
"""
# ── HuggingFace upload ────────────────────────────────────────────────────────
def upload_to_hf(
    files: dict,
    repo_id: str,
    token: str,
    dry_run: bool = False,
) -> None:
    """Upload a dict of {path_in_repo: content_str} to a HuggingFace dataset repo.

    Args:
        files: Mapping of repo-relative path -> file content (text).
        repo_id: Target dataset repo ID (e.g. "user/leadpilot-data").
        token: HF write token.
        dry_run: If True, only log what would be uploaded; no network calls.
    """
    if dry_run:
        logger.info("[dry-run] Would upload %d files to %s:", len(files), repo_id)
        for path, content in files.items():
            line_count = content.count("\n") + 1 if content.strip() else 0
            logger.info("  %-45s (%d lines)", path, line_count)
        return

    # Imported lazily so dry runs don't require huggingface_hub at all.
    from huggingface_hub import HfApi, CommitOperationAdd
    api = HfApi(token=token)

    # Create repo if it doesn't exist
    try:
        api.repo_info(repo_id=repo_id, repo_type="dataset")
        logger.info("Dataset repo %s already exists.", repo_id)
    except Exception:
        # repo_info raises on 404 (and on transient errors); create_repo is
        # idempotent thanks to exist_ok=True, so this is safe either way.
        # Fixed mojibake in the log message ("β" -> "—").
        logger.info("Dataset repo %s not found — creating it.", repo_id)
        api.create_repo(repo_id=repo_id, repo_type="dataset", private=False, exist_ok=True)

    # All files go in a single atomic commit.
    operations = [
        CommitOperationAdd(
            path_in_repo=path,
            path_or_fileobj=io.BytesIO(content.encode("utf-8")),
        )
        for path, content in files.items()
    ]
    commit_msg = f"chore: automated export {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}"
    api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=commit_msg,
    )
    logger.info("Committed %d files to %s", len(operations), repo_id)
# ── Orchestrator ──────────────────────────────────────────────────────────────
async def run_export(
    mode: str,
    batch_size: int,
    dry_run: bool,
    hf_token: str,
    repo_id: str,
) -> None:
    """Build all export files for `mode` and upload them to the dataset repo.

    Args:
        mode: "static", "dynamic", or "all".
        batch_size: DB pagination size for the dynamic builders.
        dry_run: If True, log the file list instead of uploading.
        hf_token: HF write token (placeholder value is fine in dry-run mode).
        repo_id: Target dataset repo ID.
    """
    files: dict = {}

    # ── Static files (always from constants, never from DB) ───────────────────
    if mode in ("static", "all"):
        logger.info("Building static/templates.jsonl ...")
        files["static/templates.jsonl"] = build_templates_jsonl()
        logger.info("Building static/plans.jsonl ...")
        files["static/plans.jsonl"] = build_plans_jsonl()

    # ── Dynamic files (from live DB) ──────────────────────────────────────────
    if mode in ("dynamic", "all"):
        engine = create_async_engine(settings.DATABASE_URL, echo=False)
        try:
            async with AsyncSession(engine, expire_on_commit=False) as session:
                builders = {
                    "dynamic/workspaces.jsonl": build_workspaces_jsonl,
                    "dynamic/contacts.jsonl": build_contacts_jsonl,
                    "dynamic/conversations.jsonl": build_conversations_jsonl,
                    "dynamic/messages.jsonl": build_messages_jsonl,
                    "dynamic/flows.jsonl": build_flows_jsonl,
                    "dynamic/executions.jsonl": build_executions_jsonl,
                    "dynamic/audit_logs.jsonl": build_audit_logs_jsonl,
                }
                for path, builder in builders.items():
                    logger.info("Building %s ...", path)
                    files[path] = await builder(session, batch_size)
        finally:
            # FIX: the engine was previously never disposed — close pooled
            # connections so repeated runs (e.g. from Celery Beat) don't leak.
            await engine.dispose()
        files["dynamic/meta.json"] = json.dumps({
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "mode": mode,
        })

    # Dataset card (always refreshed)
    files["README.md"] = DATASET_README
    upload_to_hf(files, repo_id=repo_id, token=hf_token, dry_run=dry_run)
# ── Entry point ───────────────────────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
    # CLI definition kept in one place so main() stays readable.
    parser = argparse.ArgumentParser(
        description="Export LeadPilot data to HuggingFace dataset repo"
    )
    parser.add_argument(
        "--mode", choices=["static", "dynamic", "all"], default="all",
        help="Which data to export (default: all)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print what would be uploaded without making any HF API calls",
    )
    parser.add_argument(
        "--hf-token", default=None,
        help="HuggingFace write token (overrides HF_TOKEN env var)",
    )
    parser.add_argument(
        "--repo-id", default=None,
        help="Dataset repo ID (overrides HF_DATASET_REPO env var)",
    )
    parser.add_argument(
        "--batch-size", type=int, default=5000,
        help="DB pagination batch size for dynamic exports (default: 5000)",
    )
    return parser


def main() -> None:
    """Parse CLI arguments, resolve credentials, and run the async export."""
    args = _build_parser().parse_args()

    # CLI flags win over app settings / env vars.
    hf_token = args.hf_token or settings.HF_TOKEN
    repo_id = args.repo_id or settings.HF_DATASET_REPO

    if not args.dry_run and not hf_token:
        logger.error("HF_TOKEN is not set. Use --hf-token or set the HF_TOKEN env var.")
        sys.exit(1)

    asyncio.run(run_export(
        mode=args.mode,
        batch_size=args.batch_size,
        dry_run=args.dry_run,
        hf_token=hf_token or "dry-run",
        repo_id=repo_id,
    ))


if __name__ == "__main__":
    main()