Fred808 commited on
Commit
1f8ac0c
·
verified ·
1 Parent(s): 13e03d8

Upload 94 files

Browse files
app/__pycache__/main.cpython-312.pyc CHANGED
Binary files a/app/__pycache__/main.cpython-312.pyc and b/app/__pycache__/main.cpython-312.pyc differ
 
app/api/__pycache__/branches.cpython-312.pyc ADDED
Binary file (4.93 kB). View file
 
app/api/__pycache__/sessions.cpython-312.pyc ADDED
Binary file (1.73 kB). View file
 
app/api/__pycache__/staff_analytics.cpython-312.pyc ADDED
Binary file (9.06 kB). View file
 
app/api/sessions.py CHANGED
@@ -4,7 +4,7 @@ from typing import Any, Dict
4
 
5
  from app.db.database import get_db
6
  from app.db import models, schemas
7
- from app.core.auth import get_current_active_user
8
 
9
  router = APIRouter(
10
  prefix="/api/sessions",
 
4
 
5
  from app.db.database import get_db
6
  from app.db import models, schemas
7
+ from app.core.dependencies import get_current_active_user
8
 
9
  router = APIRouter(
10
  prefix="/api/sessions",
app/api/staff_analytics.py CHANGED
@@ -6,6 +6,7 @@ from ..core.dependencies import get_current_active_user, get_db
6
  from ..db.models import User, ActivityType
7
  from ..services.analytics import staff_analytics
8
  from ..services.staff_reports import staff_reports
 
9
  from ..db.schemas import (
10
  StaffActivityCreate,
11
  StaffActivityInDB,
@@ -25,13 +26,22 @@ async def record_staff_activity(
25
  if not current_user.branch_id:
26
  raise HTTPException(status_code=400, detail="User is not associated with a branch")
27
 
28
- recorded_activity = await staff_analytics.record_activity(
29
  user_id=current_user.id,
30
  branch_id=current_user.branch_id,
31
  activity_type=activity.activity_type,
32
  details=activity.details,
33
  duration=activity.duration
34
  )
 
 
 
 
 
 
 
 
 
35
 
36
  return {
37
  "status": "success",
 
6
  from ..db.models import User, ActivityType
7
  from ..services.analytics import staff_analytics
8
  from ..services.staff_reports import staff_reports
9
+ from ..services.performance_notifications import performance_notifications
10
  from ..db.schemas import (
11
  StaffActivityCreate,
12
  StaffActivityInDB,
 
26
  if not current_user.branch_id:
27
  raise HTTPException(status_code=400, detail="User is not associated with a branch")
28
 
29
+ recorded_activity, prev_metrics, new_metrics = await staff_analytics.record_activity(
30
  user_id=current_user.id,
31
  branch_id=current_user.branch_id,
32
  activity_type=activity.activity_type,
33
  details=activity.details,
34
  duration=activity.duration
35
  )
36
+
37
+ # Process notifications for the activity
38
+ await performance_notifications.process_activity_notifications(
39
+ user_id=current_user.id,
40
+ branch_id=current_user.branch_id,
41
+ activity=recorded_activity,
42
+ prev_metrics=prev_metrics,
43
+ new_metrics=new_metrics
44
+ )
45
 
46
  return {
47
  "status": "success",
app/core/__pycache__/auth.cpython-312.pyc ADDED
Binary file (972 Bytes). View file
 
app/main.py CHANGED
@@ -8,7 +8,7 @@ from app.utils.logger import log_api_request
8
  from app.utils.tasks import run_periodic_tasks, sync_pos_metrics_task
9
  from app.services.websocket import connect, disconnect
10
  from app.realtime.subscriber import subscribe_order_events
11
- from app.routes.websocket import websocket_endpoint, manager, router as websocket_router
12
  import socketio
13
  import time
14
  import logging
@@ -65,16 +65,11 @@ async def health_check() -> Dict[str, str]:
65
 
66
  # WebSocket endpoint
67
  @app.websocket("/ws")
68
- async def websocket_endpoint(websocket: WebSocket):
69
- await connect(websocket)
70
- try:
71
- while True:
72
- data = await websocket.receive_text()
73
- except:
74
- await disconnect(websocket)
75
 
76
  @app.websocket("/ws/orders")
77
- async def orders_websocket_endpoint(websocket: WebSocket):
78
  await websocket_endpoint(websocket)
79
 
80
  # Request logging and rate limiting middleware
 
8
  from app.utils.tasks import run_periodic_tasks, sync_pos_metrics_task
9
  from app.services.websocket import connect, disconnect
10
  from app.realtime.subscriber import subscribe_order_events
11
+ from app.routes.websocket import websocket_endpoint, manager, router as websocket_router, staff_metrics_websocket
12
  import socketio
13
  import time
14
  import logging
 
65
 
66
  # WebSocket endpoint
67
  @app.websocket("/ws")
68
+ async def websocket_handler(websocket: WebSocket):
69
+ await websocket_endpoint(websocket)
 
 
 
 
 
70
 
71
  @app.websocket("/ws/orders")
72
+ async def orders_websocket_handler(websocket: WebSocket):
73
  await websocket_endpoint(websocket)
74
 
75
  # Request logging and rate limiting middleware
app/realtime/__pycache__/subscriber.cpython-312.pyc ADDED
Binary file (2.85 kB). View file
 
app/realtime/subscriber.py CHANGED
@@ -0,0 +1,46 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from typing import Dict, Any
3
+ import json
4
+ from ..core.config import settings
5
+ from ..utils.logger import logger
6
+ import redis.asyncio as redis
7
+ from ..routes.websocket import ConnectionManager
8
+
9
+ async def subscribe_order_events(manager: ConnectionManager) -> None:
10
+ """Subscribe to Redis order events channel and broadcast updates to WebSocket clients"""
11
+ try:
12
+ r = redis.Redis(
13
+ host=settings.REDIS_HOST,
14
+ port=settings.REDIS_PORT,
15
+ decode_responses=True
16
+ )
17
+
18
+ pubsub = r.pubsub()
19
+ await pubsub.subscribe("order_events")
20
+
21
+ logger.info("Started Redis subscriber for order events")
22
+
23
+ while True:
24
+ try:
25
+ message = await pubsub.get_message(ignore_subscribe_messages=True)
26
+ if message and message["type"] == "message":
27
+ try:
28
+ data = json.loads(message["data"])
29
+ # Broadcast to all connected WebSocket clients
30
+ await manager.broadcast(data)
31
+ logger.debug(f"Broadcasted order event: {data}")
32
+ except json.JSONDecodeError:
33
+ logger.error(f"Invalid JSON in order event: {message['data']}")
34
+ except Exception as e:
35
+ logger.error(f"Error processing order event: {str(e)}")
36
+
37
+ # Small delay to prevent tight loop
38
+ await asyncio.sleep(0.1)
39
+
40
+ except Exception as e:
41
+ logger.error(f"Error in Redis subscription loop: {str(e)}")
42
+ await asyncio.sleep(1) # Longer delay on error
43
+
44
+ except Exception as e:
45
+ logger.error(f"Redis subscription error: {str(e)}")
46
+ raise # Re-raise to allow startup to handle the error
app/routes/__pycache__/websocket.cpython-312.pyc ADDED
Binary file (5.43 kB). View file
 
app/routes/websocket.py CHANGED
@@ -47,6 +47,14 @@ class ConnectionManager:
47
 
48
  manager = ConnectionManager()
49
 
 
 
 
 
 
 
 
 
50
  @router.websocket("/ws/staff")
51
  async def staff_metrics_websocket(websocket: WebSocket):
52
  await manager.connect(websocket, "staff_metrics")
 
47
 
48
  manager = ConnectionManager()
49
 
50
+ async def websocket_endpoint(websocket: WebSocket):
51
+ await manager.connect(websocket, "orders")
52
+ try:
53
+ while True:
54
+ data = await websocket.receive_text()
55
+ except WebSocketDisconnect:
56
+ manager.disconnect(websocket, "orders")
57
+
58
  @router.websocket("/ws/staff")
59
  async def staff_metrics_websocket(websocket: WebSocket):
60
  await manager.connect(websocket, "staff_metrics")
app/services/__pycache__/analytics.cpython-312.pyc ADDED
Binary file (20.3 kB). View file
 
app/services/__pycache__/maintenance.cpython-312.pyc CHANGED
Binary files a/app/services/__pycache__/maintenance.cpython-312.pyc and b/app/services/__pycache__/maintenance.cpython-312.pyc differ
 
app/services/__pycache__/notifications.cpython-312.pyc CHANGED
Binary files a/app/services/__pycache__/notifications.cpython-312.pyc and b/app/services/__pycache__/notifications.cpython-312.pyc differ
 
app/services/__pycache__/performance_notifications.cpython-312.pyc ADDED
Binary file (7.53 kB). View file
 
app/services/__pycache__/pos_analytics.cpython-312.pyc ADDED
Binary file (18.5 kB). View file
 
app/services/__pycache__/staff_reports.cpython-312.pyc ADDED
Binary file (14.9 kB). View file
 
app/services/analytics.py CHANGED
@@ -3,7 +3,6 @@ from sqlalchemy import func, and_, select, extract
3
  from ..db.database import db
4
  from ..utils.cache import cache
5
  from ..db.models import StaffActivity, PerformanceMetric, User, ActivityType
6
- from ..services.performance_notifications import performance_notifications
7
  from typing import Dict, List, Any, Optional
8
  import numpy as np
9
  from collections import defaultdict
@@ -180,22 +179,7 @@ class StaffAnalyticsService:
180
  user_id, branch_id, activity
181
  )
182
 
183
- # Check and send notifications based on new metrics
184
- await performance_notifications.check_and_notify_performance(
185
- user_id=user_id,
186
- branch_id=branch_id,
187
- metrics=new_metrics
188
- )
189
-
190
- # Check for realtime alerts by comparing with previous metrics
191
- if prev_metrics:
192
- await performance_notifications.notify_realtime_alerts(
193
- branch_id=branch_id,
194
- current_metrics=new_metrics,
195
- previous_metrics=prev_metrics
196
- )
197
-
198
- return activity
199
 
200
  @staticmethod
201
  async def _get_current_metrics(user_id: int, branch_id: int) -> Optional[Dict[str, Any]]:
 
3
  from ..db.database import db
4
  from ..utils.cache import cache
5
  from ..db.models import StaffActivity, PerformanceMetric, User, ActivityType
 
6
  from typing import Dict, List, Any, Optional
7
  import numpy as np
8
  from collections import defaultdict
 
179
  user_id, branch_id, activity
180
  )
181
 
182
+ return activity, prev_metrics, new_metrics
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
 
184
  @staticmethod
185
  async def _get_current_metrics(user_id: int, branch_id: int) -> Optional[Dict[str, Any]]:
app/services/maintenance.py CHANGED
@@ -11,9 +11,9 @@ from ..services.websocket import create_and_broadcast_notification
11
  from ..db.models import User, Order, Notification, Session
12
  from ..utils.cache import cache
13
  from sqlalchemy import text
14
- import aioredis
15
  import asyncio
16
- from ..utils.retry import retry_with_backoff, circuit_breaker, CircuitBreaker
17
 
18
  class MaintenanceService:
19
  def __init__(self):
@@ -21,10 +21,6 @@ class MaintenanceService:
21
  self._background_tasks = {}
22
  self.last_health_check = None
23
  self.last_maintenance = None
24
- self.circuit_breaker = CircuitBreaker(
25
- failure_threshold=5,
26
- reset_timeout=60
27
- )
28
 
29
  def get_timestamp(self) -> str:
30
  """Get current timestamp in ISO format"""
@@ -104,7 +100,7 @@ class MaintenanceService:
104
  "error": str(e)
105
  }
106
 
107
- @circuit_breaker(failure_threshold=5, reset_timeout=300.0)
108
  async def check_background_tasks(self) -> Dict[str, Any]:
109
  """Check status of background tasks"""
110
  active_tasks = []
 
11
  from ..db.models import User, Order, Notification, Session
12
  from ..utils.cache import cache
13
  from sqlalchemy import text
14
+ import redis.asyncio as redis
15
  import asyncio
16
+ from ..utils.retry import retry_with_backoff, circuit_breaker
17
 
18
  class MaintenanceService:
19
  def __init__(self):
 
21
  self._background_tasks = {}
22
  self.last_health_check = None
23
  self.last_maintenance = None
 
 
 
 
24
 
25
  def get_timestamp(self) -> str:
26
  """Get current timestamp in ISO format"""
 
100
  "error": str(e)
101
  }
102
 
103
+ @circuit_breaker(failure_threshold=5, reset_timeout=60)
104
  async def check_background_tasks(self) -> Dict[str, Any]:
105
  """Check status of background tasks"""
106
  active_tasks = []
app/services/performance_notifications.py CHANGED
@@ -1,7 +1,6 @@
1
  from datetime import datetime
2
  from typing import Dict, Any, List, Optional
3
  from ..services.notifications import create_and_broadcast_notification
4
- from ..services.analytics import staff_analytics
5
  from ..db.models import ActivityType
6
  from ..utils.logger import logger
7
 
@@ -192,4 +191,26 @@ class PerformanceNotificationService:
192
  except Exception as e:
193
  logger.error(f"Error in realtime alert notifications: {str(e)}")
194
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  performance_notifications = PerformanceNotificationService()
 
1
  from datetime import datetime
2
  from typing import Dict, Any, List, Optional
3
  from ..services.notifications import create_and_broadcast_notification
 
4
  from ..db.models import ActivityType
5
  from ..utils.logger import logger
6
 
 
191
  except Exception as e:
192
  logger.error(f"Error in realtime alert notifications: {str(e)}")
193
 
194
+ @staticmethod
195
+ async def process_activity_notifications(
196
+ user_id: int,
197
+ branch_id: int,
198
+ activity: Any,
199
+ prev_metrics: Optional[Dict[str, Any]],
200
+ new_metrics: Dict[str, Any]
201
+ ):
202
+ """Process notifications for a new staff activity"""
203
+ await PerformanceNotificationService.check_and_notify_performance(
204
+ user_id=user_id,
205
+ branch_id=branch_id,
206
+ metrics=new_metrics
207
+ )
208
+
209
+ if prev_metrics:
210
+ await PerformanceNotificationService.notify_realtime_alerts(
211
+ branch_id=branch_id,
212
+ current_metrics=new_metrics,
213
+ previous_metrics=prev_metrics
214
+ )
215
+
216
  performance_notifications = PerformanceNotificationService()
app/services/pos_analytics.py CHANGED
@@ -1,4 +1,4 @@
1
- from datetime import datetime
2
  from typing import Dict, Any, Optional, List
3
  from ..core.config import settings
4
  from ..utils.logger import logger, log_health_check
@@ -6,6 +6,7 @@ from ..services.analytics import staff_analytics
6
  from ..db.models import ActivityType
7
  from ..routes.websocket import broadcast_staff_update
8
  from ..utils.retry import with_retry, retry_with_backoff
 
9
  import httpx
10
  import asyncio
11
  import aiohttp
@@ -25,9 +26,7 @@ class POSAnalyticsService:
25
 
26
  @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError))
27
  async def sync_pos_activity(self, pos_data: Dict[str, Any]) -> bool:
28
- """
29
- Sync staff activity data from POS system with retry mechanism
30
- """
31
  try:
32
  # Map POS activity types to our ActivityType enum
33
  activity_mapping = {
@@ -46,7 +45,7 @@ class POSAnalyticsService:
46
  return False
47
 
48
  # Record the activity in our system
49
- await staff_analytics.record_activity(
50
  user_id=pos_data["user_id"],
51
  branch_id=pos_data["branch_id"],
52
  activity_type=activity_type,
@@ -54,6 +53,15 @@ class POSAnalyticsService:
54
  duration=pos_data.get("duration")
55
  )
56
 
 
 
 
 
 
 
 
 
 
57
  # Get updated metrics for the branch and broadcast
58
  metrics = await staff_analytics.get_staff_performance(
59
  branch_id=pos_data["branch_id"],
@@ -93,61 +101,66 @@ class POSAnalyticsService:
93
  logger.error(f"Error fetching POS metrics: {str(e)}")
94
  return None
95
 
96
- @with_retry(max_retries=3, delay=2.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError))
97
  async def sync_all_metrics(self, branch_id: Optional[int] = None) -> bool:
98
- """
99
- Sync all staff metrics from POS system with retry mechanism
100
- """
101
  try:
102
- today = datetime.utcnow()
103
- params = {"date": today.date().isoformat()}
104
- if branch_id:
105
- params["branch_id"] = branch_id
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
- async with httpx.AsyncClient() as client:
108
- response = await client.get(
109
- f"{self.pos_api_url}/api/v1/staff/metrics/daily",
110
- headers=self._headers,
111
- params=params,
112
- timeout=30.0
113
- )
114
-
115
- response.raise_for_status()
116
- metrics_data = response.json()
117
- sync_tasks = []
118
- processed_branches = set()
119
 
120
- for metric in metrics_data:
121
- activities = self._convert_metrics_to_activities(metric)
 
122
  processed_branches.add(metric["branch_id"])
123
-
124
- for activity in activities:
125
- sync_tasks.append(self.sync_pos_activity(activity))
126
-
127
- # Run all sync tasks concurrently with individual retries
128
- results = await asyncio.gather(*sync_tasks, return_exceptions=True)
129
 
130
- # Broadcast updates for each affected branch
131
- for branch_id in processed_branches:
132
- metrics = await staff_analytics.get_staff_performance(branch_id=branch_id)
133
- await broadcast_staff_update(metrics)
134
-
135
- # Check for any failures
136
- success = all(
137
- result is True if not isinstance(result, Exception) else False
138
- for result in results
139
- )
140
 
141
- if not success:
142
- logger.warning("Some metrics failed to sync")
 
 
 
 
 
 
 
 
 
 
 
143
 
144
- return success
 
 
 
145
 
146
- except httpx.HTTPError as e:
147
- logger.error(f"HTTP error in sync_all_metrics: {str(e)}")
148
- raise
149
  except Exception as e:
150
- logger.error(f"Error in sync_all_metrics: {str(e)}")
 
151
  return False
152
 
153
  def _convert_metrics_to_activities(self, metric: Dict[str, Any]) -> List[Dict[str, Any]]:
@@ -258,39 +271,6 @@ class POSAnalyticsService:
258
  "details": {"error": str(e)}
259
  }
260
 
261
- @retry_with_backoff(max_retries=3)
262
- async def sync_all_metrics(self) -> bool:
263
- """Sync all metrics from POS system"""
264
- try:
265
- # Reset failed operations list
266
- self._failed_operations = []
267
-
268
- # Sync various metrics
269
- tasks = [
270
- self._sync_sales_metrics(),
271
- self._sync_inventory_metrics(),
272
- self._sync_staff_metrics(),
273
- self._sync_customer_metrics()
274
- ]
275
-
276
- results = await asyncio.gather(*tasks, return_exceptions=True)
277
-
278
- # Check for failures
279
- for result in results:
280
- if isinstance(result, Exception):
281
- self._failed_operations.append(str(result))
282
- logger.error(f"Metric sync failed: {str(result)}")
283
-
284
- self._last_sync_time = datetime.utcnow()
285
- self._last_error = None if not self._failed_operations else self._failed_operations[-1]
286
-
287
- return len(self._failed_operations) == 0
288
-
289
- except Exception as e:
290
- self._last_error = str(e)
291
- logger.error(f"Failed to sync metrics: {str(e)}")
292
- return False
293
-
294
  @retry_with_backoff(max_retries=2)
295
  async def _sync_sales_metrics(self) -> Dict[str, Any]:
296
  """Sync sales metrics from POS"""
 
1
+ from datetime import datetime, timedelta
2
  from typing import Dict, Any, Optional, List
3
  from ..core.config import settings
4
  from ..utils.logger import logger, log_health_check
 
6
  from ..db.models import ActivityType
7
  from ..routes.websocket import broadcast_staff_update
8
  from ..utils.retry import with_retry, retry_with_backoff
9
+ from ..services.performance_notifications import performance_notifications
10
  import httpx
11
  import asyncio
12
  import aiohttp
 
26
 
27
  @with_retry(max_retries=3, delay=1.0, exceptions=(httpx.HTTPError, asyncio.TimeoutError))
28
  async def sync_pos_activity(self, pos_data: Dict[str, Any]) -> bool:
29
+ """Sync staff activity data from POS system with retry mechanism"""
 
 
30
  try:
31
  # Map POS activity types to our ActivityType enum
32
  activity_mapping = {
 
45
  return False
46
 
47
  # Record the activity in our system
48
+ activity, prev_metrics, new_metrics = await staff_analytics.record_activity(
49
  user_id=pos_data["user_id"],
50
  branch_id=pos_data["branch_id"],
51
  activity_type=activity_type,
 
53
  duration=pos_data.get("duration")
54
  )
55
 
56
+ # Process notifications for the activity
57
+ await performance_notifications.process_activity_notifications(
58
+ user_id=pos_data["user_id"],
59
+ branch_id=pos_data["branch_id"],
60
+ activity=activity,
61
+ prev_metrics=prev_metrics,
62
+ new_metrics=new_metrics
63
+ )
64
+
65
  # Get updated metrics for the branch and broadcast
66
  metrics = await staff_analytics.get_staff_performance(
67
  branch_id=pos_data["branch_id"],
 
101
  logger.error(f"Error fetching POS metrics: {str(e)}")
102
  return None
103
 
104
+ @retry_with_backoff(max_retries=3)
105
  async def sync_all_metrics(self, branch_id: Optional[int] = None) -> bool:
106
+ """Sync all metrics from POS system"""
 
 
107
  try:
108
+ # Reset failed operations list
109
+ self._failed_operations = []
110
+
111
+ # Sync various metrics
112
+ tasks = [
113
+ self._sync_sales_metrics(),
114
+ self._sync_inventory_metrics(),
115
+ self._sync_staff_metrics(),
116
+ self._sync_customer_metrics()
117
+ ]
118
+
119
+ results = await asyncio.gather(*tasks, return_exceptions=True)
120
+
121
+ # Check for failures
122
+ for result in results:
123
+ if isinstance(result, Exception):
124
+ self._failed_operations.append(str(result))
125
+ logger.error(f"Metric sync failed: {str(result)}")
126
+
127
+ self._last_sync_time = datetime.utcnow()
128
+ self._last_error = None if not self._failed_operations else self._failed_operations[-1]
129
 
130
+ # Process metrics and sync staff activities
131
+ sync_tasks = []
132
+ processed_branches = set()
 
 
 
 
 
 
 
 
 
133
 
134
+ for metric in (r for r in results if not isinstance(r, Exception)):
135
+ activities = self._convert_metrics_to_activities(metric)
136
+ if "branch_id" in metric:
137
  processed_branches.add(metric["branch_id"])
 
 
 
 
 
 
138
 
139
+ for activity in activities:
140
+ sync_tasks.append(self.sync_pos_activity(activity))
 
 
 
 
 
 
 
 
141
 
142
+ # Run all sync tasks concurrently with individual retries
143
+ activity_results = await asyncio.gather(*sync_tasks, return_exceptions=True)
144
+
145
+ # Broadcast updates for each affected branch
146
+ for branch_id in processed_branches:
147
+ metrics = await staff_analytics.get_staff_performance(branch_id=branch_id)
148
+ await broadcast_staff_update(metrics)
149
+
150
+ # Check overall success
151
+ success = len(self._failed_operations) == 0 and all(
152
+ result is True if not isinstance(result, Exception) else False
153
+ for result in activity_results
154
+ )
155
 
156
+ if not success:
157
+ logger.warning("Some metrics failed to sync")
158
+
159
+ return success
160
 
 
 
 
161
  except Exception as e:
162
+ self._last_error = str(e)
163
+ logger.error(f"Failed to sync metrics: {str(e)}")
164
  return False
165
 
166
  def _convert_metrics_to_activities(self, metric: Dict[str, Any]) -> List[Dict[str, Any]]:
 
271
  "details": {"error": str(e)}
272
  }
273
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
  @retry_with_backoff(max_retries=2)
275
  async def _sync_sales_metrics(self) -> Dict[str, Any]:
276
  """Sync sales metrics from POS"""
app/utils/__pycache__/cache.cpython-312.pyc CHANGED
Binary files a/app/utils/__pycache__/cache.cpython-312.pyc and b/app/utils/__pycache__/cache.cpython-312.pyc differ
 
app/utils/__pycache__/rate_limiter.cpython-312.pyc CHANGED
Binary files a/app/utils/__pycache__/rate_limiter.cpython-312.pyc and b/app/utils/__pycache__/rate_limiter.cpython-312.pyc differ
 
app/utils/__pycache__/retry.cpython-312.pyc ADDED
Binary file (7.53 kB). View file
 
app/utils/__pycache__/tasks.cpython-312.pyc CHANGED
Binary files a/app/utils/__pycache__/tasks.cpython-312.pyc and b/app/utils/__pycache__/tasks.cpython-312.pyc differ
 
app/utils/cache.py CHANGED
@@ -1,106 +1,32 @@
1
- import redis
2
  import json
3
  import inspect
4
  import functools
5
  from ..core.config import settings
6
  from typing import Any, Optional, Callable, TypeVar
7
  from ..utils.logger import logger
8
- from redis.asyncio import Redis
9
- import asyncio
10
 
11
  T = TypeVar('T')
12
 
13
- def cached(ttl_seconds: int):
14
- """
15
- Cache decorator that stores function results in Redis.
16
- Works with both sync and async functions.
17
-
18
- Args:
19
- ttl_seconds: Time to live in seconds for cached results
20
-
21
- Example:
22
- @cached(300) # Cache for 5 minutes
23
- async def get_user(user_id: int):
24
- return await db.fetch_user(user_id)
25
- """
26
- def decorator(func: Callable[..., T]) -> Callable[..., T]:
27
- is_async = inspect.iscoroutinefunction(func)
28
-
29
- def get_cache_key(*args, **kwargs) -> str:
30
- """Generate cache key from function name and arguments"""
31
- # Sort kwargs to ensure consistent key generation
32
- sorted_kwargs = sorted(kwargs.items())
33
- args_str = ":".join(str(arg) for arg in args)
34
- kwargs_str = ":".join(f"{k}={v}" for k, v in sorted_kwargs)
35
- return f"cache:{func.__module__}:{func.__name__}:{args_str}:{kwargs_str}"
36
-
37
- if is_async:
38
- @functools.wraps(func)
39
- async def async_wrapper(*args, **kwargs) -> T:
40
- cache_key = get_cache_key(*args, **kwargs)
41
-
42
- # Try to get from cache
43
- cached_value = await cache.get_cache(cache_key)
44
- if cached_value is not None:
45
- return cached_value
46
-
47
- # Call function and cache result
48
- result = await func(*args, **kwargs)
49
- await cache.set_cache(cache_key, result, ttl_seconds)
50
- return result
51
- return async_wrapper
52
- else:
53
- @functools.wraps(func)
54
- def sync_wrapper(*args, **kwargs) -> T:
55
- cache_key = get_cache_key(*args, **kwargs)
56
-
57
- # Try to get from cache
58
- try:
59
- cached_value = cache.redis_client.get(cache_key)
60
- if cached_value:
61
- return json.loads(cached_value)
62
- except:
63
- if cache.is_connected:
64
- logger.error("Redis error in sync cache access")
65
- return cache.fallback_cache.get(cache_key)
66
-
67
- # Call function and cache result
68
- result = func(*args, **kwargs)
69
- try:
70
- if cache.is_connected:
71
- cache.redis_client.setex(
72
- cache_key,
73
- ttl_seconds,
74
- json.dumps(result)
75
- )
76
- else:
77
- cache.fallback_cache[cache_key] = result
78
- except Exception as e:
79
- logger.error(f"Cache set error in sync wrapper: {str(e)}")
80
- return result
81
- return sync_wrapper
82
-
83
- return decorator
84
-
85
  class RedisCache:
86
  _instance = None
87
 
88
  def __new__(cls):
89
  if cls._instance is None:
90
  cls._instance = super(RedisCache, cls).__new__(cls)
91
- asyncio.create_task(cls._instance.initialize())
 
 
 
 
 
 
92
  return cls._instance
93
 
94
  async def initialize(self):
95
  """Initialize Redis connection with fallback to dummy cache"""
96
  try:
97
- self.redis_client = Redis(
98
- host=settings.REDIS_HOST,
99
- port=settings.REDIS_PORT,
100
- decode_responses=True,
101
- socket_connect_timeout=1
102
- )
103
- await self.redis_client.ping()
104
  self.is_connected = True
105
  logger.info("Redis cache initialized successfully")
106
  except Exception as e:
@@ -110,74 +36,101 @@ class RedisCache:
110
 
111
  async def set_cache(self, key: str, value: Any, expire: int = 3600):
112
  """Set a cache entry with optional expiration time (default 1 hour)"""
113
- if not self.is_connected:
114
- self.fallback_cache[key] = value
115
- return True
116
-
117
  try:
118
- serialized_value = json.dumps(value)
119
- await self.redis_client.set(key, serialized_value, ex=expire)
120
- return True
 
121
  except Exception as e:
122
- logger.error(f"Error setting cache: {str(e)}")
123
- return False
124
 
125
  async def get_cache(self, key: str) -> Optional[Any]:
126
  """Get a cached value by key"""
127
- if not self.is_connected:
128
- return self.fallback_cache.get(key)
129
-
130
  try:
131
- value = await self.redis_client.get(key)
132
- if value:
133
- return json.loads(value)
 
134
  except Exception as e:
135
- logger.error(f"Error getting cache: {str(e)}")
136
- return None
137
 
138
  async def delete_cache(self, key: str) -> bool:
139
  """Delete a cache entry by key"""
140
- if not self.is_connected:
141
- self.fallback_cache.pop(key, None)
142
- return True
143
-
144
  try:
145
- await self.redis_client.delete(key)
 
 
146
  return True
147
  except Exception as e:
148
- logger.error(f"Error deleting cache: {str(e)}")
149
  return False
150
 
151
  async def clear_cache_pattern(self, pattern: str) -> bool:
152
  """Clear all cache entries matching a pattern"""
153
- if not self.is_connected:
154
- # Basic pattern matching for in-memory cache
155
- keys_to_delete = [k for k in self.fallback_cache.keys() if pattern in k]
156
- for key in keys_to_delete:
157
- del self.fallback_cache[key]
158
- return True
159
-
160
  try:
161
- cursor = 0
162
- while True:
163
- cursor, keys = await self.redis_client.scan(cursor, match=pattern)
164
  if keys:
165
- await self.redis_client.delete(*keys)
166
- if cursor == 0:
167
- break
168
  return True
169
  except Exception as e:
170
- logger.error(f"Error clearing cache pattern: {str(e)}")
171
  return False
172
 
173
  async def check_connection(self) -> bool:
174
  """Check if Redis connection is alive"""
175
  try:
176
- await self.redis_client.ping()
177
- self.is_connected = True
178
- return True
179
- except:
180
- self.is_connected = False
181
  return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
 
183
  cache = RedisCache()
 
1
+ import redis.asyncio as redis
2
  import json
3
  import inspect
4
  import functools
5
  from ..core.config import settings
6
  from typing import Any, Optional, Callable, TypeVar
7
  from ..utils.logger import logger
 
 
8
 
9
  T = TypeVar('T')
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  class RedisCache:
12
  _instance = None
13
 
14
  def __new__(cls):
15
  if cls._instance is None:
16
  cls._instance = super(RedisCache, cls).__new__(cls)
17
+ cls._instance.is_connected = False
18
+ cls._instance.fallback_cache = {}
19
+ cls._instance.redis = redis.Redis(
20
+ host=settings.REDIS_HOST,
21
+ port=settings.REDIS_PORT,
22
+ decode_responses=True
23
+ )
24
  return cls._instance
25
 
26
  async def initialize(self):
27
  """Initialize Redis connection with fallback to dummy cache"""
28
  try:
29
+ await self.redis.ping()
 
 
 
 
 
 
30
  self.is_connected = True
31
  logger.info("Redis cache initialized successfully")
32
  except Exception as e:
 
36
 
37
  async def set_cache(self, key: str, value: Any, expire: int = 3600):
38
  """Set a cache entry with optional expiration time (default 1 hour)"""
 
 
 
 
39
  try:
40
+ if self.is_connected:
41
+ await self.redis.set(key, json.dumps(value), ex=expire)
42
+ else:
43
+ self.fallback_cache[key] = value
44
  except Exception as e:
45
+ logger.error(f"Cache set error: {str(e)}")
46
+ self.fallback_cache[key] = value
47
 
48
  async def get_cache(self, key: str) -> Optional[Any]:
49
  """Get a cached value by key"""
 
 
 
50
  try:
51
+ if self.is_connected:
52
+ value = await self.redis.get(key)
53
+ return json.loads(value) if value else None
54
+ return self.fallback_cache.get(key)
55
  except Exception as e:
56
+ logger.error(f"Cache get error: {str(e)}")
57
+ return self.fallback_cache.get(key)
58
 
59
  async def delete_cache(self, key: str) -> bool:
60
  """Delete a cache entry by key"""
 
 
 
 
61
  try:
62
+ if self.is_connected:
63
+ return bool(await self.redis.delete(key))
64
+ self.fallback_cache.pop(key, None)
65
  return True
66
  except Exception as e:
67
+ logger.error(f"Cache delete error: {str(e)}")
68
  return False
69
 
70
  async def clear_cache_pattern(self, pattern: str) -> bool:
71
  """Clear all cache entries matching a pattern"""
 
 
 
 
 
 
 
72
  try:
73
+ if self.is_connected:
74
+ keys = await self.redis.keys(pattern)
 
75
  if keys:
76
+ return bool(await self.redis.delete(*keys))
 
 
77
  return True
78
  except Exception as e:
79
+ logger.error(f"Cache clear pattern error: {str(e)}")
80
  return False
81
 
82
  async def check_connection(self) -> bool:
83
  """Check if Redis connection is alive"""
84
  try:
85
+ if self.is_connected:
86
+ await self.redis.ping()
87
+ return True
 
 
88
  return False
89
+ except Exception:
90
+ return False
91
+
92
+ def cached(ttl_seconds: int):
93
+ """Decorator to cache function results"""
94
+ def decorator(func: Callable[..., T]) -> Callable[..., T]:
95
+ if inspect.iscoroutinefunction(func):
96
+ @functools.wraps(func)
97
+ async def async_wrapper(*args, **kwargs) -> T:
98
+ cache_key = f"{func.__name__}:{args}:{kwargs}"
99
+ cached_value = await cache.get_cache(cache_key)
100
+ if cached_value is not None:
101
+ return cached_value
102
+
103
+ result = await func(*args, **kwargs)
104
+ await cache.set_cache(cache_key, result, expire=ttl_seconds)
105
+ return result
106
+ return async_wrapper
107
+ else:
108
+ @functools.wraps(func)
109
+ def sync_wrapper(*args, **kwargs) -> T:
110
+ cache_key = f"{func.__name__}:{args}:{kwargs}"
111
+ try:
112
+ cached_value = cache.redis.get(cache_key)
113
+ if cached_value:
114
+ return json.loads(cached_value)
115
+ except:
116
+ if cache.is_connected:
117
+ logger.error("Redis error in sync cache access")
118
+ return cache.fallback_cache.get(cache_key)
119
+
120
+ result = func(*args, **kwargs)
121
+ try:
122
+ if cache.is_connected:
123
+ cache.redis.setex(
124
+ cache_key,
125
+ ttl_seconds,
126
+ json.dumps(result)
127
+ )
128
+ else:
129
+ cache.fallback_cache[cache_key] = result
130
+ except Exception as e:
131
+ logger.error(f"Cache set error in sync wrapper: {str(e)}")
132
+ return result
133
+ return sync_wrapper
134
+ return decorator
135
 
136
  cache = RedisCache()
app/utils/tasks.py CHANGED
@@ -1,9 +1,9 @@
 
1
  from datetime import datetime, timedelta
2
  from ..db.database import db
3
- from ..services.maintenance import maintenance
4
  from ..utils.logger import logger
5
  from ..services.websocket import create_and_broadcast_notification
6
- import asyncio
7
  from sqlalchemy import select, delete
8
  from ..db.models import Event, User, Notification
9
  from ..services.pos_analytics import pos_analytics
@@ -73,23 +73,23 @@ async def perform_daily_maintenance():
73
  """Perform daily system maintenance tasks"""
74
  try:
75
  # Clean up expired sessions
76
- deleted_sessions = await maintenance.cleanup_expired_sessions()
77
  logger.info(f"Cleaned up {deleted_sessions} expired sessions")
78
 
79
  # Archive old data
80
- archived = await maintenance.archive_old_data()
81
  if archived:
82
  logger.info(f"Archived data: {archived}")
83
 
84
  # Check system health
85
- health_data = await maintenance.check_system_health()
86
  if "error" not in health_data:
87
  logger.info("System health check completed successfully")
88
  else:
89
  logger.error(f"System health check error: {health_data['error']}")
90
 
91
  # Monitor system resources
92
- resources = await maintenance.monitor_system_resources()
93
  if "error" not in resources:
94
  logger.info("System resource monitoring completed successfully")
95
  else:
@@ -102,15 +102,15 @@ async def perform_weekly_maintenance():
102
  """Perform weekly system maintenance tasks"""
103
  try:
104
  # Perform database maintenance
105
- await maintenance.perform_database_maintenance()
106
  logger.info("Database maintenance completed successfully")
107
 
108
  # Rotate log files
109
- await maintenance.rotate_log_files()
110
  logger.info("Log rotation completed successfully")
111
 
112
  # Manage storage quotas
113
- quota_results = await maintenance.manage_storage_quotas()
114
  if quota_results.get("warnings"):
115
  for warning in quota_results["warnings"]:
116
  logger.warning(warning)
 
1
+ import asyncio
2
  from datetime import datetime, timedelta
3
  from ..db.database import db
4
+ from ..services.maintenance import maintenance_service
5
  from ..utils.logger import logger
6
  from ..services.websocket import create_and_broadcast_notification
 
7
  from sqlalchemy import select, delete
8
  from ..db.models import Event, User, Notification
9
  from ..services.pos_analytics import pos_analytics
 
73
  """Perform daily system maintenance tasks"""
74
  try:
75
  # Clean up expired sessions
76
+ deleted_sessions = await maintenance_service.cleanup_expired_sessions()
77
  logger.info(f"Cleaned up {deleted_sessions} expired sessions")
78
 
79
  # Archive old data
80
+ archived = await maintenance_service.archive_old_data()
81
  if archived:
82
  logger.info(f"Archived data: {archived}")
83
 
84
  # Check system health
85
+ health_data = await maintenance_service.check_system_health()
86
  if "error" not in health_data:
87
  logger.info("System health check completed successfully")
88
  else:
89
  logger.error(f"System health check error: {health_data['error']}")
90
 
91
  # Monitor system resources
92
+ resources = await maintenance_service.monitor_system_resources()
93
  if "error" not in resources:
94
  logger.info("System resource monitoring completed successfully")
95
  else:
 
102
  """Perform weekly system maintenance tasks"""
103
  try:
104
  # Perform database maintenance
105
+ await maintenance_service.perform_database_maintenance()
106
  logger.info("Database maintenance completed successfully")
107
 
108
  # Rotate log files
109
+ await maintenance_service.rotate_log_files()
110
  logger.info("Log rotation completed successfully")
111
 
112
  # Manage storage quotas
113
+ quota_results = await maintenance_service.manage_storage_quotas()
114
  if quota_results.get("warnings"):
115
  for warning in quota_results["warnings"]:
116
  logger.warning(warning)