# GAIA_Evaluation/dataset/insert_data_to_supabase.py
# Author: denven
# Leverage LangGraph & LLM models to implement (commit bd62c9a)
import os
import csv
import json
import ast
from supabase import create_client, Client
from dotenv import load_dotenv
def clean_data(row: dict) -> dict:
    """
    Clean a single CSV row so it can be inserted into Supabase.

    Conversions applied per value:
    - ``None`` or ``''`` -> ``None``.
    - ``'{...}'`` strings (e.g. a metadata column) -> dict via ``ast.literal_eval``.
    - ``'[...]'`` strings in the ``embedding`` column -> list of floats, with a
      manual fallback parser when ``json.loads`` rejects the text.
    - ``'true'/'t'/'false'/'f'`` (case-insensitive) -> bool.
    - Numeric strings -> int when integral, float otherwise.
    - Anything else is kept unchanged.

    Args:
        row (dict): A dictionary representing a row from the CSV.

    Returns:
        dict: The cleaned dictionary (same keys as the input).
    """
    cleaned_row = {}
    for key, value in row.items():
        # Empty cells become SQL NULLs.
        if value is None or value == '':
            cleaned_row[key] = None
            continue

        # csv.DictReader normally yields strings; pass anything else
        # through unchanged rather than dropping or crashing on it.
        if not isinstance(value, str):
            cleaned_row[key] = value
            continue

        # String-formatted JSON-ish object (like metadata). literal_eval
        # accepts Python dict syntax (single quotes) that json.loads rejects.
        if value.startswith('{') and value.endswith('}'):
            try:
                cleaned_row[key] = ast.literal_eval(value)
                continue
            except (ValueError, SyntaxError):
                pass  # Fall through if parsing fails.

        # String-formatted list (vector embedding column).
        if key == 'embedding' and value.startswith('[') and value.endswith(']'):
            try:
                # First, try the standard, fast way.
                cleaned_row[key] = json.loads(value)
                continue
            except json.JSONDecodeError:
                # If standard parsing fails, try a more robust manual parse.
                print(" - Warning: Standard JSON parsing failed for an embedding. Attempting manual parse.")
                try:
                    content = value.strip()[1:-1]  # Remove brackets.
                    # Strip newlines, split on commas, and skip empty
                    # fragments so '[]' or trailing commas don't crash.
                    items = content.replace('\n', '').split(',')
                    cleaned_row[key] = [float(item) for item in items if item.strip()]
                    print(" - Manual parse successful.")
                    continue
                except Exception as manual_e:
                    print(f" - ERROR: Manual parsing of embedding also failed: {manual_e}. Leaving as is for debugging.")
                    # Keep the original bad value: the single-row insert will
                    # fail, but with a clear log pointing at this row.
                    cleaned_row[key] = value
                    continue

        # Boolean-like strings.
        val_lower = value.lower()
        if val_lower in ('true', 't'):
            cleaned_row[key] = True
            continue
        if val_lower in ('false', 'f'):
            cleaned_row[key] = False
            continue

        # Numeric strings. Try int() first so very large integers keep full
        # precision (round-tripping through float would silently lose digits).
        try:
            cleaned_row[key] = int(value)
            continue
        except ValueError:
            pass
        try:
            float_val = float(value)
            # Store integral floats (e.g. '3.0', '1e5') as ints.
            cleaned_row[key] = int(float_val) if float_val.is_integer() else float_val
        except (ValueError, TypeError):
            # Not a number: it's just a regular string.
            cleaned_row[key] = value
    return cleaned_row
