Spaces:
Running
Running
| from fastapi import APIRouter, Depends | |
| from src.core.use_cases.analytics_use_case import AnalyticsUseCase | |
| from src.core.ports.vector_store_port import VectorStorePort | |
| from src.api.dependencies import get_analytics_use_case, get_vector_store_port | |
| router = APIRouter() | |
| def get_sentiment(analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)): | |
| query = """ | |
| SELECT | |
| entity, | |
| avg(sentiment_score) as avg_sentiment, | |
| count() as mention_count | |
| FROM sentiment_results | |
| GROUP BY entity | |
| ORDER BY mention_count DESC | |
| LIMIT 10 | |
| """ | |
| results = analytics_use_case.execute_raw_query(query) | |
| if not results or "error" in results: | |
| return {"error": "Could not fetch sentiment."} | |
| data = [] | |
| for row in results.get("rows", []): | |
| data.append({ | |
| "entity": row[0], | |
| "avg_sentiment": float(row[1]), | |
| "mention_count": int(row[2]) | |
| }) | |
| return {"data": data} | |
| def get_trends( | |
| days: int = 7, | |
| analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case) | |
| ): | |
| results = analytics_use_case.get_trends(days) | |
| if not results or "error" in results: | |
| return {"error": "Could not fetch trends."} | |
| data = [] | |
| for row in results.get("rows", []): | |
| data.append({ | |
| "topic": row[0], | |
| "momentum": float(row[1]), | |
| "volume": int(row[2]) | |
| }) | |
| return {"data": data} | |
| def get_articles_over_time( | |
| days: int = 30, | |
| analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case) | |
| ): | |
| query = f""" | |
| SELECT | |
| toDate(scraped_at) as date, | |
| count() as article_count | |
| FROM sentiment_results | |
| GROUP BY date | |
| ORDER BY date ASC | |
| """ | |
| results = analytics_use_case.execute_raw_query(query) | |
| if not results or "error" in results: | |
| return {"error": "Could not fetch articles over time."} | |
| data = [] | |
| for row in results.get("rows", []): | |
| data.append({ | |
| "date": str(row[0]), | |
| "count": int(row[1]) | |
| }) | |
| return {"data": data} | |
| def get_source_stats(analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)): | |
| query = """ | |
| SELECT | |
| source, | |
| count() as article_count, | |
| avg(sentiment_score) as avg_sentiment | |
| FROM sentiment_results | |
| GROUP BY source | |
| ORDER BY article_count DESC | |
| """ | |
| results = analytics_use_case.execute_raw_query(query) | |
| if not results or "error" in results: | |
| return {"error": "Could not fetch source stats."} | |
| data = [] | |
| for row in results.get("rows", []): | |
| data.append({ | |
| "source": row[0], | |
| "article_count": int(row[1]), | |
| "avg_sentiment": float(row[2]) | |
| }) | |
| return {"data": data} | |
| def get_pipeline_stats( | |
| analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case), | |
| vector_store: VectorStorePort = Depends(get_vector_store_port) | |
| ): | |
| qdrant_stats = vector_store.get_collection_stats() | |
| query = "SELECT count() FROM sentiment_results" | |
| ch_res = analytics_use_case.execute_raw_query(query) | |
| ch_count = 0 | |
| if ch_res and not "error" in ch_res and ch_res.get("rows"): | |
| ch_count = int(ch_res["rows"][0][0]) | |
| return { | |
| "total_articles_in_vector_db": qdrant_stats.get("vectors_count", 0) if qdrant_stats else 0, | |
| "total_sentiment_results": ch_count | |
| } | |