paijo77 committed on
Commit
befd390
Β·
verified Β·
1 Parent(s): 098e7b6

update app/background_validator.py

Browse files
Files changed (1) hide show
  1. app/background_validator.py +169 -0
app/background_validator.py ADDED
@@ -0,0 +1,169 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from datetime import datetime, timedelta
3
+ from app.database import AsyncSessionLocal, get_db
4
+ from app.db_storage import db_storage
5
+ from app.grabber import GitHubGrabber
6
+ from app.sources import SourceRegistry
7
+ import logging
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
async def background_scraper_worker(interval_minutes: int = 10):
    """Periodically scrape proxies from every enabled source.

    Runs forever: each cycle loads the enabled sources from the database,
    scrapes each one, stores any newly discovered proxies, updates the
    per-source statistics, then sleeps for ``interval_minutes``.  A failure
    of one source does not abort the cycle; an unexpected top-level error
    triggers a 5-minute back-off before retrying.

    Args:
        interval_minutes: Minutes to sleep between successful scrape cycles.
    """
    # Hoisted out of the per-source loop — the import is loop-invariant,
    # so there is no reason to re-execute it for every source every cycle.
    from app.models import SourceConfig, SourceType

    # Initial delay: wait 10 seconds for the app to fully start.
    await asyncio.sleep(10)

    while True:
        try:
            async with AsyncSessionLocal() as session:
                # Only operator-enabled sources are scraped.
                sources_db = await db_storage.get_sources(session, enabled_only=True)

                if not sources_db:
                    logger.warning("⚠️ No enabled sources found")
                else:
                    grabber = GitHubGrabber()
                    total_scraped = 0
                    total_added = 0

                    for source_db in sources_db:
                        try:
                            # Build the scraper config from the database row.
                            source = SourceConfig(
                                url=source_db.url, type=SourceType(source_db.type)
                            )

                            proxies = await grabber.extract_proxies(source)

                            proxies_data = []
                            for p in proxies:
                                # Support both pydantic models and plain objects.
                                data = (
                                    p.model_dump()
                                    if hasattr(p, "model_dump")
                                    else p.__dict__
                                )
                                proxies_data.append(
                                    {
                                        "url": f"{data.get('protocol', 'http')}://{data.get('ip')}:{data.get('port')}",
                                        "protocol": data.get("protocol", "http"),
                                        "ip": data.get("ip"),
                                        "port": data.get("port"),
                                        "country_code": data.get("country_code"),
                                        "country_name": data.get("country_name"),
                                        "city": data.get("city"),
                                        "latency_ms": data.get("latency_ms"),
                                        "speed_mbps": data.get("speed_mbps"),
                                        "anonymity": data.get("anonymity"),
                                        "proxy_type": data.get("proxy_type"),
                                        "source_id": source_db.id,
                                    }
                                )

                            added = await db_storage.add_proxies(session, proxies_data)
                            total_scraped += len(proxies)
                            total_added += added

                            # Update source stats (None-safe for fresh rows).
                            source_db.total_scraped = (
                                source_db.total_scraped or 0
                            ) + len(proxies)
                            source_db.last_scraped = datetime.utcnow()

                            logger.info(
                                f"✅ Scraped {len(proxies)} proxies from {source_db.name} (added {added} new)"
                            )

                        except Exception as e:
                            # One bad source must not stop the whole cycle.
                            logger.error(f"❌ Failed to scrape {source_db.url}: {e}")
                            continue

                    await session.commit()
                    logger.info(
                        f"✅ Auto-scraping complete: {total_scraped} scraped, {total_added} new proxies added"
                    )

            await asyncio.sleep(interval_minutes * 60)

        except Exception as e:
            logger.error(f"⚠️ Background scraper error: {e}")
            # Back off before retrying after an unexpected failure.
            await asyncio.sleep(300)
94
+
95
+
96
async def background_validation_worker(
    batch_size: int = 50, interval_seconds: int = 60
):
    """Run an endless loop that validates batches of pending proxies.

    Args:
        batch_size: Maximum number of proxies to validate per pass.
        interval_seconds: Pause between passes (also used after an error).
    """
    logger.info("🔄 Background validation worker started")

    while True:
        try:
            async with AsyncSessionLocal() as session:
                stats = await db_storage.validate_and_update_proxies(
                    session, limit=batch_size
                )

                # Stay quiet when nothing was pending.
                if stats["total"] > 0:
                    logger.info(
                        f"✅ Validated {stats['validated']} proxies, "
                        f"❌ {stats['failed']} failed ({stats['total']} total)"
                    )
        except Exception as e:
            logger.error(f"⚠️ Background validation error: {e}")

        # Both the success and the error path wait the same interval.
        await asyncio.sleep(interval_seconds)
120
+
121
+
122
async def revalidate_old_proxies(hours: int = 24, batch_size: int = 20):
    """Re-check one batch of proxies whose last validation is older than *hours*.

    Args:
        hours: Age threshold; proxies validated before this cutoff (or never
            validated) are eligible.
        batch_size: Maximum number of proxies to revalidate in this run.
    """
    # Let the startup validation surge settle down first.
    await asyncio.sleep(60)

    from sqlalchemy import select, or_
    from app.db_models import Proxy

    logger.info(f"🔄 Revalidating proxies older than {hours} hours")

    try:
        async with AsyncSessionLocal() as session:
            stale_before = datetime.utcnow() - timedelta(hours=hours)

            # Never-validated proxies (NULL) are treated as stale too.
            stmt = (
                select(Proxy)
                .where(
                    or_(
                        Proxy.last_validated < stale_before,
                        Proxy.last_validated.is_(None),
                    )
                )
                .limit(batch_size)
            )

            rows = await session.execute(stmt)
            stale_proxies = rows.scalars().all()

            if not stale_proxies:
                logger.info("✅ No old proxies to revalidate")
                return

            # Flag the batch as pending and persist before revalidating.
            for proxy in stale_proxies:
                proxy.validation_status = "pending"
            await session.commit()

            outcome = await db_storage.validate_and_update_proxies(
                session, proxy_ids=[p.id for p in stale_proxies]
            )

            logger.info(
                f"✅ Revalidated {outcome['validated']} old proxies, "
                f"❌ {outcome['failed']} failed"
            )

    except Exception as e:
        logger.error(f"⚠️ Revalidation error: {e}")