"""
Web Intelligence Tool — Agentic Real-Time Intelligence

A small agentic sub-system that gathers live, region-specific agricultural
intelligence.

Fetches:
1. PEST OUTBREAK NEWS — Current pest/disease alerts in the farmer's region
2. WEATHER ADVISORIES — Forecast summary, extreme-event counts and an
   actionable advisory
3. GOVERNMENT SCHEMES — Subsidies, insurance, MSP updates matching the plan's crops
4. REGIONAL CROP TRENDS — What's succeeding/failing in nearby mandis and districts

As implemented here, only the weather module performs a live web request
(the Open-Meteo forecast API, via ``requests`` when it is installed). The
other three modules are synthesized by the LLM through ``call_llm``.
"""

import os
import sys
import json
import logging
import re
from datetime import datetime
from urllib.parse import quote_plus

# Make the parent package directory and the sibling "module1" directory
# importable so that ``llm_config`` can be resolved.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "module1"))

from llm_config import call_llm, extract_json_from_response

logger = logging.getLogger("Pipeline.WebIntelligence")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - [%(name)s] - %(levelname)s - %(message)s'))
# Only attach the handler once, so re-importing does not duplicate output.
if not logger.handlers:
    logger.addHandler(handler)

# Current month for seasonal context.
# NOTE: evaluated once at import time; a long-running process keeps these values.
CURRENT_MONTH = datetime.now().month
CURRENT_YEAR = datetime.now().year
# Index 0 is a placeholder so that MONTH_NAMES[month_number] works directly.
MONTH_NAMES = ["", "January", "February", "March", "April", "May", "June",
               "July", "August", "September", "October", "November", "December"]


