scam / scripts /init_database.py
Gankit12's picture
Upload 129 files
31f0e50 verified
Raw
History Blame Contribute Delete
6.76 kB
#!/usr/bin/env python
"""
Database Initialization Script.
Creates PostgreSQL schema and initializes database:
- conversations table
- messages table
- extracted_intelligence table
- Required indexes
Run this script after setting up PostgreSQL connection.
"""
import os
import sys
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import text
# Reference PostgreSQL DDL: three tables (conversations, messages,
# extracted_intelligence) plus four indexes. Every statement uses
# IF NOT EXISTS, so running the DDL repeatedly does not fail on existing
# objects. In the code visible here the constant is only printed by main()
# for manual execution; it is not executed by this script.
# NOTE(review): session_id is declared UNIQUE, and PostgreSQL already backs a
# UNIQUE constraint with an index, so idx_session_id looks redundant - confirm
# before removing.
SCHEMA_SQL = """
-- Create conversations table
CREATE TABLE IF NOT EXISTS conversations (
id SERIAL PRIMARY KEY,
session_id VARCHAR(255) UNIQUE NOT NULL,
language VARCHAR(10) NOT NULL,
persona VARCHAR(50),
scam_detected BOOLEAN DEFAULT FALSE,
confidence FLOAT,
turn_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create messages table
CREATE TABLE IF NOT EXISTS messages (
id SERIAL PRIMARY KEY,
conversation_id INTEGER REFERENCES conversations(id) ON DELETE CASCADE,
turn_number INTEGER NOT NULL,
sender VARCHAR(50) NOT NULL,
message TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create extracted_intelligence table
CREATE TABLE IF NOT EXISTS extracted_intelligence (
id SERIAL PRIMARY KEY,
conversation_id INTEGER REFERENCES conversations(id) ON DELETE CASCADE,
upi_ids TEXT[],
bank_accounts TEXT[],
ifsc_codes TEXT[],
phone_numbers TEXT[],
phishing_links TEXT[],
extraction_confidence FLOAT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_session_id ON conversations(session_id);
CREATE INDEX IF NOT EXISTS idx_conversation_id ON messages(conversation_id);
CREATE INDEX IF NOT EXISTS idx_created_at ON conversations(created_at);
CREATE INDEX IF NOT EXISTS idx_scam_detected ON conversations(scam_detected);
"""
def check_environment():
    """Report whether the PostgreSQL connection URL is configured.

    Returns:
        True when ``POSTGRES_URL`` is set to a non-empty value. Otherwise a
        warning and a setup hint are printed and False is returned.
    """
    if os.environ.get("POSTGRES_URL"):
        return True

    # Unset and empty are treated the same: warn, show an example, report failure.
    for line in (
        "[WARNING] POSTGRES_URL environment variable not set",
        "Set it using: export POSTGRES_URL=postgresql://user:pass@host:5432/dbname",
    ):
        print(line)
    return False
def init_postgres():
    """Create the PostgreSQL schema, verify it, and run a connectivity probe.

    The work is delegated to ``app.database.postgres``. That import sits
    inside the ``try`` block, so a failing import is reported like any other
    initialization error instead of aborting the script.

    Progress and errors are printed; no exception propagates to the caller
    and nothing is returned.
    """
    print("Initializing PostgreSQL database...")

    try:
        from app.database.postgres import init_database, verify_schema, get_db_connection

        # Initialize database schema
        init_database()
        print(" ✓ Schema created successfully")

        # Verify schema
        if verify_schema():
            print(" ✓ Schema verification passed")
        else:
            print(" ⚠ Schema verification found issues")

        # Test connection with a trivial query. The close() lives in a
        # finally so the connection is released even when execute() raises.
        conn = get_db_connection()
        try:
            result = conn.execute(text("SELECT 1")).fetchone()
        finally:
            conn.close()
        print(f" ✓ Connection test: {result}")

    except ConnectionError as e:
        print(f" ✗ Connection failed: {e}")
        print(" [INFO] Set POSTGRES_URL environment variable to enable database")
    except Exception as e:
        # Top-level boundary of a CLI step: report with a traceback and let
        # the remaining steps of the script run.
        print(f" ✗ Initialization failed: {e}")
        import traceback
        traceback.print_exc()
