| """
|
| Advanced Scheduling untuk Enhanced Scraping
|
|
|
| Implementasi jadwal otomatis untuk scraping otomatis
|
| dengan kontrol waktu, retry logic, dan resource optimization.
|
| """
|
|
|
| import asyncio
|
| import logging
|
| from typing import Dict, List, Any, Optional
|
| from datetime import datetime, time, timedelta
|
| from pydantic import BaseModel, Field, validator
|
| from enum import Enum
|
|
|
| from app.database import get_db
|
| from app.dependencies import require_admin
|
| from app.db_storage import db_storage
|
|
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|


class ScheduleType(str, Enum):
    """Types of scraping schedules"""
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    CUSTOM = "custom"
    RUN_NOW = "run_now"


class ScheduleEntry(BaseModel):
    """A single scraping schedule entry"""
    # schedule_type is referenced by ScheduledJob but was missing from the
    # original model; DAILY is an assumed default.
    schedule_type: ScheduleType = Field(ScheduleType.DAILY, description="Schedule type")
    hour: Optional[int] = Field(None, ge=0, le=23, description="Hour (0-23)")
    minute: Optional[int] = Field(None, ge=0, le=59, description="Minute (0-59)")
    cron_expression: Optional[str] = Field(None, description="Cron expression")
    timezone: str = Field("UTC", description="Timezone")
    enabled: bool = Field(True, description="Whether this entry is active")
    weekdays: List[str] = Field(
        default=["mon", "tue", "wed", "thu", "fri", "sat", "sun"],
        description="Active weekdays (default: every day)",
    )
    max_concurrent_tasks: int = Field(5, description="Maximum concurrent tasks")
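
# Illustrative example (an assumption, not part of the original module): a
# CUSTOM entry driven by a cron expression in a non-UTC timezone.
#
#   entry = ScheduleEntry(
#       schedule_type=ScheduleType.CUSTOM,
#       cron_expression="0 */6 * * *",  # every six hours
#       timezone="Asia/Jakarta",
#   )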


class ScrapingSchedule(BaseModel):
    """Scraping schedule configuration"""
    enabled: bool = Field(False, description="Enable scheduling")
    default_timezone: str = Field("UTC", description="Default timezone")
    entries: List[ScheduleEntry] = Field(default_factory=list)

    # Named validate_schedule to avoid shadowing pydantic's built-in validate().
    def validate_schedule(self) -> List[str]:
        """Validate the schedule; returns a list of error messages (empty if valid)."""
        errors = []

        if not self.enabled:
            return ["Scheduling is not enabled"]

        valid_weekdays = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"}

        for entry in self.entries:
            if entry.cron_expression and not self._validate_cron_expression(entry.cron_expression):
                errors.append(f"Invalid cron expression: {entry.cron_expression}")

            # "is not None" so that hour 0 / minute 0 are still range-checked.
            if entry.hour is not None and (entry.hour < 0 or entry.hour > 23):
                errors.append(f"Invalid hour: {entry.hour}. Must be 0-23")

            if entry.minute is not None and (entry.minute < 0 or entry.minute > 59):
                errors.append(f"Invalid minute: {entry.minute}. Must be 0-59")

            # The original check compared entry.weekdays against itself, which
            # always passed; compare against the set of valid day names instead.
            invalid_days = [day for day in entry.weekdays if day not in valid_weekdays]
            if invalid_days:
                errors.append(f"Invalid weekdays: {invalid_days}. Must be one of: {sorted(valid_weekdays)}")

            if entry.timezone and entry.timezone != "UTC":
                try:
                    import pytz
                    pytz.timezone(entry.timezone)
                except Exception:
                    errors.append(f"Invalid timezone: {entry.timezone}")

        return errors
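
    def _validate_cron_expression(self, expression: str) -> bool:
        """Minimal cron sanity check.

        The original code called this helper without defining it; this sketch
        only verifies that the expression has the standard five fields, not
        full cron syntax.
        """
        return len(expression.split()) == 5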


class ScheduledJob:
    """A scheduled scraping job"""
    def __init__(
        self,
        job_id: str,
        schedule: ScheduleEntry,
        source_config: Dict[str, Any],
        scraper_service,
    ):
        self.job_id = job_id
        self.schedule = schedule
        self.source_config = source_config
        self.scraper_service = scraper_service
        # The original stored a FastAPI BackgroundTasks handle, but that API
        # offers no task names or cancellation; a plain asyncio.Task is used instead.
        self.background_task: Optional[asyncio.Task] = None
        self.last_run: Optional[datetime] = None
        self.next_run: Optional[datetime] = None
        self.run_count = 0
        self.is_running = False

    async def calculate_next_run(self) -> datetime:
        """Compute the next execution time"""
        now = datetime.now()
        hour = self.schedule.hour or 0
        minute = self.schedule.minute or 0

        if self.schedule.schedule_type == ScheduleType.HOURLY:
            # Run at the configured minute of every hour.
            next_run = now.replace(minute=minute, second=0, microsecond=0)
            while next_run <= now:
                next_run += timedelta(hours=1)
        elif self.schedule.schedule_type == ScheduleType.DAILY:
            next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
        elif self.schedule.schedule_type == ScheduleType.WEEKLY:
            # Next Monday at the configured time.
            days_ahead = (7 - now.weekday()) % 7
            next_run = (now + timedelta(days=days_ahead)).replace(hour=hour, minute=minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=7)
        elif self.schedule.schedule_type == ScheduleType.MONTHLY:
            # First day of the next month (the original approximated this with +30 days).
            first_of_month = now.replace(day=1, hour=hour, minute=minute, second=0, microsecond=0)
            next_run = (first_of_month + timedelta(days=32)).replace(day=1)
        else:
            # RUN_NOW (and CUSTOM, pending cron support) executes immediately,
            # so next_run is never left unset.
            next_run = now

        self.next_run = next_run
        return next_run

    def is_time_to_run(self) -> bool:
        """Check whether the job is due (sync, since the scheduler loop calls it directly)."""
        if self.next_run is None:
            return False
        return self.next_run <= datetime.now()

    async def execute(self) -> Dict[str, Any]:
        """Execute the scheduled job"""
        if self.is_running:
            return {"status": "already_running", "message": "Task is already running"}

        try:
            self.is_running = True
            self.run_count += 1
            self.last_run = datetime.now()

            # The "hunter_protocol" job uses its dedicated entry point; all
            # other jobs go through the regular scraping session runner.
            if self.job_id == "hunter_protocol":
                job_func = self.scraper_service.run_hunter
            else:
                job_func = self.scraper_service.run_scraping_session

            result = await job_func(self.source_config)

            self.last_run = datetime.now()
            # Schedule the next occurrence so the job does not fire again immediately.
            await self.calculate_next_run()

            logger.info(f"Scheduled job {self.job_id} executed (run #{self.run_count})")

            return {
                "status": "success",
                "result": result,
                "run_count": self.run_count,
                "last_run": self.last_run.isoformat(),
            }

        except Exception as e:
            logger.error(f"Error executing scheduled job {self.job_id}: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }
        finally:
            self.is_running = False
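
    def get_status(self) -> Dict[str, Any]:
        """Per-job status snapshot.

        Added because ScrapingScheduler.get_job_status() calls this method but
        the original class never defined it; a minimal sketch mirroring the
        fields reported by ScrapingScheduler.get_status().
        """
        return {
            "schedule_type": self.schedule.schedule_type.value if self.schedule else "manual",
            "next_run": self.next_run.isoformat() if self.next_run else None,
            "last_run": self.last_run.isoformat() if self.last_run else None,
            "is_running": self.is_running,
            "run_count": self.run_count,
            "enabled": self.schedule.enabled if self.schedule else False,
        }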


class ScrapingScheduler:
    """Manager that runs all scheduled scraping jobs"""

    def __init__(self):
        self.jobs: Dict[str, ScheduledJob] = {}
        self.scheduler_enabled = False
        self._scheduler_task: Optional[asyncio.Task] = None

    def add_job(self, job_id: str, schedule: ScheduleEntry, source_config: Dict[str, Any], scraper_service) -> ScheduledJob:
        """Add a new scheduled job"""
        job = ScheduledJob(
            job_id=job_id,
            schedule=schedule,
            source_config=source_config,
            scraper_service=scraper_service,
        )

        self.jobs[job_id] = job
        logger.info(f"Job {job_id} added to the schedule")

        # The original awaited start_scheduler() from this sync method, which is
        # invalid (and would block on the scheduler loop forever); launch the
        # loop as a background asyncio task instead.
        if not self.scheduler_enabled:
            self._scheduler_task = asyncio.create_task(self.start_scheduler())

        return job

    def remove_job(self, job_id: str) -> bool:
        """Remove a scheduled job"""
        if job_id in self.jobs:
            # Refuse to remove a job while it is executing.
            if self.jobs[job_id].is_running:
                return False

            # Cancel any pending asyncio task for this job.
            if self.jobs[job_id].background_task:
                self.jobs[job_id].background_task.cancel()

            del self.jobs[job_id]

            logger.info(f"Job {job_id} removed from the schedule")
            return True

        return False
    async def start_scheduler(self):
        """Start the scheduling loop"""
        if self.scheduler_enabled:
            return {"status": "already_enabled"}

        self.scheduler_enabled = True
        logger.info("Scheduler started")

        # Poll once a minute and dispatch any due jobs as asyncio tasks (the
        # original used a BackgroundTasks API with name/tags arguments that
        # FastAPI does not provide).
        while True:
            current_time = datetime.now()

            for job_id, job in list(self.jobs.items()):
                if job.is_time_to_run() and not job.is_running:
                    job.background_task = asyncio.create_task(
                        job.execute(),
                        name=f"scheduled-{job_id}",
                    )
                    logger.info(f"Dispatched job {job_id} at {current_time.isoformat()}")

            await asyncio.sleep(60)

    async def get_status(self) -> Dict[str, Any]:
        """Get the status of all scheduled jobs"""
        # Delegate per-job fields to ScheduledJob.get_status() so None values
        # for next_run/last_run are handled in one place.
        jobs_status = {job_id: job.get_status() for job_id, job in self.jobs.items()}

        return {
            "scheduler_enabled": self.scheduler_enabled,
            "jobs": jobs_status,
            "total_jobs": len(self.jobs),
        }

    async def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get the status of a specific job"""
        if job_id not in self.jobs:
            return {"status": "not_found", "message": f"Job {job_id} not found"}

        return self.jobs[job_id].get_status()

    async def run_job_now(self, job_id: str) -> Dict[str, Any]:
        """Run a job manually, right now"""
        # Use .get() so a missing job_id returns a result instead of raising KeyError.
        job = self.jobs.get(job_id)
        if job:
            return await job.execute()

        return {"status": "not_found", "message": f"Job {job_id} not found"}


# The original referenced `router` without defining it; an APIRouter and a
# module-level scheduler singleton are assumed here.
router = APIRouter()
scheduler = ScrapingScheduler()


@router.on_event("startup")
async def startup_event():
    """Initialize the scheduler on startup"""
    # start_scheduler() loops forever, so launch it as a background task
    # instead of awaiting it, which would block application startup.
    asyncio.create_task(scheduler.start_scheduler())

    logger.info("Scraping scheduler initialized")


async def create_scraping_session(
    db: AsyncSession,
    session_id: str,
    start_time: datetime,
    end_time: Optional[datetime] = None,
    requests_made: int = 0,
    successful_requests: int = 0,
    failed_requests: int = 0,
    success_rate: float = 0.0,
    total_data_bytes: int = 0,
    avg_response_time: float = 0.0,
    proxies_used: Optional[List[str]] = None,
) -> str:
    """Create a new scraping session in the database"""

    try:
        # avg_response_time is accepted but not passed through to db_storage
        # in the original call.
        session_record = await db_storage.create_scraping_session(
            db=db,
            session_id=session_id,
            start_time=start_time,
            end_time=end_time,
            requests_made=requests_made,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            success_rate=success_rate,
            total_data_bytes=total_data_bytes,
            proxies_used=proxies_used or [],
        )

        return session_id

    except Exception as e:
        logger.error(f"Error creating scraping session: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error creating scraping session: {str(e)}")


async def update_scraping_session(
    db: AsyncSession,
    session_id: str,
    end_time: datetime,
    requests_made: int,
    successful_requests: int,
    failed_requests: int,
    success_rate: float,
    total_data_bytes: int,
    proxies_used: List[str],
) -> str:
    """Update an existing scraping session"""

    try:
        # The original also passed an undefined `proxies_tested` argument
        # (with a missing comma); it has been dropped here.
        success = await db_storage.update_scraping_session(
            db=db,
            session_id=session_id,
            end_time=end_time,
            requests_made=requests_made,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            success_rate=success_rate,
            total_data_bytes=total_data_bytes,
            proxies_used=proxies_used,
        )

        return f"Session {session_id} updated successfully"

    except Exception as e:
        logger.error(f"Error updating scraping session: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error updating scraping session: {str(e)}")


async def validate_proxy_source(
    db: AsyncSession,
    source_id: int,
    background_tasks: BackgroundTasks,
) -> str:
    """Validate and test a proxy source"""

    try:
        # NOTE: `service` is assumed to be the scraper service available in
        # this module's scope; the original did not show where it comes from.
        # BackgroundTasks.add_task expects a callable plus its arguments (not
        # a coroutine object, and no name= keyword), so it is invoked that way.
        background_tasks.add_task(service.validate_proxy_source, db, source_id)

        return f"Validation of proxy source {source_id} started"

    except Exception as e:
        logger.error(f"Error validating proxy source: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error validating proxy source: {str(e)}")