|
|
"""API endpoints for advanced analytics.""" |
|
|
|
|
|
import logging |
|
|
from typing import List, Dict, Optional |
|
|
from fastapi import APIRouter, HTTPException |
|
|
from pydantic import BaseModel, Field, validator |
|
|
|
|
|
from analysis.predictive_intervals import ( |
|
|
calculate_predictive_interval, |
|
|
rank_by_predictive_interval, |
|
|
get_top_positive_by_interval, |
|
|
get_top_negative_by_interval |
|
|
) |
|
|
from analysis.category_analytics import CategoryAnalytics |
|
|
from analysis.thread_analysis import ThreadAnalyzer |
|
|
|
|
|
# Module-level logger used by every endpoint below.
logger = logging.getLogger(__name__)
# NOTE(review): this configures the root logger as a side effect of importing
# the module; kept as-is because other code may rely on it.
logging.basicConfig(level=logging.INFO)

# All routes declared in this module are mounted under /analytics.
router = APIRouter(
    prefix="/analytics",
    tags=["analytics"],
)

# Singleton slots: start empty and are filled on first use by
# get_category_analytics() / get_thread_analyzer().
category_analytics: Optional[CategoryAnalytics] = None
thread_analyzer: Optional[ThreadAnalyzer] = None
|
|
|
|
|
|
|
|
def get_category_analytics() -> CategoryAnalytics:
    """Return the shared CategoryAnalytics instance, building it on first use."""
    global category_analytics

    if category_analytics is not None:
        return category_analytics

    # First call: construct the instance and cache it in the module global.
    category_analytics = CategoryAnalytics()
    return category_analytics
|
|
|
|
|
|
|
|
def get_thread_analyzer() -> ThreadAnalyzer:
    """Return the shared ThreadAnalyzer instance, building it on first use."""
    global thread_analyzer

    if thread_analyzer is not None:
        return thread_analyzer

    # First call: construct the instance and cache it in the module global.
    thread_analyzer = ThreadAnalyzer()
    return thread_analyzer
|
|
|
|
|
|
|
|
|
|
|
class SentimentCounts(BaseModel):
    """Comment tallies by sentiment for a single item."""

    # Required identifier of the item the counts belong to.
    id: str = Field(
        ...,
        description="Item identifier",
    )
    # Required; must be zero or greater.
    positive_count: int = Field(
        ...,
        ge=0,
        description="Number of positive comments",
    )
    # Required; must be zero or greater.
    negative_count: int = Field(
        ...,
        ge=0,
        description="Number of negative comments",
    )
    # Optional; defaults to 0 when omitted, must be zero or greater.
    neutral_count: int = Field(
        0,
        ge=0,
        description="Number of neutral comments",
    )
|
|
|
|
|
|
|
|
class PredictiveIntervalRequest(BaseModel):
    """Body accepted by the predictive-interval endpoint."""

    # At least one item is required.
    data: List[SentimentCounts] = Field(
        ...,
        min_items=1,
        description="List of items with sentiment counts",
    )
    # Defaults to 0.95; accepted range is 0.90 to 0.99 inclusive.
    confidence_level: float = Field(
        0.95,
        ge=0.90,
        le=0.99,
        description="Confidence level",
    )

    class Config:
        # Sample payload attached to the generated schema.
        json_schema_extra = {
            "example": {
                "data": [
                    {
                        "id": "news_1",
                        "positive_count": 80,
                        "negative_count": 20,
                        "neutral_count": 0,
                    },
                    {
                        "id": "news_2",
                        "positive_count": 1,
                        "negative_count": 0,
                        "neutral_count": 0,
                    },
                ],
                "confidence_level": 0.95,
            }
        }
|
|
|
|
|
|
|
|
class PredictiveIntervalResponse(BaseModel):
    """Payload returned by the predictive-interval endpoint."""

    # Every submitted item, in ranked order.
    ranked_data: List[Dict] = Field(
        ...,
        description="Items ranked by predictive interval",
    )
    # Best-ranked items on the positive side.
    top_positive: List[Dict] = Field(
        ...,
        description="Top positive items",
    )
    # Best-ranked items on the negative side.
    top_negative: List[Dict] = Field(
        ...,
        description="Top negative items",
    )
