# NOTE: header pasted from the Hugging Face Spaces dashboard — the Space
# reported "Runtime error" twice; the application source follows below.
import os
import json
import uuid

import gradio as gr
import gradio.components as gc
import requests
from dotenv import load_dotenv

# Load environment variables from a local .env file (no-op if absent).
load_dotenv()

# Get sensitive config from environment variables (set these in your .env file).
# The commented entries are currently unused but kept for reference:
#ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL")
#ELASTICSEARCH_USER = os.getenv("ELASTICSEARCH_USER")
#ELASTICSEARCH_PASSWORD = os.getenv("ELASTICSEARCH_PASSWORD")
#OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
#AWS_LAMBDA_URL = os.getenv("AWS_LAMBDA_URL")
GRADIO_AUTH_USERNAME = os.getenv("GRADIO_AUTH_USERNAME")
GRADIO_AUTH_PASSWORD = os.getenv("GRADIO_AUTH_PASSWORD")

# Check required env vars for local development only; SPACE_ID is set by
# Hugging Face Spaces, where auth is not used. Warn, do not fail.
if not os.getenv("SPACE_ID"):
    missing_vars = [
        var
        for var in ("GRADIO_AUTH_USERNAME", "GRADIO_AUTH_PASSWORD")
        if not os.getenv(var)
    ]
    if missing_vars:
        print(f"Warning: Missing auth environment variables for local development: {', '.join(missing_vars)}")

# Elasticsearch client placeholder; query execution fails gracefully until
# a real client is assigned here.
es = None

# Initialize OpenAI (currently disabled)
#openai_client = OpenAI(api_key=OPENAI_API_KEY)
def chat_completion(messages, model="gpt-3.5-turbo", temperature=0.1):
    """Stub for the OpenAI chat-completion call.

    The OpenAI backend is currently disabled, so this always returns
    None regardless of the arguments. Re-enable by restoring the
    commented-out client call below.
    """
    # return openai_client.chat.completions.create(
    #     model=model,
    #     messages=messages,
    #     temperature=temperature,
    # )
    return None
def process_faq(question, user_id="anonymous", model="claude-sonnet"):
    """Stream an FAQ answer from the matching AWS Lambda backend.

    Args:
        question: User question; whitespace is trimmed before sending.
        user_id: Identifier forwarded to the Lambda in the payload.
        model: "nova-*" or "claude-*"; selects the Lambda URL and the
            model variant sent as the "model" payload field.

    Yields:
        The accumulated response text after each received chunk.
        Error conditions are *yielded* as a single "Error: ..." string.

    BUG FIX: the original used `return "Error: ..."` inside this
    generator; a generator's return value goes into StopIteration.value
    and is never seen by callers that iterate it (faq_wrapper does), so
    every error path was silently swallowed. Errors are now yielded.
    """
    try:
        # Route to the Lambda endpoint that serves the selected model family.
        if model.startswith("nova-"):
            lambda_url = "https://tz2ttiieoc5z4aq6pskg24zu740bvqup.lambda-url.us-west-2.on.aws/"
            # lambda_url = "https://l2fhyrulj6yjzonazngpxdiswm0mgfvp.lambda-url.us-west-2.on.aws/"
            model_param = model.replace("nova-", "")  # Extract micro/lite/pro
        elif model.startswith("claude-"):
            lambda_url = "https://myzano2bfze54q6yqp32wwpj6q0ixpmy.lambda-url.us-west-2.on.aws/"
            model_param = model.replace("claude-", "")  # Extract haiku/sonnet
        else:
            yield "Error: Invalid model selection"
            return

        # Prepare the request payload.
        payload = {
            "message": question.strip(),
            "user_id": user_id,
            "model": model_param,
        }
        print(f"DEBUG: Sending to {lambda_url}")
        print(f"DEBUG: Payload: {json.dumps(payload, indent=2)}")

        # Make the API call with streaming; the context manager closes the
        # connection even if chunk processing raises.
        with requests.post(
            lambda_url,
            headers={"Content-Type": "application/json"},
            json=payload,
            stream=True,
        ) as response:
            if response.status_code != 200:
                yield f"Error: Lambda function returned status code {response.status_code}"
                return

            # Accumulate chunks and yield the running total so the UI can
            # render a progressively growing answer.
            full_response = ""
            for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
                if chunk:
                    try:
                        # Chunks may arrive as JSON objects with a "response"
                        # field; anything else is appended as plain text.
                        chunk_data = json.loads(chunk)
                        if "response" in chunk_data:
                            full_response += chunk_data["response"]
                            yield full_response
                    except json.JSONDecodeError:
                        full_response += chunk
                        yield full_response
    except Exception as e:
        yield f"Error processing FAQ: {str(e)}"
