File size: 15,095 Bytes
3b6c24d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ebfc501
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# type: ignore
"""

modules/computervision.py

================================================================================

VISION AI MÓDULO - MULTIMODAL GEMINI + QR CODE + fallback OCR

================================================================================

Versão 3.0 - AKIRA "The Seer"



Este módulo evoluiu de detecção de bordas para entendimento semântico.

Pipeline de Processamento:

  1. Gemini Vision (Multimodal): Descrição de cena, objetos, cores e contexto.

  2. QR Code Scanner: Extração de dados de códigos QR.

  3. OCR (Tesseract): Extração de texto (fallback para técnica/precisão).

  4. CV2 Analytics: Contagem de formas e objetos (Haar Cascades).

  5. RAG Visual: Armazena hashes de imagens conhecidas para lembrança rápida.



Diferente da V2, este módulo não apenas "vê" pixels, ele "entende" a imagem.

================================================================================

"""

import os
import io
import json
import time
import base64
import hashlib
import sqlite3
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple, Union
from dataclasses import dataclass
from loguru import logger

try:
    from .config import DB_PATH
except (ImportError, ValueError):
    try:
        from modules.config import DB_PATH
    except ImportError:
        DB_PATH = "akira.db"


# ============================================================
# Imports Lazy para Performance
# ============================================================
_cv2 = None
_np = None
_pytesseract = None
_PIL_Image = None
_genai = None

def _check_core_deps():
    global _cv2, _np, _pytesseract, _PIL_Image, _genai
    try:
        import cv2 as cv
        import numpy as np
        import pytesseract as pt
        from PIL import Image as PILImg
        _cv2, _np, _pytesseract, _PIL_Image = cv, np, pt, PILImg
        
        # Google GenAI (nova API)
        try:
            import google.genai as genai_new
            _genai = genai_new
        except ImportError:
            try:
                import google.generativeai as genai_old
                _genai = genai_old
            except ImportError:
                _genai = None
                
        return True
    except Exception as e:
        logger.warning(f"Visão parcial: {e}")
        return False

_DEPS_OK = _check_core_deps()

# ============================================================
# CONFIGURAÇÕES
# ============================================================

@dataclass
class VisionConfig:
    ocr_lang: str = "por+eng"
    similarity_threshold: float = 0.88
    max_image_res: int = 1200
    enable_gemini: bool = True
    enable_qr: bool = True
    db_path: str = DB_PATH

# ============================================================
# CLASSE PRINCIPAL
# ============================================================

