Spaces:
Sleeping
Sleeping
import os
import shutil
from typing import Optional, List, Dict, Union

import cv2
import instaloader
import requests
import streamlit as st
import torch
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM
def download_instagram_reels(hashtag, num_reels=1, username="your_username", password="your_password"):
    """Download up to ``num_reels`` video posts for an Instagram hashtag.

    Args:
        hashtag: Hashtag to search (without the leading '#').
        num_reels: Maximum number of video posts to download.
        username: Instagram login name.
        password: Instagram password.

    Returns:
        Tuple ``(video_paths, reel_urls)``: paths of the downloaded .mp4
        files and the display URLs of the matched posts. Both lists are
        empty on any failure.
    """
    # Start from a clean download directory; shutil instead of shelling
    # out to `rm -rf` (portable, no shell injection surface).
    shutil.rmtree("downloaded_reels", ignore_errors=True)
    os.makedirs("downloaded_reels", exist_ok=True)

    loader = instaloader.Instaloader(
        download_videos=True,
        download_video_thumbnails=True,
        download_comments=True,
    )
    try:
        loader.login(username, password)

        # Collect the first `num_reels` video posts for the hashtag.
        reel_posts = []
        for post in instaloader.Hashtag.from_name(loader.context, hashtag).get_posts():
            if post.is_video:
                reel_posts.append(post)
                if len(reel_posts) >= num_reels:
                    break

        # Download each post object directly. The old code parsed a
        # shortcode out of post.url (a media URL, not a page URL), which
        # produced a bogus shortcode; we already hold the Post objects.
        for post in reel_posts:
            loader.download_post(post, target='downloaded_reels')

        reel_urls = [post.url for post in reel_posts]

        video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')]
        if not video_files:
            raise ValueError("No video file found in the downloaded reels.")
        return [os.path.join('downloaded_reels', f) for f in video_files], reel_urls
    except Exception as e:
        # Best-effort: report and return empty results so the UI can show
        # an error instead of crashing.
        print(f"Error downloading reels: {e}")
        return [], []
def parse_query_with_groq(
    query: str,
    groq_api_key: str,
    seed: int = 42,
    llama_model: str = "llama-3.2-11b-text-preview"
) -> Optional[str]:
    """Run sentiment analysis over per-frame captions via the Groq chat API.

    Args:
        query: Newline-separated "<frame_index>; <caption>" text to analyze.
        groq_api_key: Bearer token for the Groq API.
        seed: Seed forwarded to the API for reproducible output.
        llama_model: Groq model identifier.

    Returns:
        The raw JSON string returned by the model (expected: a list of
        ``{"sentiment_score": float, "frame_index": int}`` objects), or
        ``None`` on any request/parsing error.
    """
    url = "https://api.groq.com/openai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {groq_api_key}",
        "Content-Type": "application/json"
    }
    system_message = """You are a precise sentiment analysis assistant.
Analyze the user_prompt and provide a JSON-formatted list of objects, where each object contains:
- sentiment_score: a float between -1 (very negative) and 1 (very positive)
- frame_index: the corresponding frame index
Strictly follow this JSON format:
[
{"sentiment_score": <float>, "frame_index": <int>},
...
]
"""
    payload = {
        "model": llama_model,
        "response_format": {
            "type": "json_schema",
            # NOTE(review): the OpenAI-compatible structured-output contract
            # expects the schema wrapped as {"name": ..., "schema": {...}};
            # the previous payload put the bare schema directly under
            # "json_schema", which the API rejects. Verify against current
            # Groq docs (array roots may also need wrapping in an object).
            "json_schema": {
                "name": "frame_sentiments",
                "schema": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "sentiment_score": {"type": "number"},
                            "frame_index": {"type": "integer"}
                        },
                        "required": ["sentiment_score", "frame_index"]
                    }
                }
            }
        },
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": query}
        ],
        "temperature": 0,      # deterministic output
        "max_tokens": 300,
        "seed": seed
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        print(f"DEBUG : Raw Response is {response}")
        # Narrow exceptions: network errors from requests, shape errors
        # from the JSON drill-down.
        parsed_response = response.json()['choices'][0]['message']['content']
        print(f"DEBUG : Raw Response is {parsed_response}")
        return parsed_response
    except (requests.RequestException, KeyError, IndexError, ValueError) as e:
        print(f"Sentiment Analysis Error: {e}")
        return None
