# Multimodal-Market-Analyst-AI-System / Multimodal_agent_RAG.py
# Uploaded by NataliaH ("Upload 14 files", commit 1e2e833, verified)
# -*- coding: utf-8 -*-
"""mm.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/14DP21Av9xxhDyabaRw3aPWyjuIZjmi1h
"""
# from google.colab import drive
# drive.mount('/content/drive')
from pathlib import Path
# !pip install load_dotenv pdfplumber langchain langchain_community langchain_google_genai chromadb tiktoken
import pickle
from dotenv import load_dotenv
load_dotenv()  # pull GOOGLE_API_KEY (and any LangSmith vars) from a local .env file
import os
import pdfplumber
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from uuid import uuid4
from langchain.agents import Tool, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
import re
from langchain.callbacks.tracers import LangChainTracer
from uuid import uuid4  # NOTE(review): duplicate — uuid4 is already imported above
# Tracer passed to the agent as a callback; records runs for LangChain tracing.
tracer = LangChainTracer()
def parse_pdf_metadata(filename):
    """
    Parse filing metadata out of a PDF filename.

    Expected pattern (case-insensitive): <company>_<year>_<10k|10q>[_<n>q].pdf
    e.g. "apple_2023_10q_2q.pdf" or "apple_2022_10k.pdf".

    Returns a dict with keys: company (capitalized), year (int),
    form_type ("10K"/"10Q"), quarter (int 1-4, or None when absent).

    Raises ValueError when the filename does not match the pattern.
    """
    match = re.match(r"([a-z]+)_(\d{4})_(10k|10q)(?:_(\d{1}q))?\.pdf", filename, re.IGNORECASE)
    if not match:
        # Bug fix: interpolate the actual filename (the f-string previously
        # contained a literal placeholder, so errors never named the file).
        raise ValueError(f"Filename '{filename}' does not match expected pattern.")
    company, year, form_type, quarter = match.groups()
    return {
        "company": company.capitalize(),
        "year": int(year),
        "form_type": form_type.upper(),
        # quarter group looks like "2q"; its first char is the quarter number
        "quarter": int(quarter[0]) if quarter else None
    }
def clean_metadata(metadata: dict) -> dict:
    """Return a copy of *metadata* with every None-valued entry removed."""
    cleaned = {}
    for key, value in metadata.items():
        if value is not None:
            cleaned[key] = value
    return cleaned
# LLM
# Chat model used by the agent; requires GOOGLE_API_KEY in the environment
# (loaded earlier via load_dotenv()).
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
# === Settings ===
# CHROMA_TEXT_DIR = "/content/drive/My Drive/chroma_text"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
# Embedding function for the Chroma vector store (downloads the model on first use).
text_embedding = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
# === PDF extraction ===
def extract_pdf(pdf_path):
    """
    Extract text and table content from a single PDF filing.

    Tables are flattened into " | "-joined rows and kept as whole chunks;
    running page text is split into 500-character chunks with 50-character
    overlap. Every Document carries the filename-derived metadata plus its
    page number and a "type" of either "text" or "table".
    """
    base_meta = parse_pdf_metadata(os.path.basename(pdf_path))
    page_docs = []
    table_docs = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_index, page in enumerate(pdf.pages):
            # --- tables on this page
            for raw_table in page.extract_tables():
                rows = []
                for row in raw_table:
                    if row:
                        rows.append(" | ".join(cell if cell else "" for cell in row))
                flattened = "\n".join(rows)
                if flattened.strip():
                    table_meta = clean_metadata({**base_meta, "type": "table", "page": page_index})
                    table_docs.append(Document(page_content=flattened, metadata=table_meta))
            # --- running text on this page
            page_text = page.extract_text()
            if page_text:
                text_meta = clean_metadata({**base_meta, "type": "text", "page": page_index})
                page_docs.append(Document(page_content=page_text, metadata=text_meta))
    # --- chunk only the running text; tables stay whole
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunked_docs = splitter.split_documents(page_docs)
    return chunked_docs + table_docs
# === Extract documents
# Cache of extracted Documents so PDFs aren't re-parsed on every run.
DOCSTORE_PATH = "docstore.pkl"
if os.path.exists(DOCSTORE_PATH):
    # Fast path: reuse previously extracted and chunked documents.
    print("📦 Load docstore from file...")
    with open(DOCSTORE_PATH, "rb") as f:
        text_docs = pickle.load(f)
