File size: 6,761 Bytes
befd390
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import asyncio
from datetime import datetime, timedelta
from app.database import AsyncSessionLocal, get_db
from app.db_storage import db_storage
from app.grabber import GitHubGrabber
from app.sources import SourceRegistry
import logging

logger = logging.getLogger(__name__)


async def background_scraper_worker(interval_minutes: int = 10):
    """Periodically scrape every enabled source and persist the proxies found.

    Runs forever: each round loads the enabled sources from the database,
    scrapes each one, stores new proxies, and updates per-source statistics.
    A failure on one source is logged and skipped; an unexpected top-level
    error backs off for 5 minutes before retrying.

    Args:
        interval_minutes: Delay between scraping rounds, in minutes.
    """
    # Hoisted out of the per-source loop: the import is loop-invariant
    # (previously re-executed for every source on every round).
    from app.models import SourceConfig, SourceType

    # Give the application time to finish startup before the first round.
    await asyncio.sleep(10)

    while True:
        try:
            async with AsyncSessionLocal() as session:
                # Get all enabled sources from database
                sources_db = await db_storage.get_sources(session, enabled_only=True)

                if not sources_db:
                    logger.warning("⚠️  No enabled sources found")
                else:
                    grabber = GitHubGrabber()
                    total_scraped = 0
                    total_added = 0

                    for source_db in sources_db:
                        try:
                            # Create SourceConfig from database source
                            source = SourceConfig(
                                url=source_db.url, type=SourceType(source_db.type)
                            )

                            proxies = await grabber.extract_proxies(source)
                            proxies_data = [
                                _proxy_record(p, source_db.id) for p in proxies
                            ]

                            added = await db_storage.add_proxies(session, proxies_data)
                            total_scraped += len(proxies)
                            total_added += added

                            # Update source stats; total_scraped may be NULL
                            # for sources that were never scraped before.
                            source_db.total_scraped = (
                                source_db.total_scraped or 0
                            ) + len(proxies)
                            source_db.last_scraped = datetime.utcnow()

                            logger.info(
                                f"βœ… Scraped {len(proxies)} proxies from {source_db.name} (added {added} new)"
                            )

                        except Exception as e:
                            # One bad source must not abort the whole round.
                            logger.error(f"❌ Failed to scrape {source_db.url}: {e}")
                            continue

                    await session.commit()
                    logger.info(
                        f"βœ… Auto-scraping complete: {total_scraped} scraped, {total_added} new proxies added"
                    )

            await asyncio.sleep(interval_minutes * 60)

        except Exception as e:
            logger.error(f"⚠️  Background scraper error: {e}")
            await asyncio.sleep(300)  # back off before retrying


def _proxy_record(p, source_id):
    """Convert one scraped proxy object into the dict shape add_proxies expects."""
    # Pydantic v2 models expose model_dump(); fall back to __dict__ otherwise.
    data = p.model_dump() if hasattr(p, "model_dump") else p.__dict__
    return {
        "url": f"{data.get('protocol', 'http')}://{data.get('ip')}:{data.get('port')}",
        "protocol": data.get("protocol", "http"),
        "ip": data.get("ip"),
        "port": data.get("port"),
        "country_code": data.get("country_code"),
        "country_name": data.get("country_name"),
        "city": data.get("city"),
        "latency_ms": data.get("latency_ms"),
        "speed_mbps": data.get("speed_mbps"),
        "anonymity": data.get("anonymity"),
        "proxy_type": data.get("proxy_type"),
        "source_id": source_id,
    }


async def background_validation_worker(
    batch_size: int = 50, interval_seconds: int = 60
):
    """Validate pending proxies in small batches, forever.

    Each cycle asks the storage layer to validate up to ``batch_size``
    pending proxies, logs the outcome when anything was processed, then
    sleeps ``interval_seconds``.  On error the worker logs and waits one
    interval before trying again.
    """
    logger.info("πŸ”„ Background validation worker started")

    while True:
        try:
            async with AsyncSessionLocal() as db:
                outcome = await db_storage.validate_and_update_proxies(
                    db, limit=batch_size
                )

                # Stay quiet when there was nothing to validate this cycle.
                if outcome["total"]:
                    logger.info(
                        f"βœ… Validated {outcome['validated']} proxies, "
                        f"❌ {outcome['failed']} failed ({outcome['total']} total)"
                    )

                await asyncio.sleep(interval_seconds)

        except Exception as exc:
            logger.error(f"⚠️  Background validation error: {exc}")
            await asyncio.sleep(interval_seconds)


async def revalidate_old_proxies(hours: int = 24, batch_size: int = 20):
    """Re-queue and validate proxies not checked within the last ``hours``.

    Selects up to ``batch_size`` proxies whose last validation is older
    than the cutoff (or missing entirely), marks them pending, and runs
    validation over exactly those rows.
    """
    # Let the startup validation surge settle before competing for work.
    await asyncio.sleep(60)

    from sqlalchemy import select, or_
    from app.db_models import Proxy

    logger.info(f"πŸ”„ Revalidating proxies older than {hours} hours")

    try:
        async with AsyncSessionLocal() as session:
            threshold = datetime.utcnow() - timedelta(hours=hours)

            stale_filter = or_(
                Proxy.last_validated < threshold,
                Proxy.last_validated.is_(None),
            )
            stmt = select(Proxy).where(stale_filter).limit(batch_size)

            rows = await session.execute(stmt)
            stale = rows.scalars().all()

            if not stale:
                logger.info("βœ… No old proxies to revalidate")
                return

            # Flag the selected rows so the validator picks them up.
            for row in stale:
                row.validation_status = "pending"
            await session.commit()

            summary = await db_storage.validate_and_update_proxies(
                session, proxy_ids=[row.id for row in stale]
            )

            logger.info(
                f"βœ… Revalidated {summary['validated']} old proxies, "
                f"❌ {summary['failed']} failed"
            )

    except Exception as e:
        logger.error(f"⚠️  Revalidation error: {e}")