Spaces:
Sleeping
Sleeping
| """ | |
| Data processing utilities for AgriPredict Analysis Service | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime | |
| from typing import List, Dict, Any | |
| from utils.logger import setup_logger | |
| from utils.config import settings | |
# Module-level logger built by the project's setup_logger helper; DataProcessor
# instances keep a reference to it as ``self.logger``.
logger = setup_logger(__name__)
class DataProcessor:
    """Handles data processing and validation for forecasting.

    Cleans raw demand records into a date-sorted DataFrame, reports simple
    quality metrics on it, and derives calendar/lag/rolling features for ML.
    """

    def __init__(self):
        # Share the module-level logger configured by setup_logger.
        self.logger = logger

    @staticmethod
    def _to_record(item: Any) -> Any:
        """Return *item* as a dict via Pydantic v2 ``model_dump()`` / v1 ``dict()``, else as-is."""
        if hasattr(item, 'model_dump'):  # Pydantic v2
            return item.model_dump()
        if hasattr(item, 'dict'):  # Pydantic v1
            return item.dict()
        return item

    def process_historical_data(
        self,
        historical_data: List[Dict[str, Any]],
        max_data_points: "int | None" = None,
    ) -> pd.DataFrame:
        """
        Process and validate historical demand data.

        Args:
            historical_data: Demand data points as dicts or Pydantic models
                (v2 ``model_dump()`` / v1 ``dict()``); each must provide
                ``date``, ``quantity`` and ``price``.
            max_data_points: Keep at most this many of the most recent rows.
                ``None`` (the default) uses ``settings.MAX_DATA_POINTS``.

        Returns:
            DataFrame sorted by ``date`` with a fresh 0..n-1 index, one row per
            date (the last record supplied for a date wins), containing only
            rows that have a date and a finite, strictly positive quantity
            and price.

        Raises:
            ValueError: If the input is empty, a required column is missing,
                or a date string cannot be parsed.
        """
        try:
            self.logger.info(f"Processing {len(historical_data)} historical data points")
            records = [self._to_record(item) for item in historical_data]
            if not records:
                raise ValueError("No historical data points provided")
            # Detail goes to debug: logging every record at info floods the logs.
            self.logger.debug(f"Processed data sample: {records[0]}")

            df = pd.DataFrame(records)
            self.logger.debug(f"DataFrame columns: {list(df.columns)}")
            self.logger.debug(f"DataFrame shape: {df.shape}")

            required_columns = ['date', 'quantity', 'price']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                self.logger.error(f"Missing columns: {missing_columns}")
                raise ValueError(f"Missing required columns: {missing_columns}")

            # Unparseable date strings raise here; null dates become NaT.
            df['date'] = pd.to_datetime(df['date'])
            df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')
            df['price'] = pd.to_numeric(df['price'], errors='coerce')

            # Remove invalid rows. NaT dates would otherwise sort last and be
            # treated as the most recent points; to_numeric accepts 'inf',
            # which passes a "> 0" check but poisons downstream statistics.
            n_before = len(df)
            df = df.dropna(subset=required_columns)
            valid = (
                np.isfinite(df['quantity']) & np.isfinite(df['price'])
                & (df['quantity'] > 0) & (df['price'] > 0)
            )
            df = df[valid]
            if len(df) < n_before:
                self.logger.warning(f"Dropped {n_before - len(df)} invalid data points")

            # A stable sort keeps same-date rows in input order, so keep='last'
            # reliably keeps the last record supplied for each date; the
            # default quicksort gives no such guarantee.
            df = df.sort_values('date', kind='mergesort')
            df = df.drop_duplicates(subset=['date'], keep='last')

            limit = settings.MAX_DATA_POINTS if max_data_points is None else max_data_points
            if len(df) > limit:
                self.logger.warning(f"Limiting data from {len(df)} to {limit} points")
                df = df.tail(limit)  # keep the most recent points

            # Reset last so the result has a contiguous index after dedup/tail.
            df = df.reset_index(drop=True)
            self.logger.info(f"Successfully processed {len(df)} data points")
            return df
        except Exception as e:
            self.logger.error(f"Data processing failed: {str(e)}")
            raise

    def validate_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Validate data quality and return metrics.

        Args:
            df: DataFrame with ``date``, ``quantity`` and ``price`` columns,
                e.g. the output of ``process_historical_data``.

        Returns:
            Dictionary of quality metrics built from plain ``int``/``float``/
            ``str``/``None`` values (JSON-serialisable), or ``{}`` if the
            metrics could not be computed.
        """
        try:
            has_rows = len(df) > 0
            return {
                'total_points': len(df),
                'date_range': {
                    'start': df['date'].min().isoformat() if has_rows else None,
                    'end': df['date'].max().isoformat() if has_rows else None,
                },
                # pandas sums are numpy integers, which json.dumps rejects.
                'missing_values': {
                    'quantity': int(df['quantity'].isnull().sum()),
                    'price': int(df['price'].isnull().sum()),
                },
                'outliers': {
                    'quantity': self._detect_outliers(df['quantity']),
                    'price': self._detect_outliers(df['price']),
                },
                'data_completeness': self._calculate_completeness(df),
            }
        except Exception as e:
            self.logger.error(f"Quality validation failed: {str(e)}")
            return {}

    def _detect_outliers(self, series: pd.Series) -> int:
        """Count values outside the IQR fences ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``.

        An empty series yields 0. If the count cannot be computed (e.g. for
        non-numeric data), a warning is logged and 0 is returned.
        """
        try:
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            return int(((series < lower_bound) | (series > upper_bound)).sum())
        except Exception as e:
            # Best-effort metric: report 0 rather than fail, but record why
            # (a bare except would also swallow KeyboardInterrupt/SystemExit).
            self.logger.warning(f"Outlier detection failed: {e}")
            return 0

    def _calculate_completeness(self, df: pd.DataFrame) -> float:
        """Return the percentage (0-100, 2 dp) of non-null quantity/price cells.

        An empty frame yields 0.0. It is guarded explicitly because numpy
        scalar division by zero returns NaN instead of raising.
        """
        try:
            cells = df[['quantity', 'price']]
            total_cells = cells.size
            if total_cells == 0:
                return 0.0
            missing_cells = int(cells.isnull().sum().sum())
            return round((total_cells - missing_cells) / total_cells * 100, 2)
        except Exception as e:
            self.logger.warning(f"Completeness calculation failed: {e}")
            return 0.0

    def prepare_features_for_ml(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Prepare features for machine learning models.

        Lags and rolling windows are counted in rows, not calendar days: e.g.
        ``price_lag_7`` means "7 days ago" only if ``df`` has one row per
        consecutive day. NOTE(review): confirm callers supply daily data.

        Args:
            df: Date-sorted DataFrame with a datetime ``date`` column plus
                ``quantity`` and ``price`` (as returned by
                ``process_historical_data``).

        Returns:
            Copy of ``df`` with engineered feature columns added. Leading rows
            whose lag/rolling features are undefined are removed. On failure,
            the input ``df`` is returned unchanged.
        """
        try:
            date = df['date']
            price = df['price']
            quantity = df['quantity']

            # Calendar features.
            features = {
                'day_of_week': date.dt.dayofweek,
                'month': date.dt.month,
                'day_of_month': date.dt.day,
                'quarter': date.dt.quarter,
            }
            # Lag features (only when the series is longer than the lag).
            for lag in [1, 7, 14, 30]:
                if len(df) > lag:
                    features[f'price_lag_{lag}'] = price.shift(lag)
                    features[f'quantity_lag_{lag}'] = quantity.shift(lag)
            # Rolling statistics (only when the series is longer than the window).
            for window in [7, 14, 30]:
                if len(df) > window:
                    features[f'price_rolling_mean_{window}'] = price.rolling(window).mean()
                    features[f'price_rolling_std_{window}'] = price.rolling(window).std()
                    features[f'quantity_rolling_mean_{window}'] = quantity.rolling(window).mean()
            # Relative price change without forward-filling gaps; this spelling
            # avoids pct_change's deprecated default fill_method.
            features['price_change'] = price / price.shift(1) - 1
            features['price_change_7d'] = price / price.shift(7) - 1
            # Volume-weighted value.
            features['value'] = quantity * price

            feature_df = df.assign(**features)
            # Drop only rows whose *engineered* features are undefined. A plain
            # dropna() also inspected unrelated input columns, so one optional
            # field that is null in every record emptied the whole frame.
            feature_df = feature_df.dropna(subset=list(features))

            self.logger.info(f"Created {len(feature_df.columns) - len(df.columns)} additional features")
            return feature_df
        except Exception as e:
            self.logger.error(f"Feature engineering failed: {str(e)}")
            return df