class WebIntelligenceTool:
    """
    Agentic web intelligence gatherer.

    Builds targeted prompts and uses the LLM to synthesize findings into
    actionable agricultural intelligence; the weather module additionally
    queries the Open-Meteo forecast API.
    """

    # Thresholds used to count extreme-weather days in the 7-day window.
    HEAVY_RAIN_MM = 50   # daily rain above this counts as a heavy-rain day
    HEAT_WAVE_C = 42     # daily max temperature above this counts as a heat-wave day
    FROST_C = 5          # daily min temperature below this counts as a frost day

    def __init__(self):
        """Probe for the optional ``requests`` dependency.

        Sets ``self.has_requests`` and, when the import succeeds,
        ``self.requests`` (the module object used for HTTP calls).
        """
        try:
            import requests
            self.requests = requests
            self.has_requests = True
        except ImportError:
            self.has_requests = False
            logger.warning("requests library not available. Web intelligence will use LLM inference only.")

    # ═══════════════════════════════════════════════════════════════
    # SHARED HELPERS
    # ═══════════════════════════════════════════════════════════════

    @staticmethod
    def _ask_llm_for_json(prompt: str) -> dict:
        """Send ``prompt`` to the LLM and return the parsed reply as a dict.

        A list reply is unwrapped to its first element. Anything that is not
        a dict after unwrapping (including an empty/failed parse) yields
        ``{}`` so callers can fall back uniformly.

        Exceptions raised by ``call_llm`` / ``extract_json_from_response``
        propagate to the caller.
        # NOTE(review): assumes extract_json_from_response returns a dict,
        # a list, or a falsy value — confirm against llm_config.
        """
        response = call_llm(prompt=prompt, module_name="agronomist")
        parsed = extract_json_from_response(response)
        if isinstance(parsed, list):
            parsed = parsed[0] if parsed else {}
        return parsed if isinstance(parsed, dict) else {}

    # ═══════════════════════════════════════════════════════════════
    # 1. PEST OUTBREAK INTELLIGENCE
    # ═══════════════════════════════════════════════════════════════

    def fetch_pest_outbreak_news(self, crops: list, state: str, district: str) -> dict:
        """
        Asks the LLM for pests/diseases typically active in the region for
        the current month and season.

        Args:
            crops: Crop names being planned.
            state: State name used in the prompt.
            district: District name used in the prompt.

        Returns:
            The LLM's parsed JSON when it is non-empty; the prompt requests
            {
                "active_alerts": [...],
                "risk_adjustments": {"crop_name": risk_delta},
                "seasonal_note": str
            }
            On failure or an empty reply, that same shape with empty values.
        """
        crop_list = ", ".join(crops)
        current_season = self._get_current_season()

        prompt = (
            f"You are an Indian agricultural pest intelligence analyst.\n"
            f"Region: {district}, {state}\n"
            f"Current month: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n"
            f"Season: {current_season}\n"
            f"Crops being planned: {crop_list}\n\n"
            f"Based on your knowledge of recurring pest patterns in {state}:\n"
            f"1. List any pests/diseases that are TYPICALLY active in {district} during {MONTH_NAMES[CURRENT_MONTH]}.\n"
            f"2. For each crop listed, identify the #1 pest threat for the current season.\n"
            f"3. Flag any known epidemic patterns (e.g., Pink Bollworm in Vidarbha cotton, "
            f"Bacterial Blight in pomegranate during monsoon).\n"
            f"4. Suggest a risk adjustment: +1 to +3 for high-risk, 0 for normal, -1 for low-risk.\n\n"
            f"Output STRICTLY as JSON:\n"
            f'{{"active_alerts": [\n'
            f' {{"pest": "Pink Bollworm", "crop": "Cotton", "severity": "HIGH", '
            f'"months": [7,8,9], "advisory": "Install pheromone traps by July 15"}}\n'
            f'], "risk_adjustments": {{"Cotton": 2, "Soybean": 0}},\n'
            f'"seasonal_note": "Brief 1-2 line note on overall pest pressure this season"}}'
        )

        # Best-effort boundary: any LLM/parsing failure degrades to the fallback.
        try:
            result = self._ask_llm_for_json(prompt)
            if result:
                logger.info(f"Pest intelligence: {len(result.get('active_alerts', []))} alerts for {district}")
                return result
        except Exception as e:
            logger.warning(f"Pest intelligence query failed: {e}")

        return {"active_alerts": [], "risk_adjustments": {}, "seasonal_note": "No data available"}

    # ═══════════════════════════════════════════════════════════════
    # 2. WEATHER ADVISORY INTELLIGENCE
    # ═══════════════════════════════════════════════════════════════

    def _summarize_forecast(self, daily: dict) -> dict:
        """Condense Open-Meteo ``daily`` arrays into a 7-day summary.

        Missing (``None``) entries are skipped in every aggregate so a single
        gap does not discard the whole forecast. Averages are ``None`` when no
        temperature value is available, rather than a misleading 0.

        Returns ``{}`` when ``daily`` carries no dates at all.
        """
        dates = daily.get("time") or []
        if not dates:
            return {}

        temps_max = daily.get("temperature_2m_max") or []
        temps_min = daily.get("temperature_2m_min") or []
        rain = daily.get("rain_sum") or []
        wind = daily.get("windspeed_10m_max") or []

        def known(values, limit=None):
            window = values if limit is None else values[:limit]
            return [v for v in window if v is not None]

        def rounded_mean(values):
            return round(sum(values) / len(values), 1) if values else None

        week_rain = known(rain, 7)
        week_max = known(temps_max, 7)
        week_min = known(temps_min, 7)
        week_wind = known(wind, 7)

        return {
            "period": f"{dates[0]} to {dates[6] if len(dates) > 6 else '?'}",
            "total_rain_7d_mm": round(sum(week_rain), 1),
            "total_rain_16d_mm": round(sum(known(rain)), 1),
            "avg_max_temp_c": rounded_mean(week_max),
            "avg_min_temp_c": rounded_mean(week_min),
            "max_wind_kmh": round(max(week_wind), 1) if week_wind else 0,
            "extreme_events": {
                "heavy_rain_days": sum(1 for r in week_rain if r > self.HEAVY_RAIN_MM),
                "heat_wave_days": sum(1 for t in week_max if t > self.HEAT_WAVE_C),
                "frost_days": sum(1 for t in week_min if t < self.FROST_C),
            },
            "daily_detail": [
                {
                    "date": dates[i],
                    "max_temp": temps_max[i] if i < len(temps_max) else None,
                    "min_temp": temps_min[i] if i < len(temps_min) else None,
                    "rain_mm": rain[i] if i < len(rain) else 0,
                }
                for i in range(min(7, len(dates)))
            ],
        }

    def _fetch_forecast(self, lat: float, lon: float) -> dict:
        """Fetch and summarize the Open-Meteo forecast; ``{}`` on any failure."""
        if not self.has_requests:
            return {}

        url = (
            f"https://api.open-meteo.com/v1/forecast?"
            f"latitude={lat}&longitude={lon}"
            f"&daily=temperature_2m_max,temperature_2m_min,precipitation_sum,"
            f"rain_sum,windspeed_10m_max"
            f"&timezone=Asia/Kolkata&forecast_days=16"
        )

        # Best-effort boundary: network, HTTP and payload problems all
        # degrade to "no forecast" instead of aborting the report.
        try:
            resp = self.requests.get(url, timeout=15)
            if resp.status_code != 200:
                logger.warning(f"Weather API returned HTTP {resp.status_code}")
                return {}
            daily = resp.json().get("daily") or {}
            forecast = self._summarize_forecast(daily)
        except Exception as e:
            logger.warning(f"Weather API call failed: {e}")
            return {}

        if not forecast:
            logger.warning("Weather API response contained no daily forecast data")
            return {}

        def whole(value):
            return "?" if value is None else f"{value:.0f}"

        extremes = forecast["extreme_events"]
        logger.info(f"Weather forecast: Next 7 days rain={whole(forecast['total_rain_7d_mm'])}mm, "
                    f"temp {whole(forecast['avg_min_temp_c'])}-{whole(forecast['avg_max_temp_c'])}°C, "
                    f"extremes: rain={extremes['heavy_rain_days']}d, heat={extremes['heat_wave_days']}d")
        return forecast

    def fetch_weather_advisory(self, lat: float, lon: float, state: str, district: str) -> dict:
        """
        Fetches a forecast summary and an LLM-generated farm advisory.
        Uses the Open-Meteo forecast API (16 days requested, first 7
        summarized) + LLM synthesis.

        Args:
            lat: Latitude passed to Open-Meteo.
            lon: Longitude passed to Open-Meteo.
            state: State name used in the advisory prompt.
            district: District name used in the advisory prompt.

        Returns:
            {
                "forecast_7day": {...},   # {} when the forecast is unavailable
                "advisory": {...}         # the prompt requests advisory_bullets,
                                          # urgency and anomalies
            }
        """
        forecast = self._fetch_forecast(lat, lon)
        forecast_text = json.dumps(forecast, indent=2) if forecast else 'Weather API unavailable'

        # Use LLM to generate actionable advisory
        advisory_prompt = (
            f"You are an agricultural weather advisor for {district}, {state}.\n"
            f"Current date: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n\n"
            f"Forecast data (next 7-16 days):\n{forecast_text}\n\n"
            f"Generate a brief (3-5 bullet) actionable farm advisory:\n"
            f"- Should I delay sowing? Advance harvesting?\n"
            f"- Any spraying windows (need 2+ dry days)?\n"
            f"- Any frost/heat/waterlogging warnings?\n"
            f"- Irrigation scheduling advice based on rain forecast.\n\n"
            f"Output as JSON:\n"
            f'{{"advisory_bullets": ["...", "..."], "urgency": "normal|caution|warning|critical", '
            f'"anomalies": ["List any significant weather anomalies"]}}'
        )

        advisory_data = {}
        try:
            advisory_data = self._ask_llm_for_json(advisory_prompt)
        except Exception as e:
            logger.warning(f"Advisory LLM failed: {e}")

        # An empty/unparseable reply is treated like a failure so callers
        # always receive a usable advisory dict.
        if not advisory_data:
            advisory_data = {"advisory_bullets": ["Weather advisory unavailable"], "urgency": "normal"}

        return {
            "forecast_7day": forecast,
            "advisory": advisory_data,
        }

    # ═══════════════════════════════════════════════════════════════
    # 3. GOVERNMENT SCHEME INTELLIGENCE
    # ═══════════════════════════════════════════════════════════════

    def fetch_gov_schemes(self, crops: list, state: str, total_acres: float) -> dict:
        """
        Identifies government schemes, subsidies, and insurance programs
        applicable to the farm plan (LLM-synthesized).

        Args:
            crops: Crop names in the plan.
            state: State name used in the prompt.
            total_acres: Farm size; <=5 is labelled "small", <=25 "medium",
                otherwise "large" in the prompt.

        Returns:
            The LLM's parsed JSON when it is non-empty; the prompt requests
            {
                "applicable_schemes": [...],
                "total_potential_benefit_inr": number,
                "insurance_options": [...]
            }
            On failure or an empty reply, a minimal built-in list
            (PM-KISAN and PMFBY).
        """
        crop_list = ", ".join(crops)
        farm_size = "small" if total_acres <= 5 else "medium" if total_acres <= 25 else "large"

        prompt = (
            f"You are a government agricultural schemes expert for India.\n"
            f"Farm: {total_acres} acres ({farm_size} farmer) in {state}\n"
            f"Crops: {crop_list}\n"
            f"Year: {CURRENT_YEAR}\n\n"
            f"List ALL applicable government schemes. Include:\n"
            f"1. PM-KISAN, PM-FASAL BIMA YOJANA, and state-specific schemes for {state}.\n"
            f"2. Crop-specific subsidies (e.g., drip irrigation subsidy for sugarcane, NMOOP for oilseeds).\n"
            f"3. Input subsidies: seed, fertilizer, equipment.\n"
            f"4. Insurance schemes with premium estimates.\n"
            f"5. Market linkage schemes (e-NAM, SFAC, FPO benefits).\n\n"
            f"Output as JSON:\n"
            f'{{"applicable_schemes": [\n'
            f' {{"name": "PM-KISAN", "benefit": "₹6,000/year", "eligibility": "All land-holding farmers", '
            f'"action": "Apply at local CSC or PM-KISAN portal"}}\n'
            f'], "total_potential_benefit_inr": 25000,\n'
            f'"insurance_options": [\n'
            f' {{"scheme": "PMFBY", "premium_pct": 2, "coverage": "Natural calamities + pest", '
            f'"deadline": "June 30 for Kharif"}}\n'
            f']}}'
        )

        # Best-effort boundary: any LLM/parsing failure degrades to the fallback.
        try:
            result = self._ask_llm_for_json(prompt)
            if result:
                schemes = result.get("applicable_schemes", [])
                logger.info(f"Gov schemes: {len(schemes)} applicable for {crop_list} in {state}")
                return result
        except Exception as e:
            logger.warning(f"Gov scheme query failed: {e}")

        return {
            "applicable_schemes": [
                {"name": "PM-KISAN", "benefit": "₹6,000/year", "eligibility": "All farmers", "action": "Apply at local CSC"},
                {"name": "PMFBY", "benefit": "Crop insurance", "eligibility": "All farmers", "action": "Enroll before season deadline"},
            ],
            "total_potential_benefit_inr": 6000,
            "insurance_options": [],
        }

    # ═══════════════════════════════════════════════════════════════
    # 4. REGIONAL CROP TREND INTELLIGENCE
    # ═══════════════════════════════════════════════════════════════

    def fetch_regional_crop_trends(self, state: str, district: str, crops: list) -> dict:
        """
        Analyzes what's working in the region — market trends, acreage
        shifts, emerging profitable crops (LLM-synthesized).

        Args:
            state: State name used in the prompt.
            district: District name used in the prompt.
            crops: Crop names under consideration.

        Returns:
            The LLM's parsed JSON when it is non-empty; the prompt requests
            {
                "trending_up": [...],
                "trending_down": [...],
                "emerging_opportunities": [...],
                "district_note": str
            }
            On failure or an empty reply, that same shape with empty lists.
        """
        crop_list = ", ".join(crops)
        season = self._get_current_season()

        prompt = (
            f"You are an agricultural market analyst for {district}, {state}.\n"
            f"Current season: {season} {CURRENT_YEAR}\n"
            f"Crops under consideration: {crop_list}\n\n"
            f"Based on your knowledge of {state}'s agricultural landscape:\n"
            f"1. Which crops are TRENDING UP in profitability/acreage in {district}? Why?\n"
            f"2. Which crops are DECLINING due to market gluts, policy changes, or pest issues?\n"
            f"3. Any EMERGING high-value opportunities (e.g., organic certification, export crops, "
            f"contract farming for specific crops)?\n"
            f"4. Summary of acreage shifts in {state} for this season.\n\n"
            f"Output as JSON:\n"
            f'{{"trending_up": [\n'
            f' {{"crop": "Soybean", "reason": "Strong MSP increase + export demand", "confidence": "high"}}\n'
            f'], "trending_down": [\n'
            f' {{"crop": "Cotton", "reason": "Pink Bollworm devastation in Vidarbha", "confidence": "high"}}\n'
            f'], "emerging_opportunities": [\n'
            f' {{"opportunity": "Organic turmeric for export", "potential": "3x premium over conventional", '
            f'"barrier": "3-year conversion period"}}\n'
            f'], "district_note": "Brief note on {district}\'s agricultural situation"}}'
        )

        # Best-effort boundary: any LLM/parsing failure degrades to the fallback.
        try:
            result = self._ask_llm_for_json(prompt)
            if result:
                up = result.get("trending_up", [])
                down = result.get("trending_down", [])
                logger.info(f"Regional trends: {len(up)} trending up, {len(down)} trending down in {district}")
                return result
        except Exception as e:
            logger.warning(f"Regional trend query failed: {e}")

        return {"trending_up": [], "trending_down": [], "emerging_opportunities": [],
                "district_note": "Regional trend data unavailable."}

    # ═══════════════════════════════════════════════════════════════
    # AGGREGATED INTELLIGENCE REPORT
    # ═══════════════════════════════════════════════════════════════

    def generate_full_intelligence_report(
        self,
        lat: float,
        lon: float,
        crops: list,
        state: str,
        district: str,
        total_acres: float,
    ) -> dict:
        """
        Runs ALL intelligence modules and produces a unified report.
        This is the main entry point for the pipeline integration.

        Returns:
            A dict with "generated_at", "region", "crops_analyzed" and one
            entry per module: "pest_intelligence", "weather_intelligence",
            "gov_schemes", "regional_trends".
        """
        logger.info("=" * 50)
        logger.info("WEB INTELLIGENCE: Generating full report...")
        logger.info("=" * 50)

        report = {
            "generated_at": datetime.now().isoformat(),
            "region": f"{district}, {state}",
            "crops_analyzed": crops,
        }

        # 1. Pest Outbreaks
        logger.info(" [1/4] Pest outbreak intelligence...")
        report["pest_intelligence"] = self.fetch_pest_outbreak_news(crops, state, district)

        # 2. Weather Advisory
        logger.info(" [2/4] Weather advisory...")
        report["weather_intelligence"] = self.fetch_weather_advisory(lat, lon, state, district)

        # 3. Government Schemes
        logger.info(" [3/4] Government schemes...")
        report["gov_schemes"] = self.fetch_gov_schemes(crops, state, total_acres)

        # 4. Regional Trends
        logger.info(" [4/4] Regional crop trends...")
        report["regional_trends"] = self.fetch_regional_crop_trends(state, district, crops)

        logger.info("Web Intelligence report complete.")
        return report

    # ═══════════════════════════════════════════════════════════════
    # UTILITY
    # ═══════════════════════════════════════════════════════════════

    def _get_current_season(self) -> str:
        """Map the module-level CURRENT_MONTH to a cropping-season label.

        June-September -> "Kharif"; October-January -> "Rabi";
        every other month -> "Summer (Pre-Kharif)".
        """
        if CURRENT_MONTH in (6, 7, 8, 9):
            return "Kharif"
        if CURRENT_MONTH in (10, 11, 12, 1):
            return "Rabi"
        return "Summer (Pre-Kharif)"


def main() -> None:
    """Generate a sample report for Pune and write it to disk as JSON."""
    tool = WebIntelligenceTool()
    report = tool.generate_full_intelligence_report(
        lat=18.5204,
        lon=73.8567,
        crops=["Cotton", "Soybean", "Onion", "Coriander"],
        state="Maharashtra",
        district="Pune",
        total_acres=10.0,
    )

    # Save under the "agent_outputs" directory that sits next to this
    # file's own directory (i.e. one level up).
    save_path = os.path.join(os.path.dirname(__file__), "..", "agent_outputs", "web_intelligence_report.json")
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, default=str)
    print(f"Report saved to {save_path}")


if __name__ == "__main__":
    main()