Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import shutil | |
| import uuid | |
| import re | |
| from datetime import datetime | |
| print("=" * 50) | |
| print("Starting SparkMart AI Application...") | |
| print("=" * 50) | |
| UPLOAD_DIR = "uploads" | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| sessions = {} | |
| def get_session(session_id): | |
| return sessions.setdefault(session_id, {"conversation": [], "stage": "greeting", "file_uploaded": False, "upload_requested": False}) | |
| def get_order_id_from_conversation(conversation): | |
| for msg in conversation: | |
| content = str(msg.get('content', msg[0] if isinstance(msg, list) and msg else msg)) | |
| numbers = re.findall(r'\b\d{4,}\b', content) | |
| if numbers: | |
| return numbers[-1] | |
| return None | |
| def chat(message, history, session_id): | |
| if not message.strip(): | |
| return history, "", gr.update(visible=False) | |
| session = get_session(session_id) | |
| history = history or [] | |
| try: | |
| from main import agent_respond | |
| response = agent_respond(message, session_id, history) | |
| if not response: | |
| response = "I'm here to help! How can I assist you today?" | |
| history.extend([{"role": "user", "content": message}, {"role": "assistant", "content": response}]) | |
| session["conversation"] = history | |
| show_upload = any(word in response.lower() for word in ["upload", "attach", "image", "video", "file", "photo"]) | |
| if show_upload: | |
| session["upload_requested"] = True | |
| return history, "", gr.update(visible=show_upload) | |
| except Exception as e: | |
| print(f"Chat error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| error_msg = "I encountered a technical issue. Please try rephrasing your message or contact support if the problem persists." | |
| history.extend([{"role": "user", "content": message}, {"role": "assistant", "content": error_msg}]) | |
| return history, "", gr.update(visible=False) | |
| def handle_upload(file, session_id, history): | |
| if not file: | |
| return history, None, gr.update(visible=False), gr.update(visible=False) | |
| allowed_ext = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.mp4', '.avi', '.mov', '.webm', '.mkv'} | |
| file_ext = os.path.splitext(file.name)[1].lower() | |
| if file_ext not in allowed_ext: | |
| history = history or [] | |
| history.extend([{"role": "user", "content": "Invalid file type"}, {"role": "assistant", "content": "Please upload only image files (JPG, PNG, GIF, etc.) or video files (MP4, AVI, MOV, etc.)."}]) | |
| return history, None, gr.update(visible=False), gr.update(visible=False) | |
| return history, file, gr.update(visible=True), gr.update(visible=True) | |
| def submit_file(file, session_id, history): | |
| if not file: | |
| return history, gr.update(visible=False), None, gr.update(visible=False) | |
| session = get_session(session_id) | |
| history = history or [] | |
| try: | |
| order_id = get_order_id_from_conversation(session["conversation"]) | |
| base_name = f"ORDER_{order_id}" if order_id else f"SESSION_{session_id[:8]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| final_filename = f"{base_name}{os.path.splitext(file.name)[1]}" | |
| shutil.copy2(file.name, os.path.join(UPLOAD_DIR, final_filename)) | |
| session.update({"file_uploaded": True, "upload_requested": False}) | |
| file_message = f"[File uploaded: {final_filename}]" | |
| from main import agent_respond | |
| response = agent_respond(file_message, session_id, history) | |
| history.extend([{"role": "user", "content": f"File uploaded: {os.path.basename(file.name)}"}, {"role": "assistant", "content": response}]) | |
| session["conversation"] = history | |
| return history, gr.update(visible=False), None, gr.update(visible=False) | |
| except Exception as e: | |
| print(f"Upload error: {e}") | |
| history.extend([{"role": "user", "content": "Upload Error"}, {"role": "assistant", "content": f"Upload failed: {str(e)}. Please try again or describe the issue in text."}]) | |
| return history, gr.update(visible=False), None, gr.update(visible=False) | |
| def reupload_file(): | |
| return None, gr.update(visible=False) | |
| def reset_chat(session_id): | |
| sessions.pop(session_id, None) | |
| return None, "", gr.update(visible=False), None, gr.update(visible=False), str(uuid.uuid4()) | |
| try: | |
| print("Loading main module...") | |
| import main | |
| print("Main module loaded successfully!") | |
| except Exception as e: | |
| print(f"ERROR loading main module: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| print("Creating Gradio interface...") | |
| css = """ | |
| .gradio-container { | |
| font-family: Arial, sans-serif; | |
| } | |
| button{ | |
| background-color: #8D4585 !important; | |
| color: white !important; | |
| border: none !important; | |
| } | |
| button:hover { | |
| background-color: #8D4585 !important; | |
| } | |
| #title-header { | |
| background-color: #8D4585; | |
| color: white; | |
| padding: 20px; | |
| border-radius: 8px; | |
| text-align: center; | |
| margin-bottom: 20px; | |
| } | |
| #title-header h1 { | |
| color: white; | |
| margin: 0; | |
| } | |
| #title-header p { | |
| color: white; | |
| margin: 5px 0 0 0; | |
| } | |
| """ | |
| with gr.Blocks(title="SparkMart AI - Customer Support", css=css) as demo: | |
| session_id = gr.State(lambda: str(uuid.uuid4())) | |
| with gr.Column(elem_id="title-header"): | |
| gr.Markdown("# SparkMart AI") | |
| gr.Markdown("Welcome to SparkMart! I'm here to help you with your orders, complaints, product recommendations and general queries.") | |
| chatbot = gr.Chatbot( | |
| height=500, | |
| placeholder="Hello! I'm SparkMart AI. How may I assist you today?", | |
| type='messages' | |
| ) | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| placeholder="Type your message here...", | |
| scale=4 | |
| ) | |
| send_btn = gr.Button("Send", scale=1) | |
| with gr.Group(visible=False) as upload_area: | |
| gr.Markdown("Upload Image or Video") | |
| gr.Markdown("Please upload an image or video to help us understand your issue better.") | |
| file_input = gr.File( | |
| file_types=["image", "video"], | |
| label="Choose file" | |
| ) | |
| with gr.Row(): | |
| reupload_btn = gr.Button("Re-upload") | |
| submit_btn = gr.Button("Submit File", visible=False) | |
| with gr.Row(): | |
| reset_btn = gr.Button("Reset Chat") | |
| gr.Markdown("Click 'Reset Chat' to start a new conversation") | |
| temp_file = gr.State() | |
| send_btn.click( | |
| chat, | |
| inputs=[msg, chatbot, session_id], | |
| outputs=[chatbot, msg, upload_area] | |
| ) | |
| msg.submit( | |
| chat, | |
| inputs=[msg, chatbot, session_id], | |
| outputs=[chatbot, msg, upload_area] | |
| ) | |
| file_input.change( | |
| handle_upload, | |
| inputs=[file_input, session_id, chatbot], | |
| outputs=[chatbot, temp_file, submit_btn, upload_area] | |
| ) | |
| submit_btn.click( | |
| submit_file, | |
| inputs=[temp_file, session_id, chatbot], | |
| outputs=[chatbot, upload_area, file_input, submit_btn] | |
| ) | |
| reupload_btn.click( | |
| reupload_file, | |
| outputs=[file_input, submit_btn] | |
| ) | |
| reset_btn.click( | |
| reset_chat, | |
| inputs=[session_id], | |
| outputs=[chatbot, msg, upload_area, file_input, submit_btn, session_id] | |
| ) | |
| print("Gradio interface created successfully!") | |
| if __name__ == "__main__": | |
| print("Launching Gradio app...") | |
| demo.launch() |