| import os
|
| import base64
|
| import requests
|
| from dotenv import load_dotenv
|
| from typing import Optional, Dict
|
|
|
# Optional dependency: bind ``Mistral`` to whichever SDK client class can be
# imported, leaving it as None when neither import path works. ImageOCR checks
# ``Mistral is not None`` before trying to build an SDK client.
Mistral = None
try:
    from mistralai import Mistral
except Exception:
    try:
        # Alternative import path, aliased to the same name.
        from mistralai.client import MistralClient as Mistral
    except Exception:
        pass  # no SDK importable; Mistral stays None

# Load variables from a .env file into the environment before any os.getenv lookup.
load_dotenv()
|
|
|
class ImageOCR:
    """
    Extract text from news images using Mistral OCR API.

    Useful for analyzing screenshots of news shared on social media.

    The SDK client is used when it can be constructed and exposes
    ``ocr.process``; otherwise requests are sent directly to the REST
    endpoint. Extraction methods return ``None`` while OCR is disabled and
    otherwise a dict with the keys ``title``, ``text``, ``source``, ``date``
    and ``extraction_success`` (plus ``error`` on failure and ``raw_text`` on
    success). Fields that could not be determined hold ``"NOT_FOUND"``.
    """

    # Placeholder stored in every field that could not be determined.
    _NOT_FOUND = "NOT_FOUND"

    _OCR_ENDPOINT = "https://api.mistral.ai/v1/ocr"
    _HTTP_TIMEOUT_SECONDS = 60

    # A line containing any of these substrings is not accepted as the title.
    _TITLE_NOISE_WORDS = ('follow', 'share', 'comment', 'like', 'reply', 'retweet')

    # The first line containing any of these substrings is reported as the source.
    _SOURCE_KEYWORDS = (
        'reuters', 'bbc', 'cnn', 'fox', 'nbc', 'abc', 'times', 'post',
        'guardian', 'india today', 'ndtv', 'hindu', 'express', 'twitter',
        'x.com', 'facebook', 'instagram',
    )

    # Real month names only, so ordinary words such as "Mayor" or "Marching"
    # are not mistaken for the start of a date.
    _MONTH_RE = (
        r'(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?'
        r'|Aug(?:ust)?|Sep(?:t(?:ember)?)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)'
    )

    # Tried in order; the first pattern that matches anywhere in the text wins.
    # The year-first pattern comes first and every pattern is anchored on word
    # boundaries so "2024-01-15" is not cut down to "24-01-15".
    _DATE_PATTERNS = (
        r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
        r'\b\d{1,2}\s+' + _MONTH_RE + r'\.?\s+\d{4}\b',
        r'\b' + _MONTH_RE + r'\.?\s+\d{1,2},?\s+\d{4}\b',
    )

    def __init__(self):
        """Read ``MISTRAL_API_KEY`` and pick the SDK or the HTTP transport.

        OCR stays disabled when the key is missing, is the placeholder
        ``'your_api_key_here'``, or is 10 characters or shorter.
        """
        self.api_key = os.getenv('MISTRAL_API_KEY')
        self.enabled = False
        self.client = None
        self.use_http_fallback = False
        self.model = "mistral-ocr-latest"

        if not (self.api_key and self.api_key != 'your_api_key_here' and len(self.api_key) > 10):
            print("⚠ MISTRAL_API_KEY not configured, image OCR disabled")
            return

        if Mistral is not None:
            try:
                client = Mistral(api_key=self.api_key)
                # A client class without ``ocr.process`` would make every OCR
                # call fail later; treat it as unusable so the HTTP path is
                # taken instead.
                if not callable(getattr(getattr(client, "ocr", None), "process", None)):
                    raise AttributeError("installed Mistral SDK client has no ocr.process")
                self.client = client
                self.enabled = True
                print("✓ Image OCR (Mistral OCR SDK) initialized successfully")
                return
            except Exception as e:
                print(f"⚠ Failed to initialize Mistral OCR SDK: {e}")

        self.use_http_fallback = True
        self.enabled = True
        print("⚠ Mistral SDK unavailable, using direct HTTP OCR fallback")

    def _not_found_result(self, error: Optional[str] = None) -> Dict:
        """Build the failure result; ``error`` is included only when given."""
        result = {
            "title": self._NOT_FOUND,
            "text": self._NOT_FOUND,
            "source": self._NOT_FOUND,
            "date": self._NOT_FOUND,
        }
        if error is not None:
            result["error"] = error
        result["extraction_success"] = False
        return result

    def extract_text_from_image(self, image_data: bytes, mime_type: str = "image/jpeg") -> Optional[Dict]:
        """
        Extract news title and text from an image using Mistral OCR.

        Args:
            image_data: Raw image bytes
            mime_type: Image MIME type (image/jpeg, image/png, etc.)

        Returns:
            None when OCR is disabled; otherwise a dict with extracted title,
            text, and metadata. Empty input and any exception yield a result
            with ``extraction_success`` False and an ``error`` message.
        """
        if not self.enabled:
            return None

        # Nothing to recognise: skip the API round trip.
        if not image_data:
            return self._not_found_result("Empty image data")

        try:
            base64_image = base64.b64encode(image_data).decode('utf-8')
            return self._call_mistral_ocr(base64_image, mime_type)
        except Exception as e:
            print(f"Image OCR error: {e}")
            return self._not_found_result(str(e))

    def _call_mistral_ocr(self, base64_image: str, mime_type: str) -> Dict:
        """Call Mistral OCR API for text extraction.

        Sends the image as a ``data:`` URL through the SDK client when one is
        set and the HTTP fallback is off, otherwise through the REST call.
        Never raises: any exception is printed and turned into a failure
        result carrying the message under ``error``.
        """
        try:
            image_data_url = f"data:{mime_type};base64,{base64_image}"
            if self.client and not self.use_http_fallback:
                ocr_response = self.client.ocr.process(
                    model=self.model,
                    document={
                        "type": "image_url",
                        "image_url": image_data_url,
                    },
                )
            else:
                ocr_response = self._call_mistral_ocr_http(image_data_url)

            extracted_text = self._extract_text_from_ocr_response(ocr_response)
            if not extracted_text:
                return self._not_found_result()

            return self._parse_extracted_text(extracted_text)
        except Exception as e:
            print(f"Mistral OCR API error: {e}")
            return self._not_found_result(str(e))

    def _call_mistral_ocr_http(self, image_data_url: str) -> Dict:
        """Fallback to Mistral OCR REST API if SDK is unavailable.

        Returns:
            The decoded JSON body of the response.

        Raises:
            RuntimeError: if no API key is set.
            Exception: whatever ``requests`` raises for transport failures, or
                from ``raise_for_status()`` on an error status code.
        """
        if not self.api_key:
            raise RuntimeError("MISTRAL_API_KEY is missing")

        response = requests.post(
            self._OCR_ENDPOINT,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": self.model,
                "document": {
                    "type": "image_url",
                    "image_url": image_data_url,
                },
            },
            timeout=self._HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()

    def _extract_text_from_ocr_response(self, ocr_response) -> str:
        """Extract page text from both SDK objects and HTTP JSON responses.

        For each page the ``markdown`` value is preferred and ``text`` is used
        when markdown is missing or empty. Pages are joined with newlines and
        the result is stripped. A missing or null ``pages`` value yields "".
        """
        if isinstance(ocr_response, dict):
            pages = ocr_response.get("pages") or []
        else:
            pages = getattr(ocr_response, "pages", None) or []

        chunks = []
        for page in pages:
            if isinstance(page, dict):
                content = page.get("markdown") or page.get("text")
            else:
                content = getattr(page, "markdown", None) or getattr(page, "text", None)
            if content:
                chunks.append(content)

        return "\n".join(chunks).strip()

    def _parse_extracted_text(self, text: str) -> Dict:
        """Parse OCR extracted text to identify title, content, source, and date.

        Heuristics, applied to the non-blank, stripped lines of ``text``:
          * title  - first line longer than 15 characters that contains none
            of the noise words; the very first line if no line qualifies.
            Capped at 300 characters.
          * text   - the lines after the title joined by spaces (capped at
            2000 characters); the title itself when nothing follows it.
          * source - first line containing a known outlet/platform keyword,
            capped at 100 characters.
          * date   - first match of the date patterns, tried in order.

        Returns:
            The result dict, including ``raw_text`` (first 3000 characters of
            the input), or the failure result when there are no non-blank lines.
        """
        # Function-scope import: `re` is not among this module's top-level imports.
        import re

        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if not lines:
            return self._not_found_result()

        # Index 0 is the fallback when every line looks like UI chrome.
        title_index = next(
            (
                i for i, line in enumerate(lines)
                if len(line) > 15
                and not any(word in line.lower() for word in self._TITLE_NOISE_WORDS)
            ),
            0,
        )
        title = lines[title_index][:300]
        body_lines = lines[title_index + 1:]
        text_content = ' '.join(body_lines)[:2000] if body_lines else title

        source = next(
            (
                line[:100] for line in lines
                if any(keyword in line.lower() for keyword in self._SOURCE_KEYWORDS)
            ),
            self._NOT_FOUND,
        )

        date = self._NOT_FOUND
        for pattern in self._DATE_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                date = match.group()
                break

        return {
            "title": title,
            "text": text_content,
            "source": source,
            "date": date,
            "raw_text": text[:3000],
            "extraction_success": True,
        }

    @staticmethod
    def _split_data_url(base64_string: str, mime_type: str):
        """Split an optional ``data:<mime>;base64,`` header from a payload.

        Returns ``(payload, mime_type)``. When the header declares a MIME type
        it replaces the ``mime_type`` argument, because it describes the
        payload it is attached to; a string without a comma is returned as is.
        """
        if ',' not in base64_string:
            return base64_string, mime_type

        header, _, payload = base64_string.partition(',')
        if header.lower().startswith('data:'):
            declared = header[len('data:'):].split(';', 1)[0].strip()
            if '/' in declared:
                mime_type = declared
        return payload, mime_type

    def extract_from_base64(self, base64_string: str, mime_type: str = "image/jpeg") -> Optional[Dict]:
        """
        Extract text from a base64-encoded image.

        Args:
            base64_string: Base64 encoded image string, optionally prefixed
                with a ``data:<mime>;base64,`` header
            mime_type: Image MIME type, used when the string carries no
                header that declares one

        Returns:
            None when OCR is disabled; otherwise a dict with extracted text.
            An empty payload and any exception yield a result with
            ``extraction_success`` False and an ``error`` message.
        """
        if not self.enabled:
            return None

        try:
            payload, mime_type = self._split_data_url(base64_string, mime_type)
            if not payload:
                return self._not_found_result("Empty image data")
            return self._call_mistral_ocr(payload, mime_type)
        except Exception as e:
            print(f"Image OCR error: {e}")
            return self._not_found_result(str(e))
|
|
|
|
|
|
|
| image_ocr = ImageOCR()  # module-level instance built at import time; __init__ reads MISTRAL_API_KEY and prints the OCR status
|
|
|