Spaces:
Sleeping
Sleeping
| """API-based scraper for agmarknet.gov.in that routes its API calls through ScraperAPI.""" | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| from pathlib import Path | |
| from urllib.parse import urlencode | |
# Configure a module-level logger that writes timestamped records to a
# stream handler at INFO level.
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
# logging.getLogger() returns the same logger object for the same name, so
# executing this module a second time (reload / re-import) would otherwise
# stack another handler and emit every record more than once.
if not logger.handlers:
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
class AgmarknetAPIClient:
    """Client for the Agmarknet market-report API, called through ScraperAPI.

    Each request targets ``BASE_URL`` with the fixed commodity parameters
    below and is wrapped in a ScraperAPI URL. The parsing helpers are static
    methods, so they can be called on the class or on an instance.
    """

    BASE_URL = "https://api.agmarknet.gov.in/v1/prices-and-arrivals/market-report/specific"

    # Fixed query parameters sent with every market-report request.
    COMMODITY_GROUP_ID = 3
    COMMODITY_ID = 11
    INCLUDE_EXCEL = "false"

    # Default ScraperAPI key; the SCRAPER_API_KEY environment variable takes
    # precedence when set (see fetch_market_data).
    # NOTE(review): this credential is committed to source. Rotate it and
    # supply it through the environment instead of this literal.
    SCRAPER_API_KEY = "bbbbde6b56c0fde1e2a61c914eb22d14"
    SCRAPER_API_URL = "https://api.scraperapi.com"

    # Passed as the ``timeout`` argument of every ``session.get`` call.
    TIMEOUT = 30

    def __init__(self):
        """Create the HTTP session reused by all requests of this client."""
        self.session = requests.Session()
        logger.info("Agmarknet API client initialized with ScraperAPI")

    def _log_api_call(self, date_str: str, url: str, status_code: int, records_count: int = 0):
        """Log one API call: requested date, HTTP status, record count, URL."""
        logger.info(
            f"API CALL | Date: {date_str} | Status: {status_code} | "
            f"Records: {records_count} | URL: {url}"
        )

    def fetch_market_data(self, date_str: str):
        """Fetch the market report for one date through ScraperAPI.

        Args:
            date_str: Date string (YYYY-MM-DD).

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: Any error raised while sending the request or decoding
                the body is logged and re-raised unchanged.
        """
        # Function-scope import, in the same style as the local ``import json``
        # used by export_response_to_file.
        import os

        # Query parameters of the underlying Agmarknet endpoint.
        query_params = {
            "commodityGroupId": self.COMMODITY_GROUP_ID,
            "commodityId": self.COMMODITY_ID,
            "date": date_str,
            "includeExcel": self.INCLUDE_EXCEL,
        }
        original_url = f"{self.BASE_URL}?{urlencode(query_params)}"

        # Prefer a key supplied via the environment; fall back to the class
        # attribute so existing deployments keep working.
        api_key = os.environ.get("SCRAPER_API_KEY") or self.SCRAPER_API_KEY

        # ScraperAPI takes the target URL as a (URL-encoded) parameter.
        scraper_params = {
            "api_key": api_key,
            "url": original_url,
            "render": "false",
        }
        scraper_url = f"{self.SCRAPER_API_URL}?{urlencode(scraper_params)}"

        try:
            response = self.session.get(scraper_url, timeout=self.TIMEOUT)
            status_code = response.status_code
            data = response.json()
            records_count = self._count_records(data) if isinstance(data, dict) else 0
            # Log the Agmarknet URL rather than the ScraperAPI one so the API
            # key never reaches the logs.
            self._log_api_call(date_str, original_url, status_code, records_count)
            return data
        except Exception as e:
            logger.error(f"ScraperAPI request failed for {date_str}: {str(e)}")
            raise

    def fetch_date_range(self, start_date: str, end_date: str) -> List[Dict[str, Any]]:
        """Fetch market data for every day of an inclusive date range.

        Args:
            start_date: Start date (YYYY-MM-DD).
            end_date: End date (YYYY-MM-DD).

        Returns:
            The non-empty API responses, in date order. An empty list is
            returned when either date is malformed or start is after end.

        Raises:
            Exception: Whatever ``fetch_market_data`` raises for a day; the
                error propagates and ends the loop.
        """
        logger.info(f"Starting date range fetch | From: {start_date} To: {end_date}")
        try:
            start = datetime.strptime(start_date, "%Y-%m-%d")
            end = datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError as e:
            logger.error(f"❌ Invalid date format | Error: {str(e)}")
            return []

        if start > end:
            logger.error("❌ Start date cannot be after end date")
            return []

        total_days = (end - start).days + 1
        logger.info(f"Fetching {total_days} days of data...")

        results = []
        current = start
        while current <= end:
            data = self.fetch_market_data(current.strftime("%Y-%m-%d"))
            # Falsy payloads (empty dict/list, None) are skipped.
            if data:
                results.append(data)
            current += timedelta(days=1)

        logger.info(
            f"✅ Completed date range fetch | "
            f"Total days: {total_days} | "
            f"Successful fetches: {len(results)}"
        )
        return results

    @staticmethod
    def _count_records(data: Dict[str, Any]) -> int:
        """Count the data entries across all states and markets of a response.

        Args:
            data: API response dictionary.

        Returns:
            Total number of entries under ``states[*].markets[*].data``.
            Missing or null levels count as empty.
        """
        return sum(
            len(market.get("data") or [])
            for state in (data.get("states") or [])
            for market in (state.get("markets") or [])
        )

    @staticmethod
    def parse_response_to_dataframe(api_response: Dict[str, Any]) -> pd.DataFrame:
        """Flatten one API response into a DataFrame, one row per data entry.

        Args:
            api_response: API response dictionary.

        Returns:
            DataFrame with one row per entry under
            ``states[*].markets[*].data``; empty when there are no entries.
        """
        # The report date is only available inside the title, e.g.
        # "Market wise Daily Report for Sesamum(Sesame,Gingelly,Til) on 01-Nov-2025"
        title = api_response.get("title") or ""
        reported_date = None
        if " on " in title:
            date_part = title.split(" on ")[-1].strip()
            try:
                reported_date = pd.to_datetime(date_part, format="%d-%b-%Y")
            except (ValueError, TypeError):
                # Unparseable date text: leave the column empty.
                reported_date = None

        records = []
        for state in api_response.get("states") or []:
            state_name = state.get("stateName", "")
            for market in state.get("markets") or []:
                market_name = market.get("marketName") or ""
                # Remove "APMC" suffix if present
                if market_name.endswith(" APMC"):
                    market_name = market_name[:-5].strip()
                for entry in market.get("data") or []:
                    records.append({
                        "Reported Date": reported_date,
                        "State Name": state_name,
                        "District Name": state_name,  # Using state name as district for now
                        "Market Name": market_name,
                        "Variety": entry.get("variety"),
                        "Group": "Oil Seeds",
                        "Arrivals (Tonnes)": entry.get("arrivals"),
                        "Min Price (Rs./Quintal)": entry.get("minimumPrice"),
                        "Max Price (Rs./Quintal)": entry.get("maximumPrice"),
                        "Modal Price (Rs./Quintal)": entry.get("modalPrice"),
                        "Grade": entry.get("grade"),
                    })

        df = pd.DataFrame(records)
        logger.info(f"Parsed API response to DataFrame | Records: {len(df)}")
        return df

    @staticmethod
    def parse_multiple_responses_to_dataframe(
        responses: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """Parse several API responses into a single DataFrame.

        Args:
            responses: List of API response dictionaries.

        Returns:
            The per-response DataFrames concatenated with a fresh index, or an
            empty DataFrame when ``responses`` is empty.
        """
        dfs = [
            AgmarknetAPIClient.parse_response_to_dataframe(response)
            for response in responses
        ]
        # pd.concat raises ValueError when given no objects to concatenate.
        combined_df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
        logger.info(
            f"Combined {len(responses)} API responses into DataFrame | "
            f"Total records: {len(combined_df)}"
        )
        return combined_df

    def export_response_to_file(self, api_response: Dict[str, Any],
                                filename: str = "api_response.json"):
        """Write an API response to a JSON file.

        Failures are logged and not raised.

        Args:
            api_response: API response dictionary.
            filename: Output filename.
        """
        import json

        filepath = Path(filename)
        try:
            with open(filepath, 'w', encoding="utf-8") as f:
                json.dump(api_response, f, indent=2)
            logger.info(f"✅ Exported API response to file | Path: {filepath}")
        except Exception as e:
            logger.error(f"❌ Failed to export API response | Error: {str(e)}")
| # Global client instance, created when this module is imported; constructing it opens the requests.Session that the client reuses. | |
| api_client = AgmarknetAPIClient() | |