Fred808 commited on
Commit
a9069b6
·
verified ·
1 Parent(s): aed0eb9

Update app/utils/tasks.py

Browse files
Files changed (1) hide show
  1. app/utils/tasks.py +94 -54
app/utils/tasks.py CHANGED
@@ -1,104 +1,138 @@
1
  from datetime import datetime, timedelta
2
- from sqlalchemy.future import select
3
- from sqlalchemy import update, delete
4
- from sqlalchemy.orm import selectinload
5
- from ..db.database import AsyncSessionLocal
6
- from app.models.event import Event
7
- from ..models.notification import Notification # Your notification model
8
- from ..services.notifications import notifications
9
  from ..services.maintenance import maintenance
10
  from ..utils.logger import logger
 
11
  import asyncio
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
  async def check_event_reminders():
 
14
  try:
15
- async with AsyncSessionLocal() as session:
16
- now = datetime.utcnow()
17
- reminder_window = now + timedelta(minutes=30)
18
-
19
- # Fetch events within the window that haven’t been reminded yet
20
- stmt = select(Event).where(
21
- Event.start_time >= now,
22
- Event.start_time <= reminder_window,
23
- Event.reminder_sent == False
24
- ).options(selectinload(Event.attendees))
25
-
26
  result = await session.execute(stmt)
27
- events = result.scalars().all()
28
-
29
- for event in events:
30
- # Notify the creator
31
- await notifications.create_notification(
32
- user_id=event.user_id,
33
- title=f"Event Reminder: {event.title}",
34
- message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
35
- notification_type="event_reminder",
36
- data={"event_id": str(event.id)}
37
- )
38
-
39
- # Notify attendees
40
- for attendee in event.attendees:
41
- await notifications.create_notification(
42
- user_id=attendee.user_id,
43
  title=f"Event Reminder: {event.title}",
44
- message=f"Event '{event.title}' starts in {event.reminder_minutes} minutes",
45
  notification_type="event_reminder",
46
  data={"event_id": str(event.id)}
47
  )
48
-
49
- # Mark reminder sent
50
- event.reminder_sent = True
51
-
52
- await session.commit()
53
 
54
  except Exception as e:
55
  logger.error(f"Error in event reminder check: {str(e)}")
56
 
57
  async def cleanup_old_notifications():
 
58
  try:
59
- async with AsyncSessionLocal() as session:
60
- cutoff = datetime.utcnow() - timedelta(days=30)
61
- stmt = delete(Notification).where(Notification.created_at < cutoff)
62
- result = await session.execute(stmt)
63
- await session.commit()
64
- logger.info(f"Cleaned up old notifications.")
 
65
  except Exception as e:
66
  logger.error(f"Error in cleanup_old_notifications: {str(e)}")
67
 
68
  async def perform_daily_maintenance():
 
69
  try:
 
70
  deleted_sessions = await maintenance.cleanup_expired_sessions()
71
  logger.info(f"Cleaned up {deleted_sessions} expired sessions")
72
 
 
73
  archived = await maintenance.archive_old_data()
74
  if archived:
75
  logger.info(f"Archived data: {archived}")
76
 
 
77
  health_data = await maintenance.check_system_health()
78
  if "error" not in health_data:
79
  logger.info("System health check completed successfully")
80
  else:
81
  logger.error(f"System health check error: {health_data['error']}")
82
 
 
83
  resources = await maintenance.monitor_system_resources()
84
  if "error" not in resources:
85
  logger.info("System resource monitoring completed successfully")
86
  else:
87
  logger.error(f"Resource monitoring error: {resources['error']}")
 
88
  except Exception as e:
89
  logger.error(f"Error in daily maintenance: {str(e)}")
90
 
91
  async def perform_weekly_maintenance():
 
92
  try:
93
- db_maintenance = await maintenance.perform_database_maintenance()
94
- if "error" not in db_maintenance:
95
- logger.info("Database maintenance completed successfully")
96
- else:
97
- logger.error(f"Database maintenance error: {db_maintenance['error']}")
 
 
 
 
 
 
 
 
 
 
98
  except Exception as e:
99
  logger.error(f"Error in weekly maintenance: {str(e)}")
100
 
101
  async def run_periodic_tasks():
 
102
  daily_maintenance_run = False
103
  weekly_maintenance_run = False
104
 
@@ -106,22 +140,28 @@ async def run_periodic_tasks():
106
  try:
107
  now = datetime.utcnow()
108
 
 
109
  await check_event_reminders()
 
 
110
  await cleanup_old_notifications()
111
 
 
112
  if now.hour == 2 and not daily_maintenance_run:
113
  await perform_daily_maintenance()
114
  daily_maintenance_run = True
115
  elif now.hour != 2:
116
  daily_maintenance_run = False
117
 
 
118
  if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
119
  await perform_weekly_maintenance()
120
  weekly_maintenance_run = True
121
  elif now.weekday() != 6 or now.hour != 3:
122
  weekly_maintenance_run = False
123
-
124
- await asyncio.sleep(60)
 
125
  except Exception as e:
126
  logger.error(f"Error in periodic tasks: {str(e)}")
127
- await asyncio.sleep(60)
 
1
  from datetime import datetime, timedelta
2
+ from ..db.database import db
 
 
 
 
 
 
3
  from ..services.maintenance import maintenance
4
  from ..utils.logger import logger
5
+ from ..services.websocket import broadcast_notification
6
  import asyncio
