# Fetii_AI / utils.py
# parthsinha's picture
# added files
# ed0810b
"""
Utility functions for Fetii AI Chatbot
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import re
from typing import List, Dict, Any, Tuple, Optional
import config
def clean_location_name(location: str) -> str:
    """Clean and standardize a location name.

    Trailing city/country qualifiers are removed first, then the remaining
    text is title-cased. Suffix matching is case-insensitive and repeated,
    so stacked qualifiers such as ", Austin, TX, USA" are fully removed.

    Args:
        location: Raw location text. Missing values (None / NaN) and empty
            strings are accepted.

    Returns:
        The cleaned, title-cased name, or "Unknown" when there is nothing
        left to show.
    """
    if pd.isna(location) or not location:
        return "Unknown"

    # Compared against the lower-cased tail of the name, so the input's
    # capitalisation ("TX", "Tx", "tx") does not matter.
    suffixes_to_remove = (", austin, tx", ", austin, texas", ", usa", ", united states")

    cleaned = location.strip()
    stripped_one = True
    while stripped_one:
        stripped_one = False
        for suffix in suffixes_to_remove:
            if cleaned[-len(suffix):].lower() == suffix:
                cleaned = cleaned[:-len(suffix)].rstrip()
                # Restart so a qualifier uncovered by this removal is seen too.
                stripped_one = True
                break

    # Title-case only after matching: doing it first turns "TX" into "Tx"
    # and "USA" into "Usa", which would hide those suffixes.
    cleaned = cleaned.title()
    return cleaned or "Unknown"
def categorize_location(location: str) -> str:
    """Return the category of a location based on keyword matching.

    The location is lower-cased and checked against each entry of
    ``config.LOCATION_CATEGORIES`` in iteration order; the first category
    with any keyword occurring as a substring wins.

    Args:
        location: Location name to classify.

    Returns:
        The matching category name in title case, or "Other" when no
        keyword matches.
    """
    haystack = location.lower()
    # Keywords are compared against lower-cased text, so they are presumably
    # stored lower-case in config - verify against config.LOCATION_CATEGORIES.
    matched = next(
        (
            name
            for name, terms in config.LOCATION_CATEGORIES.items()
            if any(term in haystack for term in terms)
        ),
        None,
    )
    if matched is None:
        return "Other"
    return matched.title()
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate the great-circle distance between two coordinates in kilometers.

    Uses the haversine formula on a spherical Earth. A flat
    "degrees * 111 km" approximation overstates east-west distances away
    from the equator, because a degree of longitude shrinks with the
    cosine of the latitude.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        Distance in kilometers, rounded to two decimal places.
    """
    earth_radius_km = 6371.0

    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    delta_phi = phi2 - phi1
    delta_lambda = np.radians(lon2) - np.radians(lon1)

    a = (
        np.sin(delta_phi / 2) ** 2
        + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda / 2) ** 2
    )
    # Floating-point error can push `a` marginally outside [0, 1], which
    # would make the square roots below produce NaN.
    a = np.clip(a, 0.0, 1.0)
    central_angle = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    return np.round(earth_radius_km * central_angle, 2)
def format_time(hour: int) -> str:
    """Format an hour of the day as a 12-hour clock string such as "9:00 AM".

    Args:
        hour: Hour on a 24-hour clock. Values that convert cleanly with
            ``int()`` (NumPy integers, integral floats read back from a
            pandas column) are accepted, and hours outside 0-23 wrap
            around modulo 24.

    Returns:
        The hour rendered as "H:00 AM" / "H:00 PM", or "Unknown" when the
        value cannot be interpreted as an hour (None, NaN, infinity, or
        non-numeric text).
    """
    try:
        # int() drops the ".0" that a float hour would otherwise leak into
        # the output (e.g. "10.0:00 PM").
        normalized = int(hour) % 24
    except (TypeError, ValueError, OverflowError):
        return "Unknown"

    meridiem = "AM" if normalized < 12 else "PM"
    # 0 and 12 are both displayed as 12 on a 12-hour clock.
    display_hour = normalized % 12 or 12
    return f"{display_hour}:00 {meridiem}"
