# type: ignore
"""
AKIRA V21 ULTIMATE - Computer Vision and OCR Module
===================================================
Advanced computer vision with multilingual OCR (PT/EN), shape and object
detection, and continuous learning from images.

Features:
- OCR via Tesseract (Portuguese + English)
- Shape and contour detection
- Basic object detection (Haar Cascades)
- Per-image feature learning
- SQLite storage (table ``imagefeatures``)
- Support for base64-encoded images

Author: AKIRA V21 ULTIMATE

IMPORTANT: CV imports are lazy so the module can degrade gracefully when
OpenCV/Tesseract are not installed.
"""

import base64
import hashlib
import json
import os
import re
import sqlite3  # used by _save_feature/_find_similar/get_learned_images/get_stats
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from loguru import logger

# Lazy imports - only loaded when available
_cv2 = None
_np = None
_pytesseract = None
_PIL_Image = None


def _check_dependencies() -> Tuple[bool, str]:
    """Try to import the optional CV stack (cv2, numpy, pytesseract, PIL).

    Populates the module-level lazy handles on success.

    Returns:
        (ok, message): ``ok`` is True only when all four imports succeed;
        ``message`` is "OK" or a ";"-joined list of truncated import errors.
    """
    global _cv2, _np, _pytesseract, _PIL_Image

    errors = []

    try:
        import cv2 as _cv2_temp
        _cv2 = _cv2_temp
    except Exception as e:  # ImportError or any loader-time failure
        errors.append(f"OpenCV: {str(e)[:50]}")

    try:
        import numpy as _np_temp
        _np = _np_temp
    except Exception as e:
        errors.append(f"NumPy: {str(e)[:50]}")

    try:
        import pytesseract as _pytesseract_temp
        _pytesseract = _pytesseract_temp
    except Exception as e:
        errors.append(f"pytesseract: {str(e)[:50]}")

    try:
        from PIL import Image as _PIL_Image_temp
        _PIL_Image = _PIL_Image_temp
    except Exception as e:
        errors.append(f"Pillow: {str(e)[:50]}")

    if errors:
        return False, "; ".join(errors)
    return True, "OK"


# Initial check - the module may operate in degraded mode
_DEPENDENCIES_OK, _DEPENDENCY_ERROR = _check_dependencies()

if _DEPENDENCIES_OK:
    logger.success("✅ Dependências de Visão Computacional carregadas")
else:
    logger.warning(f"⚠️ Visão Computacional limitada: {_DEPENDENCY_ERROR}")


# ============================================================
# CONFIGURATION
# ============================================================

@dataclass
class VisionConfig:
    """Configuration for the computer-vision module."""

    # OCR
    ocr_primary_lang: str = "por"     # Portuguese
    ocr_secondary_lang: str = "eng"   # English
    ocr_psm: int = 6                  # Page segmentation mode (6 = uniform block)
    ocr_oem: int = 3                  # OCR engine mode (3 = default)

    # OpenCV
    blur_kernel: int = 5
    canny_threshold1: float = 50
    canny_threshold2: float = 150
    contour_min_area: int = 100
    contour_max_area: int = 100000

    # Learning
    feature_dim: int = 128               # Feature-vector dimension
    similarity_threshold: float = 0.85   # Threshold to consider images "similar"
    max_stored_features: int = 1000      # Max stored features per user

    # Processing
    max_image_size: int = 1920           # Resize if larger
    resize_factor: float = 0.5           # Resize factor
    preserve_aspect_ratio: bool = True


