# Viewer metadata (page residue kept as a comment): file size 9,865 bytes, revision 6afc01a.
# """
# Stage 3: Response Compiler - Data Fusion
# """
# from typing import Dict, Any, List
# class ResponseCompiler:
# """Stage 3: Compile results from multiple servers"""
# def compile(self, raw_results: Dict[str, Any]) -> Dict[str, Any]:
# """
# Merge results into structured format
# Args:
# raw_results: Dictionary containing results from MCPExecutor
# {
# "results": {
# "weather": {"status": "success", "data": {...}},
# "soil_properties": {"status": "success", "data": {...}},
# ...
# },
# "execution_time_seconds": 3.5
# }
# Returns:
# {
# "successful_servers": List[str],
# "failed_servers": List[dict],
# "data": Dict[str, Any],
# "execution_time": float,
# "completeness": str
# }
# """
# results_dict = raw_results.get("results", {})
# successful = []
# failed = []
# compiled_data = {}
# for server_name, result in results_dict.items():
# if result.get("status") == "success":
# successful.append(server_name)
# compiled_data[server_name] = result.get("data", {})
# else:
# failed.append({
# "server": server_name,
# "error": result.get("error", "Unknown error")
# })
# return {
# "successful_servers": successful,
# "failed_servers": failed,
# "data": compiled_data,
# "execution_time": raw_results.get("execution_time_seconds", 0),
# "completeness": f"{len(successful)}/{len(results_dict)} servers"
# }
"""
Response Compiler - Stage 3
Compiles MCP results with focus on ALERTING INFORMATION ONLY
"""
from typing import Dict, Any
from openai import OpenAI
import httpx
import os
class ResponseCompiler:
    """
    Compiles MCP server results into alert-focused responses.

    KEY PRINCIPLE: All MCPs are queried, but the compiler extracts ONLY
    the alerting, concerning, or actionable information. Normal/good status
    is minimized or omitted entirely.

    Each entry of an ``mcp_results`` mapping is expected to be a dict with a
    ``"status"`` key and, on success, a ``"data"`` key. Entries that are not
    dicts, lack either key, have a status other than ``"success"``, or carry
    empty data are ignored rather than raising.
    """

    # Chat model used for every completion request issued by this compiler.
    MODEL = "gpt-4o"

    # Maximum number of characters of each server's data shown by the
    # non-LLM fallback summary.
    FALLBACK_PREVIEW_CHARS = 300

    def __init__(self):
        """
        Initialize compiler with OpenAI client.

        Raises:
            ValueError: If the OPENAI_API_KEY environment variable is unset
                or empty.
        """
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY environment variable not set")

        # Create custom httpx client without proxies parameter (compatibility fix)
        http_client = httpx.Client(
            timeout=httpx.Timeout(60.0, connect=10.0),
            limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
        )
        self.client = OpenAI(api_key=api_key, http_client=http_client)

    @staticmethod
    def _successful_data(mcp_results: Dict[str, Any]) -> list:
        """
        Return ``(server_name, data)`` pairs for usable server results.

        A result is usable when it is a dict whose ``"status"`` equals
        ``"success"`` and whose ``"data"`` is truthy. Lookups use ``.get`` so
        that a failed result without a ``"data"`` key (or a malformed entry)
        is skipped instead of raising ``KeyError``.
        """
        pairs = []
        for server_name, result in (mcp_results or {}).items():
            if not isinstance(result, dict):
                continue
            data = result.get("data")
            if result.get("status") == "success" and data:
                pairs.append((server_name, data))
        return pairs

    def _complete(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float,
        max_tokens: int
    ) -> str:
        """
        Send one system + user message pair to the chat model and return
        the stripped reply text.

        Raises:
            ValueError: If the reply carries no text content.
            Exception: Whatever the client raises is propagated unchanged;
                callers decide how to degrade.
        """
        response = self.client.chat.completions.create(
            model=self.MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=temperature,
            max_tokens=max_tokens,
        )
        content = response.choices[0].message.content
        if content is None:
            # Fail with a readable message instead of an AttributeError on .strip().
            raise ValueError("Model returned no text content")
        return content.strip()

    def compile_alert_summary(
        self,
        mcp_results: Dict[str, Any],
        location: Dict[str, float],
        location_name: str = ""
    ) -> str:
        """
        Compile MCP results into alert summary focusing ONLY on concerning information.

        This is where the intelligence lives - not in routing. All MCP servers are
        queried, but we extract only what farmers need to act on.

        If the model call fails for any reason, the error is printed and a
        plain-text fallback summary built directly from the data is returned.

        Args:
            mcp_results: Results from all MCP servers
            location: Dict with 'latitude' and 'longitude' keys
            location_name: Optional human-readable location name

        Returns:
            Alert summary highlighting only actionable concerns, or a fixed
            "no data available" message when no server returned usable data.

        Raises:
            KeyError: If usable data exists but ``location`` lacks
                'latitude' or 'longitude'.
        """
        # Build comprehensive context from all MCP data
        context_parts = [
            f"=== {server_name.upper()} DATA ===\n{data}"
            for server_name, data in self._successful_data(mcp_results)
        ]
        if not context_parts:
            return "Unable to generate alert summary - no data available from MCP servers."

        full_context = "\n\n".join(context_parts)
        # strip() drops the leading space left behind when location_name is empty.
        location_str = (
            f"{location_name} ({location['latitude']:.4f}°N, {location['longitude']:.4f}°E)"
        ).strip()

        # THE KEY PROMPT: Extract only alerting information
        prompt = f"""You are an agricultural alert analyst. Your task is to analyze comprehensive agricultural data and extract ONLY the alerting, concerning, or time-sensitive information.
LOCATION: {location_str}
COMPREHENSIVE DATA FROM ALL MONITORING SYSTEMS:
{full_context}
YOUR TASK:
Generate a concise ALERT SUMMARY that includes ONLY:
1. **CRITICAL ALERTS** - Immediate threats requiring urgent action:
- Extreme weather conditions (heat waves, storms, frost)
- Active pest/disease outbreaks
- Severe water scarcity or excess
- Soil contamination or extreme deficiencies
2. **IMPORTANT WARNINGS** - Developing issues requiring attention:
- Concerning trends (declining water table, degrading soil)
- Moderate pest pressure building up
- Suboptimal weather patterns affecting crops
- Nutrient imbalances needing correction
3. **ACTIONABLE RECOMMENDATIONS** - What farmers should do:
- Specific actions with timing
- Preventive measures
- Mitigation strategies
CRITICAL RULES:
- OMIT all normal/good status information unless it provides important context
- If weather is normal → DON'T mention it or say "Weather: Normal" briefly
- If soil is healthy → SKIP or say "Soil: No concerns" briefly
- If no pest activity → SKIP or say "Pests: No threats detected" briefly
- FOCUS on deviations from normal, risks, and time-sensitive items
- Use specific numbers/dates only when they convey urgency
- Maximum 400 words total
- If everything is fine, say so clearly upfront then provide brief context
Structure:
1. Status Line: "CRITICAL ALERTS DETECTED" or "NO CRITICAL ALERTS - FAVORABLE CONDITIONS"
2. Critical Alerts section (if any)
3. Important Warnings section (if any)
4. Recommended Actions (always include if alerts/warnings exist)
5. Add raw API output in JSON format at end for reference.
Be direct. Skip pleasantries. Farmers need to know what matters."""

        try:
            return self._complete(
                "You are an expert agricultural alert analyst. Extract ONLY concerning, alerting, or actionable information. Omit normal status unless contextually necessary.",
                prompt,
                temperature=0.2,
                max_tokens=1000,
            )
        except Exception as e:
            # Deliberate best-effort boundary: any failure degrades to the
            # non-LLM summary rather than propagating to the caller.
            print(f"⚠️ Compilation error: {e}")
            return self._create_fallback_summary(mcp_results, location_str)

    def compile_response(
        self,
        query: str,
        mcp_results: Dict[str, Any],
        location: Dict[str, float]
    ) -> str:
        """
        Compile MCP results into a response for a specific query.

        If the model call fails for any reason, the error is printed and an
        "Error compiling response: ..." string is returned instead of raising.

        Args:
            query: User's original query
            mcp_results: Results from MCP servers
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Compiled response text focusing on query-relevant information

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'.
        """
        # Format MCP results for context
        context = "\n\n".join(
            f"{server_name.upper()}: {data}"
            for server_name, data in self._successful_data(mcp_results)
        )

        prompt = f"""Answer this farmer's question using the provided data, focusing on actionable insights.
QUESTION: {query}
LOCATION: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E
AVAILABLE DATA:
{context}
Provide a focused answer that:
1. Directly addresses the question
2. Highlights any concerning information relevant to the query
3. Gives specific recommendations
4. Keeps explanations brief and practical
5. Omits irrelevant normal/good status information
6. Add Raw API Output from all MCP Servers at the end for reference.
Be conversational but professional. Skip unnecessary background unless it aids understanding."""

        try:
            return self._complete(
                "You are a knowledgeable agricultural advisor providing practical guidance to farmers.",
                prompt,
                temperature=0.5,
                max_tokens=800,
            )
        except Exception as e:
            # Deliberate best-effort boundary: report the failure as text.
            print(f"⚠️ Compilation error: {e}")
            return f"Error compiling response: {str(e)}"

    def _create_fallback_summary(self, mcp_results: Dict[str, Any], location_str: str) -> str:
        """
        Create basic fallback summary if LLM compilation fails.

        Lists each usable server's data as text, cut to
        ``FALLBACK_PREVIEW_CHARS`` characters. An ellipsis is appended only
        when the text was actually cut.
        """
        limit = self.FALLBACK_PREVIEW_CHARS
        summary_parts = [f"Alert Summary for {location_str}\n\n"]
        for server_name, data in self._successful_data(mcp_results):
            text = str(data)
            preview = text[:limit]
            if len(text) > limit:
                preview += "..."
            summary_parts.append(f"{server_name.upper()}:")
            summary_parts.append(preview + "\n")
        return "\n".join(summary_parts)