# Spaces: Running
import datetime
import html
import os

import gradio as gr
import pandas as pd
from huggingface_hub import hf_hub_download, list_models, model_info, upload_file
# --- DATABASE MANAGER ---
class ModelDatabase:
    """In-memory index of weight-file SHA256 hashes, persisted as a CSV on the Hub.

    Each row maps one hash to the repository/file it was first seen in, plus
    the timestamp and tags recorded for that repository. The index lives in
    ``self.df`` and is synchronised with a CSV file (``self.csv_name``) stored
    in the Hugging Face dataset ``self.dataset_id``.
    """

    # Column schema of the index; also used to back-fill columns missing from
    # a CSV written with an older schema.
    COLUMNS = ["sha256", "repo_id", "filename", "timestamp", "tags"]

    def __init__(self):
        """Start with an empty index and the default dataset configuration."""
        self.df = pd.DataFrame(columns=self.COLUMNS)
        self.dataset_id = "SHA-index/model-dna-index"
        self.token = ""
        self.csv_name = "model_dna.csv"

    @staticmethod
    def _normalize_hash(sha256):
        """Return *sha256* stripped and lowercased so lookups and inserts agree."""
        return str(sha256).strip().lower()

    @staticmethod
    def _parse_time(value):
        """Parse *value* into a UTC timestamp, or ``NaT`` when it cannot be parsed.

        Forcing UTC makes naive and timezone-aware inputs comparable; comparing
        them directly raises ``TypeError``.
        """
        return pd.to_datetime(value, utc=True, errors="coerce")

    def connect_to_hub(self, dataset_id, token=None):
        """Load the index CSV from a Hub dataset, replacing the in-memory index.

        Args:
            dataset_id: Dataset repository ID; surrounding whitespace is ignored.
            token: Access token. Falls back to the ``HF_TOKEN`` environment
                variable when empty.

        Returns:
            A human-readable status message. Failures are reported in the
            message rather than raised; on failure the in-memory index is left
            untouched.
        """
        self.dataset_id = (dataset_id or "").strip()
        self.token = token or os.environ.get("HF_TOKEN")
        if not self.dataset_id:
            return "⚠️ No Dataset ID provided."
        try:
            print(f"Attempting to download {self.csv_name} from {self.dataset_id}...")
            path = hf_hub_download(
                repo_id=self.dataset_id,
                filename=self.csv_name,
                repo_type="dataset",
                token=self.token,
            )
            # Read everything as text: avoids dtype inference on hashes and
            # keeps empty cells as "" instead of NaN.
            loaded = pd.read_csv(path, dtype=str, keep_default_na=False)
            # Ensure columns exist (in case of schema drift).
            for col in self.COLUMNS:
                if col not in loaded.columns:
                    loaded[col] = ""
            # Stored hashes must use the same form search_hash() looks up.
            loaded["sha256"] = loaded["sha256"].str.strip().str.lower()
            # Only swap in the new frame once it is fully prepared.
            self.df = loaded
            return f"✅ Successfully loaded {len(self.df)} records from {self.dataset_id}."
        except Exception as e:
            # A missing file means a fresh dataset; it will be created on save.
            if "404" in str(e) or "EntryNotFound" in str(e):
                return f"⚠️ Connected to {self.dataset_id}, but '{self.csv_name}' was not found. A new file will be created upon saving."
            return f"❌ Error loading from Hub: {e}"

    def save_to_hub(self):
        """Upload the current index as a CSV to the configured Hub dataset.

        Returns:
            A human-readable status message; upload errors are reported in the
            message rather than raised.
        """
        if not self.dataset_id:
            return "⚠️ Persistence not configured (No Dataset ID)."
        try:
            # Serialise in memory so no temporary file is left in the working
            # directory.
            csv_bytes = self.df.to_csv(index=False).encode("utf-8")
            upload_file(
                path_or_fileobj=csv_bytes,
                path_in_repo=self.csv_name,
                repo_id=self.dataset_id,
                repo_type="dataset",
                token=self.token,
                commit_message=f"Auto-save: Updated index with {len(self.df)} records",
            )
            return f"✅ Saved {len(self.df)} records to {self.dataset_id}."
        except Exception as e:
            return f"❌ Failed to save to Hub: {e}"

    def add_record(self, sha256, repo_id, filename, timestamp, tags=""):
        """Insert a hash, or re-point an existing one at an older source.

        Args:
            sha256: Hash of the weight file; normalised before storage.
            repo_id: Repository the file was found in.
            filename: Path of the file inside the repository.
            timestamp: Timestamp recorded for the repository (string or datetime).
            tags: Comma-separated repository tags.

        Returns:
            ``"added"`` for a new hash, ``"updated_original"`` when the hash
            was known but this sighting is older (or the stored timestamp is
            unusable), ``"duplicate"`` otherwise.
        """
        sha256 = self._normalize_hash(sha256)
        mask = self.df["sha256"] == sha256
        if mask.any():
            existing_time = self._parse_time(self.df.loc[mask, "timestamp"].iloc[0])
            new_time = self._parse_time(timestamp)
            # The older sighting is treated as the true original. A stored
            # timestamp that cannot be parsed loses to any valid one.
            if pd.notna(new_time) and (pd.isna(existing_time) or new_time < existing_time):
                self.df.loc[mask, ["repo_id", "filename", "timestamp", "tags"]] = [
                    repo_id,
                    filename,
                    timestamp,
                    tags,
                ]
                return "updated_original"
            return "duplicate"

        new_row = pd.DataFrame(
            [
                {
                    "sha256": sha256,
                    "repo_id": repo_id,
                    "filename": filename,
                    "timestamp": timestamp,
                    "tags": tags,
                }
            ]
        )
        # Concatenating onto an empty frame is avoided; the new row simply
        # becomes the index.
        if self.df.empty:
            self.df = new_row
        else:
            self.df = pd.concat([self.df, new_row], ignore_index=True)
        return "added"

    def search_hash(self, sha256):
        """Return the first record matching *sha256* as a dict, or ``None``."""
        if self.df.empty:
            return None
        match = self.df[self.df["sha256"] == self._normalize_hash(sha256)]
        if match.empty:
            return None
        return match.iloc[0].to_dict()

    def get_stats(self):
        """Return the number of records currently in the index."""
        return len(self.df)
