|
|
import os |
|
|
from typing import List, Dict |
|
|
from dotenv import load_dotenv |
|
|
from pathlib import Path |
|
|
|
|
|
from evoagentx.core.logging import logger |
|
|
from evoagentx.storages.base import StorageHandler |
|
|
from evoagentx.rag.rag import RAGEngine |
|
|
from evoagentx.storages.storages_config import VectorStoreConfig, DBConfig, StoreConfig |
|
|
from evoagentx.rag.rag_config import RAGConfig, ReaderConfig, IndexConfig, EmbeddingConfig, RetrievalConfig |
|
|
from evoagentx.rag.schema import Query, TextChunk |
|
|
from evoagentx.benchmark.real_mm_rag import RealMMRAG |
|
|
from evoagentx.models.openai_model import OpenAILLM |
|
|
from evoagentx.models.model_configs import OpenAILLMConfig |
|
|
|
|
|
|
|
|
|
|
|
# Pull OPENAI_API_KEY / VOYAGE_API_KEY (read below by the demo) from a
# local .env file into the process environment before anything runs.
load_dotenv()
|
|
|
|
|
def _require_env(name):
    """Return the value of env var *name*, printing an error and returning
    a falsy value when it is missing or empty."""
    value = os.getenv(name)
    if not value:
        print(f"❌ {name} not found. Please set it to run this demo.")
    return value


def demonstrate_rag_to_generation_pipeline():
    """Run a small end-to-end multimodal RAG demo.

    Pipeline: index up to 20 image documents from the REAL-MM-RAG
    benchmark, retrieve the top-5 chunks for one sample query, then ask
    GPT-4o to answer the query from the top-3 retrieved images. Results
    are printed to stdout and the temporary index is cleared at the end.

    Requires OPENAI_API_KEY and VOYAGE_API_KEY in the environment
    (a local .env file is honored via load_dotenv()).
    """
    print("🚀 EvoAgentX Multimodal RAG-to-Generation Pipeline")
    print("=" * 60)

    # --- Preconditions: both API keys must be present --------------------
    openai_key = _require_env("OPENAI_API_KEY")
    if not openai_key:
        return
    voyage_key = _require_env("VOYAGE_API_KEY")
    if not voyage_key:
        return

    # --- Load a reproducible sample of the benchmark ---------------------
    datasets = RealMMRAG("./debug/data/real_mm_rag")
    samples = datasets.get_random_samples(20, seed=42)
    print(f"📊 Dataset loaded with {len(samples)} samples")

    # --- Storage + RAG engine configuration ------------------------------
    # SQLite for metadata, FAISS flat-L2 index with 1024-dim vectors
    # (matching the Voyage multimodal embedding dimensionality below).
    store_config = StoreConfig(
        dbConfig=DBConfig(db_name="sqlite", path="./debug/data/real_mm_rag/cache/demo.sql"),
        vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1024, index_type="flat_l2"),
        path="./debug/data/real_mm_rag/cache/indexing"
    )
    storage_handler = StorageHandler(storageConfig=store_config)

    rag_config = RAGConfig(
        modality="multimodal",
        reader=ReaderConfig(recursive=True, exclude_hidden=True, errors="ignore"),
        embedding=EmbeddingConfig(provider="voyage", model_name="voyage-multimodal-3", device="cpu", api_key=voyage_key),
        index=IndexConfig(index_type="vector"),
        # NOTE: "retrivel_type" spelling follows the library's parameter name.
        retrieval=RetrievalConfig(retrivel_type="vector", top_k=5, similarity_cutoff=0.3)
    )
    search_engine = RAGEngine(config=rag_config, storage_handler=storage_handler)

    # --- Step 1: index up to 20 image documents --------------------------
    print("\n📚 Step 1: Indexing 20 documents...")
    corpus_id = "demo_corpus"
    valid_paths = [s["image_path"] for s in samples if os.path.exists(s["image_path"])][:20]

    if len(valid_paths) < 20:
        print(f"⚠️ Only found {len(valid_paths)} valid image paths, using those")

    corpus = search_engine.read(file_paths=valid_paths, corpus_id=corpus_id)
    search_engine.add(index_type="vector", nodes=corpus, corpus_id=corpus_id)
    print(f"✅ Indexed {len(corpus.chunks)} image documents")

    # --- Step 2: retrieve for one non-trivial query ----------------------
    # Pick the first sample whose query has more than 10 non-space chars.
    query_sample = next((s for s in samples if s["query"] and len(s["query"].strip()) > 10), None)
    if not query_sample:
        print("❌ No suitable query found in samples")
        return

    query_text = query_sample["query"]
    target_image = query_sample["image_filename"]

    print(f"\n🔍 Step 2: Querying with: '{query_text}'")
    print(f"🎯 Target document: {target_image}")

    query = Query(query_str=query_text, top_k=5)
    result = search_engine.query(query, corpus_id=corpus_id)
    retrieved_chunks = result.corpus.chunks

    print(f"\n📄 Retrieved {len(retrieved_chunks)} documents:")
    retrieved_paths = []
    for i, chunk in enumerate(retrieved_chunks):
        filename = Path(chunk.image_path).name if chunk.image_path else "Unknown"
        similarity = getattr(chunk.metadata, 'similarity_score', 0.0)
        retrieved_paths.append(filename)
        # BUGFIX: previously printed the literal "(unknown)" instead of filename.
        print(f"  {i+1}. {filename} (similarity: {similarity:.3f})")

    # --- Step 3: generate an answer with GPT-4o --------------------------
    print("\n🤖 Step 3: Generating answer with GPT-4o...")

    try:
        llm_config = OpenAILLMConfig(
            model="gpt-4o",
            openai_key=openai_key,
            temperature=0.1,  # near-deterministic output for the demo
            max_tokens=300
        )
        llm = OpenAILLM(config=llm_config)
        print("✅ LLM initialized successfully")

        # Multimodal prompt: one instruction text chunk followed by the
        # top-3 retrieved image chunks.
        content = [TextChunk(text=f"Query: {query_text}\n\nAnalyze these retrieved images and answer the query:")]
        content.extend(retrieved_chunks[:3])

        response = llm.generate(messages=[
            {"role": "system", "content": "You are an expert image analyst. Answer queries based on provided images."},
            {"role": "user", "content": content}
        ])

        print("✅ Response generated successfully")
        answer = response.content

    except Exception as e:
        # Best-effort demo: surface the full traceback but keep going so
        # the summary below still prints.
        import traceback
        print("❌ Detailed error:")
        print(traceback.format_exc())
        answer = f"Error in generation: {str(e)}"

    # --- Summary ---------------------------------------------------------
    print("\n" + "=" * 60)
    print("📋 FINAL RESULTS")
    print("=" * 60)
    print(f"🔍 QUERY: {query_text}")
    print("\n📄 RETRIEVED PATHS:")
    for i, path in enumerate(retrieved_paths):
        print(f"  {i+1}. {path}")
    print(f"\n🎯 TARGET DOCUMENT: {target_image}")
    print("\n🤖 GENERATED ANSWER:")
    print(answer)
    print("EXPECTED ANSWER:")
    print(query_sample["answer"])
    print("=" * 60)

    # Clean up the temporary demo index.
    search_engine.clear(corpus_id=corpus_id)
|
|
|
|
|
|
|
|
# Script entry point: run the end-to-end demo when executed directly.
if __name__ == "__main__":

    demonstrate_rag_to_generation_pipeline()
|
|
|