Spaces:
Sleeping
Sleeping
| from mcp.server.fastmcp import FastMCP | |
| from googleapiclient.discovery import build | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from typing import List, Dict | |
| from src.configs.config import RAW_CSV, CHUNKS_FILE, METADATA_FILE, LOG_DIR, PROCESSED_CHUNKS_CSV | |
| from src.utils.helpers import load_metadata, load_chunks_from_disk, load_youtube_data, load_prompt_template | |
| import logging | |
| from dotenv import load_dotenv | |
| import os | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import numpy as np | |
| from src.configs.config import TITLE_FAISS_INDEX_FILE, TITLE_EMBEDDINGS_FILE, EMBEDDING_MODEL, METADATA_FILE | |
| import pandas as pd | |
# --- Module-level initialization (runs once at import) ---

# All agent activity is appended to a single log file under LOG_DIR.
LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Load .env so the YouTube Data API key is available via the environment.
load_dotenv()
api_key = os.getenv("YOUTUBE_API_KEY")  # may be None if the key is not configured

# MCP server instance the tools in this module are served from.
mcp = FastMCP("summary-server")

# Pre-load corpus data from disk once at startup:
ytb_meta = load_youtube_data(RAW_CSV)              # raw video metadata rows
chunks_data = load_chunks_from_disk(CHUNKS_FILE)   # per-document chunk lists
metadata = load_metadata(METADATA_FILE)            # document-level metadata
chunked_transcript = load_youtube_data(PROCESSED_CHUNKS_CSV)  # chunked video transcripts
async def fetch_new_youtube_transcripts(channel_id: str = "UCLmLW2hwH-kk9w8QrdX8uAA", max_videos: int = 2) -> List[Dict]:
    """Fetch transcripts and metadata for the most recent YouTube videos from the specified channel."""
    # The search request is the one call that can fail wholesale; bail out
    # with an empty list rather than raising into the MCP layer.
    try:
        client = build("youtube", "v3", developerKey=api_key)
        search = client.search().list(part="id,snippet", channelId=channel_id, maxResults=max_videos, order="date")
        response = search.execute()
    except Exception as e:
        logging.error(f"Error fetching YouTube data: {e}")
        return []

    collected = []
    for item in response.get("items", []):
        try:
            # search() can return channels/playlists too; only video hits
            # carry a dict id with a "videoId" key.
            item_id = item.get("id")
            if not (isinstance(item_id, dict) and "videoId" in item_id):
                logging.warning(f"Skipping item with invalid id structure: {item.get('id')}")
                continue
            video_id = item_id["videoId"]
            try:
                segments = YouTubeTranscriptApi.get_transcript(video_id, languages=["ar", "fr", "en"])
            except Exception as e:
                logging.warning(f"Could not fetch transcript for {video_id}: {e}")
                continue
            snippet = item["snippet"]
            collected.append({
                "id": video_id,
                "titre": snippet["title"],
                "date": snippet["publishedAt"],
                "sous-titre": " ".join(seg["text"] for seg in segments),
                "lien": f"https://www.youtube.com/watch?v={video_id}",
            })
        except Exception as e:
            logging.warning(f"Error processing item {item}: {e}")
            continue
    # Ensure we return up to max_videos results
    return collected[:max_videos]
async def summarize_doc_by_link(link: str):
    """Summarize a document using its download link."""
    # Scan the pre-loaded chunk records for the matching download link.
    matching = None
    for doc in chunks_data:
        if doc.get("download_link") == link:
            matching = doc
            break
    if not matching:
        return f"Document with link '{link}' not found in chunks."

    # Stitch the chunk texts back into one body for the prompt.
    pieces = [chunk["text"] for chunk in matching.get("chunks", [])]
    full_text = "\n".join(pieces)
    return load_prompt_template("src/prompts/summarize_doc.txt", {
        "full_text": full_text
    })
