import io
import json
import os
import re
import subprocess
import wave

import clip
import cv2
import gradio as gr
import openai
import torch
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from google.cloud import videointelligence, speech, storage
from PIL import Image
from pydantic import BaseModel
from transformers import Blip2Processor, Blip2ForConditionalGeneration
|
|
# Flags recording whether the CLIP / BLIP-2 models have been loaded yet; the
# module-level loading code at the bottom of the file flips them to True.
clip_loaded = False
blip_loaded = False

# Path to the Google Cloud service-account JSON file. It is also exported
# through the environment variable that is set again by the pipeline functions.
cred_file = "<PASTE THE PATH TO YOUR GOOGLE CREDENTIALS JSON FILE HERE>"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_file

# OpenAI API key: kept as a module global and exported through the environment.
openai_api_key = "<PASTE YOUR OPENAI KEY HERE>"
os.environ["OPENAI_API_KEY"] = openai_api_key
|
|
|
|
# Matches the presentation timestamp that ffmpeg's ``showinfo`` filter logs
# for each frame it is handed, e.g. ``pts_time:12.345``.
_PTS_TIME_RE = re.compile(r"pts_time:([0-9]+(?:\.[0-9]*)?)")


def get_timestamps(video):
    """Detect scene changes in ``video`` with ffmpeg and record their times.

    ffmpeg is run with the ``select='gt(scene,0.2)',showinfo`` video filter.
    Its stderr log is saved to ``ffout`` and every ``pts_time`` reported on a
    ``showinfo`` line is written, one value per line, to ``timestamps.txt``
    (which ``get_shot_frames`` reads back).

    Args:
        video: Path to the video file to analyse.

    Returns:
        A string such as ``"Times: 0, 1.5, 3.2, "``: the start of the video
        followed by each detected scene-change time in seconds.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # An argument list with no shell means the path is passed verbatim: file
    # names containing spaces or shell metacharacters can neither break the
    # command nor inject another one.
    ffmpeg_args = [
        "ffmpeg",
        "-i", video,
        "-filter:v", "select='gt(scene,0.2)',showinfo",
        "-f", "null",
        "-",
    ]
    completed = subprocess.run(
        ffmpeg_args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
        check=False,  # a failed run simply yields no scene changes
    )
    ffmpeg_log = completed.stderr or ""

    # Keep the raw log on disk, as the previous shell redirection did.
    with open("ffout", "w", encoding="utf-8", errors="replace") as log_file:
        log_file.write(ffmpeg_log)

    scene_times = []
    for line in ffmpeg_log.splitlines():
        if "showinfo" in line:
            scene_times.extend(_PTS_TIME_RE.findall(line))

    with open("timestamps.txt", "w") as ts_file:
        ts_file.writelines(f"{stamp}\n" for stamp in scene_times)

    # The first shot always starts at the beginning of the video.
    times = [0] + [float(stamp) for stamp in scene_times]
    print(times)

    return "Times: " + "".join(f"{moment}, " for moment in times)
|
|
def _annotation_offset_seconds(offset):
    """Convert a time offset exposing ``seconds``/``microseconds`` to float seconds."""
    return offset.seconds + offset.microseconds * 1e-6


def get_text_annotations(video, cred_file):
    """Detect on-screen text with Google Video Intelligence and save it to disk.

    Two files are written to the current directory:

    * ``annotation.json`` - one dict per detected text with the keys
      ``text``, ``start``, ``end``, ``confidence`` and ``vertecies`` (the
      ``[x, y]`` corners of the bounding box in the first frame).
    * ``simple_annotation.json`` - ``[start, end, text, confidence, top_y]``
      rows for detections with confidence above 0.95, ordered by
      ``start + top_y``.

    Only the first segment of each annotation, and the first frame of that
    segment, are used.

    Args:
        video: Path to the video file; its bytes are sent inline to the API.
        cred_file: Path to the Google service-account JSON file. It is
            exported as ``GOOGLE_APPLICATION_CREDENTIALS``.

    Returns:
        None.
    """
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = cred_file

    video_client = videointelligence.VideoIntelligenceServiceClient()

    with open(video, "rb") as file:
        input_content = file.read()

    operation = video_client.annotate_video(
        request={
            "features": [videointelligence.Feature.TEXT_DETECTION],
            "input_content": input_content,
            "video_context": videointelligence.VideoContext(),
        }
    )

    print("\nProcessing video for text detection.")
    result = operation.result(timeout=300)

    # A single video was submitted, so only the first result is relevant.
    annotation_result = result.annotation_results[0]

    text_annotation_json = []
    out = []

    # One pass builds both the detailed and the simplified representation.
    for text_annotation in annotation_result.text_annotations:
        # Skip detections without timing or geometry instead of crashing on
        # the ``[0]`` lookups below.
        if not text_annotation.segments:
            continue
        text_segment = text_annotation.segments[0]
        if not text_segment.frames:
            continue

        start_s = _annotation_offset_seconds(text_segment.segment.start_time_offset)
        end_s = _annotation_offset_seconds(text_segment.segment.end_time_offset)
        confidence = text_segment.confidence
        vertices = text_segment.frames[0].rotated_bounding_box.vertices

        text_annotation_json.append({
            "text": text_annotation.text,
            "start": start_s,
            "end": end_s,
            "confidence": confidence,
            "vertecies": [[vertex.x, vertex.y] for vertex in vertices],
        })

        top_left = vertices[0]
        out.append([start_s, end_s, text_annotation.text, confidence, top_left.y])

    # Keep confident detections only. The sort key adds the top-left y value
    # to the start time, so among texts that start together the one higher
    # on screen comes first.
    simple_text = [k for k in sorted(out, key=lambda k: k[0] + k[4]) if k[3] > 0.95]

    for s in simple_text:
        print(s)

    with open('annotation.json', 'w') as f:
        json.dump(text_annotation_json, f, indent=4)

    with open('simple_annotation.json', 'w') as f:
        json.dump(simple_text, f, indent=4)
|
|
def _word_offset_seconds(offset):
    """Convert a time offset exposing ``seconds``/``microseconds`` to float seconds."""
    return offset.seconds + offset.microseconds * 1e-6


def transcribe_video(video, cred_file):
    """Transcribe the speech in ``video`` with Google Speech-to-Text.

    The audio track is extracted to ``output_audio.wav`` with ffmpeg and
    uploaded through ``upload_file_to_bucket``. It is then recognised with
    word-level time offsets. The result is also written to
    ``speech_transcriptions.json``. The uploaded audio object is not removed
    from the bucket by this function.

    Args:
        video: Path to the video file.
        cred_file: Path to the Google service-account JSON file. It is
            exported as ``GOOGLE_APPLICATION_CREDENTIALS``.

    Returns:
        A list of ``[word, start_seconds, end_seconds, confidence]`` entries
        sorted by start time.

    Raises:
        FileNotFoundError: If ffmpeg is missing or produced no audio file.
    """
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_file

    audio_path = "output_audio.wav"

    # Remove audio left over from a previous run so a failed extraction can
    # never cause stale audio to be transcribed.
    try:
        os.remove(audio_path)
    except FileNotFoundError:
        pass

    # Argument list + no shell: the video path is passed verbatim.
    subprocess.run(
        ["ffmpeg", "-i", video, audio_path],
        stdin=subprocess.DEVNULL,
        check=False,
    )
    if not os.path.exists(audio_path):
        raise FileNotFoundError(
            f"ffmpeg did not produce {audio_path} from {video!r}"
        )

    gcs_uri = upload_file_to_bucket(audio_path, cred_file)

    speech_client = speech.SpeechClient()

    # The audio is referenced by its bucket URI, so the local file only has
    # to be inspected for its format.
    audio = speech.RecognitionAudio(uri=gcs_uri)
    sample_rate_hertz, audio_channel_count = wav_data(audio_path)

    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate_hertz,
        audio_channel_count=audio_channel_count,
        language_code="en-US",
        model="video",
        enable_word_time_offsets=True,
        enable_automatic_punctuation=True,
        enable_word_confidence=True,
    )

    request = speech.LongRunningRecognizeRequest(config=config, audio=audio)
    operation = speech_client.long_running_recognize(request=request)

    print("Waiting for operation to complete...")
    response = operation.result(timeout=600)

    out = []
    for result in response.results:
        if not result.alternatives:
            continue
        # Only the first alternative of each result is used.
        for word in result.alternatives[0].words:
            out.append([
                word.word,
                _word_offset_seconds(word.start_time),
                _word_offset_seconds(word.end_time),
                word.confidence,
            ])

    simple_text = sorted(out, key=lambda k: k[1])
    for s in simple_text:
        print(s)

    with open("speech_transcriptions.json", "w") as f:
        json.dump(simple_text, f, indent=4)

    return simple_text
|
|
def wav_data(wav_file):
    """Read the sampling rate and channel count from a WAV file.

    Args:
        wav_file: Path (or file object) accepted by ``wave.open``.

    Returns:
        Tuple ``(sample_rate_hertz, audio_channel_count)``.
    """
    with wave.open(wav_file, 'rb') as reader:
        rate = reader.getframerate()
        channels = reader.getnchannels()
    return rate, channels
|
|
def get_shot_frames(video, shot_text):
    """Split the video into shots and grab one key frame per shot.

    Shot boundaries are read from ``timestamps.txt``. On-screen text rows
    from ``simple_annotation.json`` and words from
    ``speech_transcriptions.json`` are assigned to the shot in which they
    start. One dict per shot (``start``, ``end``, ``text_on_screen``,
    ``transcript_text``) is appended to ``shot_text`` in place, and the frame
    at the temporal midpoint of every entry in ``shot_text`` is written to
    ``shot<i>.png``.

    Args:
        video: Path to the video file.
        shot_text: List that is extended in place with one dict per shot.

    Returns:
        A list of PIL RGB images, one for every key frame that could be read.

    Raises:
        ValueError: If OpenCV reports no usable frame rate for ``video``.
    """
    cam = cv2.VideoCapture(video)
    try:
        fps = cam.get(cv2.CAP_PROP_FPS)
        # ``not fps > 0`` also rejects NaN; without this check the duration
        # computation below would fail with an unhelpful ZeroDivisionError.
        if not fps > 0:
            raise ValueError(f"Could not determine the frame rate of {video!r}")
        frame_count = int(cam.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps

        with open('timestamps.txt', 'r') as ts_file:
            times = [0] + [float(k) for k in ts_file.read().split('\n') if k]
        print("Times: ", times)

        with open('simple_annotation.json', 'r') as f:
            simple_text = json.load(f)

        with open('speech_transcriptions.json', 'r') as f:
            transcriptions = json.load(f)

        for i, current_time in enumerate(times):
            # The last shot runs until the end of the video.
            next_time = times[i + 1] if i < len(times) - 1 else duration

            # Rows are [start, end, text, ...] and [word, start, ...].
            on_screen = [s[2] for s in simple_text
                         if current_time <= s[0] < next_time]
            spoken = [w[0] for w in transcriptions
                      if current_time <= w[1] < next_time]

            shot_text.append({
                "start": current_time,
                "end": next_time,
                "text_on_screen": ' '.join(on_screen),
                "transcript_text": ' '.join(spoken),
            })

        frames = []
        for i, shot in enumerate(shot_text):
            keyframe_time = (shot["end"] - shot["start"]) / 2 + shot["start"]
            cam.set(cv2.CAP_PROP_POS_FRAMES, int(fps * keyframe_time))
            ret, frame = cam.read()

            if ret:
                cv2.imwrite('shot' + str(i) + '.png', frame)
                # OpenCV delivers BGR pixels; PIL expects RGB, so the
                # channels must be swapped before wrapping the array.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(rgb_frame))
    finally:
        # Always free the decoder, even when a file is missing or unreadable.
        cam.release()

    return frames
|
|
|
|
def load_clip_model():
    """Load the CLIP ``ViT-B/32`` model, on CUDA when available, else on CPU.

    Returns:
        Tuple ``(clip_model, preprocess, device)``: the two objects returned
        by ``clip.load`` plus the device string that was chosen.
    """
    if torch.cuda.is_available():
        target_device = 'cuda'
    else:
        target_device = 'cpu'

    network, transform = clip.load('ViT-B/32', device=target_device)
    return network, transform, target_device
|
|
def clip_score(fn, text_list, clip_model, preprocess, clip_device):
    """Score how well each text in ``text_list`` matches the image ``fn``.

    Args:
        fn: PIL image to score against.
        text_list: Candidate texts, tokenised with ``clip.tokenize``.
        clip_model: CLIP model; calling it with ``(image, text)`` must return
            ``(logits_per_image, logits_per_text)``.
        preprocess: Image transform paired with ``clip_model``.
        clip_device: Device the inputs are moved to.

    Returns:
        NumPy array of softmax probabilities over ``text_list`` with a
        leading batch dimension of one (callers index ``[0]``).
    """
    image = preprocess(fn).unsqueeze(0).to(clip_device)
    text = clip.tokenize(text_list).to(clip_device)

    # A single joint forward pass yields the logits. The former separate
    # encode_image/encode_text calls were discarded, and the debugging
    # ``fn.show()`` viewer popup has no place in a server process.
    with torch.no_grad():
        logits_per_image, _ = clip_model(image, text)
        probs = logits_per_image.softmax(dim=-1).cpu().numpy()

    return probs
|
|
|
|
def load_blip_model():
    """Load the BLIP-2 ``Salesforce/blip2-flan-t5-xxl`` checkpoint in float16.

    The model is moved to ``cuda:0`` when CUDA is available, otherwise to
    the CPU.

    Returns:
        Tuple ``(model, processor, device)``.
    """
    if torch.cuda.is_available():
        target_device = "cuda:0"
    else:
        target_device = "cpu"

    blip_processor = Blip2Processor.from_pretrained('Salesforce/blip2-flan-t5-xxl')
    blip_model = Blip2ForConditionalGeneration.from_pretrained(
        'Salesforce/blip2-flan-t5-xxl', torch_dtype=torch.float16
    ).to(target_device)

    return blip_model, blip_processor, target_device
|
|
def run_blip(shot_text, frames, model, processor, device, clip_model, preprocess, clip_device):
    """Caption every shot's key frame with BLIP-2 and rank captions with CLIP.

    For each shot ``i`` the image ``shot<i>.png`` is captioned five times
    with nucleus sampling. The captions are scored by ``clip_score`` and
    stored on the shot dict as

    * ``image_captions_clip`` - ``{"text", "score"}`` dicts, best score first;
    * ``image_captions`` - the ranked caption texts, excluding any that
      contain the word "caption".

    Shots without a key-frame file receive five empty placeholder captions.
    All ``shot<i>.png`` files are deleted afterwards.

    Args:
        shot_text: List of shot dicts; updated in place.
        frames: Unused; the key frames are read from the PNG files instead.
        model: BLIP-2 generation model.
        processor: BLIP-2 processor paired with ``model``.
        device: Device the BLIP-2 inputs are moved to.
        clip_model: CLIP model forwarded to ``clip_score``.
        preprocess: CLIP image transform forwarded to ``clip_score``.
        clip_device: CLIP device forwarded to ``clip_score``.

    Returns:
        The same ``shot_text`` list.
    """
    num_captions = 5

    for i, shot in enumerate(shot_text):
        frame_path = f"shot{i}.png"
        if not os.path.exists(frame_path):
            shot["image_captions"] = ["" for _ in range(num_captions)]
            shot["image_captions_clip"] = [{"text": "", "score": 0.0}
                                           for _ in range(num_captions)]
            continue

        # Close the file handle straight away so the PNG can be deleted below.
        with Image.open(frame_path) as png:
            image = png.convert('RGB')

        with torch.no_grad():
            # The processor output does not depend on the sample index, so
            # it is computed once instead of once per caption.
            inputs = processor(images=image, return_tensors="pt").to(device, torch.float16)

            captions = []
            for _ in range(num_captions):
                generated_ids = model.generate(**inputs, min_length=5, max_length=20,
                                               do_sample=True, top_p=0.9)
                caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
                captions.append(caption)

            print(captions)

            clip_scores = clip_score(image.copy(), captions, clip_model, preprocess, clip_device)[0]
            print(clip_scores)

        ranked = sorted(
            ({"text": caption, "score": float(score)}
             for caption, score in zip(captions, clip_scores)),
            key=lambda entry: -entry["score"],
        )
        for entry in ranked:
            print(entry)

        shot["image_captions_clip"] = ranked
        shot["image_captions"] = [entry["text"] for entry in ranked
                                  if "caption" not in entry["text"]]

    # The key-frame images are temporary working files.
    for i in range(len(shot_text)):
        if os.path.exists(f"shot{i}.png"):
            os.remove(f"shot{i}.png")

    return shot_text
|
|
# System prompt for whole-video summarisation. The text is sent verbatim to
# the model, so its wording and line breaks must not be reformatted.
_VIDEO_SUMMARY_SYSTEM_PROMPT = '''Your task is to generate a summary paragraph for an entire short-form video based on data extracted from the video. Your summary must be a holistic description of the full video. \n

The text in quotations defines the format of the data that I will provide you. The video data comprises of data extracted from all shots of the video.\n
The data is formatted in the structure defined in the quotations:\n
"\n
SHOT NUMBER
Duration: the number of seconds that the shot lasts
Text on screen: Any text that appears in the shot
Shot audio transcript: Any speech that is in the shot
Shot description: A short visual description of what is happening in the shot
"\n
'''


def get_summaries(summary_input, openai_key):
    """Ask GPT-4 for a full video summary and 50/25/10-word condensations.

    The requests share one conversation: after the full summary, each
    condensation request quotes the most recent reply and asks for a
    shorter version.

    Args:
        summary_input: Per-shot video data as produced by
            ``get_summary_input``.
        openai_key: OpenAI API key used for these requests. When empty, the
            key already configured for the ``openai`` module is used.

    Returns:
        List of four strings: the full summary followed by the 50-, 25- and
        10-word versions, in that order.
    """
    # Previously this argument was ignored, so a key entered in the UI never
    # took effect. Pass it per request, leaving the global default untouched.
    request_kwargs = {"api_key": openai_key} if openai_key else {}

    gpt_user_prompt = (
        "Perform this video summarization task for the video below, "
        "where the data is delimited by triple quotations.\n\n"
        f'Video: \n"""{summary_input}"""\n '
    )

    messages = [{"role": "system", "content": _VIDEO_SUMMARY_SYSTEM_PROMPT},
                {"role": "user", "content": gpt_user_prompt}]
    responses = []

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=messages,
        **request_kwargs
    )
    messages.append(response.choices[0].message)
    responses.append(response.choices[0].message["content"])

    for word_limit in [50, 25, 10]:
        condense_prompt = (
            f"Condense the summary below such that the response adheres to a {word_limit} word limit.\n\n"
            f'Summary: """ {responses[-1]} """\n'
        )
        messages.append({"role": "user", "content": condense_prompt})

        response = openai.ChatCompletion.create(
            model='gpt-4',
            messages=messages,
            **request_kwargs
        )
        messages.append(response.choices[0].message)
        responses.append(response.choices[0].message["content"])

    return responses
|
|
# System prompt for per-shot summarisation. The text is sent verbatim to the
# model, so its wording and line breaks must not be reformatted.
_SHOT_SUMMARY_SYSTEM_PROMPT = '''Your task is to generate a summary for each shot of a short-form video based on data extracted from the video.\n

The text in quotations defines the format of the data that I will provide you. The video data comprises of data extracted from all shots of the video.\n
The data is formatted in the structure defined in the quotations:\n
"\n
SHOT NUMBER
Duration: the number of seconds that the shot lasts
Text on screen: Any text that appears in the shot
Shot audio transcript: Any speech that is in the shot
Shot description: A short visual description of what is happening in the shot
"\n

All of the summaries you create must satisfy the following constraints:\n

1. If the field for text on screen is empty, do not include references to text on screen in the summary.\n
2. If the field for shot audio transcript is empty, do not include references to shot audio transcript in the summary.\n
3. If the field for shot description is empty, do not include references to the shot description in the summary.\n
4. If the field for shot description is empty, do not include references to shot description in the summary.\n
5. Do not include references to Tiktok logos or Tiktok usernames in the summary.\n

There must be a summary for every shot in the data.

Provide the summaries in a newline-separated format. There must be exactly one summary for every shot.\n
You must strictly follow the format inside the quotations.\n

"Your first summary\n
Your second summary\n
Your third summary\n
More of your summaries...\n
Your last summary\n
"

'''


def get_shot_summaries(summary_input, openai_key):
    """Ask GPT-4 for one summary per shot and split the reply into a list.

    Args:
        summary_input: Per-shot video data as produced by
            ``get_summary_input``.
        openai_key: OpenAI API key used for this request. When empty, the
            key already configured for the ``openai`` module is used.

    Returns:
        List of summary strings, one per non-empty line of the model reply,
        with surrounding whitespace and square brackets removed. The model
        is asked for one line per shot, but the count is not enforced here.
    """
    # Previously this argument was ignored, so a key entered in the UI never
    # took effect. Pass it per request, leaving the global default untouched.
    request_kwargs = {"api_key": openai_key} if openai_key else {}

    gpt_user_prompt = (
        "Perform this summarization task for the video below, "
        "where the data is delimited by triple quotations.\n\n"
        f'Video: \n"""{summary_input}"""\n '
    )

    messages = [{"role": "system", "content": _SHOT_SUMMARY_SYSTEM_PROMPT},
                {"role": "user", "content": gpt_user_prompt}]

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=messages,
        **request_kwargs
    )
    reply = response.choices[0].message["content"].strip()

    shot_summary_list = []
    for line in reply.split("\n"):
        # Strip brackets the model sometimes wraps lines in, then strip
        # again so lines like "[  ]" do not survive as blank summaries.
        cleaned = line.strip().strip('[]').strip()
        if cleaned:
            shot_summary_list.append(cleaned)

    print(reply)
    print()
    print(shot_summary_list)
    print()

    return shot_summary_list
|
|
def upload_file_to_bucket(filename, cred_file):
    """Upload a local file to the ``short-video-descriptions`` bucket.

    The object is stored under the same name as the local file.

    Args:
        filename: Path of the local file; also used as the object name.
        cred_file: Path to the Google service-account JSON file.

    Returns:
        The ``gs://`` URI of the uploaded object.
    """
    bucket_name = "short-video-descriptions"
    client = storage.Client.from_service_account_json(
        cred_file,
        project="short-video-descriptions")

    target = client.get_bucket(bucket_name).blob(filename)
    target.upload_from_filename(filename)

    return f"gs://{bucket_name}/{filename}"
|
|
|
|
def blob_exists(filename, cred_file):
    """Check whether an object exists in the ``short-video-descriptions`` bucket.

    Args:
        filename: Name of the object to look for.
        cred_file: Path to the Google service-account JSON file.

    Returns:
        The result of ``Blob.exists`` for that object.
    """
    client = storage.Client.from_service_account_json(
        cred_file,
        project="short-video-descriptions")

    candidate = storage.Blob(bucket=client.bucket('short-video-descriptions'),
                             name=filename)
    return candidate.exists(client)
|
|
def del_blob(blob_name, cred_file):
    """Delete an object from the ``short-video-descriptions`` bucket.

    Args:
        blob_name: Name of the object to delete.
        cred_file: Path to the Google service-account JSON file.
    """
    client = storage.Client.from_service_account_json(
        cred_file,
        project="short-video-descriptions")
    target = client.bucket("short-video-descriptions").blob(blob_name)

    # Fetch the current metadata first so that the delete request can be
    # conditioned on the generation that was just observed.
    target.reload()
    observed_generation = target.generation
    target.delete(if_generation_match=observed_generation)

    print(f"Blob {blob_name} deleted.")
|
|
def get_summary_input(shot_text):
    """Render the per-shot data as the text block fed to the summariser.

    Each shot becomes a ``SHOT <n>`` section (numbered from 1) listing its
    rounded duration, on-screen text, transcript and the first image
    caption (empty when the shot has no captions), followed by a blank line.

    Args:
        shot_text: List of shot dicts with the keys ``start``, ``end``,
            ``text_on_screen``, ``transcript_text`` and ``image_captions``.

    Returns:
        The concatenated sections; an empty string for an empty list.
    """
    sections = []
    for number, shot in enumerate(shot_text, start=1):
        seconds = round(shot['end'] - shot['start'])
        on_screen = shot['text_on_screen']
        spoken = shot['transcript_text']
        captions = shot['image_captions']
        description = captions[0] if len(captions) > 0 else ''

        sections.append(
            f"SHOT {number}\n"
            f"Duration: {seconds} seconds\n"
            f"Text on screen: {on_screen}\n"
            f"Shot audio transcript: {spoken}\n"
            f"Shot description: {description}\n"
            "\n"
        )

    return "".join(sections)
|
|
def get_video_data(video, transcript, cred_file):
    """Run the full extraction pipeline and return the per-shot data.

    Steps: scene detection, on-screen text detection, speech transcription,
    key-frame extraction and image captioning. The captioning step uses the
    module-level BLIP-2 and CLIP models.

    Args:
        video: Path to the video file.
        transcript: Unused.
        cred_file: Object whose ``name`` attribute is the path to the Google
            service-account JSON file.

    Returns:
        The list of shot dicts returned by ``run_blip``.
    """
    get_timestamps(video)
    get_text_annotations(video, cred_file.name)
    transcribe_video(video, cred_file.name)

    shots = []
    keyframes = get_shot_frames(video, shots)  # fills ``shots`` in place
    return run_blip(shots, keyframes, model, processor, device,
                    clip_model, preprocess, clip_device)
|
|
def get_video_information(video, cred_file, openai_key):
    """Gradio handler: extract per-shot data and summarise the whole video.

    The shot data is also written to ``cur_shots.json``.

    Args:
        video: Path to the video file.
        cred_file: Object whose ``name`` attribute is the path to the Google
            service-account JSON file.
        openai_key: OpenAI API key forwarded to ``get_summaries``.

    Returns:
        Tuple of the shot data, the summariser input text, and then every
        summary returned by ``get_summaries`` in order.
    """
    get_timestamps(video)
    get_text_annotations(video, cred_file.name)
    transcribe_video(video, cred_file.name)

    shots = []
    keyframes = get_shot_frames(video, shots)  # fills ``shots`` in place
    shots = run_blip(shots, keyframes, model, processor, device,
                     clip_model, preprocess, clip_device)

    print("FINAL INPUT")
    print(shots)

    with open('cur_shots.json', 'w') as dump_file:
        json.dump(shots, dump_file, indent=4)

    prompt_data = get_summary_input(shots)
    all_summaries = get_summaries(prompt_data, openai_key)

    print("ALL SUMMARIES")
    for text in all_summaries:
        print(text)

    return (shots, prompt_data, *all_summaries)
|
|
def get_per_shot_information(video, cred_file, openai_key):
    """Gradio handler: extract per-shot data and summarise every shot.

    The shot data is also written to ``cur_shots.json``.

    Args:
        video: Path to the video file.
        cred_file: Object whose ``name`` attribute is the path to the Google
            service-account JSON file.
        openai_key: OpenAI API key forwarded to ``get_shot_summaries``.

    Returns:
        Tuple ``(per_shot_data, per_shot_summaries, summary_input)``.
    """
    get_timestamps(video)
    get_text_annotations(video, cred_file.name)
    transcribe_video(video, cred_file.name)

    shots = []
    keyframes = get_shot_frames(video, shots)  # fills ``shots`` in place
    shots = run_blip(shots, keyframes, model, processor, device,
                     clip_model, preprocess, clip_device)

    print("FINAL INPUT")
    print(shots)

    with open('cur_shots.json', 'w') as dump_file:
        json.dump(shots, dump_file, indent=4)

    prompt_data = get_summary_input(shots)
    summaries = get_shot_summaries(prompt_data, openai_key)
    combined = create_per_shot_dict(shots, summaries)

    return (combined, summaries, prompt_data)
|
|
def create_per_shot_dict(shot_text, per_shot_summaries):
    """Pair every shot with its summary.

    Summaries are matched to shots by position. Shots beyond the end of
    ``per_shot_summaries`` get an empty summary and surplus summaries are
    ignored. Every summary is printed first.

    Args:
        shot_text: List of shot dicts with ``start``, ``end`` and
            ``text_on_screen`` keys.
        per_shot_summaries: List of summary strings.

    Returns:
        List of dicts with the keys ``start``, ``end``, ``text_on_screen``
        and ``per_shot_summaries``.
    """
    for summary in per_shot_summaries:
        print(summary)

    available = len(per_shot_summaries)
    return [
        {
            "start": shot["start"],
            "end": shot["end"],
            "text_on_screen": shot["text_on_screen"],
            "per_shot_summaries": per_shot_summaries[index] if index < available else "",
        }
        for index, shot in enumerate(shot_text)
    ]
|
|
# Gradio demo UI: a video plus credentials go in; extracted shot data and
# GPT-generated summaries come out.
# NOTE(review): the Row/Column nesting below was reconstructed from a copy of
# this file that had lost its indentation - confirm it against the intended
# layout.
with gr.Blocks() as demo:
    # Inputs: the video next to a column holding both credentials.
    with gr.Row():
        video = gr.Video(label='Video To Describe', interactive=True)

        with gr.Column():
            api_cred_file = gr.File(label='Google API Credentials File', file_types=['.json'])
            openai_key = gr.Textbox(label="OpenAI API Key")

    # One button per summarisation mode.
    with gr.Row():
        summary_btn = gr.Button("Summarize Full Video")
        summary_per_shot_btn = gr.Button("Summarize Each Shot")

    # Text block that is sent to the summariser.
    with gr.Row():
        summary_input = gr.Textbox(label="Extracted Video Data")

    # Whole-video summary and its condensed variants.
    with gr.Row():
        summary = gr.Textbox(label='Summary')
        with gr.Column():
            summary_10 = gr.Textbox(label='10-word Summary')
            summary_25 = gr.Textbox(label='25-word Summary')
            summary_50 = gr.Textbox(label='50-word Summary')

    with gr.Row():
        per_shot_summaries = gr.Textbox(label="Per Shot Summaries")

    with gr.Row():
        shot_data = gr.JSON(label='Shot Data')

    inputs = [video, api_cred_file, openai_key]
    # Order must match the tuple returned by get_video_information:
    # shot data, summariser input, full summary, then 50/25/10-word versions.
    outputs = [shot_data, summary_input, summary, summary_50, summary_25, summary_10]

    summary_btn.click(fn=get_video_information, inputs=inputs, outputs=outputs)
    # Order must match the tuple returned by get_per_shot_information.
    summary_per_shot_btn.click(fn=get_per_shot_information, inputs=inputs, outputs=[shot_data, per_shot_summaries, summary_input])
|
|
|
|
|
|
def analyze_video(video_id: str):
    """Run the extraction pipeline on ``temporary_uploads/<video_id>.mp4``.

    Uses the module-level Google credentials path and the module-level
    BLIP-2 / CLIP models.

    Args:
        video_id: Identifier of a previously uploaded video. Only letters,
            digits, ``_``, ``.`` and ``-`` are accepted.

    Returns:
        The list of shot dicts returned by ``run_blip``.

    Raises:
        ValueError: If ``video_id`` is empty or contains other characters.
    """
    # ``video_id`` comes straight from the request URL and ends up in a file
    # path that is handed to external commands, so reject anything that
    # could escape the upload directory or be interpreted by a shell.
    if not isinstance(video_id, str) or not video_id or not all(
        ch.isalnum() or ch in "_.-" for ch in video_id
    ):
        raise ValueError(f"Invalid video id: {video_id!r}")

    video_path = f"temporary_uploads/{video_id}.mp4"

    get_timestamps(video_path)
    get_text_annotations(video_path, cred_file)
    transcribe_video(video_path, cred_file)

    shot_text = []
    frames = get_shot_frames(video_path, shot_text)  # fills shot_text in place
    shot_text = run_blip(shot_text, frames, model, processor, device,
                         clip_model, preprocess, clip_device)

    return shot_text
|
|
def summarize_video(video_id: str):
    """Analyse an uploaded video and summarise it at four lengths.

    Uses the module-level OpenAI key.

    Args:
        video_id: Identifier passed on to ``analyze_video``.

    Returns:
        Dict with the keys ``video_description`` (full summary),
        ``summary_10``, ``summary_25`` and ``summary_50``.
    """
    shots = analyze_video(video_id)
    prompt_data = get_summary_input(shots)
    results = get_summaries(prompt_data, openai_api_key)

    # get_summaries returns [full, 50-word, 25-word, 10-word].
    return {
        "video_description": results[0],
        "summary_10": results[3],
        "summary_25": results[2],
        "summary_50": results[1],
    }
|
|
def summarize_shots(video_id: str):
    """Analyse an uploaded video and summarise each of its shots.

    ``get_shot_summaries`` is called with an empty key argument.

    Args:
        video_id: Identifier passed on to ``analyze_video``.

    Returns:
        The list of per-shot dicts built by ``create_per_shot_dict``.
    """
    shots = analyze_video(video_id)
    prompt_data = get_summary_input(shots)
    summaries = get_shot_summaries(prompt_data, "")
    return create_per_shot_dict(shots, summaries)
|
|
# FastAPI application with the Gradio demo mounted under /gradio.
app = gr.mount_gradio_app(FastAPI(), demo, path="/gradio")
|
|
@app.get("/")
async def read_main():
    """Root route: return a welcome message pointing at the Gradio demo."""
    welcome = "Welcome to ShortVideoA11y! Go to https://utcs-hci-short-video-descriptions.hf.space/gradio for an interactive demo!"
    return {"message": welcome}
|
|
@app.get("/getVideoData/{video_id}")
async def create_video_data(video_id: str):
    """Return the extracted per-shot data for an uploaded video as JSON.

    Any exception raised while analysing the video or building the response
    is reported as ``{"error": <message>}`` with HTTP status 400.
    """
    try:
        # The response is built inside the ``try`` so that serialisation
        # failures are reported the same way as analysis failures.
        return JSONResponse(content=analyze_video(video_id))
    except Exception as exc:
        return JSONResponse(content={"error": str(exc)}, status_code=400)
|
|
@app.get("/getShotSummaries/{video_id}")
async def create_shot_summaries(video_id: str):
    """Return per-shot summaries for an uploaded video as JSON.

    Failures are reported the same way as in ``/getVideoData``: as
    ``{"error": <message>}`` with HTTP status 400, instead of surfacing as
    an unexplained internal server error.
    """
    try:
        per_shot_data = summarize_shots(video_id)
        return JSONResponse(content=per_shot_data)
    except Exception as e:
        error_content = {"error": str(e)}
        return JSONResponse(content=error_content, status_code=400)
|
|
@app.get("/getVideoSummary/{video_id}")
async def create_video_summaries(video_id: str):
    """Return the whole-video summaries for an uploaded video as JSON.

    Failures are reported the same way as in ``/getVideoData``: as
    ``{"error": <message>}`` with HTTP status 400, instead of surfacing as
    an unexplained internal server error.
    """
    try:
        vid_summaries = summarize_video(video_id)
        return JSONResponse(content=vid_summaries)
    except Exception as e:
        error_content = {"error": str(e)}
        return JSONResponse(content=error_content, status_code=400)
|
|
# Enable Gradio's request queue for the demo.
demo.queue()

# Load the models once at import time. The globals assigned here
# (clip_model, preprocess, clip_device, model, processor, device) are the
# ones the pipeline functions above read when they call run_blip.
if not clip_loaded:
    clip_model, preprocess, clip_device = load_clip_model()
    clip_loaded = True

if not blip_loaded:
    model, processor, device = load_blip_model()
    blip_loaded = True
|
|