# app/background_validator.py
# Background workers: periodic source scraping, continuous proxy validation,
# and one-shot revalidation of stale proxies.
import asyncio
from datetime import datetime, timedelta
from app.database import AsyncSessionLocal, get_db
from app.db_storage import db_storage
from app.grabber import GitHubGrabber
from app.sources import SourceRegistry
import logging
# Module-level logger shared by all background workers in this file.
logger = logging.getLogger(__name__)
async def background_scraper_worker(interval_minutes: int = 10):
    """Periodically scrape proxies from all enabled sources.

    Runs forever: each cycle it loads the enabled sources from the
    database, extracts proxies from every source via ``GitHubGrabber``,
    stores the results, and updates per-source scrape statistics.

    Args:
        interval_minutes: Minutes to sleep between scrape cycles.
    """
    # Hoisted out of the per-source loop so the import runs once, not on
    # every iteration. Kept function-local as in the original (presumably
    # to avoid a circular import at module load time — TODO confirm).
    from app.models import SourceConfig, SourceType

    # Give the application a moment to finish starting before the first cycle.
    await asyncio.sleep(10)
    while True:
        try:
            async with AsyncSessionLocal() as session:
                # Only sources the operator has enabled are scraped.
                sources_db = await db_storage.get_sources(session, enabled_only=True)
                if not sources_db:
                    logger.warning("⚠️ No enabled sources found")
                else:
                    grabber = GitHubGrabber()
                    total_scraped = 0
                    total_added = 0
                    for source_db in sources_db:
                        try:
                            # Build the scraper config from the DB row.
                            source = SourceConfig(
                                url=source_db.url, type=SourceType(source_db.type)
                            )
                            proxies = await grabber.extract_proxies(source)
                            proxies_data = []
                            for p in proxies:
                                # Support both pydantic models and plain objects.
                                data = (
                                    p.model_dump()
                                    if hasattr(p, "model_dump")
                                    else p.__dict__
                                )
                                proxies_data.append(
                                    {
                                        "url": f"{data.get('protocol', 'http')}://{data.get('ip')}:{data.get('port')}",
                                        "protocol": data.get("protocol", "http"),
                                        "ip": data.get("ip"),
                                        "port": data.get("port"),
                                        "country_code": data.get("country_code"),
                                        "country_name": data.get("country_name"),
                                        "city": data.get("city"),
                                        "latency_ms": data.get("latency_ms"),
                                        "speed_mbps": data.get("speed_mbps"),
                                        "anonymity": data.get("anonymity"),
                                        "proxy_type": data.get("proxy_type"),
                                        "source_id": source_db.id,
                                    }
                                )
                            added = await db_storage.add_proxies(session, proxies_data)
                            total_scraped += len(proxies)
                            total_added += added
                            # Update per-source stats; total_scraped may be NULL
                            # for sources that have never been scraped before.
                            source_db.total_scraped = (
                                source_db.total_scraped or 0
                            ) + len(proxies)
                            source_db.last_scraped = datetime.utcnow()
                            logger.info(
                                f"✅ Scraped {len(proxies)} proxies from {source_db.name} (added {added} new)"
                            )
                        except Exception as e:
                            # One failing source must not abort the whole cycle.
                            logger.error(f"❌ Failed to scrape {source_db.url}: {e}")
                            continue
                    await session.commit()
                    logger.info(
                        f"✅ Auto-scraping complete: {total_scraped} scraped, {total_added} new proxies added"
                    )
            await asyncio.sleep(interval_minutes * 60)
        except Exception as e:
            # Back off for 5 minutes on unexpected failures (e.g. DB outage).
            logger.error(f"⚠️ Background scraper error: {e}")
            await asyncio.sleep(300)
async def background_validation_worker(
    batch_size: int = 50, interval_seconds: int = 60
):
    """Continuously validate pending proxies in the background.

    Runs forever: each cycle asks storage to validate up to ``batch_size``
    pending proxies, then sleeps.

    Args:
        batch_size: Maximum number of proxies validated per cycle.
        interval_seconds: Seconds to sleep between cycles; also used as
            the back-off delay after an unexpected error.
    """
    logger.info("🔄 Background validation worker started")
    while True:
        try:
            async with AsyncSessionLocal() as session:
                result = await db_storage.validate_and_update_proxies(
                    session, limit=batch_size
                )
                # Stay quiet when there was nothing to validate.
                if result["total"] > 0:
                    logger.info(
                        f"✅ Validated {result['validated']} proxies, "
                        f"❌ {result['failed']} failed ({result['total']} total)"
                    )
            await asyncio.sleep(interval_seconds)
        except Exception as e:
            logger.error(f"⚠️ Background validation error: {e}")
            await asyncio.sleep(interval_seconds)
async def revalidate_old_proxies(hours: int = 24, batch_size: int = 20):
    """Revalidate proxies whose last check is older than ``hours`` hours.

    Selects up to ``batch_size`` stale (or never-validated) proxies,
    resets their status to "pending", and re-runs validation on exactly
    those proxies. Runs once per invocation — it is not a loop.

    Args:
        hours: Age threshold; proxies validated more recently are skipped.
        batch_size: Maximum number of proxies to revalidate in this run.
    """
    # Wait for the initial startup surge (scraping/validation) to pass.
    await asyncio.sleep(60)
    # Deferred imports, as in the original (presumably to avoid a circular
    # import at module load time — TODO confirm).
    from sqlalchemy import select, or_
    from app.db_models import Proxy

    logger.info(f"🔄 Revalidating proxies older than {hours} hours")
    try:
        async with AsyncSessionLocal() as session:
            cutoff_time = datetime.utcnow() - timedelta(hours=hours)
            # NULL last_validated means the proxy has never been checked,
            # so those rows are included alongside the stale ones.
            query = (
                select(Proxy)
                .where(
                    or_(
                        Proxy.last_validated < cutoff_time,
                        Proxy.last_validated.is_(None),
                    )
                )
                .limit(batch_size)
            )
            result = await session.execute(query)
            old_proxies = result.scalars().all()
            if not old_proxies:
                logger.info("✅ No old proxies to revalidate")
                return
            # Flip the selected proxies back to pending, persist, then
            # validate exactly this batch by id.
            for proxy in old_proxies:
                proxy.validation_status = "pending"
            await session.commit()
            validation_result = await db_storage.validate_and_update_proxies(
                session, proxy_ids=[p.id for p in old_proxies]
            )
            logger.info(
                f"✅ Revalidated {validation_result['validated']} old proxies, "
                f"❌ {validation_result['failed']} failed"
            )
    except Exception as e:
        logger.error(f"⚠️ Revalidation error: {e}")