async def summarize_video_by_link(link: str):
    """Summarize a YouTube video from chunked CSV data using a sample of its transcript."""
    matching_rows = [row for row in chunked_transcript if row.get("lien") == link]
    if not matching_rows:
        return f"No chunks found for video with link '{link}'."

    # Order chunks numerically by chunk_id parts (e.g. "1_0" < "1_1" < "1_10");
    # if any id fails to parse, keep whatever order the CSV had.
    try:
        matching_rows.sort(key=lambda row: tuple(int(part) for part in row["chunk_id"].split("_")))
    except Exception:
        pass

    # Keep the prompt small: take a spread of chunks instead of the full transcript.
    picked = sample_chunks(matching_rows, n=3)
    title = picked[0]["titre"]
    partial_transcript = "\n".join(c["texte"] for c in picked).strip()
    if not partial_transcript:
        return f"No transcript text found for the provided video link"

    return load_prompt_template("src/prompts/summarize_video.txt", {
        "title": title,
        "transcript": partial_transcript
    })
async def summarize_doc_by_title(title_query: str):
    """Resolve a title query to its nearest document via semantic search, then summarize it."""
    best_match = search_title(title_query)
    return await summarize_doc_by_link(best_match["link"])
async def summarize_document(query: str):
    """Summarize a document given either its URL or a free-text title query.

    Dispatches to summarize_doc_by_link for URLs and to
    summarize_doc_by_title otherwise.

    Fix: the original test (`"https" in query`) misrouted any title that
    merely mentioned "https" and failed to recognize plain `http://` links;
    dispatch now keys off an actual URL prefix.
    """
    if query.strip().lower().startswith(("http://", "https://")):
        return await summarize_doc_by_link(query)
    return await summarize_doc_by_title(query)
| # search_title | |
def search_title(query):
    """Return the best-matching document as {"title": ..., "link": ...} for a free-text query.

    Performs a k=1 nearest-neighbour search over title embeddings in a FAISS
    index. The index, the SentenceTransformer model, and the metadata CSV are
    expensive to load, so they are loaded on first call and cached on the
    function object (the original re-loaded all three from disk on every call).
    """
    assets = getattr(search_title, "_assets", None)
    if assets is None:
        meta_df = pd.read_csv(METADATA_FILE)
        assets = {
            "index": faiss.read_index(str(TITLE_FAISS_INDEX_FILE)),
            "model": SentenceTransformer(EMBEDDING_MODEL),
            "titles": meta_df["Nom du document"].tolist(),
            "links": meta_df["Lien"].tolist(),
        }
        search_title._assets = assets

    # Embed the query and take the single nearest title.
    query_vec = assets["model"].encode([query], convert_to_tensor=False)
    D, I = assets["index"].search(np.array(query_vec), k=1)
    best_idx = I[0][0]
    return {"title": assets["titles"][best_idx], "link": assets["links"][best_idx]}
def sample_chunks(chunks, n=3):
    """Pick n evenly spaced chunks across the whole video.

    Returns the input list itself when it already has <= n chunks.

    Fixes two defects in the original:
    - n <= 0 raised ZeroDivisionError (step = total / n); it now returns [].
    - the spacing formula int(i * total / n) could never select the final
      chunk, so the end of the video was always excluded from the sample.
      For n >= 2 the sample now spans index 0 through total - 1 inclusive.
    """
    if n <= 0:
        return []
    total = len(chunks)
    if total <= n:
        return chunks
    if n == 1:
        return [chunks[0]]
    # Evenly spaced indices covering both endpoints. Since total > n,
    # the stride (total-1)/(n-1) exceeds 1, so indices are distinct.
    span = total - 1
    return [chunks[round(i * span / (n - 1))] for i in range(n)]
if __name__ == "__main__":
    # Initialize and run the server
    # Serve the MCP tools over stdio (the standard transport for local MCP clients).
    mcp.run(transport='stdio')