Spaces:
Sleeping
Sleeping
File size: 4,957 Bytes
fa4fc8b ecb9d4e fa4fc8b ecb9d4e fa4fc8b ecb9d4e fa4fc8b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import pandas as pd
from datetime import datetime, timedelta
from .config import get_collections
from .scraper import api_client
def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce raw records to a gap-free daily modal-price series.

    Keeps only the 'Reported Date' and 'Modal Price (Rs./Quintal)' columns,
    coerces them to datetime / numeric, drops rows where either is missing,
    averages prices that share a date, then reindexes onto every calendar
    day between the first and last date and fills the gaps forward, then
    backward.

    Args:
        df: Frame containing at least the two columns named above.

    Returns:
        A two-column frame with one row per day. If no usable rows remain
        after cleaning, the (empty) cleaned frame is returned as is.
    """
    date_col = 'Reported Date'
    price_col = 'Modal Price (Rs./Quintal)'

    cleaned = df[[date_col, price_col]].copy()

    # The caller may already have converted the dates; only parse when needed.
    if not pd.api.types.is_datetime64_any_dtype(cleaned[date_col]):
        cleaned[date_col] = pd.to_datetime(cleaned[date_col])
    cleaned[price_col] = pd.to_numeric(cleaned[price_col], errors='coerce')

    # Rows without a date or a price cannot contribute to the series.
    cleaned = cleaned.dropna(subset=[date_col, price_col])
    if cleaned.empty:
        return cleaned

    # Collapse multiple records for the same date into their mean price.
    daily = cleaned.groupby(date_col, as_index=False).mean()

    # Expand to a contiguous daily calendar so missing days become explicit.
    calendar = pd.date_range(daily[date_col].min(), daily[date_col].max())
    daily = daily.set_index(date_col).reindex(calendar)
    daily = daily.rename_axis(date_col).reset_index()

    daily[price_col] = daily[price_col].ffill().bfill()
    return daily
def fetch_and_process_data(query_filter: dict):
    """Load the records matching ``query_filter`` and return a cleaned series.

    Queries the collection obtained from ``get_collections()['collection']``,
    sorted ascending by 'Reported Date', and passes the result through
    ``preprocess_data``.

    Args:
        query_filter: Filter document handed to ``collection.find``.

    Returns:
        The frame produced by ``preprocess_data``, or ``None`` when no
        document matches, a required column is missing, the processed frame
        is empty, or an exception is raised while fetching/processing (in
        the last two error cases a message is shown through ``st.error``).
    """
    collection = get_collections()['collection']
    required_columns = ('Reported Date', 'Modal Price (Rs./Quintal)')

    try:
        # All fields are fetched; preprocess_data keeps only what it needs.
        data = list(collection.find(query_filter).sort('Reported Date', 1))
        if not data:
            return None

        df = pd.DataFrame(data)

        if any(column not in df.columns for column in required_columns):
            # streamlit is imported lazily, only on the error paths.
            import streamlit as st
            st.error(f"Missing required columns. Available: {df.columns.tolist()}")
            return None

        # Ensure proper data types before preprocessing.
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
        df['Modal Price (Rs./Quintal)'] = pd.to_numeric(
            df['Modal Price (Rs./Quintal)'], errors='coerce'
        )

        df = preprocess_data(df)
        if df is None or df.empty:
            return None
        return df
    except Exception as e:
        # Deliberately broad: any failure is reported in the UI and turned
        # into a None result rather than propagated to the caller.
        import streamlit as st
        st.error(f"Error fetching data: {str(e)}")
        return None
def fetch_and_store_data():
    """Fetch new data from the Agmarknet API and store it in MongoDB.

    Fetches data from the day after the latest 'Reported Date' found in the
    collection (or from 2000-01-01 when none is found) until yesterday,
    keeps only rows whose 'Variety' equals "White", normalises the column
    types and inserts the rows into the collection.

    Returns:
        pd.DataFrame: The fetched and stored data, sorted by
        'Reported Date', or None if there was nothing to fetch or store.
    """
    collection = get_collections()['collection']

    latest_doc = collection.find_one(sort=[("Reported Date", -1)])
    latest_date = latest_doc.get("Reported Date") if latest_doc else None

    # Calculate the date range: day after the newest stored record .. yesterday.
    if latest_date:
        from_date = latest_date + timedelta(days=1)
    else:
        from_date = datetime(2000, 1, 1)
    to_date = datetime.now() - timedelta(days=1)

    # Already up to date: the next day to fetch lies after yesterday, so the
    # API would be asked for an inverted range. Compare calendar days only.
    if from_date.date() > to_date.date():
        return None

    # The API client takes dates as YYYY-MM-DD strings.
    responses = api_client.fetch_date_range(
        from_date.strftime('%Y-%m-%d'),
        to_date.strftime('%Y-%m-%d'),
    )
    if not responses:
        return None

    df = api_client.parse_multiple_responses_to_dataframe(responses)
    if df.empty:
        return None

    # Filter for White variety only. The copy makes the column assignments
    # below operate on an independent frame instead of a slice of the parsed
    # one (avoids chained-assignment warnings / lost writes).
    df = df[df['Variety'] == "White"].copy()
    if df.empty:
        return None

    # Ensure proper data types for MongoDB.
    df["Reported Date"] = pd.to_datetime(df["Reported Date"])
    df["Modal Price (Rs./Quintal)"] = pd.to_numeric(
        df["Modal Price (Rs./Quintal)"], errors='coerce'
    ).astype('Int64')

    # NOTE(review): min/max prices are stored as strings, as the original
    # code did - confirm this matches the collection's existing schema.
    # Missing values are kept missing instead of becoming the text "nan",
    # so the None conversion below applies to them as well.
    for column in ("Min Price (Rs./Quintal)", "Max Price (Rs./Quintal)"):
        numeric = pd.to_numeric(df[column], errors='coerce')
        df[column] = numeric.astype(str).where(numeric.notna())

    df["Arrivals (Tonnes)"] = pd.to_numeric(
        df["Arrivals (Tonnes)"], errors='coerce'
    ).astype(float)

    df.sort_values(by="Reported Date", inplace=True)

    # Build one document per row, replacing NaN/NaT/NA with None.
    documents = [
        {
            key: (None if pd.isna(value) else value)
            for key, value in row.to_dict().items()
        }
        for _, row in df.iterrows()
    ]

    # Single batched write instead of one round trip per row; the list is
    # non-empty because the frame was checked above.
    collection.insert_many(documents)
    return df
def get_dataframe_from_collection(collection):
    """Return every document of ``collection`` as a DataFrame without '_id'.

    Args:
        collection: Object whose ``find()`` yields the documents to load.

    Returns:
        A DataFrame built from all documents; the '_id' column is removed
        when present.
    """
    frame = pd.DataFrame(list(collection.find()))
    return frame.drop(columns=['_id']) if '_id' in frame.columns else frame
def collection_to_dataframe(collection, drop_id=True):
    """Load all documents of ``collection`` into a DataFrame.

    Args:
        collection: Object whose ``find()`` yields the documents to load.
        drop_id: When true, remove the '_id' column if the frame has one.

    Returns:
        A DataFrame built from all documents, with or without '_id'
        depending on ``drop_id``.
    """
    frame = pd.DataFrame(list(collection.find()))
    if not drop_id or '_id' not in frame.columns:
        return frame
    return frame.drop(columns=['_id'])
|