Spaces:
Sleeping
Sleeping
| from celery import Celery | |
| import os | |
| import logging | |
| import json | |
| import openai | |
| from bson import ObjectId | |
| from models.log import Log | |
| from models.workflow import Workflow | |
| from models.incident import Incident | |
| from utils.pdf_utils import pdf_to_text, extract_activities, fill_markdown_form, save_filled_form | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Celery application named 'enflow'. Broker and result backend are both read
# from REDIS_URL, falling back to a local Redis instance for development.
celery_app = Celery(
    'enflow',
    broker=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
)
def process_log_document(self, log_id):
    """Process a log document asynchronously.

    Steps: load the log, convert its PDF to text, extract activities from the
    text, classify every activity against the workflows of the log's
    department, and for each match save an Incident, attach it to the log and
    queue ``process_incident_forms`` for it.

    NOTE(review): the body relies on ``self.retry`` / ``self.request.retries``
    and the sibling function is invoked through ``.delay``, so this is
    presumably registered as a bound Celery task; the decorator is not
    visible in this part of the file -- confirm.

    Args:
        self: Task instance providing ``retry`` and ``request.retries``.
        log_id: Identifier handed to ``Log.find_by_id``.

    Returns:
        dict: ``{"status": ..., "message": ...}`` where status is ``"error"``
        (missing API key or unknown log) or ``"completed"``.

    Raises:
        Whatever ``self.retry`` raises when an unexpected error occurs; the
        retry countdown doubles with every attempt (1s, 2s, 4s, ...).
    """
    try:
        # Classification below needs the OpenAI key; stop before doing any
        # work if it is missing. (No client is created here: this function
        # never talks to OpenAI directly.)
        if not os.environ.get('OPENAI_API_KEY'):
            logger.error("OPENAI_API_KEY environment variable is not set")
            return {"status": "error", "message": "OpenAI API key not configured"}

        # Retrieve the log
        log = Log.find_by_id(log_id)
        if not log:
            logger.error(f"Log not found: {log_id}")
            return {"status": "error", "message": "Log not found"}

        # 1. Extract text from PDF using OCR
        logger.info(f"Starting OCR for log {log_id}")
        extracted_text = pdf_to_text(log.log_file)

        # 2. Extract activities using LLM
        logger.info(f"Extracting activities for log {log_id}")
        activities_json = extract_activities(extracted_text)
        # A missing *or null* "activities" entry means "nothing to process";
        # without the `or []` a JSON null would crash the loop below and
        # trigger a pointless retry.
        activities = json.loads(activities_json).get('activities') or []

        # 3. Classify each activity and create incidents
        logger.info(f"Classifying activities and creating incidents for log {log_id}")
        workflows = Workflow.find_by_department(log.department_id)
        if not workflows:
            logger.warning(f"No workflows defined for department {log.department_id}")
            return {"status": "completed", "message": "No workflows to process"}

        # Plain-dict view of the workflows, as expected by classify_activity
        workflow_info = [
            {
                "id": str(workflow._id),
                "title": workflow.title,
                "description": workflow.description,
            }
            for workflow in workflows
        ]

        for activity in activities:
            workflow_id = classify_activity(activity, workflow_info)
            if not workflow_id:
                # Not matched to any workflow: no incident is created
                continue

            logger.info(f"Creating incident for activity: {activity['activity']}")
            incident = Incident(
                department_id=log.department_id,
                user_id=log.user_id,
                workflow_id=ObjectId(workflow_id),
                description=activity['activity'],
                date=log.log_date,
                activity_text=activity['text'],
                log_id=log._id,
            )
            if incident.save():
                # Link the incident to its log, then hand form filling to a
                # separate task so this one is not blocked by it.
                log.add_incident(incident._id)
                logger.info(f"Queueing incident processing for incident {incident._id}")
                process_incident_forms.delay(str(incident._id))

        return {"status": "completed", "message": "Log processing completed"}
    except Exception as e:
        # logger.exception keeps the traceback alongside the message
        logger.exception(f"Error processing log {log_id}: {str(e)}")
        # Retry with exponential backoff. `raise` makes it explicit that
        # nothing after this line runs.
        # NOTE(review): a retry re-runs the whole pipeline, so incidents
        # saved before the failure may be created again -- confirm whether
        # that is acceptable.
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
def classify_activity(activity, workflow_info):
    """Classify an activity against the available workflows.

    Sends the activity and the workflow list to the OpenAI chat API and maps
    the model's reply back onto one of the known workflows.

    Args:
        activity: Mapping with required keys ``'activity'`` and ``'text'``
            and optional keys ``'time'`` and ``'location'``.
        workflow_info: List of dicts with keys ``'id'``, ``'title'`` and
            ``'description'``.

    Returns:
        The matching workflow's ``'id'`` value, or ``None`` when the activity
        is judged mundane, the reply cannot be mapped to a workflow, the API
        key is missing, or any error occurs (errors are logged, not raised).
    """
    try:
        # Check if OpenAI API key is set
        api_key = os.environ.get('OPENAI_API_KEY')
        if not api_key:
            logger.error("OPENAI_API_KEY environment variable is not set")
            return None

        # Nothing to match against: the answer can only be None, so skip the
        # API call entirely.
        if not workflow_info:
            return None

        client = openai.OpenAI(api_key=api_key)

        # The prompt asks the model to answer with a workflow ID, so the ID
        # has to be part of the listing it sees.
        workflows_text = "\n".join(
            f"Workflow {number} (ID: {w['id']}): {w['title']} - {w['description']}"
            for number, w in enumerate(workflow_info, start=1)
        )

        # The prompt lines are flush-left on purpose: whitespace inside the
        # literal would become part of the text sent to the model.
        prompt = f"""
I need to classify a law enforcement activity into one of our defined workflows,
or determine if it's a routine/mundane activity that doesn't match any workflow.
Here are the available workflows:
{workflows_text}
Here is the activity:
Activity: {activity['activity']}
Full Text: {activity['text']}
Time: {activity.get('time', 'Not specified')}
Location: {activity.get('location', 'Not specified')}
Please classify this activity into one of the workflows, or indicate it's mundane.
Respond with just the workflow ID if it matches, or "mundane" if it doesn't match any workflow.
"""

        # Call OpenAI API
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a law enforcement activity classifier that matches activities to defined workflows."},
                {"role": "user", "content": prompt},
            ],
        )

        # The message content may be None; treat that as an empty reply
        result = (response.choices[0].message.content or "").strip()
        return _match_workflow_id(result, workflow_info)
    except Exception as e:
        logger.error(f"Error classifying activity: {str(e)}")
        return None


