Spaces:
Sleeping
Sleeping
| from flask import jsonify, request | |
| import logging | |
| import os | |
| import json | |
| from datetime import datetime, date, time | |
| import uuid | |
| import pytesseract | |
| from PIL import Image | |
| import pdf2image | |
| import io | |
| import tempfile | |
| import openai | |
| from models.log import Log | |
| from models.user import User | |
| from models.department import Department | |
| from models.workflow import Workflow | |
| from models.incident import Incident | |
| from utils.pdf_utils import pdf_to_text, extract_activities | |
| from db import get_gridfs | |
| from bson.objectid import ObjectId | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
def upload_log(current_user):
    """Upload a PDF log, OCR it to text, store the text and process it.

    Reads the ``file`` upload and the ``log_date`` form field (``YYYY-MM-DD``)
    from the current Flask request. Only the text returned by ``pdf_to_text``
    is stored on the new ``Log``; the log is then processed synchronously with
    ``process_log_sync``.

    Args:
        current_user: Authenticated user; its ``_id`` and ``department_id``
            are copied onto the new log.

    Returns:
        A ``(response, status)`` tuple: 201 once the log has been saved, 400
        for a missing/non-PDF file or a missing/malformed date, 500 when the
        log cannot be saved or an unexpected error occurs.
    """
    if 'file' not in request.files:
        return jsonify({'message': 'No file part'}), 400

    file = request.files['file']
    # The filename may be empty or absent altogether; both mean "no file".
    if not file.filename:
        return jsonify({'message': 'No selected file'}), 400

    # Validate file is PDF
    if not file.filename.lower().endswith('.pdf'):
        return jsonify({'message': 'Only PDF files are allowed'}), 400

    # Get log date from form data
    log_date_str = request.form.get('log_date')
    if not log_date_str:
        return jsonify({'message': 'Log date is required'}), 400

    # Parse the date in its own try block: a ValueError raised later by OCR or
    # persistence must not be reported to the client as a bad date format.
    try:
        parsed_date = datetime.strptime(log_date_str, '%Y-%m-%d').date()
    except ValueError:
        return jsonify({'message': 'Invalid date format. Please use YYYY-MM-DD'}), 400
    log_datetime = datetime.combine(parsed_date, time.min)  # midnight of that day

    try:
        file_content = file.read()

        logger.info("Extracting text from PDF using OCR")
        extracted_text = pdf_to_text(file_content, is_bytes=True)

        log = Log(
            user_id=current_user._id,
            department_id=current_user.department_id,
            log_date=log_datetime,
            log_text=extracted_text
        )
        if not log.save():
            return jsonify({'message': 'Failed to save log entry'}), 500

        # Process log synchronously
        result = process_log_sync(str(log._id))
        if result.get('status') == 'error':
            # The log itself is stored, so this is still a 201, but do not
            # claim that processing succeeded.
            logger.warning(f"Log {log._id} saved but processing failed: {result.get('message')}")
            message = 'Log uploaded, but processing failed'
        else:
            message = 'Log uploaded and processed successfully'

        # process_log_sync works on its own copy of the log; re-read it so the
        # response reflects any incidents that were attached during processing.
        processed_log = Log.find_by_id(str(log._id)) or log

        return jsonify({
            'message': message,
            'log': processed_log.to_dict(),
            'incidents_created': result.get('incidents_created', 0)
        }), 201
    except Exception as e:
        logger.exception(f"Error uploading log: {str(e)}")
        return jsonify({'message': f'Error uploading log: {str(e)}'}), 500
