| import httpx |
| import os |
| from typing import Dict, Any, Optional |
| from pathlib import Path |
|
|
class AssetAnalyzer:
    """Service to analyze uploaded assets using an external OCR API.

    Configuration is read from the environment at construction time:
      * ``OCR_API_URL`` - base URL of the OCR service (defaults to the
        public Hugging Face space endpoint).
      * ``OCR_API_KEY`` - optional key sent as the ``X-API-Key`` header.
    """

    # Extension categories used to decide how an asset is handled.
    # NOTE: every entry in the document/image sets must also appear in
    # _CONTENT_TYPES below so uploads carry an accurate MIME type.
    _DOCUMENT_EXTENSIONS = frozenset({'.pdf', '.doc', '.docx', '.txt', '.rtf'})
    _IMAGE_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'})
    _VIDEO_EXTENSIONS = frozenset({'.mp4', '.avi', '.mov', '.wmv', '.flv'})

    # MIME types for every supported upload extension. Previously .rtf,
    # .bmp, .webp and .svg were OCR-eligible per _get_file_type but fell
    # back to application/octet-stream here; the map now covers them.
    _CONTENT_TYPES = {
        '.pdf': 'application/pdf',
        '.doc': 'application/msword',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.txt': 'text/plain',
        '.rtf': 'application/rtf',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.webp': 'image/webp',
        '.svg': 'image/svg+xml',
    }

    def __init__(self):
        # Missing env vars fall back to the public demo endpoint and
        # unauthenticated requests (empty key -> no X-API-Key header).
        self.ocr_api_url = os.getenv("OCR_API_URL", "https://seth0330-ezofisocr.hf.space")
        self.ocr_api_key = os.getenv("OCR_API_KEY", "")

    async def analyze_document(self, file_path: str, key_fields: Optional[str] = None) -> Dict[str, Any]:
        """
        Analyze a document using the OCR API.

        Args:
            file_path: Path to the file to analyze
            key_fields: Optional comma-separated string of key fields to extract

        Returns:
            Dictionary with keys ``success``, ``extracted_content`` and
            either ``message`` (on success / skip) or ``error`` (on failure).
            Never raises: all exceptions are folded into the error dict.
        """
        try:
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                return {
                    "success": False,
                    "error": "File not found",
                    "extracted_content": None
                }

            # Videos and unknown types are skipped, not treated as errors.
            file_type = self._get_file_type(file_path)
            if file_type not in ("document", "image"):
                return {
                    "success": True,
                    "extracted_content": None,
                    "message": f"File type {file_type} not suitable for OCR analysis"
                }

            data: Dict[str, str] = {}
            if key_fields:
                data['key_fields'] = key_fields

            headers: Dict[str, str] = {}
            if self.ocr_api_key:
                headers["X-API-Key"] = self.ocr_api_key

            # The handle must remain open while httpx streams the upload,
            # so the HTTP request is made inside the `with open(...)` block.
            with open(file_path, 'rb') as f:
                files = {'file': (file_path_obj.name, f, self._get_content_type(file_path))}
                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.post(
                        f"{self.ocr_api_url}/api/extract",
                        headers=headers,
                        files=files,
                        data=data
                    )

            if response.status_code == 200:
                result = response.json()
                return {
                    "success": True,
                    "extracted_content": result,
                    "message": "Document analyzed successfully"
                }
            return {
                "success": False,
                "error": f"OCR API returned status {response.status_code}: {response.text}",
                "extracted_content": None
            }
        except Exception as e:
            # Deliberate catch-all: callers receive a uniform error dict
            # instead of an exception (network failures, bad JSON, IO errors).
            return {
                "success": False,
                "error": str(e),
                "extracted_content": None
            }

    async def analyze_image(self, file_path: str) -> Dict[str, Any]:
        """
        Analyze an image using GPT-4 Vision (for screenshots, infographics, etc.)
        This is a placeholder for future implementation.

        Args:
            file_path: Path to the image file

        Returns:
            Dictionary containing image analysis (currently a stub payload).
        """
        return {
            "success": True,
            "extracted_content": {
                "type": "image",
                "message": "Image analysis not yet implemented"
            },
            "message": "Image analysis placeholder"
        }

    def _get_file_type(self, file_path: str) -> str:
        """Classify a file by extension as document / image / video / unknown."""
        ext = Path(file_path).suffix.lower()
        if ext in self._DOCUMENT_EXTENSIONS:
            return "document"
        if ext in self._IMAGE_EXTENSIONS:
            return "image"
        if ext in self._VIDEO_EXTENSIONS:
            return "video"
        return "unknown"

    def _get_content_type(self, file_path: str) -> str:
        """Return the MIME type for a file, defaulting to octet-stream."""
        ext = Path(file_path).suffix.lower()
        return self._CONTENT_TYPES.get(ext, 'application/octet-stream')

    def extract_key_insights(self, extracted_content: Dict[str, Any]) -> str:
        """
        Extract key insights from OCR results to use as context for AI content generation.

        Args:
            extracted_content: The JSON response from OCR API

        Returns:
            Formatted string with key insights, one ``key: value`` line per
            field, with ``raw_text`` summarized last (truncated to 500 chars).
        """
        if not extracted_content:
            return ""

        insights = []

        if isinstance(extracted_content, dict):
            # Structured fields first; bookkeeping keys are skipped and
            # raw_text is handled separately below.
            for key, value in extracted_content.items():
                if value and key not in ('raw_text', 'confidence', 'metadata'):
                    if isinstance(value, (str, int, float)):
                        insights.append(f"{key}: {value}")
                    elif isinstance(value, list) and len(value) > 0:
                        # Cap list previews at 5 items to keep context short.
                        insights.append(f"{key}: {', '.join(map(str, value[:5]))}")

            if 'raw_text' in extracted_content:
                raw_text = extracted_content['raw_text']
                if isinstance(raw_text, str) and len(raw_text) > 0:
                    if len(raw_text) > 500:
                        insights.append(f"Document content: {raw_text[:500]}...")
                    else:
                        insights.append(f"Document content: {raw_text}")

        return "\n".join(insights) if insights else "No specific insights extracted"
|
|
|
|