def extract_frames(video_path, output_folder, fps=1):
    """Sample frames from a video at roughly ``fps`` frames per second.

    Frames are written into ``output_folder`` as ``image0.jpg``,
    ``image1.jpg``, … so downstream code can address them by index.

    Args:
        video_path: Path to the input video file.
        output_folder: Directory to write the extracted JPEG frames into.
        fps: Desired sampling rate in frames per second (may be < 1).

    Returns:
        Tuple ``(frame_count, time_stamps)`` where ``time_stamps[i]`` is
        the position of ``image{i}.jpg`` in seconds. Returns ``(0, [])``
        when the video cannot be opened.
    """
    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        # Return an empty tuple instead of bare None: the caller unpacks
        # `N, timestamps = extract_frames(...)` and would TypeError otherwise.
        return 0, []

    # Some containers report 0 FPS; fall back to the requested rate so the
    # arithmetic below never divides by zero.
    video_fps = cap.get(cv2.CAP_PROP_FPS) or fps

    # Clamp to at least 1: if fps > video_fps the old int division yielded
    # 0 and `count % 0` raised ZeroDivisionError.
    frame_interval = max(1, int(video_fps / fps))

    count = 0
    frame_count = 0
    time_stamps = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # end of stream
        # Keep every `frame_interval`-th frame.
        if count % frame_interval == 0:
            frame_filename = os.path.join(output_folder, f"image{frame_count}.jpg")
            cv2.imwrite(frame_filename, frame)
            print(f"Extracted: {frame_filename}")
            frame_count += 1
            time_stamps.append(count / video_fps)
        count += 1

    cap.release()
    print("Frame extraction completed.")
    return frame_count, time_stamps
def download_instagram_reel_old(reel_url, username=None, password=None):
    """Download a single Instagram reel given its page URL (legacy path).

    Args:
        reel_url: Reel page URL of the form
            ``https://www.instagram.com/<user>/reel/<shortcode>/``.
        username: Instagram login; defaults to the ``INSTAGRAM_USERNAME``
            environment variable. (Hard-coded credentials were removed —
            never commit secrets to source.)
        password: Instagram password; defaults to ``INSTAGRAM_PASSWORD``.

    Returns:
        Path to the downloaded .mp4 file, or ``None`` on failure.
    """
    # SECURITY: credentials were previously hard-coded here; read them
    # from the environment instead.
    username = username or os.getenv("INSTAGRAM_USERNAME")
    password = password or os.getenv("INSTAGRAM_PASSWORD")

    # Clean slate for the download directory (no shell rm -rf).
    shutil.rmtree("downloaded_reels", ignore_errors=True)
    os.makedirs("downloaded_reels", exist_ok=True)

    print(f"Creating instance of instaloader")
    loader = instaloader.Instaloader(
        download_videos=True,
        download_video_thumbnails=True,
        download_comments=True
    )
    try:
        loader.login(username, password)

        # The shortcode is the second-to-last path segment of the page URL
        # (works because reel URLs end with a trailing slash).
        shortcode = reel_url.split('/')[-2]
        post = instaloader.Post.from_shortcode(loader.context, shortcode)
        loader.download_post(post, target='downloaded_reels')

        # Dump comments for inspection.
        comments = post.get_comments()
        print(f"Comments are : {comments}")
        for comment in comments:
            print(f"{comment.owner.username}: {comment.text}")

        video_files = [f for f in os.listdir('downloaded_reels') if f.endswith('.mp4')]
        if not video_files:
            raise ValueError("No video file found in the downloaded reels.")
        return os.path.join('downloaded_reels', video_files[0])
    except Exception as e:
        print(f"Error downloading reel: {e}")
        return None
