File size: 3,248 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
API classifier for categorizing APIs by domain and use case.
"""

import logging
from typing import Dict, Any, List

logger = logging.getLogger(__name__)

# Domain classification keywords
DOMAIN_KEYWORDS = {
    "market_data": ["stock", "crypto", "currency", "finance", "trading", "market", "exchange", "commodity"],
    "news": ["news", "article", "media", "press", "journalism", "headline"],
    "social": ["social", "twitter", "facebook", "reddit", "instagram", "community"],
    "government": ["government", "policy", "regulation", "law", "public", "civic", "open data"],
    "weather": ["weather", "climate", "forecast", "meteorology", "temperature"],
    "general": [],  # Catch-all
}


def classify_api(api_entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Classify API by category and potential use cases.
    
    Args:
        api_entry: API entry from catalog with API, Description, Category, etc.
        
    Returns:
        Classification with:
        - domain: Detected domain (market_data, news, social, government, weather, general)
        - use_cases: List of potential use cases
        - relevance_score: Relevance to MiroOrg domains (0.0 - 1.0)
    """
    name = api_entry.get("API", "").lower()
    description = api_entry.get("Description", "").lower()
    category = api_entry.get("Category", "").lower()
    
    combined_text = f"{name} {description} {category}"
    
    # Detect domain
    detected_domain = "general"
    max_matches = 0
    
    for domain, keywords in DOMAIN_KEYWORDS.items():
        if domain == "general":
            continue
        matches = sum(1 for keyword in keywords if keyword in combined_text)
        if matches > max_matches:
            max_matches = matches
            detected_domain = domain
    
    # Determine use cases
    use_cases = []
    if detected_domain == "market_data":
        use_cases = ["financial_research", "market_analysis", "portfolio_tracking"]
    elif detected_domain == "news":
        use_cases = ["news_research", "sentiment_analysis", "event_detection"]
    elif detected_domain == "social":
        use_cases = ["social_listening", "sentiment_analysis", "trend_detection"]
    elif detected_domain == "government":
        use_cases = ["policy_research", "regulatory_tracking", "open_data_analysis"]
    elif detected_domain == "weather":
        use_cases = ["weather_forecasting", "climate_analysis"]
    else:
        use_cases = ["general_research"]
    
    # Calculate relevance score (higher for domains we care about)
    relevance_scores = {
        "market_data": 1.0,
        "news": 0.9,
        "government": 0.8,
        "social": 0.7,
        "weather": 0.5,
        "general": 0.3,
    }
    relevance_score = relevance_scores.get(detected_domain, 0.3)
    
    return {
        "domain": detected_domain,
        "use_cases": use_cases,
        "relevance_score": relevance_score,
    }


def classify_multiple_apis(api_entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Classify multiple APIs and return enriched entries."""
    results = []
    for api in api_entries:
        classification = classify_api(api)
        enriched = {**api, **classification}
        results.append(enriched)
    return results