Fred808 commited on
Commit
f674d57
·
verified ·
1 Parent(s): 639b015

Update app/utils/tasks.py

Browse files
Files changed (1) hide show
  1. app/utils/tasks.py +154 -146
app/utils/tasks.py CHANGED
@@ -1,146 +1,154 @@
1
- from datetime import datetime, timedelta
2
- from ..db.database import db
3
- from ..services.maintenance import maintenance
4
- from ..utils.logger import logger
5
- from ..services.websocket import create_and_broadcast_notification
6
- import asyncio
7
- from sqlalchemy import select
8
- from ..db.models import Event, User
9
-
10
- async def check_event_reminders():
11
- """Check and send event reminders"""
12
- try:
13
- now = datetime.utcnow()
14
- # Find events happening soon that haven't sent reminders
15
- stmt = select(Event).where(
16
- Event.start_time > now,
17
- Event.start_time <= now + timedelta(minutes=30),
18
- Event.reminder_sent == False
19
- )
20
-
21
- async with db.session() as session:
22
- result = await session.execute(stmt)
23
- upcoming_events = result.scalars().all()
24
-
25
- for event in upcoming_events:
26
- # Get event owner
27
- user_stmt = select(User).where(User.id == event.user_id)
28
- user_result = await session.execute(user_stmt)
29
- user = user_result.scalar_one_or_none()
30
-
31
- if user:
32
- # Send reminder to event owner and attendees
33
- reminder_users = [str(user.id)] + event.attendees
34
- for user_id in reminder_users:
35
- await create_and_broadcast_notification(
36
- user_id=user_id,
37
- title=f"Event Reminder: {event.title}",
38
- message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
39
- notification_type="event_reminder",
40
- data={"event_id": str(event.id)}
41
- )
42
-
43
- # Mark reminder as sent
44
- event.reminder_sent = True
45
- await session.commit()
46
-
47
- except Exception as e:
48
- logger.error(f"Error in event reminder check: {str(e)}")
49
-
50
- async def cleanup_old_notifications():
51
- """Clean up old notifications"""
52
- try:
53
- # Delete notifications older than 30 days
54
- cutoff = datetime.utcnow() - timedelta(days=30)
55
- result = await db.db["notifications"].delete_many({
56
- "created_at": {"$lt": cutoff},
57
- "read": True # Only delete read notifications
58
- })
59
- logger.info(f"Cleaned up {result.deleted_count} old notifications")
60
- except Exception as e:
61
- logger.error(f"Error in cleanup_old_notifications: {str(e)}")
62
-
63
- async def perform_daily_maintenance():
64
- """Perform daily system maintenance tasks"""
65
- try:
66
- # Clean up expired sessions
67
- deleted_sessions = await maintenance.cleanup_expired_sessions()
68
- logger.info(f"Cleaned up {deleted_sessions} expired sessions")
69
-
70
- # Archive old data
71
- archived = await maintenance.archive_old_data()
72
- if archived:
73
- logger.info(f"Archived data: {archived}")
74
-
75
- # Check system health
76
- health_data = await maintenance.check_system_health()
77
- if "error" not in health_data:
78
- logger.info("System health check completed successfully")
79
- else:
80
- logger.error(f"System health check error: {health_data['error']}")
81
-
82
- # Monitor system resources
83
- resources = await maintenance.monitor_system_resources()
84
- if "error" not in resources:
85
- logger.info("System resource monitoring completed successfully")
86
- else:
87
- logger.error(f"Resource monitoring error: {resources['error']}")
88
-
89
- except Exception as e:
90
- logger.error(f"Error in daily maintenance: {str(e)}")
91
-
92
- async def perform_weekly_maintenance():
93
- """Perform weekly system maintenance tasks"""
94
- try:
95
- # Perform database maintenance
96
- await maintenance.perform_database_maintenance()
97
- logger.info("Database maintenance completed successfully")
98
-
99
- # Rotate log files
100
- await maintenance.rotate_log_files()
101
- logger.info("Log rotation completed successfully")
102
-
103
- # Manage storage quotas
104
- quota_results = await maintenance.manage_storage_quotas()
105
- if quota_results.get("warnings"):
106
- for warning in quota_results["warnings"]:
107
- logger.warning(warning)
108
- logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files")
109
-
110
- except Exception as e:
111
- logger.error(f"Error in weekly maintenance: {str(e)}")
112
-
113
- async def run_periodic_tasks():
114
- """Run periodic maintenance tasks"""
115
- daily_maintenance_run = False
116
- weekly_maintenance_run = False
117
-
118
- while True:
119
- try:
120
- now = datetime.utcnow()
121
-
122
- # Check event reminders every minute
123
- await check_event_reminders()
124
-
125
- # Clean up old notifications daily
126
- await cleanup_old_notifications()
127
-
128
- # Run daily maintenance at 2 AM
129
- if now.hour == 2 and not daily_maintenance_run:
130
- await perform_daily_maintenance()
131
- daily_maintenance_run = True
132
- elif now.hour != 2:
133
- daily_maintenance_run = False
134
-
135
- # Run weekly maintenance on Sunday at 3 AM
136
- if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
137
- await perform_weekly_maintenance()
138
- weekly_maintenance_run = True
139
- elif now.weekday() != 6 or now.hour != 3:
140
- weekly_maintenance_run = False
141
-
142
- # Wait before next check
143
- await asyncio.sleep(60) # 1 minute
144
- except Exception as e:
145
- logger.error(f"Error in periodic tasks: {str(e)}")
146
- await asyncio.sleep(60) # Wait before retrying
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime, timedelta
2
+ from ..db.database import db
3
+ from ..services.maintenance import maintenance
4
+ from ..utils.logger import logger
5
+ from ..services.websocket import create_and_broadcast_notification
6
+ import asyncio
7
+ from sqlalchemy import select, delete
8
+ from ..db.models import Event, User, Notification
9
+
10
+ async def check_event_reminders():
11
+ """Check and send event reminders"""
12
+ try:
13
+ now = datetime.utcnow()
14
+ # Find events happening soon that haven't sent reminders
15
+ stmt = select(Event).where(
16
+ Event.start_time > now,
17
+ Event.start_time <= now + timedelta(minutes=30),
18
+ Event.reminder_sent == False
19
+ )
20
+
21
+ async with db.session() as session:
22
+ result = await session.execute(stmt)
23
+ upcoming_events = result.scalars().all()
24
+
25
+ for event in upcoming_events:
26
+ # Get event owner
27
+ user_stmt = select(User).where(User.id == event.user_id)
28
+ user_result = await session.execute(user_stmt)
29
+ user = user_result.scalar_one_or_none()
30
+
31
+ if user:
32
+ # Send reminder to event owner and attendees
33
+ reminder_users = [str(user.id)] + event.attendees
34
+ for user_id in reminder_users:
35
+ await create_and_broadcast_notification(
36
+ user_id=user_id,
37
+ title=f"Event Reminder: {event.title}",
38
+ message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
39
+ notification_type="event_reminder",
40
+ data={"event_id": str(event.id)}
41
+ )
42
+
43
+ # Mark reminder as sent
44
+ event.reminder_sent = True
45
+
46
+ # Commit all changes at once
47
+ await session.commit()
48
+
49
+ except Exception as e:
50
+ logger.error(f"Error in event reminder check: {str(e)}")
51
+
52
+ async def cleanup_old_notifications():
53
+ """Clean up old notifications"""
54
+ try:
55
+ cutoff = datetime.utcnow() - timedelta(days=30)
56
+ async with db.session() as session:
57
+ # Delete notifications older than 30 days that have been read
58
+ stmt = delete(Notification).where(
59
+ Notification.created_at < cutoff,
60
+ Notification.read == True
61
+ )
62
+ result = await session.execute(stmt)
63
+ await session.commit()
64
+
65
+ # Get number of deleted rows
66
+ deleted_count = result.rowcount
67
+ logger.info(f"Cleaned up {deleted_count} old notifications")
68
+ except Exception as e:
69
+ logger.error(f"Error in cleanup_old_notifications: {str(e)}")
70
+
71
+ async def perform_daily_maintenance():
72
+ """Perform daily system maintenance tasks"""
73
+ try:
74
+ # Clean up expired sessions
75
+ deleted_sessions = await maintenance.cleanup_expired_sessions()
76
+ logger.info(f"Cleaned up {deleted_sessions} expired sessions")
77
+
78
+ # Archive old data
79
+ archived = await maintenance.archive_old_data()
80
+ if archived:
81
+ logger.info(f"Archived data: {archived}")
82
+
83
+ # Check system health
84
+ health_data = await maintenance.check_system_health()
85
+ if "error" not in health_data:
86
+ logger.info("System health check completed successfully")
87
+ else:
88
+ logger.error(f"System health check error: {health_data['error']}")
89
+
90
+ # Monitor system resources
91
+ resources = await maintenance.monitor_system_resources()
92
+ if "error" not in resources:
93
+ logger.info("System resource monitoring completed successfully")
94
+ else:
95
+ logger.error(f"Resource monitoring error: {resources['error']}")
96
+
97
+ except Exception as e:
98
+ logger.error(f"Error in daily maintenance: {str(e)}")
99
+
100
+ async def perform_weekly_maintenance():
101
+ """Perform weekly system maintenance tasks"""
102
+ try:
103
+ # Perform database maintenance
104
+ await maintenance.perform_database_maintenance()
105
+ logger.info("Database maintenance completed successfully")
106
+
107
+ # Rotate log files
108
+ await maintenance.rotate_log_files()
109
+ logger.info("Log rotation completed successfully")
110
+
111
+ # Manage storage quotas
112
+ quota_results = await maintenance.manage_storage_quotas()
113
+ if quota_results.get("warnings"):
114
+ for warning in quota_results["warnings"]:
115
+ logger.warning(warning)
116
+ logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files")
117
+
118
+ except Exception as e:
119
+ logger.error(f"Error in weekly maintenance: {str(e)}")
120
+
121
+ async def run_periodic_tasks():
122
+ """Run periodic maintenance tasks"""
123
+ daily_maintenance_run = False
124
+ weekly_maintenance_run = False
125
+
126
+ while True:
127
+ try:
128
+ now = datetime.utcnow()
129
+
130
+ # Check event reminders every minute
131
+ await check_event_reminders()
132
+
133
+ # Clean up old notifications daily
134
+ await cleanup_old_notifications()
135
+
136
+ # Run daily maintenance at 2 AM
137
+ if now.hour == 2 and not daily_maintenance_run:
138
+ await perform_daily_maintenance()
139
+ daily_maintenance_run = True
140
+ elif now.hour != 2:
141
+ daily_maintenance_run = False
142
+
143
+ # Run weekly maintenance on Sunday at 3 AM
144
+ if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
145
+ await perform_weekly_maintenance()
146
+ weekly_maintenance_run = True
147
+ elif now.weekday() != 6 or now.hour != 3:
148
+ weekly_maintenance_run = False
149
+
150
+ # Wait before next check
151
+ await asyncio.sleep(60) # 1 minute
152
+ except Exception as e:
153
+ logger.error(f"Error in periodic tasks: {str(e)}")
154
+ await asyncio.sleep(60) # Wait before retrying