Franko Fišter committed on
Commit
021b7e0
·
1 Parent(s): 2179f8e

Added scheduling to data collection script

Browse files
api/main.py CHANGED
@@ -1,6 +1,14 @@
1
  from fastapi import FastAPI
2
  from fastapi.middleware.cors import CORSMiddleware
3
- from config.settings import API_HOST, API_PORT
 
 
 
 
 
 
 
 
4
  from api.product_routes import router as product_router
5
  from api.receipt_routes import router as receipt_router
6
  from api.scrape_routes import router as scrape_router
@@ -8,8 +16,52 @@ from api.cijene_routes import router as cijene_router
8
  from api.similarity_routes import router as similarity_router
9
  from api.geocoding_routes import router as geocoding_router
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  # Initialize FastAPI
12
- app = FastAPI(title="SupaKuna API")
13
 
14
  # CORS configuration
15
  app.add_middleware(
 
1
  from fastapi import FastAPI
2
  from fastapi.middleware.cors import CORSMiddleware
3
+
4
+ # Scheduler imports
5
+ from contextlib import asynccontextmanager
6
+ from scheduler.scheduler import scheduler
7
+ from scheduler.task_registry import task_registry
8
+ from scheduler.tasks.cijene_task import CijeneDataFetchTask
9
+ from scheduler.tasks.cleanup_task import DatabaseCleanupTask
10
+
11
+ # Routes imports
12
  from api.product_routes import router as product_router
13
  from api.receipt_routes import router as receipt_router
14
  from api.scrape_routes import router as scrape_router
 
16
  from api.similarity_routes import router as similarity_router
17
  from api.geocoding_routes import router as geocoding_router
18
 
19
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: wire up the task scheduler on startup and stop it on shutdown."""
    # Make the concrete task classes known to the registry under their type keys.
    task_registry.register_task_class("cijene_fetch", CijeneDataFetchTask)
    task_registry.register_task_class("database_cleanup", DatabaseCleanupTask)

    # Instantiate the two tasks that make up the daily pipeline.
    fetch_task = task_registry.create_task(
        "cijene_fetch",
        "daily_cijene_fetch",
        name="Daily Cijene Data Fetch"
    )
    cleanup_task = task_registry.create_task(
        "database_cleanup",
        "daily_cleanup",
        name="Daily Database Cleanup",
        dependencies=["daily_cijene_fetch"]  # cleanup runs only after the fetch
    )

    # Chain both tasks into a single daily job.
    # NOTE(review): 16:58 looks like a leftover test-time schedule — confirm the
    # intended production run time.
    scheduler.add_daily_job(
        "daily_data_pipeline",
        ["daily_cijene_fetch", "daily_cleanup"],
        hour=16, minute=58
    )

    # Completion / failure callbacks invoked by the scheduler after each run.
    async def job_completed_handler(job_id: str, results: dict):
        print(f"✅ Job {job_id} completed successfully!")

    async def job_failed_handler(job_id: str, results: dict):
        print(f"❌ Job {job_id} failed!")

    scheduler.register_event_handler('job_completed', job_completed_handler)
    scheduler.register_event_handler('job_failed', job_failed_handler)

    # Start the scheduler, hand control to the application, then shut down cleanly.
    scheduler.start()
    yield
    scheduler.stop()

# Initialize FastAPI
app = FastAPI(title="SupaKuna API", lifespan=lifespan)
65
 
66
  # CORS configuration
67
  app.add_middleware(
api/scheduler_routes.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException
2
+ from fastapi.responses import JSONResponse
3
+ from scheduler.scheduler import scheduler
4
+ from scheduler.task_registry import task_registry
5
+ from datetime import datetime, timedelta
6
+
7
+ router = APIRouter(prefix="/scheduler", tags=["Scheduler Management"])
8
+
9
+ @router.get("/status")
10
+ async def get_scheduler_status():
11
+ """Get current scheduler status"""
12
+ return scheduler.get_job_status()
13
+
14
+ @router.post("/run-job/{job_id}")
15
+ async def run_job_now(job_id: str):
16
+ """Manually trigger a job"""
17
+ job_config = scheduler.job_configs.get(job_id)
18
+ if not job_config:
19
+ raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
20
+
21
+ # Run job immediately
22
+ await scheduler._execute_job(
23
+ job_id,
24
+ job_config['task_ids'],
25
+ job_config['context']
26
+ )
27
+
28
+ return {"message": f"Job {job_id} executed successfully"}
app.py CHANGED
@@ -1,4 +1,3 @@
1
- from api.main import app
2
  import uvicorn
3
  from config.settings import API_HOST, API_PORT
4
 
 
 
1
  import uvicorn
2
  from config.settings import API_HOST, API_PORT
3
 
requirements.txt CHANGED
@@ -13,6 +13,7 @@ supabase
13
  rembg
14
  httpx
15
  unidecode
 
16
  # Similarity Engine Dependencies
17
  requests
18
  python-dateutil
 
13
  rembg
14
  httpx
15
  unidecode
16
+ apscheduler
17
  # Similarity Engine Dependencies
18
  requests
19
  python-dateutil
scheduler/scheduler.py ADDED
@@ -0,0 +1,149 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from typing import Dict, Any, List, Callable, Optional
from datetime import datetime
import asyncio  # FIX: was missing, but _trigger_event() calls asyncio.iscoroutinefunction()
import logging
from scheduler.task_registry import TaskRegistry, task_registry
from scheduler.task_executor import TaskExecutor
from scheduler.tasks.base_task import TaskStatus

logger = logging.getLogger(__name__)

class ModularScheduler:
    """APScheduler-backed scheduler that runs chains of registered tasks.

    Job metadata (task ids, human-readable schedule, per-job context) lives in
    ``job_configs``; handlers registered via ``register_event_handler`` are
    invoked after each run with (job_id, results).
    """

    def __init__(self, registry: TaskRegistry = None):
        self.scheduler = AsyncIOScheduler()
        self.registry = registry or task_registry
        self.executor = TaskExecutor(self.registry)
        self.job_configs: Dict[str, Dict] = {}
        self.event_handlers: Dict[str, List[Callable]] = {}

    def add_daily_job(self, job_id: str, task_ids: List[str],
                      hour: int = 2, minute: int = 0,
                      context: Dict[str, Any] = None):
        """Add a daily scheduled job running at hour:minute (replaces an existing job with the same id)."""
        self.scheduler.add_job(
            self._execute_job,
            CronTrigger(hour=hour, minute=minute),
            args=[job_id, task_ids, context],
            id=job_id,
            replace_existing=True
        )

        self.job_configs[job_id] = {
            'task_ids': task_ids,
            'schedule': f"Daily at {hour:02d}:{minute:02d}",
            'context': context or {}
        }

        logger.info(f"Added daily job '{job_id}' scheduled for {hour:02d}:{minute:02d}")

    def add_interval_job(self, job_id: str, task_ids: List[str],
                         minutes: int = None, hours: int = None,
                         context: Dict[str, Any] = None):
        """Add a job that repeats every `hours`h `minutes`m."""
        self.scheduler.add_job(
            self._execute_job,
            IntervalTrigger(minutes=minutes, hours=hours),
            args=[job_id, task_ids, context],
            id=job_id,
            replace_existing=True
        )

        self.job_configs[job_id] = {
            'task_ids': task_ids,
            'schedule': f"Every {hours or 0}h {minutes or 0}m",
            'context': context or {}
        }

        logger.info(f"Added interval job '{job_id}' every {hours or 0}h {minutes or 0}m")

    def add_one_time_job(self, job_id: str, task_ids: List[str],
                         run_date: datetime, context: Dict[str, Any] = None):
        """Add a job that runs exactly once at `run_date`."""
        self.scheduler.add_job(
            self._execute_job,
            DateTrigger(run_date=run_date),
            args=[job_id, task_ids, context],
            id=job_id,
            replace_existing=True
        )

        self.job_configs[job_id] = {
            'task_ids': task_ids,
            'schedule': f"One-time at {run_date}",
            'context': context or {}
        }

        logger.info(f"Added one-time job '{job_id}' for {run_date}")

    def register_event_handler(self, event: str, handler: Callable):
        """Register a (sync or async) handler for 'job_completed', 'job_failed' or 'job_error'."""
        if event not in self.event_handlers:
            self.event_handlers[event] = []
        self.event_handlers[event].append(handler)

    async def _execute_job(self, job_id: str, task_ids: List[str],
                           context: Dict[str, Any] = None):
        """Execute the job's task chain, fire completion/failure events and
        clean finished tasks out of the registry. Never raises."""
        try:
            logger.info(f"Starting scheduled job: {job_id}")
            start_time = datetime.now()

            results = await self.executor.execute_task_chain(task_ids, context)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # A job is failed if any task in the chain failed (SKIPPED does not count).
            failed_tasks = [task_id for task_id, result in results.items()
                            if result.status == TaskStatus.FAILED]

            if failed_tasks:
                logger.error(f"Job {job_id} completed with failures in {duration:.2f}s: {failed_tasks}")
                await self._trigger_event('job_failed', job_id, results)
            else:
                logger.info(f"Job {job_id} completed successfully in {duration:.2f}s")
                await self._trigger_event('job_completed', job_id, results)

            # Clean up completed tasks so the registry does not grow unboundedly.
            self.registry.clear_completed_tasks()

        except Exception as e:
            # FIX: logger.exception preserves the traceback that logger.error dropped.
            logger.exception(f"Critical error in job {job_id}: {str(e)}")
            await self._trigger_event('job_error', job_id, {'error': str(e)})

    async def _trigger_event(self, event: str, job_id: str, data: Any):
        """Invoke all handlers registered for `event`; handler errors are logged, not raised."""
        if event in self.event_handlers:
            for handler in self.event_handlers[event]:
                try:
                    if asyncio.iscoroutinefunction(handler):
                        await handler(job_id, data)
                    else:
                        handler(job_id, data)
                except Exception as e:
                    logger.error(f"Error in event handler for {event}: {str(e)}")

    def start(self):
        """Start the underlying APScheduler."""
        self.scheduler.start()
        logger.info("Modular scheduler started")
        logger.info(f"Active jobs: {list(self.job_configs.keys())}")

    def stop(self):
        """Shut down the underlying APScheduler."""
        self.scheduler.shutdown()
        logger.info("Scheduler stopped")

    def get_job_status(self) -> Dict[str, Any]:
        """Return scheduler running flag, configured jobs and live task count."""
        return {
            'running': self.scheduler.running,
            'jobs': self.job_configs,
            'active_tasks': len(self.registry.get_all_tasks())
        }

# Global scheduler instance
scheduler = ModularScheduler()
scheduler/task_executor.py ADDED
@@ -0,0 +1,86 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from typing import List, Dict, Any, Optional
from scheduler.tasks.base_task import BaseTask, TaskStatus, TaskResult
from scheduler.task_registry import TaskRegistry
import asyncio
import logging

logger = logging.getLogger(__name__)

class TaskExecutor:
    """Runs groups of registered tasks in dependency order.

    Data returned by completed tasks is forwarded to later tasks through the
    run context, together with the full results so far ('previous_results').
    """

    def __init__(self, registry: TaskRegistry):
        self.registry = registry
        # Base context shared by every run. NOTE: per-run data is no longer
        # merged back into this dict (see execute_task_chain).
        self.execution_context: Dict[str, Any] = {}

    async def execute_task_chain(self, task_ids: List[str],
                                 context: Dict[str, Any] = None) -> Dict[str, TaskResult]:
        """Execute `task_ids` in topological order and return task_id -> TaskResult.

        Stops at the first FAILED task; tasks whose dependencies are not met
        are recorded as SKIPPED. Unknown task ids are silently dropped.
        """
        # FIX: the call context and each task's result data used to be merged
        # into self.execution_context, leaking state between scheduler runs
        # (yesterday's results fed into today's job). A fresh per-run dict
        # keeps runs independent while preserving in-run data flow.
        run_context: Dict[str, Any] = dict(self.execution_context)
        if context:
            run_context.update(context)

        tasks = [self.registry.get_task(task_id) for task_id in task_ids]
        tasks = [task for task in tasks if task is not None]

        if not tasks:
            logger.warning("No valid tasks found to execute")
            return {}

        sorted_tasks = self._topological_sort(tasks)
        completed_task_ids: List[str] = []
        results: Dict[str, TaskResult] = {}

        logger.info(f"Starting execution chain with {len(sorted_tasks)} tasks")

        for task in sorted_tasks:
            if task.can_execute(completed_task_ids):
                # Each task sees the accumulated run context plus all prior results.
                task_context = run_context.copy()
                task_context['previous_results'] = results

                result = await task.run(task_context)
                results[task.task_id] = result

                if result.status == TaskStatus.COMPLETED:
                    completed_task_ids.append(task.task_id)
                    # Expose this task's output data to downstream tasks.
                    if result.data:
                        run_context.update(result.data)
                elif result.status == TaskStatus.FAILED:
                    logger.error(f"Task chain stopped due to failure in task: {task.task_id}")
                    break
            else:
                logger.warning(f"Skipping task {task.task_id} - dependencies not met")
                results[task.task_id] = TaskResult(TaskStatus.SKIPPED,
                                                   error="Dependencies not satisfied")

        logger.info(f"Task chain execution completed. {len(completed_task_ids)} tasks succeeded")
        return results

    def _topological_sort(self, tasks: List[BaseTask]) -> List[BaseTask]:
        """Depth-first topological sort over in-chain dependencies.

        Raises ValueError when a circular dependency is detected; dependencies
        on tasks outside this chain are ignored here (handled by can_execute).
        """
        task_dict = {task.task_id: task for task in tasks}
        visited = set()
        temp_visited = set()  # nodes on the current DFS path (cycle detection)
        result: List[BaseTask] = []

        def visit(task_id: str):
            if task_id in temp_visited:
                raise ValueError(f"Circular dependency detected involving task: {task_id}")
            if task_id in visited:
                return

            temp_visited.add(task_id)
            task = task_dict.get(task_id)
            if task:
                for dep_id in task.dependencies:
                    if dep_id in task_dict:
                        visit(dep_id)
            visited.add(task_id)
            result.append(task)
            temp_visited.remove(task_id)

        for task in tasks:
            if task.task_id not in visited:
                visit(task.task_id)

        return result
scheduler/task_registry.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from typing import Dict, Type, List
# FIX: TaskStatus was referenced in clear_completed_tasks() but never imported,
# raising NameError at the end of every scheduled job run.
from scheduler.tasks.base_task import BaseTask, TaskStatus
import logging

logger = logging.getLogger(__name__)

class TaskRegistry:
    """Registry of task classes (keyed by type name) and live task instances (keyed by id)."""

    def __init__(self):
        self._task_classes: Dict[str, Type[BaseTask]] = {}
        self._task_instances: Dict[str, BaseTask] = {}

    def register_task_class(self, task_type: str, task_class: Type[BaseTask]):
        """Register `task_class` under `task_type` (overwrites an existing registration)."""
        self._task_classes[task_type] = task_class
        logger.debug(f"Registered task class: {task_type}")

    def create_task(self, task_type: str, task_id: str, **kwargs) -> BaseTask:
        """Instantiate a registered task class and remember the instance.

        Raises ValueError when `task_type` has not been registered.
        """
        if task_type not in self._task_classes:
            raise ValueError(f"Task type '{task_type}' not registered")

        task_class = self._task_classes[task_type]
        task_instance = task_class(task_id=task_id, **kwargs)
        self._task_instances[task_id] = task_instance

        logger.debug(f"Created task instance: {task_id} of type {task_type}")
        return task_instance

    def get_task(self, task_id: str) -> BaseTask:
        """Return the task instance with `task_id`, or None if unknown."""
        return self._task_instances.get(task_id)

    def get_all_tasks(self) -> List[BaseTask]:
        """Return all live task instances."""
        return list(self._task_instances.values())

    def clear_completed_tasks(self):
        """Drop COMPLETED/FAILED instances so the registry does not grow forever."""
        completed_tasks = [
            task_id for task_id, task in self._task_instances.items()
            if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
        ]
        for task_id in completed_tasks:
            del self._task_instances[task_id]
        logger.info(f"Cleared {len(completed_tasks)} completed tasks")

# Global registry instance
task_registry = TaskRegistry()
scheduler/tasks/base_task.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import ABC, abstractmethod
2
+ from typing import Dict, Any, List, Optional
3
+ from enum import Enum
4
+ import logging
5
+ from datetime import datetime
6
+ import asyncio
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+ class TaskStatus(Enum):
11
+ PENDING = "pending"
12
+ RUNNING = "running"
13
+ COMPLETED = "completed"
14
+ FAILED = "failed"
15
+ SKIPPED = "skipped"
16
+
17
+ class TaskResult:
18
+ def __init__(self, status: TaskStatus, data: Dict[Any, Any] = None, error: str = None):
19
+ self.status = status
20
+ self.data = data or {}
21
+ self.error = error
22
+ self.timestamp = datetime.now()
23
+
24
+ class BaseTask(ABC):
25
+ def __init__(self, task_id: str, name: str, dependencies: List[str] = None):
26
+ self.task_id = task_id
27
+ self.name = name
28
+ self.dependencies = dependencies or []
29
+ self.status = TaskStatus.PENDING
30
+ self.result: Optional[TaskResult] = None
31
+ self.created_at = datetime.now()
32
+ self.started_at: Optional[datetime] = None
33
+ self.completed_at: Optional[datetime] = None
34
+
35
+ @abstractmethod
36
+ async def execute(self, context: Dict[str, Any] = None) -> TaskResult:
37
+ """Execute the task and return result"""
38
+ pass
39
+
40
+ def can_execute(self, completed_tasks: List[str]) -> bool:
41
+ """Check if all dependencies are satisfied"""
42
+ return all(dep in completed_tasks for dep in self.dependencies)
43
+
44
+ async def run(self, context: Dict[str, Any] = None) -> TaskResult:
45
+ """Wrapper method that handles status tracking"""
46
+ try:
47
+ self.status = TaskStatus.RUNNING
48
+ self.started_at = datetime.now()
49
+ logger.info(f"Starting task: {self.name} ({self.task_id})")
50
+
51
+ self.result = await self.execute(context or {})
52
+ self.status = self.result.status
53
+ self.completed_at = datetime.now()
54
+
55
+ duration = (self.completed_at - self.started_at).total_seconds()
56
+ logger.info(f"Task {self.name} completed with status {self.status.value} in {duration:.2f}s")
57
+
58
+ return self.result
59
+
60
+ except Exception as e:
61
+ self.status = TaskStatus.FAILED
62
+ self.completed_at = datetime.now()
63
+ self.result = TaskResult(TaskStatus.FAILED, error=str(e))
64
+ logger.error(f"Task {self.name} failed: {str(e)}")
65
+ return self.result
scheduler/tasks/cijene_task.py ADDED
@@ -0,0 +1,226 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from scheduler.tasks.base_task import BaseTask, TaskResult, TaskStatus
from typing import Dict, Any, List
import httpx
import asyncio
import os
from datetime import datetime
from db.cijene_repository import CijeneRepository
import logging

logger = logging.getLogger(__name__)

class CijeneDataFetchTask(BaseTask):
    """Fetches product/price data from the cijene.dev API for every known brand
    and upserts products, store-product mappings and daily price history."""

    def __init__(self, task_id: str, name: str = "Cijene Data Fetch",
                 dependencies: list = None, brands_limit: int = None):
        super().__init__(task_id, name, dependencies)
        self.repository = CijeneRepository()
        # Optional cap on the number of brands processed (useful for testing).
        self.brands_limit = brands_limit

    async def execute(self, context: Dict[str, Any] = None) -> TaskResult:
        """Execute the Cijene data fetching process.

        Returns a COMPLETED TaskResult with aggregate counters (and a list of
        per-brand/per-product errors) or FAILED on a critical error.
        """
        try:
            logger.info("Starting Cijene data fetch process")

            # Get all brands and store chains
            brands = self.repository.get_all_brands()
            store_chains = self.repository.get_all_store_chains()

            # Apply limit for testing
            if self.brands_limit:
                brands = brands[:self.brands_limit]
                logger.info(f"Limited to {self.brands_limit} brands for testing")

            if not brands:
                return TaskResult(TaskStatus.FAILED, error="No brands found in database")

            results = {
                "total_brands_processed": 0,
                "total_products_found": 0,
                "total_products_inserted": 0,
                "total_products_updated": 0,
                "total_store_products_created": 0,
                "total_price_records_inserted": 0,
                "processing_duration_seconds": 0,
                "errors": []
            }

            # API configuration.
            # SECURITY FIX: the bearer token was hard-coded in source. It now
            # comes from the CIJENE_API_TOKEN environment variable; the old
            # value remains only as a fallback for backward compatibility —
            # rotate that token and configure the env var.
            base_url = "https://api.cijene.dev/v1/products/"
            headers = {
                "Authorization": f"Bearer {os.getenv('CIJENE_API_TOKEN', 'aibeeboh2cuataChi2doMa2Aacah0eli')}",
                "Content-Type": "application/json"
            }

            start_time = asyncio.get_event_loop().time()

            async with httpx.AsyncClient(timeout=30.0) as client:
                for index, brand in enumerate(brands, 1):
                    try:
                        brand_name = brand['brand_name']
                        brand_id = brand['brand_id']

                        safe_brand_name = self.repository.safe_log_string(brand_name)
                        logger.info(f"Processing brand {index}/{len(brands)}: {safe_brand_name}")

                        url = f"{base_url}?q={brand_name}"
                        response = await client.get(url, headers=headers)

                        if response.status_code != 200:
                            error_msg = f"API error for brand {safe_brand_name}: HTTP {response.status_code}"
                            logger.error(error_msg)
                            results["errors"].append(error_msg)
                            continue

                        data = response.json()
                        products = data.get("products", [])

                        results["total_brands_processed"] += 1
                        results["total_products_found"] += len(products)

                        logger.info(f"Found {len(products)} products for brand '{safe_brand_name}'")

                        # Process each product; per-product errors are recorded
                        # but do not abort the brand.
                        for product_index, product in enumerate(products, 1):
                            try:
                                safe_product_name = self.repository.safe_log_string(product.get('name', 'unknown'))
                                logger.debug(f"Processing product {product_index}/{len(products)} for brand '{safe_brand_name}': {safe_product_name}")

                                await self._process_product(product, brand_id, store_chains, results)

                            except Exception as e:
                                error_msg = f"Error processing product {product.get('ean', 'unknown')} for brand '{safe_brand_name}': {str(e)}"
                                logger.error(error_msg)
                                results["errors"].append(error_msg)
                                continue

                        logger.info(f"Completed processing brand '{safe_brand_name}' - Products: {len(products)}")

                        # Rate limiting between brand requests.
                        await asyncio.sleep(0.1)

                        # Log progress every 10 brands
                        if index % 10 == 0:
                            logger.info(f"Progress update: {index}/{len(brands)} brands processed")
                            logger.info(f"Current stats - Products inserted: {results['total_products_inserted']}, Updated: {results['total_products_updated']}")

                    except Exception as e:
                        error_msg = f"Error processing brand {brand.get('brand_name', 'unknown')}: {str(e)}"
                        logger.error(error_msg)
                        results["errors"].append(error_msg)
                        continue

            end_time = asyncio.get_event_loop().time()
            results["processing_duration_seconds"] = round(end_time - start_time, 2)

            logger.info(f"Cijene fetch completed: {results}")
            return TaskResult(TaskStatus.COMPLETED, data=results)

        except Exception as e:
            logger.error(f"Critical error in Cijene fetch: {str(e)}")
            return TaskResult(TaskStatus.FAILED, error=str(e))

    async def _process_product(self, product: Dict, brand_id: str,
                               store_chains: List[Dict], results: Dict):
        """Process a single product and its per-chain price data.

        Upserts the product (matched by EAN or name+brand), ensures a
        store_product mapping per matched chain, and inserts one price-history
        row per chain for today. Mutates the `results` counters in place.
        """
        ean = product.get("ean", "")
        name = product.get("name", "")
        quantity = product.get("quantity", "")
        unit = product.get("unit", "")
        chains = product.get("chains", [])

        if not ean:
            safe_name = self.repository.safe_log_string(name)
            logger.warning(f"Skipping product without EAN: {safe_name}")
            return

        safe_name = self.repository.safe_log_string(name)
        logger.debug(f"Processing product: {safe_name} (EAN: {ean})")

        # Check if product already exists (by EAN or product_name+brand_id)
        existing_product_id = self.repository.check_product_exists(ean, name, brand_id)

        if existing_product_id:
            product_id = existing_product_id
            results["total_products_updated"] += 1
            logger.debug(f"Product exists, using existing ID: {product_id}")
        else:
            # Create new product
            weight = self.repository.combine_quantity_unit(quantity, unit)

            product_data = {
                "product_ean": ean,
                "product_name": name,
                "product_weight": weight,
                "product_quantity": 1,  # Add default quantity
                "brand_id": brand_id
            }

            product_id = self.repository.insert_product(product_data)
            results["total_products_inserted"] += 1
            logger.debug(f"Created new product with ID: {product_id}")

        # Process price data for each chain
        today = datetime.now().date().isoformat()
        logger.debug(f"Processing {len(chains)} price entries for product {safe_name}")

        price_records_for_product = 0

        for chain_data in chains:
            chain_name = chain_data.get("chain", "")
            min_price = chain_data.get("min_price")
            max_price = chain_data.get("max_price")
            avg_price = chain_data.get("avg_price")

            if not chain_name:
                logger.debug("Skipping chain entry without name")
                continue

            # Find matching store chain
            store_chain_id = self.repository.find_matching_store_chain(chain_name, store_chains)

            if not store_chain_id:
                safe_chain_name = self.repository.safe_log_string(chain_name)
                logger.debug(f"No matching store chain found for '{safe_chain_name}', skipping price entry")
                continue

            # Get or create store_product mapping.
            # NOTE(review): this counter increments even when the mapping
            # already existed — "created" is an upper bound.
            try:
                store_product_id = self.repository.get_or_create_store_product(product_id, store_chain_id)
                results["total_store_products_created"] += 1
            except Exception as e:
                logger.error(f"Failed to get/create store_product mapping: {str(e)}")
                continue

            # Validate price data.
            # FIX: use `is not None` so a legitimate price of 0 is kept instead
            # of being silently dropped (0 is falsy).
            try:
                min_price_float = float(min_price) if min_price is not None else None
                max_price_float = float(max_price) if max_price is not None else None
                avg_price_float = float(avg_price) if avg_price is not None else None

                if min_price_float is None and max_price_float is None and avg_price_float is None:
                    safe_chain_name = self.repository.safe_log_string(chain_name)
                    logger.debug(f"No valid price data for chain '{safe_chain_name}', skipping")
                    continue

            except (ValueError, TypeError) as e:
                safe_chain_name = self.repository.safe_log_string(chain_name)
                logger.warning(f"Invalid price data for chain '{safe_chain_name}': {e}")
                continue

            # Insert price history
            price_data = {
                "store_product_id": store_product_id,
                "price_date": today,
                "min_price": min_price_float,
                "max_price": max_price_float,
                "avg_price": avg_price_float
            }

            self.repository.insert_price_history(price_data)
            results["total_price_records_inserted"] += 1
            price_records_for_product += 1

        logger.debug(f"Completed processing product {safe_name}: {price_records_for_product} price records inserted")
scheduler/tasks/cleanup_task.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from scheduler.tasks.base_task import BaseTask, TaskResult, TaskStatus
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

class DatabaseCleanupTask(BaseTask):
    """Placeholder for post-pipeline database cleanup; always reports success."""

    def __init__(self, task_id: str, name: str = "Database Cleanup",
                 dependencies: list = None):
        super().__init__(task_id, name, dependencies)

    async def execute(self, context: Dict[str, Any] = None) -> TaskResult:
        """Placeholder cleanup task"""
        try:
            logger.info("Database cleanup completed (placeholder)")
            return TaskResult(TaskStatus.COMPLETED, data={"cleanup_completed": True})
        except Exception as e:
            return TaskResult(TaskStatus.FAILED, error=str(e))

class NotificationTask(BaseTask):
    """Placeholder notification task; logs how many upstream tasks produced results."""

    def __init__(self, task_id: str, name: str = "Notification",
                 dependencies: list = None):
        super().__init__(task_id, name, dependencies)

    async def execute(self, context: Dict[str, Any] = None) -> TaskResult:
        """Placeholder notification task"""
        try:
            # FIX: context defaults to None, so calling execute() directly
            # (outside BaseTask.run, which substitutes {}) crashed on .get().
            previous_results = (context or {}).get('previous_results', {})
            logger.info(f"Notification sent (placeholder) - Previous results: {len(previous_results)} tasks")
            return TaskResult(TaskStatus.COMPLETED, data={"notification_sent": True})
        except Exception as e:
            return TaskResult(TaskStatus.FAILED, error=str(e))