|
|
|
|
|
|
|
|
class CategorySentimentRequest(BaseModel):
    """Body accepted by the category-sentiment endpoint."""

    # At least one entry is required. The endpoint reads the "category" and
    # "text" keys of each entry; the model itself does not enforce them.
    data: List[Dict[str, str]] = Field(
        ...,
        min_items=1,
        description="List of items with category and text",
    )

    class Config:
        # Sample payload attached to the generated schema.
        json_schema_extra = {
            "example": {
                "data": [
                    {"category": "politics", "text": "Отличная новость!"},
                    {"category": "politics", "text": "Ужасная ситуация..."},
                    {"category": "economy", "text": "Нормально"},
                ]
            }
        }
|
|
|
|
|
|
|
|
class CategorySentimentResponse(BaseModel):
    """Payload returned by the category-sentiment endpoint."""

    # Mapping of category name to its statistics dict.
    category_stats: Dict[str, Dict] = Field(
        ...,
        description="Statistics per category",
    )
    # Best-ranked categories on the positive side.
    top_positive_categories: List[Dict] = Field(
        ...,
        description="Top positive categories",
    )
    # Best-ranked categories on the negative side.
    top_negative_categories: List[Dict] = Field(
        ...,
        description="Top negative categories",
    )
|
|
|
|
|
|
|
|
class ThreadAnalysisRequest(BaseModel):
    """Body accepted by the thread-analysis endpoint."""

    # At least one entry is required. The endpoint reads the "news_id" and
    # "text" keys of each entry; the model itself does not enforce them.
    data: List[Dict[str, str]] = Field(
        ...,
        min_items=1,
        description="List of comments with news_id and text",
    )

    class Config:
        # Sample payload attached to the generated schema.
        json_schema_extra = {
            "example": {
                "data": [
                    {"news_id": "1", "text": "Отлично!"},
                    {"news_id": "1", "text": "Ужасно!"},
                    {"news_id": "2", "text": "Нормально"},
                ]
            }
        }
|
|
|
|
|
|
|
|
class ThreadAnalysisResponse(BaseModel):
    """Payload returned by the thread-analysis endpoint."""

    # One dict per news item (news_id, thread_length, temperature).
    thread_stats: List[Dict] = Field(
        ...,
        description="Thread statistics per news item",
    )
    # Output of the correlation analysis, as plain Python values.
    correlation: Dict = Field(
        ...,
        description="Correlation analysis results",
    )
|
|
|
|
|
|
|
|
|
|
|
@router.post("/predictive-intervals", response_model=PredictiveIntervalResponse)
async def calculate_predictive_intervals(request: PredictiveIntervalRequest):
    """
    Calculate predictive intervals for ranking items by positive sentiment.

    Uses Beta distribution to account for uncertainty when sample sizes are small.
    Useful for ranking news articles or categories by positive sentiment.

    Args:
        request: Request with sentiment counts and confidence level

    Returns:
        Ranked items with predictive intervals, plus the top 10 positive
        and top 10 negative items.

    Raises:
        HTTPException: 500 carrying the underlying error message when any
            step of the calculation fails.
    """
    try:
        # The analysis helpers take plain dicts, so unpack the validated models.
        data = [
            {
                "id": item.id,
                "positive_count": item.positive_count,
                "negative_count": item.negative_count,
                "neutral_count": item.neutral_count,
            }
            for item in request.data
        ]

        ranked_data = rank_by_predictive_interval(
            data,
            confidence_level=request.confidence_level,
        )

        top_positive = get_top_positive_by_interval(
            data,
            top_k=10,
            confidence_level=request.confidence_level,
        )
        top_negative = get_top_negative_by_interval(
            data,
            top_k=10,
            confidence_level=request.confidence_level,
        )

        return PredictiveIntervalResponse(
            ranked_data=ranked_data,
            top_positive=top_positive,
            top_negative=top_negative,
        )
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error calculating predictive intervals: %s", e)
        # Chain the cause so the original failure stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail=f"Predictive interval calculation failed: {str(e)}",
        ) from e
