import os
import csv
import json
import ast

from supabase import create_client, Client
from dotenv import load_dotenv


def clean_data(row: dict) -> dict:
    """
    Cleans a single row of data to prepare it for Supabase insertion.

    - Handles nested JSON strings (like in 'metadata').
    - Handles string representations of lists for vector embeddings with a
      robust fallback.
    - Converts empty strings to None.
    - Converts boolean-like strings to booleans.
    - Attempts to convert other values to numbers.

    Args:
        row (dict): A dictionary representing a row from the CSV.

    Returns:
        dict: The cleaned dictionary.
    """
    cleaned_row = {}
    for key, value in row.items():
        # Empty cells become SQL NULLs rather than empty strings.
        if value is None or value == '':
            cleaned_row[key] = None
            continue

        # Handle string-formatted data which is the most common issue.
        if isinstance(value, str):
            # Check for string-formatted JSON/dict object (like metadata).
            # ast.literal_eval is used (not eval) so only Python literals parse.
            if value.startswith('{') and value.endswith('}'):
                try:
                    cleaned_row[key] = ast.literal_eval(value)
                    continue
                except (ValueError, SyntaxError):
                    pass  # Fall through if parsing fails

            # Check for string-formatted list (like embedding).
            if key == 'embedding' and value.startswith('[') and value.endswith(']'):
                try:
                    # First, try the standard, fast way.
                    cleaned_row[key] = json.loads(value)
                    continue
                except json.JSONDecodeError:
                    # If standard parsing fails, try a more robust manual parse:
                    # strip brackets, drop newlines, split on commas, float() each.
                    print(" - Warning: Standard JSON parsing failed for an embedding. Attempting manual parse.")
                    try:
                        content = value.strip()[1:-1]  # Remove brackets
                        items = content.replace('\n', '').split(',')  # Clean newlines and split
                        float_list = [float(item.strip()) for item in items]
                        cleaned_row[key] = float_list
                        print(" - Manual parse successful.")
                        continue
                    except Exception as manual_e:
                        print(f" - ERROR: Manual parsing of embedding also failed: {manual_e}. Leaving as is for debugging.")
                        # Keep original bad value; it will cause the single-row
                        # insert to fail, but with a clear log.
                        cleaned_row[key] = value
                        continue

            # Handle potential boolean values ('t'/'f' covers Postgres dumps).
            val_lower = value.lower()
            if val_lower in ('true', 't'):
                cleaned_row[key] = True
                continue
            if val_lower in ('false', 'f'):
                cleaned_row[key] = False
                continue

        # Attempt to convert to a number (integer then float).
        try:
            float_val = float(value)
            if float_val.is_integer():
                cleaned_row[key] = int(float_val)
            else:
                cleaned_row[key] = float_val
        except (ValueError, TypeError):
            # If all conversions fail, it's just a regular string.
            cleaned_row[key] = value

    return cleaned_row


def import_csv_to_supabase(csv_file_path: str, table_name: str) -> None:
    """
    Reads data from a CSV file, cleans it, and inserts it into a Supabase table.

    If a batch fails, it switches to single-row insertion to find the
    problematic row, logs it, and stops the import.

    This script requires a .env file in the same directory with:
        SUPABASE_URL="https://your-project-id.supabase.co"
        SUPABASE_KEY="your-service-role-key"

    Args:
        csv_file_path (str): The full path to the input CSV file.
        table_name (str): The name of the target table in your Supabase database.
    """
    # --- 1. Load Environment Variables ---
    load_dotenv()

    # --- 2. Supabase Configuration ---
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")

    # --- 3. Input Validation ---
    if not supabase_url or not supabase_key:
        print("Error: SUPABASE_URL and SUPABASE_KEY must be set in your .env file.")
        return
    if not os.path.exists(csv_file_path):
        print(f"Error: The file '{csv_file_path}' was not found.")
        return

    try:
        # --- 4. Initialize Supabase Client ---
        print("Initializing Supabase client...")
        supabase: Client = create_client(supabase_url, supabase_key)
        print("Supabase client initialized successfully.")

        # --- 5. Read and Clean data from CSV file ---
        print(f"Reading and cleaning data from '{csv_file_path}'...")
        with open(csv_file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            data_to_insert = [clean_data(row) for row in csv_reader]

        if not data_to_insert:
            print("No data found in the CSV file to insert.")
            return
        print(f"Found and cleaned {len(data_to_insert)} rows to insert.")

        # --- 6. Insert data into Supabase in batches ---
        batch_size = 100
        total_inserted_count = 0
        for i in range(0, len(data_to_insert), batch_size):
            batch = data_to_insert[i:i + batch_size]
            print(f"--- Inserting batch {i//batch_size + 1} (rows {i+1}-{i+len(batch)}) ---")
            try:
                # Try to insert the whole batch first for efficiency.
                # NOTE(review): tuple-unpacking of execute() matches the
                # supabase-py v1 API — confirm against the installed client version.
                data, count = supabase.table(table_name).insert(batch).execute()

                # Check for API error in the response from the batch insert.
                if data and isinstance(data, list) and data[0].get('error'):
                    raise Exception(f"Batch failed with Supabase error: {data[0]['error']}")

                if count and isinstance(count, list) and len(count) > 0:
                    total_inserted_count += len(count)
            except Exception as e:
                # This block catches errors from the batch insert.
                print(f"!!! Batch insertion failed: {e}")
                print("--- Switching to single-row insertion for this batch to find the exact error... ---")

                # Iterate through the failing batch row by row.
                for j, row in enumerate(batch):
                    row_index_in_csv = i + j + 2  # +1 for 0-index, +1 for header
                    try:
                        # Attempt to insert just the single row.
                        single_data, single_count = supabase.table(table_name).insert(row).execute()
                        if single_data and isinstance(single_data, list) and single_data[0].get('error'):
                            print(f"\n--- > ERROR on row {row_index_in_csv} in CSV.")
                            print(f"--- > Supabase Error: {single_data[0]['error']}")
                            print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                            print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                            return  # Stop the entire process
                        # Fix: count rows that succeed via the single-row
                        # fallback so the final summary is accurate.
                        total_inserted_count += 1
                    except Exception as single_e:
                        print(f"\n--- > CRITICAL ERROR on row {row_index_in_csv} in CSV.")
                        print(f"--- > Exception: {single_e}")
                        print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                        print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                        return  # Stop the entire process

                break  # Stop processing further batches after a failed one

        print("\n--- Import process finished. ---")
        print(f"Total rows successfully inserted: {total_inserted_count}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == '__main__':
    # --- Main Execution ---
    # Define the path to your CSV file and the target table name here.

    # The name of your CSV file
    input_csv_file = 'supabase_docs.csv'
    # The name of your table in Supabase (changed based on CSV filename)
    target_table_name = 'documents'

    # Get the absolute path to the CSV file.
    # This assumes the script and the CSV are in the same directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(script_dir, input_csv_file)

    # Run the import process
    import_csv_to_supabase(file_path, target_table_name)