def natural_to_query(natural_query):
    """Convert natural language to an Elasticsearch query body.

    Returns a pretty-printed JSON string when the LLM reply parses as
    JSON, the raw reply text otherwise, or an "Error ..." string when
    the call fails or the backend is disabled.
    """
    try:
        prompt = f"""Convert the following natural language query into an Elasticsearch query body.\nThe query should be in JSON format and follow Elasticsearch query DSL syntax.\n\nNatural language query: {natural_query}\n\nReturn only the JSON query body, nothing else."""
        response = chat_completion([
            {"role": "system", "content": "You are an expert in Elasticsearch query DSL. Convert natural language to Elasticsearch queries."},
            {"role": "user", "content": prompt}
        ], model="gpt-3.5-turbo", temperature=0.1)

        # chat_completion is currently a disabled stub returning None; fail
        # with a clear message instead of a cryptic TypeError on None[...].
        if response is None:
            return "Error generating query: LLM backend is disabled"

        # Extract the reply; support both OpenAI v1.x (attribute access)
        # and v0.x (dict access) response shapes.
        if hasattr(response, 'choices'):
            content = response.choices[0].message.content.strip()
        else:
            content = response["choices"][0]["message"]["content"].strip()

        try:
            # Round-trip through json for consistent pretty-printing.
            query_json = json.loads(content)
            return json.dumps(query_json, indent=2)
        except json.JSONDecodeError:
            # Not valid JSON; return the raw model output for manual editing.
            return content
    except Exception as e:
        return f"Error generating query: {str(e)}"
def execute_elasticsearch_query(query_body):
    """Execute a raw Elasticsearch query body and return results as JSON text.

    Returns the search response pretty-printed, or an "Error ..." string
    when the body is not JSON, the client is unconfigured, or the search
    itself fails.
    """
    try:
        # Parse the query body first so malformed JSON gets its own message.
        query_json = json.loads(query_body)

        # `es` stays None until a real client is wired up at module level;
        # report that plainly instead of an opaque AttributeError.
        if es is None:
            return "Error executing query: Elasticsearch client is not configured"

        response = es.search(
            index="your_index_name",  # TODO: replace with the actual index name
            body=query_json
        )
        return json.dumps(response, indent=2)
    except json.JSONDecodeError:
        return "Error: Invalid JSON query body"
    except Exception as e:
        return f"Error executing query: {str(e)}"
| # --- Gradio v4.x UI --- | |
def faq_wrapper(question, user_id, model):
    """Drain the process_faq stream and return the final, cleaned answer.

    Gradio's Interface wants a plain value, not a generator, so this
    keeps only the last yielded snapshot (each yield is the accumulated
    text so far) and normalizes it for display.
    """
    final_text = ""
    for partial in process_faq(question, user_id, model):
        final_text = partial
    # The backend sometimes returns escaped newlines; make them real.
    final_text = final_text.replace('\\n', '\n')
    # Drop any stray surrounding quote characters.
    return final_text.strip('"\'')
def elasticsearch_generate(natural_input):
    """Step-1 UI handler: turn the user's description into a query body."""
    generated_query = natural_to_query(natural_input)
    return generated_query
def elasticsearch_execute(query_body):
    """Step-2 UI handler: run the (possibly edited) query body."""
    search_result = execute_elasticsearch_query(query_body)
    return search_result
