Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Just search - A Smart Search Agent using Menlo/Lucy-128k | |
| Part of the Just, AKA Simple series | |
| Built with Gradio, DuckDuckGo Search, and Hugging Face Transformers | |
| """ | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from duckduckgo_search import DDGS | |
| import json | |
| import re | |
| import time | |
| from typing import List, Dict, Tuple | |
| import spaces | |
# Shared model state. These start out empty and are filled in once by
# initialize_model() so that every request reuses the same loaded objects.
MODEL_NAME = "Menlo/Lucy-128k"
tokenizer = model = search_pipeline = None
def initialize_model():
    """Load the tokenizer, model and text-generation pipeline for MODEL_NAME.

    Everything is loaded into local variables first and only published to the
    module-level ``tokenizer``, ``model`` and ``search_pipeline`` globals once
    all three objects exist, so a failure part-way through never leaves the
    module in a half-initialised state.

    Returns:
        True when the pipeline is ready, False when loading failed (the error
        is printed and the globals are left untouched).
    """
    global tokenizer, model, search_pipeline
    try:
        loaded_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
        loaded_model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True,
        )
        # The model is already instantiated and placed on its device(s), so
        # torch_dtype/device_map are not repeated here: the pipeline only
        # applies those when it has to load a model from a name.
        generator = pipeline(
            "text-generation",
            model=loaded_model,
            tokenizer=loaded_tokenizer,
            max_new_tokens=2048,
            temperature=0.7,
            do_sample=True,
            pad_token_id=loaded_tokenizer.eos_token_id,
        )
    except Exception as e:
        # Best-effort start-up: report the problem and let the caller decide.
        print(f"Error initializing model: {e}")
        return False

    tokenizer, model, search_pipeline = loaded_tokenizer, loaded_model, generator
    return True
def extract_thinking_and_response(text: str) -> Tuple[str, str]:
    """Split raw model output into its reasoning and its user-facing answer.

    Args:
        text: Raw text produced by the model (may be empty or None).

    Returns:
        A ``(thinking, response)`` tuple. ``thinking`` is the content of every
        ``<think>...</think>`` block joined by blank lines; ``response`` is
        the remaining text with chat markers and a leading role prefix
        removed. Both are stripped of surrounding whitespace.
    """
    if not text:
        return "", ""

    # Collect every completed reasoning block, not just the first one.
    thoughts = [
        block.strip()
        for block in re.findall(r'<think>(.*?)</think>', text, flags=re.DOTALL)
    ]
    response = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)

    # A generation cut off by the token limit leaves "<think>" unclosed.
    # Everything after it is reasoning, not an answer, so keep it out of the
    # response (otherwise downstream parsing would read it as the answer).
    visible, opener, unfinished = response.partition('<think>')
    if opener:
        thoughts.append(unfinished.strip())
        response = visible

    thinking = "\n\n".join(thought for thought in thoughts if thought)

    # Remove chat-template residue first so that a role prefix hidden behind
    # a special token is still found at the start of the text afterwards.
    response = re.sub(r'\[INST\].*?\[/INST\]', '', response, flags=re.DOTALL)
    response = re.sub(r'<\|.*?\|>', '', response)
    response = re.sub(r'^(Assistant:|AI:|Response:|Answer:)\s*', '', response.strip())
    return thinking, response.strip()
def clean_response(text: str) -> str:
    """Return only the user-facing part of the model output.

    Delegates to extract_thinking_and_response() and discards the reasoning
    half of its result.
    """
    return extract_thinking_and_response(text)[1]
def generate_search_queries(user_query: str) -> Tuple[List[str], str]:
    """Ask the model for several web-search queries covering the question.

    Args:
        user_query: The question typed by the user.

    Returns:
        A ``(queries, thinking)`` tuple: at most five distinct query strings
        and the model's reasoning text (empty when none was produced). When
        the model fails or yields nothing usable, simple variations of
        ``user_query`` are returned instead so the search can still run.
    """
    # Built from the current year rather than a hard-coded one so the
    # fallback does not bias the search towards stale results.
    fallback_queries = [
        user_query,
        f"{user_query} {time.strftime('%Y')}",
        f"{user_query} latest",
    ]

    # NOTE(review): these are Llama-3-style header tokens; confirm they match
    # the chat template expected by MODEL_NAME.
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a search query generator. Given a user's question, generate 3-5 different search queries that would help find comprehensive information to answer their question. Return only the search queries, one per line, without numbering or bullet points.
Example:
User: "What are the latest developments in AI?"
latest AI developments 2024
artificial intelligence breakthroughs recent
AI technology advances news
machine learning innovations 2024
<|eot_id|><|start_header_id|>user<|end_header_id|>
{user_query}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        response = search_pipeline(prompt, max_new_tokens=200, temperature=0.3)
        generated_text = response[0]['generated_text']
        # The pipeline echoes the prompt; keep only what follows the last
        # assistant header.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)
    except Exception as e:
        print(f"Error generating queries: {e}")
        return fallback_queries, ""

    queries = []
    seen = set()
    for line in cleaned_response.splitlines():
        # Models often add bullets, numbering or quotes despite instructions.
        # Whitespace is required after the marker so "3.5 ..." stays intact.
        candidate = re.sub(r'^\s*(?:[-*•]|\d+[.)])\s+', '', line)
        candidate = candidate.strip().strip('"\'').strip()
        if len(candidate) <= 5 or candidate.startswith(('Note:', 'Example:')):
            continue
        key = candidate.casefold()
        if key in seen:
            continue
        seen.add(key)
        queries.append(candidate)

    if not queries:
        # Nothing usable came back; searching with the fallback is better
        # than returning an empty list that dead-ends the workflow.
        return fallback_queries, thinking
    return queries[:5], thinking
def search_web(queries: List[str]) -> List[Dict]:
    """Run each query through DuckDuckGo and merge the hits.

    Args:
        queries: Search strings to run; an empty or missing list yields no
            results without creating a search client.

    Returns:
        Up to 15 result dicts, de-duplicated by their ``href`` and each
        tagged with the ``search_query`` that first produced it. Results
        without a URL are dropped. A failing query is reported and skipped.
    """
    if not queries:
        return []

    unique_results = []
    seen_urls = set()
    ddgs = DDGS()
    for query in queries:
        try:
            results = ddgs.text(query, max_results=5, region='wt-wt', safesearch='moderate')
            # Iterate inside the try: the call may return a lazy iterable
            # that only raises while being consumed.
            for result in results or []:
                url = result.get('href')
                if not url or url in seen_urls:
                    continue
                seen_urls.add(url)
                result['search_query'] = query
                unique_results.append(result)
            time.sleep(0.5)  # Rate limiting
        except Exception as e:
            print(f"Error searching for '{query}': {e}")
            continue
    return unique_results[:15]  # Return max 15 results
def filter_relevant_results(user_query: str, search_results: List[Dict]) -> Tuple[List[Dict], str]:
    """Let the model pick the search results most relevant to the question.

    Args:
        user_query: The question typed by the user.
        search_results: Result dicts with ``title``, ``href`` and ``body``.

    Returns:
        A ``(results, thinking)`` tuple with at most five selected results in
        the order the model named them, plus the model's reasoning text. If
        the model fails or names no valid result, the first five results are
        returned instead.
    """
    if not search_results:
        return [], ""

    # Only these are shown to the model (to limit prompt size), so only
    # these may be selected by number.
    candidates = search_results[:12]
    results_text = "".join(
        f"{number}. Title: {result.get('title', 'No title')}\n"
        f" URL: {result.get('href', 'No URL')}\n"
        f" Snippet: {(result.get('body') or 'No description')[:200]}...\n\n"
        for number, result in enumerate(candidates, start=1)
    )

    # NOTE(review): these are Llama-3-style header tokens; confirm they match
    # the chat template expected by MODEL_NAME.
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a search result evaluator. Given a user's question and search results, identify which results are most relevant and helpful for answering the question.
Return only the numbers of the most relevant results (1-5 results maximum), separated by commas. Consider:
- Direct relevance to the question
- Credibility of the source
- Recency of information
- Comprehensiveness of content
Example response: 1, 3, 7
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {user_query}
Search Results:
{results_text}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        response = search_pipeline(prompt, max_new_tokens=100, temperature=0.1)
        generated_text = response[0]['generated_text']
        # The pipeline echoes the prompt; keep only the assistant's part.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)
    except Exception as e:
        print(f"Error filtering results: {e}")
        return search_results[:5], ""  # Fallback to first 5 results

    selected = []
    for token in re.findall(r'\d+', cleaned_response):
        index = int(token) - 1
        # Ignore numbers the model was never shown and repeated picks.
        if 0 <= index < len(candidates) and index not in selected:
            selected.append(index)

    if not selected:
        # No parsable choice: same fallback as on error, rather than
        # discarding every result that was found.
        return candidates[:5], thinking
    return [candidates[index] for index in selected[:5]], thinking
def generate_final_answer(user_query: str, selected_results: List[Dict]) -> Tuple[str, str]:
    """Write the final answer from the selected search results.

    Args:
        user_query: The question typed by the user.
        selected_results: Result dicts with ``title``, ``href`` and ``body``.

    Returns:
        An ``(answer, thinking)`` tuple. ``answer`` is always non-empty: a
        fixed message is returned when there are no results, when generation
        fails, or when the model produced no visible answer text.
    """
    if not selected_results:
        return "I couldn't find relevant information to answer your question. Please try rephrasing your query.", ""

    # One labelled entry per source so the model can cite them.
    context = "".join(
        f"Source {number}: {result.get('title', 'Unknown')}\n"
        f"Content: {result.get('body') or 'No content available'}\n"
        f"URL: {result.get('href', 'No URL')}\n\n"
        for number, result in enumerate(selected_results, start=1)
    )

    # NOTE(review): these are Llama-3-style header tokens; confirm they match
    # the chat template expected by MODEL_NAME.
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful research assistant. Based on the provided search results, give a comprehensive answer to the user's question.
Guidelines:
- Synthesize information from multiple sources
- Be accurate and factual
- Cite sources when possible
- If information is conflicting, mention it
- Keep the answer well-structured and easy to read
- Include relevant URLs for further reading
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {user_query}
Search Results:
{context}
Please provide a comprehensive answer based on these sources.
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    try:
        response = search_pipeline(prompt, max_new_tokens=1024, temperature=0.2)
        generated_text = response[0]['generated_text']
        # The pipeline echoes the prompt; keep only the assistant's part.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, answer = extract_thinking_and_response(assistant_response)
    except Exception as e:
        print(f"Error generating final answer: {e}")
        return "I encountered an error while processing the search results. Please try again.", ""

    if not answer:
        # E.g. the token budget ran out before any visible answer was
        # written; show a message instead of a blank panel.
        return "The model did not produce an answer. Please try again.", thinking
    return answer, thinking
def search_agent_workflow(user_query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Run the full pipeline: generate queries, search, filter, answer.

    Args:
        user_query: The question typed by the user.
        progress: Gradio progress tracker used to report each stage.

    Returns:
        An ``(answer, debug_info, thinking)`` tuple of strings for the answer
        panel, the debug textbox and the thinking panel respectively.
    """
    if not user_query or not user_query.strip():
        return "Please enter a search query.", "", ""
    user_query = user_query.strip()

    progress(0.1, desc="Initializing...")
    all_thinking = []

    # Step 1: Generate search queries
    progress(0.2, desc="Generating search queries...")
    queries, thinking1 = generate_search_queries(user_query)
    if thinking1:
        all_thinking.append(f"**Query Generation:**\n{thinking1}")
    queries_text = "Generated queries:\n" + "\n".join(f"• {q}" for q in queries)

    # Step 2: Search the web
    progress(0.4, desc="Searching the web...")
    search_results = search_web(queries)
    if not search_results:
        return "No search results found. Please try a different query.", queries_text, "\n\n".join(all_thinking)

    # Step 3: Filter relevant results
    progress(0.6, desc="Filtering relevant results...")
    relevant_results, thinking2 = filter_relevant_results(user_query, search_results)
    if thinking2:
        all_thinking.append(f"**Result Filtering:**\n{thinking2}")

    # Step 4: Generate final answer
    progress(0.8, desc="Generating comprehensive answer...")
    final_answer, thinking3 = generate_final_answer(user_query, relevant_results)
    if thinking3:
        all_thinking.append(f"**Answer Generation:**\n{thinking3}")
    progress(1.0, desc="Complete!")

    # Debug info: the generated queries followed by the chosen sources.
    source_lines = "".join(
        f"{number}. {result.get('title', 'No title')} - {result.get('href', 'No URL')}\n"
        for number, result in enumerate(relevant_results, start=1)
    )
    debug_info = f"{queries_text}\n\nSelected {len(relevant_results)} relevant sources:\n{source_lines}"

    thinking_display = "\n\n".join(all_thinking) if all_thinking else "No thinking process recorded."
    return final_answer, debug_info, thinking_display
