File size: 4,957 Bytes
fa4fc8b
 
 
 
 
 
 
 
 
ecb9d4e
 
 
 
 
 
 
 
 
 
 
fa4fc8b
 
 
 
 
 
 
 
 
 
 
ecb9d4e
 
fa4fc8b
ecb9d4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa4fc8b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import pandas as pd
from datetime import datetime, timedelta

from .config import get_collections
from .scraper import api_client


def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and regularize a modal-price time series.

    Keeps only the date and modal-price columns, averages duplicate
    readings on the same date, reindexes onto a continuous daily range,
    and forward/backward fills missing prices.

    Args:
        df: Frame containing at least 'Reported Date' and
            'Modal Price (Rs./Quintal)' columns.

    Returns:
        pd.DataFrame: Daily series with one row per calendar day, or an
        empty frame when no valid rows remain after coercion.
    """
    date_col = 'Reported Date'
    price_col = 'Modal Price (Rs./Quintal)'
    out = df[[date_col, price_col]].copy()

    # Coerce types; the caller may already have converted them.
    if not pd.api.types.is_datetime64_any_dtype(out[date_col]):
        out[date_col] = pd.to_datetime(out[date_col])
    out[price_col] = pd.to_numeric(out[price_col], errors='coerce')

    # Discard rows whose date or price failed to parse.
    out = out.dropna(subset=[date_col, price_col])
    if out.empty:
        return out

    # Average duplicate readings for the same day.
    out = out.groupby(date_col, as_index=False).mean()

    # Stretch onto a continuous daily index, then fill the gaps.
    all_days = pd.date_range(out[date_col].min(), out[date_col].max())
    out = (
        out.set_index(date_col)
        .reindex(all_days)
        .rename_axis(date_col)
        .reset_index()
    )
    out[price_col] = out[price_col].ffill().bfill()
    return out


def fetch_and_process_data(query_filter: dict):
    """Query MongoDB for price records and return a cleaned daily series.

    Args:
        query_filter: MongoDB filter document passed to ``collection.find``.

    Returns:
        pd.DataFrame: Continuous daily series of modal prices, or None
        when nothing matches, required columns are missing, or any error
        occurs (errors are surfaced in the Streamlit UI via ``st.error``).
    """
    collection = get_collections()['collection']
    try:
        # Fetch matching documents, oldest first.
        data = list(collection.find(query_filter).sort('Reported Date', 1))
        if not data:
            return None

        df = pd.DataFrame(data)

        # Bail out early if the expected schema is absent.
        required = ('Reported Date', 'Modal Price (Rs./Quintal)')
        if any(col not in df.columns for col in required):
            import streamlit as st
            st.error(f"Missing required columns. Available: {df.columns.tolist()}")
            return None

        # Ensure proper data types before preprocessing.
        df['Reported Date'] = pd.to_datetime(df['Reported Date'])
        df['Modal Price (Rs./Quintal)'] = pd.to_numeric(df['Modal Price (Rs./Quintal)'], errors='coerce')
        df = preprocess_data(df)

        # Treat an empty result as "no data" for the caller.
        return None if df is None or df.empty else df
    except Exception as e:
        # FIX: the original had an unreachable duplicated `return None`
        # after this return. Surface the failure in the UI and give up.
        import streamlit as st
        st.error(f"Error fetching data: {str(e)}")
        return None


def fetch_and_store_data():
    """Fetch new data from Agmarknet API and store in MongoDB.

    Fetches data from the day after the latest date in the database
    until yesterday. Uses the Agmarknet API client.

    Returns:
        pd.DataFrame: The fetched and stored data, or None if no data
    """
    cols = get_collections()
    collection = cols['collection']

    # Resume from the day after the newest stored record; fall back to a
    # far-past start date for an empty collection.
    latest_doc = collection.find_one(sort=[("Reported Date", -1)])
    latest_date = latest_doc["Reported Date"] if latest_doc and "Reported Date" in latest_doc else None
    if latest_date:
        from_date = latest_date + timedelta(days=1)
    else:
        from_date = datetime(2000, 1, 1)
    to_date = datetime.now() - timedelta(days=1)

    # API expects YYYY-MM-DD date strings.
    responses = api_client.fetch_date_range(
        from_date.strftime('%Y-%m-%d'), to_date.strftime('%Y-%m-%d')
    )
    if not responses:
        return None

    df = api_client.parse_multiple_responses_to_dataframe(responses)
    if df.empty:
        return None

    # Only the White variety is tracked.
    df = df[df['Variety'] == "White"].copy()
    if df.empty:
        # FIX: the original fell through and returned an empty frame.
        return None

    # Normalize types for MongoDB storage.
    df["Reported Date"] = pd.to_datetime(df["Reported Date"])
    df["Modal Price (Rs./Quintal)"] = pd.to_numeric(df["Modal Price (Rs./Quintal)"], errors='coerce').astype('Int64')
    # FIX: `astype(str)` turned NaN into the literal string 'nan', which
    # slipped past the pd.isna() -> None cleanup below. Stringify valid
    # values only; leave missing ones as None.
    for col in ("Min Price (Rs./Quintal)", "Max Price (Rs./Quintal)"):
        numeric = pd.to_numeric(df[col], errors='coerce')
        df[col] = numeric.map(lambda v: str(v) if pd.notna(v) else None)
    df["Arrivals (Tonnes)"] = pd.to_numeric(df["Arrivals (Tonnes)"], errors='coerce').astype(float)

    # Sort chronologically before inserting.
    df.sort_values(by="Reported Date", inplace=True)

    # Replace remaining NaN/NaT with None and insert in a single round
    # trip (FIX: the original issued one insert_one per row).
    docs = [
        {key: (None if pd.isna(value) else value) for key, value in row.items()}
        for _, row in df.iterrows()
    ]
    if docs:
        collection.insert_many(docs)

    return df


def get_dataframe_from_collection(collection):
    """Return every document in *collection* as a DataFrame.

    MongoDB's internal ``_id`` column is removed when present.
    """
    frame = pd.DataFrame(list(collection.find()))
    # errors='ignore' makes the drop a no-op when '_id' is absent.
    return frame.drop(columns=['_id'], errors='ignore')


def collection_to_dataframe(collection, drop_id=True):
    """Materialize all documents of *collection* into a DataFrame.

    Args:
        collection: Object exposing ``find()`` yielding dict documents.
        drop_id: When True (default), remove MongoDB's ``_id`` column
            if it exists.

    Returns:
        pd.DataFrame: One row per document.
    """
    frame = pd.DataFrame(list(collection.find()))
    if drop_id:
        # errors='ignore' skips the drop when no '_id' column exists.
        frame = frame.drop(columns=['_id'], errors='ignore')
    return frame