def process_log_sync(log_id):
    """Extract activities from a stored log and create incidents for matches.

    Loads the log, runs ``extract_activities`` on its stored text, classifies
    every activity against the department's workflows with
    ``classify_activity`` and saves an ``Incident`` for each activity that
    maps to a workflow.

    Args:
        log_id: Identifier passed to ``Log.find_by_id``.

    Returns:
        A dict with ``status`` (``"completed"`` or ``"error"``) and
        ``message``. Completed results also carry ``incidents_created`` and,
        when workflows exist, ``classified_activities``. Errors raised during
        processing are caught and reported in the dict (including the number
        of incidents created before the failure) rather than propagated.
    """
    created_incidents = 0
    classified_activities = []
    try:
        # classify_activity needs the key; fail once here instead of once per
        # activity.
        if not os.environ.get('OPENAI_API_KEY'):
            logger.error("OPENAI_API_KEY environment variable is not set")
            return {"status": "error", "message": "OpenAI API key not configured"}

        # Retrieve the log
        log = Log.find_by_id(log_id)
        if not log:
            logger.error(f"Log not found: {log_id}")
            return {"status": "error", "message": "Log not found"}

        # Use the stored text directly instead of extracting from PDF
        logger.info(f"Using stored text for log {log_id}")
        extracted_text = log.log_text

        # Extract activities using LLM
        logger.info(f"Extracting activities for log {log_id}")
        activities_json = extract_activities(extracted_text)
        activities = json.loads(activities_json).get('activities', [])

        # Classify each activity and create incidents
        logger.info(f"Classifying activities and creating incidents for log {log_id}")
        workflows = Workflow.find_by_department(log.department_id)
        if not workflows:
            logger.warning(f"No workflows defined for department {log.department_id}")
            return {"status": "completed", "message": "No workflows to process", "incidents_created": 0}

        workflow_info = [
            {
                "id": str(workflow._id),
                "title": workflow.title,
                "description": workflow.description
            }
            for workflow in workflows
        ]

        for activity in activities:
            workflow_id = classify_activity(activity, workflow_info)
            if not workflow_id:
                continue

            # Tolerate activities missing optional keys instead of aborting
            # the whole run with a KeyError halfway through.
            description = activity.get('activity', 'No description')
            logger.info(f"Creating incident for activity: {description}")

            incident = Incident(
                department_id=log.department_id,
                user_id=log.user_id,
                workflow_id=ObjectId(workflow_id),
                description=description,
                date=log.log_date,
                activity_text=activity.get('text', ''),
                log_id=log._id,
                status="completed"  # Mark as completed since we're processing synchronously
            )
            if not incident.save():
                logger.error(f"Failed to save incident for activity: {description}")
                continue

            log.add_incident(incident._id)
            created_incidents += 1
            classified_activities.append({
                "activity": activity,
                "workflow_id": workflow_id,
                "incident_id": str(incident._id)
            })

        return {
            "status": "completed",
            "message": "Log processing completed",
            "incidents_created": created_incidents,
            "classified_activities": classified_activities
        }
    except Exception as e:
        logger.exception(f"Error processing log {log_id}: {str(e)}")
        # Report incidents that were already persisted before the failure.
        return {"status": "error", "message": str(e), "incidents_created": created_incidents}
def classify_activity(activity, workflow_info):
    """Classify an activity against the available workflows using an LLM.

    Args:
        activity: Mapping describing the activity; the keys ``activity``,
            ``text``, ``time`` and ``location`` are read with ``.get``.
        workflow_info: List of dicts with ``id``, ``title`` and
            ``description`` keys, one per candidate workflow.

    Returns:
        The matching workflow ID (a string taken from ``workflow_info``), or
        ``None`` when the activity is judged mundane, the model's answer is
        not a known workflow ID, the API key is missing, or any error occurs.
        Errors are logged, never raised.
    """
    # Only IDs present in workflow_info are ever returned, so with no
    # candidates the answer is always None; skip the API call entirely.
    if not workflow_info:
        return None

    try:
        api_key = os.environ.get('OPENAI_API_KEY')
        if not api_key:
            logger.error("OPENAI_API_KEY not found for classify_activity.")
            return None

        client = openai.OpenAI(api_key=api_key)
        prompt = _build_classification_prompt(activity, workflow_info)

        # Log the prompt being sent (use debug level for potentially sensitive info)
        logger.debug(f"Sending classification prompt to OpenAI: \n{prompt}")

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an AI assistant helping classify law enforcement activities into predefined workflows. Respond ONLY in the requested JSON format."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}  # Ensure JSON output
        )
        return _parse_classification_response(response, workflow_info)
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Error in classify_activity function: {str(e)}")
        return None


def _build_classification_prompt(activity, workflow_info):
    """Render the user prompt listing the workflows and the activity details."""
    workflows_text = "\n".join(
        f"Workflow ID: {w['id']} | Title: {w['title']} | Description: {w['description']}"
        for w in workflow_info
    )
    return f"""
Analyze the following law enforcement activity and decide if it matches one of the provided workflows or if it is a mundane activity.
Available Workflows:
{workflows_text}
Activity Details:
Activity Description: {activity.get('activity', 'N/A')}
Full Text: {activity.get('text', 'N/A')}
Time: {activity.get('time', 'Not specified')}
Location: {activity.get('location', 'Not specified')}
Your Task:
1. Determine if the activity clearly matches one of the workflow descriptions.
2. If it matches, provide the corresponding Workflow ID.
3. If it does not match any workflow, classify it as "mundane".
4. Provide a brief justification for your decision (1-2 sentences).
Output Format:
Return a JSON object with two keys: "decision" and "justification".
- "decision": Should be the matching Workflow ID (string) or the string "mundane".
- "justification": Should be a brief string explaining your reasoning.
Example Match Response:
{{"decision": "60d21b4967d0d8992e610c87", "justification": "The activity describes a traffic stop, which matches the Traffic Violation workflow."}}
Example Mundane Response:
{{"decision": "mundane", "justification": "The activity describes routine patrol or administrative tasks not covered by any workflow."}}
"""


