Spaces:
Sleeping
Sleeping
| from bson import ObjectId | |
| from datetime import datetime | |
| from db import get_incidents_collection | |
| from models.user import User | |
class Incident:
    """An incident record stored in the incidents MongoDB collection.

    Instances hold references to a department, user, workflow and
    (optionally) the log the incident was created from, together with the
    incident description, processing status and any generated form URLs.
    """

    # Keys whose values are document references. String values under these
    # keys are converted to ObjectId when an instance is built from a dict.
    _REFERENCE_FIELDS = ("_id", "department_id", "user_id", "workflow_id", "log_id")

    def __init__(self, department_id, user_id, workflow_id, description, date,
                 log_id=None, activity_text=None, status="pending", filled_forms=None,
                 extracted_data=None, _id=None, created_at=None, updated_at=None):
        """Initialise an incident.

        Args:
            department_id: Reference to the owning department.
            user_id: Reference to the user the incident belongs to.
            workflow_id: Reference to the workflow for this incident.
            description: Description of the incident.
            date: Date of the incident (stored as given).
            log_id: Reference to the log this incident was created from.
            activity_text: The raw text of the activity.
            status: One of "pending", "processing", "completed", "failed".
            filled_forms: List of URLs to generated forms; defaults to [].
            extracted_data: Extracted data for form filling; defaults to {}.
            _id: Database id; None for an incident that is not saved yet.
            created_at: Creation timestamp; defaults to the current time.
            updated_at: Last-update timestamp; defaults to the current time.
        """
        now = datetime.now()
        self.department_id = department_id
        self.user_id = user_id
        self.workflow_id = workflow_id
        self.description = description
        self.date = date
        self.log_id = log_id
        self.activity_text = activity_text
        self.status = status
        # Falsy arguments get a fresh container so instances never share one.
        self.filled_forms = filled_forms or []
        self.extracted_data = extracted_data or {}
        self._id = _id
        self.created_at = created_at or now
        self.updated_at = updated_at or now

    def to_dict(self):
        """Convert the instance to a dictionary.

        Reference ids are rendered as strings (or None when unset). The
        "_id" key is only present when the instance has an id. save()
        persists this same dictionary.
        """
        incident_dict = {
            "department_id": str(self.department_id) if self.department_id else None,
            "user_id": str(self.user_id) if self.user_id else None,
            "workflow_id": str(self.workflow_id) if self.workflow_id else None,
            "description": self.description,
            "date": self.date,
            "log_id": str(self.log_id) if self.log_id else None,
            "activity_text": self.activity_text,
            "status": self.status,
            "filled_forms": self.filled_forms,
            "extracted_data": self.extracted_data,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            incident_dict["_id"] = str(self._id)
        return incident_dict

    @classmethod
    def from_dict(cls, incident_dict):
        """Create an instance from a dictionary.

        String values under the reference keys are converted to ObjectId;
        every key is then passed to the constructor as a keyword argument.
        The caller's dictionary is left unmodified.
        """
        # Work on a shallow copy so the id coercion does not leak back into
        # the dictionary the caller handed in.
        fields = dict(incident_dict)
        for key in cls._REFERENCE_FIELDS:
            value = fields.get(key)
            if value and isinstance(value, str):
                fields[key] = ObjectId(value)
        return cls(**fields)

    @staticmethod
    def _reference_filter(ref_id):
        """Build a query condition matching a reference in either stored form."""
        # save() writes references as strings (via to_dict), and MongoDB
        # equality is type-sensitive, so filtering on ObjectId alone would
        # miss every document written by this class. Match both forms.
        oid = ObjectId(ref_id)
        return {"$in": [oid, str(oid)]}

    def save(self):
        """Save the incident to the database.

        Updates the existing document when the instance has an id, otherwise
        inserts a new one and records the generated id on the instance. On
        success the owning user (if found) is told about the incident.

        Returns:
            True when the update matched a document or the insert was
            acknowledged, False otherwise.
        """
        incidents_collection = get_incidents_collection()
        incident_dict = self.to_dict()
        now = datetime.now()

        if self._id:
            # Update existing incident; _id is the filter, not a field to set.
            incident_dict["updated_at"] = now
            incident_dict.pop("_id", None)
            result = incidents_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": incident_dict},
            )
            # matched_count rather than modified_count: an update that finds
            # the document but changes nothing is not a failure.
            success = result.matched_count > 0
            if success:
                self.updated_at = now
        else:
            # Insert new incident.
            incident_dict["created_at"] = now
            incident_dict["updated_at"] = now
            result = incidents_collection.insert_one(incident_dict)
            self._id = result.inserted_id
            success = result.acknowledged
            if success:
                # Keep the instance in step with what was stored so a later
                # save() does not write stale timestamps back.
                self.created_at = now
                self.updated_at = now

        # Update user's incidents if successful.
        # NOTE(review): this runs on every successful save, not only on
        # insert; presumably User.add_incident ignores duplicates - confirm.
        if success and self.user_id:
            user = User.find_by_id(self.user_id)
            if user:
                user.add_incident(self._id)

        return success

    @classmethod
    def find_by_id(cls, incident_id):
        """Find an incident by id; returns None when no document matches."""
        incidents_collection = get_incidents_collection()
        incident_data = incidents_collection.find_one({"_id": ObjectId(incident_id)})
        if incident_data:
            return cls.from_dict(incident_data)
        return None

    @classmethod
    def find_by_user(cls, user_id):
        """Find all incidents for a user."""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"user_id": cls._reference_filter(user_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_department(cls, department_id):
        """Find all incidents for a department."""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"department_id": cls._reference_filter(department_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_workflow(cls, workflow_id):
        """Find all incidents for a workflow."""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"workflow_id": cls._reference_filter(workflow_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_date_range(cls, department_id, start_date, end_date):
        """Find a department's incidents dated within [start_date, end_date].

        Results are sorted by "date" in descending order.
        """
        incidents_collection = get_incidents_collection()
        query = {
            "department_id": cls._reference_filter(department_id),
            "date": {"$gte": start_date, "$lte": end_date},
        }
        incidents_data = incidents_collection.find(query).sort("date", -1)
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    def delete(self):
        """Delete the incident from the database.

        Returns:
            False when the instance has no id or no document was deleted,
            True when a document was removed.
        """
        if not self._id:
            return False

        # Remove incident reference from user if it exists.
        # NOTE(review): membership is tested with self._id as held on this
        # instance; verify user.incidents stores ids in the same type.
        if self.user_id:
            user = User.find_by_id(self.user_id)
            if user and self._id in user.incidents:
                user.incidents.remove(self._id)
                user.save()

        incidents_collection = get_incidents_collection()
        result = incidents_collection.delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def add_filled_form(self, form_url):
        """Add a filled form URL to this incident and save it.

        Returns True without saving when the URL is already recorded;
        otherwise returns the result of save().
        """
        if form_url not in self.filled_forms:
            self.filled_forms.append(form_url)
            return self.save()
        return True

    def update_status(self, status):
        """Set the incident status and save; returns the result of save()."""
        self.status = status
        return self.save()