Spaces:
Running
Running
| # type: ignore | |
| """ | |
| AKIRA V21 ULTIMATE - Módulo de Visão Computacional e OCR | |
| ========================================================== | |
| Visão computacional avançada com OCR multilíngue (PT/EN), | |
| detecção de formas e objetos, e aprendizado contínuo de imagens. | |
| Features: | |
| - OCR com Tesseract (Português + Inglês) | |
| - Detecção de formas e contornos | |
| - Detecção de objetos básicos (Haar Cascades) | |
| - Aprendizado de características por imagem | |
| - Armazenamento no SQLite (tabela imagefeatures) | |
| - Suporte a imagens em base64 | |
| Author: AKIRA V21 ULTIMATE | |
| ⚠️ IMPORTANTE: Imports são lazy para permitir que o módulo | |
| seja desabilitado se OpenCV/Tesseract não estiverem disponíveis. | |
| """ | |
| import hashlib | |
| import json | |
| import time | |
| import os | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from datetime import datetime | |
| from dataclasses import dataclass | |
| from loguru import logger | |
| import base64 | |
| import re | |
# Lazy import slots - the heavy CV dependencies are only loaded on demand
# (inside _check_dependencies) so this module can still be imported, in a
# degraded mode, when OpenCV/Tesseract/NumPy/Pillow are missing.
_cv2 = None          # cv2 module (OpenCV) once loaded
_np = None           # numpy module once loaded
_pytesseract = None  # pytesseract module once loaded
_PIL_Image = None    # PIL.Image module once loaded
| def _check_dependencies() -> Tuple[bool, str]: | |
| """Verifica se as dependências estão disponíveis""" | |
| global _cv2, _np, _pytesseract, _PIL_Image | |
| errors = [] | |
| try: | |
| import cv2 as _cv2_temp | |
| _cv2 = _cv2_temp | |
| except (ImportError, Exception) as e: | |
| errors.append(f"OpenCV: {str(e)[:50]}") | |
| try: | |
| import numpy as _np_temp | |
| _np = _np_temp | |
| except (ImportError, Exception) as e: | |
| errors.append(f"NumPy: {str(e)[:50]}") | |
| try: | |
| import pytesseract as _pytesseract_temp | |
| _pytesseract = _pytesseract_temp | |
| except (ImportError, Exception) as e: | |
| errors.append(f"pytesseract: {str(e)[:50]}") | |
| try: | |
| from PIL import Image as _PIL_Image_temp | |
| _PIL_Image = _PIL_Image_temp | |
| except (ImportError, Exception) as e: | |
| errors.append(f"Pillow: {str(e)[:50]}") | |
| if errors: | |
| return False, "; ".join(errors) | |
| return True, "OK" | |
# Module-level availability probe: runs once at import time so every consumer
# can cheaply check _DEPENDENCIES_OK instead of re-importing the CV stack.
_DEPENDENCIES_OK, _DEPENDENCY_ERROR = _check_dependencies()
if _DEPENDENCIES_OK:
    logger.success("✅ Dependências de Visão Computacional carregadas")
else:
    # Module keeps working in a limited mode; methods guard on this flag.
    logger.warning(f"⚠️ Visão Computacional limitada: {_DEPENDENCY_ERROR}")
| # ============================================================ | |
| # CONFIGURAÇÕES | |
| # ============================================================ | |
class VisionConfig:
    """Tunable settings for the computer-vision module.

    Plain class attributes: use the defaults via ``VisionConfig()`` or
    override individual values on an instance.
    """

    # --- OCR (Tesseract) ---
    ocr_primary_lang: str = "por"    # Portuguese
    ocr_secondary_lang: str = "eng"  # English
    ocr_psm: int = 6                 # page segmentation mode (6 = uniform block)
    ocr_oem: int = 3                 # OCR engine mode (3 = default)

    # --- OpenCV processing ---
    blur_kernel: int = 5
    canny_threshold1: float = 50
    canny_threshold2: float = 150
    contour_min_area: int = 100
    contour_max_area: int = 100000

    # --- Learning / similarity ---
    feature_dim: int = 128              # length of the extracted feature vector
    similarity_threshold: float = 0.85  # cosine score treated as "same image"
    max_stored_features: int = 1000     # per-user cap of stored features

    # --- Preprocessing ---
    max_image_size: int = 1920          # larger images are downscaled
    resize_factor: float = 0.5
    preserve_aspect_ratio: bool = True
