"""MCP "summary-server".

Exposes one tool (fetching recent YouTube transcripts) and several prompts
for summarizing documents and videos from pre-chunked data loaded at startup.
"""

import logging
import os
from typing import Dict, List

import faiss
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from googleapiclient.discovery import build
from mcp.server.fastmcp import FastMCP
from sentence_transformers import SentenceTransformer
from youtube_transcript_api import YouTubeTranscriptApi

from src.configs.config import (
    CHUNKS_FILE,
    EMBEDDING_MODEL,
    LOG_DIR,
    METADATA_FILE,
    PROCESSED_CHUNKS_CSV,
    RAW_CSV,
    TITLE_EMBEDDINGS_FILE,
    TITLE_FAISS_INDEX_FILE,
)
from src.utils.helpers import (
    load_chunks_from_disk,
    load_metadata,
    load_prompt_template,
    load_youtube_data,
)

LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

load_dotenv()
api_key = os.getenv("YOUTUBE_API_KEY")

mcp = FastMCP("summary-server")

# Data sets loaded once at startup; the prompts below read from these
# in-memory copies rather than re-reading from disk on every call.
ytb_meta = load_youtube_data(RAW_CSV)
chunks_data = load_chunks_from_disk(CHUNKS_FILE)
metadata = load_metadata(METADATA_FILE)
chunked_transcript = load_youtube_data(PROCESSED_CHUNKS_CSV)


@mcp.tool()
async def fetch_new_youtube_transcripts(channel_id: str = "UCLmLW2hwH-kk9w8QrdX8uAA", max_videos: int = 2) -> List[Dict]:
    """Fetch transcripts and metadata for the most recent YouTube videos from the specified channel.

    Args:
        channel_id: YouTube channel to query (defaults to the project's channel).
        max_videos: Maximum number of videos to return.

    Returns:
        A list of dicts with keys "id", "titre", "date", "sous-titre" (the
        transcript text) and "lien"; empty list if the API call fails.
        Videos whose transcript cannot be fetched are skipped.
    """
    try:
        youtube = build("youtube", "v3", developerKey=api_key)
        # type="video" restricts the search to actual videos; without it the
        # API may return channel/playlist hits that get skipped below, leaving
        # fewer than max_videos results.
        request = youtube.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=max_videos,
            order="date",
            type="video",
        )
        response = request.execute()
    except Exception as e:
        logging.error(f"Error fetching YouTube data: {e}")
        return []

    results = []
    for item in response.get("items", []):
        try:
            # Defensive guard: some result kinds carry an "id" that is not a
            # dict with a "videoId" entry.
            item_id = item.get("id")
            if not (isinstance(item_id, dict) and "videoId" in item_id):
                logging.warning(f"Skipping item with invalid id structure: {item.get('id')}")
                continue
            vid = item_id["videoId"]
            try:
                # Prefer Arabic, then French, then English captions.
                transcript = YouTubeTranscriptApi.get_transcript(vid, languages=["ar", "fr", "en"])
                text = " ".join(entry["text"] for entry in transcript)
                results.append({
                    "id": vid,
                    "titre": item["snippet"]["title"],
                    "date": item["snippet"]["publishedAt"],
                    "sous-titre": text,
                    "lien": f"https://www.youtube.com/watch?v={vid}",
                })
            except Exception as e:
                # Best-effort: a missing/disabled transcript should not abort
                # the whole batch.
                logging.warning(f"Could not fetch transcript for {vid}: {e}")
                continue
        except Exception as e:
            logging.warning(f"Error processing item {item}: {e}")
            continue

    # Ensure we return up to max_videos results
    return results[:max_videos]


