from bson import ObjectId
from datetime import datetime
from db import get_logs_collection
from models.user import User
from models.department import Department
class Log:
    """A log entry stored in the logs collection.

    Carries references to the owning user and department, the log date, the
    extracted text (plus the legacy ``log_file`` field) and the IDs of the
    incidents attached to the log.
    """

    def __init__(self, user_id, department_id, log_date, log_text=None, log_file=None, incidents=None,
                 _id=None, created_at=None, updated_at=None):
        """Create an in-memory log; nothing is written until ``save()``.

        Args:
            user_id: ID of the user the log belongs to.
            department_id: ID of the department the log belongs to.
            log_date: Date the log refers to (stored as given).
            log_text: Extracted text content from the log.
            log_file: Legacy file reference, kept for backward compatibility.
            incidents: Iterable of incident IDs; copied so the caller's list
                is never mutated by ``add_incident()``.
            _id: Database ID when the log already exists.
            created_at: Creation timestamp; defaults to the current time.
            updated_at: Last-update timestamp; defaults to the current time.
        """
        # Read the clock once so a new log starts with identical timestamps.
        now = datetime.now()
        self.user_id = user_id
        self.department_id = department_id
        self.log_date = log_date
        self.log_text = log_text  # Extracted text content from the log
        self.log_file = log_file  # For backward compatibility
        self.incidents = list(incidents) if incidents else []  # Incident IDs
        self._id = _id
        self.created_at = created_at or now
        self.updated_at = updated_at or now

    @staticmethod
    def _as_object_id(value):
        """Return ``value`` as an ObjectId when it is a non-empty string.

        Falsy values become ``None``; anything else is passed through.
        """
        if not value:
            return None
        return ObjectId(value) if isinstance(value, str) else value

    def to_dict(self) -> dict:
        """Convert the instance to a dictionary with every ID as a string.

        ``_id`` is only included when the instance has one.
        """
        log_dict = {
            "user_id": str(self.user_id) if self.user_id else None,
            "department_id": str(self.department_id) if self.department_id else None,
            "log_date": self.log_date,
            "log_text": self.log_text,
            "log_file": self.log_file,  # Keep for backward compatibility
            "incidents": [str(incident_id) for incident_id in self.incidents],
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            log_dict["_id"] = str(self._id)
        return log_dict

    def _to_document(self) -> dict:
        """Build the document written to the database (without ``_id``).

        References are stored as ObjectIds because every finder on this class
        queries with ``ObjectId(...)``; string IDs would never match.
        """
        return {
            "user_id": self._as_object_id(self.user_id),
            "department_id": self._as_object_id(self.department_id),
            "log_date": self.log_date,
            "log_text": self.log_text,
            "log_file": self.log_file,
            "incidents": [self._as_object_id(incident_id) for incident_id in self.incidents],
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }

    @classmethod
    def from_dict(cls, log_dict):
        """Create an instance from a dictionary.

        String values under ``_id``, ``user_id``, ``department_id`` and inside
        ``incidents`` are converted to ObjectIds. The input dictionary is left
        untouched; its keys must match the constructor's parameters.
        """
        data = dict(log_dict)  # work on a copy so the caller's dict is not mutated
        for key in ("_id", "user_id", "department_id"):
            value = data.get(key)
            if value and isinstance(value, str):
                data[key] = ObjectId(value)
        # Convert string IDs to ObjectIds for incidents
        if data.get("incidents"):
            data["incidents"] = [
                ObjectId(incident_id) if isinstance(incident_id, str) else incident_id
                for incident_id in data["incidents"]
            ]
        return cls(**data)

    def save(self) -> bool:
        """Insert the log, or update it when it already has an ``_id``.

        On success the log ID is also handed to the owning user via
        ``user.add_log``.

        Returns:
            True when the update matched a document or the insert was
            acknowledged, otherwise False.
        """
        logs_collection = get_logs_collection()
        now = datetime.now()
        if self._id:
            # Update existing log
            self.updated_at = now
            result = logs_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": self._to_document()},
            )
            # matched_count: a matched-but-unchanged update is still a success.
            success = result.matched_count > 0
        else:
            # Insert new log; both timestamps are reset to the insert time.
            self.created_at = now
            self.updated_at = now
            result = logs_collection.insert_one(self._to_document())
            self._id = result.inserted_id
            success = result.acknowledged
        # Update user's logs if successful
        if success and self.user_id:
            user = User.find_by_id(self.user_id)
            if user:
                # NOTE(review): called on every save, so this presumably
                # tolerates an ID that is already linked - confirm in User.
                user.add_log(self._id)
        return success

    @classmethod
    def find_by_id(cls, log_id):
        """Find a log by ID; returns None when no document matches."""
        logs_collection = get_logs_collection()
        log_data = logs_collection.find_one({"_id": ObjectId(log_id)})
        if log_data:
            return cls.from_dict(log_data)
        return None

    @classmethod
    def find_by_user(cls, user_id):
        """Find all logs for a user."""
        logs_collection = get_logs_collection()
        logs_data = logs_collection.find({"user_id": ObjectId(user_id)})
        return [cls.from_dict(log_data) for log_data in logs_data]

    @classmethod
    def find_by_department(cls, department_id):
        """Find all logs for a department."""
        logs_collection = get_logs_collection()
        logs_data = logs_collection.find({"department_id": ObjectId(department_id)})
        return [cls.from_dict(log_data) for log_data in logs_data]

    @classmethod
    def find_by_date_range(cls, department_id, start_date, end_date):
        """Find a department's logs with ``log_date`` in [start_date, end_date].

        Results are sorted by ``log_date`` in descending order.
        """
        logs_collection = get_logs_collection()
        query = {
            "department_id": ObjectId(department_id),
            "log_date": {"$gte": start_date, "$lte": end_date},
        }
        logs_data = logs_collection.find(query).sort("log_date", -1)
        return [cls.from_dict(log_data) for log_data in logs_data]

    def delete(self) -> bool:
        """Delete the log from the database.

        Returns:
            False when the log has no ``_id`` or nothing was deleted, True
            when a document was removed.
        """
        if not self._id:
            return False
        # Remove log reference from user if it exists
        if self.user_id:
            user = User.find_by_id(self.user_id)
            # NOTE(review): membership depends on user.logs holding the same
            # ID type as self._id - verify against the User model.
            if user and self._id in user.logs:
                user.logs.remove(self._id)
                user.save()
        logs_collection = get_logs_collection()
        result = logs_collection.delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def add_incident(self, incident_id) -> bool:
        """Attach an incident to this log and save it.

        The ID is normalised to an ObjectId before the duplicate check, so a
        string ID is recognised when the same incident is already attached.

        Returns:
            True when the incident was already attached, otherwise the
            result of ``save()``.
        """
        new_id = ObjectId(incident_id)
        # Compare string forms so stored strings and ObjectIds both match.
        if any(str(existing) == str(new_id) for existing in self.incidents):
            return True
        self.incidents.append(new_id)
        return self.save()