7
+ from sqlalchemy import select
8
+ from ..db.models import Event, User
9
+
10
+ async def notify_users(user_ids: list, title: str, message: str, notification_type: str, data: dict = None):
11
+ """Send notification to users"""
12
+ for user_id in user_ids:
13
+ notification = {
14
+ "user_id": user_id,
15
+ "title": title,
16
+ "message": message,
17
+ "type": notification_type,
18
+ "data": data,
19
+ "created_at": datetime.utcnow(),
20
+ "read": False
21
+ }
22
+
23
+ # Store in database
24
+ await db.db["notifications"].insert_one(notification)
25
+
26
+ # Broadcast to connected clients
27
+ await broadcast_notification({
28
+ "type": "notification",
29
+ "data": notification
30
+ })
31
 
32
  async def check_event_reminders():
33
+ """Check and send event reminders"""
34
  try:
35
+ now = datetime.utcnow()
36
+ # Find events happening soon that haven't sent reminders
37
+ stmt = select(Event).where(
38
+ Event.start_time > now,
39
+ Event.start_time <= now + timedelta(minutes=30),
40
+ Event.reminder_sent == False
41
+ )
42
+
43
+ async with db.session() as session:
 
 
44
  result = await session.execute(stmt)
45
+ upcoming_events = result.scalars().all()
46
+
47
+ for event in upcoming_events:
48
+ # Get event owner
49
+ user_stmt = select(User).where(User.id == event.user_id)
50
+ user_result = await session.execute(user_stmt)
51
+ user = user_result.scalar_one_or_none()
52
+
53
+ if user:
54
+ # Send reminder to event owner and attendees
55
+ reminder_users = [str(user.id)] + event.attendees
56
+ await notify_users(
57
+ user_ids=reminder_users,
 
 
 
58
  title=f"Event Reminder: {event.title}",
59
+ message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
60
  notification_type="event_reminder",
61
  data={"event_id": str(event.id)}
62
  )
63
+
64
+ # Mark reminder as sent
65
+ event.reminder_sent = True
66
+ await session.commit()
 
67
 
68
  except Exception as e:
69
  logger.error(f"Error in event reminder check: {str(e)}")
70
 
71
  async def cleanup_old_notifications():
72
+ """Clean up old notifications"""
73
  try:
74
+ # Delete notifications older than 30 days
75
+ cutoff = datetime.utcnow() - timedelta(days=30)
76
+ result = await db.db["notifications"].delete_many({
77
+ "created_at": {"$lt": cutoff},
78
+ "read": True # Only delete read notifications
79
+ })
80
+ logger.info(f"Cleaned up {result.deleted_count} old notifications")
81
  except Exception as e:
82
  logger.error(f"Error in cleanup_old_notifications: {str(e)}")
83
 
84
  async def perform_daily_maintenance():
85
+ """Perform daily system maintenance tasks"""
86
  try:
87
+ # Clean up expired sessions
88
  deleted_sessions = await maintenance.cleanup_expired_sessions()
89
  logger.info(f"Cleaned up {deleted_sessions} expired sessions")
90
 
91
+ # Archive old data
92
  archived = await maintenance.archive_old_data()
93
  if archived:
94
  logger.info(f"Archived data: {archived}")
95
 
96
+ # Check system health
97
  health_data = await maintenance.check_system_health()
98
  if "error" not in health_data:
99
  logger.info("System health check completed successfully")
100
  else:
101
  logger.error(f"System health check error: {health_data['error']}")
102
 
103
+ # Monitor system resources
104
  resources = await maintenance.monitor_system_resources()
105
  if "error" not in resources:
106
  logger.info("System resource monitoring completed successfully")
107
  else:
108
  logger.error(f"Resource monitoring error: {resources['error']}")
109
+
110
  except Exception as e:
111
  logger.error(f"Error in daily maintenance: {str(e)}")
112
 
113
  async def perform_weekly_maintenance():
114
+ """Perform weekly system maintenance tasks"""
115
  try:
116
+ # Perform database maintenance
117
+ await maintenance.perform_database_maintenance()
118
+ logger.info("Database maintenance completed successfully")
119
+
120
+ # Rotate log files
121
+ await maintenance.rotate_log_files()
122
+ logger.info("Log rotation completed successfully")
123
+
124
+ # Manage storage quotas
125
+ quota_results = await maintenance.manage_storage_quotas()
126
+ if quota_results.get("warnings"):
127
+ for warning in quota_results["warnings"]:
128
+ logger.warning(warning)
129
+ logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files")
130
+
131
  except Exception as e:
132
  logger.error(f"Error in weekly maintenance: {str(e)}")
133
 
134
  async def run_periodic_tasks():
135
+ """Run periodic maintenance tasks"""
136
  daily_maintenance_run = False
137
  weekly_maintenance_run = False
138
 
 
140
  try:
141
  now = datetime.utcnow()
142
 
143
+ # Check event reminders every minute
144
  await check_event_reminders()
145
+
146
+ # Clean up old notifications daily
147
  await cleanup_old_notifications()
148
 
149
+ # Run daily maintenance at 2 AM
150
  if now.hour == 2 and not daily_maintenance_run:
151
  await perform_daily_maintenance()
152
  daily_maintenance_run = True
153
  elif now.hour != 2:
154
  daily_maintenance_run = False
155
 
156
+ # Run weekly maintenance on Sunday at 3 AM
157
  if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
158
  await perform_weekly_maintenance()
159
  weekly_maintenance_run = True
160
  elif now.weekday() != 6 or now.hour != 3:
161
  weekly_maintenance_run = False
162
+
163
+ # Wait before next check
164
+ await asyncio.sleep(60) # 1 minute
165
  except Exception as e:
166
  logger.error(f"Error in periodic tasks: {str(e)}")
167
+ await asyncio.sleep(60) # Wait before retrying