# Module-level database instance used by the scan, patrol and search handlers below.
db = ModelDatabase()
# --- DETECTIVE LOGIC ---
def get_repo_dna(repo_id):
    """Collect the LFS SHA256 hashes of the weight files in a model repository.

    Args:
        repo_id: Model repository ID to inspect.

    Returns:
        A ``(dna_list, error)`` tuple. On success ``dna_list`` holds one dict
        per LFS-tracked weight file (keys ``sha256``, ``filename``,
        ``repo_id``, ``timestamp``, ``tags``) and ``error`` is ``None``. On
        any failure ``dna_list`` is empty and ``error`` is the exception text.
    """
    weight_suffixes = (".safetensors", ".bin", ".pt", ".pth", ".ckpt", ".gguf")
    try:
        # files_metadata=True is what makes the per-file LFS info available.
        info = model_info(repo_id, files_metadata=True)
        # Fall back to "now" when the repo has no creation date. The fallback
        # is timezone-aware (UTC) so it is never mixed with aware timestamps
        # as a naive value.
        created_at = info.created_at or datetime.datetime.now(datetime.timezone.utc)
        tags = ", ".join(info.tags) if info.tags else ""

        dna_list = []
        for file in info.siblings or []:
            filename = file.rfilename
            lfs = getattr(file, "lfs", None)
            # Only LFS-tracked weight files carry a content hash worth indexing.
            if not (filename.endswith(weight_suffixes) and lfs):
                continue
            dna_list.append(
                {
                    "sha256": lfs["sha256"],
                    "filename": filename,
                    "repo_id": repo_id,
                    "timestamp": str(created_at),
                    "tags": tags,
                }
            )
        return dna_list, None
    except Exception as e:
        # Errors are returned as a value so batch scans can skip a bad repo.
        return [], str(e)
def scan_and_index(repo_id, progress=gr.Progress()):
    """Scan one repository and add its weight-file hashes to the index.

    Args:
        repo_id: Repository ID entered by the user; surrounding whitespace is
            ignored so a pasted ID with a stray space or newline still works.
        progress: Gradio progress tracker.

    Returns:
        A ``(log_message, index_size)`` tuple for the log and stats widgets.
    """
    repo_id = (repo_id or "").strip()
    if not repo_id:
        return "⚠️ Please enter a Repository ID.", db.get_stats()

    progress(0, desc=f"Connecting to {repo_id}...")
    dna_list, error = get_repo_dna(repo_id)
    if error:
        return f"❌ Error scanning {repo_id}: {error}", db.get_stats()
    if not dna_list:
        return f"⚠️ No LFS weight files found in {repo_id}.", db.get_stats()

    progress(0.5, desc="Analyzing hashes...")
    statuses = [
        db.add_record(
            item["sha256"], item["repo_id"], item["filename"], item["timestamp"], item["tags"]
        )
        for item in dna_list
    ]
    added_count = statuses.count("added")
    updated_count = statuses.count("updated_original")

    # Persist after every scan when a dataset is configured.
    save_msg = db.save_to_hub() if db.dataset_id else ""
    return (
        f"✅ Scanned {repo_id}.\n🆕 Added {added_count} new hashes.\n🔄 Updated {updated_count} originals.\n💾 {save_msg}",
        db.get_stats(),
    )
