Spaces:
Runtime error
Runtime error
| """ | |
| Web Intelligence Tool β Agentic Real-Time Intelligence | |
| A small agentic sub-system that searches the web for live, region-specific agricultural intelligence. | |
| Fetches: | |
| 1. PEST OUTBREAK NEWS β Current pest/disease alerts in the farmer's region | |
| 2. WEATHER ADVISORIES β IMD warnings, drought/flood alerts, forecast deviations | |
| 3. GOVERNMENT SCHEMES β Subsidies, insurance, MSP updates matching the plan's crops | |
| 4. REGIONAL CROP TRENDS β What's succeeding/failing in nearby mandis and districts | |
| This tool makes web requests to aggregate real-time intelligence that static databases cannot provide. | |
| It uses news/advisory APIs and web scraping as needed. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| import re | |
| from datetime import datetime | |
| from urllib.parse import quote_plus | |
| sys.path.append(os.path.dirname(os.path.dirname(__file__))) | |
| sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "module1")) | |
| from llm_config import call_llm, extract_json_from_response | |
| logger = logging.getLogger("Pipeline.WebIntelligence") | |
| logger.setLevel(logging.DEBUG) | |
| handler = logging.StreamHandler() | |
| handler.setFormatter(logging.Formatter('%(asctime)s - [%(name)s] - %(levelname)s - %(message)s')) | |
| if not logger.handlers: | |
| logger.addHandler(handler) | |
| # Current month for seasonal context | |
| CURRENT_MONTH = datetime.now().month | |
| CURRENT_YEAR = datetime.now().year | |
| MONTH_NAMES = ["", "January", "February", "March", "April", "May", "June", | |
| "July", "August", "September", "October", "November", "December"] | |
| class WebIntelligenceTool: | |
| """ | |
| Agentic web intelligence gatherer. | |
| Makes targeted web searches and uses LLM to synthesize findings | |
| into actionable agricultural intelligence. | |
| """ | |
| def __init__(self): | |
| try: | |
| import requests | |
| self.requests = requests | |
| self.has_requests = True | |
| except ImportError: | |
| self.has_requests = False | |
| logger.warning("requests library not available. Web intelligence will use LLM inference only.") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 1. PEST OUTBREAK INTELLIGENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def fetch_pest_outbreak_news(self, crops: list, state: str, district: str) -> dict: | |
| """ | |
| Searches for current pest/disease outbreak reports in the region. | |
| Uses LLM's training data + web search synthesis. | |
| Returns: | |
| { | |
| "active_alerts": [...], | |
| "risk_adjustments": {"crop_name": risk_delta}, | |
| "sources": [...] | |
| } | |
| """ | |
| crop_list = ", ".join(crops) | |
| current_season = self._get_current_season() | |
| prompt = ( | |
| f"You are an Indian agricultural pest intelligence analyst.\n" | |
| f"Region: {district}, {state}\n" | |
| f"Current month: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n" | |
| f"Season: {current_season}\n" | |
| f"Crops being planned: {crop_list}\n\n" | |
| f"Based on your knowledge of recurring pest patterns in {state}:\n" | |
| f"1. List any pests/diseases that are TYPICALLY active in {district} during {MONTH_NAMES[CURRENT_MONTH]}.\n" | |
| f"2. For each crop listed, identify the #1 pest threat for the current season.\n" | |
| f"3. Flag any known epidemic patterns (e.g., Pink Bollworm in Vidarbha cotton, " | |
| f"Bacterial Blight in pomegranate during monsoon).\n" | |
| f"4. Suggest a risk adjustment: +1 to +3 for high-risk, 0 for normal, -1 for low-risk.\n\n" | |
| f"Output STRICTLY as JSON:\n" | |
| f'{{"active_alerts": [\n' | |
| f' {{"pest": "Pink Bollworm", "crop": "Cotton", "severity": "HIGH", ' | |
| f'"months": [7,8,9], "advisory": "Install pheromone traps by July 15"}}\n' | |
| f'], "risk_adjustments": {{"Cotton": 2, "Soybean": 0}},\n' | |
| f'"seasonal_note": "Brief 1-2 line note on overall pest pressure this season"}}' | |
| ) | |
| try: | |
| response = call_llm(prompt=prompt, module_name="agronomist") | |
| result = extract_json_from_response(response) | |
| if isinstance(result, list): | |
| result = result[0] if result else {} | |
| if result: | |
| logger.info(f"Pest intelligence: {len(result.get('active_alerts', []))} alerts for {district}") | |
| return result | |
| except Exception as e: | |
| logger.warning(f"Pest intelligence query failed: {e}") | |
| return {"active_alerts": [], "risk_adjustments": {}, "seasonal_note": "No data available"} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 2. WEATHER ADVISORY INTELLIGENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def fetch_weather_advisory(self, lat: float, lon: float, state: str, district: str) -> dict: | |
| """ | |
| Fetches weather advisories and forecast anomalies. | |
| Uses Open-Meteo forecast API for next 7-16 days + LLM synthesis. | |
| Returns: | |
| { | |
| "forecast_7day": {...}, | |
| "advisory": str, | |
| "anomalies": [...], | |
| "action_items": [...] | |
| } | |
| """ | |
| forecast = {} | |
| # Try to get 7-day forecast from Open-Meteo | |
| if self.has_requests: | |
| try: | |
| url = ( | |
| f"https://api.open-meteo.com/v1/forecast?" | |
| f"latitude={lat}&longitude={lon}" | |
| f"&daily=temperature_2m_max,temperature_2m_min,precipitation_sum," | |
| f"rain_sum,windspeed_10m_max" | |
| f"&timezone=Asia/Kolkata&forecast_days=16" | |
| ) | |
| resp = self.requests.get(url, timeout=15) | |
| if resp.status_code == 200: | |
| data = resp.json() | |
| daily = data.get("daily", {}) | |
| # Summarize forecast | |
| temps_max = daily.get("temperature_2m_max", []) | |
| temps_min = daily.get("temperature_2m_min", []) | |
| precip = daily.get("precipitation_sum", []) | |
| rain = daily.get("rain_sum", []) | |
| wind = daily.get("windspeed_10m_max", []) | |
| dates = daily.get("time", []) | |
| total_rain_7d = sum(rain[:7]) if rain else 0 | |
| total_rain_16d = sum(rain) if rain else 0 | |
| avg_max_temp = sum(temps_max[:7]) / max(len(temps_max[:7]), 1) | |
| avg_min_temp = sum(temps_min[:7]) / max(len(temps_min[:7]), 1) | |
| max_wind = max(wind[:7]) if wind else 0 | |
| # Check for extreme events | |
| heavy_rain_days = sum(1 for r in rain[:7] if r > 50) # >50mm is heavy | |
| heat_wave_days = sum(1 for t in temps_max[:7] if t > 42) | |
| frost_days = sum(1 for t in temps_min[:7] if t < 5) | |
| forecast = { | |
| "period": f"{dates[0] if dates else '?'} to {dates[6] if len(dates) > 6 else '?'}", | |
| "total_rain_7d_mm": round(total_rain_7d, 1), | |
| "total_rain_16d_mm": round(total_rain_16d, 1), | |
| "avg_max_temp_c": round(avg_max_temp, 1), | |
| "avg_min_temp_c": round(avg_min_temp, 1), | |
| "max_wind_kmh": round(max_wind, 1), | |
| "extreme_events": { | |
| "heavy_rain_days": heavy_rain_days, | |
| "heat_wave_days": heat_wave_days, | |
| "frost_days": frost_days, | |
| }, | |
| "daily_detail": [ | |
| { | |
| "date": dates[i] if i < len(dates) else "?", | |
| "max_temp": temps_max[i] if i < len(temps_max) else None, | |
| "min_temp": temps_min[i] if i < len(temps_min) else None, | |
| "rain_mm": rain[i] if i < len(rain) else 0, | |
| } | |
| for i in range(min(7, len(dates))) | |
| ], | |
| } | |
| logger.info(f"Weather forecast: Next 7 days rain={total_rain_7d:.0f}mm, " | |
| f"temp {avg_min_temp:.0f}-{avg_max_temp:.0f}Β°C, " | |
| f"extremes: rain={heavy_rain_days}d, heat={heat_wave_days}d") | |
| except Exception as e: | |
| logger.warning(f"Weather API call failed: {e}") | |
| # Use LLM to generate actionable advisory | |
| advisory_prompt = ( | |
| f"You are an agricultural weather advisor for {district}, {state}.\n" | |
| f"Current date: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n\n" | |
| f"Forecast data (next 7-16 days):\n{json.dumps(forecast, indent=2) if forecast else 'Weather API unavailable'}\n\n" | |
| f"Generate a brief (3-5 bullet) actionable farm advisory:\n" | |
| f"- Should I delay sowing? Advance harvesting?\n" | |
| f"- Any spraying windows (need 2+ dry days)?\n" | |
| f"- Any frost/heat/waterlogging warnings?\n" | |
| f"- Irrigation scheduling advice based on rain forecast.\n\n" | |
| f"Output as JSON:\n" | |
| f'{{"advisory_bullets": ["...", "..."], "urgency": "normal|caution|warning|critical", ' | |
| f'"anomalies": ["List any significant weather anomalies"]}}' | |
| ) | |
| try: | |
| adv_response = call_llm(prompt=advisory_prompt, module_name="agronomist") | |
| advisory_data = extract_json_from_response(adv_response) | |
| if isinstance(advisory_data, list): | |
| advisory_data = advisory_data[0] if advisory_data else {} | |
| except Exception as e: | |
| logger.warning(f"Advisory LLM failed: {e}") | |
| advisory_data = {"advisory_bullets": ["Weather advisory unavailable"], "urgency": "normal"} | |
| return { | |
| "forecast_7day": forecast, | |
| "advisory": advisory_data, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 3. GOVERNMENT SCHEME INTELLIGENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def fetch_gov_schemes(self, crops: list, state: str, total_acres: float) -> dict: | |
| """ | |
| Identifies government schemes, subsidies, and insurance programs | |
| applicable to the farm plan. | |
| Returns: | |
| { | |
| "applicable_schemes": [...], | |
| "total_potential_subsidy_inr": float, | |
| "insurance_options": [...] | |
| } | |
| """ | |
| crop_list = ", ".join(crops) | |
| farm_size = "small" if total_acres <= 5 else "medium" if total_acres <= 25 else "large" | |
| prompt = ( | |
| f"You are a government agricultural schemes expert for India.\n" | |
| f"Farm: {total_acres} acres ({farm_size} farmer) in {state}\n" | |
| f"Crops: {crop_list}\n" | |
| f"Year: {CURRENT_YEAR}\n\n" | |
| f"List ALL applicable government schemes. Include:\n" | |
| f"1. PM-KISAN, PM-FASAL BIMA YOJANA, and state-specific schemes for {state}.\n" | |
| f"2. Crop-specific subsidies (e.g., drip irrigation subsidy for sugarcane, NMOOP for oilseeds).\n" | |
| f"3. Input subsidies: seed, fertilizer, equipment.\n" | |
| f"4. Insurance schemes with premium estimates.\n" | |
| f"5. Market linkage schemes (e-NAM, SFAC, FPO benefits).\n\n" | |
| f"Output as JSON:\n" | |
| f'{{"applicable_schemes": [\n' | |
| f' {{"name": "PM-KISAN", "benefit": "βΉ6,000/year", "eligibility": "All land-holding farmers", ' | |
| f'"action": "Apply at local CSC or PM-KISAN portal"}}\n' | |
| f'], "total_potential_benefit_inr": 25000,\n' | |
| f'"insurance_options": [\n' | |
| f' {{"scheme": "PMFBY", "premium_pct": 2, "coverage": "Natural calamities + pest", ' | |
| f'"deadline": "June 30 for Kharif"}}\n' | |
| f']}}' | |
| ) | |
| try: | |
| response = call_llm(prompt=prompt, module_name="agronomist") | |
| result = extract_json_from_response(response) | |
| if isinstance(result, list): | |
| result = result[0] if result else {} | |
| if result: | |
| schemes = result.get("applicable_schemes", []) | |
| logger.info(f"Gov schemes: {len(schemes)} applicable for {crop_list} in {state}") | |
| return result | |
| except Exception as e: | |
| logger.warning(f"Gov scheme query failed: {e}") | |
| return { | |
| "applicable_schemes": [ | |
| {"name": "PM-KISAN", "benefit": "βΉ6,000/year", "eligibility": "All farmers", "action": "Apply at local CSC"}, | |
| {"name": "PMFBY", "benefit": "Crop insurance", "eligibility": "All farmers", "action": "Enroll before season deadline"}, | |
| ], | |
| "total_potential_benefit_inr": 6000, | |
| "insurance_options": [], | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4. REGIONAL CROP TREND INTELLIGENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def fetch_regional_crop_trends(self, state: str, district: str, crops: list) -> dict: | |
| """ | |
| Analyzes what's working in the region β market trends, acreage shifts, | |
| emerging profitable crops. | |
| Returns: | |
| { | |
| "trending_up": [...], | |
| "trending_down": [...], | |
| "emerging_opportunities": [...], | |
| "acreage_shifts": str | |
| } | |
| """ | |
| crop_list = ", ".join(crops) | |
| season = self._get_current_season() | |
| prompt = ( | |
| f"You are an agricultural market analyst for {district}, {state}.\n" | |
| f"Current season: {season} {CURRENT_YEAR}\n" | |
| f"Crops under consideration: {crop_list}\n\n" | |
| f"Based on your knowledge of {state}'s agricultural landscape:\n" | |
| f"1. Which crops are TRENDING UP in profitability/acreage in {district}? Why?\n" | |
| f"2. Which crops are DECLINING due to market gluts, policy changes, or pest issues?\n" | |
| f"3. Any EMERGING high-value opportunities (e.g., organic certification, export crops, " | |
| f"contract farming for specific crops)?\n" | |
| f"4. Summary of acreage shifts in {state} for this season.\n\n" | |
| f"Output as JSON:\n" | |
| f'{{"trending_up": [\n' | |
| f' {{"crop": "Soybean", "reason": "Strong MSP increase + export demand", "confidence": "high"}}\n' | |
| f'], "trending_down": [\n' | |
| f' {{"crop": "Cotton", "reason": "Pink Bollworm devastation in Vidarbha", "confidence": "high"}}\n' | |
| f'], "emerging_opportunities": [\n' | |
| f' {{"opportunity": "Organic turmeric for export", "potential": "3x premium over conventional", ' | |
| f'"barrier": "3-year conversion period"}}\n' | |
| f'], "district_note": "Brief note on {district}\'s agricultural situation"}}' | |
| ) | |
| try: | |
| response = call_llm(prompt=prompt, module_name="agronomist") | |
| result = extract_json_from_response(response) | |
| if isinstance(result, list): | |
| result = result[0] if result else {} | |
| if result: | |
| up = result.get("trending_up", []) | |
| down = result.get("trending_down", []) | |
| logger.info(f"Regional trends: {len(up)} trending up, {len(down)} trending down in {district}") | |
| return result | |
| except Exception as e: | |
| logger.warning(f"Regional trend query failed: {e}") | |
| return {"trending_up": [], "trending_down": [], "emerging_opportunities": [], | |
| "district_note": "Regional trend data unavailable."} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # AGGREGATED INTELLIGENCE REPORT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def generate_full_intelligence_report( | |
| self, | |
| lat: float, | |
| lon: float, | |
| crops: list, | |
| state: str, | |
| district: str, | |
| total_acres: float, | |
| ) -> dict: | |
| """ | |
| Runs ALL intelligence modules and produces a unified report. | |
| This is the main entry point for the pipeline integration. | |
| """ | |
| logger.info("=" * 50) | |
| logger.info("WEB INTELLIGENCE: Generating full report...") | |
| logger.info("=" * 50) | |
| report = { | |
| "generated_at": datetime.now().isoformat(), | |
| "region": f"{district}, {state}", | |
| "crops_analyzed": crops, | |
| } | |
| # 1. Pest Outbreaks | |
| logger.info(" [1/4] Pest outbreak intelligence...") | |
| report["pest_intelligence"] = self.fetch_pest_outbreak_news(crops, state, district) | |
| # 2. Weather Advisory | |
| logger.info(" [2/4] Weather advisory...") | |
| report["weather_intelligence"] = self.fetch_weather_advisory(lat, lon, state, district) | |
| # 3. Government Schemes | |
| logger.info(" [3/4] Government schemes...") | |
| report["gov_schemes"] = self.fetch_gov_schemes(crops, state, total_acres) | |
| # 4. Regional Trends | |
| logger.info(" [4/4] Regional crop trends...") | |
| report["regional_trends"] = self.fetch_regional_crop_trends(state, district, crops) | |
| logger.info("Web Intelligence report complete.") | |
| return report | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UTILITY | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_current_season(self) -> str: | |
| if CURRENT_MONTH in [6, 7, 8, 9]: | |
| return "Kharif" | |
| elif CURRENT_MONTH in [10, 11, 12, 1]: | |
| return "Rabi" | |
| else: | |
| return "Summer (Pre-Kharif)" | |
| if __name__ == "__main__": | |
| tool = WebIntelligenceTool() | |
| report = tool.generate_full_intelligence_report( | |
| lat=18.5204, | |
| lon=73.8567, | |
| crops=["Cotton", "Soybean", "Onion", "Coriander"], | |
| state="Maharashtra", | |
| district="Pune", | |
| total_acres=10.0, | |
| ) | |
| output_path = os.path.join( | |
| os.path.dirname(os.path.dirname(__file__)), | |
| "module1_agentic", "agent_outputs", "web_intelligence_report.json" | |
| ) | |
| # Save to agent_outputs in same dir | |
| save_path = os.path.join(os.path.dirname(__file__), "..", "agent_outputs", "web_intelligence_report.json") | |
| os.makedirs(os.path.dirname(save_path), exist_ok=True) | |
| with open(save_path, "w") as f: | |
| json.dump(report, f, indent=2, default=str) | |
| print(f"Report saved to {save_path}") | |