Spaces:
Running
Running
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from .config import get_collections | |
| from .scraper import api_client | |
def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and regularize the modal-price time series.

    Keeps only 'Reported Date' and 'Modal Price (Rs./Quintal)', averages
    duplicate dates, reindexes onto a continuous daily range, and fills
    any gaps by forward- then backward-fill.

    Args:
        df: Raw records containing at least the two columns above.

    Returns:
        A daily-frequency DataFrame; empty if no valid rows remain.
    """
    date_col = 'Reported Date'
    price_col = 'Modal Price (Rs./Quintal)'
    series = df[[date_col, price_col]].copy()

    # Coerce types defensively; callers may or may not have converted already.
    if not pd.api.types.is_datetime64_any_dtype(series[date_col]):
        series[date_col] = pd.to_datetime(series[date_col])
    series[price_col] = pd.to_numeric(series[price_col], errors='coerce')

    # Discard rows where either the date or the price failed to parse.
    series = series.dropna(subset=[date_col, price_col])
    if series.empty:
        return series

    # Collapse to one averaged price per calendar day.
    daily = series.groupby(date_col, as_index=False).mean()

    # Reindex onto a gap-free daily range, then fill the missing prices.
    all_days = pd.date_range(daily[date_col].min(), daily[date_col].max())
    daily = (
        daily.set_index(date_col)
        .reindex(all_days)
        .rename_axis(date_col)
        .reset_index()
    )
    daily[price_col] = daily[price_col].ffill().bfill()
    return daily
def fetch_and_process_data(query_filter: dict):
    """Load matching documents from MongoDB and return a cleaned DataFrame.

    Args:
        query_filter: MongoDB filter passed straight to ``collection.find``.

    Returns:
        A preprocessed daily-frequency DataFrame, or None when no data
        matches, required columns are missing, or an error occurs (errors
        are surfaced to the UI via ``st.error``).
    """
    cols = get_collections()
    collection = cols['collection']
    try:
        # Sort at the DB level so downstream processing sees ordered dates.
        cursor = collection.find(query_filter).sort('Reported Date', 1)
        data = list(cursor)
        if not data:
            return None
        df = pd.DataFrame(data)
        # Bail out early (with a visible error) if the schema is unexpected.
        required = ('Reported Date', 'Modal Price (Rs./Quintal)')
        if any(col not in df.columns for col in required):
            import streamlit as st  # lazy: only needed to report the error
            st.error(f"Missing required columns. Available: {df.columns.tolist()}")
            return None
        # Normalize dtypes before handing off to preprocess_data.
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
        df['Modal Price (Rs./Quintal)'] = pd.to_numeric(df['Modal Price (Rs./Quintal)'], errors='coerce')
        df = preprocess_data(df)
        if df is None or df.empty:
            return None
        return df
    except Exception as e:
        import streamlit as st  # lazy: avoid a hard streamlit dependency at import time
        st.error(f"Error fetching data: {str(e)}")
        return None
    # Fix: removed the unreachable trailing `return None` -- every path
    # through the try/except above already returns.
def fetch_and_store_data():
    """Fetch new data from Agmarknet API and store in MongoDB.

    Fetches data from the day after the latest date in the database
    until yesterday. Uses the Agmarknet API client.

    Returns:
        pd.DataFrame: The fetched and stored data, or None if no data
    """
    cols = get_collections()
    collection = cols['collection']

    # Resume from the most recent stored date, if any.
    latest_doc = collection.find_one(sort=[("Reported Date", -1)])
    latest_date = latest_doc["Reported Date"] if latest_doc and "Reported Date" in latest_doc else None

    if latest_date:
        from_date = latest_date + timedelta(days=1)
    else:
        # Empty collection: backfill from an early sentinel date.
        from_date = datetime(2000, 1, 1)
    to_date = datetime.now() - timedelta(days=1)

    # Already up to date -- skip the API round-trip entirely.
    if from_date > to_date:
        return None

    # Format dates for API (YYYY-MM-DD).
    from_date_str = from_date.strftime('%Y-%m-%d')
    to_date_str = to_date.strftime('%Y-%m-%d')

    responses = api_client.fetch_date_range(from_date_str, to_date_str)
    if not responses:
        return None

    df = api_client.parse_multiple_responses_to_dataframe(responses)
    if df.empty:
        return None

    # Only the White variety is tracked in this collection.
    df = df[df['Variety'] == "White"]

    # Normalize types before persisting.
    df["Reported Date"] = pd.to_datetime(df["Reported Date"])
    df["Modal Price (Rs./Quintal)"] = pd.to_numeric(df["Modal Price (Rs./Quintal)"], errors='coerce').astype('Int64')
    # NOTE(review): astype(str) turns missing Min/Max prices into the literal
    # strings 'nan'/'<NA>', which the pd.isna() cleanup below cannot detect --
    # confirm this stringly-typed storage is intentional before changing it.
    df["Min Price (Rs./Quintal)"] = pd.to_numeric(df["Min Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Max Price (Rs./Quintal)"] = pd.to_numeric(df["Max Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Arrivals (Tonnes)"] = pd.to_numeric(df["Arrivals (Tonnes)"], errors='coerce').astype(float)

    df.sort_values(by="Reported Date", inplace=True)

    # Build the documents once and write them in a single batch instead of
    # one network round-trip per row (fix: was insert_one in a loop).
    docs = []
    for _, row in df.iterrows():
        # Replace pandas missing values (NaN/NaT/pd.NA) with None for BSON.
        docs.append({key: (None if pd.isna(value) else value)
                     for key, value in row.items()})
    if docs:
        collection.insert_many(docs)
    return df
def get_dataframe_from_collection(collection):
    """Return every document in *collection* as a DataFrame, without '_id'.

    Args:
        collection: Any object exposing a Mongo-style ``find()``.

    Returns:
        pd.DataFrame: All documents; MongoDB's '_id' column is dropped
        when present.
    """
    documents = collection.find()
    frame = pd.DataFrame(list(documents))
    if '_id' in frame.columns:
        frame = frame.drop(columns=['_id'])
    return frame
def collection_to_dataframe(collection, drop_id=True):
    """Materialize all documents from *collection* into a DataFrame.

    Args:
        collection: Any object exposing a Mongo-style ``find()``.
        drop_id: When True (default), remove MongoDB's '_id' column
            if it is present.

    Returns:
        pd.DataFrame: One row per document.
    """
    frame = pd.DataFrame(list(collection.find()))
    should_drop = drop_id and '_id' in frame.columns
    return frame.drop(columns=['_id']) if should_drop else frame