# -*- coding: utf-8 -*-
"""mm.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/14DP21Av9xxhDyabaRw3aPWyjuIZjmi1h

Retrieval-augmented agent over financial filings (10-K / 10-Q PDFs).

At import time the module:
  1. extracts page text and tables from the PDFs (or loads a pickled cache),
  2. builds or reopens a persistent Chroma index over those documents,
  3. wires a ``MultiVectorRetriever`` into a single-tool ReAct agent
     exposed as ``agent_mm_rag``.
"""

# from google.colab import drive
# drive.mount('/content/drive')

# !pip install load_dotenv pdfplumber langchain langchain_community langchain_google_genai chromadb tiktoken

import os
import pickle
import re
from pathlib import Path
from uuid import uuid4

from dotenv import load_dotenv

# NOTE: deliberately called before the remaining third-party imports (as in
# the original notebook) so that values from .env are already in os.environ
# in case any of those libraries read configuration while being imported.
load_dotenv()

import pdfplumber
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain.agents import Tool, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.callbacks.tracers import LangChainTracer

tracer = LangChainTracer()

# Expected file names: <company>_<year>_<10k|10q>[_<N>q | _q<N>].pdf
# e.g. "apple_2024_10k.pdf", "apple_2023_10q_2q.pdf", "apple_2023_10q_q2.pdf".
_FILENAME_RE = re.compile(
    r"([a-z]+)_(\d{4})_(10k|10q)(?:_(?:(\d)q|q(\d)))?\.pdf",
    re.IGNORECASE,
)


def parse_pdf_metadata(filename):
    """Parse filing metadata out of a PDF file name.

    Args:
        filename: Base name of the PDF, e.g. ``"apple_2023_10q_2q.pdf"``.

    Returns:
        A dict with the keys ``company`` (capitalized), ``year`` (int),
        ``form_type`` (upper-cased ``10K``/``10Q``) and ``quarter``
        (int, or ``None`` when the name carries no quarter suffix).

    Raises:
        ValueError: If *filename* does not follow the expected pattern.
    """
    match = _FILENAME_RE.match(filename)
    if not match:
        raise ValueError(f"Filename '{filename}' does not match expected pattern.")

    company, year, form_type, quarter_prefix, quarter_suffix = match.groups()
    # Exactly one of the two quarter groups can be set ("2q" vs "q2").
    quarter = quarter_prefix or quarter_suffix
    return {
        "company": company.capitalize(),
        "year": int(year),
        "form_type": form_type.upper(),
        "quarter": int(quarter) if quarter else None,
    }


def clean_metadata(metadata: dict) -> dict:
    """Return a copy of *metadata* without the entries whose value is None."""
    return {k: v for k, v in metadata.items() if v is not None}


# LLM
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")

# === Settings ===
# CHROMA_TEXT_DIR = "/content/drive/My Drive/chroma_text"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
text_embedding = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)


# === PDF extraction ===
def extract_pdf(pdf_path):
    """Extract text chunks and tables from one filing PDF.

    Args:
        pdf_path: Path to the PDF; its base name must satisfy
            ``parse_pdf_metadata``.

    Returns:
        A list of ``Document`` objects: the page texts split into
        500-character chunks (50 overlap) followed by one unsplit document
        per non-empty table. Each document carries the filing metadata plus
        ``type`` ("text" or "table") and the 0-based ``page`` index.

    Raises:
        ValueError: If the file name does not match the expected pattern.
    """
    metadata_base = parse_pdf_metadata(os.path.basename(pdf_path))
    text_docs, table_chunks = [], []

    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages):
            # === Tables: flatten each one to "cell | cell" rows.
            for table in page.extract_tables():
                table_text = "\n".join(
                    " | ".join(cell if cell else "" for cell in row)
                    for row in table
                    if row
                )
                if table_text.strip():
                    table_meta = clean_metadata(
                        {**metadata_base, "type": "table", "page": page_num}
                    )
                    table_chunks.append(
                        Document(page_content=table_text, metadata=table_meta)
                    )

            # === Text
            text = page.extract_text()
            if text:
                text_meta = clean_metadata(
                    {**metadata_base, "type": "text", "page": page_num}
                )
                text_docs.append(Document(page_content=text, metadata=text_meta))

    # === Split text into chunks (tables are kept whole).
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunked_text_docs = splitter.split_documents(text_docs)

    return chunked_text_docs + table_chunks


def _ensure_doc_ids(docs):
    """Give every document a ``doc_id`` and mirror it into ``source``.

    Idempotent: documents that already carry both keys are left untouched.

    Returns:
        True if the metadata of at least one document was modified.
    """
    changed = False
    for doc in docs:
        if "doc_id" not in doc.metadata:
            doc.metadata["doc_id"] = str(uuid4())
            changed = True
        if doc.metadata.get("source") != doc.metadata["doc_id"]:
            doc.metadata["source"] = doc.metadata["doc_id"]
            changed = True
    return changed


# === Extract documents
DOCSTORE_PATH = "docstore.pkl"
PDF_DIR = Path("/content/drive/My Drive/data")  # Set the path to the root data folder

