File size: 2,929 Bytes
4dff442
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
from workers.celery_app import celery_app
from workers.ml_pipeline import Siglip2EmbeddingPipeline
from app.LLD.ffmpeg_strategy import LocalFFmpegStrategy
from app.LLD.qdrant_strategy import QdrantVectorStoreStrategy
from app.core.config import settings

# Module-level singleton slots. Each starts out unset (None) and is populated
# by its accessor function the first time that accessor is called, so nothing
# is constructed while this module is being imported.
ffmpeg_strategy = vector_store = ai_engine_pipeline = None

def get_ffmpeg_strategy():
    """Return the module-level ``LocalFFmpegStrategy``, creating it on first use.

    The instance is stored in the ``ffmpeg_strategy`` global, so later calls
    return the same object instead of constructing a new one.
    """
    global ffmpeg_strategy
    if ffmpeg_strategy is not None:
        return ffmpeg_strategy
    ffmpeg_strategy = LocalFFmpegStrategy()
    return ffmpeg_strategy

def get_vector_store():
    """Return the module-level ``QdrantVectorStoreStrategy``, creating it on first use.

    The instance is stored in the ``vector_store`` global, so later calls
    return the same object instead of constructing a new one.
    """
    global vector_store
    if vector_store is not None:
        return vector_store
    vector_store = QdrantVectorStoreStrategy()
    return vector_store

def load_ai_engine():
    """Return the module-level ``Siglip2EmbeddingPipeline``, creating it on first use.

    The instance is stored in the ``ai_engine_pipeline`` global, so later
    calls return the same object instead of constructing a new one.
    """
    global ai_engine_pipeline
    if ai_engine_pipeline is not None:
        return ai_engine_pipeline
    ai_engine_pipeline = Siglip2EmbeddingPipeline()
    return ai_engine_pipeline


@celery_app.task(name="workers.tasks.process_video_pipeline")
def process_video_pipeline(video_id: str, raw_file_path: str) -> bool:
    """Transcode a raw video, embed its keyframes, and upsert the vectors.

    Runs four stages in order: HLS transcoding, keyframe extraction (called
    with ``interval_seconds=1``), image embedding in batches of 16 frame
    paths, and an upsert of the embeddings plus frame metadata into the
    vector store.

    Args:
        video_id: Identifier of the video; also used as the name of the
            output sub-directory under ``settings.OUTPUT_DIR``.
        raw_file_path: Path of the source video passed to the FFmpeg strategy.

    Returns:
        The result of ``upsert_embeddings`` when every stage completes;
        ``False`` when no keyframes were extracted or when any stage inside
        the guarded section raises an exception.
    """
    print(f"[WORKER CHOREOGRAPHER] Commencing processing pipeline layout for ID: {video_id}")
    target_dir = os.path.join(settings.OUTPUT_DIR, video_id)

    # Collaborators are resolved at call time (not at import time) through the
    # lazy accessors. Failures here are NOT covered by the try block below.
    transcoder = get_ffmpeg_strategy()
    store = get_vector_store()
    embedder = load_ai_engine()

    try:
        # Stage 1: HLS transcoding.
        hls_playlist = transcoder.transcode_to_hls(raw_file_path, target_dir)
        print(f"[WORKER] Transcoding complete: {hls_playlist}")

        # Stage 2: keyframe extraction.
        keyframes = transcoder.extract_keyframes(raw_file_path, target_dir, interval_seconds=1)
        print(f"[WORKER] Extracted {len(keyframes)} frames.")
        if not keyframes:
            # Nothing to embed, so report failure without touching the store.
            return False

        # Stage 3: embed the frames in fixed-size batches.
        image_paths = [frame["file_path"] for frame in keyframes]
        chunk_len = 16
        vectors = []
        for offset in range(0, len(image_paths), chunk_len):
            vectors.extend(
                embedder.get_image_batch_embeddings(image_paths[offset:offset + chunk_len])
            )

        # Stage 4: persist embeddings alongside their frame metadata.
        return store.upsert_embeddings(
            video_id=video_id,
            embeddings=vectors,
            metadata=keyframes,
        )
    except Exception as exc:
        # Best-effort boundary: any stage failure is reported and mapped to False.
        print(f"[WORKER CRITICAL SHUTDOWN] Ingestion routine dropped: {str(exc)}")
        return False


@celery_app.task(name="workers.tasks.generate_text_embedding")
def generate_text_embedding(query_text: str) -> list[float]:
    """Embed a text query with the lazily loaded embedding pipeline.

    Args:
        query_text: The text to embed.

    Returns:
        The value produced by the pipeline's ``get_text_embedding`` for
        ``query_text`` (annotated as a list of floats).
    """
    # The pipeline is obtained on demand, so it is only built when first needed.
    return load_ai_engine().get_text_embedding(query_text)