def analyze_frames_with_florence(image_folder, timestamps):
    """Caption every extracted frame with Microsoft's Florence-2 model.

    Args:
        image_folder: Directory containing frames named ``image{i}.jpg``
            (as produced by ``extract_frames``).
        timestamps: ``timestamps[i]`` is the time (seconds) of frame ``i``.

    Returns:
        List of dicts ``{'Frame_Index': int, 'Caption': parsed_answer}``
        where ``parsed_answer`` is the processor's post-processed output.
    """
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

    model = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-large",
        torch_dtype=torch_dtype,
        trust_remote_code=True
    ).to(device)
    processor = AutoProcessor.from_pretrained(
        "microsoft/Florence-2-large",
        trust_remote_code=True
    )

    prompt = "<DETAILED_CAPTION>"
    frame_analyses = []

    # Count only the frames extract_frames() produced — the old
    # len(os.listdir(...)) counted every file, so a stray file would make
    # the loop look for an image{i}.jpg that does not exist.
    num_frames = len([
        f for f in os.listdir(image_folder)
        if f.startswith("image") and f.endswith(".jpg")
    ])
    for i in range(num_frames):
        image_path = os.path.join(image_folder, f"image{i}.jpg")
        # Context manager closes the file handle promptly instead of
        # leaking it until GC.
        with Image.open(image_path) as image:
            # Cast pixel_values to the model dtype as well as the device:
            # a float16 model rejects float32 inputs on CUDA (this matches
            # the official Florence-2 usage example).
            inputs = processor(text=prompt, images=image, return_tensors="pt").to(device, torch_dtype)
            image_size = (image.width, image.height)
        generated_ids = model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=1024,
            num_beams=3,
            do_sample=False
        )
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        parsed_answer = processor.post_process_generation(
            generated_text,
            task=prompt,
            image_size=image_size
        )
        frame_analyses.append({
            'Frame_Index': i,
            'Caption': parsed_answer
        })
        print(f"Frame {i}, TimeStamp {timestamps[i]} sec : {parsed_answer}")

    return frame_analyses
def main():
    """Streamlit entry point: download reels, caption frames, run sentiment."""
    fps = 0.5  # frame-sampling rate (one frame every 2 seconds)

    st.title("BrandScan")
    hashtag = st.text_input("Enter the hashtag (without #):", "purnagummies")
    video_paths = []
    if st.button("Download Reels"):
        if hashtag:
            with st.spinner("Downloading reels..."):
                video_paths, reel_urls = download_instagram_reels(hashtag)
            if reel_urls:
                st.success(f"Downloaded {len(video_paths)} reels:")
                for url in reel_urls:
                    st.write(url)
            else:
                st.error("No reels found or an error occurred.")
        else:
            st.error("Please enter a valid hashtag.")

    # NOTE(review): on any Streamlit rerun where the button was not clicked,
    # video_paths is empty and everything below is skipped — consider
    # persisting results in st.session_state.
    if not video_paths:
        print("Failed to download the reel.")
        return

    # Analyze only the first downloaded reel.
    video_path = video_paths[0]
    image_folder = "downloaded_reels/images"
    os.makedirs(image_folder, exist_ok=True)

    num_frames, timestamps = extract_frames(video_path, image_folder, fps)
    print(f"Analyzing video {video_path} with {num_frames} frames extracted at {fps} frames per second")

    frame_analyses = analyze_frames_with_florence(image_folder, timestamps)
    print("Frame analysis completed.")

    # Build the "<index>; <caption>" table. Frame_Index is an int and
    # Caption is a parsed dict, so use f-string formatting — the previous
    # `int + "; "` concatenation raised TypeError at runtime.
    frame_analyses_str = "<Frame_Index>; <Description>\n"
    for item in frame_analyses:
        frame_analyses_str += f"{item['Frame_Index']}; {item['Caption']}\n"
    print(frame_analyses_str)

    sentiment_analysis = parse_query_with_groq(frame_analyses_str, os.getenv("GROQ_API_KEY"))
    print("Sentiment Analysis on the video:")
    print(sentiment_analysis)


if __name__ == "__main__":
    main()