def init_redis():
    """Exercise the Redis connection: ping, a set/get/delete round trip, health check.

    Skipped with a warning when ``REDIS_URL`` is unset or empty. The client
    and ``health_check`` come from ``app.database.redis_client``; the import
    sits inside the ``try`` block, so a failing import is reported as a
    failed Redis test.

    Progress and errors are printed; no exception propagates to the caller
    and nothing is returned.
    """
    print("Testing Redis connection...")

    redis_url = os.getenv("REDIS_URL")
    if not redis_url:
        print(" [WARNING] REDIS_URL not set, skipping Redis test")
        return

    try:
        from app.database.redis_client import get_redis_client, health_check

        # Test connection
        client = get_redis_client()
        client.ping()
        print(" ✓ Connection successful")

        # Test read/write. The key carries a 10 second expiry, so it does
        # not linger if a later call fails before the delete.
        test_key = "test:init"
        test_value = "ok"
        client.set(test_key, test_value, ex=10)
        retrieved = client.get(test_key)
        client.delete(test_key)

        # redis-py returns bytes unless the client was created with
        # decode_responses=True; normalize so b"ok" is not reported as a mismatch.
        if isinstance(retrieved, bytes):
            retrieved = retrieved.decode("utf-8")

        if retrieved == test_value:
            print(" ✓ Read/write test passed")
        else:
            print(f" ⚠ Read/write test failed: expected '{test_value}', got '{retrieved}'")

        # Health check
        if health_check():
            print(" ✓ Health check passed")
        else:
            print(" ⚠ Health check failed")

    except ConnectionError as e:
        # NOTE(review): this is the built-in ConnectionError. redis-py's own
        # ConnectionError does not derive from it, so unless get_redis_client
        # translates errors, connection failures land in the generic handler
        # below - verify against app.database.redis_client.
        print(f" ✗ Connection failed: {e}")
        print(" [INFO] Set REDIS_URL environment variable to enable Redis")
    except Exception as e:
        print(f" ✗ Redis test failed: {e}")
        import traceback
        traceback.print_exc()
def verify_database():
    """Check the schema and print a row count for each application table.

    Uses ``verify_schema`` and ``get_db_connection`` from
    ``app.database.postgres``. Both imports sit inside the ``try`` block, so
    an import failure is reported as a failed verification.

    Returns:
        True when ``verify_schema()`` is truthy and every count query ran;
        False when the schema check fails, a ``ConnectionError`` is raised,
        or any other exception occurs. No exception propagates.
    """
    print("\nVerifying database setup...")

    try:
        from app.database.postgres import verify_schema, get_db_connection
        from sqlalchemy import text

        if not verify_schema():
            print(" ✗ Schema verification failed")
            return False

        print(" ✓ All tables and indexes exist")

        # Additional verification: check table counts
        conn = get_db_connection()
        try:
            # Table names come from this fixed tuple, never from external
            # input, so interpolating them into the statement is safe.
            for table in ("conversations", "messages", "extracted_intelligence"):
                result = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).fetchone()
                print(f" ✓ {table}: {result[0]} rows")
        finally:
            conn.close()

        return True

    except ConnectionError:
        print(" [INFO] Database not configured, skipping verification")
        return False
    except Exception as e:
        print(f" ✗ Verification failed: {e}")
        return False
def main():
    """Run the full initialization sequence and print a report.

    Order: banner, environment check, PostgreSQL initialization, Redis test,
    database verification, then the reference schema SQL. The steps run even
    when the environment check fails; that case only adds an informational
    notice to the output.
    """
    heavy_rule = "=" * 60

    # Banner followed by one blank line.
    print(heavy_rule, "ScamShield AI - Database Initialization", heavy_rule, "", sep="\n")

    # Check environment
    env_ready = check_environment()
    if not env_ready:
        print("\n[INFO] Running in stub mode - no actual database operations")
        print()

    # Initialize databases, leaving a blank line after each step.
    for step in (init_postgres, init_redis):
        step()
        print()

    # Verify
    verified = verify_database()
    if verified:
        print("\n" + heavy_rule)
        print("Database initialization complete!")
        print(heavy_rule)

    # The reference DDL is printed regardless of the verification outcome.
    print("\nSchema SQL for manual execution:")
    print("-" * 40)
    print(SCHEMA_SQL)
# Run the initialization only when this file is executed as a script;
# importing the module has no such side effect.
if __name__ == "__main__":
    main()