File size: 14,455 Bytes
b1acf7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
db77419
b1acf7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
"""
Preprocessing utilities for different input modalities.
"""

import os
import tempfile
import logging
from typing import List, Optional, Tuple, Union

try:
    from PIL import Image
    import numpy as np

    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    Image = None
    np = None

from ..config.settings import (
    IMAGE_TRANSFORMS,
    AUDIO_MODEL_CONFIG,
)

# Add Any to typing imports
from typing import List, Optional, Union, Any

# Add torch import for audio preprocessing
try:
    import torch

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    torch = None

logger = logging.getLogger(__name__)


def detect_and_preprocess_face(
    image: Union[Image.Image, np.ndarray, Any], crop_tightness: float = 0.05
) -> Optional[Image.Image]:
    """
    Detect face in image, crop to face region, convert to grayscale, and resize.

    Args:
        image: Input image (PIL Image or numpy array)
        crop_tightness: Padding around face (0.0 = no padding, 0.3 = 30% padding)

    Returns:
        Preprocessed PIL Image or None if preprocessing fails
    """
    if not PIL_AVAILABLE:
        logger.error("PIL (Pillow) not available. Cannot process images.")
        return None

    try:
        import cv2

        # Convert PIL image to OpenCV format
        if isinstance(image, Image.Image):
            img_array = np.array(image)
            # Convert RGB to BGR for OpenCV
            if len(img_array.shape) == 3:
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
        else:
            img_array = image

        # Load face detection cascade
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        )

        # Convert to grayscale for face detection
        gray = cv2.cvtColor(img_array, cv2.COLOR_BGR2GRAY)

        # Detect faces
        faces = face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )

        if len(faces) == 0:
            logger.warning("No face detected in the image. Using center crop instead.")
            return _fallback_preprocessing(image)

        # Get the largest face (assuming it's the main subject)
        x, y, w, h = max(faces, key=lambda rect: rect[2] * rect[3])

        # Add padding around the face based on user preference
        padding_x = int(w * crop_tightness)
        padding_y = int(h * crop_tightness)

        # Ensure we don't go out of bounds
        x1 = max(0, x - padding_x)
        y1 = max(0, y - padding_y)
        x2 = min(img_array.shape[1], x + w + padding_x)
        y2 = min(img_array.shape[0], y + h + padding_y)

        # Crop to face region
        face_crop = img_array[y1:y2, x1:x2]

        # Convert BGR to RGB first
        face_crop_rgb = cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB)

        # Convert to grayscale
        face_gray = cv2.cvtColor(face_crop_rgb, cv2.COLOR_RGB2GRAY)

        # Resize to target size
        target_size = IMAGE_TRANSFORMS["resize"]
        face_resized = cv2.resize(
            face_gray, (target_size, target_size), interpolation=cv2.INTER_AREA
        )

        # Convert grayscale to 3-channel RGB (replicate grayscale values)
        face_rgb_3channel = cv2.cvtColor(face_resized, cv2.COLOR_GRAY2RGB)

        # Convert back to PIL Image
        face_pil = Image.fromarray(face_rgb_3channel)
        return face_pil

    except ImportError:
        logger.error(
            "OpenCV not installed. Please install it with: pip install opencv-python"
        )
        return _fallback_preprocessing(image)
    except Exception as e:
        logger.error(f"Error in face detection: {str(e)}")
        return _fallback_preprocessing(image)


def _fallback_preprocessing(
    image: Union[Image.Image, np.ndarray, Any],
) -> Optional[Image.Image]:
    """Fallback preprocessing when face detection fails."""
    try:
        if isinstance(image, Image.Image):
            rgb_pil = image.convert("RGB")
            target_size = IMAGE_TRANSFORMS["resize"]
            resized = rgb_pil.resize(
                (target_size, target_size), Image.Resampling.LANCZOS
            )
            # Convert to grayscale and then to 3-channel RGB
            gray_pil = resized.convert("L")
            gray_rgb_pil = gray_pil.convert("RGB")
            return gray_rgb_pil
        return None
    except Exception as e:
        logger.error(f"Fallback preprocessing failed: {str(e)}")
        return None


