Spaces:
Sleeping
Sleeping
| import os | |
| import psycopg2 | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Optional, Any | |
| import json | |
| import uuid | |
| import sys | |
| sys.path.append('..') | |
| from config import config | |
class DatabaseManager:
    """PostgreSQL/Supabase access layer for the narrative-monitoring schema.

    All helpers are defensive: failures are printed and surfaced as
    ``None`` / ``[]`` return values rather than raised, and ``self.connection``
    is ``None`` whenever no live connection is available.
    """

    def __init__(self):
        # DSN comes from the project-level config module.
        self.connection_string: str = config.DATABASE_URL
        # psycopg2 connection object, or None when no connection is available.
        self.connection = None
        self._connect()

    def _connect(self):
        """Establish database connection with SSL support for Supabase"""
        try:
            # Supabase requires SSL; make sure the DSN asks for it.
            if "supabase.co" in self.connection_string:
                if "sslmode=" not in self.connection_string:
                    # Use '&' when the DSN already carries query parameters;
                    # a bare '?' would otherwise corrupt the URL.
                    sep = "&" if "?" in self.connection_string else "?"
                    self.connection_string += f"{sep}sslmode=require"
            self.connection = psycopg2.connect(
                self.connection_string,
                connect_timeout=10,
                sslmode='require' if 'supabase.co' in self.connection_string else 'prefer'
            )
            # Autocommit: each DDL/DML statement below commits immediately,
            # so no explicit transaction management is needed.
            self.connection.autocommit = True
            print("β Database connected successfully")
        except Exception as e:
            print(f"β Database connection failed: {e}")
            self.connection = None

    def test_connection(self) -> bool:
        """Test if database connection is active.

        Returns False (rather than the previous implicit None) when there is
        no connection at all; attempts one reconnect on query failure.
        """
        if not self.connection:
            return False
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
            finally:
                cursor.close()
            return result is not None
        except Exception as e:
            print(f"Database test failed: {e}")
            # Try to reconnect
            self._connect()
            return self.connection is not None

    def initialize_schema(self):
        """Initialize the Supabase database schema and insert sample data"""
        if not self.connection:
            print("β No database connection")
            return
        try:
            cursor = self.connection.cursor()
            try:
                # Create narratives table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS narratives (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        content TEXT NOT NULL,
                        title VARCHAR(500),
                        first_detected TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        sources JSONB DEFAULT '[]',
                        spread_velocity DECIMAL DEFAULT 0.0,
                        risk_score INTEGER DEFAULT 0,
                        embedding TEXT,
                        metadata JSONB DEFAULT '{}',
                        status VARCHAR(50) DEFAULT 'active',
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create predictions table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS predictions (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                        scenario_description TEXT NOT NULL,
                        probability_score DECIMAL NOT NULL,
                        potential_reach BIGINT DEFAULT 0,
                        timeline_hours INTEGER DEFAULT 24,
                        confidence_level DECIMAL DEFAULT 0.0,
                        generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        accuracy_score DECIMAL,
                        actual_outcome TEXT,
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create impacts table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS impacts (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                        prediction_id UUID REFERENCES predictions(id) ON DELETE CASCADE,
                        actual_reach BIGINT DEFAULT 0,
                        predicted_reach BIGINT DEFAULT 0,
                        engagement_metrics JSONB DEFAULT '{}',
                        sentiment_shift DECIMAL DEFAULT 0.0,
                        geographic_spread JSONB DEFAULT '{}',
                        measured_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        impact_score DECIMAL DEFAULT 0.0,
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create monitoring_keywords table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS monitoring_keywords (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        keyword VARCHAR(255) NOT NULL,
                        alert_threshold DECIMAL DEFAULT 0.7,
                        category VARCHAR(100),
                        is_active BOOLEAN DEFAULT TRUE,
                        created_by VARCHAR(255),
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        last_triggered TIMESTAMP WITH TIME ZONE
                    );
                """)
                # Create historical_patterns table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS historical_patterns (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        pattern_name VARCHAR(500) NOT NULL,
                        description TEXT,
                        campaign_data JSONB NOT NULL,
                        pattern_features JSONB DEFAULT '{}',
                        effectiveness_score DECIMAL DEFAULT 0.0,
                        date_range JSONB,
                        geographic_data JSONB DEFAULT '{}',
                        archived_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        tags JSONB DEFAULT '[]',
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create alerts table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS alerts (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                        alert_level VARCHAR(20) NOT NULL,
                        title VARCHAR(500) NOT NULL,
                        description TEXT,
                        triggered_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        acknowledged_at TIMESTAMP WITH TIME ZONE,
                        resolved_at TIMESTAMP WITH TIME ZONE,
                        status VARCHAR(50) DEFAULT 'active',
                        metadata JSONB DEFAULT '{}',
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create counter_narratives table
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS counter_narratives (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                        fact_based_response TEXT NOT NULL,
                        educational_content TEXT,
                        trusted_sources JSONB DEFAULT '[]',
                        shareable_content TEXT,
                        effectiveness_rating DECIMAL,
                        generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    );
                """)
                # Create indexes for better performance
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_narratives_risk_score ON narratives(risk_score DESC);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_narratives_first_detected ON narratives(first_detected DESC);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_predictions_probability ON predictions(probability_score DESC);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_alerts_status ON alerts(status, triggered_at DESC);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_monitoring_keywords_active ON monitoring_keywords(is_active, keyword);")
                # Insert sample data if tables are empty
                self._insert_sample_data(cursor)
            finally:
                # Close the cursor even if any statement above raised.
                cursor.close()
            print("β Database schema initialized successfully")
        except Exception as e:
            print(f"β Schema initialization failed: {e}")

    def _insert_sample_data(self, cursor):
        """Insert sample data for demo purposes"""
        try:
            # Check if we already have data
            cursor.execute("SELECT COUNT(*) FROM narratives")
            count = cursor.fetchone()[0]
            if count == 0:
                print("π Inserting sample data for demo...")
                # Insert sample narratives: (title, content, risk_score, spread_velocity)
                sample_narratives = [
                    ("Election Security Misinformation Campaign", "Widespread claims about voting machine vulnerabilities without evidence, targeting swing states during election period.", 95, 15.8),
                    ("Health Misinformation About New Treatment", "False claims about side effects of approved medical treatment, causing public health concerns.", 87, 12.3),
                    ("Economic Conspiracy Theory Spread", "Unfounded claims about economic manipulation targeting specific demographics during market volatility.", 78, 9.7),
                    ("Climate Change Denial Campaign", "Coordinated effort to spread debunked climate science claims ahead of environmental policy announcement.", 82, 11.2),
                    ("Technology Fear-Mongering", "Exaggerated claims about privacy risks in new technology platform, causing unnecessary panic.", 73, 8.4)
                ]
                for title, content, risk_score, velocity in sample_narratives:
                    cursor.execute("""
                        INSERT INTO narratives (title, content, risk_score, spread_velocity, sources, metadata)
                        VALUES (%s, %s, %s, %s, %s, %s)
                    """, (
                        title,
                        content,
                        risk_score,
                        velocity,
                        json.dumps(["social_media", "alternative_news", "forums"]),
                        json.dumps({"demo": True, "category": "sample"})
                    ))
                print("β Sample data inserted successfully")
        except Exception as e:
            print(f"β οΈ Sample data insertion failed: {e}")
            # Not critical, continue anyway

    def insert_narrative(self, content: str, title: str, sources: List[str],
                         risk_score: int, metadata: Dict = None) -> Optional[str]:
        """Insert a new narrative into the database.

        Returns the new narrative's UUID string, or None on failure.
        """
        if not self.connection:
            print("β No database connection available")
            return None
        try:
            cursor = self.connection.cursor()
            try:
                narrative_id = str(uuid.uuid4())
                cursor.execute("""
                    INSERT INTO narratives (id, content, title, sources, risk_score, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (narrative_id, content, title, json.dumps(sources), risk_score, json.dumps(metadata or {})))
            finally:
                cursor.close()
            return narrative_id
        except Exception as e:
            print(f"β Failed to insert narrative: {e}")
            return None

    def get_active_narratives(self, limit: int = 50) -> List[Dict]:
        """Get active narratives from the database, newest first."""
        if not self.connection:
            print("β No database connection available")
            return []
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute("""
                    SELECT id, content, title, first_detected, sources, spread_velocity,
                           risk_score, metadata, status
                    FROM narratives
                    WHERE status = 'active'
                    ORDER BY first_detected DESC
                    LIMIT %s
                """, (limit,))
                rows = cursor.fetchall()
            finally:
                cursor.close()
            return [
                {
                    'id': str(row[0]),
                    'content': row[1],
                    'title': row[2],
                    'first_detected': row[3],
                    'sources': row[4],
                    # spread_velocity is a DECIMAL column; coerce to float,
                    # defaulting to 0.0 when NULL.
                    'spread_velocity': float(row[5]) if row[5] else 0.0,
                    'risk_score': row[6],
                    'metadata': row[7],
                    'status': row[8]
                }
                for row in rows
            ]
        except Exception as e:
            print(f"β Failed to get narratives: {e}")
            return []

    def insert_prediction(self, narrative_id: str, scenario: str, probability: float,
                          potential_reach: int, timeline_hours: int = 24) -> Optional[str]:
        """Insert a prediction for a narrative.

        Returns the new prediction's UUID string, or None on failure.
        """
        if not self.connection:
            print("β No database connection available")
            return None
        try:
            cursor = self.connection.cursor()
            try:
                prediction_id = str(uuid.uuid4())
                cursor.execute("""
                    INSERT INTO predictions (id, narrative_id, scenario_description,
                                             probability_score, potential_reach, timeline_hours)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (prediction_id, narrative_id, scenario, probability, potential_reach, timeline_hours))
            finally:
                cursor.close()
            return prediction_id
        except Exception as e:
            print(f"β Failed to insert prediction: {e}")
            return None

    def get_high_risk_narratives(self, threshold: int = 70) -> List[Dict]:
        """Get narratives with high risk scores (>= threshold), highest first."""
        if not self.connection:
            print("β No database connection available")
            return []
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute("""
                    SELECT id, content, title, risk_score, first_detected
                    FROM narratives
                    WHERE risk_score >= %s AND status = 'active'
                    ORDER BY risk_score DESC
                """, (threshold,))
                rows = cursor.fetchall()
            finally:
                cursor.close()
            return [
                {
                    'id': str(row[0]),
                    'content': row[1],
                    'title': row[2],
                    'risk_score': row[3],
                    'first_detected': row[4]
                }
                for row in rows
            ]
        except Exception as e:
            print(f"β Failed to get high-risk narratives: {e}")
            return []

    def close(self):
        """Close database connection"""
        if self.connection:
            self.connection.close()
            # Mark the connection as gone so helpers degrade gracefully.
            self.connection = None