import logging
from typing import Dict, Any, List, Optional

import clickhouse_connect

from src.core.config import settings
from src.core.ports.analytics_db_port import AnalyticsDbPort

logger = logging.getLogger(__name__)


class ClickHouseAdapter(AnalyticsDbPort):
    """AnalyticsDbPort implementation backed by a ClickHouse server.

    Connection settings are read from ``settings`` at construction time.
    If the connection fails, the failure is logged and ``self.client`` is
    left as ``None``; query methods then return an empty result carrying an
    ``"error"`` entry instead of raising.
    """

    def __init__(self):
        """Create the ClickHouse client from the CLICKHOUSE_* settings."""
        try:
            self.client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                username=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
                database=settings.CLICKHOUSE_DB,
                secure=settings.CLICKHOUSE_SECURE,
            )
            logger.info("Connected to ClickHouse")
        except Exception as e:
            # Best-effort by design: keep the adapter constructible so the
            # application can start; queries degrade to error results.
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Failed to connect to ClickHouse: %s", e)
            self.client = None

    def query(self, sql: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run ``sql`` and return its result as plain Python data.

        Args:
            sql: The SQL text to execute.
            parameters: Optional bind parameters, passed through unchanged to
                the clickhouse_connect client's ``query`` call.

        Returns:
            ``{"columns": [...], "rows": [...]}`` on success. On failure, or
            when no client is connected, the same shape with empty lists plus
            an ``"error"`` string describing the problem.
        """
        if self.client is None:
            # Report the missing connection explicitly so callers can tell it
            # apart from a query that legitimately returned no rows.
            return {
                "columns": [],
                "rows": [],
                "error": "ClickHouse client is not connected",
            }
        try:
            result = self.client.query(sql, parameters=parameters)
        except Exception as e:
            logger.exception("Query error: %s", e)
            return {"columns": [], "rows": [], "error": str(e)}
        return {
            "columns": list(result.column_names),
            "rows": result.result_rows,
        }

    def get_trends(self, days: int = 3, limit: int = 10) -> Dict[str, Any]:
        """Return the most-mentioned entities over the last ``days`` days.

        Each row holds the entity name, the average ``sentiment_score`` and
        the mention count, ordered by mention count descending and capped at
        ``limit`` rows. (Assumes ``sentiment_results.entities`` is an array
        of JSON strings with a ``name`` field -- confirm against the schema.)

        Args:
            days: Size of the look-back window in days; must be int-convertible.
            limit: Maximum number of entities to return; must be int-convertible.

        Returns:
            The same dict shape as :meth:`query`. If ``days`` or ``limit``
            cannot be converted to ``int``, an empty result with an
            ``"error"`` entry is returned and no SQL is executed.
        """
        try:
            # These values are formatted directly into the SQL text, so they
            # must be real integers: coercing here closes the injection hole
            # a string argument would otherwise open.
            days = int(days)
            limit = int(limit)
        except (TypeError, ValueError, OverflowError) as e:
            logger.error(
                "Invalid get_trends arguments days=%r limit=%r: %s", days, limit, e
            )
            return {"columns": [], "rows": [], "error": f"Invalid arguments: {e}"}

        sql = f"""
            SELECT
                JSONExtractString(entity, 'name') as entity_name,
                avg(sentiment_score) as avg_sentiment,
                count(*) as mention_count
            FROM sentiment_results
            ARRAY JOIN entities AS entity
            WHERE published_at >= now() - INTERVAL {days} DAY
            GROUP BY entity_name
            ORDER BY mention_count DESC
            LIMIT {limit}
        """
        return self.query(sql)