| | """
|
| | Data processing utilities for AgriPredict Analysis Service
|
| | """
|
| |
|
| | import pandas as pd
|
| | import numpy as np
|
| | from datetime import datetime
|
| | from typing import List, Dict, Any
|
| | from utils.logger import setup_logger
|
| | from utils.config import settings
|
| |
|
| | logger = setup_logger(__name__)
|
| |
|
class DataProcessor:
    """Handles data processing and validation for forecasting"""

    def __init__(self):
        self.logger = logger

    def process_historical_data(self, historical_data: List[Dict[str, Any]]) -> pd.DataFrame:
        """
        Process and validate historical demand data

        Args:
            historical_data: List of demand data points

        Returns:
            Processed pandas DataFrame
        """
        try:
            self.logger.info(f"Processing {len(historical_data)} historical data points")

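            # Normalize each record to a plain dict so the DataFrame gets uniform rows:
            # Pydantic v2 models expose model_dump(), v1 models expose dict().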
            processed_data = []
            for i, item in enumerate(historical_data):
                if hasattr(item, 'model_dump'):
                    processed_data.append(item.model_dump())
                    self.logger.info(f"Item {i}: Converted Pydantic v2 model")
                elif hasattr(item, 'dict'):
                    processed_data.append(item.dict())
                    self.logger.info(f"Item {i}: Converted Pydantic v1 model")
                else:
                    processed_data.append(item)
                    self.logger.info(f"Item {i}: Already dict - {type(item)}")

            self.logger.info(f"Processed data sample: {processed_data[0] if processed_data else 'None'}")

            df = pd.DataFrame(processed_data)

            self.logger.info(f"DataFrame columns: {list(df.columns)}")
            self.logger.info(f"DataFrame shape: {df.shape}")

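            # Fail fast if the payload is missing any column the forecasting models need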
            required_columns = ['date', 'quantity', 'price']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                self.logger.error(f"Missing columns: {missing_columns}")
                raise ValueError(f"Missing required columns: {missing_columns}")

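            # Coerce types: dates to datetime64, quantity/price to numeric (invalid values become NaN)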
            df['date'] = pd.to_datetime(df['date'])
            df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')
            df['price'] = pd.to_numeric(df['price'], errors='coerce')

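            # Drop rows that failed coercion, along with non-positive quantities or prices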
            df = df.dropna(subset=['quantity', 'price'])
            df = df[df['quantity'] > 0]
            df = df[df['price'] > 0]

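            # Sort chronologically, then keep only the latest record for each date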
            df = df.sort_values('date').reset_index(drop=True)
            df = df.drop_duplicates(subset=['date'], keep='last')

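            # Cap the series at the configured maximum, keeping the most recent points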
            if len(df) > settings.MAX_DATA_POINTS:
                self.logger.warning(f"Limiting data from {len(df)} to {settings.MAX_DATA_POINTS} points")
                df = df.tail(settings.MAX_DATA_POINTS)

            self.logger.info(f"Successfully processed {len(df)} data points")
            return df

        except Exception as e:
            self.logger.error(f"Data processing failed: {str(e)}")
            raise

    def validate_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Validate data quality and return metrics

        Args:
            df: Processed DataFrame

        Returns:
            Dictionary with quality metrics
        """
        try:
            quality_metrics = {
                'total_points': len(df),
                'date_range': {
                    'start': df['date'].min().isoformat() if len(df) > 0 else None,
                    'end': df['date'].max().isoformat() if len(df) > 0 else None
                },
                'missing_values': {
                    'quantity': int(df['quantity'].isnull().sum()),
                    'price': int(df['price'].isnull().sum())
                },
                'outliers': {
                    'quantity': self._detect_outliers(df['quantity']),
                    'price': self._detect_outliers(df['price'])
                },
                'data_completeness': self._calculate_completeness(df)
            }

            return quality_metrics

        except Exception as e:
            self.logger.error(f"Quality validation failed: {str(e)}")
            return {}

    def _detect_outliers(self, series: pd.Series) -> int:
        """Detect outliers using IQR method"""
        try:
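            # Tukey's IQR rule: values outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR] are counted as outliers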
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outliers = ((series < lower_bound) | (series > upper_bound)).sum()
            return int(outliers)
        except Exception:
            return 0

    def _calculate_completeness(self, df: pd.DataFrame) -> float:
        """Calculate data completeness percentage"""
        try:
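            # Completeness = share of non-null cells across the quantity and price columns, as a percentage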
            total_cells = len(df) * 2
            missing_cells = df[['quantity', 'price']].isnull().sum().sum()
            completeness = ((total_cells - missing_cells) / total_cells) * 100
            return round(completeness, 2)
        except Exception:
            return 0.0

    def prepare_features_for_ml(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Prepare features for machine learning models

        Args:
            df: Processed DataFrame

        Returns:
            DataFrame with engineered features
        """
        try:
            feature_df = df.copy()

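            # Calendar features derived from the date column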
            feature_df['day_of_week'] = feature_df['date'].dt.dayofweek
            feature_df['month'] = feature_df['date'].dt.month
            feature_df['day_of_month'] = feature_df['date'].dt.day
            feature_df['quarter'] = feature_df['date'].dt.quarter

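            # Lagged price and quantity values, added only when enough history exists for the lag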
            for lag in [1, 7, 14, 30]:
                if len(feature_df) > lag:
                    feature_df[f'price_lag_{lag}'] = feature_df['price'].shift(lag)
                    feature_df[f'quantity_lag_{lag}'] = feature_df['quantity'].shift(lag)

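            # Rolling statistics over 7-, 14-, and 30-point windows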
            for window in [7, 14, 30]:
                if len(feature_df) > window:
                    feature_df[f'price_rolling_mean_{window}'] = feature_df['price'].rolling(window).mean()
                    feature_df[f'price_rolling_std_{window}'] = feature_df['price'].rolling(window).std()
                    feature_df[f'quantity_rolling_mean_{window}'] = feature_df['quantity'].rolling(window).mean()

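            # Relative price changes over 1 step and 7 steps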
            feature_df['price_change'] = feature_df['price'].pct_change()
            feature_df['price_change_7d'] = feature_df['price'].pct_change(7)

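            # Total transaction value per row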
            feature_df['value'] = feature_df['quantity'] * feature_df['price']

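            # Drop the warm-up rows left with NaN by the lag, rolling, and pct_change features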
            feature_df = feature_df.dropna()

            self.logger.info(f"Created {len(feature_df.columns) - len(df.columns)} additional features")
            return feature_df

        except Exception as e:
            self.logger.error(f"Feature engineering failed: {str(e)}")
            return df
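

# Illustrative usage sketch (not part of the service entry points). It assumes the
# utils.config settings define MAX_DATA_POINTS and that raw records look like
# {"date": ..., "quantity": ..., "price": ...}; the sample values below are made up.
if __name__ == "__main__":
    sample = [
        {"date": "2024-01-01", "quantity": 120, "price": 2.5},
        {"date": "2024-01-02", "quantity": 95, "price": 2.7},
        {"date": "2024-01-03", "quantity": 110, "price": 2.6},
    ]

    processor = DataProcessor()
    frame = processor.process_historical_data(sample)

    # Quality metrics and engineered feature columns for the processed frame
    print(processor.validate_data_quality(frame))
    print(processor.prepare_features_for_ml(frame).columns.tolist())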