class ImageFeature:
    """Container for the learned characteristics of a single image."""

    def __init__(
        self,
        image_hash: str,
        features: list,
        text_detected: str,
        shapes: List[Dict[str, Any]],
        objects: List[str],
        user_id: str,
        confidence: float = 0.0,
        width: int = 0,
        height: int = 0
    ):
        self.image_hash = image_hash
        self.features = features
        self.text_detected = text_detected
        self.shapes = shapes
        self.objects = objects
        self.user_id = user_id
        self.confidence = confidence
        self.width = width
        self.height = height
        self.timestamp = time.time()
        self.occurrences = 1

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return {
            "image_hash": self.image_hash,
            "features": self.features,
            "text_detected": self.text_detected,
            "shapes": self.shapes,
            "objects": self.objects,
            "user_id": self.user_id,
            "confidence": self.confidence,
            "width": self.width,
            "height": self.height,
            "timestamp": self.timestamp,
            "occurrences": self.occurrences
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ImageFeature':
        """Build an instance from a dict produced by :meth:`to_dict`."""
        feature = cls(
            image_hash=data["image_hash"],
            features=data["features"],
            text_detected=data["text_detected"],
            shapes=data.get("shapes", []),
            objects=data.get("objects", []),
            user_id=data["user_id"],
            confidence=data.get("confidence", 0.0),
            width=data.get("width", 0),
            height=data.get("height", 0)
        )
        feature.timestamp = data.get("timestamp", time.time())
        feature.occurrences = data.get("occurrences", 1)
        return feature


class ComputerVision:
    """
    Advanced computer-vision engine for AKIRA.

    Capabilities:
    - Multilingual OCR (PT/EN)
    - Geometric shape detection
    - Color/pattern analysis
    - Feature learning
    - Image similarity comparison
    """

    def __init__(self, config: Optional[VisionConfig] = None):
        self.config = config or VisionConfig()
        self.db_path = "akira.db"
        self._cache: Dict[str, dict] = {}
        self._similarity_cache: Dict[str, float] = {}

        # Probe Tesseract availability once at construction
        self._tesseract_available = self._check_tesseract()

        # OpenCV cascade classifiers (loaded lazily by _init_cascades)
        self._face_cascade = None
        self._eye_cascade = None
        self._plate_cascade = None

        if _DEPENDENCIES_OK:
            logger.info("ComputerVision inicializado")
        else:
            logger.warning("ComputerVision operando em modo limitado")

    def _check_tesseract(self) -> bool:
        """Return True when the Tesseract binary is reachable."""
        if not _DEPENDENCIES_OK or _pytesseract is None:
            return False
        try:
            _pytesseract.get_tesseract_version()
            logger.success("Tesseract OCR disponível")
            return True
        except Exception as e:
            logger.warning(f"Tesseract não disponível: {e}")
            return False

    def _init_cascades(self):
        """Load Haar cascade classifiers shipped with OpenCV (best-effort)."""
        if not _DEPENDENCIES_OK or _cv2 is None:
            return
        try:
            cv2_data_path = _cv2.data.haarcascades

            # Face detection
            face_path = os.path.join(cv2_data_path, "haarcascade_frontalface_default.xml")
            if os.path.exists(face_path):
                self._face_cascade = _cv2.CascadeClassifier(face_path)
                logger.info("Face cascade carregada")

            # Eye detection
            eye_path = os.path.join(cv2_data_path, "haarcascade_eye.xml")
            if os.path.exists(eye_path):
                self._eye_cascade = _cv2.CascadeClassifier(eye_path)
                logger.info("Eye cascade carregada")

            # License plate (simplified model)
            plate_path = os.path.join(cv2_data_path, "haarcascade_russian_plate_number.xml")
            if os.path.exists(plate_path):
                self._plate_cascade = _cv2.CascadeClassifier(plate_path)
                logger.info("Plate cascade carregada")
        except Exception as e:
            logger.warning(f"Erro ao carregar cascades: {e}")

    def decode_base64_image(self, base64_string: str) -> Optional[Any]:
        """Decode a base64 string into a BGR numpy image, or None on failure."""
        if not _DEPENDENCIES_OK:
            logger.error("Dependências de CV não disponíveis")
            return None

        # Anything under 100 chars cannot be a meaningful image payload
        if not base64_string or len(base64_string) < 100:
            logger.error(f"Base64 inválido: comprimento {len(base64_string) if base64_string else 0}")
            return None

        try:
            # Strip data-URL prefix if present (data:image/png;base64,)
            if "base64," in base64_string:
                base64_string = base64_string.split("base64,")[1]

            # Remove whitespace and line breaks
            base64_string = base64_string.strip().replace('\n', '').replace('\r', '')

            logger.debug(f"Decodificando base64 ({len(base64_string)} chars)")
            image_data = base64.b64decode(base64_string)
            logger.debug(f"Bytes decodificados: {len(image_data)}")

            image_array = _np.frombuffer(image_data, dtype=_np.uint8)
            logger.debug(f"Array numpy criado: shape {image_array.shape}")

            image = _cv2.imdecode(image_array, _cv2.IMREAD_COLOR)
            if image is None:
                logger.error("OpenCV retornou None - formato não suportado ou dados corrompidos")
                return None

            logger.success(f"✅ Imagem decodificada: {image.shape[1]}x{image.shape[0]} px")
            return image
        except Exception as e:
            logger.error(f"Erro ao decodificar imagem: {e}")
            import traceback
            logger.error(f"Stack trace: {traceback.format_exc()}")
            return None

    def preprocess_image(self, image: Any) -> Any:
        """Downscale oversized images and normalize channel layout to BGR."""
        if not _DEPENDENCIES_OK or image is None:
            return None

        # Resize if larger than the configured maximum
        height, width = image.shape[:2]
        if max(height, width) > self.config.max_image_size:
            scale = self.config.max_image_size / max(height, width)
            image = _cv2.resize(
                image, None, fx=scale, fy=scale,
                interpolation=_cv2.INTER_AREA
            )

        # Normalize to 3-channel BGR
        if len(image.shape) == 2:
            image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR)
        elif image.shape[2] == 4:
            image = _cv2.cvtColor(image, _cv2.COLOR_BGRA2BGR)
        elif image.shape[2] == 3:
            pass  # Already BGR
        else:
            image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR)

        return image

    def extract_features(self, image: Any) -> List[float]:
        """Extract a fixed-size feature vector (color, texture, geometry, edges)."""
        if not _DEPENDENCIES_OK or image is None:
            return [0.0] * self.config.feature_dim

        features = []

        # 1. Color histograms (HSV, 32 bins per channel, L1-normalized)
        hsv = _cv2.cvtColor(image, _cv2.COLOR_BGR2HSV)
        hist_h = _cv2.calcHist([hsv], [0], None, [32], [0, 256]).flatten()
        hist_s = _cv2.calcHist([hsv], [1], None, [32], [0, 256]).flatten()
        hist_v = _cv2.calcHist([hsv], [2], None, [32], [0, 256]).flatten()

        hist_h = hist_h / (hist_h.sum() + 1e-6)
        hist_s = hist_s / (hist_s.sum() + 1e-6)
        hist_v = hist_v / (hist_v.sum() + 1e-6)

        features.extend(hist_h.tolist())
        features.extend(hist_s.tolist())
        features.extend(hist_v.tolist())

        # 2. Texture statistics (mean/std of grayscale intensity)
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        mean, std = _cv2.meanStdDev(gray)
        # meanStdDev returns (1,1) arrays; index both axes explicitly to
        # avoid the deprecated scalar conversion of a 2-D array.
        features.append(float(mean[0][0]) / 255.0)
        features.append(float(std[0][0]) / 255.0)

        # 3. Normalized dimensions
        h, w = image.shape[:2]
        features.append(h / max(h, w))
        features.append(w / max(h, w))
        features.append(h * w / (1920 * 1080))  # Area ratio vs. Full HD

        # 4. Edge density (Canny)
        edges = _cv2.Canny(gray, self.config.canny_threshold1, self.config.canny_threshold2)
        edge_density = float(_np.sum(edges > 0) / (edges.size + 1e-6))
        features.append(edge_density)

        # 5. Average saturation and brightness
        avg_saturation = float(_np.mean(hsv[:, :, 1]) / 255.0)
        avg_brightness = float(_np.mean(hsv[:, :, 2]) / 255.0)
        features.append(avg_saturation)
        features.append(avg_brightness)

        # Pad/truncate to the configured fixed dimension
        current_len = len(features)
        target_len = int(self.config.feature_dim)
        if current_len < target_len:
            features.extend([0.0] * (target_len - current_len))

        return [float(x) for x in features[:target_len]]

    def detect_shapes(self, image: Any) -> List[Dict[str, Any]]:
        """Detect geometric shapes via contour polygon approximation."""
        if not _DEPENDENCIES_OK or image is None:
            return []

        shapes = []
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        blur = _cv2.GaussianBlur(gray, (self.config.blur_kernel, self.config.blur_kernel), 0)

        edges = _cv2.Canny(blur, self.config.canny_threshold1, self.config.canny_threshold2)
        contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)

        for contour in contours:
            area = _cv2.contourArea(contour)
            # Filter by area
            if area < self.config.contour_min_area or area > self.config.contour_max_area:
                continue

            # Approximate to a polygon (2% of perimeter tolerance)
            peri = _cv2.arcLength(contour, True)
            approx = _cv2.approxPolyDP(contour, 0.02 * peri, True)

            x, y, w, h = _cv2.boundingRect(approx)
            aspect_ratio = w / (h + 1e-6)
            num_vertices = len(approx)

            # Classify by vertex count
            shape_type = "desconhecido"
            if num_vertices == 3:
                shape_type = "triangulo"
            elif num_vertices == 4:
                if 0.9 <= aspect_ratio <= 1.1:
                    shape_type = "quadrado"
                else:
                    shape_type = "retangulo"
            elif num_vertices == 5:
                shape_type = "pentagono"
            elif num_vertices == 6:
                shape_type = "hexagono"
            elif num_vertices > 6:
                # Circle test: contour area close to enclosing-circle area
                (cx, cy), radius = _cv2.minEnclosingCircle(contour)
                circle_area = 3.14159 * radius * radius
                if 0.7 <= area / circle_area <= 1.3:
                    shape_type = "circulo"
                else:
                    shape_type = "poligono_irregular"

            if shape_type != "desconhecido":
                shapes.append({
                    "tipo": shape_type,
                    "vertices": num_vertices,
                    "area": float(area),
                    "centro": {"x": int(x + w/2), "y": int(y + h/2)},
                    "dimensoes": {"largura": w, "altura": h},
                    "aspect_ratio": round(aspect_ratio, 3)
                })

        return shapes

    def detect_objects_cascade(self, image: Any) -> List[Dict[str, Any]]:
        """Detect faces, eyes and plates with Haar cascade classifiers."""
        if not _DEPENDENCIES_OK or image is None:
            return []

        objects = []
        if self._face_cascade is None:
            self._init_cascades()

        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)

        # Faces
        if self._face_cascade is not None:
            faces = self._face_cascade.detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
            )
            for i, (x, y, w, h) in enumerate(faces):
                objects.append({
                    "tipo": "face",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.85
                })

        # Eyes
        if self._eye_cascade is not None:
            eyes = self._eye_cascade.detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(15, 15)
            )
            for i, (x, y, w, h) in enumerate(eyes):
                objects.append({
                    "tipo": "olho",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.80
                })

        # License plates (simplified)
        if self._plate_cascade is not None:
            plates = self._plate_cascade.detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(60, 20)
            )
            for i, (x, y, w, h) in enumerate(plates):
                objects.append({
                    "tipo": "placa",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.75
                })

        return objects

    def perform_ocr(
        self,
        image: Any,
        languages: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Run Tesseract OCR on the image.

        Args:
            image: BGR numpy image.
            languages: Tesseract language codes; defaults to the configured
                primary + secondary languages, combined with "+".

        Returns:
            Dict with ``success``, ``text``, ``confidence`` (0..1) and counts,
            or an error description when OCR is unavailable/fails.
        """
        if not _DEPENDENCIES_OK or not self._tesseract_available or image is None:
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "languages": [],
                "error": "Tesseract não disponível"
            }

        if languages is None:
            languages = [self.config.ocr_primary_lang, self.config.ocr_secondary_lang]

        # Adaptive threshold improves OCR on uneven lighting
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        thresh = _cv2.adaptiveThreshold(
            gray, 255,
            _cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            _cv2.THRESH_BINARY_INV,
            11, 2
        )

        ocr_config = f"--psm {self.config.ocr_psm} --oem {self.config.ocr_oem}"
        lang_code = "+".join(languages)

        try:
            text = _pytesseract.image_to_string(thresh, lang=lang_code, config=ocr_config)
            text = self._clean_ocr_text(text)

            # FIX: must be Output.DICT (the string 'dict'), not the builtin
            # type `dict` — otherwise image_to_data returns a raw string and
            # the .get() below raises.
            data = _pytesseract.image_to_data(
                thresh, lang=lang_code, config=ocr_config,
                output_type=_pytesseract.Output.DICT
            )
            confs = [int(d) for d in data.get("conf", []) if int(d) > 0]
            avg_confidence = float(_np.mean(confs)) if confs else 0.0

            return {
                "success": True,
                "text": text.strip(),
                "confidence": avg_confidence / 100.0,
                "languages": languages,
                "char_count": len(text),
                "word_count": len(text.split()) if text else 0
            }
        except Exception as e:
            logger.error(f"OCR error: {e}")
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "languages": languages,
                "error": str(e)
            }

    def _clean_ocr_text(self, text: str) -> str:
        """Strip OCR noise: odd characters, repeated spaces, trivial lines."""
        # Remove special characters and noise (keep accented PT letters)
        text = re.sub(r'[^\w\sáàâãéèêíïóôõúüçÁÀÂÃÉÈÊÍÏÓÔÕÚÜÇ\n\r\t]', '', text)
        # Normalize horizontal whitespace only — collapsing '\s+' would also
        # destroy the newlines that the line filter below depends on.
        text = re.sub(r'[ \t]+', ' ', text)
        # Drop empty or single-character lines
        lines = [l.strip() for l in text.split('\n') if len(l.strip()) > 1]
        return '\n'.join(lines)

    def detect_text_regions(self, image: Any) -> List[Dict[str, Any]]:
        """Detect bounding boxes of likely text regions."""
        if not _DEPENDENCIES_OK or image is None:
            return []

        regions = []
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        blur = _cv2.GaussianBlur(gray, (5, 5), 0)

        edges = _cv2.Canny(blur, 50, 150)
        contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)

        for contour in contours:
            x, y, w, h = _cv2.boundingRect(contour)

            # Filter regions that are too small or larger than the image
            if w < 20 or h < 10 or w > image.shape[1] or h > image.shape[0]:
                continue

            aspect_ratio = w / (h + 1e-6)
            regions.append({
                "x": int(x),
                "y": int(y),
                "width": int(w),
                "height": int(h),
                "aspect_ratio": round(aspect_ratio, 3),
                "area": float(w * h)
            })

        # Sort in reading order (left-to-right within ~50px rows)
        regions.sort(key=lambda r: (r["y"] // 50, r["x"]))
        return regions

    def analyze_image(
        self,
        image: Any,
        user_id: str = "anonymous",
        include_ocr: bool = True,
        include_shapes: bool = True,
        include_objects: bool = True
    ) -> Dict[str, Any]:
        """Full analysis pipeline: OCR, shapes, objects, features, learning."""
        start_time = time.time()

        if not _DEPENDENCIES_OK or image is None:
            return {
                "success": False,
                "error": "Dependências não disponíveis",
                "dependencies_ok": False
            }

        processed = self.preprocess_image(image)
        if processed is None:
            return {"success": False, "error": "Falha no pré-processamento"}

        h, w = processed.shape[:2]
        result: Dict[str, Any] = {
            "success": True,
            "timestamp": datetime.now().isoformat(),
            "dimensions": {"width": w, "height": h},
            "user_id": user_id,
            "processing_time": 0,
            "dependencies_ok": True
        }

        shapes = []
        text_detected = ""

        # OCR
        if include_ocr:
            ocr_result = self.perform_ocr(processed)
            result["ocr"] = ocr_result
            text_detected = ocr_result.get("text", "")
            result["text_detected"] = text_detected

        # Geometric shapes
        if include_shapes:
            shapes = self.detect_shapes(processed)
            result["shapes"] = shapes
            result["shape_count"] = len(shapes)

        # Objects (face, eyes, etc.)
        if include_objects:
            objects = self.detect_objects_cascade(processed)
            result["objects"] = objects
            result["object_count"] = len(objects)

        # Feature vector summary (real std instead of the old 0.0 placeholder)
        features = self.extract_features(processed)
        if features:
            feat_mean = float(sum(features) / len(features))
            feat_std = (sum((f - feat_mean) ** 2 for f in features) / len(features)) ** 0.5
        else:
            feat_mean = 0.0
            feat_std = 0.0
        result["features"] = {
            "dimension": len(features),
            "mean": feat_mean,
            "std": float(feat_std)
        }

        # Perceptual hash of the original image
        image_hash = self._compute_image_hash(image)
        result["image_hash"] = image_hash

        # Persist a learning record
        feature_record = ImageFeature(
            image_hash=image_hash,
            features=features,
            text_detected=text_detected,
            shapes=shapes if include_shapes else [],
            objects=[o["tipo"] for o in result.get("objects", [])],
            user_id=user_id,
            confidence=result.get("ocr", {}).get("confidence", 0.5),
            width=w,
            height=h
        )
        self._save_feature(feature_record)

        # Check whether this image is already known for the user
        similarity = self._find_similar(features, user_id)
        result["is_known"] = similarity > self.config.similarity_threshold
        result["similarity_score"] = similarity

        result["processing_time"] = round(time.time() - start_time, 3)
        return result

    def analyze_base64(
        self,
        base64_string: str,
        user_id: str = "anonymous"
    ) -> Dict[str, Any]:
        """Analyze an image given as a base64 string."""
        try:
            logger.info(f"--- [VISION DEBUG] Recebida requisição de análise para user: {user_id} ---")
            logger.debug(f"[VISION DEBUG] Base64 length: {len(base64_string) if base64_string else 0}")

            image = self.decode_base64_image(base64_string)
            if image is None:
                logger.error("[VISION DEBUG] Falha ao decodificar imagem (image is None)")
                return {
                    "success": False,
                    "error": "Falha ao decodificar imagem (formato inválido ou corrompido)"
                }

            logger.info(f"[VISION DEBUG] Imagem decodificada com sucesso. Shape: {image.shape}")
            result = self.analyze_image(image, user_id)
            logger.info(f"[VISION DEBUG] Análise concluída. Sucesso: {result.get('success')}")
            return result
        except Exception as e:
            logger.exception(f"[VISION DEBUG] Erro crítico na análise de visão: {e}")
            return {
                "success": False,
                "error": f"Erro interno na análise de visão: {str(e)}"
            }

    def _compute_image_hash(self, image: Any) -> str:
        """Compute a perceptual hash (32x32 mean-threshold bits → md5 hex)."""
        if not _DEPENDENCIES_OK or image is None:
            return "unknown"

        # Downscale so the hash is size-invariant
        resized = _cv2.resize(image, (32, 32), interpolation=_cv2.INTER_AREA)
        gray = _cv2.cvtColor(resized, _cv2.COLOR_BGR2GRAY)

        # One bit per pixel: above/below the global mean
        mean_val = float(_np.mean(gray))
        bits = []
        for i in range(32):
            for j in range(32):
                bits.append("1" if gray[i, j] > mean_val else "0")
        return hashlib.md5("".join(bits).encode()).hexdigest()

    def _save_feature(self, feature: ImageFeature):
        """Cache the feature record and upsert it into SQLite (best-effort)."""
        self._cache[feature.image_hash] = feature.to_dict()

        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()

            c.execute("""
                CREATE TABLE IF NOT EXISTS imagefeatures (
                    image_hash TEXT PRIMARY KEY,
                    features BLOB,
                    text_detected TEXT,
                    shapes TEXT,
                    objects TEXT,
                    user_id TEXT,
                    confidence REAL,
                    width INTEGER,
                    height INTEGER,
                    timestamp REAL,
                    occurrences INTEGER DEFAULT 1
                )
            """)

            # Upsert: bump occurrence counter if the hash is already known
            c.execute("SELECT occurrences FROM imagefeatures WHERE image_hash = ?",
                      (feature.image_hash,))
            existing = c.fetchone()

            if existing:
                c.execute("""
                    UPDATE imagefeatures
                    SET occurrences = occurrences + 1, timestamp = ?
                    WHERE image_hash = ?
                """, (time.time(), feature.image_hash))
            else:
                # NOTE: the whole record dict is serialized into the
                # `features` column; _find_similar reads it back as such.
                features_json = json.dumps(feature.to_dict())
                c.execute("""
                    INSERT INTO imagefeatures
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    feature.image_hash,
                    features_json,
                    feature.text_detected,
                    json.dumps(feature.shapes),
                    json.dumps(feature.objects),
                    feature.user_id,
                    feature.confidence,
                    feature.width,
                    feature.height,
                    feature.timestamp,
                    feature.occurrences
                ))

            conn.commit()
            conn.close()
        except Exception as e:
            logger.warning(f"Erro ao salvar no banco: {e}")

    def _find_similar(
        self,
        features: List[float],
        user_id: str,
        limit: int = 5
    ) -> float:
        """Return the best cosine similarity against the user's stored images."""
        max_similarity = 0.0

        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("SELECT features FROM imagefeatures WHERE user_id = ?", (user_id,))
            rows = c.fetchall()
            conn.close()

            for row in rows:
                try:
                    stored_dict = json.loads(row[0])
                    stored_features = stored_dict["features"]

                    # Cosine similarity
                    dot = sum(f1 * f2 for f1, f2 in zip(features, stored_features))
                    norm1 = sum(f * f for f in features) ** 0.5
                    norm2 = sum(f * f for f in stored_features) ** 0.5
                    similarity = dot / (norm1 * norm2 + 1e-6)

                    if similarity > max_similarity:
                        max_similarity = similarity
                except Exception:
                    continue  # skip malformed rows
        except Exception as e:
            logger.warning(f"Erro ao buscar similar: {e}")

        return float(max_similarity)

    def get_learned_images(self, user_id: str) -> List[Dict[str, Any]]:
        """Return up to 50 most recent learned images for the user."""
        images = []
        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("""
                SELECT image_hash, text_detected, objects, confidence, timestamp, occurrences
                FROM imagefeatures
                WHERE user_id = ?
                ORDER BY timestamp DESC
                LIMIT 50
            """, (user_id,))
            rows = c.fetchall()
            conn.close()

            for row in rows:
                try:
                    objects_list = json.loads(row[2]) if row[2] else []
                except Exception:
                    objects_list = []

                images.append({
                    "hash": row[0],
                    "text": row[1],
                    "objects": objects_list,
                    "confidence": row[3],
                    "timestamp": row[4],
                    "occurrences": row[5]
                })
        except Exception as e:
            logger.warning(f"Erro ao obter imagens: {e}")

        return images

    def get_stats(self) -> Dict[str, Any]:
        """Return module statistics (cache size, DB row count, availability)."""
        total_images = len(self._cache)

        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("SELECT COUNT(*) FROM imagefeatures")
            db_count = c.fetchone()[0]
            conn.close()
        except Exception:
            db_count = 0

        return {
            "cached_images": total_images,
            "database_images": db_count,
            "dependencies_ok": _DEPENDENCIES_OK,
            "tesseract_available": self._tesseract_available,
            "config": {
                "ocr_langs": [self.config.ocr_primary_lang, self.config.ocr_secondary_lang],
                "feature_dim": self.config.feature_dim,
                "similarity_threshold": self.config.similarity_threshold
            }
        }


