import os import psycopg2 from psycopg2.extras import RealDictCursor, Json import pandas as pd from typing import List, Dict, Optional, Tuple import logging from datetime import datetime import json # Handle dotenv import gracefully for deployment environments try: from dotenv import load_dotenv load_dotenv() except ImportError: pass logger = logging.getLogger(__name__) class PostgreSQLDatabase: """ PostgreSQL database manager for the automated task manager. Handles storing parsed emails and validated tasks. Optimized for Neon PostgreSQL cloud platform. """ def __init__(self, host: str = None, port: int = None, database: str = None, username: str = None, password: str = None, connection_string: str = None): # Support both individual parameters and Neon connection string self.connection_string = connection_string or os.getenv("DATABASE_URL") if self.connection_string: # Use Neon connection string (preferred method) self.host = None self.port = None self.database = None self.username = None self.password = None else: # Fallback to individual parameters self.host = host or os.getenv("POSTGRES_HOST", "localhost") self.port = port or int(os.getenv("POSTGRES_PORT", "5432")) self.database = database or os.getenv("POSTGRES_DB", "task_manager") self.username = username or os.getenv("POSTGRES_USER", "postgres") self.password = password or os.getenv("POSTGRES_PASSWORD", "password") self.connection = None def connect(self) -> bool: """Establish connection to PostgreSQL database (Neon optimized).""" try: if self.connection_string: # Use Neon connection string (includes SSL and pooling) self.connection = psycopg2.connect( self.connection_string, cursor_factory=RealDictCursor, sslmode='require' # Neon requires SSL ) logger.info("✅ Connected to Neon PostgreSQL database") else: # Fallback to individual parameters self.connection = psycopg2.connect( host=self.host, port=self.port, database=self.database, user=self.username, password=self.password, cursor_factory=RealDictCursor, sslmode='prefer' ) logger.info("✅ Connected to PostgreSQL database") self.connection.autocommit = True return True except Exception as e: logger.error(f"❌ Failed to connect to PostgreSQL: {e}") return False def close(self): """Close PostgreSQL connection.""" if self.connection: self.connection.close() self.connection = None def create_tables(self) -> bool: """Create the necessary tables if they don't exist.""" if not self.connection: if not self.connect(): return False try: with self.connection.cursor() as cursor: # Create parsed_email table cursor.execute(""" CREATE TABLE IF NOT EXISTS parsed_email ( id SERIAL PRIMARY KEY, message_id VARCHAR(255) UNIQUE NOT NULL, date_received TIMESTAMP WITH TIME ZONE NOT NULL, from_email VARCHAR(255), to_email TEXT, cc_email TEXT, bcc_email TEXT, from_name VARCHAR(255), to_name TEXT, cc_name TEXT, bcc_name TEXT, subject TEXT, content TEXT NOT NULL, content_length INTEGER, processed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, file_source VARCHAR(255) ) """) # Create indexes for parsed_email indexes = [ "CREATE INDEX IF NOT EXISTS idx_parsed_email_message_id ON parsed_email(message_id)", "CREATE INDEX IF NOT EXISTS idx_parsed_email_date_received ON parsed_email(date_received)", "CREATE INDEX IF NOT EXISTS idx_parsed_email_from_email ON parsed_email(from_email)", "CREATE INDEX IF NOT EXISTS idx_parsed_email_subject ON parsed_email(subject)" ] for index in indexes: cursor.execute(index) # Create tasks table cursor.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id SERIAL PRIMARY KEY, email_id INTEGER, message_id VARCHAR(255), topic_name VARCHAR(255) NOT NULL, task_name TEXT NOT NULL, task_summary TEXT, sent_date DATE, due_date DATE, owner_name VARCHAR(255), owner_role VARCHAR(255), owner_department VARCHAR(255), owner_organization VARCHAR(255), validation_status VARCHAR(20) DEFAULT 'llm', confidence_score FLOAT, raw_json JSONB, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (email_id) REFERENCES parsed_email(id) ON DELETE SET NULL, FOREIGN KEY (message_id) REFERENCES parsed_email(message_id) ON DELETE SET NULL ) """) # Create indexes for tasks task_indexes = [ "CREATE INDEX IF NOT EXISTS idx_tasks_email_id ON tasks(email_id)", "CREATE INDEX IF NOT EXISTS idx_tasks_message_id ON tasks(message_id)", "CREATE INDEX IF NOT EXISTS idx_tasks_topic_name ON tasks(topic_name)", "CREATE INDEX IF NOT EXISTS idx_tasks_owner_name ON tasks(owner_name)", "CREATE INDEX IF NOT EXISTS idx_tasks_due_date ON tasks(due_date)", "CREATE INDEX IF NOT EXISTS idx_tasks_start_date ON tasks(sent_date)", "CREATE INDEX IF NOT EXISTS idx_tasks_validation_status ON tasks(validation_status)" ] for index in task_indexes: cursor.execute(index) # Create task_collaborators table cursor.execute(""" CREATE TABLE IF NOT EXISTS task_collaborators ( id SERIAL PRIMARY KEY, task_id INTEGER REFERENCES tasks(id) ON DELETE CASCADE, collaborator_name VARCHAR(255), collaborator_role VARCHAR(255), collaborator_department VARCHAR(255), collaborator_organization VARCHAR(255), validation_status VARCHAR(20) DEFAULT 'llm' ) """) # Create indexes for task_collaborators collab_indexes = [ "CREATE INDEX IF NOT EXISTS idx_task_collaborators_task_id ON task_collaborators(task_id)", "CREATE INDEX IF NOT EXISTS idx_task_collaborators_name ON task_collaborators(collaborator_name)", "CREATE INDEX IF NOT EXISTS idx_task_collaborators_validation_status ON task_collaborators(validation_status)" ] for index in collab_indexes: cursor.execute(index) logger.info("✅ Database tables created successfully") return True except Exception as e: logger.error(f"❌ Error creating tables: {e}") return False def store_parsed_emails(self, emails_df: pd.DataFrame, file_source: str = None) -> List[int]: """ Store parsed email data in the parsed_email table. Args: emails_df: DataFrame with parsed email data file_source: Name of the source .mbox file Returns: List of inserted email IDs """ if not self.connection: if not self.connect(): return [] try: inserted_ids = [] with self.connection.cursor() as cursor: for _, row in emails_df.iterrows(): # Convert date to proper format date_received = pd.to_datetime(row['Date']) # Ensure Message-ID is mapped to message_id in DB message_id = row.get('Message-ID') or row.get('message_id') or None cursor.execute( """ INSERT INTO parsed_email ( message_id, date_received, from_email, to_email, cc_email, bcc_email, from_name, to_name, cc_name, bcc_name, subject, content, content_length, file_source ) VALUES ( %(message_id)s, %(date_received)s, %(from_email)s, %(to_email)s, %(cc_email)s, %(bcc_email)s, %(from_name)s, %(to_name)s, %(cc_name)s, %(bcc_name)s, %(subject)s, %(content)s, %(content_length)s, %(file_source)s ) ON CONFLICT (message_id) DO UPDATE SET processed_at = CURRENT_TIMESTAMP RETURNING id """, { 'message_id': row['Message-ID'], # <-- Always use Message-ID from DataFrame 'date_received': row['Date'], 'from_email': row['From'], 'to_email': row['To'], 'cc_email': row.get('Cc'), 'bcc_email': row.get('Bcc'), 'from_name': row.get('Name-From'), 'to_name': row.get('Name-To'), 'cc_name': row.get('Name-Cc'), 'bcc_name': row.get('Name-Bcc'), 'subject': row['Subject'], 'content': row['content'], 'content_length': len(row['content']) if row['content'] else 0, 'file_source': file_source } ) result = cursor.fetchone() if result: inserted_ids.append(result['id']) logger.info(f"✅ Stored {len(inserted_ids)} parsed emails") return inserted_ids except Exception as e: logger.error(f"❌ Error storing parsed emails: {e}") return [] def store_validated_tasks(self, validated_tasks: List[Dict]) -> List[int]: """ Store validated task data in the tasks and task_collaborators tables. Args: validated_tasks: List of validated task dictionaries Returns: List of inserted task IDs """ if not self.connection: if not self.connect(): return [] try: inserted_task_ids = [] with self.connection.cursor() as cursor: for task_data in validated_tasks: # Handle both nested and flat JSON structures if "Topic" in task_data: # Nested structure topic = task_data["Topic"] topic_name = topic["name"] for task_entry in topic.get("tasks", []): task = task_entry["task"] email_index = task_entry.get("email_index") task_id = self._insert_single_task( cursor, task, topic_name, email_index, task_data ) if task_id: inserted_task_ids.append(task_id) else: # Flat structure topic_name = task_data.get("topic", "Unknown Topic") email_index = task_data.get("email_index") task_id = self._insert_single_task( cursor, task_data, topic_name, email_index, task_data ) if task_id: inserted_task_ids.append(task_id) logger.info(f"✅ Stored {len(inserted_task_ids)} validated tasks") return inserted_task_ids except Exception as e: logger.error(f"❌ Error storing validated tasks: {e}") return [] def _insert_single_task(self, cursor, task: Dict, topic_name: str, email_index: str, raw_json: Dict) -> Optional[int]: """Insert a single task and its collaborators, with deduplication.""" try: # Try to find corresponding email in parsed_email table email_id = None if email_index: cursor.execute( "SELECT id FROM parsed_email WHERE message_id = %s LIMIT 1", (email_index,) ) result = cursor.fetchone() if result: email_id = result['id'] safe_message_id = email_index else: safe_message_id = None logger.warning(f"Email with message_id {email_index} not found in parsed_email table") else: safe_message_id = None # Extract task details task_name = task.get("name") or task.get("deliverable", "Unknown Task") task_summary = task.get("summary", "") sent_date = task.get("sent_date") due_date = task.get("due_date") # Convert date strings to proper dates if sent_date: try: sent_date = pd.to_datetime(sent_date).date() except: sent_date = None if due_date: try: due_date = pd.to_datetime(due_date).date() except: due_date = None # Extract owner information owner = task.get("owner", {}) if isinstance(owner, dict): owner_name = owner.get("name", "Unknown") owner_role = owner.get("role", "Unknown") owner_department = owner.get("department", "Unknown") owner_organization = owner.get("organization", "Unknown") else: owner_name = str(owner) if owner else "Unknown" owner_role = owner_department = owner_organization = "Unknown" # Deduplication: check if task with same message_id and task_name exists if safe_message_id and task_name: cursor.execute( "SELECT id FROM tasks WHERE message_id = %s AND task_name = %s LIMIT 1", (safe_message_id, task_name) ) if cursor.fetchone(): logger.info(f"Duplicate task skipped: {safe_message_id} - {task_name}") return None # Insert task cursor.execute(""" INSERT INTO tasks ( email_id, message_id, topic_name, task_name, task_summary, sent_date, due_date, owner_name, owner_role, owner_department, owner_organization, raw_json, validation_status ) VALUES ( %(email_id)s, %(message_id)s, %(topic_name)s, %(task_name)s, %(task_summary)s, %(sent_date)s, %(due_date)s, %(owner_name)s, %(owner_role)s, %(owner_department)s, %(owner_organization)s, %(raw_json)s, 'llm' ) RETURNING id """, { 'email_id': email_id, 'message_id': safe_message_id, 'topic_name': topic_name, 'task_name': task_name, 'task_summary': task_summary, 'sent_date': sent_date, 'due_date': due_date, 'owner_name': owner_name, 'owner_role': owner_role, 'owner_department': owner_department, 'owner_organization': owner_organization, 'raw_json': Json(raw_json) }) result = cursor.fetchone() task_id = result['id'] if result else None # Insert collaborators if task_id: collaborators = task.get("collaborators", []) for collaborator in collaborators: if isinstance(collaborator, dict): cursor.execute(""" INSERT INTO task_collaborators ( task_id, collaborator_name, collaborator_role, collaborator_department, collaborator_organization, validation_status ) VALUES (%(task_id)s, %(name)s, %(role)s, %(department)s, %(organization)s, 'llm') """, { 'task_id': task_id, 'name': collaborator.get("name", "Unknown"), 'role': collaborator.get("role", "Unknown"), 'department': collaborator.get("department", "Unknown"), 'organization': collaborator.get("organization", "Unknown") }) return task_id except Exception as e: logger.error(f"❌ Error inserting single task: {e}") return None def get_parsed_emails(self, limit: int = 100, offset: int = 0) -> List[Dict]: """Retrieve parsed emails from database.""" if not self.connection: if not self.connect(): return [] try: with self.connection.cursor() as cursor: cursor.execute(""" SELECT * FROM parsed_email ORDER BY date_received DESC LIMIT %s OFFSET %s """, (limit, offset)) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"❌ Error retrieving parsed emails: {e}") return [] def get_tasks_with_collaborators(self, limit: int = 100, offset: int = 0) -> List[Dict]: """Retrieve tasks with their collaborators.""" if not self.connection: if not self.connect(): return [] try: with self.connection.cursor() as cursor: cursor.execute(""" SELECT t.*, array_agg( json_build_object( 'name', tc.collaborator_name, 'role', tc.collaborator_role, 'department', tc.collaborator_department, 'organization', tc.collaborator_organization ) ) FILTER (WHERE tc.id IS NOT NULL) as collaborators FROM tasks t LEFT JOIN task_collaborators tc ON t.id = tc.task_id GROUP BY t.id ORDER BY t.created_at DESC LIMIT %s OFFSET %s """, (limit, offset)) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"❌ Error retrieving tasks: {e}") return [] def get_database_stats(self) -> Dict: """Get statistics about the database.""" if not self.connection: if not self.connect(): return {} try: with self.connection.cursor() as cursor: stats = {} # Count parsed emails cursor.execute("SELECT COUNT(*) as count FROM parsed_email") stats['parsed_emails'] = cursor.fetchone()['count'] # Count tasks cursor.execute("SELECT COUNT(*) as count FROM tasks") stats['tasks'] = cursor.fetchone()['count'] # Count collaborators cursor.execute("SELECT COUNT(*) as count FROM task_collaborators") stats['collaborators'] = cursor.fetchone()['count'] # Recent activity cursor.execute(""" SELECT DATE(processed_at) as date, COUNT(*) as count FROM parsed_email WHERE processed_at >= CURRENT_DATE - INTERVAL '7 days' GROUP BY DATE(processed_at) ORDER BY date DESC """) stats['recent_emails'] = [dict(row) for row in cursor.fetchall()] return stats except Exception as e: logger.error(f"❌ Error getting database stats: {e}") return {} def update_task_validation_status(self, task_id: int, status: str) -> bool: """Update the validation_status for a task.""" if not self.connection: if not self.connect(): return False try: with self.connection.cursor() as cursor: cursor.execute( "UPDATE tasks SET validation_status = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s", (status, task_id) ) return True except Exception as e: logger.error(f"❌ Error updating task validation_status: {e}") return False def update_collaborator_validation_status(self, collaborator_id: int, status: str) -> bool: """Update the validation_status for a task collaborator.""" if not self.connection: if not self.connect(): return False try: with self.connection.cursor() as cursor: cursor.execute( "UPDATE task_collaborators SET validation_status = %s WHERE id = %s", (status, collaborator_id) ) return True except Exception as e: logger.error(f"❌ Error updating collaborator validation_status: {e}") return False # Convenience functions for backward compatibility def store_parsed_emails(emails_df: pd.DataFrame, file_source: str = None, connection_string: str = None) -> List[int]: """Store parsed emails in PostgreSQL database (Neon compatible).""" db = PostgreSQLDatabase(connection_string=connection_string) if not db.create_tables(): return [] return db.store_parsed_emails(emails_df, file_source) def store_validated_tasks(validated_tasks: List[Dict], connection_string: str = None) -> List[int]: """Store validated tasks in PostgreSQL database (Neon compatible).""" db = PostgreSQLDatabase(connection_string=connection_string) if not db.create_tables(): return [] return db.store_validated_tasks(validated_tasks) if __name__ == "__main__": # Test the database connection (Neon compatible) db = PostgreSQLDatabase() if db.connect(): if db.connection_string: print("✅ Neon PostgreSQL connection successful") else: print("✅ PostgreSQL connection successful") if db.create_tables(): print("✅ Database tables created") print("Database stats:", db.get_database_stats()) db.close() else: print("❌ PostgreSQL connection failed") print("💡 For Neon, set DATABASE_URL environment variable with your connection string")