File size: 1,897 Bytes
a63c61f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import logging
from typing import Dict, Any, List, Optional
import clickhouse_connect
from src.core.config import settings
from src.core.ports.analytics_db_port import AnalyticsDbPort

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

class ClickHouseAdapter(AnalyticsDbPort):
    """AnalyticsDbPort implementation backed by a ``clickhouse_connect`` client.

    Connection failures are logged instead of raised: the adapter is still
    constructed, ``self.client`` is ``None``, and ``query`` returns an empty
    result set.
    """

    def __init__(self):
        """Create the ClickHouse client from the ``CLICKHOUSE_*`` settings."""
        # None means "no usable connection"; query() checks for it.
        self.client: Optional[Any] = None
        try:
            self.client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                username=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
                database=settings.CLICKHOUSE_DB,
                secure=settings.CLICKHOUSE_SECURE,
            )
        except Exception as e:
            # Deliberate best-effort: keep the adapter alive without a client.
            logger.error("Failed to connect to ClickHouse: %s", e)
        else:
            logger.info("Connected to ClickHouse")

    def query(self, sql: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run ``sql`` and return its column names and rows.

        Args:
            sql: Query text passed to the client's ``query`` method.
            parameters: Optional bind parameters forwarded to the client
                unchanged.

        Returns:
            A dict with ``"columns"`` (list of column names) and ``"rows"``
            (the client's ``result_rows``). Both are empty when there is no
            client. When the client raises, both are empty and an ``"error"``
            key carries the exception text.
        """
        if self.client is None:
            return {"columns": [], "rows": []}

        try:
            result = self.client.query(sql, parameters)
            return {
                "columns": list(result.column_names),
                "rows": result.result_rows,
            }
        except Exception as e:
            logger.error("Query error: %s", e)
            return {"columns": [], "rows": [], "error": str(e)}

    def get_trends(self, days: int = 3, limit: int = 10) -> Dict[str, Any]:
        """Return the most-mentioned entities with their average sentiment.

        Reads ``sentiment_results``, expands ``entities`` with ARRAY JOIN,
        keeps rows whose ``published_at`` falls within the last ``days`` days,
        groups by the entity's ``name`` field, and orders by mention count
        descending.

        Args:
            days: Size of the look-back window in days.
            limit: Maximum number of entities to return.

        Returns:
            The same dict shape as ``query``. If ``days`` or ``limit`` cannot
            be converted to an integer, no query is executed and the dict
            carries an ``"error"`` key.
        """
        # Both values are spliced into the SQL text, so force them to plain
        # integers first; anything else (e.g. "1; DROP TABLE ...") is rejected
        # before it can reach the server.
        try:
            day_window = int(days)
            row_limit = int(limit)
        except (TypeError, ValueError, OverflowError) as e:
            logger.error("Query error: %s", e)
            return {"columns": [], "rows": [], "error": str(e)}

        sql = f"""
        SELECT 
            JSONExtractString(entity, 'name') as entity_name,
            avg(sentiment_score) as avg_sentiment,
            count(*) as mention_count
        FROM sentiment_results
        ARRAY JOIN entities AS entity
        WHERE published_at >= now() - INTERVAL {day_window} DAY
        GROUP BY entity_name
        ORDER BY mention_count DESC
        LIMIT {row_limit}
        """
        return self.query(sql)