Fred808 commited on
Commit
73e4339
·
verified ·
1 Parent(s): 56f4c2b

Update app/services/maintenance.py

Browse files
Files changed (1) hide show
  1. app/services/maintenance.py +216 -201
app/services/maintenance.py CHANGED
@@ -1,202 +1,217 @@
1
- import os
2
- import shutil
3
- from datetime import datetime, timedelta
4
- from typing import Dict, Any, Optional
5
- from sqlalchemy import select, delete, text
6
- from ..db.database import db
7
- from ..utils.logger import logger
8
- from ..core.config import settings
9
- from ..services.websocket import create_and_broadcast_notification
10
-
11
- class MaintenanceService:
12
- async def cleanup_expired_sessions(self) -> int:
13
- """Clean up expired sessions"""
14
- cutoff = datetime.utcnow() - timedelta(days=7)
15
- result = await db.db["sessions"].delete_many({
16
- "last_activity": {"$lt": cutoff}
17
- })
18
- return result.deleted_count
19
-
20
- async def archive_old_data(self) -> Dict[str, int]:
21
- """Archive old data"""
22
- try:
23
- cutoff = datetime.utcnow() - timedelta(days=365)
24
- archived = {}
25
-
26
- # Archive old orders
27
- result = await db.db["orders"].update_many(
28
- {
29
- "created_at": {"$lt": cutoff},
30
- "status": {"$in": ["completed", "cancelled"]}
31
- },
32
- {"$set": {"archived": True}}
33
- )
34
- archived["orders"] = result.modified_count
35
-
36
- # Archive old notifications
37
- result = await db.db["notifications"].update_many(
38
- {
39
- "created_at": {"$lt": cutoff},
40
- "read": True
41
- },
42
- {"$set": {"archived": True}}
43
- )
44
- archived["notifications"] = result.modified_count
45
-
46
- return archived
47
- except Exception as e:
48
- logger.error(f"Error archiving old data: {str(e)}")
49
- return None
50
-
51
- async def check_system_health(self) -> Dict[str, Any]:
52
- """Check system health"""
53
- try:
54
- # Check database connection
55
- await db.db.command("ping")
56
-
57
- # Get database stats
58
- stats = await db.db.command("dbStats")
59
- disk_size = stats.get("dataSize", 0) / (1024 * 1024) # Convert to MB
60
-
61
- # Check disk space
62
- total_space = shutil.disk_usage("/").total / (1024 * 1024 * 1024) # GB
63
- free_space = shutil.disk_usage("/").free / (1024 * 1024 * 1024) # GB
64
-
65
- health_data = {
66
- "status": "healthy",
67
- "database": {
68
- "connected": True,
69
- "size_mb": disk_size
70
- },
71
- "disk": {
72
- "total_gb": total_space,
73
- "free_gb": free_space,
74
- "usage_percent": ((total_space - free_space) / total_space) * 100
75
- },
76
- "timestamp": datetime.utcnow()
77
- }
78
-
79
- # Send alert if disk space is low
80
- if free_space < 5: # Less than 5GB free
81
- await create_and_broadcast_notification(
82
- user_id="admin",
83
- title="Low Disk Space Alert",
84
- message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
85
- notification_type="system_alert",
86
- data={"free_space_gb": free_space}
87
- )
88
-
89
- return health_data
90
- except Exception as e:
91
- logger.error(f"Health check error: {str(e)}")
92
- return {"status": "unhealthy", "error": str(e)}
93
-
94
- async def monitor_system_resources(self) -> Dict[str, Any]:
95
- """Monitor system resources"""
96
- try:
97
- # Monitor database connections
98
- server_status = await db.db.command("serverStatus")
99
- current_connections = server_status.get("connections", {}).get("current", 0)
100
-
101
- resources = {
102
- "database": {
103
- "connections": current_connections,
104
- },
105
- "timestamp": datetime.utcnow()
106
- }
107
-
108
- # Alert if too many connections
109
- if current_connections > settings.MAX_DB_CONNECTIONS * 0.9:
110
- await create_and_broadcast_notification(
111
- user_id="admin",
112
- title="High Database Connections",
113
- message=f"Database has {current_connections} active connections",
114
- notification_type="system_alert",
115
- data={"connections": current_connections}
116
- )
117
-
118
- return resources
119
- except Exception as e:
120
- logger.error(f"Resource monitoring error: {str(e)}")
121
- return {"error": str(e)}
122
-
123
- async def perform_database_maintenance(self) -> Dict[str, Any]:
124
- """Perform database maintenance tasks"""
125
- try:
126
- # Run VACUUM ANALYZE
127
- await db.db.command("vacuum")
128
-
129
- # Clean up temporary collections
130
- temp_collections = [
131
- "temp_imports",
132
- "temp_exports",
133
- "temp_reports"
134
- ]
135
- for collection in temp_collections:
136
- await db.db[collection].drop()
137
-
138
- return {"status": "success"}
139
- except Exception as e:
140
- logger.error(f"Database maintenance error: {str(e)}")
141
- return {"error": str(e)}
142
-
143
- async def rotate_log_files(self) -> None:
144
- """Rotate log files"""
145
- log_dir = "logs"
146
- max_log_size = 10 * 1024 * 1024 # 10MB
147
-
148
- try:
149
- for filename in os.listdir(log_dir):
150
- filepath = os.path.join(log_dir, filename)
151
- if os.path.getsize(filepath) > max_log_size:
152
- # Archive old log
153
- archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
154
- shutil.move(filepath, os.path.join(log_dir, archive_name))
155
-
156
- # Create new log file
157
- open(filepath, 'a').close()
158
- logger.info(f"Rotated log file: {filename}")
159
- except Exception as e:
160
- logger.error(f"Log rotation error: {str(e)}")
161
-
162
- async def manage_storage_quotas(self) -> Dict[str, Any]:
163
- """Manage storage quotas and cleanup"""
164
- try:
165
- results = {
166
- "warnings": [],
167
- "cleaned": 0
168
- }
169
-
170
- # Check and clean upload directories
171
- upload_dirs = ["uploads/documents", "uploads/images"]
172
- for directory in upload_dirs:
173
- if os.path.exists(directory):
174
- total_size = sum(
175
- os.path.getsize(os.path.join(directory, f))
176
- for f in os.listdir(directory)
177
- if os.path.isfile(os.path.join(directory, f))
178
- ) / (1024 * 1024) # Convert to MB
179
-
180
- if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
181
- results["warnings"].append(
182
- f"Upload directory {directory} exceeds size limit"
183
- )
184
-
185
- # Clean up temporary files
186
- temp_dirs = ["uploads/temp", "backups/temp"]
187
- for directory in temp_dirs:
188
- if os.path.exists(directory):
189
- # Remove files older than 24 hours
190
- cutoff = datetime.now() - timedelta(days=1)
191
- for filename in os.listdir(directory):
192
- filepath = os.path.join(directory, filename)
193
- if os.path.getctime(filepath) < cutoff.timestamp():
194
- os.remove(filepath)
195
- results["cleaned"] += 1
196
-
197
- return results
198
- except Exception as e:
199
- logger.error(f"Storage quota management error: {str(e)}")
200
- return {"error": str(e)}
201
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
  maintenance = MaintenanceService()
 
