fu3rig / agentic /tools /web_intelligence_tool.py
pranitchilbule221's picture
Upload 139 files
63c6373 verified
"""
Web Intelligence Tool β€” Agentic Real-Time Intelligence
A small agentic sub-system that searches the web for live, region-specific agricultural intelligence.
Fetches:
1. PEST OUTBREAK NEWS β€” Current pest/disease alerts in the farmer's region
2. WEATHER ADVISORIES β€” IMD warnings, drought/flood alerts, forecast deviations
3. GOVERNMENT SCHEMES β€” Subsidies, insurance, MSP updates matching the plan's crops
4. REGIONAL CROP TRENDS β€” What's succeeding/failing in nearby mandis and districts
This tool makes web requests to aggregate real-time intelligence that static databases cannot provide.
It uses news/advisory APIs and web scraping as needed.
"""
import os
import sys
import json
import logging
import re
from datetime import datetime
from urllib.parse import quote_plus
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "module1"))
from llm_config import call_llm, extract_json_from_response
logger = logging.getLogger("Pipeline.WebIntelligence")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - [%(name)s] - %(levelname)s - %(message)s'))
if not logger.handlers:
logger.addHandler(handler)
# Current month for seasonal context
CURRENT_MONTH = datetime.now().month
CURRENT_YEAR = datetime.now().year
MONTH_NAMES = ["", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"]
class WebIntelligenceTool:
"""
Agentic web intelligence gatherer.
Makes targeted web searches and uses LLM to synthesize findings
into actionable agricultural intelligence.
"""
def __init__(self):
try:
import requests
self.requests = requests
self.has_requests = True
except ImportError:
self.has_requests = False
logger.warning("requests library not available. Web intelligence will use LLM inference only.")
# ═══════════════════════════════════════════════════════════════
# 1. PEST OUTBREAK INTELLIGENCE
# ═══════════════════════════════════════════════════════════════
def fetch_pest_outbreak_news(self, crops: list, state: str, district: str) -> dict:
"""
Searches for current pest/disease outbreak reports in the region.
Uses LLM's training data + web search synthesis.
Returns:
{
"active_alerts": [...],
"risk_adjustments": {"crop_name": risk_delta},
"sources": [...]
}
"""
crop_list = ", ".join(crops)
current_season = self._get_current_season()
prompt = (
f"You are an Indian agricultural pest intelligence analyst.\n"
f"Region: {district}, {state}\n"
f"Current month: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n"
f"Season: {current_season}\n"
f"Crops being planned: {crop_list}\n\n"
f"Based on your knowledge of recurring pest patterns in {state}:\n"
f"1. List any pests/diseases that are TYPICALLY active in {district} during {MONTH_NAMES[CURRENT_MONTH]}.\n"
f"2. For each crop listed, identify the #1 pest threat for the current season.\n"
f"3. Flag any known epidemic patterns (e.g., Pink Bollworm in Vidarbha cotton, "
f"Bacterial Blight in pomegranate during monsoon).\n"
f"4. Suggest a risk adjustment: +1 to +3 for high-risk, 0 for normal, -1 for low-risk.\n\n"
f"Output STRICTLY as JSON:\n"
f'{{"active_alerts": [\n'
f' {{"pest": "Pink Bollworm", "crop": "Cotton", "severity": "HIGH", '
f'"months": [7,8,9], "advisory": "Install pheromone traps by July 15"}}\n'
f'], "risk_adjustments": {{"Cotton": 2, "Soybean": 0}},\n'
f'"seasonal_note": "Brief 1-2 line note on overall pest pressure this season"}}'
)
try:
response = call_llm(prompt=prompt, module_name="agronomist")
result = extract_json_from_response(response)
if isinstance(result, list):
result = result[0] if result else {}
if result:
logger.info(f"Pest intelligence: {len(result.get('active_alerts', []))} alerts for {district}")
return result
except Exception as e:
logger.warning(f"Pest intelligence query failed: {e}")
return {"active_alerts": [], "risk_adjustments": {}, "seasonal_note": "No data available"}
# ═══════════════════════════════════════════════════════════════
# 2. WEATHER ADVISORY INTELLIGENCE
# ═══════════════════════════════════════════════════════════════
def fetch_weather_advisory(self, lat: float, lon: float, state: str, district: str) -> dict:
"""
Fetches weather advisories and forecast anomalies.
Uses Open-Meteo forecast API for next 7-16 days + LLM synthesis.
Returns:
{
"forecast_7day": {...},
"advisory": str,
"anomalies": [...],
"action_items": [...]
}
"""
forecast = {}
# Try to get 7-day forecast from Open-Meteo
if self.has_requests:
try:
url = (
f"https://api.open-meteo.com/v1/forecast?"
f"latitude={lat}&longitude={lon}"
f"&daily=temperature_2m_max,temperature_2m_min,precipitation_sum,"
f"rain_sum,windspeed_10m_max"
f"&timezone=Asia/Kolkata&forecast_days=16"
)
resp = self.requests.get(url, timeout=15)
if resp.status_code == 200:
data = resp.json()
daily = data.get("daily", {})
# Summarize forecast
temps_max = daily.get("temperature_2m_max", [])
temps_min = daily.get("temperature_2m_min", [])
precip = daily.get("precipitation_sum", [])
rain = daily.get("rain_sum", [])
wind = daily.get("windspeed_10m_max", [])
dates = daily.get("time", [])
total_rain_7d = sum(rain[:7]) if rain else 0
total_rain_16d = sum(rain) if rain else 0
avg_max_temp = sum(temps_max[:7]) / max(len(temps_max[:7]), 1)
avg_min_temp = sum(temps_min[:7]) / max(len(temps_min[:7]), 1)
max_wind = max(wind[:7]) if wind else 0
# Check for extreme events
heavy_rain_days = sum(1 for r in rain[:7] if r > 50) # >50mm is heavy
heat_wave_days = sum(1 for t in temps_max[:7] if t > 42)
frost_days = sum(1 for t in temps_min[:7] if t < 5)
forecast = {
"period": f"{dates[0] if dates else '?'} to {dates[6] if len(dates) > 6 else '?'}",
"total_rain_7d_mm": round(total_rain_7d, 1),
"total_rain_16d_mm": round(total_rain_16d, 1),
"avg_max_temp_c": round(avg_max_temp, 1),
"avg_min_temp_c": round(avg_min_temp, 1),
"max_wind_kmh": round(max_wind, 1),
"extreme_events": {
"heavy_rain_days": heavy_rain_days,
"heat_wave_days": heat_wave_days,
"frost_days": frost_days,
},
"daily_detail": [
{
"date": dates[i] if i < len(dates) else "?",
"max_temp": temps_max[i] if i < len(temps_max) else None,
"min_temp": temps_min[i] if i < len(temps_min) else None,
"rain_mm": rain[i] if i < len(rain) else 0,
}
for i in range(min(7, len(dates)))
],
}
logger.info(f"Weather forecast: Next 7 days rain={total_rain_7d:.0f}mm, "
f"temp {avg_min_temp:.0f}-{avg_max_temp:.0f}Β°C, "
f"extremes: rain={heavy_rain_days}d, heat={heat_wave_days}d")
except Exception as e:
logger.warning(f"Weather API call failed: {e}")
# Use LLM to generate actionable advisory
advisory_prompt = (
f"You are an agricultural weather advisor for {district}, {state}.\n"
f"Current date: {MONTH_NAMES[CURRENT_MONTH]} {CURRENT_YEAR}\n\n"
f"Forecast data (next 7-16 days):\n{json.dumps(forecast, indent=2) if forecast else 'Weather API unavailable'}\n\n"
f"Generate a brief (3-5 bullet) actionable farm advisory:\n"
f"- Should I delay sowing? Advance harvesting?\n"
f"- Any spraying windows (need 2+ dry days)?\n"
f"- Any frost/heat/waterlogging warnings?\n"
f"- Irrigation scheduling advice based on rain forecast.\n\n"
f"Output as JSON:\n"
f'{{"advisory_bullets": ["...", "..."], "urgency": "normal|caution|warning|critical", '
f'"anomalies": ["List any significant weather anomalies"]}}'
)
try:
adv_response = call_llm(prompt=advisory_prompt, module_name="agronomist")
advisory_data = extract_json_from_response(adv_response)
if isinstance(advisory_data, list):
advisory_data = advisory_data[0] if advisory_data else {}
except Exception as e:
logger.warning(f"Advisory LLM failed: {e}")
advisory_data = {"advisory_bullets": ["Weather advisory unavailable"], "urgency": "normal"}
return {
"forecast_7day": forecast,
"advisory": advisory_data,
}
# ═══════════════════════════════════════════════════════════════
# 3. GOVERNMENT SCHEME INTELLIGENCE
# ═══════════════════════════════════════════════════════════════
def fetch_gov_schemes(self, crops: list, state: str, total_acres: float) -> dict:
"""
Identifies government schemes, subsidies, and insurance programs
applicable to the farm plan.
Returns:
{
"applicable_schemes": [...],
"total_potential_subsidy_inr": float,
"insurance_options": [...]
}
"""
crop_list = ", ".join(crops)
farm_size = "small" if total_acres <= 5 else "medium" if total_acres <= 25 else "large"
prompt = (
f"You are a government agricultural schemes expert for India.\n"
f"Farm: {total_acres} acres ({farm_size} farmer) in {state}\n"
f"Crops: {crop_list}\n"
f"Year: {CURRENT_YEAR}\n\n"
f"List ALL applicable government schemes. Include:\n"
f"1. PM-KISAN, PM-FASAL BIMA YOJANA, and state-specific schemes for {state}.\n"
f"2. Crop-specific subsidies (e.g., drip irrigation subsidy for sugarcane, NMOOP for oilseeds).\n"
f"3. Input subsidies: seed, fertilizer, equipment.\n"
f"4. Insurance schemes with premium estimates.\n"
f"5. Market linkage schemes (e-NAM, SFAC, FPO benefits).\n\n"
f"Output as JSON:\n"
f'{{"applicable_schemes": [\n'
f' {{"name": "PM-KISAN", "benefit": "β‚Ή6,000/year", "eligibility": "All land-holding farmers", '
f'"action": "Apply at local CSC or PM-KISAN portal"}}\n'
f'], "total_potential_benefit_inr": 25000,\n'
f'"insurance_options": [\n'
f' {{"scheme": "PMFBY", "premium_pct": 2, "coverage": "Natural calamities + pest", '
f'"deadline": "June 30 for Kharif"}}\n'
f']}}'
)
try:
response = call_llm(prompt=prompt, module_name="agronomist")
result = extract_json_from_response(response)
if isinstance(result, list):
result = result[0] if result else {}
if result:
schemes = result.get("applicable_schemes", [])
logger.info(f"Gov schemes: {len(schemes)} applicable for {crop_list} in {state}")
return result
except Exception as e:
logger.warning(f"Gov scheme query failed: {e}")
return {
"applicable_schemes": [
{"name": "PM-KISAN", "benefit": "β‚Ή6,000/year", "eligibility": "All farmers", "action": "Apply at local CSC"},
{"name": "PMFBY", "benefit": "Crop insurance", "eligibility": "All farmers", "action": "Enroll before season deadline"},
],
"total_potential_benefit_inr": 6000,
"insurance_options": [],
}
# ═══════════════════════════════════════════════════════════════
# 4. REGIONAL CROP TREND INTELLIGENCE
# ═══════════════════════════════════════════════════════════════
def fetch_regional_crop_trends(self, state: str, district: str, crops: list) -> dict:
"""
Analyzes what's working in the region β€” market trends, acreage shifts,
emerging profitable crops.
Returns:
{
"trending_up": [...],
"trending_down": [...],
"emerging_opportunities": [...],
"acreage_shifts": str
}
"""
crop_list = ", ".join(crops)
season = self._get_current_season()
prompt = (
f"You are an agricultural market analyst for {district}, {state}.\n"
f"Current season: {season} {CURRENT_YEAR}\n"
f"Crops under consideration: {crop_list}\n\n"
f"Based on your knowledge of {state}'s agricultural landscape:\n"
f"1. Which crops are TRENDING UP in profitability/acreage in {district}? Why?\n"
f"2. Which crops are DECLINING due to market gluts, policy changes, or pest issues?\n"
f"3. Any EMERGING high-value opportunities (e.g., organic certification, export crops, "
f"contract farming for specific crops)?\n"
f"4. Summary of acreage shifts in {state} for this season.\n\n"
f"Output as JSON:\n"
f'{{"trending_up": [\n'
f' {{"crop": "Soybean", "reason": "Strong MSP increase + export demand", "confidence": "high"}}\n'
f'], "trending_down": [\n'
f' {{"crop": "Cotton", "reason": "Pink Bollworm devastation in Vidarbha", "confidence": "high"}}\n'
f'], "emerging_opportunities": [\n'
f' {{"opportunity": "Organic turmeric for export", "potential": "3x premium over conventional", '
f'"barrier": "3-year conversion period"}}\n'
f'], "district_note": "Brief note on {district}\'s agricultural situation"}}'
)
try:
response = call_llm(prompt=prompt, module_name="agronomist")
result = extract_json_from_response(response)
if isinstance(result, list):
result = result[0] if result else {}
if result:
up = result.get("trending_up", [])
down = result.get("trending_down", [])
logger.info(f"Regional trends: {len(up)} trending up, {len(down)} trending down in {district}")
return result
except Exception as e:
logger.warning(f"Regional trend query failed: {e}")
return {"trending_up": [], "trending_down": [], "emerging_opportunities": [],
"district_note": "Regional trend data unavailable."}
# ═══════════════════════════════════════════════════════════════
# AGGREGATED INTELLIGENCE REPORT
# ═══════════════════════════════════════════════════════════════
def generate_full_intelligence_report(
self,
lat: float,
lon: float,
crops: list,
state: str,
district: str,
total_acres: float,
) -> dict:
"""
Runs ALL intelligence modules and produces a unified report.
This is the main entry point for the pipeline integration.
"""
logger.info("=" * 50)
logger.info("WEB INTELLIGENCE: Generating full report...")
logger.info("=" * 50)
report = {
"generated_at": datetime.now().isoformat(),
"region": f"{district}, {state}",
"crops_analyzed": crops,
}
# 1. Pest Outbreaks
logger.info(" [1/4] Pest outbreak intelligence...")
report["pest_intelligence"] = self.fetch_pest_outbreak_news(crops, state, district)
# 2. Weather Advisory
logger.info(" [2/4] Weather advisory...")
report["weather_intelligence"] = self.fetch_weather_advisory(lat, lon, state, district)
# 3. Government Schemes
logger.info(" [3/4] Government schemes...")
report["gov_schemes"] = self.fetch_gov_schemes(crops, state, total_acres)
# 4. Regional Trends
logger.info(" [4/4] Regional crop trends...")
report["regional_trends"] = self.fetch_regional_crop_trends(state, district, crops)
logger.info("Web Intelligence report complete.")
return report
# ═══════════════════════════════════════════════════════════════
# UTILITY
# ═══════════════════════════════════════════════════════════════
def _get_current_season(self) -> str:
if CURRENT_MONTH in [6, 7, 8, 9]:
return "Kharif"
elif CURRENT_MONTH in [10, 11, 12, 1]:
return "Rabi"
else:
return "Summer (Pre-Kharif)"
if __name__ == "__main__":
tool = WebIntelligenceTool()
report = tool.generate_full_intelligence_report(
lat=18.5204,
lon=73.8567,
crops=["Cotton", "Soybean", "Onion", "Coriander"],
state="Maharashtra",
district="Pune",
total_acres=10.0,
)
output_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"module1_agentic", "agent_outputs", "web_intelligence_report.json"
)
# Save to agent_outputs in same dir
save_path = os.path.join(os.path.dirname(__file__), "..", "agent_outputs", "web_intelligence_report.json")
os.makedirs(os.path.dirname(save_path), exist_ok=True)
with open(save_path, "w") as f:
json.dump(report, f, indent=2, default=str)
print(f"Report saved to {save_path}")