class ImageFeature:
    """Learned feature record for a single analyzed image.

    Bundles the perceptual hash, feature vector, OCR text, detected shapes
    and objects for one image, plus bookkeeping (owner, confidence, size,
    timestamp, occurrence count) so it can be persisted to SQLite.
    """

    def __init__(
        self,
        image_hash: str,
        features: list,
        text_detected: str,
        shapes: List[Dict[str, Any]],
        objects: List[str],
        user_id: str,
        confidence: float = 0.0,
        width: int = 0,
        height: int = 0
    ):
        self.image_hash = image_hash
        self.features = features
        self.text_detected = text_detected
        self.shapes = shapes
        self.objects = objects
        self.user_id = user_id
        self.confidence = confidence
        self.width = width
        self.height = height
        self.timestamp = time.time()  # creation time; overwritten by from_dict
        self.occurrences = 1          # how many times this image was seen

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dictionary (inverse of from_dict)."""
        return {
            "image_hash": self.image_hash,
            "features": self.features,
            "text_detected": self.text_detected,
            "shapes": self.shapes,
            "objects": self.objects,
            "user_id": self.user_id,
            "confidence": self.confidence,
            "width": self.width,
            "height": self.height,
            "timestamp": self.timestamp,
            "occurrences": self.occurrences
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ImageFeature':
        """Rebuild an instance from a dict produced by :meth:`to_dict`.

        Fix: this method takes ``cls`` but was declared without
        ``@classmethod``, so ``ImageFeature.from_dict(data)`` passed the
        data dict as ``cls`` and raised a TypeError.
        """
        feature = cls(
            image_hash=data["image_hash"],
            features=data["features"],
            text_detected=data["text_detected"],
            shapes=data.get("shapes", []),
            objects=data.get("objects", []),
            user_id=data["user_id"],
            confidence=data.get("confidence", 0.0),
            width=data.get("width", 0),
            height=data.get("height", 0)
        )
        feature.timestamp = data.get("timestamp", time.time())
        feature.occurrences = data.get("occurrences", 1)
        return feature
class ComputerVision:
    """Advanced computer-vision engine for AKIRA.

    Capabilities:
    - Multilingual OCR (PT/EN)
    - Geometric shape detection
    - Color/pattern analysis
    - Feature learning and persistence
    - Image-to-image similarity scoring
    """

    def __init__(self, config: Optional[VisionConfig] = None):
        """Set up configuration, caches, and probe the OCR backend."""
        self.config = config if config is not None else VisionConfig()
        self.db_path = "akira.db"
        # Per-process caches: analyzed features by hash, similarity scores.
        self._cache: Dict[str, dict] = {}
        self._similarity_cache: Dict[str, float] = {}
        # Probe Tesseract once up front.
        self._tesseract_available = self._check_tesseract()
        # Haar cascades are loaded lazily (see _init_cascades).
        self._face_cascade = None
        self._eye_cascade = None
        self._plate_cascade = None
        if _DEPENDENCIES_OK:
            logger.info("ComputerVision inicializado")
        else:
            logger.warning("ComputerVision operando em modo limitado")
| def _check_tesseract(self) -> bool: | |
| """Verifica se Tesseract está disponível""" | |
| if not _DEPENDENCIES_OK or _pytesseract is None: | |
| return False | |
| try: | |
| _pytesseract.get_tesseract_version() | |
| logger.success("Tesseract OCR disponível") | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Tesseract não disponível: {e}") | |
| return False | |
| def _init_cascades(self): | |
| """Inicializa classificadores em cascata""" | |
| if not _DEPENDENCIES_OK or _cv2 is None: | |
| return | |
| try: | |
| # Paths possíveis para classifiers | |
| cv2_data_path = _cv2.data.haarcascades | |
| # Face detection | |
| face_path = os.path.join(cv2_data_path, "haarcascade_frontalface_default.xml") | |
| if os.path.exists(face_path): | |
| self._face_cascade = _cv2.CascadeClassifier(face_path) | |
| logger.info("Face cascade carregada") | |
| # Eye detection | |
| eye_path = os.path.join(cv2_data_path, "haarcascade_eye.xml") | |
| if os.path.exists(eye_path): | |
| self._eye_cascade = _cv2.CascadeClassifier(eye_path) | |
| logger.info("Eye cascade carregada") | |
| # License plate (modelo simplificado) | |
| plate_path = os.path.join(cv2_data_path, "haarcascade_russian_plate_number.xml") | |
| if os.path.exists(plate_path): | |
| self._plate_cascade = _cv2.CascadeClassifier(plate_path) | |
| logger.info("Plate cascade carregada") | |
| except Exception as e: | |
| logger.warning(f"Erro ao carregar cascades: {e}") | |
| def decode_base64_image(self, base64_string: str) -> Optional[Any]: | |
| """Decodifica imagem em base64 para array numpy""" | |
| if not _DEPENDENCIES_OK: | |
| logger.error("Dependências de CV não disponíveis") | |
| return None | |
| if not base64_string or len(base64_string) < 100: | |
| logger.error(f"Base64 inválido: comprimento {len(base64_string) if base64_string else 0}") | |
| return None | |
| try: | |
| # Remove prefix se existir (data:image/png;base64,) | |
| if "base64," in base64_string: | |
| base64_string = base64_string.split("base64,")[1] | |
| # Remove espaços e quebras de linha | |
| base64_string = base64_string.strip().replace('\n', '').replace('\r', '') | |
| # Decodifica | |
| logger.debug(f"Decodificando base64 ({len(base64_string)} chars)") | |
| image_data = base64.b64decode(base64_string) | |
| logger.debug(f"Bytes decodificados: {len(image_data)}") | |
| # Converte para numpy array | |
| image_array = _np.frombuffer(image_data, dtype=_np.uint8) | |
| logger.debug(f"Array numpy criado: shape {image_array.shape}") | |
| # Decodifica com OpenCV | |
| image = _cv2.imdecode(image_array, _cv2.IMREAD_COLOR) | |
| if image is None: | |
| logger.error("OpenCV retornou None - formato não suportado ou dados corrompidos") | |
| return None | |
| logger.success(f"✅ Imagem decodificada: {image.shape[1]}x{image.shape[0]} px") | |
| return image | |
| except Exception as e: | |
| logger.error(f"Erro ao decodificar imagem: {e}") | |
| import traceback | |
| logger.error(f"Stack trace: {traceback.format_exc()}") | |
| return None | |
| def preprocess_image(self, image: Any) -> Any: | |
| """Pré-processa imagem para análise""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return None | |
| # Redimensiona se muito grande | |
| height, width = image.shape[:2] | |
| if max(height, width) > self.config.max_image_size: | |
| scale = self.config.max_image_size / max(height, width) | |
| image = _cv2.resize( | |
| image, | |
| None, | |
| fx=scale, | |
| fy=scale, | |
| interpolation=_cv2.INTER_AREA | |
| ) | |
| # Converte para RGB se necessário | |
| if len(image.shape) == 2: | |
| image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR) | |
| elif image.shape[2] == 4: | |
| image = _cv2.cvtColor(image, _cv2.COLOR_BGRA2BGR) | |
| elif image.shape[2] == 3: | |
| pass # Já BGR | |
| else: | |
| image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR) | |
| return image | |
| def extract_features(self, image: Any) -> List[float]: | |
| """Extrai vetor de características da imagem""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return [0.0] * self.config.feature_dim | |
| features = [] | |
| # 1. Histograma de cores (HSV) | |
| hsv = _cv2.cvtColor(image, _cv2.COLOR_BGR2HSV) | |
| hist_h = _cv2.calcHist([hsv], [0], None, [32], [0, 256]).flatten() | |
| hist_s = _cv2.calcHist([hsv], [1], None, [32], [0, 256]).flatten() | |
| hist_v = _cv2.calcHist([hsv], [2], None, [32], [0, 256]).flatten() | |
| # Normaliza histogramas | |
| hist_h = hist_h / (hist_h.sum() + 1e-6) | |
| hist_s = hist_s / (hist_s.sum() + 1e-6) | |
| hist_v = hist_v / (hist_v.sum() + 1e-6) | |
| features.extend(hist_h.tolist()) | |
| features.extend(hist_s.tolist()) | |
| features.extend(hist_v.tolist()) | |
| # 2. Textura (LBP simplificado) | |
| gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY) | |
| # Estatísticas de textura | |
| mean, std = _cv2.meanStdDev(gray) | |
| features.append(float(mean[0]) / 255.0) | |
| features.append(float(std[0]) / 255.0) | |
| # 3. Dimensões normalizadas | |
| h, w = image.shape[:2] | |
| features.append(h / max(h, w)) | |
| features.append(w / max(h, w)) | |
| features.append(h * w / (1920 * 1080)) # Area ratio | |
| # 4. Bordas (Canny) | |
| edges = _cv2.Canny(gray, self.config.canny_threshold1, self.config.canny_threshold2) | |
| edge_density = float(_np.sum(edges > 0) / (edges.size + 1e-6)) | |
| features.append(edge_density) | |
| # 5. Saturation e Brightness médios | |
| avg_saturation = float(_np.mean(hsv[:, :, 1]) / 255.0) | |
| avg_brightness = float(_np.mean(hsv[:, :, 2]) / 255.0) | |
| features.append(avg_saturation) | |
| features.append(avg_brightness) | |
| # Garante dimensão fixa | |
| current_len = len(features) | |
| target_len = int(self.config.feature_dim) | |
| if current_len < target_len: | |
| features.extend([0.0] * (target_len - current_len)) | |
| return [float(x) for x in features[:target_len]] | |
| def detect_shapes(self, image: Any) -> List[Dict[str, Any]]: | |
| """Detecta formas geométricas na imagem""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return [] | |
| shapes = [] | |
| gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY) | |
| blur = _cv2.GaussianBlur(gray, (self.config.blur_kernel, self.config.blur_kernel), 0) | |
| # Detecção de bordas | |
| edges = _cv2.Canny(blur, self.config.canny_threshold1, self.config.canny_threshold2) | |
| # Contornos | |
| contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE) | |
| for contour in contours: | |
| area = _cv2.contourArea(contour) | |
| # Filtra por área | |
| if area < self.config.contour_min_area or area > self.config.contour_max_area: | |
| continue | |
| # Aproxima forma | |
| peri = _cv2.arcLength(contour, True) | |
| approx = _cv2.approxPolyDP(contour, 0.02 * peri, True) | |
| # Classifica forma | |
| x, y, w, h = _cv2.boundingRect(approx) | |
| aspect_ratio = w / (h + 1e-6) | |
| num_vertices = len(approx) | |
| shape_type = "desconhecido" | |
| if num_vertices == 3: | |
| shape_type = "triangulo" | |
| elif num_vertices == 4: | |
| if 0.9 <= aspect_ratio <= 1.1: | |
| shape_type = "quadrado" | |
| else: | |
| shape_type = "retangulo" | |
| elif num_vertices == 5: | |
| shape_type = "pentagono" | |
| elif num_vertices == 6: | |
| shape_type = "hexagono" | |
| elif num_vertices > 6: | |
| # Verifica se é círculo | |
| (cx, cy), radius = _cv2.minEnclosingCircle(contour) | |
| circle_area = 3.14159 * radius * radius | |
| if 0.7 <= area / circle_area <= 1.3: | |
| shape_type = "circulo" | |
| else: | |
| shape_type = "poligono_irregular" | |
| if shape_type != "desconhecido": | |
| shapes.append({ | |
| "tipo": shape_type, | |
| "vertices": num_vertices, | |
| "area": float(area), | |
| "centro": {"x": int(x + w/2), "y": int(y + h/2)}, | |
| "dimensoes": {"largura": w, "altura": h}, | |
| "aspect_ratio": round(aspect_ratio, 3) | |
| }) | |
| return shapes | |
| def detect_objects_cascade(self, image: Any) -> List[Dict[str, Any]]: | |
| """Detecta objetos usando classificadores em cascata""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return [] | |
| objects = [] | |
| if self._face_cascade is None: | |
| self._init_cascades() | |
| gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY) | |
| # Detecta faces | |
| if self._face_cascade is not None: | |
| faces = self._face_cascade.detectMultiScale( | |
| gray, | |
| scaleFactor=1.1, | |
| minNeighbors=5, | |
| minSize=(30, 30) | |
| ) | |
| for i, (x, y, w, h) in enumerate(faces): | |
| objects.append({ | |
| "tipo": "face", | |
| "id": i, | |
| "posicao": {"x": int(x), "y": int(y)}, | |
| "dimensoes": {"largura": w, "altura": h}, | |
| "confianca": 0.85 | |
| }) | |
| # Detecta olhos | |
| if self._eye_cascade is not None: | |
| eyes = self._eye_cascade.detectMultiScale( | |
| gray, | |
| scaleFactor=1.1, | |
| minNeighbors=5, | |
| minSize=(15, 15) | |
| ) | |
| for i, (x, y, w, h) in enumerate(eyes): | |
| objects.append({ | |
| "tipo": "olho", | |
| "id": i, | |
| "posicao": {"x": int(x), "y": int(y)}, | |
| "dimensoes": {"largura": w, "altura": h}, | |
| "confianca": 0.80 | |
| }) | |
| # Detecta placas (simplificado) | |
| if self._plate_cascade is not None: | |
| plates = self._plate_cascade.detectMultiScale( | |
| gray, | |
| scaleFactor=1.1, | |
| minNeighbors=5, | |
| minSize=(60, 20) | |
| ) | |
| for i, (x, y, w, h) in enumerate(plates): | |
| objects.append({ | |
| "tipo": "placa", | |
| "id": i, | |
| "posicao": {"x": int(x), "y": int(y)}, | |
| "dimensoes": {"largura": w, "altura": h}, | |
| "confianca": 0.75 | |
| }) | |
| return objects | |
| def perform_ocr( | |
| self, | |
| image: Any, | |
| languages: Optional[List[str]] = None | |
| ) -> Dict[str, Any]: | |
| """Realiza OCR na imagem""" | |
| if not _DEPENDENCIES_OK or not self._tesseract_available or image is None: | |
| return { | |
| "success": False, | |
| "text": "", | |
| "confidence": 0.0, | |
| "languages": [], | |
| "error": "Tesseract não disponível" | |
| } | |
| if languages is None: | |
| languages = [self.config.ocr_primary_lang, self.config.ocr_secondary_lang] | |
| # Converte para grayscale | |
| gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY) | |
| # Aplica threshold adaptativo para melhorar OCR | |
| thresh = _cv2.adaptiveThreshold( | |
| gray, 255, | |
| _cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| _cv2.THRESH_BINARY_INV, | |
| 11, 2 | |
| ) | |
| # OCR config | |
| ocr_config = f"--psm {self.config.ocr_psm} --oem {self.config.ocr_oem}" | |
| # Executa OCR com idiomas combinados | |
| lang_code = "+".join(languages) | |
| try: | |
| text = _pytesseract.image_to_string(thresh, lang=lang_code, config=ocr_config) | |
| # Limpa texto | |
| text = self._clean_ocr_text(text) | |
| # Calcula confiança | |
| data = _pytesseract.image_to_data(thresh, lang=lang_code, config=ocr_config, output_type=dict) | |
| confs = [int(d) for d in data.get("conf", []) if int(d) > 0] | |
| avg_confidence = float(_np.mean(confs)) if confs else 0.0 | |
| return { | |
| "success": True, | |
| "text": text.strip(), | |
| "confidence": avg_confidence / 100.0, | |
| "languages": languages, | |
| "char_count": len(text), | |
| "word_count": len(text.split()) if text else 0 | |
| } | |
| except Exception as e: | |
| logger.error(f"OCR error: {e}") | |
| return { | |
| "success": False, | |
| "text": "", | |
| "confidence": 0.0, | |
| "languages": languages, | |
| "error": str(e) | |
| } | |
| def _clean_ocr_text(self, text: str) -> str: | |
| """Limpa texto do OCR""" | |
| # Remove caracteres especiais e ruído | |
| text = re.sub(r'[^\w\sáàâãéèêíïóôõúüçÁÀÂÃÉÈÊÍÏÓÔÕÚÜÇ\n\r\t]', '', text) | |
| # Normaliza espaços | |
| text = re.sub(r'\s+', ' ', text) | |
| # Remove linhas muito curtas ou vazias | |
| lines = [l.strip() for l in text.split('\n') if len(l.strip()) > 1] | |
| return '\n'.join(lines) | |
| def detect_text_regions(self, image: Any) -> List[Dict[str, Any]]: | |
| """Detecta regiões prováveis de texto""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return [] | |
| regions = [] | |
| gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY) | |
| blur = _cv2.GaussianBlur(gray, (5, 5), 0) | |
| # Detecta bordas | |
| edges = _cv2.Canny(blur, 50, 150) | |
| # Encontra contornos | |
| contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE) | |
| for contour in contours: | |
| x, y, w, h = _cv2.boundingRect(contour) | |
| # Filtra regiões muito pequenas ou muito grandes | |
| if w < 20 or h < 10 or w > image.shape[1] or h > image.shape[0]: | |
| continue | |
| # Calcula aspecto ratio típico de texto | |
| aspect_ratio = w / (h + 1e-6) | |
| regions.append({ | |
| "x": int(x), | |
| "y": int(y), | |
| "width": int(w), | |
| "height": int(h), | |
| "aspect_ratio": round(aspect_ratio, 3), | |
| "area": float(w * h) | |
| }) | |
| # Ordena por posição (esquerda para direita, cima para baixo) | |
| regions.sort(key=lambda r: (r["y"] // 50, r["x"])) | |
| return regions | |
| def analyze_image( | |
| self, | |
| image: Any, | |
| user_id: str = "anonymous", | |
| include_ocr: bool = True, | |
| include_shapes: bool = True, | |
| include_objects: bool = True | |
| ) -> Dict[str, Any]: | |
| """Análise completa da imagem""" | |
| start_time = time.time() | |
| if not _DEPENDENCIES_OK or image is None: | |
| return { | |
| "success": False, | |
| "error": "Dependências não disponíveis", | |
| "dependencies_ok": False | |
| } | |
| # Pré-processa | |
| processed = self.preprocess_image(image) | |
| if processed is None: | |
| return {"success": False, "error": "Falha no pré-processamento"} | |
| h, w = processed.shape[:2] | |
| result: Dict[str, Any] = { | |
| "success": True, | |
| "timestamp": datetime.now().isoformat(), | |
| "dimensions": {"width": w, "height": h}, | |
| "user_id": user_id, | |
| "processing_time": 0, | |
| "dependencies_ok": True | |
| } | |
| shapes = [] | |
| text_detected = "" | |
| # OCR | |
| if include_ocr: | |
| ocr_result = self.perform_ocr(processed) | |
| result["ocr"] = ocr_result | |
| text_detected = ocr_result.get("text", "") | |
| result["text_detected"] = text_detected | |
| # Formas geométricas | |
| if include_shapes: | |
| shapes = self.detect_shapes(processed) | |
| result["shapes"] = shapes | |
| result["shape_count"] = len(shapes) | |
| # Objetos (face, olhos, etc.) | |
| if include_objects: | |
| objects = self.detect_objects_cascade(processed) | |
| result["objects"] = objects | |
| result["object_count"] = len(objects) | |
| # Extrai características | |
| features = self.extract_features(processed) | |
| result["features"] = { | |
| "dimension": len(features), | |
| "mean": float(sum(features) / len(features)) if features else 0.0, | |
| "std": 0.0 | |
| } | |
| # Calcula hash da imagem | |
| image_hash = self._compute_image_hash(image) | |
| result["image_hash"] = image_hash | |
| # Cria registro de aprendizado | |
| feature_record = ImageFeature( | |
| image_hash=image_hash, | |
| features=features, | |
| text_detected=text_detected, | |
| shapes=shapes if include_shapes else [], | |
| objects=[o["tipo"] for o in result.get("objects", [])], | |
| user_id=user_id, | |
| confidence=result.get("ocr", {}).get("confidence", 0.5), | |
| width=w, | |
| height=h | |
| ) | |
| # Salva características | |
| self._save_feature(feature_record) | |
| # Verifica se já conhece esta imagem | |
| similarity = self._find_similar(features, user_id) | |
| result["is_known"] = similarity > self.config.similarity_threshold | |
| result["similarity_score"] = similarity | |
| result["processing_time"] = round(time.time() - start_time, 3) | |
| return result | |
| def analyze_base64( | |
| self, | |
| base64_string: str, | |
| user_id: str = "anonymous" | |
| ) -> Dict[str, Any]: | |
| """Analisa imagem a partir de string base64 ou bytes diretos""" | |
| try: | |
| logger.info(f"--- [VISION DEBUG] Recebida requisição de análise para user: {user_id} ---") | |
| logger.debug(f"[VISION DEBUG] Base64 length: {len(base64_string) if base64_string else 0}") | |
| # Tenta decodificar como base64 | |
| image = self.decode_base64_image(base64_string) | |
| if image is None: | |
| logger.error("[VISION DEBUG] Falha ao decodificar imagem (image is None)") | |
| return { | |
| "success": False, | |
| "error": "Falha ao decodificar imagem (formato inválido ou corrompido)" | |
| } | |
| logger.info(f"[VISION DEBUG] Imagem decodificada com sucesso. Shape: {image.shape}") | |
| result = self.analyze_image(image, user_id) | |
| logger.info(f"[VISION DEBUG] Análise concluída. Sucesso: {result.get('success')}") | |
| return result | |
| except Exception as e: | |
| logger.exception(f"[VISION DEBUG] Erro crítico na análise de visão: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Erro interno na análise de visão: {str(e)}" | |
| } | |
| def _compute_image_hash(self, image: Any) -> str: | |
| """Computa hash perceptual da imagem""" | |
| if not _DEPENDENCIES_OK or image is None: | |
| return "unknown" | |
| # Reduz para tamanho pequeno para hash | |
| resized = _cv2.resize(image, (32, 32), interpolation=_cv2.INTER_AREA) | |
| gray = _cv2.cvtColor(resized, _cv2.COLOR_BGR2GRAY) | |
| # Hash simples baseado em diferenças | |
| hash_str = "" | |
| mean_val = float(_np.mean(gray)) | |
| for i in range(32): | |
| for j in range(32): | |
| if gray[i, j] > mean_val: | |
| hash_str += "1" | |
| else: | |
| hash_str += "0" | |
| return hashlib.md5(hash_str.encode()).hexdigest() | |
| def _save_feature(self, feature: ImageFeature): | |
| """Salva características no cache e banco""" | |
| self._cache[feature.image_hash] = feature.to_dict() | |
| # Salva no banco SQLite | |
| try: | |
| import sqlite3 | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| # Cria tabela se não existir | |
| c.execute(""" | |
| CREATE TABLE IF NOT EXISTS imagefeatures ( | |
| image_hash TEXT PRIMARY KEY, | |
| features BLOB, | |
| text_detected TEXT, | |
| shapes TEXT, | |
| objects TEXT, | |
| user_id TEXT, | |
| confidence REAL, | |
| width INTEGER, | |
| height INTEGER, | |
| timestamp REAL, | |
| occurrences INTEGER DEFAULT 1 | |
| ) | |
| """) | |
| # Verifica se já existe | |
| c.execute("SELECT occurrences FROM imagefeatures WHERE image_hash = ?", | |
| (feature.image_hash,)) | |
| existing = c.fetchone() | |
| if existing: | |
| # Atualiza ocorrências | |
| c.execute(""" | |
| UPDATE imagefeatures SET occurrences = occurrences + 1, | |
| timestamp = ? WHERE image_hash = ? | |
| """, (time.time(), feature.image_hash)) | |
| else: | |
| # Insere novo | |
| features_json = json.dumps(feature.to_dict()) | |
| c.execute(""" | |
| INSERT INTO imagefeatures VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| feature.image_hash, | |
| features_json, | |
| feature.text_detected, | |
| json.dumps(feature.shapes), | |
| json.dumps(feature.objects), | |
| feature.user_id, | |
| feature.confidence, | |
| feature.width, | |
| feature.height, | |
| feature.timestamp, | |
| feature.occurrences | |
| )) | |
| conn.commit() | |
| conn.close() | |
| except Exception as e: | |
| logger.warning(f"Erro ao salvar no banco: {e}") | |
| def _find_similar( | |
| self, | |
| features: List[float], | |
| user_id: str, | |
| limit: int = 5 | |
| ) -> float: | |
| """Encontra imagem similar e retorna score de similaridade""" | |
| max_similarity = 0.0 | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| c.execute("SELECT features FROM imagefeatures WHERE user_id = ?", (user_id,)) | |
| rows = c.fetchall() | |
| conn.close() | |
| for row in rows: | |
| try: | |
| stored_dict = json.loads(row[0]) | |
| stored_features = stored_dict["features"] | |
| # Calcula similaridade (cosine similarity) | |
| dot = sum(f1 * f2 for f1, f2 in zip(features, stored_features)) | |
| norm1 = sum(f * f for f in features) ** 0.5 | |
| norm2 = sum(f * f for f in stored_features) ** 0.5 | |
| similarity = dot / (norm1 * norm2 + 1e-6) | |
| if similarity > max_similarity: | |
| max_similarity = similarity | |
| except Exception: | |
| continue | |
| except Exception as e: | |
| logger.warning(f"Erro ao buscar similar: {e}") | |
| return float(max_similarity) | |
| def get_learned_images(self, user_id: str) -> List[Dict[str, Any]]: | |
| """Obtém lista de imagens aprendidas pelo usuário""" | |
| images = [] | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| c.execute(""" | |
| SELECT image_hash, text_detected, objects, confidence, timestamp, occurrences | |
| FROM imagefeatures | |
| WHERE user_id = ? | |
| ORDER BY timestamp DESC | |
| LIMIT 50 | |
| """, (user_id,)) | |
| rows = c.fetchall() | |
| conn.close() | |
| for row in rows: | |
| try: | |
| objects_list = json.loads(row[2]) if row[2] else [] | |
| except: | |
| objects_list = [] | |
| images.append({ | |
| "hash": row[0], | |
| "text": row[1], | |
| "objects": objects_list, | |
| "confidence": row[3], | |
| "timestamp": row[4], | |
| "occurrences": row[5] | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Erro ao obter imagens: {e}") | |
| return images | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Obtém estatísticas do módulo""" | |
| total_images = len(self._cache) | |
| # Conta do banco | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| c.execute("SELECT COUNT(*) FROM imagefeatures") | |
| db_count = c.fetchone()[0] | |
| conn.close() | |
| except: | |
| db_count = 0 | |
| return { | |
| "cached_images": total_images, | |
| "database_images": db_count, | |
| "dependencies_ok": _DEPENDENCIES_OK, | |
| "tesseract_available": self._tesseract_available, | |
| "config": { | |
| "ocr_langs": [self.config.ocr_primary_lang, self.config.ocr_secondary_lang], | |
| "feature_dim": self.config.feature_dim, | |
| "similarity_threshold": self.config.similarity_threshold | |
| } | |
| } | |
| # ============================================================ | |
| # FUNÇÕES DE CONVENIÊNCIA | |
| # ============================================================ | |
# Process-wide singleton so the convenience helpers share one engine.
_vision_instance: Optional[ComputerVision] = None


def get_computer_vision(config: Optional[VisionConfig] = None) -> ComputerVision:
    """Return the lazily created singleton ComputerVision instance.

    The `config` argument only takes effect on the very first call.
    """
    global _vision_instance
    if _vision_instance is None:
        _vision_instance = ComputerVision(config)
    return _vision_instance
def analyze_image_from_base64(
    base64_string: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: analyze a base64-encoded image via the singleton."""
    return get_computer_vision().analyze_base64(base64_string, user_id)
def analyze_image_file(
    file_path: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: load an image file with OpenCV and analyze it."""
    if not _DEPENDENCIES_OK or _cv2 is None:
        return {"success": False, "error": "OpenCV não disponível"}
    vision = get_computer_vision()
    loaded = _cv2.imread(file_path)
    if loaded is None:
        return {"success": False, "error": "Falha ao carregar imagem"}
    return vision.analyze_image(loaded, user_id)
def analyze_image_from_any_source(
    source: Any,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Analyze an image coming from any supported source type.

    Supported sources: base64 string, file path, raw bytes/bytearray,
    PIL Image, numpy (OpenCV) array, or any readable stream.

    Args:
        source: The image in one of the supported representations.
        user_id: Owner id used for learning/similarity lookups.

    Returns:
        The analysis result dict (``success`` False with an ``error``
        message when the source cannot be handled).
    """
    vision = get_computer_vision()
    # 1) Strings: existing file path, or base64 payload.
    if isinstance(source, str):
        if os.path.exists(source):
            return analyze_image_file(source, user_id)
        if len(source) > 100 or "data:image" in source or source.startswith("/9j"):
            # Long strings / data URLs / JPEG magic -> treat as base64.
            return vision.analyze_base64(source, user_id)
        # Short non-existing path: still route through the file loader so
        # the caller gets a consistent "failed to load" error.
        return analyze_image_file(source, user_id)
    # 2) Raw encoded bytes.
    if isinstance(source, (bytes, bytearray)):
        try:
            decoded = _cv2.imdecode(_np.frombuffer(source, dtype=_np.uint8),
                                    _cv2.IMREAD_COLOR)
            if decoded is not None:
                return vision.analyze_image(decoded, user_id)
            return {"success": False, "error": "Falha ao decodificar bytes"}
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar bytes: {e}"}
    # 3) PIL image (only when Pillow loaded).
    if _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
        try:
            rgb = _np.array(source.convert('RGB'))
            return vision.analyze_image(_cv2.cvtColor(rgb, _cv2.COLOR_RGB2BGR), user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao converter PIL: {e}"}
    # 4) Already an OpenCV-style numpy array.
    if _np is not None and isinstance(source, _np.ndarray):
        try:
            return vision.analyze_image(source, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar array numpy: {e}"}
    # 5) File-like object: read it fully and recurse on the bytes.
    if hasattr(source, 'read'):
        try:
            source.seek(0)
            return analyze_image_from_any_source(source.read(), user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao ler stream: {e}"}
    return {
        "success": False,
        "error": f"Tipo de fonte não suportado: {type(source)}",
        "supported_types": ["base64", "bytes", "file_path", "numpy_array", "PIL_Image"]
    }
def convert_image_to_base64(
    source: Any,
    format: str = "JPEG",
    quality: int = 95
) -> Optional[str]:
    """Convert an image from several source types to a base64 string.

    Args:
        source: Existing base64 string, file path, raw bytes, numpy image
            array, or PIL Image.
        format: Output encoding for array/PIL sources ("JPEG" or "PNG").
        quality: JPEG quality (ignored for formats without a quality knob).

    Returns:
        Base64-encoded string, or None when the source type is unsupported
        or encoding fails.
    """
    try:
        # Long strings are assumed to already be base64 payloads.
        if isinstance(source, str) and len(source) > 100:
            return source
        # Short strings: treat as a file path.
        # (base64 is imported at module level; the old local re-imports
        # were redundant.)
        if isinstance(source, str) and os.path.exists(source):
            with open(source, "rb") as f:
                return base64.b64encode(f.read()).decode('utf-8')
        # Raw encoded bytes.
        if isinstance(source, (bytes, bytearray)):
            return base64.b64encode(source).decode('utf-8')
        # OpenCV/numpy image array.
        if _np is not None and isinstance(source, _np.ndarray):
            # Fix: the JPEG quality flag was passed for EVERY format; PNG and
            # other formats now get the encoder's default parameters instead.
            params = []
            if format.upper() in ("JPEG", "JPG"):
                params = [_cv2.IMWRITE_JPEG_QUALITY, quality]
            success, buffer = _cv2.imencode(f".{format.lower()}", source, params)
            if success:
                return base64.b64encode(buffer).decode('utf-8')
        # PIL Image.
        if _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
            import io
            buffer = io.BytesIO()
            source.save(buffer, format=format, quality=quality)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        return None
    except Exception as e:
        logger.error(f"Erro ao converter para base64: {e}")
        return None
# ============================================================
# EXPORTS
# ============================================================
# Public API of this module (names exported via `from <module> import *`).
__all__ = [
    "VisionConfig",
    "ComputerVision",
    "ImageFeature",
    "get_computer_vision",
    "analyze_image_from_base64",
    "analyze_image_file",
    "analyze_image_from_any_source",
    "convert_image_to_base64",
]