def get_vision_transforms():
    """Get the image transforms used during training."""
    from torchvision import transforms

    return transforms.Compose(
        [
            transforms.Resize(IMAGE_TRANSFORMS["resize"]),
            transforms.CenterCrop(IMAGE_TRANSFORMS["center_crop"]),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=IMAGE_TRANSFORMS["normalize_mean"],
                std=IMAGE_TRANSFORMS["normalize_std"],
            ),
        ]
    )


def preprocess_audio_for_model(audio_bytes: bytes) -> Optional[torch.Tensor]:
    """
    Preprocess audio bytes for wav2vec2 model input using AutoFeatureExtractor.

    Args:
        audio_bytes: Raw audio bytes

    Returns:
        Preprocessed audio tensor ready for wav2vec2 model
    """
    if not TORCH_AVAILABLE:
        logger.error("PyTorch not available. Cannot process audio.")
        return None

    try:
        from transformers import AutoFeatureExtractor
        import librosa

        # Save audio bytes to temporary file
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        try:
            # Load and resample audio to target sampling rate
            audio, sr = librosa.load(
                tmp_file_path, sr=AUDIO_MODEL_CONFIG["target_sampling_rate"]
            )

            # Use AutoFeatureExtractor (same as training)
            feature_extractor = AutoFeatureExtractor.from_pretrained(
                AUDIO_MODEL_CONFIG["model_name"]
            )

            # Calculate max length in samples (5 seconds * 16kHz = 80000 samples)
            max_length = int(
                AUDIO_MODEL_CONFIG["max_duration"]
                * AUDIO_MODEL_CONFIG["target_sampling_rate"]
            )

            logger.info(f"Audio length: {len(audio)} samples, max_length: {max_length}")

            inputs = feature_extractor(
                audio,
                sampling_rate=AUDIO_MODEL_CONFIG["target_sampling_rate"],
                max_length=max_length,
                truncation=True,
                padding="max_length",
                return_tensors="pt",
            )

            # Return tensor with correct shape for wav2vec2
            # The model expects: [batch_size, sequence_length]
            tensor = inputs.input_values

            # Log the tensor shape for debugging
            logger.info(f"Audio preprocessing output shape: {tensor.shape}")

            return tensor

        finally:
            # Clean up temporary file
            try:
                os.unlink(tmp_file_path)
            except (OSError, PermissionError):
                pass

    except ImportError as e:
        logger.error(f"Required library not installed: {str(e)}")
        raise ImportError("Please install: pip install transformers librosa torch")


def extract_frames_from_video(video_file, max_frames: int = 5) -> List[Any]:
    """
    Extract frames from video file for vision sentiment analysis.

    Args:
        video_file: Video file object
        max_frames: Maximum number of frames to extract

    Returns:
        List of PIL Image objects
    """
    try:
        import cv2

        # Save video bytes to temporary file
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            if hasattr(video_file, "getvalue"):
                tmp_file.write(video_file.getvalue())
            else:
                tmp_file.write(video_file)
            tmp_file_path = tmp_file.name

        try:
            # Open video with OpenCV
            cap = cv2.VideoCapture(tmp_file_path)

            if not cap.isOpened():
                logger.error("Could not open video file")
                return []

            frames = []
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            duration = total_frames / fps if fps > 0 else 0

            logger.info(
                f"Video: {total_frames} frames, {fps:.1f} FPS, {duration:.1f}s duration"
            )

            # Extract frames at strategic intervals
            if total_frames > 0:
                # Select frames: start, 25%, 50%, 75%, end
                frame_indices = [
                    0,
                    int(total_frames * 0.25),
                    int(total_frames * 0.5),
                    int(total_frames * 0.75),
                    total_frames - 1,
                ]
                frame_indices = list(set(frame_indices))  # Remove duplicates
                frame_indices.sort()

                for frame_idx in frame_indices:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                    ret, frame = cap.read()
                    if ret:
                        # Convert BGR to RGB
                        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        # Convert to PIL Image
                        pil_image = Image.fromarray(frame_rgb)
                        frames.append(pil_image)

            cap.release()
            return frames

        finally:
            # Clean up temporary file
            try:
                os.unlink(tmp_file_path)
            except (OSError, PermissionError):
                pass

    except ImportError:
        logger.error(
            "OpenCV not installed. Please install it with: pip install opencv-python"
        )
        return []
    except Exception as e:
        logger.error(f"Error extracting frames: {str(e)}")
        return []