def import_csv_to_supabase(csv_file_path: str, table_name: str) -> None:
    """
    Reads data from a CSV file, cleans it, and inserts it into a Supabase table.
    If a batch fails, it switches to single-row insertion to find the problematic row,
    then stops the whole import so the bad row can be fixed.

    This script requires a .env file in the same directory with:
    SUPABASE_URL="https://your-project-id.supabase.co"
    SUPABASE_KEY="your-service-role-key"

    Args:
        csv_file_path (str): The full path to the input CSV file.
        table_name (str): The name of the target table in your Supabase database.
    """
    # --- 1. Load Environment Variables ---
    load_dotenv()
    # --- 2. Supabase Configuration ---
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")
    # --- 3. Input Validation ---
    # Fail fast (with a message, not an exception) when config or input is missing.
    if not supabase_url or not supabase_key:
        print("Error: SUPABASE_URL and SUPABASE_KEY must be set in your .env file.")
        return
    if not os.path.exists(csv_file_path):
        print(f"Error: The file '{csv_file_path}' was not found.")
        return
    try:
        # --- 4. Initialize Supabase Client ---
        print("Initializing Supabase client...")
        supabase: Client = create_client(supabase_url, supabase_key)
        print("Supabase client initialized successfully.")
        # --- 5. Read and Clean data from CSV file ---
        # Every row goes through clean_data() so types match the table schema.
        print(f"Reading and cleaning data from '{csv_file_path}'...")
        with open(csv_file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            data_to_insert = [clean_data(row) for row in csv_reader]
        if not data_to_insert:
            print("No data found in the CSV file to insert.")
            return
        print(f"Found and cleaned {len(data_to_insert)} rows to insert.")
        # --- 6. Insert data into Supabase in batches ---
        batch_size = 100
        total_inserted_count = 0
        for i in range(0, len(data_to_insert), batch_size):
            batch = data_to_insert[i:i + batch_size]
            print(f"--- Inserting batch {i//batch_size + 1} (rows {i+1}-{i+len(batch)}) ---")
            try:
                # Try to insert the whole batch first for efficiency.
                # NOTE(review): tuple-unpacking execute() assumes the legacy
                # supabase-py response shape; newer clients return an
                # APIResponse object with .data/.count attributes instead —
                # confirm against the installed supabase-py version.
                data, count = supabase.table(table_name).insert(batch).execute()
                # Check for API error in the response from the batch insert.
                if data and isinstance(data, list) and data[0].get('error'):
                    raise Exception(f"Batch failed with Supabase error: {data[0]['error']}")
                if count and isinstance(count, list) and len(count) > 0:
                    total_inserted_count += len(count)
            except Exception as e:
                # This block catches errors from the batch insert and retries
                # the batch one row at a time to pinpoint the failing row.
                print(f"!!! Batch insertion failed: {e}")
                print("--- Switching to single-row insertion for this batch to find the exact error... ---")
                # Iterate through the failing batch row by row.
                for j, row in enumerate(batch):
                    # Translate the in-memory index back to a CSV line number.
                    row_index_in_csv = i + j + 2  # +1 for 0-index, +1 for header
                    try:
                        # Attempt to insert just the single row.
                        single_data, single_count = supabase.table(table_name).insert(row).execute()
                        if single_data and isinstance(single_data, list) and single_data[0].get('error'):
                            print(f"\n--- > ERROR on row {row_index_in_csv} in CSV.")
                            print(f"--- > Supabase Error: {single_data[0]['error']}")
                            print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                            print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                            return  # Stop the entire process
                    except Exception as single_e:
                        print(f"\n--- > CRITICAL ERROR on row {row_index_in_csv} in CSV.")
                        print(f"--- > Exception: {single_e}")
                        print(f"--- > Problematic Row Data: {json.dumps(row, indent=2)}")
                        print("\n--- Import process stopped. Please fix the data in the CSV or your table schema and try again. ---")
                        return  # Stop the entire process
                break  # Stop processing further batches after a failed one
        print("\n--- Import process finished. ---")
        print(f"Total rows successfully inserted: {total_inserted_count}")
    except Exception as e:
        # Top-level catch-all so the script always exits with a readable message.
        print(f"An unexpected error occurred: {e}")
if __name__ == '__main__':
    # Entry point: locate the CSV next to this script and load it into
    # the Supabase table configured below.
    csv_name = 'supabase_docs.csv'   # name of the input CSV file
    table = 'documents'              # target table in Supabase

    # Resolve the CSV relative to this script's own directory, so the
    # import works regardless of the current working directory.
    csv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), csv_name)

    import_csv_to_supabase(csv_path, table)