Spaces:
Sleeping
Sleeping
| from bson import ObjectId | |
| from datetime import datetime | |
| from db import get_logs_collection | |
| from models.user import User | |
| from models.department import Department | |
class Log:
    """A log record tied to a user and a department.

    Instances are persisted through the collection returned by
    ``get_logs_collection()``, which is used here with a MongoDB-style API
    (``insert_one``, ``update_one``, ``find``, ``delete_one``).

    NOTE(review): ``save()`` persists the output of ``to_dict()``, so the
    reference fields (``user_id``, ``department_id``, ``incidents``) are
    written as *strings*, while ``from_dict()`` turns string ids back into
    ``ObjectId``. The finder methods therefore match both representations.
    Storing real ObjectIds would be cleaner but changes the persisted format,
    so it is left for a coordinated migration.
    """

    def __init__(self, user_id, department_id, log_date, log_text=None, log_file=None, incidents=None,
                 _id=None, created_at=None, updated_at=None):
        """Build a log instance; nothing is written to the database here.

        Args:
            user_id: Id of the owning user.
            department_id: Id of the department the log belongs to.
            log_date: Date value of the log (stored as given).
            log_text: Extracted text content of the log.
            log_file: Legacy file reference, kept for backward compatibility.
            incidents: Iterable of incident ids; defaults to an empty list.
            _id: Database id, or ``None`` for a log that was never saved.
            created_at: Creation timestamp; defaults to the current time.
            updated_at: Last-update timestamp; defaults to the current time.
        """
        self.user_id = user_id
        self.department_id = department_id
        self.log_date = log_date
        self.log_text = log_text  # Extracted text content from the log
        self.log_file = log_file  # For backward compatibility
        self.incidents = incidents or []  # Array of incident IDs
        self._id = _id
        self.created_at = created_at or datetime.now()
        self.updated_at = updated_at or datetime.now()

    def to_dict(self):
        """Convert the instance to a dictionary.

        Ids are rendered with ``str()``; ``_id`` is included only when set.
        """
        log_dict = {
            "user_id": str(self.user_id) if self.user_id else None,
            "department_id": str(self.department_id) if self.department_id else None,
            "log_date": self.log_date,
            "log_text": self.log_text,
            "log_file": self.log_file,  # Keep for backward compatibility
            "incidents": [str(incident_id) for incident_id in self.incidents],
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            log_dict["_id"] = str(self._id)
        return log_dict

    @classmethod
    def from_dict(cls, log_dict):
        """Create an instance from a dictionary.

        String values of ``_id``, ``user_id``, ``department_id`` and the
        entries of ``incidents`` are converted to ``ObjectId``; values of any
        other type are passed through untouched.

        The input dictionary is not modified. Keys that are not constructor
        parameters raise ``TypeError`` (from ``cls(**...)``), as before.
        """
        # Work on a shallow copy so the caller's dictionary (for instance a
        # document the caller still wants to inspect) is left as it was.
        data = dict(log_dict)

        for key in ("_id", "user_id", "department_id"):
            value = data.get(key)
            if value and isinstance(value, str):
                data[key] = ObjectId(value)

        # Convert string IDs to ObjectIds for incidents
        if data.get("incidents"):
            data["incidents"] = [
                ObjectId(incident_id) if isinstance(incident_id, str) else incident_id
                for incident_id in data["incidents"]
            ]

        return cls(**data)

    @staticmethod
    def _reference_filter(ref_id):
        """Return a query value matching ``ref_id`` stored as ObjectId or as string.

        ``save()`` writes reference ids in their string form (see the class
        note), so a filter on ``ObjectId(ref_id)`` alone would never match
        documents written by this class.
        """
        object_id = ObjectId(ref_id)
        return {"$in": [object_id, str(object_id)]}

    def save(self):
        """Save the log to the database.

        Updates the existing document when ``_id`` is set, otherwise inserts a
        new one and stores the generated id on the instance. On success the
        instance timestamps are aligned with what was written, and the owning
        user (if found) is told about the log via ``user.add_log``.

        Returns:
            ``True`` when the update modified a document or the insert was
            acknowledged, ``False`` otherwise.
        """
        logs_collection = get_logs_collection()
        log_dict = self.to_dict()
        now = datetime.now()
        is_new = not self._id

        if is_new:
            # Insert new log
            log_dict["created_at"] = now
            log_dict["updated_at"] = now
            result = logs_collection.insert_one(log_dict)
            self._id = result.inserted_id
            success = result.acknowledged
        else:
            # Update existing log; the id selects the document and must not
            # be part of the $set payload.
            log_dict["updated_at"] = now
            log_dict.pop("_id", None)
            result = logs_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": log_dict},
            )
            success = result.modified_count > 0

        if success:
            # Keep the in-memory object in step with the stored document.
            self.updated_at = now
            if is_new:
                self.created_at = now

        # Update user's logs if successful.
        # NOTE(review): this also runs on updates; presumably User.add_log
        # ignores ids it already holds - confirm.
        if success and self.user_id:
            user = User.find_by_id(self.user_id)
            if user:
                user.add_log(self._id)

        return success

    @classmethod
    def find_by_id(cls, log_id):
        """Find a log by its id; returns ``None`` when no document matches."""
        logs_collection = get_logs_collection()
        log_data = logs_collection.find_one({"_id": ObjectId(log_id)})
        if log_data:
            return cls.from_dict(log_data)
        return None

    @classmethod
    def find_by_user(cls, user_id):
        """Find all logs for a user."""
        logs_collection = get_logs_collection()
        logs_data = logs_collection.find({"user_id": cls._reference_filter(user_id)})
        return [cls.from_dict(log_data) for log_data in logs_data]

    @classmethod
    def find_by_department(cls, department_id):
        """Find all logs for a department."""
        logs_collection = get_logs_collection()
        logs_data = logs_collection.find({"department_id": cls._reference_filter(department_id)})
        return [cls.from_dict(log_data) for log_data in logs_data]

    @classmethod
    def find_by_date_range(cls, department_id, start_date, end_date):
        """Find a department's logs with ``start_date <= log_date <= end_date``.

        Results are sorted by ``log_date`` in descending order.
        """
        logs_collection = get_logs_collection()
        query = {
            "department_id": cls._reference_filter(department_id),
            "log_date": {"$gte": start_date, "$lte": end_date},
        }
        logs_data = logs_collection.find(query).sort("log_date", -1)
        return [cls.from_dict(log_data) for log_data in logs_data]

    def delete(self):
        """Delete the log from the database.

        Returns:
            ``False`` when the log has no ``_id`` or nothing was deleted,
            ``True`` when a document was removed.
        """
        if not self._id:
            return False

        # Remove log reference from user if it exists
        if self.user_id:
            user = User.find_by_id(self.user_id)
            if user and self._id in user.logs:
                user.logs.remove(self._id)
                user.save()

        logs_collection = get_logs_collection()
        result = logs_collection.delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def add_incident(self, incident_id):
        """Add an incident to this log and persist the change.

        Returns:
            ``True`` when the incident was already attached (nothing is
            written), otherwise the result of ``save()``.
        """
        # Compare on the string form: the list may hold ObjectIds while the
        # caller passes a string (or the reverse), and those never compare
        # equal directly, which used to let duplicates through.
        already_attached = {str(existing) for existing in self.incidents}
        if str(incident_id) in already_attached:
            return True

        self.incidents.append(ObjectId(incident_id))
        return self.save()