Spaces:
Running
Running
| """ | |
| API classifier for categorizing APIs by domain and use case. | |
| """ | |
| import logging | |
| from typing import Dict, Any, List | |
| logger = logging.getLogger(__name__) | |
# Keywords used to detect each domain. A keyword counts as a hit when it occurs
# anywhere in the lowercased API text (substring match). Key order matters:
# when two domains have the same number of hits, the one listed first wins.
DOMAIN_KEYWORDS: Dict[str, List[str]] = {
    "market_data": [
        "stock",
        "crypto",
        "currency",
        "finance",
        "trading",
        "market",
        "exchange",
        "commodity",
    ],
    "news": [
        "news",
        "article",
        "media",
        "press",
        "journalism",
        "headline",
    ],
    "social": [
        "social",
        "twitter",
        "facebook",
        "reddit",
        "instagram",
        "community",
    ],
    "government": [
        "government",
        "policy",
        "regulation",
        "law",
        "public",
        "civic",
        "open data",
    ],
    "weather": [
        "weather",
        "climate",
        "forecast",
        "meteorology",
        "temperature",
    ],
    # Catch-all: has no keywords of its own.
    "general": [],
}
def classify_api(api_entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Classify API by category and potential use cases.

    The API name, description and category are lowercased and joined into a
    single string. Each domain in DOMAIN_KEYWORDS is scored by how many of its
    keywords occur in that string as substrings. The highest-scoring domain
    wins; ties go to the domain listed first in DOMAIN_KEYWORDS, and "general"
    is used when no keyword matches at all.

    Args:
        api_entry: API entry from catalog. The "API", "Description" and
            "Category" keys are read. Missing keys and None values are treated
            as empty text; other non-string values are converted with str().

    Returns:
        Classification with:
        - domain: Detected domain (market_data, news, social, government, weather, general)
        - use_cases: List of potential use cases
        - relevance_score: Relevance to MiroOrg domains (0.0 - 1.0)
    """

    def _field_text(key: str) -> str:
        # dict.get() only falls back to its default when the key is absent, so
        # a key that is present with None (e.g. a JSON null) has to be handled
        # explicitly or .lower() would raise AttributeError.
        value = api_entry.get(key)
        return "" if value is None else str(value).lower()

    combined_text = " ".join(
        _field_text(key) for key in ("API", "Description", "Category")
    )

    # Detect domain: keep the first domain with the strictly highest hit count.
    detected_domain = "general"
    max_matches = 0
    for domain, keywords in DOMAIN_KEYWORDS.items():
        if domain == "general":
            # The catch-all has no keywords and is only the fallback.
            continue
        matches = sum(1 for keyword in keywords if keyword in combined_text)
        if matches > max_matches:
            max_matches = matches
            detected_domain = domain

    # Determine use cases. The table is built per call so every caller gets
    # its own list objects and may mutate the result freely.
    use_cases_by_domain = {
        "market_data": ["financial_research", "market_analysis", "portfolio_tracking"],
        "news": ["news_research", "sentiment_analysis", "event_detection"],
        "social": ["social_listening", "sentiment_analysis", "trend_detection"],
        "government": ["policy_research", "regulatory_tracking", "open_data_analysis"],
        "weather": ["weather_forecasting", "climate_analysis"],
    }
    use_cases = use_cases_by_domain.get(detected_domain, ["general_research"])

    # Calculate relevance score (higher for domains we care about).
    relevance_scores = {
        "market_data": 1.0,
        "news": 0.9,
        "government": 0.8,
        "social": 0.7,
        "weather": 0.5,
        "general": 0.3,
    }
    relevance_score = relevance_scores.get(detected_domain, 0.3)

    return {
        "domain": detected_domain,
        "use_cases": use_cases,
        "relevance_score": relevance_score,
    }
def classify_multiple_apis(api_entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a new list with each entry merged with its classification.

    Each result is a fresh dict holding the original entry's keys plus the
    keys produced by classify_api(); on a name clash the classification
    value takes precedence. Input order is preserved.
    """
    return [{**entry, **classify_api(entry)} for entry in api_entries]