# Ranjit Behera
# feat: Add comprehensive data pipeline and fine-tuning
# 9101d7e
#!/usr/bin/env python3
"""
Step 1: Data Unification Script
================================
Reads various data formats (XML, JSON, CSV, MBOX) and
combines them into a single standardized DataFrame.
Output Schema: ['timestamp', 'sender', 'body', 'source']
Usage:
python step1_unify.py --input /path/to/raw/data --output step1_unified.csv
"""
import argparse
import json
import csv
import os
import re
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional
import mailbox
import email
from email.utils import parsedate_to_datetime
import xml.etree.ElementTree as ET
def parse_mbox(filepath: Path) -> List[Dict[str, Any]]:
    """Parse a Gmail MBOX export into unified records.

    Args:
        filepath: Path to a ``.mbox`` file.

    Returns:
        List of dicts with keys ``['timestamp', 'sender', 'body', 'source']``.
        Messages with an empty body are skipped. Per-message failures are
        swallowed deliberately so one malformed message cannot abort the
        whole mailbox.
    """
    records = []
    try:
        mbox = mailbox.mbox(str(filepath))
        for message in mbox:
            try:
                # Timestamp: prefer ISO-8601; keep the raw header value when
                # the Date header is missing or unparsable. (Was a bare
                # `except:` — narrowed so Ctrl-C etc. are not swallowed.)
                date_str = message.get('Date', '')
                try:
                    timestamp = parsedate_to_datetime(date_str).isoformat()
                except (TypeError, ValueError):
                    timestamp = date_str
                sender = message.get('From', '')
                # Body: first text/plain part for multipart messages,
                # otherwise the whole payload. decode=True yields bytes.
                body = ''
                if message.is_multipart():
                    for part in message.walk():
                        if part.get_content_type() == 'text/plain':
                            payload = part.get_payload(decode=True)
                            if payload:
                                body = payload.decode('utf-8', errors='ignore')
                            break
                else:
                    payload = message.get_payload(decode=True)
                    if payload:
                        body = payload.decode('utf-8', errors='ignore')
                if body.strip():
                    records.append({
                        'timestamp': timestamp,
                        'sender': sender,
                        'body': body.strip(),
                        'source': 'mbox'
                    })
            except Exception:
                # Best-effort: skip this message, keep processing the rest.
                continue
    except Exception as e:
        print(f" ⚠️ Error parsing MBOX {filepath}: {e}")
    return records
def parse_json(filepath: Path) -> List[Dict[str, Any]]:
    """Parse JSON exports (Google Takeout format)."""
    records = []

    def first_truthy(mapping, keys, fallback=''):
        # First truthy value among the candidate keys, else the fallback.
        for key in keys:
            value = mapping.get(key)
            if value:
                return value
        return fallback

    try:
        with open(filepath, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)

        # Normalise the document into a flat list of candidate items.
        if isinstance(payload, list):
            items = payload
        elif isinstance(payload, dict):
            # Common Google Takeout container keys; failing those, treat
            # the whole document as a single item.
            items = first_truthy(
                payload, ('messages', 'transactions', 'items', 'data'), [payload]
            )
        else:
            items = []

        for item in items:
            if not isinstance(item, dict):
                continue
            # Probe the usual field-name variants.
            timestamp = first_truthy(item, ('timestamp', 'date', 'time', 'created_at'))
            sender = first_truthy(item, ('sender', 'from', 'source', 'merchant'))
            body = first_truthy(
                item, ('body', 'message', 'text', 'content', 'description', 'title')
            )
            if 'amount' in item:
                # Google Pay transactions: synthesise a body line.
                amount = item.get('amount', '')
                merchant = item.get('merchant', {})
                merchant_name = (
                    merchant.get('name', '') if isinstance(merchant, dict) else str(merchant)
                )
                body = f"Transaction: Rs.{amount} to {merchant_name}"
            if body and str(body).strip():
                records.append({
                    'timestamp': str(timestamp),
                    'sender': str(sender),
                    'body': str(body).strip(),
                    'source': f'json:{filepath.name}'
                })
    except Exception as e:
        print(f" ⚠️ Error parsing JSON {filepath}: {e}")
    return records
def parse_csv(filepath: Path) -> List[Dict[str, Any]]:
    """Parse CSV exports into unified records.

    Column names are matched case-insensitively against common candidates.
    Rows whose body cell is empty/NaN are skipped.

    Args:
        filepath: Path to a ``.csv`` file.

    Returns:
        List of dicts with keys ``['timestamp', 'sender', 'body', 'source']``.
        Missing timestamp/sender cells become ``''`` (previously NaN cells
        leaked the literal string ``'nan'`` into the output).
    """
    records = []

    def pick_column(columns: Dict[str, str], candidates) -> Optional[str]:
        # First candidate present (case-insensitive) wins.
        for name in candidates:
            if name in columns:
                return columns[name]
        return None

    def cell_text(row, col: Optional[str]) -> str:
        # '' for absent columns and NaN cells; pd.read_csv yields NaN for
        # empty cells, and str(NaN) would otherwise produce 'nan'.
        if col is None:
            return ''
        value = row.get(col)
        if pd.isna(value):
            return ''
        return str(value)

    try:
        df = pd.read_csv(filepath, encoding='utf-8', on_bad_lines='skip')
        # Map lower-cased header -> actual header for case-insensitive lookup.
        cols = {c.lower(): c for c in df.columns}
        timestamp_col = pick_column(cols, ('timestamp', 'date', 'time', 'datetime', 'created_at'))
        sender_col = pick_column(cols, ('sender', 'from', 'source', 'bank', 'merchant'))
        body_col = pick_column(cols, ('body', 'message', 'text', 'content', 'description', 'sms'))
        if body_col:
            for _, row in df.iterrows():
                body = cell_text(row, body_col)
                if body.strip():
                    records.append({
                        'timestamp': cell_text(row, timestamp_col),
                        'sender': cell_text(row, sender_col),
                        'body': body.strip(),
                        'source': f'csv:{filepath.name}'
                    })
    except Exception as e:
        print(f" ⚠️ Error parsing CSV {filepath}: {e}")
    return records
