rag-api-node-1 / src / api / routes / analytics.py
Peterase's picture
feat(rag): implement hybrid search with live sources and production-grade intent classification
a63c61f
from fastapi import APIRouter, Depends
from src.core.use_cases.analytics_use_case import AnalyticsUseCase
from src.core.ports.vector_store_port import VectorStorePort
from src.api.dependencies import get_analytics_use_case, get_vector_store_port
router = APIRouter()
@router.get("/sentiment")
def get_sentiment(analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)):
query = """
SELECT
entity,
avg(sentiment_score) as avg_sentiment,
count() as mention_count
FROM sentiment_results
GROUP BY entity
ORDER BY mention_count DESC
LIMIT 10
"""
results = analytics_use_case.execute_raw_query(query)
if not results or "error" in results:
return {"error": "Could not fetch sentiment."}
data = []
for row in results.get("rows", []):
data.append({
"entity": row[0],
"avg_sentiment": float(row[1]),
"mention_count": int(row[2])
})
return {"data": data}
@router.get("/trends")
def get_trends(
days: int = 7,
analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)
):
results = analytics_use_case.get_trends(days)
if not results or "error" in results:
return {"error": "Could not fetch trends."}
data = []
for row in results.get("rows", []):
data.append({
"topic": row[0],
"momentum": float(row[1]),
"volume": int(row[2])
})
return {"data": data}
@router.get("/articles-over-time")
def get_articles_over_time(
days: int = 30,
analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)
):
query = f"""
SELECT
toDate(scraped_at) as date,
count() as article_count
FROM sentiment_results
GROUP BY date
ORDER BY date ASC
"""
results = analytics_use_case.execute_raw_query(query)
if not results or "error" in results:
return {"error": "Could not fetch articles over time."}
data = []
for row in results.get("rows", []):
data.append({
"date": str(row[0]),
"count": int(row[1])
})
return {"data": data}
@router.get("/source-stats")
def get_source_stats(analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case)):
query = """
SELECT
source,
count() as article_count,
avg(sentiment_score) as avg_sentiment
FROM sentiment_results
GROUP BY source
ORDER BY article_count DESC
"""
results = analytics_use_case.execute_raw_query(query)
if not results or "error" in results:
return {"error": "Could not fetch source stats."}
data = []
for row in results.get("rows", []):
data.append({
"source": row[0],
"article_count": int(row[1]),
"avg_sentiment": float(row[2])
})
return {"data": data}
@router.get("/pipeline-stats")
def get_pipeline_stats(
analytics_use_case: AnalyticsUseCase = Depends(get_analytics_use_case),
vector_store: VectorStorePort = Depends(get_vector_store_port)
):
qdrant_stats = vector_store.get_collection_stats()
query = "SELECT count() FROM sentiment_results"
ch_res = analytics_use_case.execute_raw_query(query)
ch_count = 0
if ch_res and not "error" in ch_res and ch_res.get("rows"):
ch_count = int(ch_res["rows"][0][0])
return {
"total_articles_in_vector_db": qdrant_stats.get("vectors_count", 0) if qdrant_stats else 0,
"total_sentiment_results": ch_count
}