Search-SHA / app.py
FlameF0X's picture
Update app.py
ac50a58 verified
import gradio as gr
from huggingface_hub import list_models, model_info, hf_hub_download, upload_file
import pandas as pd
import datetime
import os
# --- DATABASE MANAGER ---
class ModelDatabase:
def __init__(self):
# Initialize an empty DataFrame in memory.
self.df = pd.DataFrame(columns=["sha256", "repo_id", "filename", "timestamp", "tags"])
self.dataset_id = "SHA-index/model-dna-index"
self.token = ""
self.csv_name = "model_dna.csv"
def connect_to_hub(self, dataset_id, token=None):
"""Loads the CSV from a HF Dataset if it exists."""
self.dataset_id = dataset_id
self.token = token or os.environ.get("HF_TOKEN")
if not self.dataset_id:
return "⚠️ No Dataset ID provided."
try:
print(f"Attempting to download {self.csv_name} from {self.dataset_id}...")
path = hf_hub_download(
repo_id=self.dataset_id,
filename=self.csv_name,
repo_type="dataset",
token=self.token
)
self.df = pd.read_csv(path)
# Ensure columns exist (in case of schema drift)
for col in ["sha256", "repo_id", "filename", "timestamp", "tags"]:
if col not in self.df.columns:
self.df[col] = ""
return f"✅ Successfully loaded {len(self.df)} records from {self.dataset_id}."
except Exception as e:
# If file doesn't exist, we assume it's a new dataset and will create it on save
if "404" in str(e) or "EntryNotFound" in str(e):
return f"⚠️ Connected to {self.dataset_id}, but '{self.csv_name}' was not found. A new file will be created upon saving."
return f"❌ Error loading from Hub: {e}"
def save_to_hub(self):
"""Pushes the current DataFrame to the HF Dataset."""
if not self.dataset_id:
return "⚠️ Persistence not configured (No Dataset ID)."
try:
# Save to local temporary CSV
local_path = "temp_model_dna.csv"
self.df.to_csv(local_path, index=False)
# Upload to Hub
upload_file(
path_or_fileobj=local_path,
path_in_repo=self.csv_name,
repo_id=self.dataset_id,
repo_type="dataset",
token=self.token,
commit_message=f"Auto-save: Updated index with {len(self.df)} records"
)
return f"✅ Saved {len(self.df)} records to {self.dataset_id}."
except Exception as e:
return f"❌ Failed to save to Hub: {e}"
def add_record(self, sha256, repo_id, filename, timestamp, tags=""):
# Check if hash already exists in our session
if not self.df.empty and sha256 in self.df['sha256'].values:
# If it exists, check timestamps to see if we found an older (original) version
existing_row = self.df[self.df['sha256'] == sha256].iloc[0]
existing_time = pd.to_datetime(existing_row['timestamp'])
new_time = pd.to_datetime(timestamp)
if new_time < existing_time:
# Update the record to the older version (The true original)
self.df.loc[self.df['sha256'] == sha256, ['repo_id', 'filename', 'timestamp', 'tags']] = [repo_id, filename, timestamp, tags]
return "updated_original"
return "duplicate"
# Add new record
new_row = pd.DataFrame([{
"sha256": sha256,
"repo_id": repo_id,
"filename": filename,
"timestamp": timestamp,
"tags": tags
}])
if self.df.empty:
self.df = new_row
else:
self.df = pd.concat([self.df, new_row], ignore_index=True)
return "added"
def search_hash(self, sha256):
if self.df.empty:
return None
sha256 = sha256.strip().lower()
match = self.df[self.df['sha256'] == sha256]
if not match.empty:
return match.iloc[0].to_dict()
return None
def get_stats(self):
return len(self.df)
# Initialize Database
db = ModelDatabase()
# --- DETECTIVE LOGIC ---
def get_repo_dna(repo_id):
"""Scans a repo for LFS files and returns their hashes."""
try:
# We use model_info with files_metadata=True.
info = model_info(repo_id, files_metadata=True)
created_at = info.created_at if info.created_at else datetime.datetime.now()
tags = ", ".join(info.tags) if info.tags else ""
dna_list = []
if info.siblings:
for file in info.siblings:
filename = file.rfilename
is_weight_file = any(filename.endswith(ext) for ext in [".safetensors", ".bin", ".pt", ".pth", ".ckpt", ".gguf"])
# Check if it has LFS metadata
if is_weight_file and hasattr(file, 'lfs') and file.lfs:
dna_list.append({
"sha256": file.lfs["sha256"],
"filename": filename,
"repo_id": repo_id,
"timestamp": str(created_at),
"tags": tags
})
return dna_list, None
except Exception as e:
return [], str(e)
def scan_and_index(repo_id, progress=gr.Progress()):
"""Manually scan a repo and add it to the DB."""
if not repo_id:
return "⚠️ Please enter a Repository ID.", db.get_stats()
progress(0, desc=f"Connecting to {repo_id}...")
dna_list, error = get_repo_dna(repo_id)
if error:
return f"❌ Error scanning {repo_id}: {error}", db.get_stats()
if not dna_list:
return f"⚠️ No LFS weight files found in {repo_id}.", db.get_stats()
added_count = 0
updated_count = 0
progress(0.5, desc="Analyzing hashes...")
for item in dna_list:
status = db.add_record(
item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags']
)
if status == "added":
added_count += 1
elif status == "updated_original":
updated_count += 1
save_msg = ""
if db.dataset_id:
save_msg = db.save_to_hub()
return f"✅ Scanned {repo_id}.\n🆕 Added {added_count} new hashes.\n🔄 Updated {updated_count} originals.\n💾 {save_msg}", db.get_stats()
def scan_org(org_id, limit=20, progress=gr.Progress()):
"""Scans multiple models from a specific user or organization."""
if not org_id:
return "⚠️ Please enter an Organization or User ID.", db.get_stats()
progress(0, desc=f"Fetching models for {org_id}...")
try:
# Fixed list_models call: removed 'direction' and simplified 'sort'
# Recent huggingface_hub uses sort="downloads" (descending is usually default)
models = list(list_models(author=org_id, sort="downloads", limit=limit))
except Exception as e:
return f"❌ Error fetching models for {org_id}: {e}", db.get_stats()
if not models:
return f"⚠️ No models found for {org_id}.", db.get_stats()
total_added = 0
total_updated = 0
for i, model in enumerate(models):
repo_id = model.modelId
progress((i / len(models)), desc=f"Scanning {repo_id}...")
dna_list, error = get_repo_dna(repo_id)
if error or not dna_list:
continue
for item in dna_list:
status = db.add_record(
item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags']
)
if status == "added":
total_added += 1
elif status == "updated_original":
total_updated += 1
save_msg = ""
if db.dataset_id:
save_msg = db.save_to_hub()
return f"✅ Bulk Scan Complete for {org_id}.\nChecked {len(models)} models.\n🆕 Added {total_added} new hashes.\n🔄 Updated {total_updated} originals.\n💾 {save_msg}", db.get_stats()
def patrol_new_uploads(limit=10, progress=gr.Progress()):
"""The 'Watchdog': Scans the latest models."""
progress(0, desc="Fetching latest models...")
try:
# Fixed list_models call: removed 'direction' and used 'lastModified' for newest
models = list(list_models(filter="safetensors", sort="lastModified", limit=limit))
except Exception as e:
return f"Error fetching models: {e}", db.get_stats()
log_results = []
for i, model in enumerate(models):
repo_id = model.modelId
progress((i / len(models)), desc=f"Checking {repo_id}...")
dna_list, error = get_repo_dna(repo_id)
if error or not dna_list:
continue
for item in dna_list:
existing = db.search_hash(item['sha256'])
if existing:
original_repo = existing['repo_id']
if original_repo != repo_id:
log = f"""
<div style="background-color: #222; border-left: 4px solid #ff5252; padding: 10px; margin-bottom: 10px; font-family: monospace; color: #ddd;">
<strong style="color: #ff5252;">[MATCH FOUND]</strong><br>
New Upload: <b>{repo_id}</b><br>
Hash: <code>{item['sha256'][:16]}...</code><br>
Original: <a href='https://huggingface.co/{original_repo}' target='_blank' style='color: #4fc3f7;'><b>{original_repo}</b></a>
</div>
"""
log_results.append(log)
else:
db.add_record(item['sha256'], item['repo_id'], item['filename'], item['timestamp'], item['tags'])
if db.dataset_id:
db.save_to_hub()
if not log_results:
return "✅ No obvious copies found in the last batch.", db.get_stats()
return "".join(log_results), db.get_stats()
def check_hash_manually(sha_input):
"""User pastes a hash to search."""
if not sha_input:
return "⚠️ Please enter a SHA256 hash."
result = db.search_hash(sha_input)
if result:
return f"""
<div style="background-color: #1a1a1a; padding: 20px; border: 1px solid #333; font-family: monospace; color: #eee;">
<h3 style="color: #69f0ae; margin-top: 0;">>> HASH_FOUND_IN_INDEX</h3>
<p><strong>Original Repo:</strong> <a href="https://huggingface.co/{result['repo_id']}" target="_blank" style="color: #69f0ae;">{result['repo_id']}</a></p>
<p><strong>Filename:</strong> {result['filename']}</p>
<p><strong>First Seen:</strong> {result['timestamp']}</p>
</div>
"""
else:
return f"""
<div style="background-color: #1a1a1a; padding: 20px; border: 1px solid #333; font-family: monospace; color: #bbb;">
<h3 style="color: #ffab40; margin-top: 0;">>> HASH_NOT_FOUND</h3>
<p>This hash is not in the current index.</p>
</div>
"""
def configure_persistence(dataset_id, token):
return db.connect_to_hub(dataset_id, token), db.get_stats()
# --- GRADIO UI ---
with gr.Blocks(theme=gr.themes.Monochrome(primary_hue="neutral", radius_size="none"), title="Search-SHA") as demo:
gr.Markdown("# # Search-SHA")
gr.Markdown("Utilitarian tool to identify model weight provenance via LFS SHA256 hashes.")
with gr.Row():
stats_box = gr.Textbox(label="INDEX_SIZE", value=db.get_stats(), interactive=False)
with gr.Tabs():
# TAB 1: PERSISTENCE
with gr.Tab("SETTINGS"):
gr.Markdown("### Persistence Configuration")
with gr.Row():
dataset_input = gr.Textbox(label="Dataset ID", value="SHA-index/model-dna-index")
token_input = gr.Textbox(label="HF Token (Write Access)", type="password")
connect_btn = gr.Button("CONNECT_AND_LOAD")
status_box = gr.Textbox(label="STATUS")
connect_btn.click(configure_persistence, inputs=[dataset_input, token_input], outputs=[status_box, stats_box])
# TAB 2: INDEXING
with gr.Tab("INDEXER"):
with gr.Row():
with gr.Column():
gr.Markdown("### Single Repository")
repo_input = gr.Textbox(label="Repository ID", placeholder="mistralai/Mistral-7B-v0.1")
scan_btn = gr.Button("SCAN_REPO")
with gr.Column():
gr.Markdown("### Bulk Organization")
org_input = gr.Textbox(label="Org/User ID", placeholder="meta-llama")
limit_slider = gr.Slider(minimum=10, maximum=100, value=20, step=10, label="Limit")
bulk_btn = gr.Button("BULK_SCAN")
scan_log = gr.Textbox(label="LOGS", lines=5)
scan_btn.click(scan_and_index, inputs=repo_input, outputs=[scan_log, stats_box])
bulk_btn.click(scan_org, inputs=[org_input, limit_slider], outputs=[scan_log, stats_box])
# TAB 3: SEARCH
with gr.Tab("SEARCH"):
hash_input = gr.Textbox(label="SHA256 Hash", placeholder="Paste SHA256 string...")
search_btn = gr.Button("TRACE_ORIGIN")
search_output = gr.HTML()
search_btn.click(check_hash_manually, inputs=hash_input, outputs=search_output)
# TAB 4: PATROL
with gr.Tab("PATROL"):
gr.Markdown("### Live Watchdog")
with gr.Row():
limit_slider_patrol = gr.Slider(minimum=5, maximum=50, value=10, step=5, label="Batch Size")
patrol_btn = gr.Button("RUN_PATROL")
patrol_output = gr.HTML(label="Suspicious Findings")
patrol_btn.click(patrol_new_uploads, inputs=limit_slider_patrol, outputs=[patrol_output, stats_box])
if __name__ == "__main__":
demo.launch()