# AI-Based-Data-Cleaner / src/ai_cleaner.py
# Uploaded by midlajvalappil ("Upload 14 files"), commit 6aa09c0 (verified).
import openai
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import json
import time
from config import Config
from utils.logger import setup_logger
# Module-level logger used by AICleaner for status, warning and error messages.
logger = setup_logger(__name__)
class AICleaner:
    """AI-powered data cleaning using OpenAI GPT API.

    Two operations are offered. Each returns a modified copy of the input
    Series together with a list of change-record dicts:

    * ``clean_text_column`` sends a sample of distinct values to the model and
      applies the returned ``original -> cleaned`` mapping.
    * ``suggest_missing_values`` asks the model for plausible values and writes
      them into the first missing slots of the column.

    If ``Config.validate_config()`` is falsy, or applying the configuration
    fails, ``ai_enabled`` is False. Both operations then return an unchanged
    copy and no changes.
    """

    def __init__(self):
        """Initialize AI cleaner with OpenAI configuration.

        Reads the API key, model, token limit and temperature from ``Config``.
        An exception while applying these settings is logged and disables AI
        features instead of propagating.
        """
        self.ai_enabled = Config.validate_config()
        if self.ai_enabled:
            try:
                # Module-global key assignment (the pre-1.0 ``openai`` SDK style).
                openai.api_key = Config.OPENAI_API_KEY
                self.model = Config.OPENAI_MODEL
                self.max_tokens = Config.MAX_TOKENS
                self.temperature = Config.TEMPERATURE
                logger.info("AI Cleaner initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize AI Cleaner: {str(e)}")
                self.ai_enabled = False
        else:
            logger.warning("AI Cleaner initialized without API key - AI features disabled")

    def clean_text_column(self, series: pd.Series, column_name: str) -> Tuple[pd.Series, List[Dict]]:
        """
        Clean text data in a column using AI

        At most 20 distinct non-null values are sent to the model. Each
        returned ``original -> cleaned`` pair is applied to every row equal to
        ``original``. Pairs whose original does not occur in the column are
        ignored.

        Args:
            series: pandas Series containing text data
            column_name: Name of the column being cleaned
        Returns:
            Tuple of (cleaned_series, list_of_changes). Each change is a dict
            with keys 'type', 'column', 'original', 'cleaned' and 'count'
            (number of rows rewritten). API or parsing errors are logged, and
            whatever was applied before the error is returned.
        """
        if not self.ai_enabled:
            logger.warning(f"AI cleaning skipped for column '{column_name}' - API key not configured")
            return series.copy(), []

        changes = []
        cleaned_series = series.copy()

        # Get sample of non-null values for analysis
        non_null_values = series.dropna().unique()
        if len(non_null_values) == 0:
            return cleaned_series, changes

        # Limit sample size for API efficiency (slicing past the end is safe)
        sample_values = non_null_values[:20].tolist()

        try:
            prompt = self._create_text_cleaning_prompt(sample_values, column_name)
            response = self._call_openai_api(prompt)
            suggestions = self._parse_cleaning_response(response)

            for original, cleaned in suggestions.items():
                if original == cleaned:
                    continue
                # Masks are computed on the untouched input, so the output of
                # one suggestion is never re-mapped by another.
                mask = series == original
                occurrences = int(mask.sum())
                if occurrences == 0:
                    # The model may return values that are not in the column.
                    continue
                cleaned_series.loc[mask] = cleaned
                changes.append({
                    'type': 'text_cleaning',
                    'column': column_name,
                    'original': original,
                    'cleaned': cleaned,
                    'count': occurrences,
                })

            logger.info(f"Applied {len(changes)} text cleaning changes to column '{column_name}'")
        except Exception as e:
            logger.error(f"Error in AI text cleaning for column '{column_name}': {str(e)}")

        return cleaned_series, changes

    def suggest_missing_values(self, series: pd.Series, column_name: str,
                               context_columns: Optional[List[str]] = None) -> Tuple[pd.Series, List[Dict]]:
        """
        Suggest values for missing data using AI

        Up to 10 non-null values are sent as examples, and at least 3 are
        required. At most 5 suggestions are used. They fill the first missing
        slots in positional order, so any further missing slots stay missing.

        Args:
            series: pandas Series with missing values
            column_name: Name of the column
            context_columns: List of related column names for context
        Returns:
            Tuple of (series_with_suggestions, list_of_changes). Each change is
            a dict with keys 'type', 'column', 'index' (row label) and
            'suggested_value'. API or parsing errors are logged, and whatever
            was applied before the error is returned.
        """
        if not self.ai_enabled:
            logger.warning(f"AI missing value suggestion skipped for column '{column_name}' - API key not configured")
            return series.copy(), []

        changes = []
        filled_series = series.copy()
        missing_mask = series.isnull()
        if not missing_mask.any():
            return filled_series, changes

        # Get context for AI suggestions
        non_null_sample = series.dropna().head(10).tolist()
        if len(non_null_sample) < 3:
            logger.warning(f"Insufficient data for AI suggestions in column '{column_name}'")
            return filled_series, changes

        try:
            prompt = self._create_missing_value_prompt(non_null_sample, column_name, context_columns)
            response = self._call_openai_api(prompt)
            suggestions = self._parse_missing_value_response(response, series.dtype)
            self._apply_missing_suggestions(filled_series, missing_mask, suggestions, column_name, changes)
            logger.info(f"Applied {len(changes)} AI suggestions for missing values in column '{column_name}'")
        except Exception as e:
            logger.error(f"Error in AI missing value suggestion for column '{column_name}': {str(e)}")

        return filled_series, changes

    @staticmethod
    def _apply_missing_suggestions(filled_series: pd.Series, missing_mask: pd.Series,
                                   suggestions: List[Any], column_name: str,
                                   changes: List[Dict]) -> None:
        """Write *suggestions* into the first missing slots of *filled_series*, in place.

        Assignment is positional (``iloc``). With a duplicated index, a
        label-based ``.loc[label] = value`` would also overwrite non-missing
        rows that share the label. One change record is appended to *changes*
        for each slot right after it is written, so a mid-way failure leaves
        the series and the change list consistent.
        """
        missing_positions = np.flatnonzero(missing_mask.to_numpy())
        for position, suggested_value in zip(missing_positions, suggestions):
            filled_series.iloc[position] = suggested_value
            changes.append({
                'type': 'missing_value_fill',
                'column': column_name,
                'index': filled_series.index[position],
                'suggested_value': suggested_value
            })

    def _create_text_cleaning_prompt(self, sample_values: List[str], column_name: str) -> str:
        """Create prompt for text cleaning.

        Sample values are embedded as JSON with ``ensure_ascii=False``. The
        model then sees accented and other non-ASCII characters as written,
        not as ``\\uXXXX`` escapes.
        """
        prompt = f"""
You are a data cleaning expert. I have a column named '{column_name}' with the following sample values:
{json.dumps(sample_values, indent=2, ensure_ascii=False)}
Please clean these text values by:
1. Fixing spelling errors
2. Standardizing capitalization
3. Removing extra whitespace
4. Fixing common typos
Return a JSON object where keys are original values and values are cleaned versions.
Only include entries that need changes. If a value is already clean, don't include it.
Example format:
{{"original_value": "cleaned_value", "another_original": "another_cleaned"}}
"""
        return prompt

    def _create_missing_value_prompt(self, sample_values: List[Any], column_name: str,
                                     context_columns: Optional[List[str]]) -> str:
        """Create prompt for missing value suggestions.

        Context column names are converted with ``str`` before joining, so
        non-string column labels (for example ints) do not raise.
        """
        context_info = f" (related to columns: {', '.join(map(str, context_columns))})" if context_columns else ""
        prompt = f"""
You are a data analyst. I have a column named '{column_name}'{context_info} with these sample values:
{json.dumps(sample_values, indent=2, ensure_ascii=False)}
Based on the pattern and context, suggest 5 realistic values that could fill missing entries in this column.
The suggestions should be consistent with the existing data pattern and type.
Return a JSON array of suggested values:
["suggestion1", "suggestion2", "suggestion3", "suggestion4", "suggestion5"]
"""
        return prompt

    def _call_openai_api(self, prompt: str, *, max_retries: int = 3, retry_delay: float = 1) -> str:
        """Make API call to OpenAI with retry logic.

        Args:
            prompt: User message, sent after a fixed system message.
            max_retries: Total number of attempts. At least one is always made.
            retry_delay: Base back-off in seconds. After failed attempt ``k``
                (0-based) the call sleeps ``retry_delay * 2**k`` before retrying.

        Returns:
            The whitespace-stripped content of the first choice.

        Raises:
            Exception: whatever the final failed attempt raised.
        """
        # NOTE(review): ``openai.ChatCompletion`` is the pre-1.0 SDK interface and
        # was removed in openai>=1.0; confirm the SDK version this project pins.
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                response = openai.ChatCompletion.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": "You are a helpful data cleaning assistant."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=self.max_tokens,
                    temperature=self.temperature
                )
                return response.choices[0].message.content.strip()
            except Exception as e:
                logger.warning(f"API call attempt {attempt + 1} failed: {str(e)}")
                if attempt < attempts - 1:
                    time.sleep(retry_delay * (2 ** attempt))
                else:
                    raise

    def _parse_cleaning_response(self, response: str) -> Dict[str, str]:
        """Parse AI response for text cleaning suggestions.

        Extracts the span from the first ``{`` to the last ``}`` and decodes it
        as JSON. Only entries whose cleaned value is a string are kept.
        ``null``, numbers and nested objects cannot stand in for a text value.

        Returns:
            Mapping of original value to cleaned value. It is empty when no
            JSON object is found or decoding fails.
        """
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1
        if start_idx == -1 or end_idx <= start_idx:
            logger.warning("No valid JSON found in cleaning response")
            return {}
        try:
            parsed = json.loads(response[start_idx:end_idx])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse cleaning response: {str(e)}")
            return {}
        if not isinstance(parsed, dict):
            return {}
        return {original: cleaned for original, cleaned in parsed.items() if isinstance(cleaned, str)}

    def _parse_missing_value_response(self, response: str, dtype) -> List[Any]:
        """Parse AI response for missing value suggestions.

        Extracts the span from the first ``[`` to the last ``]`` and decodes it
        as a JSON array. ``null`` entries and nested arrays/objects are
        discarded because they cannot fill a single cell. For any numeric
        *dtype*, entries are converted with ``pd.to_numeric`` and
        unconvertible ones are dropped.

        Returns:
            At most 5 suggested values. The list is empty when no JSON array is
            found or decoding fails.
        """
        start_idx = response.find('[')
        end_idx = response.rfind(']') + 1
        if start_idx == -1 or end_idx <= start_idx:
            logger.warning("No valid JSON array found in missing value response")
            return []
        try:
            parsed = json.loads(response[start_idx:end_idx])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse missing value response: {str(e)}")
            return []
        if not isinstance(parsed, list):
            return []

        suggestions = [s for s in parsed if s is not None and not isinstance(s, (list, dict))]
        # Any numeric dtype (float32, int32, nullable Int64, ...), not only int64/float64.
        if pd.api.types.is_numeric_dtype(dtype):
            suggestions = [pd.to_numeric(s, errors='coerce') for s in suggestions]
            suggestions = [s for s in suggestions if not pd.isna(s)]
        return suggestions[:5]  # Limit to 5 suggestions