Spaces:
Sleeping
Sleeping
| import pytesseract | |
| from PIL import Image | |
| import pdf2image | |
| import tempfile | |
| import os | |
| import requests | |
| import io | |
| import logging | |
| import openai | |
| import markdown | |
| import weasyprint | |
| from bson import ObjectId | |
| from db import get_gridfs | |
| from datetime import datetime | |
| import json | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
def pdf_to_text(pdf_source, is_bytes=False, timeout=30):
    """
    Extract text from a PDF by converting each page to an image and running OCR.

    Args:
        pdf_source: Either a URL to a PDF or the PDF content as bytes.
            URLs that start with '/api/' are treated as internal paths and
            are prefixed with http://localhost:5000.
        is_bytes: Whether pdf_source is bytes (True) or a URL (False).
        timeout: Timeout in seconds handed to requests.get when the PDF has
            to be downloaded. Unused when is_bytes is True.

    Returns:
        str: The OCR text of all pages, each page followed by a blank line.
        An empty string if the PDF yields no page images.

    Raises:
        Exception: If the download answers with a status other than 200.
            Any other error raised while downloading, converting or
            running OCR is logged and re-raised unchanged.
    """
    try:
        # Set up temporary directory for processing
        with tempfile.TemporaryDirectory() as temp_dir:
            if is_bytes:
                pdf_content = pdf_source
            else:
                # Internal URLs carry no host, so prepend the local one.
                if pdf_source.startswith('/api/'):
                    pdf_url = f"http://localhost:5000{pdf_source}"
                else:
                    pdf_url = pdf_source

                logger.info(f"Downloading PDF from {pdf_url}")
                # Without a timeout, requests waits indefinitely on a
                # server that accepts the connection but never answers.
                response = requests.get(pdf_url, timeout=timeout)
                if response.status_code != 200:
                    logger.error(f"Failed to download PDF: {response.status_code}")
                    raise Exception(f"Failed to download PDF: {response.status_code}")
                pdf_content = response.content

            # pdf2image reads from a path, so persist the bytes first.
            pdf_path = os.path.join(temp_dir, "document.pdf")
            with open(pdf_path, 'wb') as f:
                f.write(pdf_content)

            # Convert PDF to images
            logger.info("Converting PDF to images")
            images = pdf2image.convert_from_path(pdf_path)

            # Extract text from each page with OCR
            logger.info(f"Extracting text with OCR from {len(images)} pages")
            page_texts = []
            for i, image in enumerate(images):
                logger.info(f"Processing page {i+1}/{len(images)}")
                page_texts.append(pytesseract.image_to_string(image))

            # Join once instead of growing a string page by page.
            return "".join(f"{page_text}\n\n" for page_text in page_texts)
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {str(e)}")
        raise
def extract_activities(text, department_id=None):
    """
    Use an LLM to break log text down into individual activities.

    Args:
        text (str): The log text to analyse.
        department_id: Accepted for interface compatibility; this function
            does not use it.

    Returns:
        The content of the model's reply, unparsed. The request uses JSON
        mode and the prompt asks for this shape:
        {
            "activities": [
                {
                    "activity": "Brief description of activity",
                    "text": "Full text describing the activity",
                    "time": "Time of activity (if available)",
                    "location": "Location of activity (if available)"
                },
                ...
            ]
        }
        Callers have to decode the JSON text themselves.

    Raises:
        ValueError: If the OPENAI_API_KEY environment variable is not set.
        Any error raised by the OpenAI client is logged and re-raised.
    """
    try:
        # Check if OpenAI API key is set
        api_key = os.environ.get('OPENAI_API_KEY')
        if not api_key:
            logger.error("OPENAI_API_KEY environment variable is not set")
            raise ValueError("OpenAI API key not configured")

        # Create OpenAI client
        client = openai.OpenAI(api_key=api_key)

        # JSON mode (response_format json_object) can only produce a JSON
        # object, never a bare array. Asking for an array left the wrapper
        # key up to the model, so the prompt names the key explicitly.
        prompt = f"""
I need to extract individual activities from a law enforcement officer's daily log.
Please analyze the following text and break it down into discrete activities or events.
For each activity, provide:
1. A brief description
2. The full text of that activity
3. Time (if mentioned)
4. Location (if mentioned)
Format the output as a JSON object with a single key "activities" whose value is
an array of objects, where each object has fields:
"activity", "text", "time", "location"
Here is the log text:
{text}
"""

        # Call OpenAI API
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an assistant that extracts structured data from police daily logs."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )

        # Hand back the reply text as-is; decoding is left to the caller.
        activities = response.choices[0].message.content
        return activities
    except Exception as e:
        logger.error(f"Error extracting activities with LLM: {str(e)}")
        raise
def _strip_enclosing_code_fence(text):
    """Return text without a code fence that wraps the whole of it, stripped of outer whitespace."""
    stripped = text.strip()
    if not (stripped.startswith("```") and stripped.endswith("```")):
        return stripped
    first_newline = stripped.find("\n")
    if first_newline == -1:
        # A single line has no separate opening fence line to remove.
        return stripped
    # Drop the opening fence line (including any language tag) and the
    # closing fence; fences inside the content are left untouched.
    return stripped[first_newline + 1:-3].strip()