|
|
|
|
|
|
|
|
@router.post("/category-sentiment", response_model=CategorySentimentResponse)
async def analyze_category_sentiment(request: CategorySentimentRequest):
    """
    Analyze sentiment distribution across categories.

    Calculates sentiment statistics for each category and ranks them
    using predictive intervals.

    Args:
        request: Request with category and text data

    Returns:
        Category sentiment statistics, plus the top 10 positive and
        top 10 negative categories.

    Raises:
        HTTPException: 500 carrying the underlying error message when any
            step of the analysis fails.
    """
    try:
        analytics = get_category_analytics()

        category_stats = analytics.analyze_category_sentiment(
            request.data,
            category_key="category",
            text_key="text",
        )

        top_positive = analytics.get_top_positive_categories(
            category_stats,
            top_k=10,
        )
        top_negative = analytics.get_top_negative_categories(
            category_stats,
            top_k=10,
        )

        return CategorySentimentResponse(
            category_stats=category_stats,
            top_positive_categories=top_positive,
            top_negative_categories=top_negative,
        )
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error analyzing category sentiment: %s", e)
        # Chain the cause so the original failure stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail=f"Category sentiment analysis failed: {str(e)}",
        ) from e
|
|
|
|
|
|
|
|
def _convert_to_python(obj):
    """Recursively convert numpy values to Python native types.

    numpy integer, floating and boolean scalars become ``int``, ``float`` and
    ``bool``. numpy arrays become (nested) lists. Dicts, lists and tuples are
    rebuilt with their contents converted; tuples come back as lists.
    Any other value is returned unchanged.
    """
    # Kept as a function-local import, matching the original code.
    import numpy as np

    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        # tolist() unwraps numeric dtypes; recurse to cover object arrays.
        return _convert_to_python(obj.tolist())
    if isinstance(obj, dict):
        return {k: _convert_to_python(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_convert_to_python(item) for item in obj]
    return obj


@router.post("/thread-analysis", response_model=ThreadAnalysisResponse)
async def analyze_thread_correlation(request: ThreadAnalysisRequest):
    """
    Analyze correlation between thread length and sentiment temperature.

    Thread length is the number of comments under a news article.
    Temperature is the probability that a comment is negative.

    Args:
        request: Request with news_id and text data

    Returns:
        Thread statistics and correlation analysis. Only news items present
        in both the length and the temperature results are listed, in the
        key order of the thread-length result.

    Raises:
        HTTPException: 500 carrying the underlying error message when any
            step of the analysis fails.
    """
    try:
        analyzer = get_thread_analyzer()

        thread_lengths = analyzer.calculate_thread_lengths(
            request.data,
            news_id_key="news_id",
        )
        temperatures = analyzer.calculate_temperature(
            request.data,
            news_id_key="news_id",
            text_key="text",
        )

        # Strip numpy types so the correlation results are plain Python values.
        correlation_results = _convert_to_python(
            analyzer.analyze_correlation(thread_lengths, temperatures)
        )

        # Iterate the length keys instead of a set intersection so the output
        # order is deterministic; membership is the same as the intersection.
        temperature_ids = set(temperatures.keys())
        thread_stats = [
            {
                "news_id": news_id,
                "thread_length": int(thread_lengths[news_id]),
                "temperature": float(temperatures[news_id]),
            }
            for news_id in thread_lengths.keys()
            if news_id in temperature_ids
        ]

        return ThreadAnalysisResponse(
            thread_stats=thread_stats,
            correlation=correlation_results,
        )
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error analyzing thread correlation: %s", e)
        # Chain the cause so the original failure stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail=f"Thread analysis failed: {str(e)}",
        ) from e
|
|
|
|
|
|
|
|
@router.get("/health")
async def analytics_health():
    """
    Health check for analytics service.

    Reports whether the module-level analytics singletons have been
    instantiated yet; it does not create them.

    Returns:
        Dict with "status" set to "healthy" and two booleans,
        "category_analytics_loaded" and "thread_analyzer_loaded".
    """
    # No try/except: two `is not None` checks on module globals cannot raise,
    # so the former "unhealthy" branch was unreachable.
    return {
        "status": "healthy",
        "category_analytics_loaded": category_analytics is not None,
        "thread_analyzer_loaded": thread_analyzer is not None,
    }
|
|
|
|
|
|