def _parse_classification_response(response, workflow_info):
    """Return the workflow ID chosen in the chat completion, or None."""
    content = None
    try:
        content = response.choices[0].message.content
        logger.debug(f"Received OpenAI classification response content: {content}")

        result_json = json.loads(content)
        decision = result_json.get("decision")
        justification = result_json.get("justification", "No justification provided.")
        logger.info(f"LLM Classification - Decision: {decision}, Justification: {justification}")

        if decision == "mundane":
            return None

        # Accept the decision only if it is one of the IDs we offered.
        valid_workflow_ids = {w['id'] for w in workflow_info}
        if decision not in valid_workflow_ids:
            logger.warning(f"LLM returned a decision '{decision}' which is not 'mundane' and does not match any known workflow ID. Treating as unclassified.")
            return None

        try:
            # Callers wrap the result in ObjectId, so make sure that will work.
            ObjectId(decision)
        except Exception:
            logger.warning(f"LLM returned a decision '{decision}' matching a workflow ID, but it's not a valid ObjectId format. Treating as unclassified.")
            return None
        return decision
    except json.JSONDecodeError:
        logger.error(f"Failed to decode JSON response from OpenAI: {content}")
        return None
    except Exception as parse_err:
        logger.error(f"Error parsing OpenAI classification response: {parse_err}")
        return None
def get_log(current_user, log_id):
    """Return one log as JSON, provided it belongs to the caller's department.

    Responds with 404 when ``Log.find_by_id`` finds nothing and 403 when the
    log's ``department_id`` differs from the current user's.
    """
    record = Log.find_by_id(log_id)
    if record:
        # Compare as strings so differing ID representations still match.
        if str(record.department_id) == str(current_user.department_id):
            return jsonify({'log': record.to_dict()}), 200
        return jsonify({'message': 'Access denied to logs from other departments'}), 403
    return jsonify({'message': 'Log not found'}), 404
def delete_log(current_user, log_id):
    """Delete a log together with the incidents listed on it.

    The caller must be in the log's department (else 403) and must be either
    the log's owner or have ``permissions == 'Admin'`` (else 403). Responds
    with 404 for an unknown log and 500 when ``log.delete()`` reports failure.
    """
    target = Log.find_by_id(log_id)
    if not target:
        return jsonify({'message': 'Log not found'}), 404

    # Department gate first, then the owner-or-admin gate.
    if str(target.department_id) != str(current_user.department_id):
        return jsonify({'message': 'Access denied to logs from other departments'}), 403

    is_owner = str(target.user_id) == str(current_user._id)
    if not (is_owner or current_user.permissions == 'Admin'):
        return jsonify({'message': 'Only the log owner or department admin can delete logs'}), 403

    # Remove every incident that can still be found; missing ones are skipped.
    for linked_incident in filter(None, map(Incident.find_by_id, target.incidents)):
        linked_incident.delete()

    removed = target.delete()
    if not removed:
        return jsonify({'message': 'Failed to delete log'}), 500
    return jsonify({'message': 'Log and associated incidents deleted successfully'}), 200
def get_user_logs(current_user):
    """Return every log found by ``Log.find_by_user`` for the current user."""
    serialized = []
    for entry in Log.find_by_user(current_user._id):
        serialized.append(entry.to_dict())
    return jsonify({'logs': serialized}), 200
def get_department_logs(current_user):
    """Return all logs of the caller's department; admins only (403 otherwise)."""
    if current_user.permissions == 'Admin':
        department_logs = Log.find_by_department(current_user.department_id)
        payload = {'logs': list(map(lambda entry: entry.to_dict(), department_logs))}
        return jsonify(payload), 200
    return jsonify({'message': 'Admin permissions required'}), 403
