# enflow-api / models /incident.py
# dhruv575
# greedy
# bacb197
from bson import ObjectId
from datetime import datetime
from db import get_incidents_collection
from models.user import User
class Incident:
    """An incident record persisted in the incidents collection.

    An incident links a department, a user and a workflow to a described
    activity, and tracks the processing status, the extracted data and the
    URLs of the forms generated for it.
    """

    # Fields that reference documents by id. ``from_dict`` turns string values
    # of these fields into ``ObjectId`` instances.
    _REFERENCE_FIELDS = ("_id", "department_id", "user_id", "workflow_id", "log_id")

    def __init__(self, department_id, user_id, workflow_id, description, date,
                 log_id=None, activity_text=None, status="pending", filled_forms=None,
                 extracted_data=None, _id=None, created_at=None, updated_at=None):
        """Build an in-memory incident; nothing is written until ``save()``.

        ``filled_forms`` defaults to an empty list and ``extracted_data`` to an
        empty dict. ``created_at`` and ``updated_at`` default to the current
        time (naive local time from ``datetime.now()``).
        """
        self.department_id = department_id
        self.user_id = user_id
        self.workflow_id = workflow_id
        self.description = description
        self.date = date
        self.log_id = log_id  # Reference to the log this incident was created from
        self.activity_text = activity_text  # The raw text of the activity
        self.status = status  # "pending", "processing", "completed", "failed"
        self.filled_forms = filled_forms or []  # Array of URLs to generated forms
        self.extracted_data = extracted_data or {}  # Extracted data for form filling
        self._id = _id
        # Read the clock once so a fresh incident has identical timestamps.
        now = datetime.now()
        self.created_at = created_at or now
        self.updated_at = updated_at or now

    @staticmethod
    def _reference_filter(value):
        """Return a query condition matching ``value`` as ObjectId or string.

        ``save()`` persists the output of ``to_dict()``, which stores reference
        ids as strings, so a lookup by ``ObjectId`` alone never matches
        documents written by this model. Matching both forms finds those
        documents as well as any stored with real ObjectIds.
        """
        oid = ObjectId(value)
        return {"$in": [oid, str(oid)]}

    def to_dict(self):
        """Convert instance to dictionary.

        Reference ids are rendered as strings (``None`` when unset); ``_id``
        is included only when the incident has one.
        """
        incident_dict = {
            "department_id": str(self.department_id) if self.department_id else None,
            "user_id": str(self.user_id) if self.user_id else None,
            "workflow_id": str(self.workflow_id) if self.workflow_id else None,
            "description": self.description,
            "date": self.date,
            "log_id": str(self.log_id) if self.log_id else None,
            "activity_text": self.activity_text,
            "status": self.status,
            "filled_forms": self.filled_forms,
            "extracted_data": self.extracted_data,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            incident_dict["_id"] = str(self._id)
        return incident_dict

    @classmethod
    def from_dict(cls, incident_dict):
        """Create instance from dictionary.

        String values of the id fields are converted to ``ObjectId``. The
        input mapping is left untouched; keys are passed to the constructor as
        keyword arguments, so an unknown key raises ``TypeError``.
        """
        # Work on a shallow copy so the caller's dict is not rewritten.
        fields = dict(incident_dict)
        for key in cls._REFERENCE_FIELDS:
            value = fields.get(key)
            if value and isinstance(value, str):
                fields[key] = ObjectId(value)
        return cls(**fields)

    def save(self):
        """Save incident to database.

        Updates the existing document when ``_id`` is set, otherwise inserts a
        new one and stores the generated id on the instance. On success the
        instance timestamps are synced with the stored ones and, when
        ``user_id`` is set and the user is found, ``user.add_incident`` is
        called with this incident's id.

        Returns:
            True when the update matched a document or the insert was
            acknowledged, otherwise False.
        """
        incidents_collection = get_incidents_collection()
        incident_dict = self.to_dict()
        now = datetime.now()

        if self._id:
            # Update existing incident
            incident_dict["updated_at"] = now
            incident_dict.pop("_id", None)  # _id is immutable; never $set it
            result = incidents_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": incident_dict},
            )
            # matched_count, not modified_count: a write that found the
            # document but changed nothing is still a successful save.
            success = result.matched_count > 0
            if success:
                self.updated_at = now
        else:
            # Insert new incident
            incident_dict["created_at"] = now
            incident_dict["updated_at"] = now
            result = incidents_collection.insert_one(incident_dict)
            self._id = result.inserted_id
            success = result.acknowledged
            if success:
                # Keep the instance in step with what was stored.
                self.created_at = now
                self.updated_at = now

        # Update user's incidents if successful
        if success and self.user_id:
            user = User.find_by_id(self.user_id)
            if user:
                user.add_incident(self._id)

        return success

    @classmethod
    def find_by_id(cls, incident_id):
        """Find incident by ID; returns None when no document matches."""
        incidents_collection = get_incidents_collection()
        incident_data = incidents_collection.find_one({"_id": ObjectId(incident_id)})
        if incident_data:
            return cls.from_dict(incident_data)
        return None

    @classmethod
    def find_by_user(cls, user_id):
        """Find all incidents for a user"""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"user_id": cls._reference_filter(user_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_department(cls, department_id):
        """Find all incidents for a department"""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"department_id": cls._reference_filter(department_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_workflow(cls, workflow_id):
        """Find all incidents for a workflow"""
        incidents_collection = get_incidents_collection()
        incidents_data = incidents_collection.find(
            {"workflow_id": cls._reference_filter(workflow_id)}
        )
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    @classmethod
    def find_by_date_range(cls, department_id, start_date, end_date):
        """Find incidents by date range for a department.

        Both bounds are inclusive; results are sorted by ``date`` descending.
        """
        incidents_collection = get_incidents_collection()
        query = {
            "department_id": cls._reference_filter(department_id),
            "date": {"$gte": start_date, "$lte": end_date},
        }
        incidents_data = incidents_collection.find(query).sort("date", -1)
        return [cls.from_dict(incident_data) for incident_data in incidents_data]

    def delete(self):
        """Delete incident from database.

        Returns False without touching the database when the incident has no
        ``_id``; otherwise returns whether a document was deleted.
        """
        if not self._id:
            return False

        # Remove incident reference from user if it exists
        if self.user_id:
            user = User.find_by_id(self.user_id)
            # NOTE(review): membership is checked against the raw _id value;
            # confirm user.incidents holds ids of the same type.
            if user and self._id in user.incidents:
                user.incidents.remove(self._id)
                user.save()

        incidents_collection = get_incidents_collection()
        result = incidents_collection.delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def add_filled_form(self, form_url):
        """Add a filled form to this incident.

        Returns the result of ``save()`` when the URL is new, or True when it
        was already recorded (nothing is written in that case).
        """
        if form_url not in self.filled_forms:
            self.filled_forms.append(form_url)
            return self.save()
        return True

    def update_status(self, status):
        """Update incident status and persist it; returns the ``save()`` result."""
        self.status = status
        return self.save()