Fred808 commited on
Commit
c9e21e4
·
verified ·
1 Parent(s): 0f8ccfb

Update app/utils/tasks.py

Browse files
Files changed (1) hide show
  1. app/utils/tasks.py +127 -139
app/utils/tasks.py CHANGED
@@ -1,139 +1,127 @@
1
- from datetime import datetime, timedelta
2
- from ..db.database import db
3
- from ..services.notifications import notifications
4
- from ..services.maintenance import maintenance
5
- from ..utils.logger import logger
6
- import asyncio
7
-
8
- async def check_event_reminders():
9
- """Check and send event reminders"""
10
- try:
11
- now = datetime.utcnow()
12
- reminder_window = now + timedelta(minutes=30) # Check next 30 minutes
13
-
14
- # Find events that need reminders
15
- events = await db.db["events"].find({
16
- "start_time": {
17
- "$gte": now,
18
- "$lte": reminder_window
19
- },
20
- "reminder_sent": {"$ne": True}
21
- }).to_list(None)
22
-
23
- for event in events:
24
- # Send reminder to event creator
25
- await notifications.create_notification(
26
- user_id=event["user_id"],
27
- title=f"Event Reminder: {event['title']}",
28
- message=f"Your event '{event['title']}' starts in {event['reminder_minutes']} minutes",
29
- notification_type="event_reminder",
30
- data={"event_id": str(event["_id"])}
31
- )
32
-
33
- # Send reminders to attendees
34
- for attendee in event.get("attendees", []):
35
- await notifications.create_notification(
36
- user_id=attendee,
37
- title=f"Event Reminder: {event['title']}",
38
- message=f"Event '{event['title']}' starts in {event['reminder_minutes']} minutes",
39
- notification_type="event_reminder",
40
- data={"event_id": str(event["_id"])}
41
- )
42
-
43
- # Mark reminder as sent
44
- await db.db["events"].update_one(
45
- {"_id": event["_id"]},
46
- {"$set": {"reminder_sent": True}}
47
- )
48
-
49
- except Exception as e:
50
- logger.error(f"Error in event reminder check: {str(e)}")
51
-
52
- async def cleanup_old_notifications():
53
- """Clean up old notifications"""
54
- try:
55
- # Delete notifications older than 30 days
56
- cutoff_date = datetime.utcnow() - timedelta(days=30)
57
- result = await db.db["notifications"].delete_many({
58
- "created_at": {"$lt": cutoff_date}
59
- })
60
- logger.info(f"Cleaned up {result.deleted_count} old notifications")
61
- except Exception as e:
62
- logger.error(f"Error in notification cleanup: {str(e)}")
63
-
64
- async def perform_daily_maintenance():
65
- """Perform daily system maintenance tasks"""
66
- try:
67
- # Clean up expired sessions
68
- deleted_sessions = await maintenance.cleanup_expired_sessions()
69
- logger.info(f"Cleaned up {deleted_sessions} expired sessions")
70
-
71
- # Archive old data
72
- archived = await maintenance.archive_old_data()
73
- if archived:
74
- logger.info(f"Archived data: {archived}")
75
-
76
- # Check system health
77
- health_data = await maintenance.check_system_health()
78
- if "error" not in health_data:
79
- logger.info("System health check completed successfully")
80
- else:
81
- logger.error(f"System health check error: {health_data['error']}")
82
-
83
- # Monitor system resources
84
- resources = await maintenance.monitor_system_resources()
85
- if "error" not in resources:
86
- logger.info("System resource monitoring completed successfully")
87
- else:
88
- logger.error(f"Resource monitoring error: {resources['error']}")
89
-
90
- except Exception as e:
91
- logger.error(f"Error in daily maintenance: {str(e)}")
92
-
93
- async def perform_weekly_maintenance():
94
- """Perform weekly system maintenance tasks"""
95
- try:
96
- # Perform database maintenance
97
- db_maintenance = await maintenance.perform_database_maintenance()
98
- if "error" not in db_maintenance:
99
- logger.info("Database maintenance completed successfully")
100
- else:
101
- logger.error(f"Database maintenance error: {db_maintenance['error']}")
102
-
103
- except Exception as e:
104
- logger.error(f"Error in weekly maintenance: {str(e)}")
105
-
106
- async def run_periodic_tasks():
107
- """Run periodic maintenance tasks"""
108
- daily_maintenance_run = False
109
- weekly_maintenance_run = False
110
-
111
- while True:
112
- try:
113
- now = datetime.utcnow()
114
-
115
- # Check event reminders every minute
116
- await check_event_reminders()
117
-
118
- # Clean up old notifications daily
119
- await cleanup_old_notifications()
120
-
121
- # Run daily maintenance at 2 AM
122
- if now.hour == 2 and not daily_maintenance_run:
123
- await perform_daily_maintenance()
124
- daily_maintenance_run = True
125
- elif now.hour != 2:
126
- daily_maintenance_run = False
127
-
128
- # Run weekly maintenance on Sunday at 3 AM
129
- if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
130
- await perform_weekly_maintenance()
131
- weekly_maintenance_run = True
132
- elif now.weekday() != 6 or now.hour != 3:
133
- weekly_maintenance_run = False
134
-
135
- # Wait before next check
136
- await asyncio.sleep(60) # 1 minute
137
- except Exception as e:
138
- logger.error(f"Error in periodic tasks: {str(e)}")
139
- await asyncio.sleep(60) # Wait before retrying
 