def get_logs_by_date_range(current_user):
    """Return the department's logs between two dates given in the JSON body.

    Expects a JSON object with ``start_date`` and ``end_date`` strings in
    ``YYYY-MM-DD`` format and passes the parsed dates to
    ``Log.find_by_date_range`` together with the caller's department.

    Returns:
        A ``(response, status)`` tuple: 200 with the logs, 400 when the body
        is not a JSON object, a field is missing or a date is malformed, and
        500 when the lookup itself fails.
    """
    # silent=True yields None instead of raising on a missing or invalid JSON
    # body, so a bad request gets the same 400 as a missing field.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'start_date' not in data or 'end_date' not in data:
        return jsonify({'message': 'Start date and end date are required'}), 400

    # Parse separately from the query so that only genuinely bad input maps to
    # 400; TypeError covers non-string values such as numbers or null.
    try:
        start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date()
        end_date = datetime.strptime(data['end_date'], '%Y-%m-%d').date()
    except (TypeError, ValueError):
        return jsonify({'message': 'Invalid date format. Please use YYYY-MM-DD'}), 400

    try:
        logs = Log.find_by_date_range(current_user.department_id, start_date, end_date)
        return jsonify({'logs': [log.to_dict() for log in logs]}), 200
    except Exception as e:
        logger.exception(f"Error fetching logs by date range: {str(e)}")
        return jsonify({'message': f'Error fetching logs: {str(e)}'}), 500