def scan_org(org_id, limit=20, progress=gr.Progress()):
    """Scan multiple models owned by one user or organization into the index.

    Args:
        org_id: Hub user or organization name; surrounding whitespace is ignored.
        limit: Maximum number of models to fetch. Cast to ``int`` because the
            UI slider value may arrive as a float.
        progress: Gradio progress tracker.

    Returns:
        A ``(log_message, index_size)`` tuple for the log and stats widgets.
    """
    org_id = (org_id or "").strip()
    if not org_id:
        return "⚠️ Please enter an Organization or User ID.", db.get_stats()

    progress(0, desc=f"Fetching models for {org_id}...")
    try:
        # Sorted by downloads so the most-used models are indexed first.
        models = list(list_models(author=org_id, sort="downloads", limit=int(limit)))
    except Exception as e:
        return f"❌ Error fetching models for {org_id}: {e}", db.get_stats()
    if not models:
        return f"⚠️ No models found for {org_id}.", db.get_stats()

    total_added = 0
    total_updated = 0
    failed_count = 0
    for i, model in enumerate(models):
        repo_id = model.modelId
        progress(i / len(models), desc=f"Scanning {repo_id}...")
        dna_list, error = get_repo_dna(repo_id)
        if error:
            # Keep going, but remember the failure so it can be reported.
            failed_count += 1
            continue
        for item in dna_list:
            status = db.add_record(
                item["sha256"], item["repo_id"], item["filename"], item["timestamp"], item["tags"]
            )
            if status == "added":
                total_added += 1
            elif status == "updated_original":
                total_updated += 1

    save_msg = db.save_to_hub() if db.dataset_id else ""
    message = f"✅ Bulk Scan Complete for {org_id}.\nChecked {len(models)} models.\n🆕 Added {total_added} new hashes.\n🔄 Updated {total_updated} originals.\n💾 {save_msg}"
    if failed_count:
        message += f"\n⚠️ {failed_count} models could not be scanned."
    return message, db.get_stats()
def patrol_new_uploads(limit=10, progress=gr.Progress()):
    """Check recently modified models against the index and report re-uploads.

    For every LFS weight file in the fetched models: a hash already indexed
    under a different repository is reported as a match; an unknown hash is
    added to the index.

    Args:
        limit: Number of models to fetch. Cast to ``int`` because the UI
            slider value may arrive as a float.
        progress: Gradio progress tracker.

    Returns:
        A ``(html_report, index_size)`` tuple. The report is rendered as HTML,
        so every interpolated value is escaped.
    """
    progress(0, desc="Fetching latest models...")
    try:
        models = list(list_models(filter="safetensors", sort="lastModified", limit=int(limit)))
    except Exception as e:
        return f"Error fetching models: {html.escape(str(e))}", db.get_stats()

    log_results = []
    for i, model in enumerate(models):
        repo_id = model.modelId
        progress(i / len(models), desc=f"Checking {repo_id}...")
        dna_list, error = get_repo_dna(repo_id)
        if error or not dna_list:
            continue
        for item in dna_list:
            existing = db.search_hash(item["sha256"])
            if not existing:
                db.add_record(
                    item["sha256"], item["repo_id"], item["filename"], item["timestamp"], item["tags"]
                )
                continue
            original_repo = existing["repo_id"]
            if original_repo == repo_id:
                # Same repo seen again: not a copy.
                continue
            # Escape everything that ends up in markup or in the href attribute.
            safe_repo = html.escape(str(repo_id))
            safe_original = html.escape(str(original_repo))
            safe_hash = html.escape(str(item["sha256"])[:16])
            log = f"""
<div style="background-color: #222; border-left: 4px solid #ff5252; padding: 10px; margin-bottom: 10px; font-family: monospace; color: #ddd;">
<strong style="color: #ff5252;">[MATCH FOUND]</strong><br>
New Upload: <b>{safe_repo}</b><br>
Hash: <code>{safe_hash}...</code><br>
Original: <a href='https://huggingface.co/{safe_original}' target='_blank' style='color: #4fc3f7;'><b>{safe_original}</b></a>
</div>
"""
            log_results.append(log)

    if db.dataset_id:
        db.save_to_hub()
    if not log_results:
        return "✅ No obvious copies found in the last batch.", db.get_stats()
    return "".join(log_results), db.get_stats()