with gr.Blocks() as demo:
    # NOTE(review): the UI mixes `gc.` (gradio.components) and `gr.` calls;
    # both resolve to the same component classes, but plain `gr.` would be
    # the conventional spelling.
    gc.Markdown("# MCP Tools - Local Version")
    with gr.Tab(label="FAQ"):  # type: ignore
        # Question input plus backend-model picker for the FAQ Lambda.
        faq_input = gc.Textbox(label="Enter your question", lines=3)
        model_selector = gc.Dropdown(
            label="Select Model",
            choices=["nova-micro", "nova-pro", "claude-haiku", "claude-sonnet"],
            value="claude-sonnet",
            interactive=True
        )
        # Generate random user ID for this session (first 8 chars of a UUID4).
        session_user_id = str(uuid.uuid4())[:8]
        faq_button = gc.Button("Process")
        # Loading animation HTML: a pure-CSS spinner shown while the Lambda
        # request is in flight.
        loading_html = """
        <div style="display: flex; justify-content: center; align-items: center; min-height: 100px; border: 1px solid #ddd; border-radius: 8px; background-color: #f9f9f9;">
            <div style="display: inline-block; width: 40px; height: 40px; border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; animation: spin 1s linear infinite;"></div>
            <style>
                @keyframes spin {
                    0% { transform: rotate(0deg); }
                    100% { transform: rotate(360deg); }
                }
            </style>
        </div>
        """
        # Empty bounding box HTML: placeholder shown before the first answer.
        empty_box_html = """
        <div style="min-height: 100px; border: 1px solid #ddd; border-radius: 8px; background-color: #f9f9f9; padding: 20px;">
        </div>
        """
        faq_output = gc.HTML(label="Response", value=empty_box_html)
        with gr.Row():  # type: ignore
            thumbs_down = gc.Button("Report bad response", elem_id="thumbs-down", interactive=True)
            feedback_msg = gc.Markdown(visible=False)

        def report_bad_response():
            # Show a thank-you note and disable the button so feedback can
            # only be reported once per session. NOTE(review): the report is
            # not persisted anywhere visible in this file — confirm intent.
            return gr.update(value="Bad response reported. Thank you for your feedback.", visible=True), gr.update(interactive=False)

        thumbs_down.click(report_bad_response, outputs=[feedback_msg, thumbs_down])

        # Combined function to handle loading state and processing: a
        # two-step generator so Gradio first swaps in the spinner, then
        # replaces it with the finished answer.
        def process_with_loading(question, model):
            # Show loading spinner
            yield gr.update(value=loading_html)
            # Process the question (blocks until the whole stream is drained)
            result = faq_wrapper(question, session_user_id, model)
            # Format response in bounding box and show result
            response_html = f"""
            <div style="min-height: 100px; border: 1px solid #ddd; border-radius: 8px; background-color: #ffffff; padding: 20px;">
                <div style="white-space: pre-wrap; line-height: 1.5;">{result}</div>
            </div>
            """
            yield gr.update(value=response_html)

        faq_button.click(
            process_with_loading,
            inputs=[faq_input, model_selector],
            outputs=[faq_output],
            show_progress=False
        )
    with gr.Tab(label="Elasticsearch"):  # type: ignore
        # Two-step flow: generate a query body, let the user edit it, run it.
        gc.Markdown("### Step 1: Natural Language to Query")
        natural_input = gc.Textbox(label="Describe what you want to search for", lines=3, placeholder="Example: Find all documents containing 'machine learning' in the title")
        generate_button = gc.Button("Generate Query")
        query_output = gc.Textbox(label="Generated Query Body", lines=10, placeholder="The generated Elasticsearch query will appear here")
        generate_button.click(elasticsearch_generate, inputs=natural_input, outputs=query_output)
        gc.Markdown("### Step 2: Execute Query")
        gc.Markdown("You can modify the query above if needed, then click Execute")
        execute_button = gc.Button("Execute Query")
        result_output = gc.Textbox(label="Query Results", lines=10, placeholder="The query results will appear here")
        execute_button.click(elasticsearch_execute, inputs=query_output, outputs=result_output)
if __name__ == "__main__":
    # SPACE_ID is set by the Hugging Face Spaces runtime.
    is_spaces = os.getenv("SPACE_ID") is not None

    # PORT may be injected by the hosting environment; default to Gradio's 7860.
    port = int(os.getenv("PORT", 7860))

    if is_spaces:
        # Spaces provides its own ingress and access control: bind on all
        # interfaces, no share link, no local auth.
        launch_params = {
            "server_name": "0.0.0.0",
            "server_port": port,
            "share": False
        }
    else:
        # Local development: expose a public share link, protected by basic
        # auth when credentials are configured.
        launch_params = {
            "server_name": "0.0.0.0",
            "server_port": port,
            "share": True,
        }
        # BUG FIX: the original always passed auth=(username, password) even
        # when the env vars were unset, handing Gradio (None, None) and making
        # the login prompt unusable — earlier module code only *warns* about
        # missing credentials. Enable auth only when both values exist.
        if GRADIO_AUTH_USERNAME and GRADIO_AUTH_PASSWORD:
            launch_params["auth"] = (GRADIO_AUTH_USERNAME, GRADIO_AUTH_PASSWORD)
            launch_params["auth_message"] = "Please enter your credentials to access the application."

    # Launch the app
    demo.launch(**launch_params)