# Page banner text captured together with this file ("Spaces: Sleeping");
# it is not part of the program.
import hashlib
import hmac
import json
import time
from datetime import datetime

import gradio as gr
import requests
# Simple in-memory database
# Both stores are plain module-level dicts; nothing in this file persists them.
# users_db maps username -> {'password': <hash_password() digest>, 'created_at': datetime}.
users_db: dict = {}
# usage_db maps username -> {'messages_today', 'last_reset', 'last_message_time'}.
usage_db: dict = {}
# Configuration
# Endpoint that chat_function POSTs the conversation to.
API_URL: str = "https://smilyai-labs-sam-z-api.hf.space/v1/chat"
# Maximum number of messages per user per calendar day (enforced in check_usage_limits).
DAILY_MESSAGE_LIMIT: int = 50
# Minimum number of seconds between two messages from the same user.
RATE_LIMIT_SECONDS: int = 2
def hash_password(password):
    """Return the SHA-256 hex digest of ``password``.

    The password is encoded with the default ``str.encode()`` codec (UTF-8)
    before hashing.

    NOTE(review): this is a single unsalted SHA-256 pass, so equal passwords
    produce equal digests.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def check_user_exists(username):
    """Return True when ``username`` is a key of ``users_db``, else False."""
    is_registered = username in users_db
    return is_registered
def register_user(username, password):
    """Create an account and its usage record for a new username.

    Returns a ``(success, message)`` tuple. Registration is refused when the
    username is already present; otherwise the hashed password is stored in
    ``users_db`` and a zeroed usage record is stored in ``usage_db``.
    """
    if check_user_exists(username):
        return False, "Username already exists"

    account = {
        'password': hash_password(password),
        'created_at': datetime.now(),
    }
    users_db[username] = account

    # A fresh account starts the day with no messages sent and no
    # previous message timestamp.
    usage_db[username] = dict(
        messages_today=0,
        last_reset=datetime.now().date(),
        last_message_time=0,
    )
    return True, "Account created successfully!"
def login_user(username, password):
    """Check ``password`` against the stored hash for ``username``.

    Returns a ``(success, message)`` tuple:

    * ``(False, "Username not found")`` when the username is unknown,
    * ``(False, "Incorrect password")`` when the hashes differ,
    * ``(True, "Login successful!")`` otherwise.
    """
    if not check_user_exists(username):
        return False, "Username not found"

    stored_hash = users_db[username]['password']
    # Constant-time comparison: a plain ``!=`` stops at the first differing
    # character, which leaks how much of the digest matched.
    if not hmac.compare_digest(stored_hash, hash_password(password)):
        return False, "Incorrect password"

    return True, "Login successful!"
def check_usage_limits(username):
    """Decide whether ``username`` may send a message right now.

    Returns ``(True, "")`` when sending is allowed, otherwise
    ``(False, reason)``. As a side effect the user's usage record is created
    if it is missing, and its daily counter is reset when the stored
    ``last_reset`` date is not today's date.
    """
    try:
        record = usage_db[username]
    except KeyError:
        record = usage_db[username] = {
            'messages_today': 0,
            'last_reset': datetime.now().date(),
            'last_message_time': 0,
        }

    # Reset daily counter
    if record['last_reset'] != datetime.now().date():
        record.update(messages_today=0, last_reset=datetime.now().date())

    # Check daily limit
    if record['messages_today'] >= DAILY_MESSAGE_LIMIT:
        return False, f"Daily limit of {DAILY_MESSAGE_LIMIT} messages reached. Try again tomorrow."

    # Check rate limit
    elapsed = time.time() - record['last_message_time']
    if elapsed < RATE_LIMIT_SECONDS:
        wait_time = RATE_LIMIT_SECONDS - elapsed
        return False, f"Please wait {wait_time:.1f} seconds before sending another message."

    return True, ""
def update_usage(username):
    """Record one sent message for ``username``.

    Increments the user's ``messages_today`` counter and stamps
    ``last_message_time`` with the current ``time.time()``. Raises
    ``KeyError`` if the user has no record in ``usage_db``.
    """
    record = usage_db[username]
    record['messages_today'] += 1
    record['last_message_time'] = time.time()
def get_usage_info(username):
    """Return the ``"Messages today: used/limit"`` line for ``username``.

    The used count is reported as 0 when the user has no usage record or when
    the record's ``last_reset`` date is not today's date; the stored record is
    not modified.
    """
    used = 0
    if username in usage_db:
        record = usage_db[username]
        if record['last_reset'] == datetime.now().date():
            used = record['messages_today']
    return f"Messages today: {used}/{DAILY_MESSAGE_LIMIT}"
def _sse_payload(raw_line):
    """Return the text after ``data: `` in one raw line, or None for other lines."""
    line = raw_line.decode('utf-8', errors='ignore').strip()
    if line.startswith('data: '):
        return line[6:]
    return None


def _iter_sse_data(byte_chunks):
    """Yield each ``data:`` payload from byte chunks, stopping at ``[DONE]``."""
    pending = b""
    for chunk in byte_chunks:
        if not chunk:
            continue
        pending += chunk
        # Decode only complete lines so a multi-byte UTF-8 character that is
        # split across chunks is never decoded in pieces (and lost).
        while b"\n" in pending:
            raw_line, pending = pending.split(b"\n", 1)
            payload = _sse_payload(raw_line)
            if payload is None:
                continue
            if payload == '[DONE]':
                return
            yield payload
    # The stream may end without a trailing newline; do not drop that line.
    payload = _sse_payload(pending)
    if payload is not None and payload != '[DONE]':
        yield payload


def _delta_content(payload):
    """Return ``choices[0].delta.content`` from a JSON payload, or "" if absent."""
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError:
        return ""
    try:
        content = parsed['choices'][0]['delta']['content']
    except (KeyError, IndexError, TypeError):
        # Malformed or content-free event (e.g. an empty ``choices`` list):
        # skip it instead of aborting the whole stream.
        return ""
    return content if isinstance(content, str) else ""


def chat_function(message, history, username):
    """Stream the assistant's reply to ``message`` into the chat history.

    This is a generator: each ``yield`` produces the updated history so the
    chatbot can be refreshed while the reply is still arriving. Because it is
    a generator, every outcome (including errors) must be *yielded*; a value
    passed to ``return`` would never reach the caller.

    Args:
        message: Text the user submitted.
        history: List of ``[user_message, assistant_message]`` pairs; it is
            extended in place. ``None`` is treated as an empty history.
        username: Logged-in user name, or a falsy value when nobody is
            logged in.

    Yields:
        The history list, whose last pair carries the partial or final reply,
        or a warning/error message.
    """
    if history is None:
        history = []

    if not username:
        history.append([message, "⚠️ Please log in first"])
        yield history
        return

    # A blank message is ignored and does not count against the quota.
    if not message or not message.strip():
        yield history
        return

    # Check usage limits
    can_send, error_msg = check_usage_limits(username)
    if not can_send:
        history.append([message, f"⚠️ {error_msg}"])
        yield history
        return

    # Usage is counted before the request is made, so failed requests
    # count as well.
    update_usage(username)

    # Prepare messages for API
    messages = []
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})

    # Add user message to history; the reply slot is filled in while streaming.
    history.append([message, ""])
    full_response = ""

    try:
        # The context manager closes the connection on every exit path,
        # including when the consumer stops iterating early.
        with requests.post(
            API_URL,
            json={
                "messages": messages,
                "max_tokens": 1000,
                "stream": True,
            },
            stream=True,
            timeout=60,
        ) as response:
            if response.status_code != 200:
                history[-1][1] = f"❌ Error: Server returned status {response.status_code}"
                yield history
                return

            # One-byte reads are kept from the original so text is surfaced
            # as soon as it arrives; decoding happens per complete line.
            for payload in _iter_sse_data(response.iter_content(chunk_size=1)):
                content = _delta_content(payload)
                if content:
                    full_response += content
                    history[-1][1] = full_response
                    yield history

        if not full_response:
            history[-1][1] = "❌ No response received from server"
            yield history
    except requests.exceptions.Timeout:
        history[-1][1] = "❌ Request timed out. Please try again."
        yield history
    except requests.exceptions.RequestException as e:
        history[-1][1] = f"❌ Connection error: {e}"
        yield history
    except Exception as e:
        # Last-resort boundary so an unexpected failure is shown in the chat
        # instead of silently ending the stream.
        history[-1][1] = f"❌ Unexpected error: {e}"
        yield history
def handle_auth(username, password):
    """Sign a user in, or register the username when it is not known yet.

    Returns a 6-tuple in the order the UI outputs are wired:
    ``(current user or None, status text, auth-section update,
    chat-section update, usage text, chat history)``. On success the auth
    section is hidden and the chat section shown; on failure the reverse.
    """
    def rejected(notice):
        # Keep the login form visible and the chat hidden.
        return None, notice, gr.update(visible=True), gr.update(visible=False), "", []

    def admitted(notice):
        # Swap the login form for the chat and show the user's usage line.
        return username, notice, gr.update(visible=False), gr.update(visible=True), get_usage_info(username), []

    if not (username and password):
        return rejected("β Please enter both username and password")
    if len(username) < 3:
        return rejected("β Username must be at least 3 characters")
    if len(password) < 6:
        return rejected("β Password must be at least 6 characters")

    if check_user_exists(username):
        # Known username: try to log in.
        success, message = login_user(username, password)
        greeting = f"β Welcome back, {username}!"
    else:
        # Unknown username: auto-register it.
        success, message = register_user(username, password)
        greeting = f"β Welcome, {username}! Your account has been created."

    if success:
        return admitted(greeting)
    return rejected(f"β {message}")
def logout_user():
    """Return the UI state for a logged-out session.

    The 6-tuple clears the current user, sets the status text, shows the
    auth section, hides the chat section, and blanks the usage text and the
    chat history.
    """
    farewell = "π Logged out successfully"
    show_auth = gr.update(visible=True)
    hide_chat = gr.update(visible=False)
    return None, farewell, show_auth, hide_chat, "", []
def refresh_usage(username):
    """Return the usage line for ``username``, or "" when nobody is logged in."""
    if not username:
        return ""
    return get_usage_info(username)
# Custom CSS
# Passed to gr.Blocks(css=...). The two classes are attached through
# elem_classes on the authentication and chat groups; each caps the group's
# width and centers it horizontally.
custom_css = """
.auth-container {
max-width: 500px;
margin: 0 auto;
}
.chat-container {
max-width: 900px;
margin: 0 auto;
}
"""
# Create Gradio interface
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css, title="AI Chatbot") as demo:
    # Holds the logged-in username; None until handle_auth returns one.
    current_user = gr.State(None)

    gr.Markdown("# π€ AI Chatbot with Authentication & Limits")

    # Header row: status text on the left, usage counter on the right.
    with gr.Row():
        with gr.Column(scale=2):
            auth_status = gr.Markdown("### Please log in or create an account")
        with gr.Column(scale=1):
            usage_display = gr.Markdown("")

    # Authentication section (visible until a login succeeds)
    with gr.Group(visible=True, elem_classes="auth-container") as auth_section:
        gr.Markdown("### π Login or Sign Up")
        gr.Markdown("*Enter your credentials. New username? Account will be created automatically!*")
        username_input = gr.Textbox(label="Username", placeholder="Enter username (min 3 chars)")
        password_input = gr.Textbox(label="Password", type="password", placeholder="Enter password (min 6 chars)")
        auth_button = gr.Button("π Login / Sign Up", variant="primary", size="lg")

    # Chat section (hidden until a login succeeds)
    with gr.Group(visible=False, elem_classes="chat-container") as chat_section:
        chatbot = gr.Chatbot(height=500, show_label=False, bubble_full_width=False)
        with gr.Row():
            msg_input = gr.Textbox(
                label="Message",
                placeholder="Type your message here and press Enter...",
                scale=9,
                show_label=False,
            )
            send_button = gr.Button("Send", variant="primary", scale=1)
        with gr.Row():
            clear_button = gr.ClearButton([chatbot], value="ποΈ Clear Chat", size="sm")
            refresh_button = gr.Button("π Refresh Usage", size="sm")
            logout_button = gr.Button("πͺ Logout", size="sm", variant="stop")

    # Info section
    with gr.Accordion("βΉοΈ Information & Limits", open=False):
        gr.Markdown(f"""
