import pandas as pd

from datetime import datetime, timedelta

from .config import get_collections
from .scraper import api_client


def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and regularize a price time series.

    Keeps only the date and modal-price columns, averages duplicate dates,
    reindexes to a continuous daily range, and fills gaps (forward- then
    backward-fill) so downstream consumers see a gap-free daily series.

    Args:
        df: Frame containing at least 'Reported Date' and
            'Modal Price (Rs./Quintal)' columns.

    Returns:
        A daily-frequency DataFrame with no missing prices; empty frame if
        no valid rows remain after cleaning.
    """
    df = df[['Reported Date', 'Modal Price (Rs./Quintal)']].copy()

    # Ensure datetime and numeric types (callers may already have converted).
    if not pd.api.types.is_datetime64_any_dtype(df['Reported Date']):
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
    df['Modal Price (Rs./Quintal)'] = pd.to_numeric(
        df['Modal Price (Rs./Quintal)'], errors='coerce')

    # Drop rows whose date or price could not be parsed.
    df = df.dropna(subset=['Reported Date', 'Modal Price (Rs./Quintal)'])
    if df.empty:
        return df

    # Average duplicate reports for the same date.
    df = df.groupby('Reported Date', as_index=False).mean()

    # Reindex to a continuous daily range, then fill missing days from
    # their neighbours (forward first, backward for any leading gap).
    full_date_range = pd.date_range(df['Reported Date'].min(),
                                    df['Reported Date'].max())
    df = (df.set_index('Reported Date')
            .reindex(full_date_range)
            .rename_axis('Reported Date')
            .reset_index())
    df['Modal Price (Rs./Quintal)'] = (
        df['Modal Price (Rs./Quintal)'].ffill().bfill())
    return df


def fetch_and_process_data(query_filter: dict):
    """Fetch documents matching ``query_filter`` from MongoDB and preprocess.

    Args:
        query_filter: MongoDB query document passed to ``collection.find``.

    Returns:
        A cleaned daily-frequency DataFrame, or None when no data matched,
        required columns are missing, or an error occurred (errors are
        surfaced in the Streamlit UI rather than raised).
    """
    cols = get_collections()
    collection = cols['collection']
    try:
        # Fetch all fields - MongoDB handles this efficiently.
        cursor = collection.find(query_filter).sort('Reported Date', 1)
        data = list(cursor)
        if not data:
            return None

        df = pd.DataFrame(data)

        # Validate required columns before touching them.
        if ('Reported Date' not in df.columns
                or 'Modal Price (Rs./Quintal)' not in df.columns):
            import streamlit as st
            st.error(f"Missing required columns. Available: {df.columns.tolist()}")
            return None

        # Ensure proper data types before preprocessing.
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
        df['Modal Price (Rs./Quintal)'] = pd.to_numeric(
            df['Modal Price (Rs./Quintal)'], errors='coerce')

        df = preprocess_data(df)
        if df is None or df.empty:
            return None
        return df
    except Exception as e:
        # Report the failure in the UI instead of crashing the app.
        import streamlit as st
        st.error(f"Error fetching data: {str(e)}")
        return None


def fetch_and_store_data():
    """Fetch new data from the Agmarknet API and store it in MongoDB.

    Fetches from the day after the latest date already in the database
    (or from 2000-01-01 when the collection is empty) through yesterday,
    keeps only the "White" variety, normalizes dtypes, and inserts the
    rows in a single batch.

    Returns:
        pd.DataFrame: the fetched and stored data, or None if no new data.
    """
    cols = get_collections()
    collection = cols['collection']

    # Resume from the day after the most recent stored date.
    latest_doc = collection.find_one(sort=[("Reported Date", -1)])
    latest_date = (latest_doc["Reported Date"]
                   if latest_doc and "Reported Date" in latest_doc else None)
    if latest_date:
        from_date = latest_date + timedelta(days=1)
    else:
        from_date = datetime(2000, 1, 1)
    # Stop at yesterday: today's figures may still be incomplete.
    to_date = datetime.now() - timedelta(days=1)

    # Format dates for the API (YYYY-MM-DD) and fetch.
    from_date_str = from_date.strftime('%Y-%m-%d')
    to_date_str = to_date.strftime('%Y-%m-%d')
    responses = api_client.fetch_date_range(from_date_str, to_date_str)
    if not responses:
        return None

    df = api_client.parse_multiple_responses_to_dataframe(responses)
    if df.empty:
        return None

    # Filter for the White variety only; nothing to store if none remain.
    df = df[df['Variety'] == "White"]
    if df.empty:
        return None

    # Normalize dtypes for storage.
    df["Reported Date"] = pd.to_datetime(df["Reported Date"])
    df["Modal Price (Rs./Quintal)"] = pd.to_numeric(
        df["Modal Price (Rs./Quintal)"], errors='coerce').astype('Int64')
    # NOTE(review): Min/Max prices are stored as strings, so unparseable
    # values become the literal "nan" — presumably matching the existing
    # document schema; confirm before changing.
    df["Min Price (Rs./Quintal)"] = pd.to_numeric(
        df["Min Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Max Price (Rs./Quintal)"] = pd.to_numeric(
        df["Max Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Arrivals (Tonnes)"] = pd.to_numeric(
        df["Arrivals (Tonnes)"], errors='coerce').astype(float)

    # Sort by date before storing.
    df.sort_values(by="Reported Date", inplace=True)

    # Replace missing values with None and unbox numpy scalars (e.g. the
    # np.int64 values produced by the Int64 column) that the BSON encoder
    # cannot handle, then insert in one batch instead of one round-trip
    # per row.
    docs = []
    for _, row in df.iterrows():
        doc = row.to_dict()
        for key, value in doc.items():
            if pd.isna(value):
                doc[key] = None
            elif hasattr(value, "item") and not isinstance(value, datetime):
                doc[key] = value.item()
        docs.append(doc)
    if docs:
        collection.insert_many(docs)
    return df


def get_dataframe_from_collection(collection):
    """Load every document of ``collection`` into a DataFrame without `_id`.

    Thin wrapper kept for backward compatibility; delegates to
    :func:`collection_to_dataframe`.
    """
    return collection_to_dataframe(collection, drop_id=True)


def collection_to_dataframe(collection, drop_id=True):
    """Load every document of ``collection`` into a DataFrame.

    Args:
        collection: pymongo collection to read from.
        drop_id: when True, drop the MongoDB ``_id`` column if present.

    Returns:
        pd.DataFrame of all documents (empty frame for an empty collection).
    """
    documents = list(collection.find())
    df = pd.DataFrame(documents)
    if drop_id and '_id' in df.columns:
        df = df.drop(columns=['_id'])
    return df