# ============================================================
# CONVENIENCE FUNCTIONS
# ============================================================

_vision_instance: Optional[ComputerVision] = None


def get_computer_vision(config: Optional[VisionConfig] = None) -> ComputerVision:
    """Return the singleton ComputerVision instance (created on first call)."""
    global _vision_instance
    if _vision_instance is None:
        _vision_instance = ComputerVision(config)
    return _vision_instance


def analyze_image_from_base64(
    base64_string: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: analyze a base64-encoded image."""
    vision = get_computer_vision()
    return vision.analyze_base64(base64_string, user_id)


def analyze_image_file(
    file_path: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: analyze an image file on disk."""
    if not _DEPENDENCIES_OK or _cv2 is None:
        return {"success": False, "error": "OpenCV não disponível"}

    vision = get_computer_vision()
    image = _cv2.imread(file_path)
    if image is None:
        return {"success": False, "error": "Falha ao carregar imagem"}
    return vision.analyze_image(image, user_id)


def analyze_image_from_any_source(
    source: Any,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """
    Universal entry point to analyze images from any source.

    Supports:
    - base64 string
    - numpy array (OpenCV)
    - bytes
    - file path (str)
    - PIL Image
    - file-like buffer

    Args:
        source: Image source
        user_id: User id

    Returns:
        Dict with the analysis result
    """
    vision = get_computer_vision()

    # Case 1: string — file path or base64
    if isinstance(source, str):
        if os.path.exists(source):
            return analyze_image_file(source, user_id)
        elif len(source) > 100 or "data:image" in source or source.startswith("/9j"):
            # Looks like base64 (long, data-URL, or JPEG magic "/9j")
            return vision.analyze_base64(source, user_id)
        else:
            # Fall back to treating it as a path
            return analyze_image_file(source, user_id)

    # Case 2: raw bytes
    elif isinstance(source, (bytes, bytearray)):
        try:
            image_array = _np.frombuffer(source, dtype=_np.uint8)
            image = _cv2.imdecode(image_array, _cv2.IMREAD_COLOR)
            if image is not None:
                return vision.analyze_image(image, user_id)
            return {"success": False, "error": "Falha ao decodificar bytes"}
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar bytes: {e}"}

    # Case 3: PIL Image (if Pillow is available)
    elif _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
        try:
            image = _cv2.cvtColor(
                _np.array(source.convert('RGB')),
                _cv2.COLOR_RGB2BGR
            )
            return vision.analyze_image(image, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao converter PIL: {e}"}

    # Case 4: numpy array (OpenCV image)
    elif _np is not None and isinstance(source, _np.ndarray):
        try:
            return vision.analyze_image(source, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar array numpy: {e}"}

    # Case 5: file-like object (BytesIO etc.)
    elif hasattr(source, 'read'):
        try:
            source.seek(0)
            data = source.read()
            return analyze_image_from_any_source(data, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao ler stream: {e}"}

    return {
        "success": False,
        "error": f"Tipo de fonte não suportado: {type(source)}",
        "supported_types": ["base64", "bytes", "file_path", "numpy_array", "PIL_Image"]
    }


def convert_image_to_base64(
    source: Any,
    format: str = "JPEG",
    quality: int = 95
) -> Optional[str]:
    """
    Convert an image from any source to a base64 string.

    Args:
        source: Image source
        format: Output format (JPEG, PNG)
        quality: Compression quality (JPEG only)

    Returns:
        Base64 string, or None on failure
    """
    try:
        # Check for an existing file FIRST — a long file path must not be
        # mistaken for an already-encoded base64 string.
        if isinstance(source, str) and os.path.exists(source):
            with open(source, "rb") as f:
                return base64.b64encode(f.read()).decode('utf-8')

        # Already a (long) base64 string: return as-is
        if isinstance(source, str) and len(source) > 100:
            return source

        # Raw bytes
        if isinstance(source, (bytes, bytearray)):
            return base64.b64encode(source).decode('utf-8')

        # numpy array: encode with OpenCV; quality flag applies to JPEG only
        if _np is not None and isinstance(source, _np.ndarray):
            params = [_cv2.IMWRITE_JPEG_QUALITY, quality] if format.upper() == "JPEG" else []
            success, buffer = _cv2.imencode(f".{format.lower()}", source, params)
            if success:
                return base64.b64encode(buffer).decode('utf-8')

        # PIL Image
        if _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
            import io
            buffer = io.BytesIO()
            source.save(buffer, format=format, quality=quality)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')

        return None
    except Exception as e:
        logger.error(f"Erro ao converter para base64: {e}")
        return None


# ============================================================
# EXPORTS
# ============================================================

__all__ = [
    "VisionConfig",
    "ComputerVision",
    "ImageFeature",
    "get_computer_vision",
    "analyze_image_from_base64",
    "analyze_image_file",
    "analyze_image_from_any_source",
    "convert_image_to_base64",
]