**Usage Limits:**
- π Daily message limit: {DAILY_MESSAGE_LIMIT} messages per user
- β±οΈ Rate limit: 1 message every {RATE_LIMIT_SECONDS} seconds
- π Limits reset daily at midnight
**Features:**
- π― Auto-detect login/signup (creates account if username doesn't exist)
- π Real-time streaming responses from AI
- πΎ Conversation history tracking
- π Usage monitoring and enforcement
- π Secure password hashing
**How to use:**
1. Enter a username and password
2. Click "Login / Sign Up" - if the username is new, an account will be created
3. Start chatting! Watch your usage limits in the top right
""")

    # Event handlers
    def submit_auth(username, password):
        return handle_auth(username, password)

    # Login and logout both rewrite the same six pieces of UI state.
    session_outputs = [current_user, auth_status, auth_section, chat_section, usage_display, chatbot]

    auth_button.click(submit_auth, [username_input, password_input], session_outputs)

    # Pressing Enter in the textbox and clicking Send run the same chain:
    # stream the reply, clear the input box, then refresh the usage counter.
    for send_event in (msg_input.submit, send_button.click):
        streamed = send_event(chat_function, [msg_input, chatbot, current_user], [chatbot])
        cleared = streamed.then(lambda: "", None, msg_input)
        cleared.then(refresh_usage, [current_user], [usage_display])

    refresh_button.click(refresh_usage, [current_user], [usage_display])

    logout_button.click(logout_user, None, session_outputs)
if __name__ == "__main__":
    # Listen on all interfaces at port 7860 without creating a share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_options)