| """ | |
| Extended database storage methods for enhanced scraping functionality. | |
| Builds upon the existing db_storage.py with additional session management, | |
| Hunter Protocol integration, and performance monitoring. | |
| """ | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, func, and_, or_, desc, asc, update | |
| from sqlalchemy.orm import selectinload | |
| from typing import List, Optional, Dict, Tuple | |
| from datetime import datetime, timedelta | |
| import logging | |
| import json | |
| from app.db_models import User, ProxySource, Proxy, CandidateSource | |
| from app.db_storage import DatabaseStorage | |
| logger = logging.getLogger(__name__) | |
class ExtendedDatabaseStorage(DatabaseStorage):
    """Extended storage with enhanced scraping session management and monitoring."""

    async def create_scraping_session(
        self,
        session: AsyncSession,
        source_id: int,
        scraping_type: str = "scheduled",
        config: Optional[Dict] = None,
        initiated_by: Optional[int] = None,
    ) -> Dict:
        """Persist a new scraping-session row and return its key fields.

        Args:
            session: Active async DB session.
            source_id: Proxy source this scrape targets.
            scraping_type: How the run was started (e.g. "scheduled").
            config: Optional per-run configuration payload.
            initiated_by: User id that triggered the run, if any.

        Returns:
            Dict with the new row's ``session_id``, ``status`` and
            ``started_at``.
        """
        from app.db_models import ScrapingSession

        new_run = ScrapingSession(
            source_id=source_id,
            scraping_type=scraping_type,
            status="running",
            config=config if config is not None else {},
            initiated_by=initiated_by,
            started_at=datetime.utcnow(),
        )
        session.add(new_run)
        await session.commit()
        # Refresh so DB-generated values (the primary key) are populated.
        await session.refresh(new_run)
        return {
            "session_id": new_run.id,
            "status": new_run.status,
            "started_at": new_run.started_at,
        }
