import asyncio
from datetime import datetime, timedelta
from app.database import AsyncSessionLocal, get_db
from app.db_storage import db_storage
from app.grabber import GitHubGrabber
from app.sources import SourceRegistry
import logging
logger = logging.getLogger(__name__)
async def background_scraper_worker(interval_minutes: int = 10):
    """Periodically scrape all enabled proxy sources and persist the results.

    Runs forever: every ``interval_minutes`` it loads the enabled sources
    from the database, extracts proxies from each via ``GitHubGrabber``,
    stores any new proxies, and updates per-source scrape statistics.

    Args:
        interval_minutes: Minutes to sleep between scrape cycles.
    """
    # Wait a moment on startup so the app (DB pool, etc.) is fully up
    # before the first scrape.
    await asyncio.sleep(10)
    while True:
        try:
            async with AsyncSessionLocal() as session:
                # Only sources the operator has enabled are scraped.
                sources_db = await db_storage.get_sources(session, enabled_only=True)
                if not sources_db:
                    logger.warning("⚠️ No enabled sources found")
                else:
                    # Imported lazily to avoid a circular import at module
                    # load time; hoisted out of the loop since it is
                    # loop-invariant.
                    from app.models import SourceConfig, SourceType

                    grabber = GitHubGrabber()
                    total_scraped = 0
                    total_added = 0
                    for source_db in sources_db:
                        try:
                            # Build the scraper config from the DB row.
                            source = SourceConfig(
                                url=source_db.url, type=SourceType(source_db.type)
                            )
                            proxies = await grabber.extract_proxies(source)
                            proxies_data = []
                            for p in proxies:
                                # Proxies may be pydantic v2 models or plain
                                # objects; normalize to a dict either way.
                                data = (
                                    p.model_dump()
                                    if hasattr(p, "model_dump")
                                    else p.__dict__
                                )
                                proxies_data.append(
                                    {
                                        "url": f"{data.get('protocol', 'http')}://{data.get('ip')}:{data.get('port')}",
                                        "protocol": data.get("protocol", "http"),
                                        "ip": data.get("ip"),
                                        "port": data.get("port"),
                                        "country_code": data.get("country_code"),
                                        "country_name": data.get("country_name"),
                                        "city": data.get("city"),
                                        "latency_ms": data.get("latency_ms"),
                                        "speed_mbps": data.get("speed_mbps"),
                                        "anonymity": data.get("anonymity"),
                                        "proxy_type": data.get("proxy_type"),
                                        "source_id": source_db.id,
                                    }
                                )
                            # add_proxies returns the count of newly-inserted rows.
                            added = await db_storage.add_proxies(session, proxies_data)
                            total_scraped += len(proxies)
                            total_added += added
                            # Update per-source scrape statistics.
                            source_db.total_scraped = (
                                source_db.total_scraped or 0
                            ) + len(proxies)
                            # NOTE(review): naive UTC timestamp; assumes the
                            # DB column stores naive UTC — confirm.
                            source_db.last_scraped = datetime.utcnow()
                            logger.info(
                                f"✅ Scraped {len(proxies)} proxies from {source_db.name} (added {added} new)"
                            )
                        except Exception as e:
                            # One bad source must not abort the whole cycle.
                            logger.error(f"❌ Failed to scrape {source_db.url}: {e}")
                            continue
                    await session.commit()
                    logger.info(
                        f"✅ Auto-scraping complete: {total_scraped} scraped, {total_added} new proxies added"
                    )
            await asyncio.sleep(interval_minutes * 60)
        except Exception as e:
            # Keep the worker alive on unexpected errors; back off 5 minutes.
            logger.error(f"⚠️ Background scraper error: {e}")
            await asyncio.sleep(300)
async def background_validation_worker(
    batch_size: int = 50, interval_seconds: int = 60
):
    """Continuously validate pending proxies in the background.

    Runs forever: each cycle asks the storage layer to validate up to
    ``batch_size`` pending proxies, logs the outcome, then sleeps
    ``interval_seconds`` before the next batch.

    Args:
        batch_size: Maximum number of proxies validated per cycle.
        interval_seconds: Seconds to sleep between cycles (also used as
            the back-off delay after an error).
    """
    logger.info("🔄 Background validation worker started")
    while True:
        try:
            async with AsyncSessionLocal() as session:
                # result is expected to contain 'total', 'validated',
                # and 'failed' counts.
                result = await db_storage.validate_and_update_proxies(
                    session, limit=batch_size
                )
                if result["total"] > 0:
                    logger.info(
                        f"✅ Validated {result['validated']} proxies, "
                        f"❌ {result['failed']} failed ({result['total']} total)"
                    )
            await asyncio.sleep(interval_seconds)
        except Exception as e:
            # Never let the worker die; log and retry after one interval.
            logger.error(f"⚠️ Background validation error: {e}")
            await asyncio.sleep(interval_seconds)
async def revalidate_old_proxies(hours: int = 24, batch_size: int = 20):
    """Re-queue and revalidate proxies not checked within the last ``hours``.

    One-shot task (no loop): selects up to ``batch_size`` proxies whose
    ``last_validated`` is older than the cutoff (or NULL), marks them
    pending, commits, then runs validation on exactly those rows.

    Args:
        hours: Age threshold; proxies validated more recently are skipped.
        batch_size: Maximum number of proxies to revalidate in this pass.
    """
    # Let the initial startup surge (scrape + first validation) pass first.
    await asyncio.sleep(60)
    # Imported lazily to avoid circular imports at module load time.
    from sqlalchemy import select, or_
    from app.db_models import Proxy

    logger.info(f"🔄 Revalidating proxies older than {hours} hours")
    try:
        async with AsyncSessionLocal() as session:
            # NOTE(review): naive UTC cutoff compared against
            # Proxy.last_validated — assumes that column is naive UTC; confirm.
            cutoff_time = datetime.utcnow() - timedelta(hours=hours)
            query = (
                select(Proxy)
                .where(
                    or_(
                        Proxy.last_validated < cutoff_time,
                        # Never-validated rows (NULL) are included too.
                        Proxy.last_validated.is_(None),
                    )
                )
                .limit(batch_size)
            )
            result = await session.execute(query)
            old_proxies = result.scalars().all()
            if not old_proxies:
                logger.info("✅ No old proxies to revalidate")
                return
            # Flag the selected rows so the validator picks them up.
            for proxy in old_proxies:
                proxy.validation_status = "pending"
            await session.commit()
            validation_result = await db_storage.validate_and_update_proxies(
                session, proxy_ids=[p.id for p in old_proxies]
            )
            logger.info(
                f"✅ Revalidated {validation_result['validated']} old proxies, "
                f"❌ {validation_result['failed']} failed"
            )
    except Exception as e:
        # Best-effort task: log and return rather than crash the app.
        logger.error(f"⚠️ Revalidation error: {e}")