Spaces:
Running
Running
File size: 2,929 Bytes
4dff442 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | import os
from workers.celery_app import celery_app
from workers.ml_pipeline import Siglip2EmbeddingPipeline
from app.LLD.ffmpeg_strategy import LocalFFmpegStrategy
from app.LLD.qdrant_strategy import QdrantVectorStoreStrategy
from app.core.config import settings
# Lazily-populated module-level singletons. Each starts as None and is filled in
# by its matching getter below on first use, so nothing heavy is built at import time.
ffmpeg_strategy, vector_store, ai_engine_pipeline = None, None, None
def get_ffmpeg_strategy():
    """Return the cached LocalFFmpegStrategy, constructing it on the first call.

    The instance is stored in the module-level ``ffmpeg_strategy`` global and
    reused by every later call in this process.
    NOTE(review): the check-then-assign is not guarded by a lock; fine for
    prefork workers, confirm before using a threaded pool.
    """
    global ffmpeg_strategy
    if ffmpeg_strategy is not None:
        return ffmpeg_strategy
    ffmpeg_strategy = LocalFFmpegStrategy()
    return ffmpeg_strategy
def get_vector_store():
    """Return the cached QdrantVectorStoreStrategy, constructing it on the first call.

    The instance lives in the module-level ``vector_store`` global and is
    shared by all subsequent calls in this process.
    """
    global vector_store
    if vector_store is not None:
        return vector_store
    vector_store = QdrantVectorStoreStrategy()
    return vector_store
def load_ai_engine():
    """Return the cached Siglip2EmbeddingPipeline, constructing it on the first call.

    The pipeline is kept in the module-level ``ai_engine_pipeline`` global so
    the (presumably expensive) model construction happens at most once per process.
    """
    global ai_engine_pipeline
    if ai_engine_pipeline is not None:
        return ai_engine_pipeline
    ai_engine_pipeline = Siglip2EmbeddingPipeline()
    return ai_engine_pipeline
def _embed_frames_in_batches(ai_model, frame_paths, batch_size=16):
    """Encode image files in chunks of ``batch_size`` and return all vectors in input order."""
    vectors = []
    for start in range(0, len(frame_paths), batch_size):
        chunk_paths = frame_paths[start:start + batch_size]
        vectors.extend(ai_model.get_image_batch_embeddings(chunk_paths))
    return vectors


@celery_app.task(name="workers.tasks.process_video_pipeline")
def process_video_pipeline(video_id: str, raw_file_path: str) -> bool:
    """Transcode a video, embed its keyframes and index them in the vector store.

    Steps:
      1. Transcode ``raw_file_path`` to HLS under ``settings.OUTPUT_DIR/<video_id>``.
      2. Extract keyframes (``interval_seconds=1``) into the same directory.
      3. Embed the extracted frame images in batches of 16.
      4. Upsert the embeddings plus per-frame metadata into the vector store.

    Args:
        video_id: Identifier used for the output directory and the vector-store records.
            NOTE(review): joined directly into a filesystem path; presumably a
            server-generated id — confirm it can never contain path separators.
        raw_file_path: Path of the uploaded source video.

    Returns:
        The result of ``upsert_embeddings`` on success (assumed to be a bool), or
        False when no frames were extracted, the embedding count does not match the
        frame count, or any step raises.
    """
    print(f"[WORKER CHOREOGRAPHER] Commencing processing pipeline layout for ID: {video_id}")
    try:
        output_dir = os.path.join(settings.OUTPUT_DIR, video_id)
        # Acquire the lazily-built engines inside the try so that construction
        # failures (model load, vector-store client) also take the False path
        # instead of escaping the task.
        ffmpeg_engine = get_ffmpeg_strategy()
        db_vector_store = get_vector_store()
        ai_model = load_ai_engine()

        # Step 1: HLS transcoding
        playlist_path = ffmpeg_engine.transcode_to_hls(raw_file_path, output_dir)
        print(f"[WORKER] Transcoding complete: {playlist_path}")

        # Step 2: Keyframe extraction
        frames_metadata = ffmpeg_engine.extract_keyframes(raw_file_path, output_dir, interval_seconds=1)
        print(f"[WORKER] Extracted {len(frames_metadata)} frames.")
        if not frames_metadata:
            print(f"[WORKER] No keyframes extracted for ID: {video_id}; skipping indexing.")
            return False

        # Step 3: SigLIP 2 batch encoding
        frame_paths = [item["file_path"] for item in frames_metadata]
        all_computed_vectors = _embed_frames_in_batches(ai_model, frame_paths)

        # Embeddings and metadata are passed as parallel sequences; a length
        # mismatch would silently pair vectors with the wrong frames.
        if len(all_computed_vectors) != len(frames_metadata):
            print(
                f"[WORKER] Embedding count mismatch for ID: {video_id}: "
                f"{len(all_computed_vectors)} vectors for {len(frames_metadata)} frames."
            )
            return False

        # Step 4: Sync to Qdrant Space
        return db_vector_store.upsert_embeddings(
            video_id=video_id,
            embeddings=all_computed_vectors,
            metadata=frames_metadata,
        )
    except Exception as e:
        # Top-level task boundary: keep the bool contract, but emit the full
        # traceback so failures are diagnosable.
        # NOTE(review): returning False hides the failure from Celery's retry/
        # failure tracking; consider re-raising if callers can handle it.
        import traceback

        print(f"[WORKER CRITICAL SHUTDOWN] Ingestion routine dropped: {str(e)}")
        traceback.print_exc()
        return False
@celery_app.task(name="workers.tasks.generate_text_embedding")
def generate_text_embedding(query_text: str) -> list[float]:
    """Embed a search query string with the shared embedding pipeline.

    The pipeline comes from ``load_ai_engine()``, so it is built on the first call
    in this process and reused afterwards.
    NOTE(review): the return annotation says list[float]; the concrete type comes
    from ``get_text_embedding`` and is not visible here — confirm it is
    serializable by the Celery result backend.
    """
    engine = load_ai_engine()
    embedding = engine.get_text_embedding(query_text)
    return embedding