# src/infrastructure/adapters/clickhouse_adapter.py
# feat(rag): implement hybrid search with live sources and production-grade
# intent classification (commit a63c61f, author: Peterase)
import logging
from typing import Dict, Any, List, Optional
import clickhouse_connect
from src.core.config import settings
from src.core.ports.analytics_db_port import AnalyticsDbPort
logger = logging.getLogger(__name__)
class ClickHouseAdapter(AnalyticsDbPort):
    """Analytics database adapter backed by ClickHouse.

    Implements AnalyticsDbPort. Connection failures are tolerated by design:
    when the client cannot be created, ``self.client`` is left as ``None`` and
    queries degrade to empty result sets instead of raising, so the service
    can start even without a healthy ClickHouse instance.
    """

    def __init__(self):
        """Open a ClickHouse connection from settings; set client to None on failure."""
        try:
            self.client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                username=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
                database=settings.CLICKHOUSE_DB,
                secure=settings.CLICKHOUSE_SECURE,
            )
            logger.info("Connected to ClickHouse")
        except Exception as e:
            # Best-effort startup: keep the adapter importable/usable (it will
            # return empty results) rather than crashing the application.
            logger.error("Failed to connect to ClickHouse: %s", e)
            self.client = None

    def query(self, sql: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute *sql* and return ``{"columns": [...], "rows": [...]}``.

        Args:
            sql: The SQL statement to run.
            parameters: Optional bind parameters forwarded to the driver.

        Returns:
            A dict with ``columns`` and ``rows``; empty when no client is
            connected. On query failure an ``error`` key carries the message.
        """
        if not self.client:
            return {"columns": [], "rows": []}
        try:
            result = self.client.query(sql, parameters)
            return {
                "columns": list(result.column_names),
                "rows": result.result_rows,
            }
        except Exception as e:
            # Lazy %-formatting so the message is only built when emitted.
            logger.error("Query error: %s", e)
            return {"columns": [], "rows": [], "error": str(e)}

    def get_trends(self, days: int = 3, limit: int = 10) -> Dict[str, Any]:
        """Return the top entities by mention count over a recent window.

        Args:
            days: Look-back window in days (default 3).
            limit: Maximum number of entities to return (default 10).

        Returns:
            The ``query()`` result dict with columns entity_name,
            avg_sentiment, mention_count.
        """
        # Server-side parameter binding ({name:Type}) instead of f-string
        # interpolation: prevents SQL injection if a caller ever passes
        # non-integer values, and lets ClickHouse validate the types.
        sql = """
            SELECT
                JSONExtractString(entity, 'name') as entity_name,
                avg(sentiment_score) as avg_sentiment,
                count(*) as mention_count
            FROM sentiment_results
            ARRAY JOIN entities AS entity
            WHERE published_at >= now() - INTERVAL {days:UInt32} DAY
            GROUP BY entity_name
            ORDER BY mention_count DESC
            LIMIT {limit:UInt32}
        """
        return self.query(sql, {"days": days, "limit": limit})