def parse_xml(filepath: Path) -> List[Dict[str, Any]]:
    """Parse XML exports (SMS Backup & Restore style).

    Args:
        filepath: Path to a ``.xml`` file with ``<sms>`` or ``<message>``
            elements carrying body/date/address attributes.

    Returns:
        List of dicts with keys ``['timestamp', 'sender', 'body', 'source']``.
        Records with an empty body are skipped.
    """
    records = []
    try:
        tree = ET.parse(filepath)
        root = tree.getroot()
        # Prefer <sms> elements; fall back to <message> only when none exist.
        for sms in root.findall('.//sms') or root.findall('.//message'):
            body = sms.get('body') or sms.text or ''
            timestamp = sms.get('date') or sms.get('timestamp') or ''
            sender = sms.get('address') or sms.get('sender') or sms.get('from') or ''
            if body.strip():
                # Epoch milliseconds (all digits, >10 chars) -> ISO string.
                # NOTE(review): fromtimestamp() uses the local timezone —
                # confirm that is intended for this pipeline.
                if timestamp.isdigit() and len(timestamp) > 10:
                    try:
                        timestamp = datetime.fromtimestamp(int(timestamp) / 1000).isoformat()
                    except (OverflowError, OSError, ValueError):
                        # Out-of-range epoch: keep the raw digit string.
                        # (Was a bare `except:` — narrowed.)
                        pass
                records.append({
                    'timestamp': timestamp,
                    'sender': sender,
                    'body': body.strip(),
                    'source': f'xml:{filepath.name}'
                })
    except Exception as e:
        print(f" ⚠️ Error parsing XML {filepath}: {e}")
    return records
def find_all_files(input_dir: Path) -> Dict[str, List[Path]]:
    """Find all data files recursively."""
    # One bucket per supported extension; unknown extensions are ignored.
    buckets: Dict[str, List[Path]] = {key: [] for key in ('mbox', 'json', 'csv', 'xml')}
    for candidate in input_dir.rglob('*'):
        if not candidate.is_file():
            continue
        kind = candidate.suffix.lower().lstrip('.')
        if kind in buckets:
            buckets[kind].append(candidate)
    return buckets
def unify_data(input_dir: Path) -> pd.DataFrame:
    """Unify every supported data source under *input_dir* into one DataFrame.

    Scans recursively for MBOX/JSON/CSV/XML files, parses each with the
    matching parser, and deduplicates on the message body.

    Args:
        input_dir: Directory tree containing the raw export files.

    Returns:
        DataFrame with columns ``['timestamp', 'sender', 'body', 'source']``
        (schema is enforced even when no records were extracted).
    """
    print("=" * 60)
    print("📂 STEP 1: DATA UNIFICATION")
    print("=" * 60)
    all_records = []
    print(f"\n🔍 Scanning: {input_dir}")
    files = find_all_files(input_dir)
    total_files = sum(len(v) for v in files.values())
    print(f" Found {total_files} files to process")
    # One (key, icon, label, parser, unit-noun) row per source type —
    # replaces four previously copy-pasted processing loops.
    sources = [
        ('mbox', '📧', 'MBOX', parse_mbox, 'messages'),
        ('json', '📋', 'JSON', parse_json, 'records'),
        ('csv', '📊', 'CSV', parse_csv, 'records'),
        ('xml', '📝', 'XML', parse_xml, 'records'),
    ]
    for key, icon, label, parser, noun in sources:
        if not files[key]:
            continue
        print(f"\n{icon} Processing {len(files[key])} {label} files...")
        for f in files[key]:
            print(f" Processing: {f.name}")
            records = parser(f)
            all_records.extend(records)
            print(f" ✅ Extracted {len(records)} {noun}")
    # Explicit columns= keeps the output schema stable on empty input.
    df = pd.DataFrame(all_records, columns=['timestamp', 'sender', 'body', 'source'])
    # Dedup on body alone: the same message often appears in several
    # exports with differing timestamps/senders; the first wins.
    original_count = len(df)
    df = df.drop_duplicates(subset=['body'])
    dedup_count = len(df)
    print(f"\n📊 SUMMARY:")
    print(f" Total records: {original_count}")
    print(f" After dedup: {dedup_count}")
    print(f" Removed: {original_count - dedup_count} duplicates")
    return df
def main():
    """CLI entry point: parse arguments, unify sources, write the CSV."""
    parser = argparse.ArgumentParser(description="Step 1: Unify data sources")
    parser.add_argument("--input", "-i", required=True, help="Input directory with raw data")
    parser.add_argument(
        "--output", "-o",
        default="data/pipeline/step1_unified.csv",
        help="Output CSV path",
    )
    args = parser.parse_args()

    # Guard: bail out early on a missing input directory.
    source_dir = Path(args.input)
    if not source_dir.exists():
        print(f"❌ Input directory not found: {source_dir}")
        return

    unified = unify_data(source_dir)
    if len(unified) == 0:
        print("\n❌ No data extracted! Check your input directory.")
        return

    # Persist the unified frame, creating parent directories as needed.
    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    unified.to_csv(destination, index=False)
    print(f"\n✅ Saved to: {destination}")
    print(f" Records: {len(unified)}")
    print("\nNext: python scripts/data_pipeline/step2_filter.py")


if __name__ == "__main__":
    main()