Spaces:
Running
Running
| from langchain_community.vectorstores import FAISS | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI | |
| from langchain.schema import Document | |
| from langchain.chains import RetrievalQA | |
| from langchain.chains.summarize import load_summarize_chain | |
| from langchain.prompts import PromptTemplate | |
| import aiohttp | |
| import re | |
| import os | |
| from sqlalchemy.orm import Session | |
| from ..db import crud | |
| import dotenv | |
| dotenv.load_dotenv() | |
| API_KEY_COMMENTS = os.getenv('API_KEY_COMMENTS') | |
| API_KEY_VIDEO = os.getenv('API_KEY_VIDEO') | |
| BASE_URL = "https://www.googleapis.com/youtube/v3" | |
| GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY') | |
| embed_model = "gemini-embedding-001" | |
| llm = ChatGoogleGenerativeAI( | |
| model="gemini-flash-lite-latest", | |
| google_api_key=GOOGLE_API_KEY, | |
| temperature=1.0, | |
| max_output_tokens=2048 | |
| ) | |
| tools = [{"google_search": {}}] | |
| llm_with_grounding = llm.bind_tools(tools) | |
| embedding_model = GoogleGenerativeAIEmbeddings( | |
| google_api_key=GOOGLE_API_KEY, | |
| model=embed_model | |
| ) | |
| local_cache = {} | |
| async def fetch_comments_data(video_id, max_results=100, order="relevance"): | |
| url = f"{BASE_URL}/commentThreads?part=snippet&videoId={video_id}&key={API_KEY_COMMENTS}&maxResults={max_results}&order={order}" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as response: | |
| if response.status == 200: | |
| return await response.json() | |
| return None | |
| async def fetch_channel_details(channel_id): | |
| url = f"{BASE_URL}/channels?part=snippet%2CcontentDetails%2Cstatistics&id={channel_id}&key={API_KEY_VIDEO}" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| if result["items"]: | |
| return result["items"][0].get("statistics", {}) | |
| return {} | |
| async def fetch_video_details(video_id): | |
| video_details_url = f"{BASE_URL}/videos?part=snippet,statistics,contentDetails&id={video_id}&key={API_KEY_VIDEO}" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(video_details_url) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| if result["items"]: | |
| video_details = result["items"][0] | |
| return video_details | |
| return None | |
| async def extract_comments(video_id): | |
| if video_id in local_cache and "Comments" in local_cache[video_id]: | |
| print(f"Using cached comments for video ID: {video_id}") | |
| return local_cache[video_id]["Comments"] | |
| video = await fetch_video_details(video_id) | |
| if not video: | |
| return [] | |
| com_cnt = int(video['statistics']['commentCount']) if 'commentCount' in video['statistics'] else 0 | |
| all_comments = [] | |
| # Fetch comments by relevance | |
| if com_cnt > 0: | |
| com_data_rel = await fetch_comments_data(video_id, min(100, com_cnt), "relevance") | |
| if com_data_rel and "items" in com_data_rel: | |
| for item in com_data_rel["items"]: | |
| snippet = item["snippet"]["topLevelComment"]["snippet"] | |
| all_comments.append({ | |
| "Author": snippet["authorDisplayName"], | |
| "CommentText": snippet["textOriginal"], | |
| "LikeCount": snippet["likeCount"], | |
| "PublishDate": snippet["publishedAt"], | |
| "AuthorLogoUrl": snippet["authorProfileImageUrl"], | |
| "SortBy": "relevance" | |
| }) | |
| # Fetch additional comments by time if needed | |
| if com_cnt > 100: | |
| remaining_comments = com_cnt - len(all_comments) | |
| if remaining_comments > 0: | |
| com_data_time = await fetch_comments_data(video_id, min(100, remaining_comments), "time") | |
| if com_data_time and "items" in com_data_time: | |
| for item in com_data_time["items"]: | |
| snippet = item["snippet"]["topLevelComment"]["snippet"] | |
| all_comments.append({ | |
| "Author": snippet["authorDisplayName"], | |
| "CommentText": snippet["textOriginal"], | |
| "LikeCount": snippet["likeCount"], | |
| "PublishDate": snippet["publishedAt"], | |
| "AuthorLogoUrl": snippet["authorProfileImageUrl"], | |
| "SortBy": "time" | |
| }) | |
| # Initialize cache structure for this video | |
| if video_id not in local_cache: | |
| local_cache[video_id] = {} | |
| local_cache[video_id]["Comments"] = all_comments | |
| return all_comments | |
| def remove_links(comment): | |
| return re.sub(r'https?://\S+|www\.\S+', '', comment).strip() | |
| def has_multiple_timestamps(comment): | |
| timestamps = re.findall(r'\d{1,2}(:\d{2}){1,3}', comment) | |
| return len(timestamps) > 3 | |
| def has_char_timestamps(comment): | |
| pattern = r'^\s*\d{1,2}(:\d{2}){1,3}[\s\u200B]*[-:|]*[\s\u200B]+[a-zA-Z]{2,}.*$' | |
| matches = re.findall(pattern, comment, flags=re.MULTILINE) | |
| return len(matches) > 3 | |
| def is_code_heavy(comment): | |
| comment = comment.strip() | |
| high_signal_keywords = re.findall( | |
| r'\b(def|class|return|import|lambda|function|const|var|=>|try|except|elif)\b', | |
| comment | |
| ) | |
| code_structures = re.findall(r'(==|===|{|}|\[|\]|::|->|=)', comment) | |
| indented_lines = re.findall(r'^\s{4,}', comment, re.MULTILINE) | |
| num_lines = comment.count('\n') + 1 | |
| score = 0 | |
| if len(high_signal_keywords) >= 2: | |
| score += 2 | |
| if len(code_structures) >= 3: | |
| score += 2 | |
| if len(indented_lines) >= 2: | |
| score += 2 | |
| if num_lines >= 3: | |
| score += 1 | |
| if len(high_signal_keywords) > 0 and len(code_structures) > 0: | |
| score += 1 | |
| return score >= 5 | |
| def clean_and_filter_comment(comment): | |
| comment = remove_links(comment) | |
| if is_code_heavy(comment): | |
| return None | |
| if has_multiple_timestamps(comment) and not has_char_timestamps(comment): | |
| return None | |
| if len(comment.strip()) == 0: | |
| return None | |
| return comment.strip() | |
| def process_comments(comment_list): | |
| cleaned_comments = [] | |
| for comment in comment_list: | |
| cleaned = clean_and_filter_comment(comment) | |
| if cleaned: | |
| cleaned_comments.append(cleaned) | |
| return cleaned_comments | |
| def format_comment(comment): | |
| return f"{comment['Author']}: {comment['CommentText']} (Likes: {comment['LikeCount']})" | |
| custom_prompt = PromptTemplate( | |
| input_variables=["text", "title", "channel_name"], | |
| template=""" | |
| IMPORTANT: Keep your entire response under 1000 tokens. Be concise. Focus on essential insights. Avoid over-explaining or repeating. | |
| You are a critical and insightful assistant summarizing YouTube comments. | |
| You are summarizing the comment section of a video titled: "{title}", published by the channel: "{channel_name}". | |
| Your tasks are to: | |
| 1. **Summarize**: Identify and summarize the main opinions, reactions, and themes across the comments. | |
| 2. **Highlight**: Highlight praise, criticism, and any notable disagreements or debates. | |
| 3. **Spot Interesting Comments**: Mention particularly insightful, surprising, or unique perspectives, especially if they contrast with the general opinion. | |
| 4. **Fact Check**: Identify any factual claims made in the comments. For each claim: | |
| - Evaluate whether it's accurate, misleading, false, or unverifiable. | |
| - Reference widely accepted knowledge or consensus to support your evaluation. | |
| - When possible, include the author's handle (e.g., @username) for clarity. | |
| Return your output in this format: | |
| **Summary of Opinions**: ... | |
| **Common Themes & Sentiments**: ... | |
| **Notable or Unique Comments**: ... | |
| **Fact Check Notes**: | |
| - @username: "Comment content or claim..." → ✅ True / ❌ False | |
| - Explanation: Only if a comment is False | |
| Comments are shown below, with author names starting with @: | |
| {text} | |
| Summary: | |
| """ | |
| ) | |
| def chunk_comments(comments, chunk_size=20): | |
| chunks = [] | |
| for i in range(0, len(comments), chunk_size): | |
| chunk = comments[i:i + chunk_size] | |
| if chunk: | |
| chunks.append(Document(page_content="\n".join(chunk))) | |
| return chunks | |
| def get_qa_prompt(summary): | |
| qa_prompt = PromptTemplate( | |
| input_variables=["context", "question"], | |
| template=f""" | |
| You are a sharp and knowledge-driven assistant analyzing YouTube comments. | |
| Here is a summary of the comment section: | |
| {summary} | |
| Here are the most relevant comment chunks, with author names: | |
| {{context}} | |
| Answer the following question based on the comments, but also incorporate your broader reasoning, factual understanding, and critical thinking. | |
| Your response should: | |
| - **Be direct, honest, and grounded in fact and logic**. | |
| - If commenters make false or unsupported claims, clearly point them out and explain why. | |
| - If a comment makes a valid point or well-reasoned argument, acknowledge and explain it. | |
| - Avoid vague disclaimers like "this is just their opinion" unless it's truly subjective with no clear reasoning path. | |
| - Reference author handles (e.g., @username) or comment snippets for clarity when helpful. | |
| Question: {{question}} | |
| Answer: | |
| """ | |
| ) | |
| return qa_prompt | |
| async def ensure_processed_comments(video_id): | |
| """Ensure comments are processed and cached for a video.""" | |
| if video_id not in local_cache: | |
| local_cache[video_id] = {} | |
| # Check if processed comments are already cached | |
| if "ProcessedComments" in local_cache[video_id]: | |
| return local_cache[video_id]["ProcessedComments"] | |
| try: | |
| # Get raw comments (will use cache if available) | |
| comments = await extract_comments(video_id) | |
| if not comments: | |
| raise ValueError("No raw comments found for processing.") | |
| # Process and cache the formatted/cleaned comments | |
| formatted_comments = [format_comment(comment) for comment in comments] | |
| cleaned_comments = process_comments(formatted_comments) | |
| if not cleaned_comments: | |
| raise ValueError("No valid comments found after cleaning.") | |
| local_cache[video_id]["ProcessedComments"] = cleaned_comments | |
| return cleaned_comments | |
| except ValueError as ve: | |
| raise ve | |
| except Exception as e: | |
| print(f"An unexpected error occurred during comment processing for {video_id}: {e}") | |
| raise RuntimeError(f"Failed to process comments due to an internal issue: {str(e)}") | |
| async def summarize_comments(db: Session, video_id: str, title: str = '', channel_name: str = ''): | |
| """Checking if Comments Summary already exist in DB""" | |
| cached_video = crud.get_or_create_video_store(db, video_id) | |
| if cached_video and cached_video.comment_summary: | |
| print(f"Using cached comment summary for video ID: {video_id}") | |
| return cached_video.comment_summary | |
| try: | |
| cleaned_comments = await ensure_processed_comments(video_id) | |
| if not cleaned_comments: | |
| raise ValueError("No valid comments found after cleaning for summarization.") | |
| comment_text = "\n\n".join(cleaned_comments) | |
| all_comments_docs = Document(page_content=comment_text) | |
| summary_chain = load_summarize_chain(llm=llm_with_grounding, chain_type="stuff", prompt=custom_prompt) | |
| response = summary_chain.invoke({ | |
| "input_documents": [all_comments_docs], | |
| "title": title, | |
| "channel_name": channel_name | |
| }) | |
| summary = response['output_text'].strip() | |
| # Cache the summary in the database | |
| crud.update_comment_summary(db, video_id=video_id, summary=summary) | |
| return summary | |
| except ValueError as ve: | |
| raise ve | |
| except Exception as e: | |
| print(f"An unexpected error occurred during comment summarization for {video_id}: {e}") | |
| raise RuntimeError(f"Failed to generate summary due to an internal issue: {str(e)}") | |
| async def answer_question(db: Session, video_id: str, question: str): | |
| try: | |
| """Answer questions based on comments, using DB for summary cache.""" | |
| summary = await summarize_comments(db, video_id) | |
| processed_comments = await ensure_processed_comments(video_id) | |
| if not processed_comments: | |
| raise ValueError("No comments available to answer the question after processing.") | |
| # Check for a cached vectorstore | |
| if local_cache.get(video_id, {}).get("comment_vectorstore"): | |
| print(f"Using local cache for comment vectorstore for video ID: {video_id}") | |
| vectorstore = local_cache[video_id]["comment_vectorstore"] | |
| else: | |
| print(f"Creating and caching comment vectorstore for video ID: {video_id}") | |
| if not processed_comments: | |
| raise ValueError("Cannot create vectorstore: No cleaned comments available.") | |
| chunked_docs = chunk_comments(processed_comments) | |
| vectorstore = FAISS.from_documents(chunked_docs, embedding_model) | |
| local_cache.setdefault(video_id, {})["comment_vectorstore"] = vectorstore | |
| qa_prompt = get_qa_prompt(summary) | |
| retriever = vectorstore.as_retriever() | |
| qa_chain = RetrievalQA.from_chain_type( | |
| llm=llm_with_grounding, retriever=retriever, chain_type="stuff", | |
| chain_type_kwargs={"prompt": qa_prompt}, | |
| ) | |
| answer = qa_chain.invoke(question) | |
| if not answer: | |
| raise ValueError("LLM could not provide a meaningful answer to the question.") | |
| return answer['result'] | |
| except (ValueError, RuntimeError) as e: | |
| # Re-raise specific exceptions from sub-functions (summarize_comments, ensure_processed_comments) | |
| raise e | |
| except Exception as e: | |
| print(f"Error answering comment question: {e}") | |
| # Transform general exceptions into a RuntimeError for the API layer | |
| raise RuntimeError(f"Error processing question: {str(e)}") |