# NOTE: the "Spaces: Sleeping" banner below this file originally carried is a
# Hugging Face Spaces page artifact, not part of the source code.
"""
Dwani.ai Gradio UI - FIXED File Upload + Backend Health Check
"""
# Standard library
import json
import time
from datetime import datetime
from typing import Dict, List

# Third-party
import gradio as gr
import requests
# Root URL of the RAG backend Space. Every request in this app targets this
# host, so the Space must be awake before anything else works.
API_BASE = "https://gaganyatri-rag-backend.hf.space"
class DwaniClient:
    """Minimal HTTP client for the Dwani.ai RAG backend REST API."""

    def __init__(self, base_url=API_BASE):
        # Normalize so endpoint paths can always be joined with a single "/".
        self.base_url = base_url.rstrip('/')

    def test_backend(self):
        """Ping the backend root; return (is_alive, status_text)."""
        try:
            resp = requests.get(f"{self.base_url}/", timeout=5)
            return resp.status_code == 200, resp.json().get('status', 'Unknown')
        except (requests.RequestException, ValueError):
            # FIX: was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt. RequestException covers network/timeout
            # failures; ValueError covers a non-JSON response body.
            return False, "Backend not responding"

    def upload_file(self, file_obj) -> dict:
        """Upload one Gradio file object; return the backend's JSON payload.

        Raises requests.HTTPError on a non-2xx response.
        """
        # NOTE(review): content type is hard-coded to PDF — the UI restricts
        # uploads to .pdf files, so this matches current usage.
        files = {'file': (file_obj.name, file_obj, 'application/pdf')}
        resp = requests.post(f"{self.base_url}/files/upload",
                             files=files, timeout=30)
        resp.raise_for_status()
        return resp.json()

    def get_file_status(self, file_id: str) -> dict:
        """Fetch the processing status record for one uploaded file."""
        resp = requests.get(f"{self.base_url}/files/{file_id}", timeout=10)
        resp.raise_for_status()
        return resp.json()

    def list_files(self) -> List[dict]:
        """Return all files the backend currently knows about."""
        resp = requests.get(f"{self.base_url}/files/", timeout=10)
        resp.raise_for_status()
        return resp.json()

    def chat(self, file_ids: List[str], messages: List[Dict]) -> dict:
        """Send a chat turn grounded in the given files; return the JSON reply."""
        payload = {"file_ids": file_ids, "messages": messages}
        resp = requests.post(f"{self.base_url}/chat-with-document",
                             json=payload, timeout=60)
        resp.raise_for_status()
        return resp.json()
# Module-level application state (single-user demo app).
client = DwaniClient()
uploaded_files: Dict[str, dict] = {}  # file_id -> {'filename', 'status', 'file_id'}
chat_history: List[Dict] = []         # full conversation forwarded to the backend
selected_files: List[str] = []        # file_ids currently ticked in the UI
def test_connection():
    """Ping the backend and toggle the upload button's visibility accordingly."""
    alive, detail = client.test_backend()
    if not alive:
        return (
            f"β Backend DOWN: {detail}\n\nFIX: Check https://huggingface.co/spaces/Sahana31/RAG_backend",
            gr.update(visible=False),
        )
    return f"β Backend LIVE: {detail}", gr.update(visible=True)
def poll_file_status(file_id: str, max_wait=120):
    """Poll the backend every 2s until the file completes, fails, or times out.

    Returns (status_dict, success_flag).
    """
    attempts = max_wait // 2  # one probe per 2-second sleep
    for _ in range(attempts):
        try:
            info = client.get_file_status(file_id)
        except Exception:
            # Transient backend hiccup — wait and retry.
            time.sleep(2)
            continue
        if info['status'] == 'completed':
            return info, True
        if info['status'] == 'failed':
            return info, False
        time.sleep(2)
    return {'status': 'timeout'}, False
