Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from huggingface_hub import list_models, model_info, hf_hub_download, upload_file | |
| import pandas as pd | |
| import datetime | |
| import os | |
| # --- DATABASE MANAGER --- | |
| class ModelDatabase: | |
| def __init__(self): | |
| # Initialize an empty DataFrame in memory. | |
| self.df = pd.DataFrame(columns=["sha256", "repo_id", "filename", "timestamp", "tags"]) | |
| self.dataset_id = "SHA-index/model-dna-index" | |
| self.token = "" | |
| self.csv_name = "model_dna.csv" | |
| def connect_to_hub(self, dataset_id, token=None): | |
| """Loads the CSV from a HF Dataset if it exists.""" | |
| self.dataset_id = dataset_id | |
| self.token = token or os.environ.get("HF_TOKEN") | |
| if not self.dataset_id: | |
| return "β οΈ No Dataset ID provided." | |
| try: | |
| print(f"Attempting to download {self.csv_name} from {self.dataset_id}...") | |
| path = hf_hub_download( | |
| repo_id=self.dataset_id, | |
| filename=self.csv_name, | |
| repo_type="dataset", | |
| token=self.token | |
| ) | |
| self.df = pd.read_csv(path) | |
| # Ensure columns exist (in case of schema drift) | |
| for col in ["sha256", "repo_id", "filename", "timestamp", "tags"]: | |
| if col not in self.df.columns: | |
| self.df[col] = "" | |
| return f"β Successfully loaded {len(self.df)} records from {self.dataset_id}." | |
| except Exception as e: | |
| # If file doesn't exist, we assume it's a new dataset and will create it on save | |
| if "404" in str(e) or "EntryNotFound" in str(e): | |
| return f"β οΈ Connected to {self.dataset_id}, but '{self.csv_name}' was not found. A new file will be created upon saving." | |
| return f"β Error loading from Hub: {e}" | |
| def save_to_hub(self): | |
| """Pushes the current DataFrame to the HF Dataset.""" | |
| if not self.dataset_id: | |
| return "β οΈ Persistence not configured (No Dataset ID)." | |
| try: | |
| # Save to local temporary CSV | |
| local_path = "temp_model_dna.csv" | |
| self.df.to_csv(local_path, index=False) | |
| # Upload to Hub | |
| upload_file( | |
| path_or_fileobj=local_path, | |
| path_in_repo=self.csv_name, | |
| repo_id=self.dataset_id, | |
| repo_type="dataset", | |
| token=self.token, | |
| commit_message=f"Auto-save: Updated index with {len(self.df)} records" | |
| ) | |
| return f"β Saved {len(self.df)} records to {self.dataset_id}." | |
| except Exception as e: | |
| return f"β Failed to save to Hub: {e}" | |
| def add_record(self, sha256, repo_id, filename, timestamp, tags=""): | |
| # Check if hash already exists in our session | |
| if not self.df.empty and sha256 in self.df['sha256'].values: | |
| # If it exists, check timestamps to see if we found an older (original) version | |
| existing_row = self.df[self.df['sha256'] == sha256].iloc[0] | |
| existing_time = pd.to_datetime(existing_row['timestamp']) | |
| new_time = pd.to_datetime(timestamp) | |
| if new_time < existing_time: | |
| # Update the record to the older version (The true original) | |
| self.df.loc[self.df['sha256'] == sha256, ['repo_id', 'filename', 'timestamp', 'tags']] = [repo_id, filename, timestamp, tags] | |
| return "updated_original" | |
| return "duplicate" | |
| # Add new record | |
| new_row = pd.DataFrame([{ | |
| "sha256": sha256, | |
| "repo_id": repo_id, | |
| "filename": filename, | |
| "timestamp": timestamp, | |
| "tags": tags | |
| }]) | |
| if self.df.empty: | |
| self.df = new_row | |
| else: | |
| self.df = pd.concat([self.df, new_row], ignore_index=True) | |
| return "added" | |
| def search_hash(self, sha256): | |
| if self.df.empty: | |
| return None | |
| sha256 = sha256.strip().lower() | |
| match = self.df[self.df['sha256'] == sha256] | |
| if not match.empty: | |
| return match.iloc[0].to_dict() | |
| return None | |
| def get_stats(self): | |
| return len(self.df) | |
| # Initialize Database | |
| db = ModelDatabase() | |
| # --- DETECTIVE LOGIC --- | |
| def get_repo_dna(repo_id): | |
| """Scans a repo for LFS files and returns their hashes.""" | |
| try: | |
| # We use model_info with files_metadata=True. | |
| # This is the API equivalent of reading the "Raw pointer file" you found! | |
| info = model_info(repo_id, files_metadata=True) | |
| created_at = info.created_at if info.created_at else datetime.datetime.now() | |
| tags = ", ".join(info.tags) if info.tags else "" | |
| dna_list = [] | |
| # info.siblings contains the file list with metadata | |
| if info.siblings: | |
| for file in info.siblings: | |
| # Check filename extension | |
| filename = file.rfilename | |
| is_weight_file = filename.endswith(".safetensors") or filename.endswith(".bin") or filename.endswith(".pt") | |
| # Check if it has LFS metadata (this is the pointer file data) | |
| if is_weight_file and file.lfs: | |
| dna_list.append({ | |
| "sha256": file.lfs["sha256"], | |
| "filename": filename, | |
| "repo_id": repo_id, | |
| "timestamp": str(created_at), | |
| "tags": tags | |
| }) | |
| return dna_list, None | |
| except Exception as e: | |
| return [], str(e) | |
| def scan_and_index(repo_id, progress=gr.Progress()): | |
| """Manually scan a repo and add it to the DB.""" | |
| if not repo_id: | |
| return "β οΈ Please enter a Repository ID.", db.get_stats() | |
| progress(0, desc=f"Connecting to {repo_id}...") | |
| dna_list, error = get_repo_dna(repo_id) | |
| if error: | |
| return f"β Error scanning {repo_id}: {error}", db.get_stats() | |
| if not dna_list: | |
| return f"β οΈ No LFS weight files found in {repo_id}.", db.get_stats() | |
| added_count = 0 | |
| updated_count = 0 | |
| progress(0.5, desc="Analyzing hashes...") | |
| for item in dna_list: | |
| status = db.add_record( | |
| item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags'] | |
| ) | |
| if status == "added": | |
| added_count += 1 | |
| elif status == "updated_original": | |
| updated_count += 1 | |
| # Auto-save after indexing | |
| save_msg = "" | |
| if db.dataset_id: | |
| save_msg = db.save_to_hub() | |
| return f"β Scanned {repo_id}.\nπ Added {added_count} new hashes.\nπ Updated {updated_count} originals.\nπΎ {save_msg}", db.get_stats() | |
| def scan_org(org_id, limit=20, progress=gr.Progress()): | |
| """Scans multiple models from a specific user or organization.""" | |
| if not org_id: | |
| return "β οΈ Please enter an Organization or User ID.", db.get_stats() | |
| progress(0, desc=f"Fetching top {limit} models for {org_id}...") | |
| try: | |
| # Fetch models sorted by downloads to get the most important ones first | |
| models = list(list_models(author=org_id, sort="downloads", direction=-1, limit=limit)) | |
| except Exception as e: | |
| return f"β Error fetching models for {org_id}: {e}", db.get_stats() | |
| if not models: | |
| return f"β οΈ No models found for {org_id}.", db.get_stats() | |
| total_added = 0 | |
| total_updated = 0 | |
| for i, model in enumerate(models): | |
| repo_id = model.modelId | |
| progress((i / len(models)), desc=f"Scanning {repo_id}...") | |
| dna_list, error = get_repo_dna(repo_id) | |
| if error: | |
| continue # Skip errors in bulk mode to keep going | |
| if not dna_list: | |
| continue | |
| for item in dna_list: | |
| status = db.add_record( | |
| item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags'] | |
| ) | |
| if status == "added": | |
| total_added += 1 | |
| elif status == "updated_original": | |
| total_updated += 1 | |
| # Auto-save after indexing | |
| save_msg = "" | |
| if db.dataset_id: | |
| save_msg = db.save_to_hub() | |
| return f"β Bulk Scan Complete for {org_id}.\nChecked {len(models)} models.\nπ Added {total_added} new hashes.\nπ Updated {total_updated} originals.\nπΎ {save_msg}", db.get_stats() | |
| def patrol_new_uploads(limit=10, progress=gr.Progress()): | |
| """The 'Watchdog': Scans the latest models tagged with 'safetensors'.""" | |
| progress(0, desc="Fetching latest Safetensors models...") | |
| # 1. Fetch latest models | |
| try: | |
| models = list_models(filter="safetensors", sort="createdAt", direction=-1, limit=limit) | |
| models = list(models) | |
| except Exception as e: | |
| return f"Error fetching models: {e}", "" | |
| log_results = [] | |
| for i, model in enumerate(models): | |
| repo_id = model.modelId | |
| progress((i / len(models)), desc=f"Checking {repo_id}...") | |
| dna_list, error = get_repo_dna(repo_id) | |
| if error or not dna_list: | |
| continue | |
| for item in dna_list: | |
| # CHECK THE DB | |
| existing = db.search_hash(item['sha256']) | |
| if existing: | |
| # We found a match! | |
| original_repo = existing['repo_id'] | |
| # If the current repo is NOT the original | |
| if original_repo != repo_id: | |
| # Dark Mode Friendly HTML | |
| log = f""" | |
| <div style="background-color: rgba(255, 82, 82, 0.15); border: 1px solid rgba(255, 82, 82, 0.5); padding: 10px; margin-bottom: 10px; border-radius: 5px; color: #ffcdd2;"> | |
| <strong>π¨ MATCH FOUND</strong><br> | |
| New Upload: <b>{repo_id}</b><br> | |
| Matches Hash: <code>{item['sha256'][:10]}...</code><br> | |
| Likely Original: <a href='https://huggingface.co/{original_repo}' target='_blank' style='color: #ef9a9a;'><b>{original_repo}</b></a> | |
| </div> | |
| """ | |
| log_results.append(log) | |
| else: | |
| # If unknown, we add it to DB so it becomes the "first seen" | |
| db.add_record(item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags']) | |
| # Auto-save after patrol | |
| if db.dataset_id: | |
| db.save_to_hub() | |
| if not log_results: | |
| return "β No obvious copies found in the last batch.", db.get_stats() | |
| return "".join(log_results), db.get_stats() | |
| def check_hash_manually(sha_input): | |
| """User pastes a hash to search.""" | |
| if not sha_input: | |
| return "β οΈ Please enter a SHA256 hash." | |
| result = db.search_hash(sha_input) | |
| if result: | |
| # Dark Mode Friendly HTML | |
| return f""" | |
| <div style="background-color: rgba(76, 175, 80, 0.15); padding: 20px; border-radius: 10px; border: 1px solid rgba(76, 175, 80, 0.5); color: #c8e6c9;"> | |
| <h3>β Hash Found in Index</h3> | |
| <p><strong>Original Repo:</strong> <a href="https://huggingface.co/{result['repo_id']}" target="_blank" style="color: #a5d6a7;">{result['repo_id']}</a></p> | |
| <p><strong>Filename:</strong> {result['filename']}</p> | |
| <p><strong>First Seen:</strong> {result['timestamp']}</p> | |
| </div> | |
| """ | |
| else: | |
| # Dark Mode Friendly HTML | |
| return f""" | |
| <div style="background-color: rgba(255, 152, 0, 0.15); padding: 20px; border-radius: 10px; border: 1px solid rgba(255, 152, 0, 0.5); color: #ffe0b2;"> | |
| <h3>β Hash Not Found</h3> | |
| <p>This hash is not in our <strong>current session index</strong>.</p> | |
| <p><em>Since we are not saving data, you must index a repository (like the original model source) or run a patrol first to populate the database.</em></p> | |
| </div> | |
| """ | |
| def configure_persistence(dataset_id, token): | |
| return db.connect_to_hub(dataset_id, token), db.get_stats() | |
| # --- GRADIO UI --- | |
| # Added js to force dark mode on body load | |
| with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="gray", neutral_hue="gray"), title="HF Model Detective", js="document.body.classList.add('dark')") as demo: | |
| gr.Markdown("# π΅οΈ Hugging Face Model Detective") | |
| gr.Markdown("Identify the original source of model weights using LFS SHA256 hashes.") | |
| with gr.Row(): | |
| stats_box = gr.Textbox(label="Hashes in Memory", value=db.get_stats(), interactive=False) | |
| with gr.Tabs(): | |
| # TAB 1: PERSISTENCE (Configuration) | |
| with gr.Tab("βοΈ Persistence Settings"): | |
| gr.Markdown("Connect a **Hugging Face Dataset** to save your findings permanently. The app will sync `model_dna.csv` with this dataset.") | |
| with gr.Row(): | |
| dataset_input = gr.Textbox(label="Dataset ID", value="SHA-index/model-dna-index", placeholder="username/my-hash-dataset") | |
| token_input = gr.Textbox(label="HF Token (Write Access)", type="password", placeholder="hf_...") | |
| connect_btn = gr.Button("Connect & Load Data") | |
| status_box = gr.Textbox(label="Connection Status") | |
| connect_btn.click(configure_persistence, inputs=[dataset_input, token_input], outputs=[status_box, stats_box]) | |
| # TAB 2: INDEXING ZONE | |
| with gr.Tab("πΎ Indexing Zone"): | |
| gr.Markdown("Grow the 'Truth Database' by indexing models.") | |
| with gr.Row(): | |
| # LEFT COLUMN: Single Repo | |
| with gr.Column(): | |
| gr.Markdown("### π Single Repository") | |
| repo_input = gr.Textbox(label="Repository ID", placeholder="e.g. mistralai/Mistral-7B-v0.1") | |
| scan_btn = gr.Button("Scan & Index") | |
| # RIGHT COLUMN: Bulk Org | |
| with gr.Column(): | |
| gr.Markdown("### π’ Bulk Organization/User") | |
| org_input = gr.Textbox(label="Org/User ID", placeholder="e.g. meta-llama, google, TheBloke") | |
| limit_slider = gr.Slider(minimum=10, maximum=100, value=20, step=10, label="Max Models to Scan") | |
| bulk_btn = gr.Button("Bulk Scan & Index", variant="primary") | |
| scan_log = gr.Textbox(label="Scan Log", lines=5) | |
| # Button Logic | |
| scan_btn.click(scan_and_index, inputs=repo_input, outputs=[scan_log, stats_box]) | |
| bulk_btn.click(scan_org, inputs=[org_input, limit_slider], outputs=[scan_log, stats_box]) | |
| # TAB 3: SEARCH | |
| with gr.Tab("π Search by Hash"): | |
| hash_input = gr.Textbox(label="SHA256 Hash", placeholder="Paste the SHA256 string here...") | |
| search_btn = gr.Button("Trace Origin", variant="primary") | |
| search_output = gr.HTML() | |
| search_btn.click(check_hash_manually, inputs=hash_input, outputs=search_output) | |
| # TAB 4: PATROL | |
| with gr.Tab("π¨ Live Patrol"): | |
| gr.Markdown("Scan the most recently uploaded models and check if they match any hashes currently in memory.") | |
| with gr.Row(): | |
| limit_slider_patrol = gr.Slider(minimum=5, maximum=50, value=10, step=5, label="Models to Check") | |
| patrol_btn = gr.Button("Run Patrol", variant="stop") | |
| patrol_output = gr.HTML(label="Suspicious Findings") | |
| patrol_btn.click(patrol_new_uploads, inputs=limit_slider_patrol, outputs=[patrol_output, stats_box]) | |
| gr.Markdown(""" | |
| ### How it works | |
| 1. **Index:** We extract the SHA256 hash from the Git LFS pointer files (no huge downloads!). | |
| 2. **Compare:** We check if that hash was previously seen in an older repository. | |
| 3. **Detect:** If a new repo has the exact same hash as an old repo, it's a re-upload. | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch() |