1
+ from datetime import datetime, timedelta
2
+ from sqlalchemy.future import select
3
+ from sqlalchemy import update, delete
4
+ from sqlalchemy.orm import selectinload
5
+ from ..db.database import AsyncSessionLocal
6
+ from ..models.event import Event # Your SQLAlchemy model for events
7
+ from ..models.notification import Notification # Your notification model
8
+ from ..services.notifications import notifications
9
+ from ..services.maintenance import maintenance
10
+ from ..utils.logger import logger
11
+ import asyncio
12
+
13
+ async def check_event_reminders():
14
+ try:
15
+ async with AsyncSessionLocal() as session:
16
+ now = datetime.utcnow()
17
+ reminder_window = now + timedelta(minutes=30)
18
+
19
+ # Fetch events within the window that haven’t been reminded yet
20
+ stmt = select(Event).where(
21
+ Event.start_time >= now,
22
+ Event.start_time <= reminder_window,
23
+ Event.reminder_sent == False
24
+ ).options(selectinload(Event.attendees))
25
+
26
+ result = await session.execute(stmt)
27
+ events = result.scalars().all()
28
+
29
+ for event in events:
30
+ # Notify the creator
31
+ await notifications.create_notification(
32
+ user_id=event.user_id,
33
+ title=f"Event Reminder: {event.title}",
34
+ message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
35
+ notification_type="event_reminder",
36
+ data={"event_id": str(event.id)}
37
+ )
38
+
39
+ # Notify attendees
40
+ for attendee in event.attendees:
41
+ await notifications.create_notification(
42
+ user_id=attendee.user_id,
43
+ title=f"Event Reminder: {event.title}",
44
+ message=f"Event '{event.title}' starts in {event.reminder_minutes} minutes",
45
+ notification_type="event_reminder",
46
+ data={"event_id": str(event.id)}
47
+ )
48
+
49
+ # Mark reminder sent
50
+ event.reminder_sent = True
51
+
52
+ await session.commit()
53
+
54
+ except Exception as e:
55
+ logger.error(f"Error in event reminder check: {str(e)}")
56
+
57
+ async def cleanup_old_notifications():
58
+ try:
59
+ async with AsyncSessionLocal() as session:
60
+ cutoff = datetime.utcnow() - timedelta(days=30)
61
+ stmt = delete(Notification).where(Notification.created_at < cutoff)
62
+ result = await session.execute(stmt)
63
+ await session.commit()
64
+ logger.info(f"Cleaned up old notifications.")
65
+ except Exception as e:
66
+ logger.error(f"Error in cleanup_old_notifications: {str(e)}")
67
+
68
+ async def perform_daily_maintenance():
69
+ try:
70
+ deleted_sessions = await maintenance.cleanup_expired_sessions()
71
+ logger.info(f"Cleaned up {deleted_sessions} expired sessions")
72
+
73
+ archived = await maintenance.archive_old_data()
74
+ if archived:
75
+ logger.info(f"Archived data: {archived}")
76
+
77
+ health_data = await maintenance.check_system_health()
78
+ if "error" not in health_data:
79
+ logger.info("System health check completed successfully")
80
+ else:
81
+ logger.error(f"System health check error: {health_data['error']}")
82
+
83
+ resources = await maintenance.monitor_system_resources()
84
+ if "error" not in resources:
85
+ logger.info("System resource monitoring completed successfully")
86
+ else:
87
+ logger.error(f"Resource monitoring error: {resources['error']}")
88
+ except Exception as e:
89
+ logger.error(f"Error in daily maintenance: {str(e)}")
90
+
91
+ async def perform_weekly_maintenance():
92
+ try:
93
+ db_maintenance = await maintenance.perform_database_maintenance()
94
+ if "error" not in db_maintenance:
95
+ logger.info("Database maintenance completed successfully")
96
+ else:
97
+ logger.error(f"Database maintenance error: {db_maintenance['error']}")
98
+ except Exception as e:
99
+ logger.error(f"Error in weekly maintenance: {str(e)}")
100
+
101
+ async def run_periodic_tasks():
102
+ daily_maintenance_run = False
103
+ weekly_maintenance_run = False
104
+
105
+ while True:
106
+ try:
107
+ now = datetime.utcnow()
108
+
109
+ await check_event_reminders()
110
+ await cleanup_old_notifications()
111
+
112
+ if now.hour == 2 and not daily_maintenance_run:
113
+ await perform_daily_maintenance()
114
+ daily_maintenance_run = True
115
+ elif now.hour != 2:
116
+ daily_maintenance_run = False
117
+
118
+ if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
119
+ await perform_weekly_maintenance()
120
+ weekly_maintenance_run = True
121
+ elif now.weekday() != 6 or now.hour != 3:
122
+ weekly_maintenance_run = False
123
+
124
+ await asyncio.sleep(60)
125
+ except Exception as e:
126
+ logger.error(f"Error in periodic tasks: {str(e)}")
127
+ await asyncio.sleep(60)