def upload_multiple(files):
    """Upload each selected file, wait for processing, and report results.

    Returns a 4-tuple matching the Gradio outputs: status markdown, an update
    for the file-selection checkboxes, the file-list markdown, and a no-op
    update for the selection counter.
    """
    global uploaded_files
    if not files:
        return "No files selected", gr.update(choices=[]), create_file_list(), gr.update()
    status_msgs = ["**π€ UPLOADING...**"]
    for file_obj in files:
        try:
            # Upload file
            result = client.upload_file(file_obj)
            file_id = result['file_id']
            filename = result['filename']
            uploaded_files[file_id] = {
                'filename': filename,
                'status': 'pending',
                'file_id': file_id,
            }
            # Block until the backend finishes ingesting this document.
            final_status, success = poll_file_status(file_id)
            if success:
                uploaded_files[file_id]['status'] = 'completed'
                # FIX: the message printed a literal "(unknown)" placeholder;
                # show the actual filename instead.
                status_msgs.append(f"β {filename}")
            else:
                uploaded_files[file_id]['status'] = final_status['status']
                status_msgs.append(f"β {filename} - {final_status['status']}")
        except requests.exceptions.HTTPError as e:
            status_msgs.append(f"β {file_obj.name} - HTTP {e.response.status_code}")
        except Exception as e:
            status_msgs.append(f"β {file_obj.name} - {str(e)}")
    # Only completed files are offered for chat selection.
    choices = [(info['filename'], info['file_id'])
               for info in uploaded_files.values()
               if info['status'] == 'completed']
    return "\n".join(status_msgs), gr.update(choices=choices), create_file_list(), gr.update()
def refresh_files():
    """Re-sync the local registry with the backend's authoritative file list."""
    global uploaded_files
    try:
        remote = client.list_files()
        uploaded_files.clear()
        uploaded_files.update({
            entry['file_id']: {
                'filename': entry['filename'],
                'status': entry['status'],
                'file_id': entry['file_id'],
            }
            for entry in remote
        })
        ready = [(entry['filename'], entry['file_id'])
                 for entry in remote if entry['status'] == 'completed']
        return create_file_list(), gr.update(choices=ready)
    except Exception as exc:
        return f"β Refresh failed: {str(exc)}", gr.update()
def create_file_list():
    """Render the uploaded-file registry as a Markdown bullet list."""
    if not uploaded_files:
        return "**No files uploaded**"
    # Status marker per known state; fallback marker for anything unexpected.
    marker_for = {'completed': 'β ', 'processing': 'π', 'pending': 'β³', 'failed': 'β'}
    rows = ["**π Your Files:**"]
    for entry in uploaded_files.values():
        mark = marker_for.get(entry['status'], 'β')
        rows.append(f"{mark} {entry['filename']} ({entry['status']})")
    return "\n".join(rows)
def update_selected_files(files):
    """Remember which file IDs are checked; return how many are selected."""
    global selected_files
    if files:
        selected_files = files
    else:
        selected_files = []
    return len(selected_files)
def send_message(message, history):
    """Forward the user's question to the backend and append the reply.

    Returns (updated chat history, new textbox value).
    """
    global chat_history, selected_files
    # Guard clauses: blank input or nothing selected short-circuits.
    if not message.strip():
        return history, ""
    if not selected_files:
        return history, "β οΈ Select files first!"
    user_msg = {"role": "user", "content": message}
    pending = history + [user_msg, {"role": "assistant", "content": "π Thinking..."}]
    try:
        # The backend receives the full prior conversation plus this turn.
        reply = client.chat(selected_files, chat_history + [user_msg])
        chat_history.extend([user_msg, {"role": "assistant", "content": reply['answer']}])
        rendered = format_chat_response(reply)
        return history + [user_msg, {"role": "assistant", "content": rendered}], ""
    except Exception as exc:
        failure = f"β Error: {str(exc)}"
        # Drop the "Thinking..." placeholder and show the error instead.
        return pending[:-1] + [{"role": "assistant", "content": failure}], failure
