# MCP summary server: exposes YouTube transcript fetching as a tool and
# document/video summarization prompt builders over the Model Context Protocol.
from mcp.server.fastmcp import FastMCP
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi
from typing import List, Dict
from src.configs.config import RAW_CSV, CHUNKS_FILE, METADATA_FILE, LOG_DIR, PROCESSED_CHUNKS_CSV
from src.utils.helpers import load_metadata, load_chunks_from_disk, load_youtube_data, load_prompt_template
import logging
from dotenv import load_dotenv
import os
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from src.configs.config import TITLE_FAISS_INDEX_FILE, TITLE_EMBEDDINGS_FILE, EMBEDDING_MODEL, METADATA_FILE
import pandas as pd
# Agent log file lives under the project-configured LOG_DIR.
LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Pull environment variables (notably YOUTUBE_API_KEY) from a local .env file.
load_dotenv()
api_key = os.getenv("YOUTUBE_API_KEY")
# MCP server instance; the tools/prompts below register themselves against it.
mcp = FastMCP("summary-server")
# Preload all datasets once at import time so each tool/prompt call is fast.
# NOTE(review): these run on import and will raise if the files are missing —
# presumably guaranteed by the deployment; verify.
ytb_meta = load_youtube_data(RAW_CSV)
chunks_data = load_chunks_from_disk(CHUNKS_FILE)
metadata = load_metadata(METADATA_FILE)
chunked_transcript= load_youtube_data(PROCESSED_CHUNKS_CSV)
@mcp.tool()
async def fetch_new_youtube_transcripts(channel_id: str = "UCLmLW2hwH-kk9w8QrdX8uAA", max_videos: int = 2) -> List[Dict]:
    """Fetch transcripts and metadata for the most recent videos of a YouTube channel.

    Args:
        channel_id: YouTube channel ID to query.
        max_videos: Maximum number of videos to return.

    Returns:
        A list of dicts with keys "id", "titre", "date", "sous-titre" and
        "lien". Empty on API failure; videos without a transcript are skipped.
    """
    try:
        youtube = build("youtube", "v3", developerKey=api_key)
        # type="video" excludes playlist/channel hits, which would otherwise be
        # skipped below and silently shrink the result set under max_videos.
        request = youtube.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=max_videos,
            order="date",
            type="video",
        )
        response = request.execute()
    except Exception as e:
        logging.error("Error fetching YouTube data: %s", e)
        return []
    results = []
    for item in response.get("items", []):
        item_id = item.get("id")
        # Defensive: non-video results carry a different id structure.
        if not (isinstance(item_id, dict) and "videoId" in item_id):
            logging.warning("Skipping item with invalid id structure: %s", item_id)
            continue
        vid = item_id["videoId"]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(vid, languages=["ar", "fr", "en"])
        except Exception as e:
            # Transcripts can be disabled or unavailable; best-effort skip.
            logging.warning("Could not fetch transcript for %s: %s", vid, e)
            continue
        text = " ".join(entry["text"] for entry in transcript)
        results.append({
            "id": vid,
            "titre": item["snippet"]["title"],
            "date": item["snippet"]["publishedAt"],
            "sous-titre": text,
            "lien": f"https://www.youtube.com/watch?v={vid}",
        })
    # Cap defensively in case the API returned more than requested.
    return results[:max_videos]
@mcp.prompt(
    name="summarize_doc_by_link",
    description="Summarize a document using its download link"
)
async def summarize_doc_by_link(link: str):
    """Build a summarization prompt for the document whose download link matches *link*."""
    # Scan the preloaded chunk store for the matching download link.
    document = None
    for candidate in chunks_data:
        if candidate.get("download_link") == link:
            document = candidate
            break
    if not document:
        return f"Document with link '{link}' not found in chunks."
    # Stitch every chunk back together into one body of text.
    pieces = [chunk["text"] for chunk in document.get("chunks", [])]
    full_text = "\n".join(pieces)
    prompt_path = "src/prompts/summarize_doc.txt"
    return load_prompt_template(prompt_path, {"full_text": full_text})
