import os
import json
import requests
from dotenv import load_dotenv
import tiktoken
#from langchain_openai import OpenAI
#from langchain_community.llms import OpenAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage
from langchain_community.callbacks.manager import get_openai_callback
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.schema import Document
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
#import matplotlib.pyplot as plt
import warnings
from warnings import simplefilter
import numpy as np

#default_model='gpt-4o-mini'
#default_embedding_model='text-embedding-3-small'
#default_temperature=0
#default_chunk_size=5000
#default_chunk_overlap=1000
#default_max_length=5000
#default_separator="\n\n\n\n\n"
'''
On average each token contains 4 characters; 1000 tokens is about 750 words
- chunk size 1000  -> 250 tokens
- chunk size 2000  -> 500 tokens
- chunk size 5000  -> 1250 tokens
- chunk size 10000 -> 2500 tokens
On GPT-4o-mini:
- the price for 10k tokens in input is about 0.0015€ (1k -> 0.00015)
- the price for 10k tokens in output is about 0.006€ (1k -> 0.0006)
- the input limit is 128k tokens
- the output limit is 16k tokens
'''
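
# Minimal sketch of the arithmetic above (illustrative only; this helper is not
# used elsewhere in the module). It assumes ~4 characters per token and the
# per-1k-token prices quoted in the docstring, which may drift as OpenAI
# updates its pricing. The 20% output ratio is an assumption, not a measurement.
def _estimate_gpt4o_mini_cost(num_chars, input_price_per_1k=0.00015,
                              output_price_per_1k=0.0006, output_ratio=0.2):
    """Rough summarization cost in EUR for a document of `num_chars` characters."""
    input_tokens = num_chars / 4                  # ~4 characters per token
    output_tokens = input_tokens * output_ratio   # assume the summary is ~20% of the input
    return (input_tokens / 1000) * input_price_per_1k \
        + (output_tokens / 1000) * output_price_per_1k

# Example: a 20,000-character document (~5,000 input tokens) comes out to roughly
# 0.00075 + 0.0006 = 0.0014 EUR under these assumptions.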

map_prompt_en = """
Please provide a summary of the following text:
{text}
"""

map_prompt_it = """
Per favore, fornisci un riassunto del seguente testo:
{text}
"""

combine_prompt_en = """
Based on the key points and ideas provided in the previous summaries, craft a final, cohesive summary that presents the information as if it were an original piece of content.
Ensure that the summary flows smoothly, maintaining a consistent tone and style throughout. Avoid explicitly referencing the structure or order of the previous summaries, and focus on creating a well-organized and comprehensive narrative that conveys the central themes and insights clearly and naturally.
Return the summary in Markdown format, using `###` for headings, `-` for lists, and keeping lines separated for better readability. Make sure that each item in a list starts on a new line, and that each sentence in a paragraph is separated by a blank line. Never write multiple sentences on the same line, unless they are parts of the same continuous paragraph. Make sure that each mathematical formula is written in LaTeX and enclosed in the delimiters \\( \\) for inline formulas and \\[ \\] for block formulas. Each sentence or idea should be separated by a carriage return for better readability. When displaying code, always use triple backticks (```) and specify the language (e.g. ```html, ```python, ```javascript) to ensure proper formatting and maintain proper indentation and layout.
{text}
"""

combine_prompt_it = """
Basandoti sui punti chiave e le idee fornite nei riassunti precedenti, crea un riassunto finale e coeso che presenti le informazioni come se fossero un contenuto originale.
Assicurati che il riassunto scorra in modo fluido, mantenendo un tono e uno stile coerenti in tutto. Evita di fare riferimento esplicito alla struttura o all'ordine dei riassunti precedenti e concentrati sulla creazione di una narrazione ben organizzata e completa che trasmetta chiaramente e naturalmente i temi e le intuizioni centrali.
Restituisci il riassunto in formato Markdown, utilizzando `###` per i titoli, `-` per gli elenchi e mantenendo le linee separate per una migliore leggibilità. Assicurati che ogni elemento di un elenco inizi su una nuova riga, e che ogni frase di un paragrafo sia separata da una riga vuota. Non scrivere mai più frasi sulla stessa riga, a meno che non siano parti dello stesso paragrafo continuo. Assicurati che ogni formula matematica sia scritta in LaTeX e racchiusa tra i delimitatori \\( \\) per le formule inline e \\[ \\] per le formule di blocco. Ogni frase o idea deve essere separata da un ritorno a capo per una migliore leggibilità. Quando mostri del codice, usa sempre i backtick tripli (```) e specifica la lingua (es. ```html, ```python, ```javascript) per garantire la corretta formattazione e mantenere l'indentazione e il layout corretti.
{text}
"""

initial_summary_prompt_intro_en = """
Read the following introductory text and write a brief summary of the main themes.
Start with the sentence: "The content is about..." or a similar phrase, followed by a list of 3-5 key points. Use a clear and informative style.
"""

initial_summary_prompt_intro_it = """
Leggi il seguente testo introduttivo e scrivi un riassunto breve dei temi principali.
Inizia con la frase: "Il contenuto tratta di..." oppure una formula equivalente, e segui con un elenco di 3-5 punti chiave. Usa uno stile chiaro e informativo.
"""

initial_summary_prompt_closing_en = """
End with a sentence indicating that this is just an introduction and the content continues.
"""

initial_summary_prompt_closing_it = """
Concludi con una frase che segnali che il contenuto continua oltre questo estratto.
"""

initial_summary_prompt_formatting = """
Return the summary in Markdown format, using `###` for headings, `-` for lists, and keeping lines separated for better readability. Make sure that each item in a list starts on a new line, and that each sentence in a paragraph is separated by a blank line. Never write multiple sentences on the same line, unless they are parts of the same continuous paragraph. Make sure that each mathematical formula is written in LaTeX and enclosed in the delimiters \\( \\) for inline formulas and \\[ \\] for block formulas. Each sentence or idea should be separated by a carriage return for better readability. When displaying code, always use triple backticks (```) and specify the language (e.g. ```html, ```python, ```javascript) to ensure proper formatting and maintain proper indentation and layout.
"""

def get_map_prompt(language):
    if language == 'it':
        return map_prompt_it
    else:
        return map_prompt_en


def get_combine_prompt(language):
    if language == 'it':
        return combine_prompt_it
    else:
        return combine_prompt_en


def get_initial_summary_prompt(language, is_partial=False):
    if language == 'it':
        prompt = initial_summary_prompt_intro_it
        if is_partial:
            prompt += "\n" + initial_summary_prompt_closing_it
    else:
        prompt = initial_summary_prompt_intro_en
        if is_partial:
            prompt += "\n" + initial_summary_prompt_closing_en
    # Always append the Markdown formatting instructions
    prompt += "\n" + initial_summary_prompt_formatting
    # Placeholder for the text block to summarize
    prompt += "\n\nText:\n{combined_text}"
    return prompt
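
# Example: get_initial_summary_prompt('en', is_partial=True) returns the English
# intro instructions, then the closing line signalling that the content continues,
# then the shared Markdown formatting rules, and finally a "Text:\n{combined_text}"
# slot that the caller fills via str.format(combined_text=...).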

# Constants
MAX_TOTAL_TOKENS = 6000  # Safe token limit for summarization
CHUNK_SIZE = 2000        # Fixed chunk size
MAX_SELECTED_DOCS = 5


class SummaryManager:

    def __init__(self, language, qdrant_manager, model='gpt-4o-mini', temperature=0, max_tokens=MAX_TOTAL_TOKENS):
        self.language = language
        self.qdrant_manager = qdrant_manager
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        print("Model:", self.model)
        self.llm = ChatOpenAI(model=model, temperature=temperature)

    def do_initial_summary(self):
        """
        Builds a lightweight initial summary from the first 3 chunks of the document
        (or fewer, if the document is shorter).
        If an initial summary already exists in the vector store, returns that instead.
        """
        # STEP 1: check whether an initial summary already exists
        print("🔍 Checking for existing initial summary...", flush=True)
        all_docs = self.qdrant_manager.get_documents()
        for doc in all_docs:
            if doc["payload"].get("type") == "initial_summary" and doc["payload"].get("language") == self.language:
                print("✅ Found existing initial summary in vector store.")
                return doc["payload"]["page_content"]
        # STEP 2: generate the summary if it does not exist yet
        print("📝 Generating initial summary from early chunks...", flush=True)
        if not all_docs:
            print("⚠️ No documents available for summary.")
            return None
        # Keep only documents that carry a chunk_index
        chunk_docs = [doc for doc in all_docs if 'chunk_index' in doc["payload"]]
        total_chunks = len(chunk_docs)
        # Take the first 3 chunks in document order
        selected_chunks = sorted(chunk_docs, key=lambda d: d["payload"]["chunk_index"])[:3]
        is_partial = len(selected_chunks) < total_chunks
        combined_text = "\n".join([doc["payload"]["page_content"] for doc in selected_chunks])[:3000]
        # Build the prompt dynamically
        prompt_template = get_initial_summary_prompt(self.language, is_partial=is_partial)
        prompt = prompt_template.format(combined_text=combined_text)
        try:
            response = self.llm.invoke([HumanMessage(content=prompt)])
            summary = response.content
            # STEP 3: save the summary to the vector store
            inserted = self.qdrant_manager.insert_text(
                text=summary,
                metadata={
                    "type": "initial_summary",
                    "file_name": self.qdrant_manager.collection_name,
                    "language": self.language
                }
            )
            if inserted:
                print("💾 Initial summary saved to vector store.")
            return summary
        except Exception as e:
            print(f"❌ Error generating initial summary: {e}")
            return None

    def do_summary_map_reduce(self):
        """
        Returns a full summary using Map-Reduce summarization.
        If a final summary already exists in the vector store, returns that instead.
        """
        # STEP 1: check if a summary already exists
        print("🔍 Checking for existing final summary...", flush=True)
        all_documents = self.qdrant_manager.get_documents()
        for doc in all_documents:
            if doc["payload"].get("type") == "map_summary" and doc["payload"].get("language") == self.language:
                print("✅ Found existing final summary in vector store.")
                return doc["payload"]["page_content"], 0, 0
        if not all_documents:
            print("❌ No documents found in collection.")
            return None, 0, 0
        # STEP 2: filter out summary-type documents
        filtered_docs = [
            doc for doc in all_documents
            if not str(doc["payload"].get("type", "")).endswith("summary")
        ]
        if not filtered_docs:
            print("❌ No non-summary documents available for summarization.")
            return None, 0, 0
        print(f"📊 {len(filtered_docs)} documents after filtering summaries.", flush=True)
        # STEP 3: extract vectors & text
        embeddings = [doc["vector"] for doc in filtered_docs]
        documents = [doc["payload"]["page_content"] for doc in filtered_docs]
        metadata = [doc["payload"] for doc in filtered_docs]
        # STEP 4: select up to MAX_SELECTED_DOCS chunks via KMeans
        selected_docs = self._select_best_chunks(
            documents=documents,
            metadata=metadata,
            embeddings=embeddings,
            max_chunks=MAX_SELECTED_DOCS
        )
        total_tokens = sum(self.llm.get_num_tokens(doc.page_content) for doc in selected_docs)
        print(f"✅ Selected {len(selected_docs)} docs with total tokens: {total_tokens}")
        # STEP 5: load LangChain prompts
        map_prompt_template = PromptTemplate(template=get_map_prompt(self.language), input_variables=["text"])
        combine_prompt_template = PromptTemplate(template=get_combine_prompt(self.language), input_variables=["text"])
        print("🔗 Loading summarization chain...")
        summary_chain = load_summarize_chain(
            llm=self.llm,
            chain_type='map_reduce',
            map_prompt=map_prompt_template,
            combine_prompt=combine_prompt_template,
            verbose=False
        )
        print("🔍 Checking token size of each selected doc...")
        for i, doc in enumerate(selected_docs):
            token_count = self.llm.get_num_tokens(doc.page_content)
            print(f"Chunk {i+1}: {token_count} tokens")
        # STEP 6: run the chain with token tracking
        with get_openai_callback() as cb:
            result = summary_chain.invoke({"input_documents": selected_docs})
            input_tokens_used = cb.prompt_tokens
            output_tokens_used = cb.completion_tokens
            total_tokens = cb.total_tokens
            print(f"🧾 Token usage: total={total_tokens}")
        full_summary = result['output_text']
        print("✅ Map-reduce summary generated.")
        # STEP 7: store the final summary
        inserted = self.qdrant_manager.insert_text(
            text=full_summary,
            metadata={
                "type": "map_summary",
                "file_name": self.qdrant_manager.collection_name,
                "language": self.language
            }
        )
        if inserted:
            print("💾 Final summary saved to vector store.")
        return full_summary, input_tokens_used, output_tokens_used

    def do_summary_stuff(self):
        """
        Returns a full summary using the STUFF summarization strategy.
        Uses all documents if the total token count is within limits,
        otherwise selects the best subset under the token budget.
        """
        # STEP 1: check if a summary already exists
        print("🔍 Checking for existing final summary...", flush=True)
        all_documents = self.qdrant_manager.get_documents()
        for doc in all_documents:
            if doc["payload"].get("type") == "stuff_summary" and doc["payload"].get("language") == self.language:
                print("✅ Found existing final summary in vector store.")
                return doc["payload"]["page_content"], 0, 0
        if not all_documents:
            print("❌ No documents found in collection.")
            return None, 0, 0
        # STEP 2: filter out summary-type documents
        filtered_docs = [
            doc for doc in all_documents
            if not str(doc["payload"].get("type", "")).endswith("summary")
        ]
        if not filtered_docs:
            print("❌ No non-summary documents available for summarization.")
            return None, 0, 0
        # STEP 3: extract vectors & text
        embeddings = [doc["vector"] for doc in filtered_docs]
        documents = [doc["payload"]["page_content"] for doc in filtered_docs]
        metadata = [doc["payload"] for doc in filtered_docs]
        # STEP 4: smart selection with a clustering fallback
        selected_docs = self._get_chunks_for_stuff(
            documents=documents,
            metadata=metadata,
            embeddings=embeddings,
            llm=self.llm,
            max_tokens=self.max_tokens,
            fallback_max_chunks=5
        )
        total_tokens = sum(self.llm.get_num_tokens(doc.page_content) for doc in selected_docs)
        print(f"✅ Selected {len(selected_docs)} docs with total tokens: {total_tokens}")
        # STEP 5: load chain
        combine_prompt_template = PromptTemplate(
            template=get_combine_prompt(self.language),
            input_variables=["text"]
        )
        print("📚 Running summarization with 'stuff' strategy...")
        summary_chain = load_summarize_chain(
            llm=self.llm,
            chain_type='stuff',
            prompt=combine_prompt_template,
            verbose=False
        )
        # STEP 6: run the chain with token tracking
        with get_openai_callback() as cb:
            result = summary_chain.invoke({"input_documents": selected_docs})
            input_tokens_used = cb.prompt_tokens
            output_tokens_used = cb.completion_tokens
            total_tokens = cb.total_tokens
            print(f"🧾 Token usage: total={total_tokens}")
        full_summary = result['output_text']
        print("✅ Stuff summary generated.")
        # STEP 7: store the final summary
        inserted = self.qdrant_manager.insert_text(
            text=full_summary,
            metadata={
                "type": "stuff_summary",
                "file_name": self.qdrant_manager.collection_name,
                "language": self.language
            }
        )
        if inserted:
            print("💾 Final summary saved to vector store.")
        return full_summary, input_tokens_used, output_tokens_used

    def _find_closest_embeddings(self, vectors, num_clusters, kmeans):
        # For each cluster, return the index of the embedding closest to its centroid
        vectors = np.asarray(vectors)
        closest_indices = []
        for i in range(num_clusters):
            distances = np.linalg.norm(vectors - kmeans.cluster_centers_[i], axis=1)
            closest_index = np.argmin(distances)
            closest_indices.append(closest_index)
        selected_indices = sorted(closest_indices)
        return selected_indices
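
    # Toy example (illustrative, not executed): with vectors
    # [[0, 0], [0.1, 0], [5, 5], [5.1, 5]] and num_clusters=2, KMeans places
    # centroids near (0.05, 0) and (5.05, 5), so this method returns one
    # representative index from each cluster, sorted (here [0, 2], since
    # np.argmin breaks ties in favor of the first index).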

    def _count_tokens(self, text, model):
        # Count tokens with tiktoken; fall back to cl100k_base when the model
        # name is unknown to the installed tiktoken release
        if not text:
            return 0
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            encoding = tiktoken.get_encoding("cl100k_base")
        num_tokens = len(encoding.encode(text))
        return num_tokens
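
    # Approximate usage (exact counts depend on the model's tokenizer):
    #   self._count_tokens("Hello, world!", "gpt-4o-mini")  # -> ~4 tokens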

    def _select_best_chunks(self, documents, metadata, embeddings, max_chunks=5):
        assert len(documents) == len(metadata) == len(embeddings)
        num_clusters = min(max_chunks, len(documents))
        print(f"📊 Selecting {num_clusters} best chunks (max={max_chunks})")
        if num_clusters > 1:
            kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(embeddings)
            indices = self._find_closest_embeddings(embeddings, num_clusters, kmeans)
        else:
            indices = list(range(min(len(documents), max_chunks)))
        selected = [(documents[i], metadata[i]) for i in indices[:max_chunks]]
        # Sort by chunk_index so the summary follows the document order
        selected = sorted(selected, key=lambda x: x[1].get("chunk_index", 0))
        return [Document(page_content=d, metadata=m) for d, m in selected]

    def _select_best_chunks_under_token_budget(self, documents, metadata, embeddings, llm, max_tokens, max_chunks=10):
        assert len(documents) == len(metadata) == len(embeddings)
        if max_chunks > len(documents):
            max_chunks = len(documents)
        kmeans = KMeans(n_clusters=max_chunks, random_state=42).fit(embeddings)
        indices = self._find_closest_embeddings(embeddings, max_chunks, kmeans)
        # Greedily add cluster representatives until the token budget is exhausted
        selected = []
        total_tokens = 0
        for i in indices:
            doc_text = documents[i]
            token_count = llm.get_num_tokens(doc_text)
            if total_tokens + token_count > max_tokens:
                break
            selected.append((doc_text, metadata[i]))
            total_tokens += token_count
        # Sort by chunk_index so the summary follows the document order
        selected = sorted(selected, key=lambda x: x[1].get("chunk_index", 0))
        return [Document(page_content=d, metadata=m) for d, m in selected]

    def _get_chunks_for_stuff(self, documents, metadata, embeddings, llm, max_tokens, fallback_max_chunks=5):
        total_tokens = sum(llm.get_num_tokens(d) for d in documents)
        if total_tokens <= max_tokens:
            print(f"✅ Using ALL documents ({total_tokens} tokens)")
            ordered = sorted(zip(documents, metadata), key=lambda x: x[1].get("chunk_index", 0))
            return [Document(page_content=d, metadata=m) for d, m in ordered]
        else:
            print(f"⚠️ Total tokens {total_tokens} exceed max {max_tokens} -> selecting best subset")
            return self._select_best_chunks_under_token_budget(
                documents=documents,
                metadata=metadata,
                embeddings=embeddings,
                llm=llm,
                max_tokens=max_tokens,
                max_chunks=fallback_max_chunks
            )
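

# Illustrative usage sketch (assumptions: `QdrantManager` is this project's
# vector-store wrapper exposing get_documents(), insert_text(), and
# collection_name, and OPENAI_API_KEY is available, e.g. via load_dotenv()):
#
#   load_dotenv()
#   qdrant_manager = QdrantManager(collection_name="my_document")
#   manager = SummaryManager(language='en', qdrant_manager=qdrant_manager)
#   intro = manager.do_initial_summary()
#   summary, in_tok, out_tok = manager.do_summary_map_reduce()
#   print(summary)
#   print(f"input tokens: {in_tok}, output tokens: {out_tok}")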