def get_time_category(hour: int) -> str:
    """Map an hour to the name of the configured time-of-day bucket.

    Each entry of ``config.TIME_CATEGORIES`` is read as a
    ``(start, end)`` pair; an hour belongs to a bucket when
    ``start <= hour < end`` (start inclusive, end exclusive). The first
    matching bucket in iteration order is used.

    Args:
        hour: Hour to classify.

    Returns:
        The bucket name with underscores replaced by spaces and title-cased,
        or "Unknown" when no bucket contains the hour.
    """
    label = next(
        (
            name
            for name, (lower, upper) in config.TIME_CATEGORIES.items()
            if lower <= hour < upper
        ),
        None,
    )
    if label is None:
        return "Unknown"
    return label.replace('_', ' ').title()
def get_group_size_category(passengers: int) -> str:
    """Map a passenger count to the name of the configured group-size bucket.

    Each entry of ``config.GROUP_SIZE_CATEGORIES`` is read as a
    ``(min_size, max_size)`` pair with both bounds inclusive. The first
    matching bucket in iteration order is used.

    Args:
        passengers: Number of passengers on the trip.

    Returns:
        The bucket name with underscores replaced by spaces and title-cased,
        or "Unknown" when no bucket contains the count.
    """
    label = next(
        (
            name
            for name, (smallest, largest) in config.GROUP_SIZE_CATEGORIES.items()
            if smallest <= passengers <= largest
        ),
        None,
    )
    if label is None:
        return "Unknown"
    return label.replace('_', ' ').title()
def extract_numbers_from_text(text: str) -> List[int]:
    """Return every run of consecutive digits in ``text`` as an integer.

    Runs are returned in order of appearance. Signs and decimal points are
    not part of a run, so "-3.5" yields ``[3, 5]``.

    Args:
        text: Text to scan.

    Returns:
        The integers found, or an empty list when there are none.
    """
    return [int(match.group()) for match in re.finditer(r'\d+', text)]
def parse_date_string(date_str: str) -> Optional[datetime]:
    """Parse a date/time string written in one of the supported formats.

    Supported layouts are ``M/D/YY H:M``, ``M/D/YYYY H:M``,
    ``YYYY-MM-DD H:M:S``, ``YYYY-MM-DD H:M`` and ``M/D/YY H:M:S``.
    Surrounding whitespace is ignored.

    Args:
        date_str: Text to parse. A value that is already a ``datetime`` is
            returned unchanged; any other non-string (None, NaN, numbers)
            is treated as unparseable.

    Returns:
        The parsed ``datetime``, or ``None`` when no format matches.
    """
    if isinstance(date_str, datetime):
        return date_str
    if not isinstance(date_str, str):
        # strptime raises TypeError on non-strings; the contract here is to
        # report "could not parse" by returning None.
        return None

    formats = (
        '%m/%d/%y %H:%M',
        '%m/%d/%Y %H:%M',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%m/%d/%y %H:%M:%S',
    )
    candidate = date_str.strip()
    for fmt in formats:
        try:
            return datetime.strptime(candidate, fmt)
        except ValueError:
            continue
    return None
def generate_insights(data: pd.DataFrame) -> Dict[str, Any]:
    """Generate summary insights from trip data.

    Args:
        data: Trip records. Must contain a 'Total Passengers' column. The
            'hour', 'pickup_main', 'dropoff_main' and 'date' columns are
            optional; the insights derived from each are only included
            when that column is present.

    Returns:
        A dict with trip and passenger totals, group-size statistics and
        distribution, the number and percentage of large groups (at or
        above ``config.ANALYSIS_THRESHOLDS['large_group_threshold']``),
        plus - when the matching columns exist - peak hour and hour
        distribution, top-10 and unique pickup/drop-off locations, and the
        covered date range with the average number of trips per day.

    Raises:
        KeyError: If 'Total Passengers' is missing from ``data``.
    """
    passengers = data['Total Passengers']
    total_trips = len(data)

    insights: Dict[str, Any] = {
        'total_trips': total_trips,
        'total_passengers': passengers.sum(),
        'avg_group_size': passengers.mean(),
        'median_group_size': passengers.median(),
    }

    if 'hour' in data.columns:
        hours = data['hour']
        # mode() is empty when the column holds no usable values.
        hour_modes = hours.mode()
        insights['peak_hour'] = hour_modes.iloc[0] if len(hour_modes) > 0 else None
        insights['hour_distribution'] = hours.value_counts().to_dict()

    if 'pickup_main' in data.columns:
        pickups = data['pickup_main']
        insights['top_pickups'] = pickups.value_counts().head(10).to_dict()
        insights['unique_pickup_locations'] = pickups.nunique()

    if 'dropoff_main' in data.columns:
        dropoffs = data['dropoff_main']
        insights['top_dropoffs'] = dropoffs.value_counts().head(10).to_dict()
        insights['unique_dropoff_locations'] = dropoffs.nunique()

    insights['group_size_distribution'] = passengers.value_counts().to_dict()

    large_group_threshold = config.ANALYSIS_THRESHOLDS['large_group_threshold']
    large_groups = int((passengers >= large_group_threshold).sum())
    insights['large_groups'] = large_groups
    # An empty dataset has no meaningful share; report 0 instead of
    # dividing by zero.
    insights['large_groups_percentage'] = (
        (large_groups / total_trips) * 100 if total_trips else 0.0
    )

    if 'date' in data.columns:
        first_day = data['date'].min()
        last_day = data['date'].max()
        # min()/max() yield NaN/NaT when there are no valid dates, and the
        # span arithmetic below is only meaningful for real dates.
        has_dates = bool(pd.notna(first_day)) and bool(pd.notna(last_day))
        days_covered = (last_day - first_day).days + 1 if has_dates else 0
        insights['date_range'] = {
            'start': first_day,
            'end': last_day,
            'days_covered': days_covered,
        }
        insights['daily_average'] = total_trips / days_covered if days_covered else 0.0

    return insights
