# NOTE(review): the three lines that were here ("Spaces:" / "Runtime error" x2)
# were residue from the hosting page (Hugging Face Spaces UI), not module source.
| """ | |
| Neon PostgreSQL Database Integration | |
| Stores job history, results, and metadata | |
| """ | |
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import asyncpg
class Database:
    """Neon/PostgreSQL persistence layer for job history and metadata.

    Wraps an ``asyncpg`` connection pool. Every method degrades gracefully:
    when the pool is unavailable (``DATABASE_URL`` unset or the connection
    failed) each method returns a falsy default (``False`` / ``None`` / ``[]``
    / ``{}``) instead of raising, so callers never need a DB to run.
    """

    def __init__(self):
        # Pool is created lazily in connect(). The annotation is quoted so it
        # is never evaluated at runtime (keeps construction import-light).
        self.pool: Optional["asyncpg.Pool"] = None
        self.database_url = os.getenv("DATABASE_URL")

    async def connect(self):
        """Create the connection pool and ensure tables exist.

        No-op (with a warning) when DATABASE_URL is not set; on connection
        failure the error is logged and ``self.pool`` stays ``None``.
        """
        if not self.database_url:
            print("⚠️ DATABASE_URL not set, database features disabled")
            return
        try:
            self.pool = await asyncpg.create_pool(
                self.database_url,
                min_size=2,
                max_size=10,
                command_timeout=60,
            )
            await self.init_tables()
            print("✅ Database connected successfully")
        except Exception as e:
            # Best-effort by design: the app runs without persistence.
            print(f"❌ Database connection failed: {e}")
            self.pool = None

    async def disconnect(self):
        """Close the connection pool, if one was ever created."""
        if self.pool:
            await self.pool.close()
            print("🔌 Database disconnected")

    async def init_tables(self):
        """Create the jobs / job_results / proxy_stats tables if missing."""
        if not self.pool:
            return
        async with self.pool.acquire() as conn:
            # Jobs table: one row per submitted job.
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    prompt TEXT NOT NULL,
                    format TEXT NOT NULL,
                    status TEXT DEFAULT 'pending',
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    completed_at TIMESTAMPTZ,
                    file_extension TEXT,
                    content_type TEXT,
                    proxy_server TEXT,
                    headless BOOLEAN DEFAULT FALSE,
                    streaming_enabled BOOLEAN DEFAULT FALSE,
                    error_message TEXT
                )
            """)
            # Job results table (stores extracted content metadata);
            # rows vanish with their parent job (ON DELETE CASCADE).
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS job_results (
                    id SERIAL PRIMARY KEY,
                    job_id TEXT REFERENCES jobs(id) ON DELETE CASCADE,
                    content_length INTEGER,
                    extraction_time TIMESTAMPTZ DEFAULT NOW(),
                    format TEXT,
                    metadata JSONB
                )
            """)
            # Proxy usage tracking; stats outlive the job (ON DELETE SET NULL).
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS proxy_stats (
                    id SERIAL PRIMARY KEY,
                    job_id TEXT REFERENCES jobs(id) ON DELETE SET NULL,
                    proxy_server TEXT,
                    success BOOLEAN,
                    error_message TEXT,
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                )
            """)
            print("📊 Database tables initialized")

    async def create_job(self, job_id: str, prompt: str, format: str,
                         headless: bool = False, streaming_enabled: bool = False,
                         proxy_server: Optional[str] = None) -> bool:
        """Insert a new job row with status 'running'.

        Returns True on success, False when there is no pool or the insert
        fails (e.g. duplicate id).
        """
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO jobs (id, prompt, format, headless, streaming_enabled, proxy_server, status)
                    VALUES ($1, $2, $3, $4, $5, $6, 'running')
                """, job_id, prompt, format, headless, streaming_enabled, proxy_server)
            return True
        except Exception as e:
            print(f"❌ Failed to create job: {e}")
            return False

    async def update_job_status(self, job_id: str, status: str,
                                error_message: Optional[str] = None) -> bool:
        """Update a job's status (and completion timestamp / error message).

        NOTE: completed_at and error_message are always overwritten — a status
        outside {'completed', 'failed'} resets completed_at to NULL, and an
        omitted error_message clears any previous one.
        """
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                # Timezone-aware UTC timestamp for the TIMESTAMPTZ column
                # (datetime.utcnow() is deprecated and naive).
                completed_at = (
                    datetime.now(timezone.utc)
                    if status in ('completed', 'failed')
                    else None
                )
                await conn.execute("""
                    UPDATE jobs
                    SET status = $2,
                        completed_at = $3,
                        error_message = $4
                    WHERE id = $1
                """, job_id, status, completed_at, error_message)
            return True
        except Exception as e:
            print(f"❌ Failed to update job status: {e}")
            return False

    async def update_job_info(self, job_id: str, file_extension: str,
                              content_type: str) -> bool:
        """Record the output file's extension and MIME content type."""
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    UPDATE jobs
                    SET file_extension = $2, content_type = $3
                    WHERE id = $1
                """, job_id, file_extension, content_type)
            return True
        except Exception as e:
            print(f"❌ Failed to update job info: {e}")
            return False

    async def save_job_result(self, job_id: str, content_length: int,
                              format: str, metadata: Dict[str, Any]) -> bool:
        """Insert a job_results row; metadata is JSON-serialized into JSONB."""
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO job_results (job_id, content_length, format, metadata)
                    VALUES ($1, $2, $3, $4)
                """, job_id, content_length, format, json.dumps(metadata))
            return True
        except Exception as e:
            print(f"❌ Failed to save job result: {e}")
            return False

    async def log_proxy_usage(self, job_id: str, proxy_server: str,
                              success: bool, error_message: Optional[str] = None) -> bool:
        """Record one proxy attempt (success flag plus optional error)."""
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO proxy_stats (job_id, proxy_server, success, error_message)
                    VALUES ($1, $2, $3, $4)
                """, job_id, proxy_server, success, error_message)
            return True
        except Exception as e:
            print(f"❌ Failed to log proxy usage: {e}")
            return False

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one job row as a dict, or None if absent / no pool / error."""
        if not self.pool:
            return None
        try:
            async with self.pool.acquire() as conn:
                row = await conn.fetchrow("""
                    SELECT * FROM jobs WHERE id = $1
                """, job_id)
                return dict(row) if row else None
        except Exception as e:
            print(f"❌ Failed to get job: {e}")
            return None

    async def get_all_jobs(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
        """List jobs newest-first with LIMIT/OFFSET pagination."""
        if not self.pool:
            return []
        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch("""
                    SELECT * FROM jobs
                    ORDER BY created_at DESC
                    LIMIT $1 OFFSET $2
                """, limit, offset)
            return [dict(row) for row in rows]
        except Exception as e:
            print(f"❌ Failed to get jobs: {e}")
            return []

    async def get_job_stats(self) -> Dict[str, Any]:
        """Return aggregate counts: total_jobs, completed, failed, running."""
        if not self.pool:
            return {}
        try:
            async with self.pool.acquire() as conn:
                stats = await conn.fetchrow("""
                    SELECT
                        COUNT(*) as total_jobs,
                        COUNT(*) FILTER (WHERE status = 'completed') as completed,
                        COUNT(*) FILTER (WHERE status = 'failed') as failed,
                        COUNT(*) FILTER (WHERE status = 'running') as running
                    FROM jobs
                """)
            return dict(stats) if stats else {}
        except Exception as e:
            print(f"❌ Failed to get stats: {e}")
            return {}

    async def delete_job(self, job_id: str) -> bool:
        """Delete a job; its job_results rows cascade (see init_tables)."""
        if not self.pool:
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("DELETE FROM jobs WHERE id = $1", job_id)
            return True
        except Exception as e:
            print(f"❌ Failed to delete job: {e}")
            return False
# Global database instance — module-level singleton; the pool is created
# lazily, so callers must `await db.connect()` at startup and
# `await db.disconnect()` at shutdown.
db = Database()