# Codette3.0/src/components/multimodal_analyzer.py
# Uploaded by Raiff1982 ("Upload 117 files", commit 6d6b8af, verified)
"""
Codette Multimodal Analyzer
Provides lightweight, dependency-safe analysis helpers for text, image,
audio and video inputs. Functions accept either raw bytes, a filesystem
path (str) or simple Python objects (e.g. numpy arrays for audio).
This module intentionally avoids heavy mandatory dependencies. If
optional libraries (Pillow, OpenCV, NumPy) are installed, the analyzer
will use them for richer metadata extraction; otherwise it falls back to
safe heuristics and header inspection.
"""
import io
import logging
import mimetypes
import os
import struct
import wave
from typing import Dict, Any, List, Union, Optional

try:
    # imghdr was removed from the standard library in Python 3.13 (PEP 594).
    import imghdr
except ImportError:
    imghdr = None
try:
    from PIL import Image
except Exception:
    Image = None
try:
    import cv2
except Exception:
    cv2 = None
try:
    import numpy as np
except Exception:
    np = None
class MultimodalAnalyzer:
    """Best-effort metadata extraction for text, image, audio and video.

    Every modality has a private ``_analyze_*`` handler registered in
    ``supported_modalities``.  Pillow (``Image``), OpenCV (``cv2``) and NumPy
    (``np``) are optional module-level imports: when one of them is ``None``
    the enrichment that needs it is skipped and only header-derived facts
    are reported.
    """

    _log = logging.getLogger(__name__)

    # Characters stripped from both ends of a token before it is counted as
    # a unique word.
    _WORD_PUNCTUATION = ".,!?;:\"()[]{}"

    # (prefix, format name) pairs for header sniffing.  The names are the
    # ones the old ``imghdr`` module reported, so callers see the same
    # ``format`` values on every Python version.
    _IMAGE_PREFIXES = (
        (b"\xff\xd8\xff", "jpeg"),
        (b"\x89PNG\r\n\x1a\n", "png"),
        (b"GIF87a", "gif"),
        (b"GIF89a", "gif"),
        (b"MM", "tiff"),
        (b"II", "tiff"),
        (b"\x01\xda", "rgb"),
        (b"\x59\xa6\x6a\x95", "rast"),
        (b"#define ", "xbm"),
        (b"BM", "bmp"),
        (b"\x76\x2f\x31\x01", "exr"),
    )

    # Second byte of a Netpbm header ("P<digit><whitespace>") -> format name.
    _NETPBM_FORMATS = {
        b"1": "pbm",
        b"4": "pbm",
        b"2": "pgm",
        b"5": "pgm",
        b"3": "ppm",
        b"6": "ppm",
    }

    # WAV sample width in bytes -> (little-endian NumPy dtype, zero level).
    # 8-bit PCM is unsigned with its midpoint at 128; wider samples are
    # signed.  Widths that are not listed (e.g. 24-bit) get no RMS estimate.
    _PCM_LAYOUTS = {1: ("u1", 128.0), 2: ("<i2", 0.0), 4: ("<i4", 0.0)}

    # Upper bound on the number of WAV frames decoded for the RMS estimate.
    _MAX_RMS_FRAMES = 44100

    def __init__(self):
        """Register the handler for each supported modality name."""
        self.supported_modalities = {
            "text": self._analyze_text,
            "image": self._analyze_image,
            "audio": self._analyze_audio,
            "video": self._analyze_video,
        }

    def analyze_content(self, content: Dict[str, Any]) -> Dict[str, Any]:
        """Run the matching handler for every modality in *content*.

        Args:
            content: Mapping of modality name to its payload.  ``None`` or
                an empty mapping yields an empty result.

        Returns:
            Mapping of modality name to the handler's result dict.  Unknown
            modalities and handlers that raise are reported as
            ``{"error": <message>}`` instead of propagating.
        """
        results: Dict[str, Any] = {}
        for modality, data in (content or {}).items():
            handler = self.supported_modalities.get(modality)
            if handler is None:
                results[modality] = {"error": "Unsupported modality"}
                continue
            try:
                results[modality] = handler(data)
            except Exception as exc:
                # Boundary: one bad payload must not abort the others.  Fall
                # back to the exception type so the marker is never an empty
                # (falsy) string that would later read as "no error".
                results[modality] = {"error": str(exc) or type(exc).__name__}
        return results

    def _analyze_text(self, text: Union[str, bytes]) -> Dict[str, Any]:
        """Summarise a text payload with whitespace-token statistics.

        Args:
            text: The text, or UTF-8 encoded bytes (undecodable bytes are
                replaced).  A falsy value is treated as the empty string.

        Returns:
            Dict with ``length``, ``word_count``, ``unique_word_count``,
            ``avg_word_length``, ``has_content``, ``has_questions``,
            ``has_exclamations`` and ``language_estimate`` (``"en"`` when
            every character is ASCII, otherwise ``"non-en"``).
        """
        if isinstance(text, (bytes, bytearray)):
            # errors="replace" cannot raise, so no fallback is required.
            text = text.decode("utf-8", errors="replace")
        text = text or ""

        words = text.split()
        stripped = (word.strip(self._WORD_PUNCTUATION) for word in words)
        # Punctuation-only tokens strip down to "" and are not words.
        unique_words = {word for word in stripped if word}
        avg_word_len = sum(len(w) for w in words) / len(words) if words else 0

        return {
            "type": "text",
            "length": len(text),
            "word_count": len(words),
            "unique_word_count": len(unique_words),
            "avg_word_length": round(avg_word_len, 2),
            "has_content": bool(text.strip()),
            "has_questions": "?" in text,
            "has_exclamations": "!" in text,
            "language_estimate": (
                "en" if all(ord(c) < 128 for c in text) else "non-en"
            ),
        }

    def _read_bytes_or_path(self, data: Union[bytes, str]) -> Optional[bytes]:
        """Return the payload bytes for *data*.

        Bytes-like objects are returned as ``bytes``.  A string naming an
        existing filesystem entry is read from disk (``None`` if it cannot
        be read); any other string is treated as a small raw payload and
        UTF-8 encoded.  Every other type, including ``None``, yields
        ``None``.
        """
        if data is None:
            return None
        if isinstance(data, bytes):
            return data
        if isinstance(data, (bytearray, memoryview)):
            return bytes(data)
        if isinstance(data, str):
            if os.path.exists(data):
                try:
                    with open(data, "rb") as handle:
                        return handle.read()
                except OSError as exc:
                    self._log.debug("Could not read %r: %s", data, exc)
                    return None
            # Not a path on disk: treat the string itself as the payload.
            return data.encode("utf-8", errors="replace")
        return None

    def _has_payload(self, data: Union[bytes, str, None]) -> bool:
        """Tell whether *data* carries any bytes without loading a whole file.

        For an existing path only the first byte is read, so large media
        files are never pulled into memory just to test for emptiness.
        """
        if isinstance(data, str) and os.path.exists(data):
            try:
                with open(data, "rb") as handle:
                    return bool(handle.read(1))
            except OSError as exc:
                self._log.debug("Could not read %r: %s", data, exc)
                return False
        return bool(self._read_bytes_or_path(data))

    @classmethod
    def _sniff_image_format(cls, raw: bytes) -> Optional[str]:
        """Guess an image format name from the leading bytes of *raw*.

        Self-contained replacement for ``imghdr.what`` (removed from the
        standard library in Python 3.13).  Returns names such as
        ``"jpeg"``, ``"png"`` or ``"gif"``, or ``None`` when no signature
        matches.
        """
        # JFIF/Exif marker check comes first, as it did in imghdr.
        if raw[6:10] in (b"JFIF", b"Exif"):
            return "jpeg"
        for prefix, name in cls._IMAGE_PREFIXES:
            if raw.startswith(prefix):
                return name
        if (
            len(raw) >= 3
            and raw[:1] == b"P"
            and raw[2:3] in (b" ", b"\t", b"\n", b"\r")
        ):
            netpbm = cls._NETPBM_FORMATS.get(raw[1:2])
            if netpbm:
                return netpbm
        if raw.startswith(b"RIFF") and raw[8:12] == b"WEBP":
            return "webp"
        return None

    def _analyze_image(self, image_data: Union[bytes, str, None]) -> Dict[str, Any]:
        """Describe an image given as bytes or as a filesystem path.

        Returns:
            Dict with ``type`` and ``has_content``.  For non-empty input it
            also has ``format`` (sniffed from the header, else the MIME type
            guessed from a string argument, else ``"unknown"``) and, when
            Pillow is available and can open the data, ``width``,
            ``height``, ``mode`` and ``has_alpha``.
        """
        raw = self._read_bytes_or_path(image_data)
        info: Dict[str, Any] = {"type": "image", "has_content": bool(raw)}
        if not raw:
            return info

        fmt = self._sniff_image_format(raw)
        if not fmt and isinstance(image_data, str):
            # Fall back to the MIME type implied by the file name.
            fmt, _ = mimetypes.guess_type(image_data)
        info["format"] = fmt or "unknown"

        if Image is not None:
            try:
                # The context manager closes the image even if a property
                # access below raises.
                with Image.open(io.BytesIO(raw)) as img:
                    info.update({
                        "width": img.width,
                        "height": img.height,
                        "mode": img.mode,
                        "has_alpha": "A" in img.getbands(),
                    })
            except Exception as exc:
                # Best effort: keep the header-derived facts.
                self._log.debug("Pillow could not open image data: %s", exc)
        return info

    def _pcm_rms(self, frames: bytes, sample_width: int, channels: int) -> Optional[float]:
        """Return the RMS level of raw PCM *frames*, or ``None`` if unknown.

        Multi-channel audio is averaged to mono before the RMS is taken.
        ``None`` is returned when NumPy is missing, there are no frames, or
        the sample width has no entry in ``_PCM_LAYOUTS``.
        """
        if np is None or not frames:
            return None
        layout = self._PCM_LAYOUTS.get(sample_width)
        if layout is None:
            return None
        dtype, zero_level = layout
        try:
            samples = np.frombuffer(frames, dtype=dtype).astype(float) - zero_level
            if channels > 1:
                # Drop a trailing partial frame so the reshape cannot fail.
                usable = samples.size - samples.size % channels
                samples = samples[:usable].reshape(-1, channels).mean(axis=1)
            if not samples.size:
                return None
            return float(np.sqrt(np.mean(samples ** 2)))
        except Exception as exc:
            self._log.debug("Could not compute RMS for PCM frames: %s", exc)
            return None

    def _parse_wav(self, raw: bytes) -> Optional[Dict[str, Any]]:
        """Parse *raw* as a WAV stream; return its metadata or ``None``."""
        try:
            with wave.open(io.BytesIO(raw), "rb") as reader:
                channels = reader.getnchannels()
                sample_width = reader.getsampwidth()
                frame_rate = reader.getframerate()
                frame_count = reader.getnframes()
                try:
                    frames = reader.readframes(
                        min(frame_count, self._MAX_RMS_FRAMES)
                    )
                except Exception as exc:
                    # Header facts are still valid; only the RMS is lost.
                    self._log.debug("Could not read WAV frames: %s", exc)
                    frames = b""
        except Exception as exc:
            # wave.Error / EOFError and friends: simply not a readable WAV.
            self._log.debug("Payload is not a readable WAV stream: %s", exc)
            return None

        meta: Dict[str, Any] = {
            "format": "wav",
            "channels": channels,
            "sample_width": sample_width,
            "frame_rate": frame_rate,
            "n_frames": frame_count,
            "duration_seconds": (
                frame_count / float(frame_rate) if frame_rate else None
            ),
        }
        rms = self._pcm_rms(frames, sample_width, channels)
        if rms is not None:
            meta["rms"] = rms
        return meta

    def _analyze_audio(self, audio_data: Union[bytes, str, Any, None]) -> Dict[str, Any]:
        """Describe audio given as a NumPy array, bytes or a filesystem path.

        Returns:
            Dict with ``type``, ``has_content`` and ``format``.  A NumPy
            array adds ``rms`` (when non-empty and numeric) and a ``None``
            ``duration_seconds_estimate``.  WAV data adds ``channels``,
            ``sample_width``, ``frame_rate``, ``n_frames``,
            ``duration_seconds`` and, when it can be computed, ``rms`` over
            at most the first ``_MAX_RMS_FRAMES`` frames.  Anything else
            falls back to the MIME type guessed from a string argument.
        """
        info: Dict[str, Any] = {"type": "audio", "has_content": False, "format": "unknown"}
        if audio_data is None:
            return info

        if np is not None and isinstance(audio_data, np.ndarray):
            info["has_content"] = audio_data.size > 0
            info["format"] = "numpy.ndarray"
            try:
                samples = audio_data.astype(float)
                if samples.size:
                    # An empty array would yield NaN, so it gets no RMS.
                    info["rms"] = float(np.sqrt(np.mean(samples ** 2)))
                # A bare array carries no sample rate to derive a duration.
                info["duration_seconds_estimate"] = None
            except Exception as exc:
                self._log.debug("Could not compute RMS for array: %s", exc)
            return info

        raw = self._read_bytes_or_path(audio_data)
        if not raw:
            return info
        info["has_content"] = True

        wav_meta = self._parse_wav(raw)
        if wav_meta is not None:
            info.update(wav_meta)
            return info

        if isinstance(audio_data, str):
            # Not WAV: fall back to the MIME type implied by the file name.
            mime, _ = mimetypes.guess_type(audio_data)
            if mime:
                info["format"] = mime
        return info

    def _analyze_video(self, video_data: Union[bytes, str, None]) -> Dict[str, Any]:
        """Describe a video given as bytes or as a filesystem path.

        Returns:
            Dict with ``type``, ``has_content`` and ``format`` (MIME type
            guessed from a string argument, else ``"unknown"``).  When
            OpenCV is available and *video_data* is an existing path that
            OpenCV can open, ``fps``, ``frame_count``, ``width``,
            ``height`` and ``duration_seconds`` are added.
        """
        info: Dict[str, Any] = {"type": "video", "has_content": False, "format": "unknown"}
        if not self._has_payload(video_data):
            return info
        info["has_content"] = True

        is_string = isinstance(video_data, str)
        if is_string:
            mime, _ = mimetypes.guess_type(video_data)
            if mime:
                info["format"] = mime

        if is_string and os.path.exists(video_data) and cv2 is not None:
            cap = None
            try:
                cap = cv2.VideoCapture(video_data)
                if cap.isOpened():
                    fps = cap.get(cv2.CAP_PROP_FPS) or None
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
                    info.update({
                        "fps": fps,
                        "frame_count": frame_count,
                        "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0),
                        "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0),
                        "duration_seconds": frame_count / fps if fps else None,
                    })
            except Exception as exc:
                # Best effort: keep the facts gathered so far.
                self._log.debug("OpenCV could not inspect %r: %s", video_data, exc)
            finally:
                # Release the capture handle even when a query above raised.
                if cap is not None:
                    cap.release()
        return info

    def combine_modalities(self, analyses: Dict[str, Any]) -> Dict[str, Any]:
        """Build a cross-modality summary from per-modality results.

        Args:
            analyses: Mapping of modality name to its result dict, as
                returned by ``analyze_content``.  Values that are not dicts
                are counted as failed rather than raising.

        Returns:
            Dict with ``modalities_present`` (names whose result has no
            truthy ``error``), ``modality_count``, ``complete_analysis``
            and the original ``analyses``; plus ``text_and_image`` when
            both of those keys are present.
        """

        def as_dict(result: Any) -> Dict[str, Any]:
            return result if isinstance(result, dict) else {}

        modalities_present = [
            name
            for name, result in analyses.items()
            if isinstance(result, dict) and not result.get("error")
        ]
        summary = {
            "modalities_present": modalities_present,
            "modality_count": len(modalities_present),
            "complete_analysis": len(modalities_present) == len(analyses),
            "analyses": analyses,
        }
        if "text" in analyses and "image" in analyses:
            text_info = as_dict(analyses["text"])
            image_info = as_dict(analyses["image"])
            summary["text_and_image"] = {
                "text_length": text_info.get("length"),
                "image_size": (image_info.get("width"), image_info.get("height")),
            }
        return summary

    def get_supported_modalities(self) -> List[str]:
        """Return the registered modality names in registration order."""
        return list(self.supported_modalities)