def _match_workflow_id(result, workflow_info):
    """Map the classifier's raw reply onto a known workflow id, or None."""
    import re

    # Tolerate surrounding quotes, a trailing period and case differences
    # ("Mundane.", '"mundane"').
    normalized = result.strip().strip("\"'.").lower()
    if not normalized or normalized == "mundane":
        return None

    # 1. An explicit id anywhere in the reply is the strongest signal. Check
    #    it for ALL workflows before looking at titles, so that a title
    #    substring of an earlier workflow cannot shadow a later exact id.
    for workflow in workflow_info:
        if workflow['id'] and workflow['id'] in result:
            return workflow['id']

    # 2. Title mentioned in the reply. Empty titles are skipped (an empty
    #    string is "contained" in every reply) and longer titles win, so
    #    "Arrest Report" is preferred over "Arrest".
    titled = [w for w in workflow_info if w['title']]
    for workflow in sorted(titled, key=lambda w: len(w['title']), reverse=True):
        if workflow['title'].lower() in normalized:
            return workflow['id']

    # 3. A bare 1-based position from the listing ("2", "#2", "Workflow 2").
    position = re.fullmatch(r"(?:workflow\s*)?#?(\d+)", normalized)
    if position:
        index = int(position.group(1)) - 1
        if 0 <= index < len(workflow_info):
            return workflow_info[index]['id']

    return None