if os.path.exists(DOCSTORE_PATH):
    print("📦 Load docstore from file...")
    # SECURITY: pickle can execute arbitrary code while loading. Only load a
    # cache file that this script produced itself.
    with open(DOCSTORE_PATH, "rb") as f:
        text_docs = pickle.load(f)
    _docs_freshly_built = False
else:
    text_docs = []
    for pdf_file in PDF_DIR.rglob("*.pdf"):
        # Best effort: one unreadable or badly named PDF must not abort the run.
        try:
            docs = extract_pdf(str(pdf_file))
            text_docs.extend(docs)
            print(f"✅ Processed: {pdf_file.name} — find {len(docs)} docs.")
        except Exception as e:
            print(f"❌ error {pdf_file.name}: {e}")
    _docs_freshly_built = True

# === Unique IDs
# Runs on both paths so that a cache without IDs is repaired as well; the
# cache file is rewritten only when there is something new to store.
_ids_changed = _ensure_doc_ids(text_docs)
if _docs_freshly_built or _ids_changed:
    with open(DOCSTORE_PATH, "wb") as f:
        pickle.dump(text_docs, f)
    print("✅ docstore save to file.")

memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

CHROMA_TEXT_DIR = "chroma_text"

# === Save text index (build once, afterwards reopen the persisted one)
if not os.path.exists(CHROMA_TEXT_DIR) or not os.listdir(CHROMA_TEXT_DIR):
    text_store = Chroma.from_documents(
        documents=text_docs,
        embedding=text_embedding,
        persist_directory=CHROMA_TEXT_DIR,
        collection_name='finance_data',
    )
    text_store.persist()
    print(f"✅ Text index created with {len(text_docs)} docs.")
else:
    text_store = Chroma(
        persist_directory=CHROMA_TEXT_DIR,
        embedding_function=text_embedding,
        collection_name='finance_data',
    )
    print("📁 Loaded existing text index.")

# === 1. docstore
# docstore (text_docs keyed by their unique doc_id)
docstore = InMemoryStore()
docstore.mset([(doc.metadata["source"], doc) for doc in text_docs])
doc_ids = list(docstore.yield_keys())

# Create the retriever. The docstore must be passed: vector hits are resolved
# to documents by looking up their ``source`` metadata value in it.
# NOTE(review): the IDs in a persisted Chroma index must come from the same
# run as docstore.pkl, otherwise lookups will miss — confirm when rebuilding
# only one of the two caches.
retriever = MultiVectorRetriever(
    vectorstore=text_store,
    docstore=docstore,
    id_key="source",
)


def multimodal_retrieve(query: str) -> str:
    """Retrieve documents for *query* and format them as cited quotes.

    Args:
        query: Free-text search query.

    Returns:
        One text block containing every retrieved document as a numbered
        quote with company, year, form type and page, or the sentence
        ``"No relevant documents found."`` when nothing was retrieved.
    """
    docs = retriever.get_relevant_documents(query)
    if not docs:
        return "No relevant documents found."

    # Build full quotes
    quotes = []
    for number, doc in enumerate(docs, start=1):
        meta = doc.metadata
        page = meta.get('page', '')
        if isinstance(page, int):
            # Stored page indices are 0-based; citations are shown 1-based.
            page += 1
        source_info = f"{meta.get('company', '')}, {meta.get('year', '')}, {meta.get('form_type', '')}, page {page}"
        quote = f"📄 Source {number}: ({source_info})\n\"{doc.page_content.strip()}\"\n"
        quotes.append(quote)

    combined_quotes = "\n\n".join(quotes)

    # Return as a single text — the LLM sees this as its input
    return f"The following documents were retrieved for the query:\n\n{combined_quotes}"


# Create Tool
tools = [
    Tool(
        name="MultimodalSearch",
        func=multimodal_retrieve,
        description="Returns full quotes from financial documents relevant to the user's question. Use for document-based answers with citations."
    )
]

# Manual sanity check of vectors and documents (kept for debugging):
# print("🔍 prüfung vektors und documente:")
# print(len(retriever.vectorstore.similarity_search("net sales", k=1)))
# results = text_store.similarity_search("net sales", k=1)
# print(results[0].metadata)
# results = retriever.vectorstore.similarity_search("net sales", k=1)
# for doc in results:
#     doc_id = doc.metadata.get("source")
#     print("🧭 Vector metadata:", doc.metadata)
#     if not doc_id:
#         print("⚠️ no 'source' in metadata!")
#         continue
#     original_doc = docstore.mget([doc_id])[0]
#     if original_doc is None:
#         print("❌ no finde in docstore:", doc_id)
#     else:
#         print("✅ find:", original_doc.page_content[:300])

# Initialize the agent
agent_mm_rag = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    memory=memory,
    verbose=True,
    callbacks=[tracer],
    #handle_parsing_errors=True
)

if __name__ == "__main__":
    response = agent_mm_rag.run("what are apple's net sales for 2024, 2023 and 2022 and long term assets")
    print("\n🤖 Ansver:\n", response)