| import asyncio
|
| from datetime import datetime, timedelta
|
| from app.database import AsyncSessionLocal, get_db
|
| from app.db_storage import db_storage
|
| from app.grabber import GitHubGrabber
|
| from app.sources import SourceRegistry
|
| import logging
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
async def background_scraper_worker(interval_minutes: int = 10):
    """Automatically scrape all enabled sources periodically.

    Waits briefly at startup (so the application can finish booting), then
    loops forever: load every enabled source, scrape it, persist any new
    proxies, update per-source stats, commit, and sleep ``interval_minutes``.
    A failure on one source skips only that source; an unexpected top-level
    error backs off for 5 minutes before retrying.

    Args:
        interval_minutes: Pause between scraping rounds, in minutes.
    """
    # Local import avoids potential import cycles with app.models; hoisted
    # out of the per-source loop, where it was re-executed every iteration.
    from app.models import SourceConfig, SourceType

    # Give the application a moment to finish starting up.
    await asyncio.sleep(10)

    while True:
        try:
            async with AsyncSessionLocal() as session:
                sources_db = await db_storage.get_sources(session, enabled_only=True)

                if not sources_db:
                    logger.warning("β οΈ No enabled sources found")
                else:
                    grabber = GitHubGrabber()
                    total_scraped = 0
                    total_added = 0

                    for source_db in sources_db:
                        try:
                            source = SourceConfig(
                                url=source_db.url, type=SourceType(source_db.type)
                            )

                            proxies = await grabber.extract_proxies(source)

                            # Normalize each scraped proxy into the flat dict
                            # shape expected by db_storage.add_proxies().
                            proxies_data = []
                            for p in proxies:
                                # Support both pydantic models and plain objects.
                                data = (
                                    p.model_dump()
                                    if hasattr(p, "model_dump")
                                    else p.__dict__
                                )
                                proxies_data.append(
                                    {
                                        "url": f"{data.get('protocol', 'http')}://{data.get('ip')}:{data.get('port')}",
                                        "protocol": data.get("protocol", "http"),
                                        "ip": data.get("ip"),
                                        "port": data.get("port"),
                                        "country_code": data.get("country_code"),
                                        "country_name": data.get("country_name"),
                                        "city": data.get("city"),
                                        "latency_ms": data.get("latency_ms"),
                                        "speed_mbps": data.get("speed_mbps"),
                                        "anonymity": data.get("anonymity"),
                                        "proxy_type": data.get("proxy_type"),
                                        "source_id": source_db.id,
                                    }
                                )

                            added = await db_storage.add_proxies(session, proxies_data)
                            total_scraped += len(proxies)
                            total_added += added

                            # Update per-source statistics (None-safe counter).
                            source_db.total_scraped = (
                                source_db.total_scraped or 0
                            ) + len(proxies)
                            # NOTE(review): naive UTC timestamp; utcnow() is
                            # deprecated since Python 3.12 — confirm the DB
                            # column is naive before switching to an aware
                            # datetime.now(timezone.utc).
                            source_db.last_scraped = datetime.utcnow()

                            logger.info(
                                f"β Scraped {len(proxies)} proxies from {source_db.name} (added {added} new)"
                            )

                        except Exception as e:
                            # One bad source must not abort the whole round.
                            logger.error(f"β Failed to scrape {source_db.url}: {e}")
                            continue

                    await session.commit()
                    logger.info(
                        f"β Auto-scraping complete: {total_scraped} scraped, {total_added} new proxies added"
                    )

            await asyncio.sleep(interval_minutes * 60)

        except Exception as e:
            logger.error(f"β οΈ Background scraper error: {e}")
            await asyncio.sleep(300)
|
|
|
|
|
async def background_validation_worker(
    batch_size: int = 50, interval_seconds: int = 60
):
    """Continuously validate pending proxies in the background.

    Runs forever: each cycle opens a fresh DB session, asks the storage
    layer to validate up to ``batch_size`` pending proxies, logs a summary
    when any work was done, then sleeps ``interval_seconds``.  Errors are
    logged and the worker keeps going.

    Args:
        batch_size: Maximum number of proxies validated per cycle.
        interval_seconds: Pause between validation cycles.
    """
    logger.info("π Background validation worker started")

    while True:
        try:
            async with AsyncSessionLocal() as session:
                stats = await db_storage.validate_and_update_proxies(
                    session, limit=batch_size
                )

                # Stay quiet when there was nothing to validate.
                if stats["total"] > 0:
                    logger.info(
                        f"β Validated {stats['validated']} proxies, "
                        f"β {stats['failed']} failed ({stats['total']} total)"
                    )
        except Exception as e:
            logger.error(f"β οΈ Background validation error: {e}")
        # Both the success and error paths wait the same interval.
        await asyncio.sleep(interval_seconds)
|
|
|
|
|
async def revalidate_old_proxies(hours: int = 24, batch_size: int = 20):
    """Revalidate proxies that haven't been checked in ``hours`` hours.

    One-shot task: after a 60s startup delay, selects up to ``batch_size``
    proxies that were last validated before the cutoff (or never), marks
    them pending, commits, and runs the validator over exactly those rows.

    Args:
        hours: Age threshold; proxies validated more recently are skipped.
        batch_size: Maximum number of stale proxies to revalidate.
    """
    # Initial delay so startup work settles before touching the DB.
    await asyncio.sleep(60)

    # Local imports avoid potential import cycles at module load time.
    from sqlalchemy import select, or_
    from app.db_models import Proxy

    logger.info(f"π Revalidating proxies older than {hours} hours")

    try:
        async with AsyncSessionLocal() as session:
            stale_before = datetime.utcnow() - timedelta(hours=hours)

            # Never-validated rows (NULL) count as stale too.
            stale_query = (
                select(Proxy)
                .where(
                    or_(
                        Proxy.last_validated < stale_before,
                        Proxy.last_validated.is_(None),
                    )
                )
                .limit(batch_size)
            )

            stale = (await session.execute(stale_query)).scalars().all()

            if not stale:
                logger.info("β No old proxies to revalidate")
                return

            # Flip status back to pending so the validator picks them up.
            for row in stale:
                row.validation_status = "pending"
            await session.commit()

            outcome = await db_storage.validate_and_update_proxies(
                session, proxy_ids=[row.id for row in stale]
            )

            logger.info(
                f"β Revalidated {outcome['validated']} old proxies, "
                f"β {outcome['failed']} failed"
            )

    except Exception as e:
        logger.error(f"β οΈ Revalidation error: {e}")
|
|
|