@mcp.prompt(
    name="summarize_doc_by_link",
    description="Summarize a document using its download link"
)
async def summarize_doc_by_link(link: str):
    """Build a summarization prompt for the document whose download link matches *link*.

    Returns the rendered prompt string, or an explanatory message if the
    link is not present in the preloaded chunk data.
    """
    # Find the document in chunks_data using the link
    document = next((doc for doc in chunks_data if doc.get("download_link") == link), None)
    if not document:
        return f"Document with link '{link}' not found in chunks."

    chunks = document.get("chunks", [])
    full_text = "\n".join(chunk["text"] for chunk in chunks)

    prompt_path = "src/prompts/summarize_doc.txt"
    prompt = load_prompt_template(prompt_path, {
        "full_text": full_text
    })
    return prompt


@mcp.prompt(
    name="summarize_video_by_link",
    description="Summarize a YouTube video from chunked CSV data using a sample of its transcript."
)
async def summarize_video_by_link(link: str):
    """Build a summarization prompt for a video from a sample of its transcript chunks.

    Selects a few evenly spaced chunks (see sample_chunks) so the prompt
    stays small while still covering the whole video.
    """
    # Filter rows matching the given link
    chunks = [row for row in chunked_transcript if row.get("lien") == link]
    if not chunks:
        return f"No chunks found for video with link '{link}'."

    # Sort by chunk_id (assumes format like "1_0", "1_1", etc.)
    try:
        chunks.sort(key=lambda x: tuple(map(int, x["chunk_id"].split("_"))))
    except Exception as e:
        # fallback: leave order as-is, but record why sorting failed
        logging.debug(f"Could not sort chunks by chunk_id: {e}")

    # smarter sampling: a spread of chunks instead of just the first ones
    selected_chunks = sample_chunks(chunks, n=3)

    # Extract metadata (every chunk row carries the video title)
    title = selected_chunks[0]["titre"]
    partial_transcript = "\n".join(chunk["texte"] for chunk in selected_chunks).strip()
    if not partial_transcript:
        return f"No transcript text found for the provided video link"

    # Build prompt with disclaimer
    prompt_path = "src/prompts/summarize_video.txt"
    prompt = load_prompt_template(prompt_path, {
        "title": title,
        "transcript": partial_transcript
    })
    return prompt


@mcp.prompt(
    name="summarize_doc_by_title",
    description="Summarize a document using its title"
)
async def summarize_doc_by_title(title_query: str):
    """Resolve *title_query* to a document link via semantic search, then summarize it."""
    result = search_title(title_query)
    link = result["link"]
    return await summarize_doc_by_link(link)


@mcp.prompt(
    name="summarize_document",
    description="Summarize a document by link or title. If the query contains 'https', it is treated as a link."
)
async def summarize_document(query: str):
    """Dispatch to link- or title-based summarization based on the query's form."""
    if "https" in query:
        return await summarize_doc_by_link(query)
    return await summarize_doc_by_title(query)


def search_title(query):
    """Return the best-matching document for *query* as {"title": ..., "link": ...}.

    Embeds the query with the project's sentence-transformer model and runs a
    k=1 nearest-neighbour search over the prebuilt FAISS title index.
    """
    # Load index, model and metadata (read fresh on each call so the server
    # picks up a rebuilt index without a restart).
    index = faiss.read_index(str(TITLE_FAISS_INDEX_FILE))
    model = SentenceTransformer(EMBEDDING_MODEL)
    meta_df = pd.read_csv(METADATA_FILE)  # local name avoids shadowing module-level `metadata`
    titles = meta_df["Nom du document"].tolist()
    links = meta_df["Lien"].tolist()

    # Embed query and fetch the single closest title.
    query_vec = model.encode([query], convert_to_tensor=False)
    D, I = index.search(np.array(query_vec), k=1)
    best_idx = I[0][0]
    return {"title": titles[best_idx], "link": links[best_idx]}


def sample_chunks(chunks, n=3):
    """Pick up to *n* evenly spaced chunks across the whole video.

    Returns all chunks when there are no more than *n*; returns [] when
    *n* <= 0 (guards the division below).
    """
    if n <= 0:
        return []
    total = len(chunks)
    if total <= n:
        return chunks
    step = total / n
    return [chunks[int(i * step)] for i in range(n)]


if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')