# ForesightSphere / src / database.py
# syaikhipin's picture
# Upload 25 files
# 0e66264 verified
import os
import psycopg2
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import json
import uuid
import sys
sys.path.append('..')
from config import config
class DatabaseManager:
    """Wrapper around a single psycopg2 connection to the application database.

    The connection is opened eagerly in ``__init__`` from
    ``config.DATABASE_URL``. Every public method reports failures with
    ``print`` and returns a neutral value (``None``, ``[]`` or ``False``)
    instead of raising, so callers must check return values.
    """

    def __init__(self):
        self.connection_string = config.DATABASE_URL
        self.connection = None
        self._connect()

    @staticmethod
    def _connection_options(dsn: str) -> Dict[str, Any]:
        """Build the keyword arguments passed to ``psycopg2.connect``.

        psycopg2 merges keyword arguments over the DSN, so an ``sslmode``
        keyword would silently replace a stricter mode the DSN already
        specifies (for example ``verify-full``). The keyword is therefore
        only supplied when the DSN does not set one itself.

        Args:
            dsn: Connection string (URL or key=value form).

        Returns:
            A dict with ``connect_timeout`` and, when the DSN has no
            ``sslmode``, ``sslmode`` set to ``require`` for Supabase hosts
            and ``prefer`` otherwise.
        """
        options: Dict[str, Any] = {"connect_timeout": 10}
        if "sslmode=" not in dsn:
            options["sslmode"] = "require" if "supabase.co" in dsn else "prefer"
        return options

    def _connect(self):
        """Open the connection in autocommit mode, enforcing SSL for Supabase.

        On any failure the error is printed and ``self.connection`` is set
        to ``None``. The stored connection string is never modified.
        """
        if not self.connection_string:
            print("❌ Database connection failed: DATABASE_URL is not configured")
            self.connection = None
            return
        try:
            self.connection = psycopg2.connect(
                self.connection_string,
                **self._connection_options(self.connection_string),
            )
            # Autocommit: every statement below is committed on its own,
            # so no method in this class calls commit() explicitly.
            self.connection.autocommit = True
            print("✅ Database connected successfully")
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            self.connection = None

    def test_connection(self) -> bool:
        """Check the connection with ``SELECT 1``, reconnecting if needed.

        Returns:
            ``True`` when the probe query returns a row. When there is no
            connection or the probe raises, one reconnect is attempted and
            the result reflects whether a connection object now exists.
        """
        if self.connection is not None:
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    return cursor.fetchone() is not None
            except Exception as e:
                print(f"Database test failed: {e}")
        # No usable connection: try to establish a fresh one.
        self._connect()
        return self.connection is not None

    def initialize_schema(self):
        """Create all tables and indexes if missing, then seed demo data.

        Statements run in order on one cursor. Connections opened by
        ``_connect`` are autocommit, so a failure part-way through leaves
        the earlier objects in place. Errors are printed, not raised.
        """
        if self.connection is None:
            print("❌ No database connection")
            return

        # NOTE(review): gen_random_uuid() is built in from PostgreSQL 13;
        # older servers need the pgcrypto extension - confirm server version.
        statements = (
            # narratives
            """
            CREATE TABLE IF NOT EXISTS narratives (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                content TEXT NOT NULL,
                title VARCHAR(500),
                first_detected TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                sources JSONB DEFAULT '[]',
                spread_velocity DECIMAL DEFAULT 0.0,
                risk_score INTEGER DEFAULT 0,
                embedding TEXT,
                metadata JSONB DEFAULT '{}',
                status VARCHAR(50) DEFAULT 'active',
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # predictions
            """
            CREATE TABLE IF NOT EXISTS predictions (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                scenario_description TEXT NOT NULL,
                probability_score DECIMAL NOT NULL,
                potential_reach BIGINT DEFAULT 0,
                timeline_hours INTEGER DEFAULT 24,
                confidence_level DECIMAL DEFAULT 0.0,
                generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                accuracy_score DECIMAL,
                actual_outcome TEXT,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # impacts
            """
            CREATE TABLE IF NOT EXISTS impacts (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                prediction_id UUID REFERENCES predictions(id) ON DELETE CASCADE,
                actual_reach BIGINT DEFAULT 0,
                predicted_reach BIGINT DEFAULT 0,
                engagement_metrics JSONB DEFAULT '{}',
                sentiment_shift DECIMAL DEFAULT 0.0,
                geographic_spread JSONB DEFAULT '{}',
                measured_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                impact_score DECIMAL DEFAULT 0.0,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # monitoring_keywords
            """
            CREATE TABLE IF NOT EXISTS monitoring_keywords (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                keyword VARCHAR(255) NOT NULL,
                alert_threshold DECIMAL DEFAULT 0.7,
                category VARCHAR(100),
                is_active BOOLEAN DEFAULT TRUE,
                created_by VARCHAR(255),
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                last_triggered TIMESTAMP WITH TIME ZONE
            );
            """,
            # historical_patterns
            """
            CREATE TABLE IF NOT EXISTS historical_patterns (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                pattern_name VARCHAR(500) NOT NULL,
                description TEXT,
                campaign_data JSONB NOT NULL,
                pattern_features JSONB DEFAULT '{}',
                effectiveness_score DECIMAL DEFAULT 0.0,
                date_range JSONB,
                geographic_data JSONB DEFAULT '{}',
                archived_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                tags JSONB DEFAULT '[]',
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # alerts
            """
            CREATE TABLE IF NOT EXISTS alerts (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                alert_level VARCHAR(20) NOT NULL,
                title VARCHAR(500) NOT NULL,
                description TEXT,
                triggered_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                acknowledged_at TIMESTAMP WITH TIME ZONE,
                resolved_at TIMESTAMP WITH TIME ZONE,
                status VARCHAR(50) DEFAULT 'active',
                metadata JSONB DEFAULT '{}',
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # counter_narratives
            """
            CREATE TABLE IF NOT EXISTS counter_narratives (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                narrative_id UUID REFERENCES narratives(id) ON DELETE CASCADE,
                fact_based_response TEXT NOT NULL,
                educational_content TEXT,
                trusted_sources JSONB DEFAULT '[]',
                shareable_content TEXT,
                effectiveness_rating DECIMAL,
                generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
            """,
            # indexes backing the ORDER BY / WHERE clauses used by the getters
            "CREATE INDEX IF NOT EXISTS idx_narratives_risk_score ON narratives(risk_score DESC);",
            "CREATE INDEX IF NOT EXISTS idx_narratives_first_detected ON narratives(first_detected DESC);",
            "CREATE INDEX IF NOT EXISTS idx_predictions_probability ON predictions(probability_score DESC);",
            "CREATE INDEX IF NOT EXISTS idx_alerts_status ON alerts(status, triggered_at DESC);",
            "CREATE INDEX IF NOT EXISTS idx_monitoring_keywords_active ON monitoring_keywords(is_active, keyword);",
        )

        try:
            # The context manager closes the cursor even when a statement fails.
            with self.connection.cursor() as cursor:
                for statement in statements:
                    cursor.execute(statement)
                # Seed demo rows only when the narratives table is empty.
                self._insert_sample_data(cursor)
            print("✅ Database schema initialized successfully")
        except Exception as e:
            print(f"❌ Schema initialization failed: {e}")

    def _insert_sample_data(self, cursor):
        """Insert five demo narratives when the ``narratives`` table is empty.

        Args:
            cursor: An open cursor on the target database; it is not closed
                here.

        Failures are printed and swallowed because seeding is not critical.
        """
        try:
            cursor.execute("SELECT COUNT(*) FROM narratives")
            if cursor.fetchone()[0] != 0:
                return

            print("📊 Inserting sample data for demo...")
            # (title, content, risk_score, spread_velocity)
            sample_narratives = [
                ("Election Security Misinformation Campaign", "Widespread claims about voting machine vulnerabilities without evidence, targeting swing states during election period.", 95, 15.8),
                ("Health Misinformation About New Treatment", "False claims about side effects of approved medical treatment, causing public health concerns.", 87, 12.3),
                ("Economic Conspiracy Theory Spread", "Unfounded claims about economic manipulation targeting specific demographics during market volatility.", 78, 9.7),
                ("Climate Change Denial Campaign", "Coordinated effort to spread debunked climate science claims ahead of environmental policy announcement.", 82, 11.2),
                ("Technology Fear-Mongering", "Exaggerated claims about privacy risks in new technology platform, causing unnecessary panic.", 73, 8.4),
            ]
            # Identical for every sample row, so serialise once.
            sources_json = json.dumps(["social_media", "alternative_news", "forums"])
            metadata_json = json.dumps({"demo": True, "category": "sample"})
            for title, content, risk_score, velocity in sample_narratives:
                cursor.execute("""
                    INSERT INTO narratives (title, content, risk_score, spread_velocity, sources, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (title, content, risk_score, velocity, sources_json, metadata_json))
            print("✅ Sample data inserted successfully")
        except Exception as e:
            print(f"⚠️ Sample data insertion failed: {e}")
            # Not critical, continue anyway

    def insert_narrative(self, content: str, title: str, sources: List[str],
                         risk_score: int, metadata: Optional[Dict] = None) -> Optional[str]:
        """Insert a narrative row with a freshly generated UUID.

        Args:
            content: Narrative body text.
            title: Narrative title.
            sources: Source labels, stored as a JSON array (``None`` is
                stored as an empty array).
            risk_score: Integer risk score.
            metadata: Optional extra data, stored as a JSON object.

        Returns:
            The new narrative id as a string, or ``None`` when there is no
            connection or the insert fails.
        """
        if self.connection is None:
            print("❌ No database connection available")
            return None
        narrative_id = str(uuid.uuid4())
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO narratives (id, content, title, sources, risk_score, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (
                    narrative_id,
                    content,
                    title,
                    json.dumps(sources if sources is not None else []),
                    risk_score,
                    json.dumps(metadata or {}),
                ))
        except Exception as e:
            print(f"❌ Failed to insert narrative: {e}")
            return None
        return narrative_id

    def get_active_narratives(self, limit: int = 50) -> List[Dict]:
        """Return up to ``limit`` narratives with status ``'active'``.

        Rows are ordered by ``first_detected`` descending.

        Returns:
            A list of dicts with keys ``id``, ``content``, ``title``,
            ``first_detected``, ``sources``, ``spread_velocity``,
            ``risk_score``, ``metadata`` and ``status``; an empty list when
            there is no connection or the query fails.
        """
        if self.connection is None:
            print("❌ No database connection available")
            return []
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT id, content, title, first_detected, sources, spread_velocity,
                           risk_score, metadata, status
                    FROM narratives
                    WHERE status = 'active'
                    ORDER BY first_detected DESC
                    LIMIT %s
                """, (limit,))
                return [
                    {
                        'id': str(row[0]),
                        'content': row[1],
                        'title': row[2],
                        'first_detected': row[3],
                        'sources': row[4],
                        # NULL / zero velocity both map to 0.0
                        'spread_velocity': float(row[5]) if row[5] else 0.0,
                        'risk_score': row[6],
                        'metadata': row[7],
                        'status': row[8],
                    }
                    for row in cursor.fetchall()
                ]
        except Exception as e:
            print(f"❌ Failed to get narratives: {e}")
            return []

    def insert_prediction(self, narrative_id: str, scenario: str, probability: float,
                          potential_reach: int, timeline_hours: int = 24) -> Optional[str]:
        """Insert a prediction row linked to ``narrative_id``.

        Args:
            narrative_id: Id of the narrative the prediction belongs to.
            scenario: Scenario description text.
            probability: Probability score for the scenario.
            potential_reach: Estimated reach.
            timeline_hours: Prediction horizon in hours.

        Returns:
            The new prediction id as a string, or ``None`` when there is no
            connection or the insert fails.
        """
        if self.connection is None:
            print("❌ No database connection available")
            return None
        prediction_id = str(uuid.uuid4())
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO predictions (id, narrative_id, scenario_description,
                                             probability_score, potential_reach, timeline_hours)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (prediction_id, narrative_id, scenario, probability,
                      potential_reach, timeline_hours))
        except Exception as e:
            print(f"❌ Failed to insert prediction: {e}")
            return None
        return prediction_id

    def get_high_risk_narratives(self, threshold: int = 70) -> List[Dict]:
        """Return active narratives whose ``risk_score`` is at least ``threshold``.

        Rows are ordered by ``risk_score`` descending.

        Returns:
            A list of dicts with keys ``id``, ``content``, ``title``,
            ``risk_score`` and ``first_detected``; an empty list when there
            is no connection or the query fails.
        """
        if self.connection is None:
            print("❌ No database connection available")
            return []
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT id, content, title, risk_score, first_detected
                    FROM narratives
                    WHERE risk_score >= %s AND status = 'active'
                    ORDER BY risk_score DESC
                """, (threshold,))
                return [
                    {
                        'id': str(row[0]),
                        'content': row[1],
                        'title': row[2],
                        'risk_score': row[3],
                        'first_detected': row[4],
                    }
                    for row in cursor.fetchall()
                ]
        except Exception as e:
            print(f"❌ Failed to get high-risk narratives: {e}")
            return []

    def close(self):
        """Close the connection, if any, and forget it.

        ``self.connection`` is reset to ``None`` even if closing raises, so
        later calls see "no connection" rather than a dead handle.
        """
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            self.connection = None