else:
    PDF_DIR = Path("/content/drive/My Drive/data")  # set this to the root data folder
    text_docs = []
    for pdf_file in PDF_DIR.rglob("*.pdf"):
        try:
            docs = extract_pdf(str(pdf_file))
            text_docs.extend(docs)
            print(f"✅ Processed: {pdf_file.name} — find {len(docs)} docs.")
        except Exception as e:
            # Best-effort: one malformed PDF must not abort the whole ingest.
            print(f"❌ error {pdf_file.name}: {e}")
# === unical ID
# Assign unique doc_id to text documents
for doc in text_docs:
    if "doc_id" not in doc.metadata:
        doc.metadata["doc_id"] = str(uuid4())
        #print(doc.metadata)
for doc in text_docs:
    # "source" is the id_key the MultiVectorRetriever below uses to map
    # vector-store hits back to docstore entries.
    doc.metadata["source"] = doc.metadata["doc_id"]
with open(DOCSTORE_PATH, "wb") as f:
    pickle.dump(text_docs, f)
print("✅ docstore save to file.")
# Conversation memory shared with the agent; stores full chat history as messages.
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
CHROMA_TEXT_DIR = "chroma_text"
# === Save text index
if not os.path.exists(CHROMA_TEXT_DIR) or not os.listdir(CHROMA_TEXT_DIR):
    # Build the vector index from scratch and persist it to disk.
    text_store = Chroma.from_documents(
        documents=text_docs,
        embedding=text_embedding,
        persist_directory=CHROMA_TEXT_DIR,
        collection_name='finance_data'
    )
    text_store.persist()
    print(f"✅ Text index created with {len(text_docs)} docs.")
else:
    # Reopen the existing on-disk index without re-embedding anything.
    text_store = Chroma(persist_directory=CHROMA_TEXT_DIR, embedding_function=text_embedding, collection_name='finance_data')
    print("📁 Loaded existing text index.")
# === 1. docstore
# In-memory docstore holding the full documents (text_docs with unique doc_id),
# keyed by the same "source" id stored in the vector-store metadata.
docstore = InMemoryStore()
docstore.mset([(doc.metadata["source"], doc) for doc in text_docs])
doc_ids = list(docstore.yield_keys())
# create retriever
retriever = MultiVectorRetriever(
    vectorstore=text_store,  # vector search happens here; full docs come from docstore
    docstore=docstore,
    id_key="source"
)
def multimodal_retrieve(query: str) -> str:
    """
    Retrieve documents relevant to *query* and format them as numbered,
    source-attributed quotes inside a single string for the LLM.
    """
    matches = retriever.get_relevant_documents(query)
    if not matches:
        return "No relevant documents found."
    # Build one fully-cited quote per retrieved document.
    formatted = []
    for idx, match in enumerate(matches, start=1):
        md = match.metadata
        origin = f"{md.get('company', '')}, {md.get('year', '')}, {md.get('form_type', '')}, page {md.get('page', '')}"
        formatted.append(f"📄 Source {idx}: ({origin})\n\"{match.page_content.strip()}\"\n")
    # Return everything as one text blob — the agent sees it as tool output.
    body = "\n\n".join(formatted)
    return f"The following documents were retrieved for the query:\n\n{body}"
# Create the retrieval Tool the agent can invoke by name.
tools = [
    Tool(
        name="MultimodalSearch",
        func=multimodal_retrieve,
        description="Returns full quotes from financial documents relevant to the user's question. Use for document-based answers with citations."
    )
]
# print("🔍 debug check of vectors and documents:")
# print(len(retriever.vectorstore.similarity_search("net sales", k=1)))
# results = text_store.similarity_search("net sales", k=1)
# print(results[0].metadata)
# results = retriever.vectorstore.similarity_search("net sales", k=1)
# for doc in results:
# doc_id = doc.metadata.get("source")
# print("🧭 Vector metadata:", doc.metadata)
# if not doc_id:
# print("⚠️ no 'source' in metadata!")
# continue
# original_doc = docstore.mget([doc_id])[0]
# if original_doc is None:
# print("❌ no finde in docstore:", doc_id)
# else:
# print("✅ find:", original_doc.page_content[:300])
# Initialize the ReAct-style agent with the retrieval tool, shared memory,
# and the LangChain tracer callback.
agent_mm_rag = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    memory=memory,
    verbose=True,
    callbacks=[tracer],
    #handle_parsing_errors=True
)
if __name__ == "__main__":
    # Demo query; requires the built index, a valid GOOGLE_API_KEY, and network access.
    response = agent_mm_rag.run("what are apple's net sales for 2024, 2023 and 2022 and long term assets")
    # Bug fix: user-facing label was misspelled "Ansver".
    print("\n🤖 Answer:\n", response)