Spaces:
Sleeping
Sleeping
import os
import csv
import json
import ast

from supabase import create_client, Client
from dotenv import load_dotenv
def _parse_object(value: str):
    """Parse a string-formatted dict (e.g. the 'metadata' column).

    Tries strict JSON first (handles true/false/null), then falls back to
    ast.literal_eval for Python-repr dicts (single quotes, None, True).
    Returns None when neither parser accepts the string.
    """
    try:
        return json.loads(value)
    except (ValueError, TypeError):
        pass
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        return None


def _parse_embedding(value: str):
    """Parse a string-formatted vector into a list of floats.

    Tries json.loads first; falls back to a manual bracket-strip/split for
    malformed payloads (e.g. stray newlines). If both strategies fail, the
    original string is returned unchanged so the offending row fails loudly
    at insert time instead of being silently dropped.
    """
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        print(f" - Warning: Standard JSON parsing failed for an embedding. Attempting manual parse.")
    try:
        content = value.strip()[1:-1]  # Remove brackets
        items = content.replace('\n', '').split(',')  # Clean newlines and split
        float_list = [float(item.strip()) for item in items]
        print(f" - Manual parse successful.")
        return float_list
    except Exception as manual_e:
        print(f" - ERROR: Manual parsing of embedding also failed: {manual_e}. Leaving as is for debugging.")
        # Keep original bad value; the single-row insert will fail with a clear log.
        return value


def clean_data(row: dict) -> dict:
    """
    Cleans a single row of data to prepare it for Supabase insertion.

    Per-value transformations:
    - None / ''                 -> None
    - '{...}' strings           -> dict (JSON first, then Python literal)
    - 'embedding' '[...]' str   -> list[float] (with a robust manual fallback)
    - 'true'/'t'/'false'/'f'    -> bool (case-insensitive)
    - numeric strings           -> int when integral, else float
    - anything else             -> left unchanged

    Args:
        row (dict): A dictionary representing a row from the CSV.

    Returns:
        dict: The cleaned dictionary.
    """
    cleaned_row = {}
    for key, value in row.items():
        if value is None or value == '':
            cleaned_row[key] = None
            continue

        # Handle string-formatted data, which is the most common issue.
        if isinstance(value, str):
            # Check for a string-formatted JSON/dict object (like metadata).
            # json.loads is tried first so JSON payloads containing
            # true/false/null (which ast.literal_eval rejects) parse correctly.
            if value.startswith('{') and value.endswith('}'):
                parsed = _parse_object(value)
                if parsed is not None:
                    cleaned_row[key] = parsed
                    continue
                # Fall through if parsing fails.

            # Check for a string-formatted list (like embedding).
            if key == 'embedding' and value.startswith('[') and value.endswith(']'):
                cleaned_row[key] = _parse_embedding(value)
                continue

            # Handle potential boolean values.
            val_lower = value.lower()
            if val_lower in ('true', 't'):
                cleaned_row[key] = True
                continue
            if val_lower in ('false', 'f'):
                cleaned_row[key] = False
                continue

        # Attempt to convert to a number (integer then float).
        try:
            float_val = float(value)
            if float_val.is_integer():
                cleaned_row[key] = int(float_val)
            else:
                cleaned_row[key] = float_val
        except (ValueError, TypeError):
            # If all conversions fail, it's just a regular string.
            cleaned_row[key] = value
    return cleaned_row