| # Custom CSS for dark blue theme and mobile responsiveness | |
| custom_css = """ | |
| /* Dark blue theme */ | |
| :root { | |
| --primary-bg: #0a1628; | |
| --secondary-bg: #1e3a5f; | |
| --accent-bg: #2563eb; | |
| --text-primary: #f8fafc; | |
| --text-secondary: #cbd5e1; | |
| --border-color: #334155; | |
| --input-bg: #1e293b; | |
| --button-bg: #3b82f6; | |
| --button-hover: #2563eb; | |
| } | |
| /* Global styles */ | |
| .gradio-container { | |
| background: linear-gradient(135deg, var(--primary-bg) 0%, var(--secondary-bg) 100%) !important; | |
| color: var(--text-primary) !important; | |
| font-family: 'Inter', 'Segoe UI', system-ui, sans-serif !important; | |
| } | |
| /* Mobile responsiveness */ | |
| @media (max-width: 768px) { | |
| .gradio-container { | |
| padding: 10px !important; | |
| } | |
| .gr-form { | |
| gap: 15px !important; | |
| } | |
| .gr-button { | |
| font-size: 16px !important; | |
| padding: 12px 20px !important; | |
| } | |
| } | |
| /* Input styling */ | |
| .gr-textbox textarea, .gr-textbox input { | |
| background: var(--input-bg) !important; | |
| border: 1px solid var(--border-color) !important; | |
| color: var(--text-primary) !important; | |
| border-radius: 8px !important; | |
| } | |
| /* Button styling */ | |
| .gr-button { | |
| background: linear-gradient(135deg, var(--button-bg) 0%, var(--accent-bg) 100%) !important; | |
| color: white !important; | |
| border: none !important; | |
| border-radius: 8px !important; | |
| font-weight: 600 !important; | |
| transition: all 0.3s ease !important; | |
| } | |
| .gr-button:hover { | |
| background: linear-gradient(135deg, var(--button-hover) 0%, var(--button-bg) 100%) !important; | |
| transform: translateY(-1px) !important; | |
| box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3) !important; | |
| } | |
| /* Output styling */ | |
| .gr-markdown, .gr-textbox { | |
| background: var(--input-bg) !important; | |
| border: 1px solid var(--border-color) !important; | |
| border-radius: 8px !important; | |
| color: var(--text-primary) !important; | |
| } | |
| /* Header styling */ | |
| .gr-markdown h1 { | |
| color: var(--accent-bg) !important; | |
| text-align: center !important; | |
| margin-bottom: 20px !important; | |
| font-size: 2.5rem !important; | |
| font-weight: 700 !important; | |
| } | |
| /* Thinking section styling */ | |
| #thinking-output { | |
| background: var(--secondary-bg) !important; | |
| border: 1px solid var(--border-color) !important; | |
| border-radius: 8px !important; | |
| padding: 15px !important; | |
| font-family: 'Fira Code', 'Monaco', monospace !important; | |
| font-size: 0.9rem !important; | |
| line-height: 1.4 !important; | |
| } | |
| /* Loading animation */ | |
| .gr-loading { | |
| background: var(--secondary-bg) !important; | |
| border-radius: 8px !important; | |
| } | |
| /* Scrollbar styling */ | |
| ::-webkit-scrollbar { | |
| width: 8px; | |
| } | |
| ::-webkit-scrollbar-track { | |
| background: var(--primary-bg); | |
| } | |
| ::-webkit-scrollbar-thumb { | |
| background: var(--accent-bg); | |
| border-radius: 4px; | |
| } | |
| ::-webkit-scrollbar-thumb:hover { | |
| background: var(--button-hover); | |
| } | |
| """ | |
def create_interface():
    """Build the Gradio Blocks UI and wire it to search_agent_workflow.

    The layout is a question box with a search button, an answer panel, and
    two collapsed accordions (model reasoning and debug details), followed by
    example questions and a short note.

    Returns:
        The constructed ``gr.Blocks`` interface (not yet launched).
    """
    # NOTE(review): the emoji in the labels below were mis-encoded in the
    # source this was recovered from; they were restored to the most likely
    # glyphs. Confirm against the intended design.
    with gr.Blocks(
        theme=gr.themes.Base(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="slate",
            text_size="lg",
            spacing_size="lg",
            radius_size="md",
        ),
        css=custom_css,
        title="Just search - AI Search Agent",
        head="<meta name='viewport' content='width=device-width, initial-scale=1.0'>",
    ) as interface:
        gr.Markdown("# 🔍 Just search", elem_id="header")
        gr.Markdown(
            "*Part of the Just, AKA Simple series*\n\n"
            "**Intelligent search agent powered by Menlo/Lucy-128k**\n\n"
            "Ask any question and get comprehensive answers from the web.",
            elem_id="description",
        )

        with gr.Row():
            with gr.Column(scale=4):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me anything... (e.g., 'What are the latest developments in AI?')",
                    lines=2,
                    elem_id="query-input",
                )
            with gr.Column(scale=1):
                search_btn = gr.Button(
                    "🔍 Search",
                    variant="primary",
                    size="lg",
                    elem_id="search-button",
                )

        with gr.Row():
            answer_output = gr.Markdown(
                label="Answer",
                elem_id="answer-output",
                height=400,
            )

        with gr.Accordion("🤔 AI Thinking Process", open=False):
            thinking_output = gr.Markdown(
                label="Model's Chain of Thought",
                elem_id="thinking-output",
                height=300,
            )

        with gr.Accordion("🔧 Debug Info", open=False):
            debug_output = gr.Textbox(
                label="Search Process Details",
                lines=8,
                elem_id="debug-output",
            )

        # Order matches the tuple returned by search_agent_workflow.
        workflow_outputs = [answer_output, debug_output, thinking_output]

        # Clicking the button and pressing Enter in the textbox run the same
        # workflow with the same wiring.
        for register in (search_btn.click, query_input.submit):
            register(
                fn=search_agent_workflow,
                inputs=[query_input],
                outputs=workflow_outputs,
                show_progress=True,
            )

        # Example queries
        gr.Examples(
            examples=[
                ["What are the latest breakthroughs in quantum computing?"],
                ["How does climate change affect ocean currents?"],
                ["What are the best practices for sustainable agriculture?"],
                ["Explain the recent developments in renewable energy technology"],
                ["What are the health benefits of the Mediterranean diet?"],
            ],
            inputs=query_input,
            outputs=workflow_outputs,
            fn=search_agent_workflow,
            cache_examples=False,
        )

        gr.Markdown(
            "---\n**Note:** This search agent generates multiple queries, searches the web, "
            "filters results for relevance, and provides comprehensive answers. "
            "Results are sourced from DuckDuckGo search."
        )
    return interface
def main():
    """Load the model, build the interface and start the Gradio server.

    Returns early, without launching anything, when the model cannot be
    initialised.
    """
    # NOTE(review): the emoji in these messages were mis-encoded in the
    # source this was recovered from; they were restored to likely glyphs.
    print("🚀 Initializing Just search...")

    # Initialize the model
    if not initialize_model():
        print("❌ Failed to initialize model. Please check your setup.")
        return
    print("✅ Model initialized successfully!")

    print("🌐 Creating interface...")
    # Create and launch the interface
    interface = create_interface()
    print("🎉 Just search is ready!")

    # NOTE(review): share=True requests a public share link and debug=True
    # keeps the process attached to the server; confirm both are intended
    # for the deployment target.
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        debug=True,
    )


if __name__ == "__main__":
    main()