Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| """mm.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/14DP21Av9xxhDyabaRw3aPWyjuIZjmi1h | |
| """ | |
| # from google.colab import drive | |
| # drive.mount('/content/drive') | |
| from pathlib import Path | |
| # !pip install load_dotenv pdfplumber langchain langchain_community langchain_google_genai chromadb tiktoken | |
| import pickle | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import os | |
| import pdfplumber | |
| from langchain_core.documents import Document | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.vectorstores import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain.retrievers.multi_vector import MultiVectorRetriever | |
| from langchain.storage import InMemoryStore | |
| from uuid import uuid4 | |
| from langchain.agents import Tool, initialize_agent | |
| from langchain.agents.agent_types import AgentType | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| import re | |
| from langchain.callbacks.tracers import LangChainTracer | |
| from uuid import uuid4 | |
# LangSmith tracer; attached to the agent's callbacks below so runs are traced.
tracer = LangChainTracer()
def parse_pdf_metadata(filename):
    """Parse a filing PDF filename into its metadata fields.

    Expected pattern: ``<company>_<year>_<10k|10q>[_<N>q].pdf``
    (case-insensitive), e.g. ``apple_2023_10q_1q.pdf``.

    Args:
        filename: Base name of the PDF file (not a full path).

    Returns:
        dict with keys ``company`` (capitalized), ``year`` (int),
        ``form_type`` ("10K"/"10Q"), and ``quarter`` (int, or None
        when the filename carries no quarter suffix).

    Raises:
        ValueError: if the filename does not match the expected pattern.
    """
    match = re.match(
        r"([a-z]+)_(\d{4})_(10k|10q)(?:_(\d{1}q))?\.pdf",
        filename,
        re.IGNORECASE,
    )
    if not match:
        # Include the offending filename in the message (the original
        # f-string had lost its placeholder and printed a fixed string).
        raise ValueError(f"Filename '{filename}' does not match expected pattern.")
    company, year, form_type, quarter = match.groups()
    return {
        "company": company.capitalize(),
        "year": int(year),
        "form_type": form_type.upper(),
        # The quarter group captures e.g. "1q"; the first char is the digit.
        "quarter": int(quarter[0]) if quarter else None,
    }
def clean_metadata(metadata: dict) -> dict:
    """Return a copy of *metadata* with every None-valued entry dropped.

    Chroma rejects None metadata values, so they are stripped before
    documents are indexed.
    """
    cleaned = {}
    for key, value in metadata.items():
        if value is not None:
            cleaned[key] = value
    return cleaned
# === LLM ===
# Chat model driving the agent defined below.
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")

# === Settings ===
# CHROMA_TEXT_DIR = "/content/drive/My Drive/chroma_text"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
# Embedding function used both when building and when querying the Chroma index.
text_embedding = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
# === PDF extraction ===
def extract_pdf(pdf_path):
    """Extract text chunks and table chunks from one PDF filing.

    Every produced Document carries the filename-derived metadata
    (company/year/form_type/quarter) plus ``type`` ("text"/"table")
    and a zero-based ``page`` number.  Page text is split into
    500-character chunks with 50 overlap; tables are kept whole.
    """
    base_meta = parse_pdf_metadata(os.path.basename(pdf_path))
    page_docs = []
    table_docs = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_index, page in enumerate(pdf.pages):
            # --- Tables: render each row as " | "-joined cells.
            for table in page.extract_tables():
                rows = [
                    " | ".join(cell if cell else "" for cell in row)
                    for row in table
                    if row
                ]
                rendered = "\n".join(rows)
                if rendered.strip():
                    table_docs.append(
                        Document(
                            page_content=rendered,
                            metadata=clean_metadata(
                                {**base_meta, "type": "table", "page": page_index}
                            ),
                        )
                    )
            # --- Page text, if the page has any.
            page_text = page.extract_text()
            if page_text:
                page_docs.append(
                    Document(
                        page_content=page_text,
                        metadata=clean_metadata(
                            {**base_meta, "type": "text", "page": page_index}
                        ),
                    )
                )
    # Split only the running text; tables are appended unchunked.
    chunker = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    return chunker.split_documents(page_docs) + table_docs
# === Extract documents ===
# Cache the extracted documents so re-runs skip PDF parsing entirely.
DOCSTORE_PATH = "docstore.pkl"

if os.path.exists(DOCSTORE_PATH):
    print("📦 Load docstore from file...")
    # NOTE(review): pickle.load is unsafe on untrusted files; acceptable here
    # only because the cache is written by this same script below.
    with open(DOCSTORE_PATH, "rb") as f:
        text_docs = pickle.load(f)
