# NOTE: extraction residue from the hosting page header ("Spaces: Sleeping"); not part of the program.
| import pandas as pd | |
| import sqlite3 | |
| import gradio as gr | |
| import unicodedata | |
| import re | |
| import ast | |
| import requests | |
| from pathlib import Path | |
| from typing import Optional | |
# --- Constants ---
# Remote CSV catalogue that is downloaded at start-up.
DATABASE_URL = "https://raw.githubusercontent.com/R3gm/database_zip_files/main/archive/database.csv"
# Local file the catalogue is written to; reused when the download fails.
DATABASE_PATH = Path("database.csv")
# Shared SQLite connection; None until the __main__ section assigns the
# result of setup_database().
DB_CONNECTION: Optional[sqlite3.Connection] = None

# --- UI Configuration ---
# Markdown heading rendered at the top of the page.
APP_TITLE = "## 🔍 RVC Voice Finder"
# Markdown blurb rendered below the results.
APP_DESCRIPTION = " ".join(
    (
        "This app digs through Hugging Face’s public zip files hunting for RVC models…",
        "and occasionally brings back random stuff that has nothing to do with them.",
        "Don’t worry though—the best matches are always shown first.",
    )
)
| # --- Function Definitions --- | |
def setup_database() -> Optional[sqlite3.Connection]:
    """
    Downloads the database, preprocesses it, and loads it into an in-memory
    SQLite FTS5 table for fast text searching.

    The CSV is streamed into a temporary ``.part`` file next to
    ``DATABASE_PATH`` and only moved into place once the transfer has
    finished, so an interrupted download never overwrites a previously
    downloaded copy. If the download fails, the existing local file is used.

    Returns:
        An open connection to an in-memory database holding the ``models``
        table and the ``models_fts`` full-text index built over it.

    Raises:
        FileNotFoundError: If the download fails and no local copy exists.
    """
    print("Setting up the database...")
    partial_path = DATABASE_PATH.with_name(DATABASE_PATH.name + ".part")
    try:
        # Download the database file
        print(f"Downloading data from {DATABASE_URL}...")
        # The context manager releases the streamed connection even on error.
        with requests.get(DATABASE_URL, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Only a complete download may replace the local copy.
        partial_path.replace(DATABASE_PATH)
        print("Download complete.")
    except requests.exceptions.RequestException as e:
        print(f"Error downloading the database: {e}")
        # Drop whatever was written before the transfer failed.
        if partial_path.exists():
            partial_path.unlink()
        # Use local file if it exists, otherwise fail
        if not DATABASE_PATH.exists():
            raise FileNotFoundError(
                f"Failed to download and local copy not found at {DATABASE_PATH}"
            ) from e
        print("Using existing local database file.")

    # Load and preprocess the data with pandas
    df = pd.read_csv(DATABASE_PATH)
    df.rename(columns={"FILENAME": "Filename", "PARSED_URL": "URL", "MODEL_ID": "Repo ID"}, inplace=True)
    df['normalized_filename'] = df['Filename'].apply(normalize_text)
    df['URL'] = df['URL'].apply(clean_file_url)
    df = df.reset_index().rename(columns={'index': 'rowid'})  # Use original index as rowid

    # Connect to an in-memory SQLite database; the connection is shared with
    # the search handler, hence check_same_thread=False.
    conn = sqlite3.connect(":memory:", check_same_thread=False)

    # Load the main data into a standard table
    df.to_sql("models", conn, index=False, if_exists="replace")

    # Create and populate the FTS5 virtual table for fast searching
    conn.execute("""
        CREATE VIRTUAL TABLE models_fts USING fts5(
            Filename,
            normalized_filename,
            URL,
            'Repo ID',
            content='models',
            content_rowid='rowid'
        );
    """)
    conn.execute("""
        INSERT INTO models_fts(rowid, Filename, normalized_filename, URL, "Repo ID")
        SELECT rowid, Filename, normalized_filename, URL, "Repo ID" FROM models;
    """)
    # The INSERT above opens an implicit transaction. Commit it so that a
    # later rollback on this shared connection (e.g. after a failed search
    # query) cannot discard the freshly built index.
    conn.commit()

    print("Database setup complete and loaded into memory.")
    return conn
def normalize_text(text: str) -> str:
    """
    Cleans and standardizes text for searching by lowercasing,
    removing accents, and replacing separators with spaces.

    Missing values (``None``/NaN) become an empty string. Any other
    non-string value is converted with ``str()`` first, so cells that CSV
    type inference turned into numbers do not raise.

    Returns:
        The lowercased text with combining marks removed and each of
        ``+ ( ) - _ / .`` replaced by a single space.
    """
    if pd.isna(text):
        return ""
    lowered = str(text).lower()
    # NFD splits accented letters into base letter + combining mark ('Mn');
    # dropping the marks leaves the unaccented base letters.
    decomposed = unicodedata.normalize('NFD', lowered)
    stripped = ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')
    return re.sub(r"[+()\-_/.]", " ", stripped)
def clean_file_url(val) -> str:
    """
    Cleans the URL column, handling lists stored as strings.

    Args:
        val: Raw cell value. May be missing (``None``/NaN), a plain string,
            a string holding a Python list literal such as ``"['a', 'b']"``,
            or any other scalar.

    Returns:
        ``""`` for missing values, the list items joined with ``", "`` for a
        list literal, the original string when it looks like a list but
        cannot be parsed, and ``str(val)`` otherwise.
    """
    if pd.isna(val):
        return ""
    if isinstance(val, str):
        candidate = val.strip()
        if candidate.startswith("["):
            try:
                # literal_eval only accepts literals, never arbitrary code.
                # Parse the stripped text so surrounding whitespace cannot
                # make an otherwise valid list fail to parse.
                parsed = ast.literal_eval(candidate)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                # Malformed input: keep the original string untouched.
                return val
            if isinstance(parsed, list):
                return ", ".join(map(str, parsed))
            return val
    return str(val)
def _quote_fts_term(keyword: str) -> Optional[str]:
    """
    Renders one keyword as an FTS5 string literal, or None if it contains
    no letters or digits to search for.
    """
    is_prefix = keyword.endswith("*")
    core = keyword.rstrip("*") if is_prefix else keyword
    if not any(ch.isalnum() for ch in core):
        return None
    # Inside an FTS5 string a double quote is escaped by doubling it.
    quoted = '"' + core.replace('"', '""') + '"'
    # Keep a trailing '*' outside the quotes so it still means "prefix match".
    return quoted + "*" if is_prefix else quoted


def search_models(query: str) -> Optional[pd.DataFrame]:
    """
    Searches the FTS table for models matching the query.

    The query is normalized with ``normalize_text``, split on whitespace,
    and every keyword is required to match (keywords are joined with
    ``AND``). Each keyword is passed to FTS5 as a quoted string so that
    punctuation typed by the user is not parsed as query syntax.

    Args:
        query: Free-text search string from the UI.

    Returns:
        Up to 250 rows (``Filename``, ``URL``, ``Repo ID``) ordered by FTS
        rank, or ``None`` when the query is empty, the database is not
        loaded, the search fails, or nothing matches.
    """
    if not query or not query.strip() or DB_CONNECTION is None:
        return None

    # Sanitize query and prepare for FTS by joining with "AND"
    terms = [
        term
        for term in map(_quote_fts_term, normalize_text(query).split())
        if term is not None
    ]
    if not terms:
        return None
    fts_query = " AND ".join(terms)

    # Use FTS MATCH operator for efficient search
    sql_query = """
        SELECT Filename, URL, "Repo ID"
        FROM models_fts
        WHERE normalized_filename MATCH ?
        ORDER BY rank
        LIMIT 250;
    """
    try:
        df_results = pd.read_sql_query(sql_query, DB_CONNECTION, params=(fts_query,))
    except (sqlite3.Error, pd.errors.DatabaseError) as e:
        # pandas re-raises driver errors as pandas.errors.DatabaseError, so
        # catching only the sqlite3 exception would let failures escape.
        gr.Warning(f"Search error: {e}")
        return None

    if df_results.empty:
        gr.Info("No matches found for your query.")
        return None
    return df_results
# --- Main Execution & Gradio App ---
if __name__ == "__main__":
    # Build the in-memory search index once, before the UI starts serving.
    DB_CONNECTION = setup_database()

    with gr.Blocks() as demo:
        gr.Markdown(APP_TITLE)

        # Search box and button share one row (4:1 width ratio).
        with gr.Row():
            search_box = gr.Textbox(
                label="Search here",
                placeholder="e.g., Hatsune Miku",
                scale=4,
            )
            run_button = gr.Button("Search", variant="primary", scale=1)

        # NOTE(review): search_models returns a DataFrame (or None) while this
        # component is gr.HTML — confirm the results render as intended.
        results_view = gr.HTML(label="Search Results")
        gr.Markdown(APP_DESCRIPTION)

        # Pressing Enter in the textbox and clicking the button run the same search.
        for register in (search_box.submit, run_button.click):
            register(search_models, inputs=search_box, outputs=results_view)

    demo.launch(debug=True, show_error=True)