def check_hash_manually(sha_input):
    """Look up a user-supplied SHA256 hash and render the result as HTML.

    Args:
        sha_input: Hash text pasted by the user.

    Returns:
        An HTML fragment (or a plain warning string when the input is empty)
        describing whether the hash is in the index. Indexed values come from
        remote repositories, so they are escaped before being interpolated.
    """
    if not sha_input:
        return "⚠️ Please enter a SHA256 hash."

    result = db.search_hash(sha_input)
    if not result:
        return """
<div style="background-color: #1a1a1a; padding: 20px; border: 1px solid #333; font-family: monospace; color: #bbb;">
<h3 style="color: #ffab40; margin-top: 0;">>> HASH_NOT_FOUND</h3>
<p>This hash is not in the current index.</p>
</div>
"""

    # Filenames in particular are chosen by the uploader and may contain markup.
    safe_repo = html.escape(str(result["repo_id"]))
    safe_filename = html.escape(str(result["filename"]))
    safe_timestamp = html.escape(str(result["timestamp"]))
    return f"""
<div style="background-color: #1a1a1a; padding: 20px; border: 1px solid #333; font-family: monospace; color: #eee;">
<h3 style="color: #69f0ae; margin-top: 0;">>> HASH_FOUND_IN_INDEX</h3>
<p><strong>Original Repo:</strong> <a href="https://huggingface.co/{safe_repo}" target="_blank" style="color: #69f0ae;">{safe_repo}</a></p>
<p><strong>Filename:</strong> {safe_filename}</p>
<p><strong>First Seen:</strong> {safe_timestamp}</p>
</div>
"""
def configure_persistence(dataset_id, token):
    """Point the shared database at a Hub dataset and report the outcome.

    Returns the status message from the connection attempt together with the
    index size measured after that attempt.
    """
    status_message = db.connect_to_hub(dataset_id, token)
    record_count = db.get_stats()
    return status_message, record_count
# --- GRADIO UI ---
# Four-tab interface wired to the handlers above. Every handler that can change
# the index also writes to stats_box so the displayed index size stays current.
# NOTE(review): the source this was recovered from had lost its indentation;
# the nesting of widgets inside Row/Column containers below is a
# reconstruction — confirm against the intended layout.
with gr.Blocks(theme=gr.themes.Monochrome(primary_hue="neutral", radius_size="none"), title="Search-SHA") as demo:
    gr.Markdown("# # Search-SHA")
    gr.Markdown("Utilitarian tool to identify model weight provenance via LFS SHA256 hashes.")
    with gr.Row():
        # Read-only counter showing the number of records in the index.
        stats_box = gr.Textbox(label="INDEX_SIZE", value=db.get_stats(), interactive=False)
    with gr.Tabs():
        # TAB 1: PERSISTENCE
        # Chooses the Hub dataset the index is loaded from and saved to.
        with gr.Tab("SETTINGS"):
            gr.Markdown("### Persistence Configuration")
            with gr.Row():
                dataset_input = gr.Textbox(label="Dataset ID", value="SHA-index/model-dna-index")
                token_input = gr.Textbox(label="HF Token (Write Access)", type="password")
            connect_btn = gr.Button("CONNECT_AND_LOAD")
            status_box = gr.Textbox(label="STATUS")
            connect_btn.click(configure_persistence, inputs=[dataset_input, token_input], outputs=[status_box, stats_box])
        # TAB 2: INDEXING
        # Single-repo and bulk scans share one log box.
        with gr.Tab("INDEXER"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Single Repository")
                    repo_input = gr.Textbox(label="Repository ID", placeholder="mistralai/Mistral-7B-v0.1")
                    scan_btn = gr.Button("SCAN_REPO")
                with gr.Column():
                    gr.Markdown("### Bulk Organization")
                    org_input = gr.Textbox(label="Org/User ID", placeholder="meta-llama")
                    limit_slider = gr.Slider(minimum=10, maximum=100, value=20, step=10, label="Limit")
                    bulk_btn = gr.Button("BULK_SCAN")
            scan_log = gr.Textbox(label="LOGS", lines=5)
            scan_btn.click(scan_and_index, inputs=repo_input, outputs=[scan_log, stats_box])
            bulk_btn.click(scan_org, inputs=[org_input, limit_slider], outputs=[scan_log, stats_box])
        # TAB 3: SEARCH
        # Manual hash lookup; the result is rendered as HTML.
        with gr.Tab("SEARCH"):
            hash_input = gr.Textbox(label="SHA256 Hash", placeholder="Paste SHA256 string...")
            search_btn = gr.Button("TRACE_ORIGIN")
            search_output = gr.HTML()
            search_btn.click(check_hash_manually, inputs=hash_input, outputs=search_output)
        # TAB 4: PATROL
        # Scans a batch of recently modified models and lists suspected copies.
        with gr.Tab("PATROL"):
            gr.Markdown("### Live Watchdog")
            with gr.Row():
                limit_slider_patrol = gr.Slider(minimum=5, maximum=50, value=10, step=5, label="Batch Size")
                patrol_btn = gr.Button("RUN_PATROL")
            patrol_output = gr.HTML(label="Suspicious Findings")
            patrol_btn.click(patrol_new_uploads, inputs=limit_slider_patrol, outputs=[patrol_output, stats_box])

# Launch the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()