def format_number(number: float, decimals: int = 1) -> str:
    """Format a number for display, abbreviating thousands and millions.

    The abbreviation is chosen from the magnitude of the value, so negative
    numbers are shortened the same way as positive ones
    (``-1500`` -> ``"-1.5K"``).

    Args:
        number: Value to format.
        decimals: Number of digits after the decimal point. With ``0``,
            values below one thousand are truncated to an integer.

    Returns:
        The value with an "M" suffix at one million and above, a "K" suffix
        at one thousand and above, and no suffix otherwise.
    """
    magnitude = abs(number)
    if magnitude >= 1_000_000:
        return f"{number / 1_000_000:.{decimals}f}M"
    if magnitude >= 1_000:
        return f"{number / 1_000:.{decimals}f}K"
    if decimals > 0:
        return f"{number:.{decimals}f}"
    return str(int(number))
def create_summary_stats(data: pd.DataFrame) -> Dict[str, str]:
    """Create formatted summary statistics for display.

    Args:
        data: Trip records, passed straight to ``generate_insights``.

    Returns:
        A dict mapping display labels to ready-to-render strings.
    """
    insights = generate_insights(data)

    # 'peak_hour' can be absent (no 'hour' column) or present but None (no
    # usable hour values). Both cases use the same fallback hour, 22; a
    # plain .get(key, 22) would only cover the first case.
    peak_hour = insights.get('peak_hour')
    if peak_hour is None:
        peak_hour = 22

    return {
        'Total Trips': format_number(insights['total_trips'], 0),
        'Total Passengers': format_number(insights['total_passengers'], 0),
        'Average Group Size': f"{insights['avg_group_size']:.1f}",
        'Peak Hour': format_time(peak_hour),
        'Large Groups': f"{insights['large_groups_percentage']:.1f}%",
        'Unique Pickup Locations': format_number(insights.get('unique_pickup_locations', 0), 0),
        'Unique Destinations': format_number(insights.get('unique_dropoff_locations', 0), 0),
        'Daily Average': f"{insights.get('daily_average', 0):.1f} trips/day"
    }
def validate_data(data: pd.DataFrame) -> Tuple[bool, List[str]]:
    """Check trip data for quality problems.

    Checks, in order: presence of the required columns, a non-empty
    dataset, passenger counts (missing, below 1 or above 20 are invalid),
    parseable trip dates, and unique trip IDs. An empty dataset stops
    validation immediately; column-specific checks are skipped for columns
    that are absent.

    Args:
        data: Trip records to validate.

    Returns:
        ``(is_valid, issues)`` where ``issues`` holds one human-readable
        message per problem and ``is_valid`` is True only when it is empty.
    """
    problems: List[str] = []
    columns = data.columns

    absent = [
        name
        for name in ('Trip ID', 'Total Passengers', 'Trip Date and Time')
        if name not in columns
    ]
    if absent:
        problems.append(f"Missing required columns: {', '.join(absent)}")

    if len(data) == 0:
        # Nothing further can be checked without rows.
        problems.append("Dataset is empty")
        return False, problems

    if 'Total Passengers' in columns:
        counts = data['Total Passengers']
        bad_rows = (counts < 1) | (counts > 20) | counts.isna()
        bad_total = int(bad_rows.sum())
        if bad_total > 0:
            problems.append(f"Found {bad_total} trips with invalid passenger counts")

    if 'Trip Date and Time' in columns:
        # Missing dates are not counted here; only present-but-unparseable ones.
        unparseable = sum(
            1
            for raw in data['Trip Date and Time'].dropna()
            if parse_date_string(str(raw)) is None
        )
        if unparseable > 0:
            problems.append(f"Found {unparseable} trips with invalid date formats")

    if 'Trip ID' in columns:
        repeated = data['Trip ID'].duplicated().sum()
        if repeated > 0:
            problems.append(f"Found {repeated} duplicate trip IDs")

    return not problems, problems
