Spaces:
Running
Running
File size: 1,897 Bytes
a63c61f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | import logging
from typing import Dict, Any, List, Optional
import clickhouse_connect
from src.core.config import settings
from src.core.ports.analytics_db_port import AnalyticsDbPort
logger = logging.getLogger(__name__)
class ClickHouseAdapter(AnalyticsDbPort):
    """ClickHouse-backed implementation of ``AnalyticsDbPort``.

    The adapter is deliberately best-effort: a failed connection or a failed
    query is logged and reported through the return value instead of being
    raised to the caller.
    """

    def __init__(self):
        """Create the ClickHouse client from the values in ``settings``.

        If the connection attempt raises, the error is logged and
        ``self.client`` is set to ``None`` so that later ``query`` calls
        return an empty result instead of failing.
        """
        try:
            self.client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                username=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
                database=settings.CLICKHOUSE_DB,
                secure=settings.CLICKHOUSE_SECURE,
            )
        except Exception as e:
            # Boundary catch: keep the adapter constructible without a
            # database, but record the traceback for diagnosis.
            logger.exception("Failed to connect to ClickHouse: %s", e)
            self.client = None
        else:
            logger.info("Connected to ClickHouse")

    def query(self, sql: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run ``sql`` and return its column names and rows.

        Args:
            sql: The statement to execute.
            parameters: Optional values handed to the client together with
                ``sql`` for parameter binding.

        Returns:
            A dict with ``"columns"`` (list of column names) and ``"rows"``
            (the client's result rows). When no client is available both
            are empty. When the query raises, both are empty and an
            ``"error"`` key holds the exception text.
        """
        if self.client is None:
            logger.warning("ClickHouse client is not available; returning empty result")
            return {"columns": [], "rows": []}
        try:
            # Result access stays inside the try so that any failure while
            # reading the result is reported the same way as a query failure.
            result = self.client.query(sql, parameters)
            return {
                "columns": list(result.column_names),
                "rows": result.result_rows,
            }
        except Exception as e:
            logger.exception("Query error: %s", e)
            return {"columns": [], "rows": [], "error": str(e)}

    def get_trends(self, days: int = 3, limit: int = 10) -> Dict[str, Any]:
        """Return the most-mentioned entities and their average sentiment.

        Rows of ``sentiment_results`` published within the last ``days`` days
        are expanded per entity, grouped by entity name, and ordered by
        mention count (descending).

        Args:
            days: Size of the look-back window, in days.
            limit: Maximum number of entities to return.

        Returns:
            The ``query`` result dict; columns are ``entity_name``,
            ``avg_sentiment`` and ``mention_count``.
        """
        # ``days`` and ``limit`` are bound by the driver rather than
        # interpolated into the SQL text, so a non-integer value cannot
        # alter the statement.
        sql = """
            SELECT
                JSONExtractString(entity, 'name') as entity_name,
                avg(sentiment_score) as avg_sentiment,
                count(*) as mention_count
            FROM sentiment_results
            ARRAY JOIN entities AS entity
            WHERE published_at >= now() - INTERVAL %(days)s DAY
            GROUP BY entity_name
            ORDER BY mention_count DESC
            LIMIT %(limit)s
        """
        return self.query(sql, {"days": days, "limit": limit})
|