Spaces:
Sleeping
Sleeping
| import aiohttp | |
| import re | |
| from typing import Dict, Any, List | |
| from src.utils.config import config | |
| from src.utils.logger import get_logger | |
# Module-level logger named after this module; shared by everything below.
logger = get_logger(__name__)
class AIRAAIntegration:
    """Forward research results to an AIRAA webhook.

    The webhook URL and optional API key are read from ``config`` at
    construction time. The integration is enabled only when a webhook URL is
    configured; otherwise :meth:`send_research_data` does nothing and
    returns ``False``.
    """

    # Upper bound on how many prices / percentages are kept per payload.
    _MAX_EXTRACTED_VALUES = 5

    # Tag -> pattern matched against the lower-cased query. Short symbols
    # ("btc", "eth", "tvl", "defi") must match as whole words so that they do
    # not fire inside unrelated words such as "method", "something" or
    # "definitely"; longer keywords keep substring matching so that plurals
    # ("protocols", "prices", "markets") still tag.
    _TAG_PATTERNS = {
        "bitcoin": re.compile(r"bitcoin|\bbtc\b"),
        "ethereum": re.compile(r"ethereum|\beth\b"),
        "defi": re.compile(r"\bdefi\b|defillama|protocol|\btvl\b"),
        "market-analysis": re.compile(r"price|market|analysis"),
        "trading-volume": re.compile(r"volume|trading"),
    }

    # Dollar amount. The first character after "$" must be a digit so that a
    # stray "$," cannot be captured as an empty number.
    _PRICE_RE = re.compile(r"\$([0-9][0-9,]*\.?[0-9]*)")

    # Signed percentage. The lookbehind stops the dash of a numeric range
    # ("10-20%") from being read as a minus sign.
    _PERCENT_RE = re.compile(r"(?<![0-9])([+-]?[0-9]+\.?[0-9]*)%")

    def __init__(self):
        """Read the webhook settings from ``config``."""
        self.webhook_url = config.AIRAA_WEBHOOK_URL
        self.api_key = config.AIRAA_API_KEY
        # Without a webhook URL there is nowhere to send data.
        self.enabled = bool(self.webhook_url)

    async def send_research_data(self, research_result: Dict[str, Any]) -> bool:
        """POST a formatted research result to the AIRAA webhook.

        Args:
            research_result: Result dict; see :meth:`_format_for_airaa` for
                the keys that are read.

        Returns:
            ``True`` when the webhook answers with a 2xx status. ``False``
            when the integration is disabled, the webhook answers with any
            other status, or any exception occurs while formatting or
            sending (the exception is logged, never raised).
        """
        if not self.enabled:
            return False

        try:
            payload = self._format_for_airaa(research_result)

            headers = {"Content-Type": "application/json"}
            if self.api_key:
                headers["Authorization"] = f"Bearer {self.api_key}"

            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.webhook_url, json=payload, headers=headers
                ) as response:
                    # Any 2xx means the webhook accepted the payload;
                    # receivers often acknowledge with 201/202/204 rather
                    # than a plain 200.
                    success = 200 <= response.status < 300
                    if success:
                        logger.info("Data sent to AIRAA successfully")
                    else:
                        logger.warning(f"AIRAA webhook returned {response.status}")
                    return success
        except Exception as e:
            # Deliberately broad: this is a best-effort side channel, so a
            # malformed result or a network failure is logged and reported
            # as ``False`` instead of propagating to the caller.
            logger.error(f"AIRAA integration failed: {e}")
            return False

    def _format_for_airaa(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Build the JSON payload sent to the webhook.

        Args:
            result: Research result. ``result["metadata"]["timestamp"]``,
                ``result["query"]``, ``result["result"]`` and
                ``result["sources"]`` are required; ``"research_plan"`` and
                ``"success"`` are optional.

        Returns:
            The payload dict, including derived ``confidence_score``,
            ``tags`` and ``structured_data`` entries.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        query = result["query"]
        findings = result["result"]
        return {
            "source": "web3-research-copilot",
            "timestamp": result["metadata"]["timestamp"],
            "query": query,
            "research_plan": result.get("research_plan", {}),
            "findings": findings,
            "data_sources": result["sources"],
            "confidence_score": self._calculate_confidence(result),
            "tags": self._extract_tags(query),
            "structured_data": self._extract_structured_data(findings),
        }

    def _calculate_confidence(self, result: Dict[str, Any]) -> float:
        """Return a heuristic confidence score in ``[0.0, 1.0]``.

        Starts at 0.7, adds 0.1 per entry in ``result["sources"]`` (capped at
        0.3) and subtracts 0.3 when ``result["success"]`` is present and
        falsy. A missing or ``None`` ``sources`` value counts as no sources.
        """
        base_score = 0.7
        sources = result.get("sources") or []
        source_boost = min(len(sources) * 0.1, 0.3)
        error_penalty = 0.3 if not result.get("success", True) else 0
        return max(0.0, min(1.0, base_score + source_boost - error_penalty))

    def _extract_tags(self, query: str) -> List[str]:
        """Return the topic tags whose keywords occur in ``query``.

        Matching is case-insensitive. Tags are returned in the declaration
        order of ``_TAG_PATTERNS``; an empty or ``None`` query yields ``[]``.
        """
        query_lower = (query or "").lower()
        return [
            tag
            for tag, pattern in self._TAG_PATTERNS.items()
            if pattern.search(query_lower)
        ]

    def _extract_structured_data(self, result_text: str) -> Dict[str, Any]:
        """Pull dollar amounts and percentages out of free-form text.

        Args:
            result_text: Findings text. Non-string values yield ``{}``.

        Returns:
            A dict with an optional ``"prices"`` list (floats, thousands
            separators removed) and an optional ``"percentages"`` list
            (signed floats). Each list holds at most the first five matches;
            a key is omitted when nothing matched.
        """
        structured: Dict[str, Any] = {}
        if not isinstance(result_text, str):
            return structured

        limit = self._MAX_EXTRACTED_VALUES
        prices = self._PRICE_RE.findall(result_text)
        percentages = self._PERCENT_RE.findall(result_text)

        if prices:
            structured["prices"] = [
                float(price.replace(",", "")) for price in prices[:limit]
            ]
        if percentages:
            structured["percentages"] = [float(pct) for pct in percentages[:limit]]
        return structured