# src/mcp/server.py
import logging
import os
from typing import Dict, List

import faiss
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from googleapiclient.discovery import build
from mcp.server.fastmcp import FastMCP
from sentence_transformers import SentenceTransformer
from youtube_transcript_api import YouTubeTranscriptApi

from src.configs.config import (RAW_CSV, CHUNKS_FILE, METADATA_FILE, LOG_DIR,
                                PROCESSED_CHUNKS_CSV, TITLE_FAISS_INDEX_FILE,
                                EMBEDDING_MODEL)
from src.utils.helpers import (load_metadata, load_chunks_from_disk,
                               load_youtube_data, load_prompt_template)
LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
os.makedirs(LOG_DIR, exist_ok=True)  # ensure the log directory exists before configuring the file handler
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

load_dotenv()
api_key = os.getenv("YOUTUBE_API_KEY")

mcp = FastMCP("summary-server")

# Data loaded once at startup and shared by the tools/prompts below.
ytb_meta = load_youtube_data(RAW_CSV)
chunks_data = load_chunks_from_disk(CHUNKS_FILE)
metadata = load_metadata(METADATA_FILE)
chunked_transcript = load_youtube_data(PROCESSED_CHUNKS_CSV)
@mcp.tool()
async def fetch_new_youtube_transcripts(channel_id: str = "UCLmLW2hwH-kk9w8QrdX8uAA", max_videos: int = 2) -> List[Dict]:
    """Fetch transcripts and metadata for the most recent YouTube videos from the specified channel."""
    try:
        youtube = build("youtube", "v3", developerKey=api_key)
        request = youtube.search().list(part="id,snippet", channelId=channel_id, maxResults=max_videos, order="date")
        response = request.execute()
    except Exception as e:
        logging.error(f"Error fetching YouTube data: {e}")
        return []

    results = []
    for item in response.get("items", []):
        # search().list() can also return channels and playlists; only video items carry a videoId.
        item_id = item.get("id")
        if not (isinstance(item_id, dict) and "videoId" in item_id):
            logging.warning(f"Skipping item with invalid id structure: {item_id}")
            continue
        vid = item_id["videoId"]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(vid, languages=["ar", "fr", "en"])
            text = " ".join(entry["text"] for entry in transcript)
            results.append({
                "id": vid,
                "titre": item["snippet"]["title"],
                "date": item["snippet"]["publishedAt"],
                "sous-titre": text,
                "lien": f"https://www.youtube.com/watch?v={vid}"
            })
        except Exception as e:
            logging.warning(f"Could not fetch transcript for {vid}: {e}")
            continue

    # Ensure we return at most max_videos results.
    return results[:max_videos]
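# Example of a returned record (shape only; the values here are illustrative,
# not real data from the channel):
#   {
#       "id": "dQw4w9WgXcQ",
#       "titre": "<video title>",
#       "date": "2024-01-01T00:00:00Z",
#       "sous-titre": "<full transcript text joined with spaces>",
#       "lien": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
#   }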
@mcp.prompt(
    name="summarize_doc_by_link",
    description="Summarize a document using its download link"
)
async def summarize_doc_by_link(link: str):
    """Summarize a document using its download link."""
    # Find the document in chunks_data using the link.
    document = next((doc for doc in chunks_data if doc.get("download_link") == link), None)
    if not document:
        return f"Document with link '{link}' not found in chunks."

    chunks = document.get("chunks", [])
    full_text = "\n".join(chunk["text"] for chunk in chunks)

    prompt_path = "src/prompts/summarize_doc.txt"
    prompt = load_prompt_template(prompt_path, {
        "full_text": full_text
    })
    return prompt
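# The lookup above assumes chunks_data records shaped roughly like the
# following (an assumption inferred from the accesses in this function,
# not a documented schema):
#   {"download_link": "<url>", "chunks": [{"text": "..."}, ...]}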
@mcp.prompt(
    name="summarize_video_by_link",
    description="Summarize a YouTube video from chunked CSV data using a sample of its transcript."
)
async def summarize_video_by_link(link: str):
    """Summarize a YouTube video from chunked CSV data using a sample of its transcript."""
    # Filter rows matching the given link.
    chunks = [row for row in chunked_transcript if row.get("lien") == link]
    if not chunks:
        return f"No chunks found for video with link '{link}'."

    # Sort numerically by chunk_id (assumes a format like "1_0", "1_1", etc.).
    try:
        chunks.sort(key=lambda x: tuple(map(int, x["chunk_id"].split("_"))))
    except Exception:
        pass  # fallback: leave order as-is

    # Sample a few evenly spaced chunks instead of truncating from the start.
    selected_chunks = sample_chunks(chunks, n=3)

    # Extract metadata and assemble the partial transcript.
    title = selected_chunks[0]["titre"]
    partial_transcript = "\n".join(chunk["texte"] for chunk in selected_chunks).strip()
    if not partial_transcript:
        return f"No transcript text found for video with link '{link}'."

    # Build prompt with disclaimer.
    prompt_path = "src/prompts/summarize_video.txt"
    prompt = load_prompt_template(prompt_path, {
        "title": title,
        "transcript": partial_transcript
    })
    return prompt
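# The numeric tuple sort above matters because plain string ordering would
# misplace multi-digit positions: "1_10" < "1_2" as strings, while
# (1, 10) > (1, 2) as tuples of ints.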
@mcp.prompt(
    name="summarize_doc_by_title",
    description="Summarize a document using its title"
)
async def summarize_doc_by_title(title_query: str):
    """Summarize a document by resolving its title to a download link via semantic search."""
    result = search_title(title_query)
    link = result["link"]
    return await summarize_doc_by_link(link)


@mcp.prompt(
    name="summarize_document",
    description="Summarize a document by link or title. If the query contains 'https', it is treated as a link."
)
async def summarize_document(query: str):
    """Dispatch to link-based or title-based summarization."""
    if "https" in query:
        return await summarize_doc_by_link(query)
    else:
        return await summarize_doc_by_title(query)
def search_title(query):
    """Return the best-matching document title and link for a free-text query."""
    # Load the FAISS index, embedding model, and document metadata.
    index = faiss.read_index(str(TITLE_FAISS_INDEX_FILE))
    model = SentenceTransformer(EMBEDDING_MODEL)
    meta_df = pd.read_csv(METADATA_FILE)  # local name avoids shadowing the module-level `metadata`
    titles = meta_df["Nom du document"].tolist()
    links = meta_df["Lien"].tolist()

    # Embed the query and retrieve the single nearest title (FAISS expects float32).
    query_vec = model.encode([query], convert_to_tensor=False)
    D, I = index.search(np.array(query_vec, dtype="float32"), k=1)
    best_idx = I[0][0]
    return {"title": titles[best_idx], "link": links[best_idx]}
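# A minimal sketch of how TITLE_FAISS_INDEX_FILE could have been built
# (assumption: a flat L2 index over title embeddings from the same
# EMBEDDING_MODEL; the actual build script lives outside this module):
#
#   titles = pd.read_csv(METADATA_FILE)["Nom du document"].tolist()
#   vecs = SentenceTransformer(EMBEDDING_MODEL).encode(titles)
#   index = faiss.IndexFlatL2(vecs.shape[1])
#   index.add(np.asarray(vecs, dtype="float32"))
#   faiss.write_index(index, str(TITLE_FAISS_INDEX_FILE))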
def sample_chunks(chunks, n=3):
    """Pick n evenly spaced chunks across the whole video."""
    total = len(chunks)
    if total <= n:
        return chunks
    step = total / n
    return [chunks[int(i * step)] for i in range(n)]
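# Worked example (hypothetical data): with 10 chunks and n=3, step = 10/3,
# so the selected indices are int(0 * 10/3) = 0, int(1 * 10/3) = 3, and
# int(2 * 10/3) = 6 (roughly the start, one third in, and two thirds in).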
if __name__ == "__main__":
    # Initialize and run the server over stdio.
    mcp.run(transport='stdio')