# Standard library
import hashlib
import html
import json
import os
import pickle
from datetime import datetime, timedelta
from pathlib import Path

# Third-party
import gradio as gr
from dotenv import load_dotenv

# Local RAG technique implementations
from AdvancedRag import (
    get_answer_using_multi_query,
    get_answer_using_parent_child,
    get_answer_using_contextual_compression,
    get_answer_using_cross_encoder,
    get_answer_using_semantic_routing
)
from Hyde import get_answer_using_hyde
from QueryDecomposition import get_answer_using_query_decomposition
from QueryExpansion import get_answer_using_query_expansion
from RagFusion import get_answer_using_rag_fusion
from StepBackQuery import get_answer
|
|
| load_dotenv() |
|
|
| |
# On-disk cache configuration: pickled results live under ./rag_cache and are
# treated as stale once they are older than CACHE_EXPIRY_HOURS.
CACHE_DIR = Path("rag_cache")
CACHE_DIR.mkdir(exist_ok=True)
CACHE_EXPIRY_HOURS = 24


# Maps the technique name shown in the UI dropdown to the callable that
# implements it. All callables are invoked as fn(link, question) and are
# expected to return the answer text (presumably a str — confirm in each
# technique module).
RAG_TECHNIQUES = {
    # Query-transformation techniques.
    "HyDE (Hypothetical Document Embeddings)": get_answer_using_hyde,
    "Query Decomposition": get_answer_using_query_decomposition,
    "Query Expansion": get_answer_using_query_expansion,
    "RAG Fusion": get_answer_using_rag_fusion,
    "Step Back Query": get_answer,

    # Retrieval/reranking techniques from AdvancedRag.
    "Multi-Query Retrieval": get_answer_using_multi_query,
    "Parent-Child Retrieval": get_answer_using_parent_child,
    "Contextual Compression": get_answer_using_contextual_compression,
    "Cross-Encoder Reranking": get_answer_using_cross_encoder,
    "Semantic Routing": get_answer_using_semantic_routing,
}
|
|
def generate_cache_key(link, technique):
    """
    Return a stable hex digest identifying a (link, technique) pair.

    The digest is used purely as a cache filename, not for security,
    so MD5's speed/compactness is fine here.
    """
    raw = "_".join((link, technique))
    return hashlib.md5(raw.encode()).hexdigest()
|
|
def get_cache_file_path(cache_key):
    """Resolve the on-disk pickle path for *cache_key* inside CACHE_DIR."""
    filename = f"{cache_key}.pkl"
    return CACHE_DIR / filename
|
|
def is_cache_valid(cache_file_path):
    """
    Return True when the cache file exists and is younger than
    CACHE_EXPIRY_HOURS (based on its mtime), else False.
    """
    if not cache_file_path.exists():
        return False

    # Entries modified before this cutoff are considered expired.
    cutoff = datetime.now() - timedelta(hours=CACHE_EXPIRY_HOURS)
    modified_at = datetime.fromtimestamp(cache_file_path.stat().st_mtime)
    return modified_at > cutoff
|
|
def save_to_cache(cache_key, data):
    """
    Persist *data* under *cache_key* as a pickle file in CACHE_DIR.

    Caching is best-effort: returns True on success, False on any failure
    (errors are logged and swallowed rather than raised).
    """
    try:
        cache_file_path = get_cache_file_path(cache_key)
        # Wrap the payload with bookkeeping metadata for later inspection.
        cache_data = {
            'data': data,
            'timestamp': datetime.now().isoformat(),
            'cache_key': cache_key
        }

        with open(cache_file_path, 'wb') as f:
            pickle.dump(cache_data, f)

        # Fix: the original success message had a literal line break inside a
        # single-quoted f-string (garbled emoji), which is a syntax error.
        print(f"✅ Cached result for key: {cache_key}")
        return True
    except Exception as e:
        print(f"β Failed to save cache: {e}")
        return False
|
|
def load_from_cache(cache_key):
    """
    Return the cached payload for *cache_key*, or None when the entry is
    missing, expired, or unreadable.

    NOTE(review): this unpickles files from our own cache directory; do not
    point CACHE_DIR at untrusted data, since pickle.load executes arbitrary
    code from malicious files.
    """
    try:
        cache_file_path = get_cache_file_path(cache_key)

        # Missing or expired entries count as a cache miss.
        if not is_cache_valid(cache_file_path):
            return None

        with open(cache_file_path, 'rb') as f:
            payload = pickle.load(f)

        print(f"π― Cache hit for key: {cache_key}")
        return payload['data']
    except Exception as e:
        print(f"β Failed to load cache: {e}")
        return None
|
|
def clear_expired_cache():
    """
    Delete every expired .pkl file in CACHE_DIR (best-effort; errors are
    logged and swallowed).
    """
    try:
        # Snapshot the expired entries first, then unlink them.
        expired = [p for p in CACHE_DIR.glob("*.pkl") if not is_cache_valid(p)]
        for stale_file in expired:
            stale_file.unlink()

        if expired:
            print(f"π§Ή Auto-cleared {len(expired)} expired cache files")
    except Exception as e:
        print(f"β Failed to auto-clear expired cache: {e}")
