Spaces:
Running
Running
| import logging | |
| from typing import Dict, Any, List, Optional | |
| import clickhouse_connect | |
| from src.core.config import settings | |
| from src.core.ports.analytics_db_port import AnalyticsDbPort | |
| logger = logging.getLogger(__name__) | |
| class ClickHouseAdapter(AnalyticsDbPort): | |
| def __init__(self): | |
| try: | |
| self.client = clickhouse_connect.get_client( | |
| host=settings.CLICKHOUSE_HOST, | |
| port=settings.CLICKHOUSE_PORT, | |
| username=settings.CLICKHOUSE_USER, | |
| password=settings.CLICKHOUSE_PASSWORD, | |
| database=settings.CLICKHOUSE_DB, | |
| secure=settings.CLICKHOUSE_SECURE | |
| ) | |
| logger.info("Connected to ClickHouse") | |
| except Exception as e: | |
| logger.error(f"Failed to connect to ClickHouse: {e}") | |
| self.client = None | |
| def query(self, sql: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| if not self.client: | |
| return {"columns": [], "rows": []} | |
| try: | |
| result = self.client.query(sql, parameters) | |
| return { | |
| "columns": list(result.column_names), | |
| "rows": result.result_rows | |
| } | |
| except Exception as e: | |
| logger.error(f"Query error: {e}") | |
| return {"columns": [], "rows": [], "error": str(e)} | |
| def get_trends(self, days: int = 3, limit: int = 10) -> Dict[str, Any]: | |
| sql = f""" | |
| SELECT | |
| JSONExtractString(entity, 'name') as entity_name, | |
| avg(sentiment_score) as avg_sentiment, | |
| count(*) as mention_count | |
| FROM sentiment_results | |
| ARRAY JOIN entities AS entity | |
| WHERE published_at >= now() - INTERVAL {days} DAY | |
| GROUP BY entity_name | |
| ORDER BY mention_count DESC | |
| LIMIT {limit} | |
| """ | |
| return self.query(sql) | |