def extract_audio_from_video(video_file) -> Optional[bytes]:
    """
    Extract audio from video file for audio sentiment analysis.

    Args:
        video_file: Video file object

    Returns:
        Audio bytes in WAV format or None if extraction fails
    """
    try:
        import tempfile

        try:
            from moviepy import VideoFileClip
        except ImportError as e:
            logger.error(f"MoviePy import failed: {e}")
            return None

        # Save video bytes to temporary file
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            if hasattr(video_file, "getvalue"):
                tmp_file.write(video_file.getvalue())
            else:
                tmp_file.write(video_file)
            tmp_file_path = tmp_file.name

        try:
            # Extract audio using moviepy
            video = VideoFileClip(tmp_file_path)
            audio = video.audio

            if audio is None:
                logger.warning("No audio track found in video")
                return None

            # Save audio to temporary WAV file
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_file:
                audio_path = audio_file.name

            # Export audio as WAV
            audio.write_audiofile(audio_path, logger=None)

            # Read the audio file and return bytes
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()

            # Clean up temporary audio file
            try:
                os.unlink(audio_path)
            except (OSError, PermissionError):
                pass

            return audio_bytes

        finally:
            # Clean up temporary video file
            try:
                # Close video and audio objects first
                if "video" in locals():
                    video.close()
                if "audio" in locals() and audio:
                    audio.close()

                # Wait a bit before trying to delete
                import time

                time.sleep(0.1)

                os.unlink(tmp_file_path)
            except (OSError, PermissionError):
                pass

    except ImportError:
        logger.error(
            "MoviePy not installed. Please install it with: pip install moviepy"
        )
        return None
    except Exception as e:
        logger.error(f"Error extracting audio: {str(e)}")
        return None


def transcribe_audio(audio_bytes: bytes) -> str:
    """
    Transcribe audio to text for text sentiment analysis.

    Args:
        audio_bytes: Audio bytes in WAV format

    Returns:
        Transcribed text string
    """
    if audio_bytes is None:
        return ""

    try:
        import tempfile
        import speech_recognition as sr

        # Save audio bytes to temporary file
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        try:
            # Initialize recognizer
            recognizer = sr.Recognizer()

            # Load audio file
            with sr.AudioFile(tmp_file_path) as source:
                # Read audio data
                audio_data = recognizer.record(source)

                # Transcribe using Google Speech Recognition
                try:
                    text = recognizer.recognize_google(audio_data)
                    return text
                except sr.UnknownValueError:
                    logger.warning("Speech could not be understood")
                    return ""
                except sr.RequestError as e:
                    logger.error(
                        f"Could not request results from speech recognition service: {e}"
                    )
                    return ""

        finally:
            # Clean up temporary file
            try:
                os.unlink(tmp_file_path)
            except (OSError, PermissionError):
                pass

    except ImportError:
        logger.error(
            "SpeechRecognition not installed. Please install it with: pip install SpeechRecognition"
        )
        return ""
    except Exception as e:
        logger.error(f"Error transcribing audio: {str(e)}")
        return ""