def classify_log_activities(current_user):
    """Upload a PDF log, classify its activities and create incidents.

    Steps: OCR the uploaded ``file`` with ``pdf_to_text``, save a ``Log`` with
    the extracted text, extract activities with ``extract_activities``,
    classify each one against the department's workflows with
    ``classify_activity``, save an ``Incident`` for every classified activity
    and finally store the incident IDs on the log.

    The optional ``log_date`` form field (``YYYY-MM-DD``) defaults to today;
    a malformed value is logged and also replaced by today's date.

    Args:
        current_user: Authenticated user; ``email``, ``_id`` and
            ``department_id`` are read.

    Returns:
        A ``(response, status)`` tuple: 200 with the log, the per-activity
        classification results and the extracted text; 400 for a missing or
        non-PDF file; 500 for configuration, persistence or unexpected errors.
    """
    logger.info(f"Entering classify_log_activities (now creates Log/Incidents) for user {current_user.email}")

    if 'file' not in request.files:
        logger.error("No file part in the request")
        return jsonify({'message': 'No file part'}), 400

    file = request.files['file']
    # The filename may be empty or absent altogether; both mean "no file".
    if not file.filename:
        logger.error("No selected file")
        return jsonify({'message': 'No selected file'}), 400

    if not file.filename.lower().endswith('.pdf'):
        logger.error(f"Invalid file type: {file.filename}")
        return jsonify({'message': 'Only PDF files are allowed'}), 400

    try:
        logger.info("Checking for OpenAI API key...")
        if not os.environ.get('OPENAI_API_KEY'):
            logger.error("OPENAI_API_KEY environment variable is not set")
            return jsonify({'message': 'OpenAI API key not configured'}), 500

        logger.info("Reading file content...")
        file_content = file.read()
        logger.info(f"Read {len(file_content)} bytes from file {file.filename}")

        logger.info("Starting OCR...")
        extracted_text = pdf_to_text(file_content, is_bytes=True)
        logger.info(f"OCR finished. Extracted {len(extracted_text)} characters.")

        # Get log date from form data and convert to datetime at midnight
        log_date_str = request.form.get('log_date', datetime.now().strftime('%Y-%m-%d'))
        try:
            parsed_date = datetime.strptime(log_date_str, '%Y-%m-%d').date()
        except ValueError:
            logger.warning(f"Invalid log_date format '{log_date_str}', using today's date.")
            parsed_date = datetime.now().date()
        log_datetime = datetime.combine(parsed_date, time.min)

        logger.info(f"Creating Log object for user {current_user._id} on {log_datetime.date()}")
        new_log = Log(
            user_id=current_user._id,
            department_id=current_user.department_id,
            log_date=log_datetime,
            log_text=extracted_text,
            incidents=[]
        )
        if not new_log.save():
            logger.error("Failed to save initial Log object.")
            return jsonify({'message': 'Failed to create log entry'}), 500
        log_id = new_log._id
        logger.info(f"Log object created and saved successfully with ID: {log_id}")

        logger.info(f"Extracting activities with LLM for log {log_id}...")
        activities_json = extract_activities(extracted_text)
        logger.info(f"Activity extraction finished for log {log_id}. JSON length: {len(activities_json)}.")

        logger.info(f"Parsing activities JSON for log {log_id}...")
        activities = json.loads(activities_json).get('activities', [])
        logger.info(f"Parsed activities JSON for log {log_id}. Found {len(activities)} activities.")

        logger.info(f"Fetching workflows for department {current_user.department_id}...")
        # Normalise a falsy result to a list so len() below cannot fail.
        workflows = Workflow.find_by_department(current_user.department_id) or []
        logger.info(f"Fetched {len(workflows)} workflows.")

        workflow_info = [
            {
                "id": str(workflow._id),
                "title": workflow.title,
                "description": workflow.description
            }
            for workflow in workflows
        ]
        if workflow_info:
            logger.info(f"Prepared workflow info for classification: {workflow_info}")
        else:
            logger.warning(f"No workflows found for department {current_user.department_id}. No classification will occur.")

        classified_activities_output = []
        incident_ids_created = []  # Incidents created for this log

        logger.info(f"Starting classification & incident creation loop for {len(activities)} activities...")
        for index, activity in enumerate(activities):
            logger.info(f"Processing activity {index + 1}/{len(activities)}: '{activity.get('activity', 'N/A')}'")

            workflow_id = None
            incident_id = None
            workflow_title = None
            classified = False

            if workflow_info:
                workflow_id = classify_activity(activity, workflow_info)

            activity_text_for_incident = activity.get('text', '')
            activity_description = activity.get('activity', 'No description')

            if workflow_id:
                workflow = next((w for w in workflow_info if w["id"] == workflow_id), None)
                if workflow:
                    classified = True
                    workflow_title = workflow["title"]
                    logger.info(f"Activity {index + 1} classified as Workflow: {workflow_title} ({workflow_id}). Creating Incident...")
                    # A failure on one incident must not abort the remaining
                    # activities, hence the local try/except.
                    try:
                        new_incident = Incident(
                            department_id=current_user.department_id,
                            user_id=current_user._id,
                            workflow_id=ObjectId(workflow_id),
                            description=activity_description,
                            date=log_datetime,
                            activity_text=activity_text_for_incident,
                            log_id=log_id,
                            status="classified"  # New initial status
                        )
                        if new_incident.save():
                            incident_id = new_incident._id
                            incident_ids_created.append(incident_id)
                            logger.info(f"Incident created successfully with ID: {incident_id} for activity {index + 1}")
                        else:
                            logger.error(f"Failed to save Incident object for activity {index + 1}")
                    except Exception as incident_exc:
                        logger.error(f"Error creating Incident for activity {index + 1}: {incident_exc}")
                else:
                    logger.warning(f"Activity {index + 1} returned workflow ID {workflow_id} but no matching workflow found. Treating as unclassified.")
            else:
                logger.info(f"Activity {index + 1} classified as mundane.")

            # Result for this activity, sent back to the frontend
            classified_activities_output.append({
                "activity": activity,
                "classified": classified,
                "workflow_id": workflow_id,
                "workflow_title": workflow_title,
                "incident_id": str(incident_id) if incident_id else None
            })

        logger.info(f"Classification & incident creation loop finished. Created {len(incident_ids_created)} incidents.")

        # Update Log object with incident IDs
        if incident_ids_created:
            logger.info(f"Updating Log {log_id} with {len(incident_ids_created)} incident IDs.")
            log_to_update = Log.find_by_id(log_id)
            if log_to_update:
                log_to_update.incidents = incident_ids_created
                if not log_to_update.save():
                    logger.error(f"Failed to update Log {log_id} with incident IDs.")
                else:
                    logger.info(f"Successfully updated Log {log_id} with incident IDs.")
                    # The response is built from new_log, which is a different
                    # object from the one just saved; mirror the incident list
                    # so the returned 'log' is not stale.
                    new_log.incidents = list(incident_ids_created)
            else:
                logger.error(f"Could not find Log {log_id} to update with incident IDs.")

        try:
            log_dict_to_return = new_log.to_dict()
            logger.info(f"Preparing to return Log object: {log_dict_to_return}")
        except Exception as to_dict_err:
            logger.error(f"Error calling new_log.to_dict() before return: {to_dict_err}")
            return jsonify({'message': 'Internal error preparing log data for response.'}), 500

        logger.info("Successfully processed upload & classification request. Returning 200 OK.")
        return jsonify({
            'message': 'Log created, activities extracted and classified, incidents created.',
            'log': log_dict_to_return,
            'classified_activities': classified_activities_output,
            'extracted_text': extracted_text
        }), 200
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"!!! Unhandled exception in classify_log_activities: {str(e)}")
        return jsonify({'message': 'An internal server error occurred during log processing.'}), 500