def create_export_data(data: pd.DataFrame, insights: Dict[str, Any], format_type: str = 'csv') -> Any:
    """Build an export payload for the trip data in the requested format.

    Args:
        data: Trip records to export.
        insights: Precomputed insights; only embedded in the 'json' payload.
        format_type: One of 'csv', 'json' or 'summary'.

    Returns:
        For 'csv', the CSV text without the index column. For 'json', a
        dict with a 'metadata' section (export timestamp, record count,
        insights) and the rows under 'data'. For 'summary', the result of
        ``create_summary_stats(data)``.

    Raises:
        ValueError: If ``format_type`` is not one of the supported formats.
    """
    if format_type == 'csv':
        return data.to_csv(index=False)

    if format_type == 'summary':
        return create_summary_stats(data)

    if format_type != 'json':
        raise ValueError(f"Unsupported export format: {format_type}")

    metadata = {
        'export_date': datetime.now().isoformat(),
        'total_records': len(data),
        'insights': insights,
    }
    return {
        'metadata': metadata,
        'data': data.to_dict('records'),
    }
def search_locations(query: str, locations: List[str], max_results: int = 5) -> List[str]:
    """Search for locations matching a query, best matches first.

    Matching is case-insensitive and ranked in three tiers:

    1. locations equal to the query,
    2. locations containing the whole query as a substring,
    3. locations containing at least one whitespace-separated query word.

    Within a tier the input order is kept. Each distinct location string
    appears at most once in the result, and non-string entries (such as
    NaN from a pandas column) are ignored.

    Args:
        query: Search text.
        locations: Candidate location names.
        max_results: Maximum number of matches to return.

    Returns:
        Up to ``max_results`` matching locations.
    """
    query_lower = query.lower()
    query_words = query_lower.split()

    # dict.fromkeys de-duplicates while preserving first-seen order, which
    # replaces the repeated "not in matches" list scans.
    candidates = dict.fromkeys(
        location for location in locations if isinstance(location, str)
    )

    exact_matches: List[str] = []
    substring_matches: List[str] = []
    word_matches: List[str] = []
    for location in candidates:
        location_lower = location.lower()
        if location_lower == query_lower:
            exact_matches.append(location)
        elif query_lower in location_lower:
            substring_matches.append(location)
        elif any(word in location_lower for word in query_words):
            word_matches.append(location)

    return (exact_matches + substring_matches + word_matches)[:max_results]
def get_color_palette(num_colors: int) -> List[str]:
    """Get a list of hex colors for visualizations.

    The first twelve colors come from a fixed base palette. When more are
    requested, extra colors are generated by stepping the hue by the
    golden-ratio conjugate at fixed saturation (0.7) and value (0.9).

    Args:
        num_colors: Number of colors wanted. Zero or a negative number
            yields an empty list.

    Returns:
        ``num_colors`` color strings in "#rrggbb" form.
    """
    import colorsys

    base_colors = [
        '#667eea', '#764ba2', '#f093fb', '#f5576c',
        '#4facfe', '#00f2fe', '#43e97b', '#38f9d7',
        '#ffecd2', '#fcb69f', '#a8edea', '#fed6e3'
    ]

    if num_colors <= 0:
        # A negative count would otherwise slice from the end of the base
        # palette and return colors instead of none.
        return []
    if num_colors <= len(base_colors):
        return base_colors[:num_colors]

    hue_step = 0.618033988749895  # golden-ratio conjugate
    additional_colors = []
    for i in range(num_colors - len(base_colors)):
        hue = (i * hue_step) % 1
        red, green, blue = colorsys.hsv_to_rgb(hue, 0.7, 0.9)
        additional_colors.append(
            '#%02x%02x%02x' % (int(red * 255), int(green * 255), int(blue * 255))
        )
    return base_colors + additional_colors