else:
    PDF_DIR = Path("/content/drive/My Drive/data")  # path to the root data folder
    text_docs = []
    for pdf_file in PDF_DIR.rglob("*.pdf"):
        try:
            docs = extract_pdf(str(pdf_file))
            text_docs.extend(docs)
            print(f"✅ Processed: {pdf_file.name} — find {len(docs)} docs.")
        except Exception as e:
            # Best-effort: a single unparseable PDF must not abort the batch.
            print(f"❌ error {pdf_file.name}: {e}")
    # === unique IDs ===
    # Assign a uuid doc_id to each document, then mirror it into "source":
    # MultiVectorRetriever (below) uses "source" as its id_key to map
    # vector hits back to full documents.
    # NOTE(review): indentation was lost in this paste; the ID assignment
    # and pickle.dump are reconstructed as part of the rebuild branch —
    # on the load path the cached docs already carry their IDs. Confirm
    # against the original notebook.
    for doc in text_docs:
        if "doc_id" not in doc.metadata:
            doc.metadata["doc_id"] = str(uuid4())
        #print(doc.metadata)
    for doc in text_docs:
        doc.metadata["source"] = doc.metadata["doc_id"]
    with open(DOCSTORE_PATH, "wb") as f:
        pickle.dump(text_docs, f)
    print("✅ docstore save to file.")
# Conversation memory passed to the agent below.
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

CHROMA_TEXT_DIR = "chroma_text"

# === Save text index ===
# Build the Chroma index on first run; reuse the persisted one afterwards.
if not os.path.exists(CHROMA_TEXT_DIR) or not os.listdir(CHROMA_TEXT_DIR):
    text_store = Chroma.from_documents(
        documents=text_docs,
        embedding=text_embedding,
        persist_directory=CHROMA_TEXT_DIR,
        collection_name='finance_data'
    )
    text_store.persist()
    print(f"✅ Text index created with {len(text_docs)} docs.")
else:
    text_store = Chroma(persist_directory=CHROMA_TEXT_DIR, embedding_function=text_embedding, collection_name='finance_data')
    print("📁 Loaded existing text index.")

# === 1. docstore ===
# In-memory docstore keyed by each document's "source" metadata (the uuid
# assigned earlier) so the retriever can map vector hits to full documents.
docstore = InMemoryStore()
docstore.mset([(doc.metadata["source"], doc) for doc in text_docs])
doc_ids = list(docstore.yield_keys())

# Create the retriever: vector search in Chroma, full docs from the docstore.
retriever = MultiVectorRetriever(
    vectorstore=text_store,
    docstore=docstore,
    id_key="source"
)
def multimodal_retrieve(query: str) -> str:
    """Retrieve documents relevant to *query* and format them as full,
    source-attributed quotes for the agent to cite."""
    hits = retriever.get_relevant_documents(query)
    if not hits:
        return "No relevant documents found."

    def _format(idx, doc):
        # One provenance line built from the document's metadata.
        meta = doc.metadata
        source_info = (
            f"{meta.get('company', '')}, {meta.get('year', '')}, "
            f"{meta.get('form_type', '')}, page {meta.get('page', '')}"
        )
        return f"📄 Source {idx}: ({source_info})\n\"{doc.page_content.strip()}\"\n"

    combined_quotes = "\n\n".join(
        _format(i + 1, doc) for i, doc in enumerate(hits)
    )
    # Returned as one text blob — the LLM sees it as tool output.
    return f"The following documents were retrieved for the query:\n\n{combined_quotes}"
# Create the single retrieval Tool exposed to the agent.
tools = [
    Tool(
        name="MultimodalSearch",
        func=multimodal_retrieve,
        description="Returns full quotes from financial documents relevant to the user's question. Use for document-based answers with citations."
    )
]
| # print("🔍 prüfung vektors und documente:") | |
| # print(len(retriever.vectorstore.similarity_search("net sales", k=1))) | |
| # results = text_store.similarity_search("net sales", k=1) | |
| # print(results[0].metadata) | |
| # results = retriever.vectorstore.similarity_search("net sales", k=1) | |
| # for doc in results: | |
| # doc_id = doc.metadata.get("source") | |
| # print("🧭 Vector metadata:", doc.metadata) | |
| # if not doc_id: | |
| # print("⚠️ no 'source' in metadata!") | |
| # continue | |
| # original_doc = docstore.mget([doc_id])[0] | |
| # if original_doc is None: | |
| # print("❌ no finde in docstore:", doc_id) | |
| # else: | |
| # print("✅ find:", original_doc.page_content[:300]) | |
# === Agent initialization ===
# NOTE(review): ZERO_SHOT_REACT_DESCRIPTION does not feed chat history into
# its prompt, so the ConversationBufferMemory passed here is likely unused —
# CONVERSATIONAL_REACT_DESCRIPTION would be the multi-turn choice. Confirm
# against the langchain version in use.
agent_mm_rag = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    memory=memory,
    verbose=True,
    callbacks=[tracer],
    #handle_parsing_errors=True
)
if __name__ == "__main__":
    # Smoke test: runs the full agent (LLM + retriever) end to end.
    response = agent_mm_rag.run(
        "what are apple's net sales for 2024, 2023 and 2022 and long term assets"
    )
    # Fixed user-facing typo: "Ansver" -> "Answer".
    print("\n🤖 Answer:\n", response)