def fill_markdown_form(markdown_template, extracted_data):
    """
    Fill a markdown template with extracted data by asking an LLM to do it.

    Args:
        markdown_template (str): The markdown template with placeholders
        extracted_data (dict): Dictionary of field:value pairs to insert

    Returns:
        str: Filled markdown content. If the model wrapped its whole reply
        in a code fence, that outer fence is removed; code fences that are
        part of the content itself are preserved.

    Raises:
        ValueError: If the OPENAI_API_KEY environment variable is not set,
            or if the model's reply carries no content.
        Any error raised by the OpenAI client is logged and re-raised.
    """
    try:
        # Check if OpenAI API key is set
        api_key = os.environ.get('OPENAI_API_KEY')
        if not api_key:
            logger.error("OPENAI_API_KEY environment variable is not set")
            raise ValueError("OpenAI API key not configured")

        # Create OpenAI client
        client = openai.OpenAI(api_key=api_key)

        # Prepare data as a string for the prompt
        data_text = "\n".join(f"{key}: {value}" for key, value in extracted_data.items())

        # Prepare the prompt for OpenAI
        prompt = f"""
I need to fill out a markdown form template with extracted data.
Here is the extracted data:
{data_text}
Here is the markdown template:
```markdown
{markdown_template}
```
Please fill in the template with the appropriate data, replacing the placeholders with the actual values.
You should:
1. Look for placeholders in the template (they might be in various formats like {{field}}, [field], etc.)
2. Replace them with the corresponding values from the extracted data
3. Format dates and other values appropriately
4. Return ONLY the filled markdown without any additional text or formatting
"""

        # Call OpenAI API
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a form-filling assistant that precisely fills in templates with data."},
                {"role": "user", "content": prompt}
            ]
        )

        content = response.choices[0].message.content
        if content is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError("OpenAI returned no content for the filled form")

        # Remove only a fence that wraps the entire reply. Deleting every
        # fence marker would also destroy code blocks that belong to the form.
        return _strip_enclosing_code_fence(content)
    except Exception as e:
        logger.error(f"Error filling markdown form: {str(e)}")
        raise
def save_filled_form(filled_markdown, filename, department_id, user_id):
    """
    Convert filled markdown to PDF and save to GridFS

    The markdown is rendered to HTML, wrapped in a small styled HTML
    document, converted to PDF with WeasyPrint and stored through the
    object returned by get_gridfs().

    Args:
        filled_markdown (str): The filled markdown content
        filename (str): The name to give the form; ".pdf" is appended
        department_id (ObjectId): The department ID (stored as a string)
        user_id (ObjectId): The user ID (stored as a string)

    Returns:
        str: URL to access the saved form, of the form
        "/api/logs/files/<file_id>"

    Raises:
        Any error raised while rendering or storing is logged and re-raised.
    """
    try:
        # Convert markdown to HTML. The stylesheet below styles table/th/td,
        # but Python-Markdown only turns pipe tables into <table> markup
        # when the "tables" extension is enabled.
        html = markdown.markdown(filled_markdown, extensions=["tables"])

        # Add some basic styling to the HTML
        styled_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
margin: 30px;
}}
h1, h2, h3, h4 {{
color: #333;
}}
table {{
border-collapse: collapse;
width: 100%;
}}
th, td {{
border: 1px solid #ddd;
padding: 8px;
}}
th {{
background-color: #f2f2f2;
}}
</style>
</head>
<body>
{html}
</body>
</html>
"""

        # Convert HTML to PDF using WeasyPrint
        pdf_bytes = io.BytesIO()
        weasyprint.HTML(string=styled_html).write_pdf(pdf_bytes)

        # Save to GridFS. getvalue() returns the whole buffer regardless of
        # the stream position, so no rewind is needed.
        fs = get_gridfs()
        file_id = fs.put(
            pdf_bytes.getvalue(),
            filename=f"{filename}.pdf",
            content_type='application/pdf',
            metadata={
                'user_id': str(user_id),
                'department_id': str(department_id),
                'form_type': 'filled_form',
                # NOTE(review): naive local time; confirm whether readers
                # of this metadata expect UTC.
                'upload_date': datetime.now()
            }
        )

        # Create and return the file URL
        form_url = f"/api/logs/files/{file_id}"
        return form_url
    except Exception as e:
        logger.error(f"Error saving filled form: {str(e)}")
        raise
def extract_required_data(activity_text, data_requirements):
    """
    Ask the LLM to pull the requested fields out of an activity text.

    Args:
        activity_text: Text of the activity to extract information from.
        data_requirements: Iterable of mappings; each one is read for its
            'field' and 'description' entries, which are listed in the
            prompt as numbered requirements.

    Returns:
        The model's JSON reply decoded with json.loads. An empty dict is
        returned instead when OPENAI_API_KEY is not set or when any step
        raises; the failure is logged rather than propagated.
    """
    try:
        openai_key = os.environ.get('OPENAI_API_KEY')
        if not openai_key:
            # Best-effort contract: a missing key yields no data, not an error.
            logger.error("OPENAI_API_KEY environment variable is not set")
            return {}

        llm = openai.OpenAI(api_key=openai_key)

        # Number the requirements starting at 1, one per line.
        numbered_requirements = []
        for position, requirement in enumerate(data_requirements, start=1):
            numbered_requirements.append(
                f"{position}. {requirement['field']}: {requirement['description']}"
            )
        requirements_text = "\n".join(numbered_requirements)

        prompt = f"""
I need to extract specific information from a law enforcement activity text.
I need to extract the following information:
{requirements_text}
Here is the activity text:
{activity_text}
Please extract the requested information and format as a JSON object with the field names as keys.
If any information is not available, use null as the value.
"""

        system_message = {
            "role": "system",
            "content": "You are a data extraction assistant that extracts specific information from text.",
        }
        user_message = {"role": "user", "content": prompt}

        completion = llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[system_message, user_message],
            response_format={"type": "json_object"},
        )

        # Decode the reply text into Python data.
        reply_text = completion.choices[0].message.content
        return json.loads(reply_text)
    except Exception as err:
        logger.error(f"Error extracting required data: {err}")
        return {}