def import_csv_to_supabase(csv_file_path: str, table_name: str):
    """
    Reads data from a CSV file, cleans it, and inserts it into a Supabase table.

    Rows are inserted in batches of 100 for efficiency. If a batch fails, the
    function switches to single-row insertion for that batch to pinpoint the
    problematic row, logs it, and stops the import.

    This script requires a .env file in the same directory with:
        SUPABASE_URL="https://your-project-id.supabase.co"
        SUPABASE_KEY="your-service-role-key"

    Args:
        csv_file_path (str): The full path to the input CSV file.
        table_name (str): The name of the target table in your Supabase database.
    """
    # --- 1. Load Environment Variables ---
    load_dotenv()

    # --- 2. Supabase Configuration ---
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")

    # --- 3. Input Validation ---
    if not supabase_url or not supabase_key:
        print("Error: SUPABASE_URL and SUPABASE_KEY must be set in your .env file.")
        return
    if not os.path.exists(csv_file_path):
        print(f"Error: The file '{csv_file_path}' was not found.")
        return

    try:
        # --- 4. Initialize Supabase Client ---
        print("Initializing Supabase client...")
        supabase: Client = create_client(supabase_url, supabase_key)
        print("Supabase client initialized successfully.")

        # --- 5. Read and Clean data from CSV file ---
        print(f"Reading and cleaning data from '{csv_file_path}'...")
        with open(csv_file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            data_to_insert = [clean_data(row) for row in csv_reader]

        if not data_to_insert:
            print("No data found in the CSV file to insert.")
            return
        print(f"Found and cleaned {len(data_to_insert)} rows to insert.")

        # --- 6. Insert data into Supabase in batches ---
        batch_size = 100
        total_inserted_count = 0
        for i in range(0, len(data_to_insert), batch_size):
            batch = data_to_insert[i:i + batch_size]
            print(f"--- Inserting batch {i//batch_size + 1} (rows {i+1}-{i+len(batch)}) ---")
            try:
                # Try to insert the whole batch first for efficiency.
                # NOTE(review): the tuple unpacking of execute() matches the
                # supabase-py v1 API — confirm against the installed version.
                data, count = supabase.table(table_name).insert(batch).execute()
                # Check for an API error reported inside the response payload.
                if data and isinstance(data, list) and data[0].get('error'):
                    raise Exception(f"Batch failed with Supabase error: {data[0]['error']}")
                # BUGFIX: the old check `isinstance(count, list)` never matched
                # the unpacked response, so the final total always printed 0.
                # A batch that reaches this point inserted all of its rows.
                total_inserted_count += len(batch)
            except Exception as e:
                # This block catches errors from the batch insert.
                print(f"!!! Batch insertion failed: {e}")
                print("--- Switching to single-row insertion for this batch to find the exact error... ---")
                # Iterate through the failing batch row by row.
                for j, row in enumerate(batch):
                    row_index_in_csv = i + j + 2  # +1 for 0-index, +1 for header
                    try:
                        # Attempt to insert just the single row.
                        single_data, single_count = supabase.table(table_name).insert(row).execute()
                        if single_data and isinstance(single_data, list) and single_data[0].get('error'):
                            print(f"\n--- > ERROR on row {row_index_in_csv} in CSV.")
                            print(f"--- > Supabase Error: {single_data[0]['error']}")
                            print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                            print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                            return  # Stop the entire process
                        # BUGFIX: successful single-row retries were previously
                        # never added to the running total.
                        total_inserted_count += 1
                    except Exception as single_e:
                        print(f"\n--- > CRITICAL ERROR on row {row_index_in_csv} in CSV.")
                        print(f"--- > Exception: {single_e}")
                        print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                        print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                        return  # Stop the entire process
                # Deliberate: stop after a failed batch, even if every retry
                # succeeded, so the operator can investigate the batch failure.
                break

        print("\n--- Import process finished. ---")
        print(f"Total rows successfully inserted: {total_inserted_count}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
if __name__ == '__main__':
    # --- Main Execution ---
    # Configure the input CSV and the destination Supabase table here.
    # The CSV is expected to live in the same directory as this script.
    csv_filename = 'supabase_docs.csv'
    destination_table = 'documents'

    # Resolve the CSV path relative to this script's own directory so the
    # import works regardless of the current working directory.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(base_dir, csv_filename)

    # Kick off the import process.
    import_csv_to_supabase(csv_path, destination_table)