def format_chat_response(result):
    """Format a backend chat reply: the answer plus up to three cited sources.

    result: dict with 'answer' (str) and optional 'sources', a list of dicts
    with 'filename' and optional 'page'/'excerpt' keys.
    """
    answer = result['answer']
    sources = result.get('sources')
    if not sources:
        return answer
    parts = ["\n\n**π Sources:**\n"]
    for i, src in enumerate(sources[:3], 1):
        parts.append(f"{i}. **{src['filename']}** (Page {src.get('page', '?')})\n")
        excerpt = src.get('excerpt', '')
        # FIX: the original appended "..." unconditionally, even to short or
        # empty excerpts; only mark truncation when it actually happened.
        if len(excerpt) > 100:
            excerpt = excerpt[:100] + "..."
        parts.append(f" > {excerpt}\n")
    return answer + "".join(parts)
def clear_chat():
    """Start a fresh conversation: wipe stored history, empty the chatbot."""
    global chat_history
    chat_history = []
    # Empty list clears the gr.Chatbot component.
    return []
# === GRADIO UI ===
# NOTE(review): the source's indentation was lost in extraction; the nesting of
# the layout contexts below is reconstructed from typical Gradio structure and
# the component order — confirm against the rendered app.
with gr.Blocks(title="Dwani.ai RAG") as demo:
    gr.Markdown("# π Dwani.ai - Document Chatbot")
    gr.Markdown("**Upload PDFs β Chat with page-accurate citations**")
    # Backend status row: status text plus a manual health-check button.
    with gr.Row():
        status_display = gr.Markdown("π‘ Testing backend...")
        test_btn = gr.Button("π Test Backend", variant="secondary")
    # Main row: upload panel (left) and document manager (right).
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## π€ Upload PDFs")
            file_input = gr.File(
                label="Select multiple PDFs",
                file_types=[".pdf"],
                file_count="multiple"
            )
            upload_btn = gr.Button("π Upload & Process", variant="primary")
            status_output = gr.Markdown("Ready...")
            refresh_btn = gr.Button("π Refresh")
            files_display = gr.Markdown("No files")
        with gr.Column(scale=2):
            gr.Markdown("## π Document Manager")
            # Only files with status 'completed' appear as choices here.
            file_checkboxes = gr.CheckboxGroup(
                label="Chat with these files:",
                choices=[],
                value=[],
            )
            file_count = gr.Number(label="Selected", interactive=False)
    # Chat section heading; the chatbot itself sits below the row.
    with gr.Row():
        gr.Markdown("## π¬ Chat")
    chatbot = gr.Chatbot(height=500)
    with gr.Row():
        msg_input = gr.Textbox(placeholder="Ask about your documents...", scale=4)
        send_btn = gr.Button("Send", variant="primary", scale=1)
    clear_btn = gr.Button("ποΈ New Chat", variant="secondary")
    # Events
    # A successful backend test also reveals the upload button (test_connection
    # returns gr.update(visible=...) as its second output).
    test_btn.click(test_connection, outputs=[status_display, upload_btn])
    upload_btn.click(
        upload_multiple,
        inputs=file_input,
        outputs=[status_output, file_checkboxes, files_display, file_count]
    )
    refresh_btn.click(refresh_files, outputs=[files_display, file_checkboxes])
    file_checkboxes.change(update_selected_files, inputs=file_checkboxes, outputs=file_count)
    # Both the Send button and pressing Enter submit the message.
    send_btn.click(send_message, inputs=[msg_input, chatbot], outputs=[chatbot, msg_input])
    msg_input.submit(send_message, inputs=[msg_input, chatbot], outputs=[chatbot, msg_input])
    clear_btn.click(clear_chat, outputs=chatbot)
if __name__ == "__main__":
    # Bind on all interfaces; 7860 is the Hugging Face Spaces default port.
    demo.launch(server_name="0.0.0.0", server_port=7860)