class ComputerVision:
    """

    Controlador de Visão Computacional de Nova Geração.

    """

    def __init__(self, config: Optional[VisionConfig] = None):
        self.config = config or VisionConfig()
        self.db_path = self.config.db_path
        self._setup_db()
        self._init_cascades()
        
        # API Key do Gemini (preferencialmente injetada via config)
        self.api_key = os.getenv("GEMINI_API_KEY", "")

    def _setup_db(self):
        """Garante tabela de memória visual."""
        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("""

                CREATE TABLE IF NOT EXISTS image_memory (

                    hash TEXT PRIMARY KEY,

                    user_id TEXT,

                    description TEXT,

                    ocr_text TEXT,

                    qr_data TEXT,

                    metadata TEXT,

                    timestamp DATETIME

                )

            """)
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Erro DB Visão: {e}")

    def _init_cascades(self):
        """Carrega modelos Haar Cascades para detecção básica."""
        if not _cv2: return
        try:
            self._face_cascade = _cv2.CascadeClassifier(_cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        except:
            self._face_cascade = None

    # ==================================================================
    # 🎯 PIPELINE PRINCIPAL
    # ==================================================================

    # ==================================================================
    # PROCESSAMENTO
    # ==================================================================

    def analyze_image(self, input_data: Union[str, bytes], user_id: str = "anon") -> Dict[str, Any]:
        """

        Processa imagem através de todo o pipeline.

        Aceita: Caminho de arquivo (str), Base64 (str) ou Bytes brutos (bytes).

        """
        if not input_data: return {"success": False, "error": "Entrada vazia"}
        
        img_bytes = None
        
        try:
            # 1. Detecção e Normalização da Entrada
            if isinstance(input_data, bytes):
                img_bytes = input_data
            elif isinstance(input_data, str):
                # Caso A: Caminho de arquivo local
                if os.path.isfile(input_data):
                    with open(input_data, "rb") as f:
                        img_bytes = f.read()
                # Caso B: Base64
                else:
                    try:
                        b64_str = input_data
                        if "," in b64_str: b64_str = b64_str.split(",")[1]
                        img_bytes = base64.b64decode(b64_str)
                    except Exception:
                        return {"success": False, "error": "String informada não é um caminho válido nem Base64 válido"}
            
            if not img_bytes:
                return {"success": False, "error": "Falha ao extrair bytes da imagem"}

            img_hash = hashlib.md5(img_bytes).hexdigest()
            
            # 2. Check Memória Visual (Cache BD)
            cached = self._get_from_memory(img_hash)
            if cached:
                logger.info(f"🧠 Memória Visual recordada: {img_hash}")
                cached["cached"] = True
                return cached

            # 3. Preparação para OCR e CV2
            nparr = _np.frombuffer(img_bytes, _np.uint8)
            img_cv = _cv2.imdecode(nparr, _cv2.IMREAD_COLOR)
            pil_img = _PIL_Image.open(io.BytesIO(img_bytes))

            # --- EXECUÇÃO DO PIPELINE ---
            
            # A. QR Code (Rápido)
            qr_data = self._scan_qr(img_cv) if self.config.enable_qr else None
            
            # B. Gemini Vision (Semântico - O Coração)
            descricao = ""
            if self.config.enable_gemini and self.api_key:
                descricao = self._gemini_visual_analyze(img_bytes)
            
            # C. OCR (Fallback/Técnico)
            ocr_text = self._run_ocr(pil_img)
            
            # D. CV2 Analytics (Estatístico/Objetos)
            analytics = self._run_cv2_analytics(img_cv)

            # 4. Consolidação
            result = {
                "success": True,
                "hash": img_hash,
                "description": descricao or "Não foi possível descrever a imagem semanticamente.",
                "ocr": ocr_text,
                "qr": qr_data,
                "objects": analytics.get("objects", []),
                "details": {
                    "faces": analytics.get("faces", 0),
                    "resolution": f"{img_cv.shape[1]}x{img_cv.shape[0]}" if img_cv is not None else "N/A"
                },
                "timestamp": datetime.now().isoformat()
            }

            # 5. Salva na Memória
            self._save_to_memory(result, user_id)

            return result

        except Exception as e:
            logger.exception("Falha no pipeline de visão")
            return {"success": False, "error": str(e)}

    # ==================================================================
    # 👁️ MOTORES ESPECÍFICOS
    # ==================================================================

    def _gemini_visual_analyze(self, img_bytes: bytes) -> str:
        """Usa Google Gemini Multimodal para descrever a imagem."""
        if not _genai or not self.api_key: return ""
        
        try:
            # Detecta se é a API nova ou antiga
            if hasattr(_genai, 'Client'):  # Nova API google.genai
                client = _genai.Client(api_key=self.api_key)
                # Otimizado para Gemini 2.0 Flash
                model_id = "gemini-2.0-flash" if "2.0-flash" in os.getenv("GEMINI_MODEL", "") else "gemini-1.5-flash"
                
                # Detetar MimeType dinâmico
                mime_type = "image/png" if img_bytes.startswith(b"\x89PNG") else "image/jpeg"
                response = client.models.generate_content(
                    model=model_id,
                    contents=[
                        "Descreva esta imagem detalhadamente para uma IA assistente. Fale sobre objetos, cores, ambiente e se houver pessoas, descreva suas expressões.",
                        _genai.types.Part.from_bytes(data=img_bytes, mime_type=mime_type),
                    ]
                )
                return response.text if response else ""
            else:
                # API antiga google.generativeai
                _genai.configure(api_key=self.api_key)
                model = _genai.GenerativeModel('gemini-1.5-flash')
                response = model.generate_content([
                    "Descreva esta imagem detalhadamente. Seja direto e informativo.",
                    _PIL_Image.open(io.BytesIO(img_bytes))
                ])
                return response.text if response else ""
        except Exception as e:
            logger.warning(f"Gemini Vision falhou: {e}")
            return ""

    def _scan_qr(self, img_cv) -> Optional[str]:
        """Detecta e decodifica QR Code."""
        if not _cv2 or img_cv is None: return None
        try:
            detector = _cv2.QRCodeDetector()
            data, _, _ = detector.detectAndDecode(img_cv)
            return data if data else None
        except:
            return None

    def _run_ocr(self, pil_img) -> str:
        """Extrai texto da imagem via Tesseract."""
        if not _pytesseract: return ""
        try:
            return _pytesseract.image_to_string(pil_img, lang=self.config.ocr_lang).strip()
        except:
            return ""

    def _run_cv2_analytics(self, img_cv) -> Dict[str, Any]:
        """Detecta faces e extrai metadados visuais básicos."""
        res = {"faces": 0, "objects": []}
        if not _cv2 or img_cv is None: return res
        
        try:
            gray = _cv2.cvtColor(img_cv, _cv2.COLOR_BGR2GRAY)
            # Faces
            if self._face_cascade:
                faces = self._face_cascade.detectMultiScale(gray, 1.1, 4)
                res["faces"] = len(faces)
                if len(faces) > 0: res["objects"].append("pessoa/rosto")
            
            # Brilho médio
            avg_color = _np.mean(img_cv, axis=(0, 1))
            res["avg_color_bgr"] = avg_color.tolist()
            
        except: pass
        return res

    # ==================================================================
    # 🗄️ PERSISTÊNCIA (MEMÓRIA VISUAL)
    # ==================================================================

    def _get_from_memory(self, img_hash: str) -> Optional[Dict]:
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            c = conn.cursor()
            c.execute("SELECT * FROM image_memory WHERE hash = ?", (img_hash,))
            row = c.fetchone()
            conn.close()
            
            if row:
                res = dict(row)
                return {
                    "success": True,
                    "hash": res["hash"],
                    "description": res["description"],
                    "ocr": res["ocr_text"],
                    "qr": res["qr_data"],
                    "timestamp": res["timestamp"],
                    "from_memory": True
                }
        except: pass
        return None

    def _save_to_memory(self, result: Dict, user_id: str):
        try:
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("""

                INSERT OR REPLACE INTO image_memory 

                (hash, user_id, description, ocr_text, qr_data, metadata, timestamp)

                VALUES (?, ?, ?, ?, ?, ?, ?)

            """, (
                result["hash"],
                user_id,
                result["description"],
                result["ocr"],
                result["qr"],
                json.dumps(result.get("details", {})),
                result["timestamp"]
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.debug(f"Erro ao salvar memória visual: {e}")

# ============================================================
# SINGLETON EXPORT
# ============================================================

_vision_instance = None

def get_computer_vision(config=None) -> ComputerVision:
    global _vision_instance
    if _vision_instance is None:
        _vision_instance = ComputerVision(config)
    return _vision_instance

def analyze_image_base64(b64_str: str, user_id: str = "anon") -> Dict[str, Any]:
    return get_computer_vision().analyze_image(b64_str, user_id)

__all__ = ["ComputerVision", "get_computer_vision", "analyze_image_base64",
           "ImageFeature", "analyze_image_from_base64", "analyze_image_file"]


# ============================================================
# COMPATIBILIDADE — aliases para imports legados
# ============================================================

@dataclass
class ImageFeature:
    """Representação simplificada de features de uma imagem."""
    description: str = ""
    ocr_text: str = ""
    qr_data: Optional[str] = None
    objects: List[str] = None  # type: ignore

    def __post_init__(self):
        if self.objects is None:
            self.objects = []


def analyze_image_from_base64(b64_str: str, user_id: str = "anon") -> Dict[str, Any]:
    """Alias legado para analyze_image_base64."""
    return analyze_image_base64(b64_str, user_id)


def analyze_image_file(filepath: str, user_id: str = "anon") -> Dict[str, Any]:
    """Analisa imagem a partir de caminho de arquivo."""
    return get_computer_vision().analyze_image(filepath, user_id)