Spaces:
Sleeping
Sleeping
| """ | |
| Codette Multimodal Analyzer | |
| Provides lightweight, dependency-safe analysis helpers for text, image, | |
| audio and video inputs. Functions accept either raw bytes, a filesystem | |
| path (str) or simple Python objects (e.g. numpy arrays for audio). | |
| This module intentionally avoids heavy mandatory dependencies. If | |
| optional libraries (Pillow, OpenCV) are installed, the analyzer will use | |
| them for richer metadata extraction; otherwise it falls back to safe | |
| heuristics and headers. | |
| """ | |
| from typing import Dict, Any, List, Union, Optional | |
| import io | |
| import os | |
| import imghdr | |
| import mimetypes | |
| import wave | |
| import struct | |
| try: | |
| from PIL import Image | |
| except Exception: | |
| Image = None | |
| try: | |
| import cv2 | |
| except Exception: | |
| cv2 = None | |
| try: | |
| import numpy as np | |
| except Exception: | |
| np = None | |
| class MultimodalAnalyzer: | |
| def __init__(self): | |
| self.supported_modalities = { | |
| "text": self._analyze_text, | |
| "image": self._analyze_image, | |
| "audio": self._analyze_audio, | |
| "video": self._analyze_video, | |
| } | |
| def analyze_content(self, content: Dict[str, Any]) -> Dict[str, Any]: | |
| results: Dict[str, Any] = {} | |
| for modality, data in content.items(): | |
| handler = self.supported_modalities.get(modality) | |
| if handler is None: | |
| results[modality] = {"error": "Unsupported modality"} | |
| continue | |
| try: | |
| results[modality] = handler(data) | |
| except Exception as e: | |
| results[modality] = {"error": str(e)} | |
| return results | |
| def _analyze_text(self, text: Union[str, bytes]) -> Dict[str, Any]: | |
| if isinstance(text, bytes): | |
| try: | |
| text = text.decode("utf-8", errors="replace") | |
| except Exception: | |
| text = str(text) | |
| text = text or "" | |
| words = [w for w in text.split() if w] | |
| unique_words = set(w.strip(".,!?;:\"()[]{}") for w in words) | |
| avg_word_len = sum(len(w) for w in words) / len(words) if words else 0 | |
| has_questions = "?" in text | |
| has_exclamations = "!" in text | |
| language = "en" if all(ord(c) < 128 for c in text) else "non-en" | |
| return { | |
| "type": "text", | |
| "length": len(text), | |
| "word_count": len(words), | |
| "unique_word_count": len(unique_words), | |
| "avg_word_length": round(avg_word_len, 2), | |
| "has_content": bool(text.strip()), | |
| "has_questions": has_questions, | |
| "has_exclamations": has_exclamations, | |
| "language_estimate": language, | |
| } | |
| def _read_bytes_or_path(self, data: Union[bytes, str]) -> Optional[bytes]: | |
| if data is None: | |
| return None | |
| if isinstance(data, bytes): | |
| return data | |
| if isinstance(data, str): | |
| if os.path.exists(data): | |
| try: | |
| with open(data, "rb") as f: | |
| return f.read() | |
| except Exception: | |
| return None | |
| # treat string as raw small payload | |
| return data.encode("utf-8", errors="replace") | |
| return None | |
| def _analyze_image(self, image_data: Union[bytes, str, None]) -> Dict[str, Any]: | |
| raw = self._read_bytes_or_path(image_data) | |
| info: Dict[str, Any] = {"type": "image", "has_content": bool(raw)} | |
| if not raw: | |
| return info | |
| fmt = None | |
| try: | |
| fmt = imghdr.what(None, h=raw) | |
| except Exception: | |
| fmt = None | |
| if fmt: | |
| info["format"] = fmt | |
| else: | |
| # fallback to mime type by filename if provided | |
| if isinstance(image_data, str): | |
| mt, _ = mimetypes.guess_type(image_data) | |
| info["format"] = mt or "unknown" | |
| else: | |
| info["format"] = "unknown" | |
| if Image is not None: | |
| try: | |
| img = Image.open(io.BytesIO(raw)) | |
| info.update({ | |
| "width": img.width, | |
| "height": img.height, | |
| "mode": img.mode, | |
| "has_alpha": "A" in img.getbands(), | |
| }) | |
| img.close() | |
| except Exception: | |
| pass | |
| return info | |
| def _analyze_audio(self, audio_data: Union[bytes, str, Any, None]) -> Dict[str, Any]: | |
| info: Dict[str, Any] = {"type": "audio", "has_content": False, "format": "unknown"} | |
| if audio_data is None: | |
| return info | |
| if np is not None and isinstance(audio_data, np.ndarray): | |
| arr = audio_data | |
| info["has_content"] = getattr(arr, "size", 0) > 0 | |
| info["format"] = "numpy.ndarray" | |
| try: | |
| samples = arr.astype(float) | |
| rms = float(np.sqrt(np.mean(samples ** 2))) | |
| info["rms"] = float(rms) | |
| info["duration_seconds_estimate"] = None | |
| except Exception: | |
| pass | |
| return info | |
| raw = self._read_bytes_or_path(audio_data) | |
| if not raw: | |
| return info | |
| info["has_content"] = True | |
| # Try WAV detection | |
| try: | |
| bio = io.BytesIO(raw) | |
| with wave.open(bio, "rb") as w: | |
| nchannels = w.getnchannels() | |
| sampwidth = w.getsampwidth() | |
| framerate = w.getframerate() | |
| nframes = w.getnframes() | |
| duration = nframes / float(framerate) if framerate else None | |
| info.update({ | |
| "format": "wav", | |
| "channels": nchannels, | |
| "sample_width": sampwidth, | |
| "frame_rate": framerate, | |
| "n_frames": nframes, | |
| "duration_seconds": duration, | |
| }) | |
| try: | |
| frames = w.readframes(min(nframes, 44100)) | |
| if frames: | |
| # unpack frames to numpy array for RMS | |
| if sampwidth == 1: | |
| dtype = np.uint8 | |
| elif sampwidth == 2: | |
| dtype = np.int16 | |
| elif sampwidth == 4: | |
| dtype = np.int32 | |
| else: | |
| dtype = np.int16 | |
| samples = np.frombuffer(frames, dtype=dtype).astype(float) | |
| if nchannels > 1: | |
| samples = samples.reshape(-1, nchannels) | |
| samples = samples.mean(axis=1) | |
| rms = float(np.sqrt(np.mean((samples) ** 2))) | |
| info["rms"] = rms | |
| except Exception: | |
| pass | |
| return info | |
| except wave.Error: | |
| pass | |
| except Exception: | |
| pass | |
| # fallback: try to guess mime type by filename | |
| if isinstance(audio_data, str): | |
| mt, _ = mimetypes.guess_type(audio_data) | |
| if mt: | |
| info["format"] = mt | |
| return info | |
| def _analyze_video(self, video_data: Union[bytes, str, None]) -> Dict[str, Any]: | |
| info: Dict[str, Any] = {"type": "video", "has_content": False, "format": "unknown"} | |
| raw = self._read_bytes_or_path(video_data) | |
| if not raw: | |
| return info | |
| info["has_content"] = True | |
| if isinstance(video_data, str): | |
| mt, _ = mimetypes.guess_type(video_data) | |
| if mt: | |
| info["format"] = mt | |
| # If OpenCV is available, try to extract metadata | |
| if cv2 is not None and isinstance(video_data, str) and os.path.exists(video_data): | |
| try: | |
| cap = cv2.VideoCapture(video_data) | |
| if cap.isOpened(): | |
| fps = cap.get(cv2.CAP_PROP_FPS) or None | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) | |
| duration = frame_count / fps if fps else None | |
| info.update({ | |
| "fps": fps, | |
| "frame_count": frame_count, | |
| "width": width, | |
| "height": height, | |
| "duration_seconds": duration, | |
| }) | |
| cap.release() | |
| except Exception: | |
| pass | |
| return info | |
| def combine_modalities(self, analyses: Dict[str, Any]) -> Dict[str, Any]: | |
| modalities_present = [k for k, v in analyses.items() if not v.get("error")] | |
| summary = { | |
| "modalities_present": modalities_present, | |
| "modality_count": len(modalities_present), | |
| "complete_analysis": all(not v.get("error") for v in analyses.values()), | |
| "analyses": analyses, | |
| } | |
| if "text" in analyses and "image" in analyses: | |
| t = analyses.get("text", {}) | |
| img = analyses.get("image", {}) | |
| summary["text_and_image"] = { | |
| "text_length": t.get("length"), | |
| "image_size": (img.get("width"), img.get("height")), | |
| } | |
| return summary | |
| def get_supported_modalities(self) -> List[str]: | |
| return list(self.supported_modalities.keys()) |