import os
from typing import List, Dict
from dotenv import load_dotenv
from pathlib import Path
from evoagentx.core.logging import logger
from evoagentx.storages.base import StorageHandler
from evoagentx.rag.rag import RAGEngine
from evoagentx.storages.storages_config import VectorStoreConfig, DBConfig, StoreConfig
from evoagentx.rag.rag_config import RAGConfig, ReaderConfig, IndexConfig, EmbeddingConfig, RetrievalConfig
from evoagentx.rag.schema import Query, TextChunk
from evoagentx.benchmark.real_mm_rag import RealMMRAG
from evoagentx.models.openai_model import OpenAILLM
from evoagentx.models.model_configs import OpenAILLMConfig
# Load environment
load_dotenv()
def demonstrate_rag_to_generation_pipeline():
    """Demo: index 20 image documents, retrieve 5 for a query, generate an answer.

    Pipeline:
      1. Index up to 20 images from the RealMM-RAG benchmark into a FAISS store.
      2. Retrieve the top-5 chunks for a sample query via Voyage multimodal
         embeddings.
      3. Feed the query plus the top-3 retrieved image chunks to GPT-4o.

    Requires OPENAI_API_KEY and VOYAGE_API_KEY in the environment; prints a
    message and returns early when either is missing.
    """
    print("🚀 EvoAgentX Multimodal RAG-to-Generation Pipeline")
    print("=" * 60)

    # Both keys are mandatory: OpenAI for generation, Voyage for embeddings.
    openai_key = os.getenv("OPENAI_API_KEY")
    if not openai_key:
        print("❌ OPENAI_API_KEY not found. Please set it to run this demo.")
        return
    voyage_key = os.getenv("VOYAGE_API_KEY")
    if not voyage_key:
        print("❌ VOYAGE_API_KEY not found. Please set it to run this demo.")
        return

    # Load a reproducible sample of the benchmark (fixed seed for determinism).
    datasets = RealMMRAG("./debug/data/real_mm_rag")
    samples = datasets.get_random_samples(20, seed=42)
    print(f"📊 Dataset loaded with {len(samples)} samples")

    # Storage: SQLite for metadata, FAISS (flat L2, 1024-dim) for vectors.
    store_config = StoreConfig(
        dbConfig=DBConfig(db_name="sqlite", path="./debug/data/real_mm_rag/cache/demo.sql"),
        vectorConfig=VectorStoreConfig(vector_name="faiss", dimensions=1024, index_type="flat_l2"),
        path="./debug/data/real_mm_rag/cache/indexing"
    )
    storage_handler = StorageHandler(storageConfig=store_config)
    rag_config = RAGConfig(
        modality="multimodal",
        reader=ReaderConfig(recursive=True, exclude_hidden=True, errors="ignore"),
        embedding=EmbeddingConfig(provider="voyage", model_name="voyage-multimodal-3", device="cpu", api_key=voyage_key),
        index=IndexConfig(index_type="vector"),
        # NOTE(review): "retrivel_type" spelling matches the RetrievalConfig
        # field name in the project API — do not "fix" it here.
        retrieval=RetrievalConfig(retrivel_type="vector", top_k=5, similarity_cutoff=0.3)
    )
    search_engine = RAGEngine(config=rag_config, storage_handler=storage_handler)

    # Step 1: index up to 20 image documents that actually exist on disk.
    print("\n📝 Step 1: Indexing 20 documents...")
    corpus_id = "demo_corpus"
    valid_paths = [s["image_path"] for s in samples if os.path.exists(s["image_path"])][:20]
    if len(valid_paths) < 20:
        print(f"⚠️ Only found {len(valid_paths)} valid image paths, using those")
    corpus = search_engine.read(file_paths=valid_paths, corpus_id=corpus_id)
    search_engine.add(index_type="vector", nodes=corpus, corpus_id=corpus_id)
    print(f"✅ Indexed {len(corpus.chunks)} image documents")

    # Pick the first sample with a non-trivial query string.
    query_sample = next((s for s in samples if s["query"] and len(s["query"].strip()) > 10), None)
    if not query_sample:
        print("❌ No suitable query found in samples")
        return
    query_text = query_sample["query"]
    target_image = query_sample["image_filename"]
    print(f"\n🔍 Step 2: Querying with: '{query_text}'")
    print(f"🎯 Target document: {target_image}")

    # Step 2: retrieve the 5 most similar chunks.
    query = Query(query_str=query_text, top_k=5)
    result = search_engine.query(query, corpus_id=corpus_id)
    retrieved_chunks = result.corpus.chunks
    print(f"\n📄 Retrieved {len(retrieved_chunks)} documents:")
    retrieved_paths = []
    for i, chunk in enumerate(retrieved_chunks):
        filename = Path(chunk.image_path).name if chunk.image_path else "Unknown"
        similarity = getattr(chunk.metadata, 'similarity_score', 0.0)
        retrieved_paths.append(filename)
        # BUGFIX: print the actual filename — the original printed the literal
        # string "(unknown)" and never used the computed `filename`.
        print(f" {i+1}. {filename} (similarity: {similarity:.3f})")

    # Step 3: multimodal generation over the retrieved evidence.
    print("\n🤖 Step 3: Generating answer with GPT-4o...")
    try:
        llm_config = OpenAILLMConfig(
            model="gpt-4o",
            openai_key=openai_key,
            temperature=0.1,
            max_tokens=300
        )
        llm = OpenAILLM(config=llm_config)
        print("✅ LLM initialized successfully")
        # The user message mixes one text chunk with the top-3 retrieved image
        # chunks; the LLM wrapper assembles the multimodal payload.
        content = [TextChunk(text=f"Query: {query_text}\n\nAnalyze these retrieved images and answer the query:")]
        content.extend(retrieved_chunks[:3])
        response = llm.generate(messages=[
            {"role": "system", "content": "You are an expert image analyst. Answer queries based on provided images."},
            {"role": "user", "content": content}
        ])
        print("✅ Response generated successfully")
        answer = response.content
    except Exception as e:
        # Best-effort demo: dump the full traceback, then fall back to an
        # error string so the summary below still prints.
        import traceback
        print("❌ Detailed error:")
        print(traceback.format_exc())
        answer = f"Error in generation: {str(e)}"

    # Final summary.
    print("\n" + "=" * 60)
    print("📋 FINAL RESULTS")
    print("=" * 60)
    print(f"🔍 QUERY: {query_text}")
    print("\n📄 RETRIEVED PATHS:")
    for i, path in enumerate(retrieved_paths):
        print(f" {i+1}. {path}")
    print(f"\n🎯 TARGET DOCUMENT: {target_image}")
    print("\n🤖 GENERATED ANSWER:")
    print(answer)
    print("EXPECTED ANSWER:")
    print(query_sample["answer"])
    print("=" * 60)

    # Cleanup: drop the demo index so reruns start from an empty corpus.
    search_engine.clear(corpus_id=corpus_id)
# Script entry point: run the end-to-end demo when executed directly.
if __name__ == "__main__":
    demonstrate_rag_to_generation_pipeline()