|
|
def process_rag_query(link, question, technique):
    """
    Answer *question* about the page at *link* using the selected RAG
    *technique*, consulting and updating the on-disk cache.

    The cache stores one {question: answer} dict per (link, technique) pair,
    so repeated questions on the same page/technique are served instantly.
    Returns the answer text, or a user-facing error message string.
    """
    try:
        # Guard clauses: both inputs are required and the URL must be http(s).
        if not link or not question:
            return "Please provide both a link and a question."
        if not link.startswith(('http://', 'https://')):
            return "Please provide a valid URL starting with http:// or https://"

        # Opportunistically prune stale cache files before any lookup.
        clear_expired_cache()

        cache_key = generate_cache_key(link, technique)
        cached_result = load_from_cache(cache_key)

        # Cache hit: this exact question was already answered for this
        # (link, technique) pair.
        if isinstance(cached_result, dict) and question in cached_result:
            return cached_result[question]

        rag_function = RAG_TECHNIQUES.get(technique)
        if rag_function is None:
            return "Invalid technique selected."

        print(f"π Processing new query: {technique} for {link}")
        answer = rag_function(link, question)

        # Merge the fresh answer into the per-question dict (starting a new
        # dict when the cache was empty or held an unexpected shape).
        answers = cached_result if isinstance(cached_result, dict) else {}
        answers[question] = answer
        save_to_cache(cache_key, answers)

        return answer

    except Exception as e:
        return f"Error processing query: {str(e)}\n\nNote: Advanced techniques require additional dependencies. Make sure you have installed: sentence-transformers, scikit-learn"
|
|
def create_webpage_preview(link):
    """
    Build an HTML snippet embedding *link* in an iframe for live preview.

    Returns "" for an empty link, an inline error paragraph for non-http(s)
    URLs, and otherwise the iframe markup.
    """
    if not link:
        return ""

    if not link.startswith(('http://', 'https://')):
        return "<p style='color: red;'>Please provide a valid URL starting with http:// or https://</p>"

    # Fix: escape the user-supplied URL before interpolating it into HTML
    # attributes, so quotes/angle brackets cannot break out of the markup
    # (attribute-injection / XSS via the URL textbox).
    safe_link = html.escape(link, quote=True)
    iframe_html = f"""
    <div style="width: 100%; height: 500px; border: 1px solid #ccc; border-radius: 5px;">
        <iframe src="{safe_link}" width="100%" height="100%" frameborder="0"
                style="border-radius: 5px;">
            <p>Your browser does not support iframes.
            <a href="{safe_link}" target="_blank">Click here to open the link</a></p>
        </iframe>
    </div>
    """
    return iframe_html
|
|
| |
def create_interface():
    """
    Build and return the Gradio Blocks UI.

    Layout: two columns — inputs + answer on the left, a live webpage
    preview on the right. Wires the URL textbox to create_webpage_preview
    and the submit button to process_rag_query.
    """
    with gr.Blocks(title="Advanced RAG Techniques", theme=gr.themes.Soft()) as demo:
        # Page header.
        gr.Markdown("""
        # π Advanced RAG Techniques Comparison Tool
        """)

        with gr.Row():
            with gr.Column(scale=1):
                # Left column: query inputs and the generated answer.
                gr.Markdown("## π Input Section")

                link_input = gr.Textbox(
                    label="Website URL",
                    placeholder="https://example.com/article",
                    info="Enter the URL of the webpage you want to analyze"
                )

                question_input = gr.Textbox(
                    label="Your Question",
                    placeholder="What is the main topic discussed in this article?",
                    info="Ask any question about the content of the webpage"
                )

                # Choices come straight from the RAG_TECHNIQUES dispatch dict,
                # so the dropdown always matches the available implementations.
                technique_dropdown = gr.Dropdown(
                    choices=list(RAG_TECHNIQUES.keys()),
                    label="RAG Technique",
                    value="Multi-Query Retrieval",
                    info="Choose the RAG technique - try the new advanced techniques!"
                )

                submit_btn = gr.Button("π Submit Query", variant="primary", size="lg")

                # Read-only output box for the generated answer.
                gr.Markdown("## π‘ Answer")
                answer_output = gr.Textbox(
                    label="Generated Answer",
                    lines=10,
                    interactive=False,
                    placeholder="Your answer will appear here..."
                )

            with gr.Column(scale=1):
                # Right column: iframe preview of the target webpage.
                gr.Markdown("## π Webpage Preview")
                webpage_preview = gr.HTML(
                    label="Webpage Content",
                    value="<p style='text-align: center; color: #666; padding: 50px;'>Enter a URL to preview the webpage</p>"
                )

        # Refresh the preview whenever the URL textbox changes.
        link_input.change(
            fn=create_webpage_preview,
            inputs=[link_input],
            outputs=[webpage_preview]
        )

        # Run the selected RAG technique on submit.
        submit_btn.click(
            fn=process_rag_query,
            inputs=[link_input, question_input, technique_dropdown],
            outputs=[answer_output]
        )

    return demo
|
|
| |
if __name__ == "__main__":
    # Warn (but do not abort) when the OpenAI key is missing or empty;
    # the RAG backends will fail later without it.
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Warning: OPENAI_API_KEY not found in environment variables.")
        print("Please make sure to set your OpenAI API key in your .env file.")

    # Build the UI and serve it with a public share link.
    demo = create_interface()
    demo.launch(share=True)