| async def update_scraping_session( | |
| self, | |
| session: AsyncSession, | |
| session_id: int, | |
| status: str, | |
| proxies_found: int = 0, | |
| proxies_valid: int = 0, | |
| error_message: Optional[str] = None, | |
| metadata: Optional[Dict] = None, | |
| ) -> bool: | |
| """Update scraping session status and results.""" | |
| from app.db_models import ScrapingSession | |
| try: | |
| stmt = ( | |
| update(ScrapingSession) | |
| .where(ScrapingSession.id == session_id) | |
| .values( | |
| status=status, | |
| proxies_found=proxies_found, | |
| proxies_valid=proxies_valid, | |
| error_message=error_message, | |
| metadata=metadata or {}, | |
| finished_at=datetime.utcnow() | |
| if status in ["completed", "failed"] | |
| else None, | |
| ) | |
| ) | |
| await session.execute(stmt) | |
| await session.commit() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error updating scraping session {session_id}: {e}") | |
| await session.rollback() | |
| return False | |
| async def get_scraping_sessions( | |
| self, | |
| session: AsyncSession, | |
| source_id: Optional[int] = None, | |
| status: Optional[str] = None, | |
| limit: int = 50, | |
| offset: int = 0, | |
| ) -> Tuple[List[Dict], int]: | |
| """Get scraping sessions with filtering.""" | |
| from app.db_models import ScrapingSession | |
| query = select(ScrapingSession).options(selectinload(ScrapingSession.source)) | |
| if source_id: | |
| query = query.where(ScrapingSession.source_id == source_id) | |
| if status: | |
| query = query.where(ScrapingSession.status == status) | |
| count_query = select(func.count()).select_from(query.subquery()) | |
| total_result = await session.execute(count_query) | |
| total = total_result.scalar() | |
| query = ( | |
| query.order_by(desc(ScrapingSession.started_at)).limit(limit).offset(offset) | |
| ) | |
| result = await session.execute(query) | |
| sessions = result.scalars().all() | |
| sessions_data = [] | |
| for s in sessions: | |
| sessions_data.append( | |
| { | |
| "id": s.id, | |
| "source_id": s.source_id, | |
| "source_name": s.source.name if s.source else "Unknown", | |
| "scraping_type": s.scraping_type, | |
| "status": s.status, | |
| "proxies_found": s.proxies_found, | |
| "proxies_valid": s.proxies_valid, | |
| "started_at": s.started_at, | |
| "finished_at": s.finished_at, | |
| "error_message": s.error_message, | |
| "config": s.config, | |
| } | |
| ) | |
| return sessions_data, total | |
| async def get_scraping_stats( | |
| self, | |
| session: AsyncSession, | |
| days: int = 7, | |
| ) -> Dict: | |
| """Get scraping statistics for the last N days.""" | |
| from app.db_models import ScrapingSession | |
| since_date = datetime.utcnow() - timedelta(days=days) | |
| status_stats = await session.execute( | |
| select( | |
| ScrapingSession.status, | |
| func.count(ScrapingSession.id).label("count"), | |
| ) | |
| .where(ScrapingSession.started_at >= since_date) | |
| .group_by(ScrapingSession.status) | |
| ) | |
| status_counts = {} | |
| for row in status_stats: | |
| status_counts[row.status] = row.count | |
| daily_stats = await session.execute( | |
| select( | |
| func.date(ScrapingSession.started_at).label("date"), | |
| func.count(ScrapingSession.id).label("sessions"), | |
| func.sum(ScrapingSession.proxies_found).label("proxies_found"), | |
| func.sum(ScrapingSession.proxies_valid).label("proxies_valid"), | |
| ) | |
| .where(ScrapingSession.started_at >= since_date) | |
| .group_by(func.date(ScrapingSession.started_at)) | |
| .order_by(desc(func.date(ScrapingSession.started_at))) | |
| ) | |
| daily_data = [] | |
| for row in daily_stats: | |
| daily_data.append( | |
| { | |
| "date": row.date.isoformat(), | |
| "sessions": row.sessions or 0, | |
| "proxies_found": row.proxies_found or 0, | |
| "proxies_valid": row.proxies_valid or 0, | |
| } | |
| ) | |
| total_completed = status_counts.get("completed", 0) + status_counts.get( | |
| "failed", 0 | |
| ) | |
| success_rate = ( | |
| (status_counts.get("completed", 0) / total_completed * 100) | |
| if total_completed > 0 | |
| else 0 | |
| ) | |
| return { | |
| "period_days": days, | |
| "status_counts": status_counts, | |
| "daily_stats": daily_data, | |
| "success_rate": round(success_rate, 2), | |
| "total_sessions": sum(status_counts.values()), | |
| } | |
| async def create_hunter_candidate( | |
| self, | |
| session: AsyncSession, | |
| url: str, | |
| discovery_method: str, | |
| confidence_score: int = 0, | |
| metadata: Optional[Dict] = None, | |
| ) -> Dict: | |
| """Create a new Hunter candidate source.""" | |
| from urllib.parse import urlparse | |
| domain = urlparse(url).netloc | |
| existing = await session.execute( | |
| select(CandidateSource).where(CandidateSource.url == url) | |
| ) | |
| if existing.scalar_one_or_none(): | |
| return {"error": "Candidate already exists"} | |
| candidate = CandidateSource( | |
| url=url, | |
| domain=domain, | |
| discovery_method=discovery_method, | |
| confidence_score=confidence_score, | |
| metadata=metadata or {}, | |
| ) | |
| session.add(candidate) | |
| await session.commit() | |
| await session.refresh(candidate) | |
| return { | |
| "id": candidate.id, | |
| "url": candidate.url, | |
| "domain": candidate.domain, | |
| "discovery_method": candidate.discovery_method, | |
| "confidence_score": candidate.confidence_score, | |
| "status": candidate.status, | |
| } | |
| async def update_hunter_candidate( | |
| self, | |
| session: AsyncSession, | |
| candidate_id: int, | |
| status: Optional[str] = None, | |
| confidence_score: Optional[int] = None, | |
| proxies_found: Optional[int] = None, | |
| fail_count: Optional[int] = None, | |
| ) -> bool: | |
| """Update Hunter candidate status and metrics.""" | |
| try: | |
| update_data = {} | |
| if status is not None: | |
| update_data["status"] = status | |
| if status in ["approved", "rejected"]: | |
| update_data["last_checked_at"] = datetime.utcnow() | |
| if confidence_score is not None: | |
| update_data["confidence_score"] = confidence_score | |
| if proxies_found is not None: | |
| update_data["proxies_found_count"] = proxies_found | |
| if fail_count is not None: | |
| update_data["fail_count"] = fail_count | |
| if not update_data: | |
| return False | |
| stmt = ( | |
| update(CandidateSource) | |
| .where(CandidateSource.id == candidate_id) | |
| .values(**update_data) | |
| ) | |
| await session.execute(stmt) | |
| await session.commit() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error updating hunter candidate {candidate_id}: {e}") | |
| await session.rollback() | |
| return False | |
    async def get_hunter_statistics(
        self,
        session: AsyncSession,
        days: int = 30,
    ) -> Dict:
        """Get Hunter protocol statistics and performance metrics.

        Aggregates CandidateSource rows created in the trailing *days*
        window: per-status counts with mean confidence, per-discovery-method
        counts with mean proxies found, the top domains (>= 2 candidates)
        ranked by total proxies found, and an approval rate computed over
        candidates that reached a terminal status ("approved"/"rejected").
        """
        since_date = datetime.utcnow() - timedelta(days=days)
        # Per-status volume and mean confidence score.
        status_stats = await session.execute(
            select(
                CandidateSource.status,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.confidence_score).label("avg_confidence"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.status)
        )
        status_data = {}
        for row in status_stats:
            # avg() yields NULL for all-NULL groups; coalesce before rounding.
            status_data[row.status] = {
                "count": row.count,
                "avg_confidence": round(row.avg_confidence or 0, 2),
            }
        # Per-discovery-method volume and mean proxies found per candidate.
        method_stats = await session.execute(
            select(
                CandidateSource.discovery_method,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.proxies_found_count).label("avg_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.discovery_method)
        )
        method_data = {}
        for row in method_stats:
            method_data[row.discovery_method] = {
                "count": row.count,
                "avg_proxies": round(row.avg_proxies or 0, 2),
            }
        # Top 10 domains with at least two candidates, ranked by total proxies.
        top_domains = await session.execute(
            select(
                CandidateSource.domain,
                func.count(CandidateSource.id).label("candidates"),
                func.sum(CandidateSource.proxies_found_count).label("total_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.domain)
            .having(func.count(CandidateSource.id) >= 2)
            .order_by(desc(func.sum(CandidateSource.proxies_found_count)))
            .limit(10)
        )
        domain_data = []
        for row in top_domains:
            domain_data.append(
                {
                    "domain": row.domain,
                    "candidates": row.candidates,
                    "total_proxies": row.total_proxies or 0,
                    # candidates >= 2 here (HAVING clause), so no div-by-zero.
                    "avg_proxies_per_candidate": round(
                        (row.total_proxies or 0) / row.candidates, 2
                    ),
                }
            )
        # Approval rate over candidates with a terminal review decision.
        total_processed = sum(
            data["count"]
            for status, data in status_data.items()
            if status in ["approved", "rejected"]
        )
        approved_count = status_data.get("approved", {}).get("count", 0)
        approval_rate = (
            (approved_count / total_processed * 100) if total_processed > 0 else 0
        )
        return {
            "period_days": days,
            "status_stats": status_data,
            "method_stats": method_data,
            "top_domains": domain_data,
            "approval_rate": round(approval_rate, 2),
            "total_candidates": sum(data["count"] for data in status_data.values()),
        }
| async def batch_process_proxies_with_rate_limit( | |
| self, | |
| session: AsyncSession, | |
| proxies_data: List[Dict], | |
| source_id: Optional[int] = None, | |
| rate_limit_per_second: int = 10, | |
| batch_size: int = 50, | |
| ) -> Dict: | |
| """Process proxies with rate limiting and detailed progress tracking.""" | |
| import asyncio | |
| import time | |
| if not proxies_data: | |
| return {"processed": 0, "success": 0, "failed": 0, "errors": []} | |
| processed = 0 | |
| success = 0 | |
| failed = 0 | |
| errors = [] | |
| start_time = time.time() | |
| for i in range(0, len(proxies_data), batch_size): | |
| batch = proxies_data[i : i + batch_size] | |
| batch_start = time.time() | |
| for proxy_data in batch: | |
| try: | |
| result = await self.add_proxy_with_validation( | |
| session, proxy_data, source_id | |
| ) | |
| if result: | |
| success += 1 | |
| else: | |
| failed += 1 | |
| errors.append( | |
| f"Failed to add proxy: {proxy_data.get('url', 'unknown')}" | |
| ) | |
| processed += 1 | |
| await asyncio.sleep(1.0 / rate_limit_per_second) | |
| except Exception as e: | |
| failed += 1 | |
| errors.append(f"Error processing proxy: {str(e)}") | |
| processed += 1 | |
| batch_time = time.time() - batch_start | |
| expected_batch_time = len(batch) / rate_limit_per_second | |
| if batch_time < expected_batch_time: | |
| await asyncio.sleep(expected_batch_time - batch_time) | |
| total_time = time.time() - start_time | |
| actual_rate = processed / total_time if total_time > 0 else 0 | |
| return { | |
| "processed": processed, | |
| "success": success, | |
| "failed": failed, | |
| "errors": errors[:10], | |
| "total_time_seconds": round(total_time, 2), | |
| "actual_rate_per_second": round(actual_rate, 2), | |
| "target_rate_per_second": rate_limit_per_second, | |
| } | |
    async def get_performance_metrics(
        self,
        session: AsyncSession,
        days: int = 7,
    ) -> Dict:
        """Get comprehensive performance metrics.

        Over proxies validated within the trailing *days* window, reports:
        latency min/avg/max and validated count, a quality-score histogram in
        five fixed buckets, per-protocol counts, and the ten most common
        country codes.
        """
        since_date = datetime.utcnow() - timedelta(days=days)
        # Latency aggregates over validated proxies in the window.
        validation_metrics = await session.execute(
            select(
                func.avg(Proxy.latency_ms).label("avg_latency"),
                func.min(Proxy.latency_ms).label("min_latency"),
                func.max(Proxy.latency_ms).label("max_latency"),
                func.count(Proxy.id).label("total_validated"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
        )
        # Aggregate-only select: exactly one row, possibly all-NULL aggregates.
        val_row = validation_metrics.first()
        # Per-score counts, bucketed in Python below.
        quality_dist = await session.execute(
            select(
                Proxy.quality_score,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.quality_score)
            .order_by(Proxy.quality_score)
        )
        quality_buckets = {"0-20": 0, "21-40": 0, "41-60": 0, "61-80": 0, "81-100": 0}
        for row in quality_dist:
            # NULL scores are folded into the lowest bucket via `or 0`.
            score = row.quality_score or 0
            if score <= 20:
                quality_buckets["0-20"] += row.count
            elif score <= 40:
                quality_buckets["21-40"] += row.count
            elif score <= 60:
                quality_buckets["41-60"] += row.count
            elif score <= 80:
                quality_buckets["61-80"] += row.count
            else:
                quality_buckets["81-100"] += row.count
        # Validated-proxy counts per protocol, most common first.
        protocol_dist = await session.execute(
            select(
                Proxy.protocol,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.protocol)
            .order_by(desc(func.count(Proxy.id)))
        )
        protocol_data = {}
        for row in protocol_dist:
            protocol_data[row.protocol or "unknown"] = row.count
        # Top 10 country codes; rows without a country are excluded.
        country_dist = await session.execute(
            select(
                Proxy.country_code,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .where(Proxy.country_code.isnot(None))
            .group_by(Proxy.country_code)
            .order_by(desc(func.count(Proxy.id)))
            .limit(10)
        )
        country_data = []
        for row in country_dist:
            country_data.append(
                {
                    "country_code": row.country_code,
                    "count": row.count,
                }
            )
        return {
            "period_days": days,
            "validation_performance": {
                # Aggregates are NULL when no rows matched; coalesce to 0.
                "avg_latency_ms": round(val_row.avg_latency or 0, 2),
                "min_latency_ms": val_row.min_latency or 0,
                "max_latency_ms": val_row.max_latency or 0,
                "total_validated": val_row.total_validated or 0,
            },
            "quality_distribution": quality_buckets,
            "protocol_distribution": protocol_data,
            "top_countries": country_data,
        }
| async def create_background_task( | |
| self, | |
| session: AsyncSession, | |
| task_type: str, | |
| task_data: Dict, | |
| scheduled_for: Optional[datetime] = None, | |
| retry_count: int = 0, | |
| max_retries: int = 3, | |
| ) -> Dict: | |
| """Create a background task record.""" | |
| from app.db_models import BackgroundTask | |
| task = BackgroundTask( | |
| task_type=task_type, | |
| task_data=task_data, | |
| status="pending", | |
| scheduled_for=scheduled_for or datetime.utcnow(), | |
| retry_count=retry_count, | |
| max_retries=max_retries, | |
| ) | |
| session.add(task) | |
| await session.commit() | |
| await session.refresh(task) | |
| return { | |
| "task_id": task.id, | |
| "task_type": task.task_type, | |
| "status": task.status, | |
| "scheduled_for": task.scheduled_for, | |
| } | |
| async def update_background_task( | |
| self, | |
| session: AsyncSession, | |
| task_id: int, | |
| status: str, | |
| result: Optional[Dict] = None, | |
| error_message: Optional[str] = None, | |
| ) -> bool: | |
| """Update background task status and results.""" | |
| from app.db_models import BackgroundTask | |
| try: | |
| task = await session.execute( | |
| select(BackgroundTask).where(BackgroundTask.id == task_id) | |
| ) | |
| task_obj = task.scalar_one_or_none() | |
| if not task_obj: | |
| return False | |
| update_data = { | |
| "status": status, | |
| "completed_at": datetime.utcnow() | |
| if status in ["completed", "failed"] | |
| else None, | |
| } | |
| if result: | |
| update_data["result"] = result | |
| if error_message: | |
| update_data["error_message"] = error_message | |
| update_data["retry_count"] = task_obj.retry_count + 1 | |
| stmt = ( | |
| update(BackgroundTask) | |
| .where(BackgroundTask.id == task_id) | |
| .values(**update_data) | |
| ) | |
| await session.execute(stmt) | |
| await session.commit() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error updating background task {task_id}: {e}") | |
| await session.rollback() | |
| return False | |
| async def get_pending_background_tasks( | |
| self, | |
| session: AsyncSession, | |
| task_type: Optional[str] = None, | |
| limit: int = 50, | |
| ) -> List[Dict]: | |
| """Get pending background tasks for execution.""" | |
| from app.db_models import BackgroundTask | |
| query = select(BackgroundTask).where( | |
| and_( | |
| BackgroundTask.status == "pending", | |
| BackgroundTask.scheduled_for <= datetime.utcnow(), | |
| ) | |
| ) | |
| if task_type: | |
| query = query.where(BackgroundTask.task_type == task_type) | |
| query = query.order_by(BackgroundTask.scheduled_for).limit(limit) | |
| result = await session.execute(query) | |
| tasks = result.scalars().all() | |
| tasks_data = [] | |
| for task in tasks: | |
| tasks_data.append( | |
| { | |
| "id": task.id, | |
| "task_type": task.task_type, | |
| "task_data": task.task_data, | |
| "scheduled_for": task.scheduled_for, | |
| "retry_count": task.retry_count, | |
| "max_retries": task.max_retries, | |
| } | |
| ) | |
| return tasks_data | |
# Module-level singleton for callers that don't need their own instance.
extended_db_storage = ExtendedDatabaseStorage()