# Commit ecb9d4e — "Fix: Readme" (author: ThejasRao)
import pandas as pd
from datetime import datetime, timedelta
from .config import get_collections
from .scraper import api_client
def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean price data: average duplicate dates and fill calendar gaps.

    Keeps only the date and modal-price columns, coerces types, averages
    rows that share a date, reindexes onto a continuous daily range and
    forward/back-fills the resulting holes.

    Args:
        df: Frame containing at least 'Reported Date' and
            'Modal Price (Rs./Quintal)' columns.

    Returns:
        pd.DataFrame: One row per calendar day with a filled modal price,
        or an empty frame when no valid rows survive type coercion.
    """
    date_col = 'Reported Date'
    price_col = 'Modal Price (Rs./Quintal)'
    frame = df[[date_col, price_col]].copy()

    # Coerce types only when needed (callers may have converted already).
    if not pd.api.types.is_datetime64_any_dtype(frame[date_col]):
        frame[date_col] = pd.to_datetime(frame[date_col])
    frame[price_col] = pd.to_numeric(frame[price_col], errors='coerce')

    # Discard rows that failed conversion (NaT dates / NaN prices).
    frame = frame.dropna(subset=[date_col, price_col])
    if frame.empty:
        return frame

    # Collapse duplicate dates to their mean price.
    frame = frame.groupby(date_col, as_index=False).mean()

    # Reindex onto every calendar day in the observed span, then fill gaps
    # forward first and backward for any leading hole.
    all_days = pd.date_range(frame[date_col].min(), frame[date_col].max())
    frame = (
        frame.set_index(date_col)
        .reindex(all_days)
        .rename_axis(date_col)
        .reset_index()
    )
    frame[price_col] = frame[price_col].ffill().bfill()
    return frame
def fetch_and_process_data(query_filter: dict):
    """Fetch documents matching *query_filter* from MongoDB and preprocess them.

    Args:
        query_filter: MongoDB filter document passed straight to ``find``.

    Returns:
        pd.DataFrame: Cleaned daily price data (see ``preprocess_data``),
        or ``None`` when no documents match, required columns are missing,
        or any error occurs. Errors are surfaced to the UI via streamlit.
    """
    cols = get_collections()
    collection = cols['collection']
    try:
        # Fetch all fields - MongoDB handles this efficiently
        cursor = collection.find(query_filter).sort('Reported Date', 1)
        data = list(cursor)
        if not data:
            return None
        df = pd.DataFrame(data)
        # Check if required columns exist
        if 'Reported Date' not in df.columns or 'Modal Price (Rs./Quintal)' not in df.columns:
            # Local import keeps the module importable outside a streamlit app.
            import streamlit as st
            st.error(f"Missing required columns. Available: {df.columns.tolist()}")
            return None
        # Ensure proper data types before preprocessing
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
        df['Modal Price (Rs./Quintal)'] = pd.to_numeric(df['Modal Price (Rs./Quintal)'], errors='coerce')
        df = preprocess_data(df)
        if df is None or df.empty:
            return None
        return df
    except Exception as e:
        import streamlit as st
        st.error(f"Error fetching data: {str(e)}")
        return None
    # Fix: removed unreachable trailing `return None` — every path above
    # (success, empty result, missing columns, exception) already returns.
def fetch_and_store_data():
    """Fetch new data from Agmarknet API and store in MongoDB.

    Fetches data from the day after the latest date in the database
    until yesterday. Uses the Agmarknet API client.

    Returns:
        pd.DataFrame: The fetched and stored data, or None if no data
    """
    cols = get_collections()
    collection = cols['collection']

    # Resume from the day after the newest stored record; fall back to
    # 2000-01-01 when the collection is empty.
    latest_doc = collection.find_one(sort=[("Reported Date", -1)])
    latest_date = latest_doc["Reported Date"] if latest_doc and "Reported Date" in latest_doc else None
    if latest_date:
        from_date = latest_date + timedelta(days=1)
    else:
        from_date = datetime(2000, 1, 1)
    to_date = datetime.now() - timedelta(days=1)

    # Format dates for API (YYYY-MM-DD)
    from_date_str = from_date.strftime('%Y-%m-%d')
    to_date_str = to_date.strftime('%Y-%m-%d')

    # Fetch data using API client
    responses = api_client.fetch_date_range(from_date_str, to_date_str)
    if not responses:
        return None

    # Parse responses to DataFrame
    df = api_client.parse_multiple_responses_to_dataframe(responses)
    if df.empty:
        return None

    # Filter for White variety only; bail out when nothing remains so we
    # never attempt an empty bulk insert.
    df = df[df['Variety'] == "White"]
    if df.empty:
        return None

    # Ensure proper data types for MongoDB
    df["Reported Date"] = pd.to_datetime(df["Reported Date"])
    df["Modal Price (Rs./Quintal)"] = pd.to_numeric(df["Modal Price (Rs./Quintal)"], errors='coerce').astype('Int64')
    # NOTE(review): Min/Max are stored as strings (NaN becomes the literal
    # "nan") — presumably intentional for the existing schema; confirm
    # before changing.
    df["Min Price (Rs./Quintal)"] = pd.to_numeric(df["Min Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Max Price (Rs./Quintal)"] = pd.to_numeric(df["Max Price (Rs./Quintal)"], errors='coerce').astype(str)
    df["Arrivals (Tonnes)"] = pd.to_numeric(df["Arrivals (Tonnes)"], errors='coerce').astype(float)

    # Sort by date
    df.sort_values(by="Reported Date", inplace=True)

    # Replace NaN/NaT/pd.NA with None so MongoDB stores nulls, then insert
    # all documents in one round trip instead of one insert_one() per row.
    records = df.to_dict('records')
    for doc in records:
        for key, value in doc.items():
            if pd.isna(value):
                doc[key] = None
    collection.insert_many(records)
    return df
def get_dataframe_from_collection(collection):
    """Load every document in *collection* into a DataFrame.

    MongoDB's ``_id`` column is dropped when present.

    Args:
        collection: Any object whose ``find()`` yields dict documents.

    Returns:
        pd.DataFrame: One row per document (empty frame for no documents).
    """
    frame = pd.DataFrame(list(collection.find()))
    return frame.drop(columns=['_id']) if '_id' in frame.columns else frame
def collection_to_dataframe(collection, drop_id=True):
    """Materialize a MongoDB collection's documents as a DataFrame.

    Args:
        collection: Any object whose ``find()`` yields dict documents.
        drop_id: When True (default), remove the ``_id`` column if present.

    Returns:
        pd.DataFrame: One row per document.
    """
    frame = pd.DataFrame(list(collection.find()))
    if drop_id and '_id' in frame.columns:
        frame = frame.drop(columns=['_id'])
    return frame