File size: 6,339 Bytes
3107242
 
 
 
 
 
 
 
 
 
 
a316fee
 
 
 
 
 
 
3107242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a316fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3107242
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from mcp.server.fastmcp import FastMCP
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi
from typing import List, Dict
from src.configs.config import RAW_CSV, CHUNKS_FILE, METADATA_FILE, LOG_DIR, PROCESSED_CHUNKS_CSV
from src.utils.helpers import load_metadata, load_chunks_from_disk, load_youtube_data, load_prompt_template
import logging
from dotenv import load_dotenv
import os


from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from src.configs.config import TITLE_FAISS_INDEX_FILE, TITLE_EMBEDDINGS_FILE, EMBEDDING_MODEL, METADATA_FILE
import pandas as pd


# All agent activity is logged to a dedicated file under LOG_DIR.
LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Load secrets from .env; YOUTUBE_API_KEY is required by
# fetch_new_youtube_transcripts (a missing key surfaces later at build() time).
load_dotenv()
api_key = os.getenv("YOUTUBE_API_KEY")
mcp = FastMCP("summary-server")

# Pre-load the corpora once at import time; the MCP tools/prompts below
# read these module-level collections on every request.
ytb_meta = load_youtube_data(RAW_CSV)
chunks_data = load_chunks_from_disk(CHUNKS_FILE)
metadata = load_metadata(METADATA_FILE)
chunked_transcript= load_youtube_data(PROCESSED_CHUNKS_CSV)


@mcp.tool()
async def fetch_new_youtube_transcripts(channel_id: str= "UCLmLW2hwH-kk9w8QrdX8uAA", max_videos: int = 2) -> List[Dict]:
    """Fetch transcripts and metadata for the most recent YouTube videos from the specified channel.

    Args:
        channel_id: YouTube channel to query (defaults to the project's channel).
        max_videos: Maximum number of results to return.

    Returns:
        A list of dicts with keys "id", "titre", "date", "sous-titre" and
        "lien". Videos whose transcript cannot be fetched are skipped, so
        fewer than max_videos entries may come back. Returns [] if the
        search API call itself fails.
    """
    try:
        youtube = build("youtube", "v3", developerKey=api_key)
        # type="video" restricts the search to actual videos; without it,
        # channel/playlist results (whose "id" has no "videoId") consume
        # maxResults slots and get discarded below, yielding fewer videos
        # than requested.
        request = youtube.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=max_videos,
            order="date",
            type="video",
        )
        response = request.execute()
    except Exception as e:
        logging.error(f"Error fetching YouTube data: {e}")
        return []

    results = []
    for item in response.get("items", []):
        try:
            # Defensive: keep the id-structure check even though
            # type="video" should guarantee a videoId.
            if isinstance(item.get("id"), dict) and "videoId" in item.get("id", {}):
                vid = item["id"]["videoId"]
                try:
                    transcript = YouTubeTranscriptApi.get_transcript(vid, languages=["ar", "fr", "en"])
                    text = " ".join(entry["text"] for entry in transcript)
                    results.append({
                        "id": vid,
                        "titre": item["snippet"]["title"],
                        "date": item["snippet"]["publishedAt"],
                        "sous-titre": text,
                        "lien": f"https://www.youtube.com/watch?v={vid}"
                    })
                except Exception as e:
                    # Transcripts may be disabled or unavailable in the
                    # requested languages; skip the video rather than fail.
                    logging.warning(f"Could not fetch transcript for {vid}: {e}")
                    continue
            else:
                logging.warning(f"Skipping item with invalid id structure: {item.get('id')}")
        except Exception as e:
            logging.warning(f"Error processing item {item}: {e}")
            continue

    # Cap the result in case the API returned more items than requested.
    return results[:max_videos]


@mcp.prompt(
    name="summarize_doc_by_link",
    description="Summarize a document using its download link"
)
async def summarize_doc_by_link(link: str):
    """Summarize a document using its download link."""
    # Locate the matching record among the pre-loaded document chunks.
    matching = None
    for doc in chunks_data:
        if doc.get("download_link") == link:
            matching = doc
            break

    if matching is None:
        return f"Document with link '{link}' not found in chunks."

    # Stitch every chunk's text back together before templating.
    pieces = [chunk["text"] for chunk in matching.get("chunks", [])]
    full_text = "\n".join(pieces)
    return load_prompt_template("src/prompts/summarize_doc.txt", {
        "full_text": full_text
    })

