# 1proxy/app/db_storage_extended.py
"""
Extended database storage methods for enhanced scraping functionality.
Builds upon the existing db_storage.py with additional session management,
Hunter Protocol integration, and performance monitoring.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc, asc, update
from sqlalchemy.orm import selectinload
from typing import List, Optional, Dict, Tuple
from datetime import datetime, timedelta
import logging
import json
from app.db_models import User, ProxySource, Proxy, CandidateSource
from app.db_storage import DatabaseStorage
logger = logging.getLogger(__name__)
class ExtendedDatabaseStorage(DatabaseStorage):
    """Extended storage with enhanced scraping session management and monitoring.

    Layers four concerns on top of :class:`DatabaseStorage`:

    * scraping-session lifecycle records (create / update / query / stats),
    * Hunter Protocol candidate-source management and statistics,
    * rate-limited batch ingestion of proxies,
    * a simple DB-backed background-task queue and aggregate performance metrics.

    All methods take the caller's ``AsyncSession`` and commit (or roll back)
    on it themselves.

    NOTE(review): timestamps are naive ``datetime.utcnow()`` throughout.
    ``utcnow`` is deprecated since Python 3.12; consistent within this module,
    but callers must not mix in timezone-aware datetimes.
    """

    async def create_scraping_session(
        self,
        session: AsyncSession,
        source_id: int,
        scraping_type: str = "scheduled",
        config: Optional[Dict] = None,
        initiated_by: Optional[int] = None,
    ) -> Dict:
        """Create a new scraping session record in the ``"running"`` state.

        Args:
            session: Active async DB session; committed here.
            source_id: Id of the source being scraped.
            scraping_type: Tag such as ``"scheduled"`` or ``"manual"``.
            config: Optional per-run configuration payload (stored as-is).
            initiated_by: Optional id of the user who triggered the run.

        Returns:
            Dict with ``session_id``, ``status`` and ``started_at``.
        """
        # Imported lazily (as elsewhere in this class) to avoid import cycles
        # between storage and model modules.
        from app.db_models import ScrapingSession

        scraping_session = ScrapingSession(
            source_id=source_id,
            scraping_type=scraping_type,
            status="running",
            config=config or {},
            initiated_by=initiated_by,
            started_at=datetime.utcnow(),
        )
        session.add(scraping_session)
        await session.commit()
        # Refresh so the DB-generated primary key is available for the response.
        await session.refresh(scraping_session)
        return {
            "session_id": scraping_session.id,
            "status": scraping_session.status,
            "started_at": scraping_session.started_at,
        }

    async def update_scraping_session(
        self,
        session: AsyncSession,
        session_id: int,
        status: str,
        proxies_found: int = 0,
        proxies_valid: int = 0,
        error_message: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ) -> bool:
        """Update a scraping session's status and result counters.

        ``finished_at`` is stamped only for terminal statuses ("completed" /
        "failed"); any other status explicitly resets it to ``None``. Note the
        counters and ``error_message`` are overwritten unconditionally, so
        callers must pass the full current values, not deltas.

        Returns:
            True on success; False (after rollback) if the update raised.

        NOTE(review): ``values(metadata=...)`` targets a mapped attribute named
        ``metadata``, which is reserved by SQLAlchemy's declarative base —
        verify the column's Python-side attribute name in ``app.db_models``.
        """
        from app.db_models import ScrapingSession

        try:
            stmt = (
                update(ScrapingSession)
                .where(ScrapingSession.id == session_id)
                .values(
                    status=status,
                    proxies_found=proxies_found,
                    proxies_valid=proxies_valid,
                    error_message=error_message,
                    metadata=metadata or {},
                    finished_at=datetime.utcnow()
                    if status in ["completed", "failed"]
                    else None,
                )
            )
            await session.execute(stmt)
            await session.commit()
            return True
        except Exception as e:
            logger.error(f"Error updating scraping session {session_id}: {e}")
            await session.rollback()
            return False

    async def get_scraping_sessions(
        self,
        session: AsyncSession,
        source_id: Optional[int] = None,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> Tuple[List[Dict], int]:
        """Return a page of scraping sessions plus the unpaginated total.

        Args:
            session: Active async DB session (read-only here).
            source_id: Optional filter on the owning source.
            status: Optional filter on session status.
            limit / offset: Pagination, ordered by ``started_at`` descending.

        Returns:
            ``(rows, total)`` where ``rows`` is a list of plain dicts and
            ``total`` is the count of matches before pagination.
        """
        from app.db_models import ScrapingSession

        # Eager-load the source relationship so serialization below does not
        # trigger lazy loads on an async session.
        query = select(ScrapingSession).options(selectinload(ScrapingSession.source))
        # Compare against None explicitly: a legitimate id of 0 must still
        # apply the filter (the previous truthiness check dropped it).
        if source_id is not None:
            query = query.where(ScrapingSession.source_id == source_id)
        if status:
            query = query.where(ScrapingSession.status == status)
        # Count over the filtered (pre-pagination) query.
        count_query = select(func.count()).select_from(query.subquery())
        total_result = await session.execute(count_query)
        total = total_result.scalar()
        query = (
            query.order_by(desc(ScrapingSession.started_at)).limit(limit).offset(offset)
        )
        result = await session.execute(query)
        sessions = result.scalars().all()
        sessions_data = [
            {
                "id": s.id,
                "source_id": s.source_id,
                "source_name": s.source.name if s.source else "Unknown",
                "scraping_type": s.scraping_type,
                "status": s.status,
                "proxies_found": s.proxies_found,
                "proxies_valid": s.proxies_valid,
                "started_at": s.started_at,
                "finished_at": s.finished_at,
                "error_message": s.error_message,
                "config": s.config,
            }
            for s in sessions
        ]
        return sessions_data, total

    async def get_scraping_stats(
        self,
        session: AsyncSession,
        days: int = 7,
    ) -> Dict:
        """Aggregate scraping statistics over the last ``days`` days.

        Returns:
            Dict with per-status counts, per-day totals (newest first), a
            success rate in percent over terminal sessions, and the total
            session count in the window.
        """
        from app.db_models import ScrapingSession

        since_date = datetime.utcnow() - timedelta(days=days)
        status_stats = await session.execute(
            select(
                ScrapingSession.status,
                func.count(ScrapingSession.id).label("count"),
            )
            .where(ScrapingSession.started_at >= since_date)
            .group_by(ScrapingSession.status)
        )
        status_counts = {row.status: row.count for row in status_stats}
        daily_stats = await session.execute(
            select(
                func.date(ScrapingSession.started_at).label("date"),
                func.count(ScrapingSession.id).label("sessions"),
                func.sum(ScrapingSession.proxies_found).label("proxies_found"),
                func.sum(ScrapingSession.proxies_valid).label("proxies_valid"),
            )
            .where(ScrapingSession.started_at >= since_date)
            .group_by(func.date(ScrapingSession.started_at))
            .order_by(desc(func.date(ScrapingSession.started_at)))
        )
        # NOTE(review): ``row.date.isoformat()`` assumes the backend returns
        # date objects (e.g. PostgreSQL); SQLite's ``date()`` yields str —
        # confirm the deployed backend.
        daily_data = [
            {
                "date": row.date.isoformat(),
                "sessions": row.sessions or 0,
                "proxies_found": row.proxies_found or 0,
                "proxies_valid": row.proxies_valid or 0,
            }
            for row in daily_stats
        ]
        # Success rate is computed only over terminal sessions so in-flight
        # ("running") sessions do not drag the percentage down.
        total_completed = status_counts.get("completed", 0) + status_counts.get(
            "failed", 0
        )
        success_rate = (
            (status_counts.get("completed", 0) / total_completed * 100)
            if total_completed > 0
            else 0
        )
        return {
            "period_days": days,
            "status_counts": status_counts,
            "daily_stats": daily_data,
            "success_rate": round(success_rate, 2),
            "total_sessions": sum(status_counts.values()),
        }

    async def create_hunter_candidate(
        self,
        session: AsyncSession,
        url: str,
        discovery_method: str,
        confidence_score: int = 0,
        metadata: Optional[Dict] = None,
    ) -> Dict:
        """Create a new Hunter candidate source, deduplicated by exact URL.

        Returns:
            The serialized new candidate, or ``{"error": ...}`` if a candidate
            with the same URL already exists.

        NOTE(review): the existence check below is check-then-insert and is
        racy under concurrent writers; a unique constraint on ``url`` (with
        IntegrityError handling) would close the window.
        """
        from urllib.parse import urlparse

        domain = urlparse(url).netloc
        existing = await session.execute(
            select(CandidateSource).where(CandidateSource.url == url)
        )
        if existing.scalar_one_or_none():
            return {"error": "Candidate already exists"}
        candidate = CandidateSource(
            url=url,
            domain=domain,
            discovery_method=discovery_method,
            confidence_score=confidence_score,
            metadata=metadata or {},
        )
        session.add(candidate)
        await session.commit()
        await session.refresh(candidate)
        return {
            "id": candidate.id,
            "url": candidate.url,
            "domain": candidate.domain,
            "discovery_method": candidate.discovery_method,
            "confidence_score": candidate.confidence_score,
            "status": candidate.status,
        }

    async def update_hunter_candidate(
        self,
        session: AsyncSession,
        candidate_id: int,
        status: Optional[str] = None,
        confidence_score: Optional[int] = None,
        proxies_found: Optional[int] = None,
        fail_count: Optional[int] = None,
    ) -> bool:
        """Partially update a Hunter candidate; ``None`` fields are untouched.

        Moving to a terminal status ("approved"/"rejected") also stamps
        ``last_checked_at``.

        Returns:
            True on success, False if nothing was supplied to update or the
            statement raised (rolled back).
        """
        try:
            update_data = {}
            if status is not None:
                update_data["status"] = status
                # Record when the terminal decision was made.
                if status in ["approved", "rejected"]:
                    update_data["last_checked_at"] = datetime.utcnow()
            if confidence_score is not None:
                update_data["confidence_score"] = confidence_score
            if proxies_found is not None:
                update_data["proxies_found_count"] = proxies_found
            if fail_count is not None:
                update_data["fail_count"] = fail_count
            if not update_data:
                return False
            stmt = (
                update(CandidateSource)
                .where(CandidateSource.id == candidate_id)
                .values(**update_data)
            )
            await session.execute(stmt)
            await session.commit()
            return True
        except Exception as e:
            logger.error(f"Error updating hunter candidate {candidate_id}: {e}")
            await session.rollback()
            return False

    async def get_hunter_statistics(
        self,
        session: AsyncSession,
        days: int = 30,
    ) -> Dict:
        """Aggregate Hunter Protocol metrics over the last ``days`` days.

        Returns:
            Dict with per-status counts/avg confidence, per-discovery-method
            counts/avg proxies, top-10 domains (with at least 2 candidates)
            by total proxies found, an approval rate over decided candidates,
            and the total candidate count in the window.
        """
        since_date = datetime.utcnow() - timedelta(days=days)
        status_stats = await session.execute(
            select(
                CandidateSource.status,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.confidence_score).label("avg_confidence"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.status)
        )
        status_data = {
            row.status: {
                "count": row.count,
                # avg() may return NULL for empty groups; coalesce to 0.
                "avg_confidence": round(row.avg_confidence or 0, 2),
            }
            for row in status_stats
        }
        method_stats = await session.execute(
            select(
                CandidateSource.discovery_method,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.proxies_found_count).label("avg_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.discovery_method)
        )
        method_data = {
            row.discovery_method: {
                "count": row.count,
                "avg_proxies": round(row.avg_proxies or 0, 2),
            }
            for row in method_stats
        }
        top_domains = await session.execute(
            select(
                CandidateSource.domain,
                func.count(CandidateSource.id).label("candidates"),
                func.sum(CandidateSource.proxies_found_count).label("total_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.domain)
            # Ignore one-off domains so the ranking reflects repeat producers.
            .having(func.count(CandidateSource.id) >= 2)
            .order_by(desc(func.sum(CandidateSource.proxies_found_count)))
            .limit(10)
        )
        domain_data = [
            {
                "domain": row.domain,
                "candidates": row.candidates,
                "total_proxies": row.total_proxies or 0,
                "avg_proxies_per_candidate": round(
                    (row.total_proxies or 0) / row.candidates, 2
                ),
            }
            for row in top_domains
        ]
        # Approval rate is taken over decided candidates only.
        total_processed = sum(
            data["count"]
            for status, data in status_data.items()
            if status in ["approved", "rejected"]
        )
        approved_count = status_data.get("approved", {}).get("count", 0)
        approval_rate = (
            (approved_count / total_processed * 100) if total_processed > 0 else 0
        )
        return {
            "period_days": days,
            "status_stats": status_data,
            "method_stats": method_data,
            "top_domains": domain_data,
            "approval_rate": round(approval_rate, 2),
            "total_candidates": sum(data["count"] for data in status_data.values()),
        }

    async def batch_process_proxies_with_rate_limit(
        self,
        session: AsyncSession,
        proxies_data: List[Dict],
        source_id: Optional[int] = None,
        rate_limit_per_second: int = 10,
        batch_size: int = 50,
    ) -> Dict:
        """Insert proxies one by one, throttled to ``rate_limit_per_second``.

        Each proxy is handed to ``self.add_proxy_with_validation`` (defined on
        the base class); failures are counted and the first 10 error strings
        are returned. A per-item sleep enforces the rate; an additional
        per-batch pad keeps the average rate at the target even when the
        per-item path is skipped by an exception.

        Returns:
            Dict with ``processed``/``success``/``failed`` counters, up to 10
            ``errors``, elapsed time, and actual vs target rate.
        """
        import asyncio
        import time

        if not proxies_data:
            return {"processed": 0, "success": 0, "failed": 0, "errors": []}
        processed = 0
        success = 0
        failed = 0
        errors = []
        start_time = time.time()
        for i in range(0, len(proxies_data), batch_size):
            batch = proxies_data[i : i + batch_size]
            batch_start = time.time()
            for proxy_data in batch:
                try:
                    result = await self.add_proxy_with_validation(
                        session, proxy_data, source_id
                    )
                    if result:
                        success += 1
                    else:
                        failed += 1
                        errors.append(
                            f"Failed to add proxy: {proxy_data.get('url', 'unknown')}"
                        )
                    processed += 1
                    # Per-item pacing toward the target rate.
                    await asyncio.sleep(1.0 / rate_limit_per_second)
                except Exception as e:
                    failed += 1
                    errors.append(f"Error processing proxy: {str(e)}")
                    processed += 1
            # If the batch finished ahead of its rate budget (e.g. exceptions
            # skipped the per-item sleep), pad out the remainder.
            batch_time = time.time() - batch_start
            expected_batch_time = len(batch) / rate_limit_per_second
            if batch_time < expected_batch_time:
                await asyncio.sleep(expected_batch_time - batch_time)
        total_time = time.time() - start_time
        actual_rate = processed / total_time if total_time > 0 else 0
        return {
            "processed": processed,
            "success": success,
            "failed": failed,
            "errors": errors[:10],
            "total_time_seconds": round(total_time, 2),
            "actual_rate_per_second": round(actual_rate, 2),
            "target_rate_per_second": rate_limit_per_second,
        }

    async def get_performance_metrics(
        self,
        session: AsyncSession,
        days: int = 7,
    ) -> Dict:
        """Aggregate validation performance for proxies validated recently.

        All queries are restricted to proxies with
        ``validation_status == "validated"`` and ``last_validated`` within the
        window.

        Returns:
            Dict with latency min/avg/max, a 5-bucket quality-score histogram,
            protocol counts, and the top-10 countries by validated proxies.
        """
        since_date = datetime.utcnow() - timedelta(days=days)
        validation_metrics = await session.execute(
            select(
                func.avg(Proxy.latency_ms).label("avg_latency"),
                func.min(Proxy.latency_ms).label("min_latency"),
                func.max(Proxy.latency_ms).label("max_latency"),
                func.count(Proxy.id).label("total_validated"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
        )
        val_row = validation_metrics.first()
        quality_dist = await session.execute(
            select(
                Proxy.quality_score,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.quality_score)
            .order_by(Proxy.quality_score)
        )
        # Fold per-score counts into fixed 20-point buckets (NULL scores -> 0).
        quality_buckets = {"0-20": 0, "21-40": 0, "41-60": 0, "61-80": 0, "81-100": 0}
        for row in quality_dist:
            score = row.quality_score or 0
            if score <= 20:
                quality_buckets["0-20"] += row.count
            elif score <= 40:
                quality_buckets["21-40"] += row.count
            elif score <= 60:
                quality_buckets["41-60"] += row.count
            elif score <= 80:
                quality_buckets["61-80"] += row.count
            else:
                quality_buckets["81-100"] += row.count
        protocol_dist = await session.execute(
            select(
                Proxy.protocol,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.protocol)
            .order_by(desc(func.count(Proxy.id)))
        )
        protocol_data = {row.protocol or "unknown": row.count for row in protocol_dist}
        country_dist = await session.execute(
            select(
                Proxy.country_code,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .where(Proxy.country_code.isnot(None))
            .group_by(Proxy.country_code)
            .order_by(desc(func.count(Proxy.id)))
            .limit(10)
        )
        country_data = [
            {
                "country_code": row.country_code,
                "count": row.count,
            }
            for row in country_dist
        ]
        return {
            "period_days": days,
            "validation_performance": {
                "avg_latency_ms": round(val_row.avg_latency or 0, 2),
                "min_latency_ms": val_row.min_latency or 0,
                "max_latency_ms": val_row.max_latency or 0,
                "total_validated": val_row.total_validated or 0,
            },
            "quality_distribution": quality_buckets,
            "protocol_distribution": protocol_data,
            "top_countries": country_data,
        }

    async def create_background_task(
        self,
        session: AsyncSession,
        task_type: str,
        task_data: Dict,
        scheduled_for: Optional[datetime] = None,
        retry_count: int = 0,
        max_retries: int = 3,
    ) -> Dict:
        """Enqueue a background task record in the ``"pending"`` state.

        Args:
            session: Active async DB session; committed here.
            task_type: Discriminator consumed by the task runner.
            task_data: Arbitrary task payload (stored as-is).
            scheduled_for: Earliest execution time; defaults to "now".
            retry_count / max_retries: Initial attempt counter and its cap.

        Returns:
            Dict with ``task_id``, ``task_type``, ``status``, ``scheduled_for``.
        """
        from app.db_models import BackgroundTask

        task = BackgroundTask(
            task_type=task_type,
            task_data=task_data,
            status="pending",
            scheduled_for=scheduled_for or datetime.utcnow(),
            retry_count=retry_count,
            max_retries=max_retries,
        )
        session.add(task)
        await session.commit()
        await session.refresh(task)
        return {
            "task_id": task.id,
            "task_type": task.task_type,
            "status": task.status,
            "scheduled_for": task.scheduled_for,
        }

    async def update_background_task(
        self,
        session: AsyncSession,
        task_id: int,
        status: str,
        result: Optional[Dict] = None,
        error_message: Optional[str] = None,
    ) -> bool:
        """Update a background task's status, result payload, and attempt count.

        ``completed_at`` is stamped for terminal statuses and reset to ``None``
        otherwise.

        Returns:
            True on success; False if the task does not exist or the update
            raised (rolled back).

        NOTE(review): ``retry_count`` is incremented on EVERY call, including
        transitions to "completed" — if it is meant to count only failed
        attempts, this over-counts; confirm intent with the task runner.
        """
        from app.db_models import BackgroundTask

        try:
            task = await session.execute(
                select(BackgroundTask).where(BackgroundTask.id == task_id)
            )
            task_obj = task.scalar_one_or_none()
            if not task_obj:
                return False
            update_data = {
                "status": status,
                "completed_at": datetime.utcnow()
                if status in ["completed", "failed"]
                else None,
            }
            if result:
                update_data["result"] = result
            if error_message:
                update_data["error_message"] = error_message
            update_data["retry_count"] = task_obj.retry_count + 1
            stmt = (
                update(BackgroundTask)
                .where(BackgroundTask.id == task_id)
                .values(**update_data)
            )
            await session.execute(stmt)
            await session.commit()
            return True
        except Exception as e:
            logger.error(f"Error updating background task {task_id}: {e}")
            await session.rollback()
            return False

    async def get_pending_background_tasks(
        self,
        session: AsyncSession,
        task_type: Optional[str] = None,
        limit: int = 50,
    ) -> List[Dict]:
        """Return up to ``limit`` due, pending tasks, oldest-scheduled first.

        A task is "due" when its ``scheduled_for`` is not in the future. Rows
        are serialized to plain dicts; no locking is performed, so concurrent
        workers may pick up the same task.

        Args:
            session: Active async DB session (read-only here).
            task_type: Optional filter on the task discriminator.
            limit: Maximum number of tasks to return.
        """
        from app.db_models import BackgroundTask

        query = select(BackgroundTask).where(
            and_(
                BackgroundTask.status == "pending",
                BackgroundTask.scheduled_for <= datetime.utcnow(),
            )
        )
        if task_type:
            query = query.where(BackgroundTask.task_type == task_type)
        query = query.order_by(BackgroundTask.scheduled_for).limit(limit)
        result = await session.execute(query)
        tasks = result.scalars().all()
        return [
            {
                "id": task.id,
                "task_type": task.task_type,
                "task_data": task.task_data,
                "scheduled_for": task.scheduled_for,
                "retry_count": task.retry_count,
                "max_retries": task.max_retries,
            }
            for task in tasks
        ]
# Module-level singleton: callers import this shared instance rather than
# constructing their own ExtendedDatabaseStorage.
extended_db_storage = ExtendedDatabaseStorage()