# akira-index / modules / computervision.py
# (repository upload residue: uploaded by akra35567, "Upload 21 files", commit 1d419ef verified)
# type: ignore
"""
AKIRA V21 ULTIMATE - Módulo de Visão Computacional e OCR
==========================================================
Visão computacional avançada com OCR multilíngue (PT/EN),
detecção de formas e objetos, e aprendizado contínuo de imagens.
Features:
- OCR com Tesseract (Português + Inglês)
- Detecção de formas e contornos
- Detecção de objetos básicos (Haar Cascades)
- Aprendizado de características por imagem
- Armazenamento no SQLite (tabela imagefeatures)
- Suporte a imagens em base64
Author: AKIRA V21 ULTIMATE
⚠️ IMPORTANTE: Imports são lazy para permitir que o módulo
seja desabilitado se OpenCV/Tesseract não estiverem disponíveis.
"""
import base64
import hashlib
import json
import os
import re
import sqlite3
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

from loguru import logger
# Imports lazy - só carregam quando necessário
_cv2 = None
_np = None
_pytesseract = None
_PIL_Image = None
def _check_dependencies() -> Tuple[bool, str]:
"""Verifica se as dependências estão disponíveis"""
global _cv2, _np, _pytesseract, _PIL_Image
errors = []
try:
import cv2 as _cv2_temp
_cv2 = _cv2_temp
except (ImportError, Exception) as e:
errors.append(f"OpenCV: {str(e)[:50]}")
try:
import numpy as _np_temp
_np = _np_temp
except (ImportError, Exception) as e:
errors.append(f"NumPy: {str(e)[:50]}")
try:
import pytesseract as _pytesseract_temp
_pytesseract = _pytesseract_temp
except (ImportError, Exception) as e:
errors.append(f"pytesseract: {str(e)[:50]}")
try:
from PIL import Image as _PIL_Image_temp
_PIL_Image = _PIL_Image_temp
except (ImportError, Exception) as e:
errors.append(f"Pillow: {str(e)[:50]}")
if errors:
return False, "; ".join(errors)
return True, "OK"
# Initial probe — the module degrades gracefully when CV deps are missing.
_DEPENDENCIES_OK, _DEPENDENCY_ERROR = _check_dependencies()

if not _DEPENDENCIES_OK:
    logger.warning(f"⚠️ Visão Computacional limitada: {_DEPENDENCY_ERROR}")
else:
    logger.success("✅ Dependências de Visão Computacional carregadas")
# ============================================================
# CONFIGURAÇÕES
# ============================================================
@dataclass
class VisionConfig:
    """Tuning parameters for the computer-vision module."""
    # --- OCR (Tesseract) ---
    ocr_primary_lang: str = "por"  # primary OCR language (Portuguese)
    ocr_secondary_lang: str = "eng"  # secondary OCR language (English)
    ocr_psm: int = 6  # page segmentation mode (6 = assume a uniform block of text)
    ocr_oem: int = 3  # OCR engine mode (3 = Tesseract default)
    # --- OpenCV processing ---
    blur_kernel: int = 5  # Gaussian blur kernel size (must be odd)
    canny_threshold1: float = 50  # lower hysteresis threshold for Canny
    canny_threshold2: float = 150  # upper hysteresis threshold for Canny
    contour_min_area: int = 100  # contours smaller than this are ignored
    contour_max_area: int = 100000  # contours larger than this are ignored
    # --- Feature learning ---
    feature_dim: int = 128  # fixed length of the extracted feature vector
    similarity_threshold: float = 0.85  # cosine similarity above this counts as "known"
    max_stored_features: int = 1000  # max stored feature records per user
    # --- Image preprocessing ---
    max_image_size: int = 1920  # images larger than this (on either side) are downscaled
    resize_factor: float = 0.5  # generic downscale factor
    preserve_aspect_ratio: bool = True  # keep aspect ratio when resizing