@mcp.prompt(
    name="summarize_video_by_link",
    description="Summarize a YouTube video from chunked CSV data using a sample of its transcript."
)
async def summarize_video_by_link(link: str):
    """Summarize a YouTube video from chunked CSV data using a sample of its transcript."""

    # Collect every transcript row belonging to this video.
    matching = [row for row in chunked_transcript if row.get("lien") == link]
    if not matching:
        return f"No chunks found for video with link '{link}'."

    # chunk_id looks like "<video>_<ordinal>" (e.g. "1_0", "1_1"); sort
    # numerically on both parts so "1_10" comes after "1_2".
    def _chunk_order(row):
        return tuple(int(part) for part in row["chunk_id"].split("_"))

    try:
        matching.sort(key=_chunk_order)
    except Exception:
        pass  # fallback: leave order as-is

    # Sample a few evenly spaced chunks instead of the full transcript.
    sampled = sample_chunks(matching, n=3)

    title = sampled[0]["titre"]
    partial_transcript = "\n".join(row["texte"] for row in sampled).strip()
    if not partial_transcript:
        return f"No transcript text found for the provided video link"

    # Build prompt with disclaimer
    return load_prompt_template("src/prompts/summarize_video.txt", {
        "title": title,
        "transcript": partial_transcript
    })

@mcp.prompt(
    name="summarize_doc_by_title",
    description="Summarize a document using its title"
)
async def summarize_doc_by_title(title_query: str):
    """Resolve a title query to its best-matching document link, then summarize it."""
    match = search_title(title_query)
    return await summarize_doc_by_link(match["link"])

@mcp.prompt(
    name="summarize_document",
    description="Summarize a document by link or title. If the query contains 'https', it is treated as a link."
)
async def summarize_document(query: str):
    """Dispatch to link- or title-based summarization depending on the query shape."""
    # Anything without "https" is treated as a title search.
    if "https" not in query:
        return await summarize_doc_by_title(query)
    return await summarize_doc_by_link(query)
# search_title
def search_title(query):
    """Return the closest-matching document title and link for *query*.

    Performs a nearest-neighbour search over title embeddings with FAISS.
    The index, the SentenceTransformer model and the metadata CSV are loaded
    once and cached on the function object — the original re-read all three
    from disk on every call, which is very expensive.

    Args:
        query: Free-text title query.

    Returns:
        dict with "title" (best matching document name) and "link"
        (its download link).
    """
    cache = getattr(search_title, "_cache", None)
    if cache is None:
        df = pd.read_csv(METADATA_FILE)
        cache = {
            "index": faiss.read_index(str(TITLE_FAISS_INDEX_FILE)),
            "model": SentenceTransformer(EMBEDDING_MODEL),
            "titles": df["Nom du document"].tolist(),
            "links": df["Lien"].tolist(),
        }
        search_title._cache = cache

    # Embed the query and fetch the single nearest title (k=1).
    query_vec = cache["model"].encode([query], convert_to_tensor=False)
    D, I = cache["index"].search(np.array(query_vec), k=1)
    best_idx = I[0][0]
    return {"title": cache["titles"][best_idx], "link": cache["links"][best_idx]}


def sample_chunks(chunks, n=3):
    """Pick up to *n* evenly spaced chunks across the whole video.

    Args:
        chunks: Ordered list of chunk records.
        n: Number of samples to take; non-positive values yield [].

    Returns:
        A list of at most n chunks taken at evenly spaced indices, or the
        full list unchanged when it already has n or fewer items.
    """
    if n <= 0:
        # Guard: the stride computation below would divide by zero.
        return []
    total = len(chunks)
    if total <= n:
        return chunks
    step = total / n
    return [chunks[int(i * step)] for i in range(n)]


if __name__ == "__main__":
    # Initialize and run the server
    # stdio is the standard transport for locally spawned MCP servers.
    mcp.run(transport='stdio')