@mcp.prompt(
    name="summarize_video_by_link",
    description="Summarize a YouTube video from chunked CSV data using a sample of its transcript."
)
async def summarize_video_by_link(link: str):
    """Summarize a YouTube video from chunked CSV data using a sample of its transcript."""
    # Keep only the rows belonging to the requested video.
    matching = [row for row in chunked_transcript if row.get("lien") == link]
    if not matching:
        return f"No chunks found for video with link '{link}'."
    # chunk_id looks like "<video>_<seq>"; order numerically when possible.
    try:
        matching.sort(key=lambda row: tuple(int(part) for part in row["chunk_id"].split("_")))
    except Exception:
        pass  # malformed ids: keep the incoming order as-is
    # Down-sample to a handful of evenly spaced chunks across the video.
    sampled = sample_chunks(matching, n=3)
    title = sampled[0]["titre"]
    partial_transcript = "\n".join(row["texte"] for row in sampled).strip()
    if not partial_transcript:
        return f"No transcript text found for the provided video link"
    # Render the prompt template with the sampled transcript.
    prompt_path = "src/prompts/summarize_video.txt"
    return load_prompt_template(prompt_path, {
        "title": title,
        "transcript": partial_transcript,
    })
@mcp.prompt(
    name="summarize_doc_by_title",
    description="Summarize a document using its title"
)
async def summarize_doc_by_title(title_query: str):
    """Resolve *title_query* to a document link via semantic title search, then summarize it."""
    # Nearest-title lookup yields the canonical download link.
    match = search_title(title_query)
    return await summarize_doc_by_link(match["link"])
@mcp.prompt(
    name="summarize_document",
    description="Summarize a document by link or title. If the query contains 'https', it is treated as a link."
)
async def summarize_document(query: str):
    """Dispatch to link- or title-based summarization depending on the query shape."""
    # A query containing "https" is assumed to be a direct download link.
    if "https" not in query:
        return await summarize_doc_by_title(query)
    return await summarize_doc_by_link(query)
# search_title
def search_title(query):
    """Return the best-matching document for *query* as {"title": ..., "link": ...}.

    Runs a FAISS nearest-neighbour search over precomputed title embeddings.
    The heavy resources (FAISS index, SentenceTransformer model, metadata CSV)
    are static artifacts, so they are loaded once and cached on the function
    object instead of being reloaded on every call.
    """
    cache = search_title.__dict__.setdefault("_cache", {})
    if not cache:
        # One-time load; re-reading the model and index per call is very slow.
        cache["index"] = faiss.read_index(str(TITLE_FAISS_INDEX_FILE))
        cache["model"] = SentenceTransformer(EMBEDDING_MODEL)
        meta = pd.read_csv(METADATA_FILE)
        cache["titles"] = meta["Nom du document"].tolist()
        cache["links"] = meta["Lien"].tolist()
    # Embed the query and fetch the single closest title (k=1).
    query_vec = cache["model"].encode([query], convert_to_tensor=False)
    _, I = cache["index"].search(np.array(query_vec), k=1)
    best_idx = I[0][0]
    return {"title": cache["titles"][best_idx], "link": cache["links"][best_idx]}
def sample_chunks(chunks, n=3):
    """Return up to *n* chunks spaced evenly across the whole list.

    A list with *n* or fewer items is returned unchanged; otherwise one
    chunk is picked at every ``len(chunks) / n`` interval.
    """
    count = len(chunks)
    if count <= n:
        return chunks
    stride = count / n
    selected = []
    for position in range(n):
        selected.append(chunks[int(position * stride)])
    return selected
if __name__ == "__main__":
    # Initialize and run the server over stdio: the MCP client spawns this
    # process and communicates through its standard streams.
    # (Removed a stray trailing "|" left over from a copy/paste artifact.)
    mcp.run(transport='stdio')