class ImageFeature:
    """Record of the learned characteristics of a single analyzed image."""

    def __init__(
        self,
        image_hash: str,
        features: list,
        text_detected: str,
        shapes: List[Dict[str, Any]],
        objects: List[str],
        user_id: str,
        confidence: float = 0.0,
        width: int = 0,
        height: int = 0
    ):
        self.image_hash = image_hash
        self.features = features
        self.text_detected = text_detected
        self.shapes = shapes
        self.objects = objects
        self.user_id = user_id
        self.confidence = confidence
        self.width = width
        self.height = height
        # Bookkeeping: creation time and how often this image was seen.
        self.timestamp = time.time()
        self.occurrences = 1

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this record into a plain, JSON-friendly dictionary."""
        field_names = (
            "image_hash", "features", "text_detected", "shapes", "objects",
            "user_id", "confidence", "width", "height", "timestamp", "occurrences",
        )
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ImageFeature':
        """Rebuild an ImageFeature from a dictionary produced by ``to_dict``."""
        record = cls(
            data["image_hash"],
            data["features"],
            data["text_detected"],
            data.get("shapes", []),
            data.get("objects", []),
            data["user_id"],
            data.get("confidence", 0.0),
            data.get("width", 0),
            data.get("height", 0),
        )
        # Restore bookkeeping fields that __init__ re-initializes.
        record.timestamp = data.get("timestamp", time.time())
        record.occurrences = data.get("occurrences", 1)
        return record
class ComputerVision:
    """
    Advanced computer-vision engine for AKIRA.

    Capabilities:
    - Multilingual OCR (PT/EN) via Tesseract
    - Geometric shape detection (contour analysis)
    - Basic object detection (Haar cascades: faces, eyes, plates)
    - Per-image feature learning persisted to SQLite
    - Image similarity comparison (cosine similarity)

    Degrades gracefully to a "limited mode" when the optional CV
    dependencies (see module-level ``_check_dependencies``) are missing.
    """

    def __init__(self, config: Optional[VisionConfig] = None, db_path: str = "akira.db"):
        """Create the engine.

        Args:
            config: Optional tuning parameters; defaults to ``VisionConfig()``.
            db_path: SQLite database file for learned features. New optional
                parameter — defaults to the previously hard-coded path, so
                existing callers are unaffected.
        """
        self.config = config or VisionConfig()
        self.db_path = db_path
        self._cache: Dict[str, dict] = {}               # image_hash -> serialized ImageFeature
        self._similarity_cache: Dict[str, float] = {}   # reserved for similarity memoization
        # Probe Tesseract once at construction time.
        self._tesseract_available = self._check_tesseract()
        # Haar cascades are loaded lazily on the first detection request.
        self._face_cascade = None
        self._eye_cascade = None
        self._plate_cascade = None
        if _DEPENDENCIES_OK:
            logger.info("ComputerVision inicializado")
        else:
            logger.warning("ComputerVision operando em modo limitado")

    def _check_tesseract(self) -> bool:
        """Return True when the Tesseract binary is reachable by pytesseract."""
        if not _DEPENDENCIES_OK or _pytesseract is None:
            return False
        try:
            _pytesseract.get_tesseract_version()
            logger.success("Tesseract OCR disponível")
            return True
        except Exception as e:
            logger.warning(f"Tesseract não disponível: {e}")
            return False

    def _init_cascades(self):
        """Lazily load the Haar cascade classifiers bundled with OpenCV."""
        if not _DEPENDENCIES_OK or _cv2 is None:
            return
        try:
            # Cascade XMLs ship inside the OpenCV distribution.
            cv2_data_path = _cv2.data.haarcascades
            # Frontal-face detector.
            face_path = os.path.join(cv2_data_path, "haarcascade_frontalface_default.xml")
            if os.path.exists(face_path):
                self._face_cascade = _cv2.CascadeClassifier(face_path)
                logger.info("Face cascade carregada")
            # Eye detector.
            eye_path = os.path.join(cv2_data_path, "haarcascade_eye.xml")
            if os.path.exists(eye_path):
                self._eye_cascade = _cv2.CascadeClassifier(eye_path)
                logger.info("Eye cascade carregada")
            # License-plate detector (simplified model).
            plate_path = os.path.join(cv2_data_path, "haarcascade_russian_plate_number.xml")
            if os.path.exists(plate_path):
                self._plate_cascade = _cv2.CascadeClassifier(plate_path)
                logger.info("Plate cascade carregada")
        except Exception as e:
            logger.warning(f"Erro ao carregar cascades: {e}")

    def decode_base64_image(self, base64_string: str) -> Optional[Any]:
        """Decode a base64-encoded image into an OpenCV BGR numpy array.

        Accepts raw base64 or a data-URL (``data:image/png;base64,...``).
        Returns ``None`` on any failure (missing deps, bad input, unsupported
        format).
        """
        if not _DEPENDENCIES_OK:
            logger.error("Dependências de CV não disponíveis")
            return None
        # Anything shorter than ~100 chars cannot be a meaningful image payload.
        if not base64_string or len(base64_string) < 100:
            logger.error(f"Base64 inválido: comprimento {len(base64_string) if base64_string else 0}")
            return None
        try:
            # Strip a data-URL prefix if present (data:image/png;base64,).
            if "base64," in base64_string:
                base64_string = base64_string.split("base64,")[1]
            # Strip whitespace/newlines that would break decoding.
            base64_string = base64_string.strip().replace('\n', '').replace('\r', '')
            logger.debug(f"Decodificando base64 ({len(base64_string)} chars)")
            image_data = base64.b64decode(base64_string)
            logger.debug(f"Bytes decodificados: {len(image_data)}")
            # Raw bytes -> numpy buffer -> decoded image.
            image_array = _np.frombuffer(image_data, dtype=_np.uint8)
            logger.debug(f"Array numpy criado: shape {image_array.shape}")
            image = _cv2.imdecode(image_array, _cv2.IMREAD_COLOR)
            if image is None:
                logger.error("OpenCV retornou None - formato não suportado ou dados corrompidos")
                return None
            logger.success(f"✅ Imagem decodificada: {image.shape[1]}x{image.shape[0]} px")
            return image
        except Exception as e:
            logger.error(f"Erro ao decodificar imagem: {e}")
            import traceback
            logger.error(f"Stack trace: {traceback.format_exc()}")
            return None

    def preprocess_image(self, image: Any) -> Any:
        """Normalize an image for analysis: downscale large inputs and force 3-channel BGR."""
        if not _DEPENDENCIES_OK or image is None:
            return None
        # Downscale if either side exceeds the configured maximum.
        height, width = image.shape[:2]
        if max(height, width) > self.config.max_image_size:
            scale = self.config.max_image_size / max(height, width)
            image = _cv2.resize(
                image,
                None,
                fx=scale,
                fy=scale,
                interpolation=_cv2.INTER_AREA
            )
        # Normalize channel layout to 3-channel BGR.
        if len(image.shape) == 2:
            image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR)     # grayscale -> BGR
        elif image.shape[2] == 4:
            image = _cv2.cvtColor(image, _cv2.COLOR_BGRA2BGR)     # drop alpha
        elif image.shape[2] == 3:
            pass  # already BGR
        else:
            image = _cv2.cvtColor(image, _cv2.COLOR_GRAY2BGR)
        return image

    def extract_features(self, image: Any) -> List[float]:
        """Extract a fixed-length feature vector (``config.feature_dim``) from the image.

        The vector concatenates: HSV histograms (32 bins per channel),
        grayscale mean/std, normalized dimensions, edge density, and mean
        saturation/brightness — zero-padded or truncated to the fixed size.
        """
        if not _DEPENDENCIES_OK or image is None:
            return [0.0] * self.config.feature_dim
        features = []
        # 1. HSV color histograms (normalized to sum ~1).
        hsv = _cv2.cvtColor(image, _cv2.COLOR_BGR2HSV)
        hist_h = _cv2.calcHist([hsv], [0], None, [32], [0, 256]).flatten()
        hist_s = _cv2.calcHist([hsv], [1], None, [32], [0, 256]).flatten()
        hist_v = _cv2.calcHist([hsv], [2], None, [32], [0, 256]).flatten()
        hist_h = hist_h / (hist_h.sum() + 1e-6)
        hist_s = hist_s / (hist_s.sum() + 1e-6)
        hist_v = hist_v / (hist_v.sum() + 1e-6)
        features.extend(hist_h.tolist())
        features.extend(hist_s.tolist())
        features.extend(hist_v.tolist())
        # 2. Texture statistics (simplified LBP stand-in: mean/std of gray).
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        mean, std = _cv2.meanStdDev(gray)
        # BUGFIX: meanStdDev returns (1,1) arrays; index the scalar explicitly —
        # float() on a 1-element array is deprecated in NumPy >= 1.25.
        features.append(float(mean[0][0]) / 255.0)
        features.append(float(std[0][0]) / 255.0)
        # 3. Normalized dimensions.
        h, w = image.shape[:2]
        features.append(h / max(h, w))
        features.append(w / max(h, w))
        features.append(h * w / (1920 * 1080))  # area ratio vs Full HD
        # 4. Edge density (fraction of Canny-positive pixels).
        edges = _cv2.Canny(gray, self.config.canny_threshold1, self.config.canny_threshold2)
        edge_density = float(_np.sum(edges > 0) / (edges.size + 1e-6))
        features.append(edge_density)
        # 5. Mean saturation and brightness.
        avg_saturation = float(_np.mean(hsv[:, :, 1]) / 255.0)
        avg_brightness = float(_np.mean(hsv[:, :, 2]) / 255.0)
        features.append(avg_saturation)
        features.append(avg_brightness)
        # Pad/truncate to the configured fixed dimension.
        current_len = len(features)
        target_len = int(self.config.feature_dim)
        if current_len < target_len:
            features.extend([0.0] * (target_len - current_len))
        return [float(x) for x in features[:target_len]]

    def detect_shapes(self, image: Any) -> List[Dict[str, Any]]:
        """Detect geometric shapes via contour polygon approximation.

        Returns one dict per recognized shape with keys ``tipo``, ``vertices``,
        ``area``, ``centro``, ``dimensoes`` and ``aspect_ratio``.
        """
        if not _DEPENDENCIES_OK or image is None:
            return []
        shapes = []
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        blur = _cv2.GaussianBlur(gray, (self.config.blur_kernel, self.config.blur_kernel), 0)
        edges = _cv2.Canny(blur, self.config.canny_threshold1, self.config.canny_threshold2)
        contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours:
            area = _cv2.contourArea(contour)
            # Skip contours outside the configured area band.
            if area < self.config.contour_min_area or area > self.config.contour_max_area:
                continue
            # Approximate the contour with a polygon (2% of perimeter tolerance).
            peri = _cv2.arcLength(contour, True)
            approx = _cv2.approxPolyDP(contour, 0.02 * peri, True)
            # Classify by vertex count / aspect ratio.
            x, y, w, h = _cv2.boundingRect(approx)
            aspect_ratio = w / (h + 1e-6)
            num_vertices = len(approx)
            shape_type = "desconhecido"
            if num_vertices == 3:
                shape_type = "triangulo"
            elif num_vertices == 4:
                if 0.9 <= aspect_ratio <= 1.1:
                    shape_type = "quadrado"
                else:
                    shape_type = "retangulo"
            elif num_vertices == 5:
                shape_type = "pentagono"
            elif num_vertices == 6:
                shape_type = "hexagono"
            elif num_vertices > 6:
                # Many vertices: circle if area ~ enclosing circle area.
                (cx, cy), radius = _cv2.minEnclosingCircle(contour)
                circle_area = 3.14159 * radius * radius
                if 0.7 <= area / circle_area <= 1.3:
                    shape_type = "circulo"
                else:
                    shape_type = "poligono_irregular"
            if shape_type != "desconhecido":
                shapes.append({
                    "tipo": shape_type,
                    "vertices": num_vertices,
                    "area": float(area),
                    "centro": {"x": int(x + w/2), "y": int(y + h/2)},
                    "dimensoes": {"largura": w, "altura": h},
                    "aspect_ratio": round(aspect_ratio, 3)
                })
        return shapes

    def detect_objects_cascade(self, image: Any) -> List[Dict[str, Any]]:
        """Detect faces, eyes and plates using the Haar cascade classifiers.

        Cascades are loaded lazily on first use. The ``confianca`` values are
        fixed per object type (Haar cascades do not emit confidences).
        """
        if not _DEPENDENCIES_OK or image is None:
            return []
        objects = []
        if self._face_cascade is None:
            self._init_cascades()
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        # Faces.
        if self._face_cascade is not None:
            faces = self._face_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            for i, (x, y, w, h) in enumerate(faces):
                objects.append({
                    "tipo": "face",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.85
                })
        # Eyes.
        if self._eye_cascade is not None:
            eyes = self._eye_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(15, 15)
            )
            for i, (x, y, w, h) in enumerate(eyes):
                objects.append({
                    "tipo": "olho",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.80
                })
        # License plates (simplified).
        if self._plate_cascade is not None:
            plates = self._plate_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(60, 20)
            )
            for i, (x, y, w, h) in enumerate(plates):
                objects.append({
                    "tipo": "placa",
                    "id": i,
                    "posicao": {"x": int(x), "y": int(y)},
                    "dimensoes": {"largura": w, "altura": h},
                    "confianca": 0.75
                })
        return objects

    def perform_ocr(
        self,
        image: Any,
        languages: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Run OCR over the image.

        Args:
            image: BGR image array.
            languages: Tesseract language codes; defaults to the configured
                primary + secondary languages.

        Returns:
            Dict with ``success``, ``text``, ``confidence`` (0..1),
            ``languages`` and, on success, ``char_count``/``word_count``.
        """
        if not _DEPENDENCIES_OK or not self._tesseract_available or image is None:
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "languages": [],
                "error": "Tesseract não disponível"
            }
        if languages is None:
            languages = [self.config.ocr_primary_lang, self.config.ocr_secondary_lang]
        # Grayscale + adaptive threshold improves Tesseract accuracy.
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        thresh = _cv2.adaptiveThreshold(
            gray, 255,
            _cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            _cv2.THRESH_BINARY_INV,
            11, 2
        )
        ocr_config = f"--psm {self.config.ocr_psm} --oem {self.config.ocr_oem}"
        # Run OCR with combined languages (e.g. "por+eng").
        lang_code = "+".join(languages)
        try:
            text = _pytesseract.image_to_string(thresh, lang=lang_code, config=ocr_config)
            text = self._clean_ocr_text(text)
            # BUGFIX: output_type must be pytesseract.Output.DICT, not the
            # builtin `dict` type — the latter made image_to_data return a TSV
            # string, so `data.get(...)` raised and every OCR call failed.
            data = _pytesseract.image_to_data(
                thresh, lang=lang_code, config=ocr_config,
                output_type=_pytesseract.Output.DICT
            )
            # conf == -1 marks non-word boxes; keep only real confidences.
            confs = [int(d) for d in data.get("conf", []) if int(d) > 0]
            avg_confidence = float(_np.mean(confs)) if confs else 0.0
            return {
                "success": True,
                "text": text.strip(),
                "confidence": avg_confidence / 100.0,
                "languages": languages,
                "char_count": len(text),
                "word_count": len(text.split()) if text else 0
            }
        except Exception as e:
            logger.error(f"OCR error: {e}")
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "languages": languages,
                "error": str(e)
            }

    def _clean_ocr_text(self, text: str) -> str:
        """Clean raw OCR output: strip noise characters, normalize spacing,
        and drop lines shorter than two characters."""
        # Remove everything except word chars, whitespace and PT accented letters.
        text = re.sub(r'[^\w\sáàâãéèêíïóôõúüçÁÀÂÃÉÈÊÍÏÓÔÕÚÜÇ\n\r\t]', '', text)
        # BUGFIX: collapse only horizontal whitespace so line breaks survive —
        # the previous r'\s+' -> ' ' destroyed all newlines, which made the
        # per-line filter below dead code (there was only ever one "line").
        text = re.sub(r'[ \t\r]+', ' ', text)
        # Drop empty / one-character lines (OCR noise).
        lines = [l.strip() for l in text.split('\n') if len(l.strip()) > 1]
        return '\n'.join(lines)

    def detect_text_regions(self, image: Any) -> List[Dict[str, Any]]:
        """Detect rectangular regions likely to contain text (contour heuristic).

        Returns bounding boxes sorted roughly top-to-bottom, left-to-right.
        """
        if not _DEPENDENCIES_OK or image is None:
            return []
        regions = []
        gray = _cv2.cvtColor(image, _cv2.COLOR_BGR2GRAY)
        blur = _cv2.GaussianBlur(gray, (5, 5), 0)
        edges = _cv2.Canny(blur, 50, 150)
        contours, _ = _cv2.findContours(edges, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours:
            x, y, w, h = _cv2.boundingRect(contour)
            # Discard regions too small (or degenerate) to be text.
            if w < 20 or h < 10 or w > image.shape[1] or h > image.shape[0]:
                continue
            # Text regions tend to be wider than tall.
            aspect_ratio = w / (h + 1e-6)
            regions.append({
                "x": int(x),
                "y": int(y),
                "width": int(w),
                "height": int(h),
                "aspect_ratio": round(aspect_ratio, 3),
                "area": float(w * h)
            })
        # Sort into reading order: 50px row bands, then left-to-right.
        regions.sort(key=lambda r: (r["y"] // 50, r["x"]))
        return regions

    def analyze_image(
        self,
        image: Any,
        user_id: str = "anonymous",
        include_ocr: bool = True,
        include_shapes: bool = True,
        include_objects: bool = True
    ) -> Dict[str, Any]:
        """Run the full analysis pipeline on a decoded image.

        Performs preprocessing, optional OCR/shape/object detection, feature
        extraction, perceptual hashing, persistence of the learned features,
        and a similarity lookup against the user's stored images.

        Returns:
            Result dict; ``success`` is False with an ``error`` message when
            dependencies are missing or preprocessing fails.
        """
        start_time = time.time()
        if not _DEPENDENCIES_OK or image is None:
            return {
                "success": False,
                "error": "Dependências não disponíveis",
                "dependencies_ok": False
            }
        processed = self.preprocess_image(image)
        if processed is None:
            return {"success": False, "error": "Falha no pré-processamento"}
        h, w = processed.shape[:2]
        result: Dict[str, Any] = {
            "success": True,
            "timestamp": datetime.now().isoformat(),
            "dimensions": {"width": w, "height": h},
            "user_id": user_id,
            "processing_time": 0,
            "dependencies_ok": True
        }
        shapes = []
        text_detected = ""
        # OCR.
        if include_ocr:
            ocr_result = self.perform_ocr(processed)
            result["ocr"] = ocr_result
            text_detected = ocr_result.get("text", "")
            result["text_detected"] = text_detected
        # Geometric shapes.
        if include_shapes:
            shapes = self.detect_shapes(processed)
            result["shapes"] = shapes
            result["shape_count"] = len(shapes)
        # Cascade objects (faces, eyes, plates).
        if include_objects:
            objects = self.detect_objects_cascade(processed)
            result["objects"] = objects
            result["object_count"] = len(objects)
        # Feature vector summary.
        features = self.extract_features(processed)
        result["features"] = {
            "dimension": len(features),
            "mean": float(sum(features) / len(features)) if features else 0.0,
            # BUGFIX: compute the real standard deviation (was a hard-coded 0.0).
            "std": float(_np.std(features)) if features else 0.0
        }
        # Perceptual hash of the ORIGINAL image (pre-resize) for dedup.
        image_hash = self._compute_image_hash(image)
        result["image_hash"] = image_hash
        # Persist the learned record.
        feature_record = ImageFeature(
            image_hash=image_hash,
            features=features,
            text_detected=text_detected,
            shapes=shapes if include_shapes else [],
            objects=[o["tipo"] for o in result.get("objects", [])],
            user_id=user_id,
            confidence=result.get("ocr", {}).get("confidence", 0.5),
            width=w,
            height=h
        )
        self._save_feature(feature_record)
        # Is this image already known to this user?
        similarity = self._find_similar(features, user_id)
        result["is_known"] = similarity > self.config.similarity_threshold
        result["similarity_score"] = similarity
        result["processing_time"] = round(time.time() - start_time, 3)
        return result

    def analyze_base64(
        self,
        base64_string: str,
        user_id: str = "anonymous"
    ) -> Dict[str, Any]:
        """Analyze an image supplied as a base64 string (or data URL)."""
        try:
            logger.info(f"--- [VISION DEBUG] Recebida requisição de análise para user: {user_id} ---")
            logger.debug(f"[VISION DEBUG] Base64 length: {len(base64_string) if base64_string else 0}")
            image = self.decode_base64_image(base64_string)
            if image is None:
                logger.error("[VISION DEBUG] Falha ao decodificar imagem (image is None)")
                return {
                    "success": False,
                    "error": "Falha ao decodificar imagem (formato inválido ou corrompido)"
                }
            logger.info(f"[VISION DEBUG] Imagem decodificada com sucesso. Shape: {image.shape}")
            result = self.analyze_image(image, user_id)
            logger.info(f"[VISION DEBUG] Análise concluída. Sucesso: {result.get('success')}")
            return result
        except Exception as e:
            logger.exception(f"[VISION DEBUG] Erro crítico na análise de visão: {e}")
            return {
                "success": False,
                "error": f"Erro interno na análise de visão: {str(e)}"
            }

    def _compute_image_hash(self, image: Any) -> str:
        """Compute a perceptual hash: 32x32 grayscale thresholded against its
        mean, then MD5 of the resulting bit string."""
        if not _DEPENDENCIES_OK or image is None:
            return "unknown"
        resized = _cv2.resize(image, (32, 32), interpolation=_cv2.INTER_AREA)
        gray = _cv2.cvtColor(resized, _cv2.COLOR_BGR2GRAY)
        mean_val = float(_np.mean(gray))
        # One bit per pixel: above/below the global mean (join avoids the
        # quadratic string += loop of the original).
        bits = ''.join(
            "1" if gray[i, j] > mean_val else "0"
            for i in range(32)
            for j in range(32)
        )
        return hashlib.md5(bits.encode()).hexdigest()

    def _save_feature(self, feature: ImageFeature):
        """Persist a feature record to the in-memory cache and SQLite.

        Known hashes only get their ``occurrences`` counter bumped; new hashes
        are inserted. DB errors are logged and swallowed (best-effort).
        """
        self._cache[feature.image_hash] = feature.to_dict()
        try:
            import sqlite3
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS imagefeatures (
                    image_hash TEXT PRIMARY KEY,
                    features BLOB,
                    text_detected TEXT,
                    shapes TEXT,
                    objects TEXT,
                    user_id TEXT,
                    confidence REAL,
                    width INTEGER,
                    height INTEGER,
                    timestamp REAL,
                    occurrences INTEGER DEFAULT 1
                )
            """)
            c.execute("SELECT occurrences FROM imagefeatures WHERE image_hash = ?",
                      (feature.image_hash,))
            existing = c.fetchone()
            if existing:
                # Seen before: bump the counter and refresh the timestamp.
                c.execute("""
                    UPDATE imagefeatures SET occurrences = occurrences + 1,
                    timestamp = ? WHERE image_hash = ?
                """, (time.time(), feature.image_hash))
            else:
                # New image: the `features` column stores the whole serialized
                # record (read back by _find_similar via its "features" key).
                features_json = json.dumps(feature.to_dict())
                c.execute("""
                    INSERT INTO imagefeatures VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    feature.image_hash,
                    features_json,
                    feature.text_detected,
                    json.dumps(feature.shapes),
                    json.dumps(feature.objects),
                    feature.user_id,
                    feature.confidence,
                    feature.width,
                    feature.height,
                    feature.timestamp,
                    feature.occurrences
                ))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.warning(f"Erro ao salvar no banco: {e}")

    def _find_similar(
        self,
        features: List[float],
        user_id: str,
        limit: int = 5
    ) -> float:
        """Return the highest cosine similarity between ``features`` and the
        user's stored feature vectors (0.0 when none or on error).

        Note: ``limit`` is currently unused (kept for interface stability).
        """
        max_similarity = 0.0
        try:
            # BUGFIX: sqlite3 was never imported at module level, so this whole
            # method used to raise NameError and silently return 0.0.
            import sqlite3
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("SELECT features FROM imagefeatures WHERE user_id = ?", (user_id,))
            rows = c.fetchall()
            conn.close()
            for row in rows:
                try:
                    stored_dict = json.loads(row[0])
                    stored_features = stored_dict["features"]
                    # Cosine similarity in pure Python.
                    dot = sum(f1 * f2 for f1, f2 in zip(features, stored_features))
                    norm1 = sum(f * f for f in features) ** 0.5
                    norm2 = sum(f * f for f in stored_features) ** 0.5
                    similarity = dot / (norm1 * norm2 + 1e-6)
                    if similarity > max_similarity:
                        max_similarity = similarity
                except Exception:
                    continue  # skip corrupt rows
        except Exception as e:
            logger.warning(f"Erro ao buscar similar: {e}")
        return float(max_similarity)

    def get_learned_images(self, user_id: str) -> List[Dict[str, Any]]:
        """Return up to 50 of the user's most recently learned images."""
        images = []
        try:
            # BUGFIX: local import — sqlite3 was not imported at module level.
            import sqlite3
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("""
                SELECT image_hash, text_detected, objects, confidence, timestamp, occurrences
                FROM imagefeatures
                WHERE user_id = ?
                ORDER BY timestamp DESC
                LIMIT 50
            """, (user_id,))
            rows = c.fetchall()
            conn.close()
            for row in rows:
                try:
                    objects_list = json.loads(row[2]) if row[2] else []
                except Exception:
                    objects_list = []
                images.append({
                    "hash": row[0],
                    "text": row[1],
                    "objects": objects_list,
                    "confidence": row[3],
                    "timestamp": row[4],
                    "occurrences": row[5]
                })
        except Exception as e:
            logger.warning(f"Erro ao obter imagens: {e}")
        return images

    def get_stats(self) -> Dict[str, Any]:
        """Return module statistics: cache/DB counts, availability flags and
        the key configuration values."""
        total_images = len(self._cache)
        try:
            # BUGFIX: local import — sqlite3 was not imported at module level.
            import sqlite3
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("SELECT COUNT(*) FROM imagefeatures")
            db_count = c.fetchone()[0]
            conn.close()
        except Exception:
            db_count = 0
        return {
            "cached_images": total_images,
            "database_images": db_count,
            "dependencies_ok": _DEPENDENCIES_OK,
            "tesseract_available": self._tesseract_available,
            "config": {
                "ocr_langs": [self.config.ocr_primary_lang, self.config.ocr_secondary_lang],
                "feature_dim": self.config.feature_dim,
                "similarity_threshold": self.config.similarity_threshold
            }
        }
# ============================================================
# FUNÇÕES DE CONVENIÊNCIA
# ============================================================
# Lazily-created process-wide singleton.
_vision_instance: Optional[ComputerVision] = None


def get_computer_vision(config: Optional[VisionConfig] = None) -> ComputerVision:
    """Return the shared ComputerVision singleton, creating it on first use."""
    global _vision_instance
    instance = _vision_instance
    if instance is None:
        instance = ComputerVision(config)
        _vision_instance = instance
    return instance
def analyze_image_from_base64(
    base64_string: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: analyze a base64-encoded image via the singleton."""
    return get_computer_vision().analyze_base64(base64_string, user_id)
def analyze_image_file(
    file_path: str,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """Convenience wrapper: load an image file with OpenCV and analyze it."""
    if not _DEPENDENCIES_OK or _cv2 is None:
        return {"success": False, "error": "OpenCV não disponível"}
    # Acquire the singleton before loading, matching the original call order.
    vision = get_computer_vision()
    loaded = _cv2.imread(file_path)
    if loaded is None:
        return {"success": False, "error": "Falha ao carregar imagem"}
    return vision.analyze_image(loaded, user_id)
def analyze_image_from_any_source(
    source: Any,
    user_id: str = "anonymous"
) -> Dict[str, Any]:
    """
    Universal entry point: analyze an image from any supported source.

    Supported sources:
    - base64 string (with or without data-URL prefix)
    - file path (str)
    - raw encoded bytes / bytearray
    - numpy array (OpenCV image)
    - PIL Image
    - readable file-like object (stream/buffer)

    Args:
        source: The image in any of the forms above.
        user_id: User identifier for the learning store.

    Returns:
        Analysis result dict (``success`` False with ``error`` on failure).
    """
    engine = get_computer_vision()

    # Strings: either a filesystem path or a base64 payload.
    if isinstance(source, str):
        if os.path.exists(source):
            return analyze_image_file(source, user_id)
        looks_like_base64 = (
            len(source) > 100
            or "data:image" in source
            or source.startswith("/9j")  # JPEG base64 magic
        )
        if looks_like_base64:
            return engine.analyze_base64(source, user_id)
        # Short non-base64 string: last resort, treat as a path.
        return analyze_image_file(source, user_id)

    # Raw encoded bytes: decode with OpenCV.
    if isinstance(source, (bytes, bytearray)):
        try:
            buffer = _np.frombuffer(source, dtype=_np.uint8)
            decoded = _cv2.imdecode(buffer, _cv2.IMREAD_COLOR)
            if decoded is not None:
                return engine.analyze_image(decoded, user_id)
            return {"success": False, "error": "Falha ao decodificar bytes"}
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar bytes: {e}"}

    # PIL Image: convert RGB -> BGR for OpenCV.
    if _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
        try:
            bgr = _cv2.cvtColor(
                _np.array(source.convert('RGB')),
                _cv2.COLOR_RGB2BGR
            )
            return engine.analyze_image(bgr, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao converter PIL: {e}"}

    # Already an OpenCV-style numpy array.
    if _np is not None and isinstance(source, _np.ndarray):
        try:
            return engine.analyze_image(source, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao processar array numpy: {e}"}

    # File-like object: rewind, read, and re-dispatch on the bytes.
    if hasattr(source, 'read'):
        try:
            source.seek(0)
            payload = source.read()
            return analyze_image_from_any_source(payload, user_id)
        except Exception as e:
            return {"success": False, "error": f"Erro ao ler stream: {e}"}

    return {
        "success": False,
        "error": f"Tipo de fonte não suportado: {type(source)}",
        "supported_types": ["base64", "bytes", "file_path", "numpy_array", "PIL_Image"]
    }
def convert_image_to_base64(
    source: Any,
    format: str = "JPEG",
    quality: int = 95
) -> Optional[str]:
    """
    Convert an image from any supported source to a base64 string.

    Args:
        source: File path, base64 string (passed through), bytes, numpy
            array (OpenCV image) or PIL Image.
        format: Output encoding format for array/PIL sources (JPEG, PNG).
        quality: Compression quality (applied to JPEG only).

    Returns:
        Base64 string, or None if the source is unsupported or conversion fails.
    """
    try:
        if isinstance(source, str):
            # BUGFIX: check for an existing file FIRST. Previously any string
            # longer than 100 chars was returned as-is ("already base64"),
            # so long file paths were never read.
            if os.path.exists(source):
                with open(source, "rb") as f:
                    return base64.b64encode(f.read()).decode('utf-8')
            if len(source) > 100:
                # Long non-path string: assume it is already base64.
                return source
        # Raw bytes: encode directly.
        if isinstance(source, (bytes, bytearray)):
            return base64.b64encode(source).decode('utf-8')
        # OpenCV-style numpy array: encode with imencode.
        if _np is not None and isinstance(source, _np.ndarray):
            # Only pass the JPEG quality flag when actually encoding JPEG.
            encode_params = (
                [_cv2.IMWRITE_JPEG_QUALITY, quality]
                if format.upper() in ("JPEG", "JPG") else []
            )
            success, buffer = _cv2.imencode(f".{format.lower()}", source, encode_params)
            if success:
                return base64.b64encode(buffer).decode('utf-8')
        # PIL Image: save into an in-memory buffer, then encode.
        if _PIL_Image is not None and isinstance(source, _PIL_Image.Image):
            import io
            buffer = io.BytesIO()
            source.save(buffer, format=format, quality=quality)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        return None
    except Exception as e:
        logger.error(f"Erro ao converter para base64: {e}")
        return None
# ============================================================
# EXPORTAÇÃO
# ============================================================
# Public API of this module (`from computervision import *`).
__all__ = [
    "VisionConfig",
    "ComputerVision",
    "ImageFeature",
    "get_computer_vision",
    "analyze_image_from_base64",
    "analyze_image_file",
    "analyze_image_from_any_source",
    "convert_image_to_base64",
]