def process_incident_forms(self, incident_id):
    """Process forms for an incident asynchronously.

    Loads the incident and its workflow, extracts the workflow's required
    data from the incident's activity text, fills the workflow's markdown
    template with it, saves the filled form and records the result on the
    incident. The incident's ``status`` moves through ``"processing"`` to
    ``"completed"`` or ``"failed"``.

    NOTE(review): the body relies on ``self.retry`` / ``self.request.retries``
    and callers use ``.delay``, so this is presumably registered as a bound
    Celery task; the decorator is not visible in this part of the file --
    confirm.

    Args:
        self: Task instance providing ``retry`` and ``request.retries``.
        incident_id: Identifier handed to ``Incident.find_by_id``.

    Returns:
        dict: ``{"status": ..., "message": ...}`` where status is ``"error"``
        (missing API key, unknown incident or unknown workflow) or
        ``"completed"``.

    Raises:
        Whatever ``self.retry`` raises when an unexpected error occurs; the
        retry countdown doubles with every attempt.
    """
    try:
        # Data extraction below needs the OpenAI key; stop early if it is
        # missing. (No client is created here: this function never talks to
        # OpenAI directly.)
        if not os.environ.get('OPENAI_API_KEY'):
            logger.error("OPENAI_API_KEY environment variable is not set")
            return {"status": "error", "message": "OpenAI API key not configured"}

        # Retrieve the incident
        incident = Incident.find_by_id(incident_id)
        if not incident:
            logger.error(f"Incident not found: {incident_id}")
            return {"status": "error", "message": "Incident not found"}

        # Update incident status to processing
        incident.status = "processing"
        incident.save()

        # Get the associated workflow
        workflow = Workflow.find_by_id(incident.workflow_id)
        if not workflow:
            logger.error(f"Workflow not found: {incident.workflow_id}")
            incident.status = "failed"
            incident.save()
            return {"status": "error", "message": "Workflow not found"}

        # Without both a template and data requirements there is no form to
        # produce; the incident is simply marked as done.
        if not workflow.markdown_template or not workflow.data_requirements:
            logger.warning(f"Workflow {workflow._id} has no markdown template or data requirements")
            incident.status = "completed"
            incident.save()
            return {"status": "completed", "message": "No forms to process"}

        # Extract required data using LLM and keep it on the incident
        required_data = extract_required_data(incident.activity_text, workflow.data_requirements)
        incident.extracted_data = required_data

        filled_forms = []
        try:
            # Fill in the markdown template with extracted data
            filled_markdown = fill_markdown_form(workflow.markdown_template, required_data)

            # Save the filled form and remember where it ended up
            form_filename = f"{workflow.title}_incident_{incident._id}"
            form_url = save_filled_form(
                filled_markdown,
                form_filename,
                incident.department_id,
                incident.user_id,
            )
            filled_forms.append({
                "url": form_url,
                "filename": form_filename,
                "original_template": workflow.template_name,
            })
            logger.info(f"Successfully processed form for incident {incident_id}")
        except Exception as e:
            # Best effort: a form failure is logged (with traceback) but does
            # not fail the task.
            # NOTE(review): the incident is still marked "completed" below
            # with an empty filled_forms list -- confirm this is intended.
            logger.exception(f"Error processing form for incident {incident_id}: {str(e)}")

        # Update incident with filled forms and status
        incident.filled_forms = filled_forms
        incident.status = "completed"
        incident.save()
        return {"status": "completed", "message": "Incident forms processed"}
    except Exception as e:
        logger.exception(f"Error processing incident forms {incident_id}: {str(e)}")

        # Mark the incident as failed; this is itself best effort so that a
        # storage problem here cannot mask the original error.
        try:
            incident = Incident.find_by_id(incident_id)
            if incident:
                incident.status = "failed"
                incident.save()
        except Exception as update_e:
            logger.exception(f"Error updating incident status: {str(update_e)}")

        # Retry with exponential backoff. `raise` makes it explicit that
        # nothing after this line runs.
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
def extract_required_data(activity_text, data_requirements):
    """Ask the OpenAI chat API to pull the requested fields out of a text.

    Args:
        activity_text: The activity text to extract information from.
        data_requirements: Iterable of mappings, each with a ``'field'`` and
            a ``'description'`` entry.

    Returns:
        The parsed JSON reply of the model. An empty dict is returned when
        the API key is missing or any error occurs (errors are logged, not
        raised).
    """
    try:
        openai_key = os.environ.get('OPENAI_API_KEY')
        if not openai_key:
            logger.error("OPENAI_API_KEY environment variable is not set")
            return {}

        llm = openai.OpenAI(api_key=openai_key)

        # One numbered line per requested field, e.g. "1. name: description"
        numbered_requirements = (
            f"{position}. {req['field']}: {req['description']}"
            for position, req in enumerate(data_requirements, start=1)
        )
        requirements_text = "\n".join(numbered_requirements)

        # The prompt lines are flush-left on purpose: whitespace inside the
        # literal would become part of the text sent to the model.
        prompt = f"""
I need to extract specific information from a law enforcement activity text.
I need to extract the following information:
{requirements_text}
Here is the activity text:
{activity_text}
Please extract the requested information and format as a JSON object with the field names as keys.
If any information is not available, use null as the value.
"""

        conversation = [
            {"role": "system", "content": "You are a data extraction assistant that extracts specific information from text."},
            {"role": "user", "content": prompt},
        ]
        completion = llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=conversation,
            response_format={"type": "json_object"},
        )

        # The reply is requested as a JSON object; parse and return it as-is
        return json.loads(completion.choices[0].message.content)
    except Exception as e:
        logger.error(f"Error extracting required data: {str(e)}")
        return {}