1
+ import os
2
+ import shutil
3
+ import psutil
4
+ from datetime import datetime, timedelta
5
+ from typing import Dict, Any, Optional
6
+ from sqlalchemy import select, delete, update, func
7
+ from ..db.database import db
8
+ from ..utils.logger import logger
9
+ from ..core.config import settings
10
+ from ..services.websocket import create_and_broadcast_notification
11
+ from ..db.models import User, Order, Notification, Session
12
+
13
+ class MaintenanceService:
14
+ async def cleanup_expired_sessions(self) -> int:
15
+ """Clean up expired sessions"""
16
+ try:
17
+ cutoff = datetime.utcnow() - timedelta(days=7)
18
+ async with db.session() as session:
19
+ stmt = delete(Session).where(Session.last_activity < cutoff)
20
+ result = await session.execute(stmt)
21
+ await session.commit()
22
+ return result.rowcount
23
+ except Exception as e:
24
+ logger.error(f"Error cleaning up sessions: {str(e)}")
25
+ return 0
26
+
27
+ async def archive_old_data(self) -> Dict[str, int]:
28
+ """Archive old data"""
29
+ try:
30
+ cutoff = datetime.utcnow() - timedelta(days=365)
31
+ archived = {}
32
+
33
+ async with db.session() as session:
34
+ # Archive old orders
35
+ order_stmt = update(Order).where(
36
+ Order.created_at < cutoff,
37
+ Order.status.in_(["completed", "cancelled"])
38
+ ).values(archived=True)
39
+ order_result = await session.execute(order_stmt)
40
+ archived["orders"] = order_result.rowcount
41
+
42
+ # Archive old notifications
43
+ notif_stmt = update(Notification).where(
44
+ Notification.created_at < cutoff,
45
+ Notification.read == True
46
+ ).values(archived=True)
47
+ notif_result = await session.execute(notif_stmt)
48
+ archived["notifications"] = notif_result.rowcount
49
+
50
+ await session.commit()
51
+ return archived
52
+
53
+ except Exception as e:
54
+ logger.error(f"Error archiving old data: {str(e)}")
55
+ return None
56
+
57
+ async def check_system_health(self) -> Dict[str, Any]:
58
+ """Check system health"""
59
+ try:
60
+ async with db.session() as session:
61
+ # Check database connection by running a simple query
62
+ await session.execute(select(func.now()))
63
+
64
+ # Get database size (using psutil for disk stats)
65
+ disk = psutil.disk_usage('/')
66
+ total_space = disk.total / (1024 * 1024 * 1024) # GB
67
+ free_space = disk.free / (1024 * 1024 * 1024) # GB
68
+
69
+ health_data = {
70
+ "status": "healthy",
71
+ "database": {
72
+ "connected": True
73
+ },
74
+ "disk": {
75
+ "total_gb": total_space,
76
+ "free_gb": free_space,
77
+ "usage_percent": disk.percent
78
+ },
79
+ "timestamp": datetime.utcnow()
80
+ }
81
+
82
+ # Send alert if disk space is low
83
+ if free_space < 5: # Less than 5GB free
84
+ await create_and_broadcast_notification(
85
+ user_id="admin",
86
+ title="Low Disk Space Alert",
87
+ message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
88
+ notification_type="system_alert",
89
+ data={"free_space_gb": free_space}
90
+ )
91
+
92
+ return health_data
93
+
94
+ except Exception as e:
95
+ logger.error(f"Health check error: {str(e)}")
96
+ return {"status": "unhealthy", "error": str(e)}
97
+
98
+ async def monitor_system_resources(self) -> Dict[str, Any]:
99
+ """Monitor system resources"""
100
+ try:
101
+ async with db.session() as session:
102
+ # Get current active connections (using psutil for process stats)
103
+ process = psutil.Process()
104
+ open_files = process.open_files()
105
+ connections = len([f for f in open_files if 'socket' in str(f.path)])
106
+
107
+ resources = {
108
+ "database": {
109
+ "connections": connections,
110
+ },
111
+ "system": {
112
+ "cpu_percent": psutil.cpu_percent(),
113
+ "memory_percent": psutil.virtual_memory().percent
114
+ },
115
+ "timestamp": datetime.utcnow()
116
+ }
117
+
118
+ # Alert if too many connections or high resource usage
119
+ if connections > settings.MAX_DB_CONNECTIONS * 0.9:
120
+ await create_and_broadcast_notification(
121
+ user_id="admin",
122
+ title="High Database Connections",
123
+ message=f"Database has {connections} active connections",
124
+ notification_type="system_alert",
125
+ data={"connections": connections}
126
+ )
127
+
128
+ return resources
129
+
130
+ except Exception as e:
131
+ logger.error(f"Resource monitoring error: {str(e)}")
132
+ return {"error": str(e)}
133
+
134
+ async def perform_database_maintenance(self) -> Dict[str, Any]:
135
+ """Perform database maintenance tasks"""
136
+ try:
137
+ async with db.session() as session:
138
+ # Run ANALYZE on major tables
139
+ for table in [User, Order, Notification]:
140
+ await session.execute(f"ANALYZE {table.__tablename__}")
141
+
142
+ # Clean up any orphaned records
143
+ # For example, delete notifications for non-existent users
144
+ stmt = delete(Notification).where(
145
+ ~Notification.user_id.in_(
146
+ select(User.id)
147
+ )
148
+ )
149
+ await session.execute(stmt)
150
+ await session.commit()
151
+
152
+ return {"status": "success"}
153
+
154
+ except Exception as e:
155
+ logger.error(f"Database maintenance error: {str(e)}")
156
+ return {"error": str(e)}
157
+
158
+ async def rotate_log_files(self) -> None:
159
+ """Rotate log files"""
160
+ log_dir = "logs"
161
+ max_log_size = 10 * 1024 * 1024 # 10MB
162
+
163
+ try:
164
+ for filename in os.listdir(log_dir):
165
+ filepath = os.path.join(log_dir, filename)
166
+ if os.path.getsize(filepath) > max_log_size:
167
+ # Archive old log
168
+ archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
169
+ shutil.move(filepath, os.path.join(log_dir, archive_name))
170
+
171
+ # Create new log file
172
+ open(filepath, 'a').close()
173
+ logger.info(f"Rotated log file: {filename}")
174
+ except Exception as e:
175
+ logger.error(f"Log rotation error: {str(e)}")
176
+
177
+ async def manage_storage_quotas(self) -> Dict[str, Any]:
178
+ """Manage storage quotas and cleanup"""
179
+ try:
180
+ results = {
181
+ "warnings": [],
182
+ "cleaned": 0
183
+ }
184
+
185
+ # Check and clean upload directories
186
+ upload_dirs = ["uploads/documents", "uploads/images"]
187
+ for directory in upload_dirs:
188
+ if os.path.exists(directory):
189
+ total_size = sum(
190
+ os.path.getsize(os.path.join(directory, f))
191
+ for f in os.listdir(directory)
192
+ if os.path.isfile(os.path.join(directory, f))
193
+ ) / (1024 * 1024) # Convert to MB
194
+
195
+ if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
196
+ results["warnings"].append(
197
+ f"Upload directory {directory} exceeds size limit"
198
+ )
199
+
200
+ # Clean up temporary files
201
+ temp_dirs = ["uploads/temp", "backups/temp"]
202
+ for directory in temp_dirs:
203
+ if os.path.exists(directory):
204
+ # Remove files older than 24 hours
205
+ cutoff = datetime.now() - timedelta(days=1)
206
+ for filename in os.listdir(directory):
207
+ filepath = os.path.join(directory, filename)
208
+ if os.path.getctime(filepath) < cutoff.timestamp():
209
+ os.remove(filepath)
210
+ results["cleaned"] += 1
211
+
212
+ return results
213
+ except Exception as e:
214
+ logger.error(f"Storage quota management error: {str(e)}")
215
+ return {"error": str(e)}
216
+
217
  maintenance = MaintenanceService()