# ELI-chatbot / utilities /vectorstore /SummaryManager.py
# Author: StefanoDUrso — commit f88de80 ("handling contexts")
import os
import json
import requests
from dotenv import load_dotenv
import tiktoken
#from langchain_openai import OpenAI
#from langchain_community.llms import OpenAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_community.callbacks.manager import get_openai_callback
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.schema import Document
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
#import matplotlib.pyplot as plt
import warnings
from warnings import simplefilter
import numpy as np
#default_model='gpt-4o-mini'
#default_embedding_model='text-embedding-3-small'
#default_temperature=0
#default_chunk_size=5000
#default_chunk_overlap=1000
#default_max_lenght=5000
#default_separator="\n\n\n\n\n"
'''
On average each token contains 4 characters; 1000 tokens is about 750 words
- chunk size 1000 -> 250 tokens
- chunk size 2000 -> 500 tokens
- chunk size 5000 -> 1250 tokens
- chunk size 10000 -> 2500 tokens
On GPT-4o-mini:
- the price for 10k tokens in input is about 0.001€ (1k -> 0.00015)
- the price for 10k tokens in output is about 0.006€ (1k -> 0.0006)
- the input limit is 128k tokens
- the output limit is 16k tokens
'''
map_prompt_en = """
Please provide a summary of the following text:
{text}
"""
map_prompt_it = """
Per favore, fornisci un riassunto del seguente testo:
{text}
"""
combine_prompt_en = """
Based on the key points and ideas provided in the previous summaries, craft a final, cohesive summary that presents the information as if it were an original piece of content.
Ensure that the summary flows smoothly, maintaining a consistent tone and style throughout. Avoid explicitly referencing the structure or order of the previous summaries, and focus on creating a well-organized and comprehensive narrative that conveys the central themes and insights clearly and naturally.
Return the summary in Markdown format, using `###` for headings, `-` for lists, and keeping lines separated for better readability. Make sure that each item in a list starts on a new line, and that each sentence in a paragraph is separated by a blank line. Never write multiple sentences on the same line, unless they are parts of the same continuous paragraph. Make sure that each mathematical formula is written in LaTeX and enclosed in the delimiters \\( \\) for inline formulas and \\[ \\] for block formulas. Each sentence or idea should be separated by a carriage return for better readability. When displaying code, always use triple backticks (```) and specify the language (e.g. ```html, ```python, ```javascript) to ensure proper formatting and maintain proper indentation and layout.
{text}
"""
combine_prompt_it = """
Basandoti sui punti chiave e le idee fornite nei riassunti precedenti, crea un riassunto finale e coeso che presenti le informazioni come se fossero un contenuto originale.
Assicurati che il riassunto scorra in modo fluido, mantenendo un tono e uno stile coerenti in tutto. Evita di fare riferimento esplicito alla struttura o all'ordine dei riassunti precedenti e concentrati sulla creazione di una narrazione ben organizzata e completa che trasmetta chiaramente e naturalmente i temi e le intuizioni centrali.
Restituisci il riassunto in formato Markdown, utilizzando `###` per i titoli, `-` per gli elenchi e mantenendo le linee separate per una migliore leggibilitΓ . Assicurati che ogni elemento di un elenco inizi su una nuova riga, e che ogni frase di un paragrafo sia separata da una riga vuota. Non scrivere mai piΓΉ frasi sulla stessa riga, a meno che non siano parti dello stesso paragrafo continuo. Assicurati che ogni formula matematica sia scritta in LaTeX e racchiusa tra i delimitatori \\( \\) per le formule inline e \\[ \\] per le formule di blocco. Ogni frase o idea deve essere separata da un ritorno a capo per una migliore leggibilitΓ . Quando mostri del codice, usa sempre i backtick tripli (```) e specifica la lingua (es. ```html, ```python, ```javascript) per garantire la corretta formattazione e mantenere l'indentazione e il layout corretti.
{text}
"""
initial_summary_prompt_intro_en = """
Read the following introductory text and write a brief summary of the main themes.
Start with the sentence: "The content is about..." or a similar phrase, followed by a list of 3–5 key points. Use a clear and informative style.
"""
initial_summary_prompt_intro_it = """
Leggi il seguente testo introduttivo e scrivi un riassunto breve dei temi principali.
Inizia con la frase: "Il contenuto tratta di..." oppure una formula equivalente, e segui con un elenco di 3–5 punti chiave. Usa uno stile chiaro e informativo.
"""
initial_summary_prompt_closing_en = """
End with a sentence indicating that this is just an introduction and the content continues.
"""
initial_summary_prompt_closing_it = """
Concludi con una frase che segnali che il contenuto continua oltre questo estratto.
"""
initial_summary_prompt_formatting = """
Return the summary in Markdown format, using `###` for headings, `-` for lists, and keeping lines separated for better readability. Make sure that each item in a list starts on a new line, and that each sentence in a paragraph is separated by a blank line. Never write multiple sentences on the same line, unless they are parts of the same continuous paragraph. Make sure that each mathematical formula is written in LaTeX and enclosed in the delimiters \\( \\) for inline formulas and \\[ \\] for block formulas. Each sentence or idea should be separated by a carriage return for better readability. When displaying code, always use triple backticks (```) and specify the language (e.g. ```html, ```python, ```javascript) to ensure proper formatting and maintain proper indentation and layout.
"""
def get_map_prompt(language):
if language=='it':
return map_prompt_it
else:
return map_prompt_en
def get_combine_prompt(language):
if language=='it':
return combine_prompt_it
else:
return combine_prompt_en
def get_initial_summary_prompt(language, is_partial=False):
if language == 'it':
prompt = initial_summary_prompt_intro_it
if is_partial:
prompt += "\n" + initial_summary_prompt_closing_it
else:
prompt = initial_summary_prompt_intro_en
if is_partial:
prompt += "\n" + initial_summary_prompt_closing_en
# Aggiungi sempre la parte sulla formattazione Markdown
prompt += "\n" + initial_summary_prompt_formatting
# Appendice per il blocco di testo
prompt += "\n\nText:\n{combined_text}"
return prompt
# Constants
MAX_TOTAL_TOKENS = 6000 # Safe token limit for summarization
CHUNK_SIZE = 2000 # Define a fixed chunk size
MAX_SELECTED_DOCS = 5
class SummaryManager:
def __init__(self, language, qdrant_manager, model='gpt-4o-mini', temperature=0, max_tokens=MAX_TOTAL_TOKENS):
self.language = language
self.qdrant_manager = qdrant_manager
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
print("Model:", self.model)
self.llm = ChatOpenAI(model=model, temperature=temperature)
def do_initial_summary(self):
"""
Builds a lightweight initial summary using the first 2-3 chunks of the document.
If an initial summary already exists in the vector store, it returns that instead.
"""
# STEP 1: cerca se giΓ  esiste
print("πŸ”Ž Checking for existing initial summary...", flush=True)
all_docs = self.qdrant_manager.get_documents()
for doc in all_docs:
if doc["payload"].get("type") == "initial_summary" and doc["payload"].get("language") == self.language:
print("βœ… Found existing initial summary in vector store.")
return doc["payload"]["page_content"]
# STEP 2: genera il riassunto se non esiste
print("πŸš€ Generating initial summary from early chunks...", flush=True)
if not all_docs:
print("⚠️ No documents available for summary.")
return None
# Filtra solo i documenti con chunk_index
chunk_docs = [doc for doc in all_docs if 'chunk_index' in doc["payload"]]
total_chunks = len(chunk_docs)
# Prendi i primi 3 chunk ordinati
selected_chunks = sorted(chunk_docs, key=lambda d: d["payload"]["chunk_index"])[:3]
is_partial = len(selected_chunks) < total_chunks
combined_text = "\n".join([doc["payload"]["page_content"] for doc in selected_chunks])[:3000]
# Prompt dinamico
prompt_template = get_initial_summary_prompt(self.language, is_partial=is_partial)
prompt = prompt_template.format(combined_text=combined_text)
try:
from langchain_core.messages import HumanMessage
response = self.llm.invoke([HumanMessage(content=prompt)])
summary = response.content
# STEP 3: salva il riassunto nel vector store
inserted = self.qdrant_manager.insert_text(
text=summary,
metadata={
"type": "initial_summary",
"file_name": self.qdrant_manager.collection_name,
"language": self.language
}
)
if inserted:
print("πŸ“ Initial summary saved to vector store.")
return summary
except Exception as e:
print(f"❌ Error generating initial summary: {e}")
return None
def do_summary_map_reduce(self):
"""
Returns a full summary using Map-Reduce summarization.
If a final summary already exists in the vector store, returns that instead.
"""
# STEP 1: check if summary already exists
print("πŸ”Ž Checking for existing final summary...", flush=True)
all_documents = self.qdrant_manager.get_documents()
for doc in all_documents:
if doc["payload"].get("type") == "map_summary" and doc["payload"].get("language") == self.language:
print("βœ… Found existing final summary in vector store.")
return doc["payload"]["page_content"], 0, 0
if not all_documents:
print("❌ No documents found in collection.")
return None, 0, 0
# STEP 2: filtra fuori i documenti di tipo summary
filtered_docs = [
doc for doc in all_documents
if not str(doc["payload"].get("type", "")).endswith("summary")
]
if not filtered_docs:
print("❌ No non-summary documents available for summarization.")
return None, 0, 0
print(f"πŸ“„ {len(filtered_docs)} documents after filtering summaries.", flush=True)
# STEP 3: extract vectors & text
embeddings = [doc["vector"] for doc in filtered_docs]
documents = [doc["payload"]["page_content"] for doc in filtered_docs]
metadata = [doc["payload"] for doc in filtered_docs]
# STEP 4: select up to MAX_SELECTED_DOCS chunks via KMeans
MAX_SELECTED_DOCS = 5
selected_docs = self._select_best_chunks(
documents=documents,
metadata=metadata,
embeddings=embeddings,
max_chunks=MAX_SELECTED_DOCS
)
total_tokens = sum(self.llm.get_num_tokens(doc.page_content) for doc in selected_docs)
print(f"βœ… Selected {len(selected_docs)} docs with total tokens: {total_tokens}")
# STEP 5: load LangChain prompts
map_prompt_template = PromptTemplate(template=get_map_prompt(self.language), input_variables=["text"])
combine_prompt_template = PromptTemplate(template=get_combine_prompt(self.language), input_variables=["text"])
print("πŸ”„ Loading summarization chain...")
summary_chain = load_summarize_chain(
llm=self.llm,
chain_type='map_reduce',
map_prompt=map_prompt_template,
combine_prompt=combine_prompt_template,
verbose=False
)
print("πŸ“Š Checking token size of each formatted_doc...")
for i, doc in enumerate(selected_docs):
token_count = self.llm.get_num_tokens(doc.page_content)
print(f"Chunk {i+1}: {token_count} tokens")
# STEP 6: run the chain with token tracking
with get_openai_callback() as cb:
result = summary_chain.invoke({"input_documents": selected_docs})
input_tokens_used = cb.prompt_tokens
output_tokens_used = cb.completion_tokens
total_tokens = cb.total_tokens
print(f"🧾 Token usage: total={total_tokens}")
full_summary = result['output_text']
print("βœ… Map-reduce summary generated.")
# STEP 7: store the final summary
inserted = self.qdrant_manager.insert_text(
text=full_summary,
metadata={
"type": "map_summary",
"file_name": self.qdrant_manager.collection_name,
"language": self.language
}
)
if inserted:
print("πŸ“ Final summary saved to vector store.")
return full_summary, input_tokens_used, output_tokens_used
def do_summary_stuff(self):
"""
Returns a full summary using STUFF summarization strategy.
Uses all documents if total token count is within limits,
otherwise selects the best subset under token budget.
"""
# STEP 1: check if summary already exists
print("πŸ”Ž Checking for existing final summary...", flush=True)
all_documents = self.qdrant_manager.get_documents()
for doc in all_documents:
if doc["payload"].get("type") == "stuff_summary" and doc["payload"].get("language") == self.language:
print("βœ… Found existing final summary in vector store.")
return doc["payload"]["page_content"], 0, 0
if not all_documents:
print("❌ No documents found in collection.")
return None, 0, 0
# STEP 2: filtra fuori i documenti di tipo summary
filtered_docs = [
doc for doc in all_documents
if not str(doc["payload"].get("type", "")).endswith("summary")
]
if not filtered_docs:
print("❌ No non-summary documents available for summarization.")
return None, 0, 0
# STEP 3: extract vectors & text
embeddings = [doc["vector"] for doc in filtered_docs]
documents = [doc["payload"]["page_content"] for doc in filtered_docs]
metadata = [doc["payload"] for doc in filtered_docs]
# STEP 4: selezione intelligente con fallback a clustering
selected_docs = self._get_chunks_for_stuff(
documents=documents,
metadata=metadata,
embeddings=embeddings,
llm=self.llm,
max_tokens=self.max_tokens,
fallback_max_chunks=5
)
total_tokens = sum(self.llm.get_num_tokens(doc.page_content) for doc in selected_docs)
print(f"βœ… Selected {len(selected_docs)} docs with total tokens: {total_tokens}")
# STEP 5: load chain
combine_prompt_template = PromptTemplate(
template=get_combine_prompt(self.language),
input_variables=["text"]
)
print("πŸ”„ Running summarization with 'stuff' strategy...")
summary_chain = load_summarize_chain(
llm=self.llm,
chain_type='stuff',
prompt=combine_prompt_template,
verbose=False
)
# STEP 6: run the chain with token tracking
with get_openai_callback() as cb:
result = summary_chain.invoke({"input_documents": selected_docs})
input_tokens_used = cb.prompt_tokens
output_tokens_used = cb.completion_tokens
total_tokens = cb.total_tokens
print(f"🧾 Token usage: total={total_tokens}")
full_summary = result['output_text']
print("βœ… Stuff summary generated.")
# STEP 7: store the final summary
inserted = self.qdrant_manager.insert_text(
text=full_summary,
metadata={
"type": "stuff_summary",
"file_name": self.qdrant_manager.collection_name,
"language": self.language
}
)
if inserted:
print("πŸ“ Final summary saved to vector store.")
return full_summary, input_tokens_used, output_tokens_used
def _find_closest_embeddings(self,vectors, num_clusters, kmeans):
closest_indices = []
for i in range(num_clusters):
distances = np.linalg.norm(vectors - kmeans.cluster_centers_[i], axis=1)
closest_index = np.argmin(distances)
closest_indices.append(closest_index)
selected_indices = sorted(closest_indices)
return selected_indices
def _count_tokens(text,model):
if not text:
return 0
encoding = tiktoken.encoding_for_model(model)
num_tokens = len(encoding.encode(text))
return num_tokens
def _select_best_chunks(self, documents, metadata, embeddings, max_chunks=5):
assert len(documents) == len(metadata) == len(embeddings)
num_clusters = min(max_chunks, len(documents))
print(f"πŸ“Œ Selecting {num_clusters} best chunks (max={max_chunks})")
if num_clusters > 1:
kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(embeddings)
indices = self._find_closest_embeddings(embeddings, num_clusters, kmeans)
else:
indices = list(range(min(len(documents), max_chunks)))
selected = [(documents[i], metadata[i]) for i in indices[:max_chunks]]
# Ordina per chunk_index
selected = sorted(selected, key=lambda x: x[1].get("chunk_index", 0))
return [Document(page_content=d, metadata=m) for d, m in selected]
def _select_best_chunks_under_token_budget(self, documents, metadata, embeddings, llm, max_tokens, max_chunks=10):
assert len(documents) == len(metadata) == len(embeddings)
if max_chunks > len(documents):
max_chunks = len(documents)
kmeans = KMeans(n_clusters=max_chunks, random_state=42).fit(embeddings)
indices = self._find_closest_embeddings(embeddings, max_chunks, kmeans)
selected = []
total_tokens = 0
for i in indices:
doc_text = documents[i]
token_count = llm.get_num_tokens(doc_text)
if total_tokens + token_count > max_tokens:
break
selected.append((doc_text, metadata[i]))
total_tokens += token_count
selected = sorted(selected, key=lambda x: x[1].get("chunk_index", 0))
return [Document(page_content=d, metadata=m) for d, m in selected]
def _get_chunks_for_stuff(self, documents, metadata, embeddings, llm, max_tokens, fallback_max_chunks=5):
total_tokens = sum(llm.get_num_tokens(d) for d in documents)
if total_tokens <= max_tokens:
print(f"βœ… Using ALL documents ({total_tokens} tokens)")
ordered = sorted(zip(documents, metadata), key=lambda x: x[1].get("chunk_index", 0))
return [Document(page_content=d, metadata=m) for d, m in ordered]
else:
print(f"⚠️ Total tokens {total_tokens} exceed max {max_tokens} β€” selecting best subset")
return self._select_best_chunks_under_token_budget(
documents=documents,
metadata=metadata,
embeddings=embeddings,
llm=llm,
max_tokens=max_tokens,
max_chunks=fallback_max_chunks
)