Spaces:
Sleeping
Sleeping
| import email | |
| import os | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore, auth as firebase_auth | |
| # Initialize Firebase | |
def initialize_firebase():
    """
    Initialize the Firebase Admin SDK and report whether it is usable.

    Credential lookup order:
        1. FIREBASE_CREDENTIALS_JSON env var (HF Spaces/production)
        2. firebase-credentials.json file (local development)

    On HF Spaces (detected through the SPACE_ID env var) every failure is
    fatal and raised as RuntimeError. Locally, failures are printed and the
    function returns False so the rest of the module can run in mock mode.

    Returns: True when Firebase was initialized, False for local mock mode
    Raises: RuntimeError on HF Spaces when credentials are missing or
            malformed, or when initialization fails
    """
    is_hf_space = os.environ.get('SPACE_ID') is not None
    firebase_creds_json = os.environ.get('FIREBASE_CREDENTIALS_JSON')

    # Checked before the try block so the generic handler below cannot catch
    # this error and wrap it into a second "CRITICAL ERROR" message.
    if is_hf_space and not firebase_creds_json:
        raise RuntimeError(
            "CRITICAL ERROR: Running on Hugging Face Spaces but FIREBASE_CREDENTIALS_JSON is not set!\n"
            "Please add the secret in Space Settings > Repository Secrets.\n"
            "The application cannot run without Firebase credentials."
        )

    try:
        if firebase_creds_json:
            print("Loading Firebase credentials from FIREBASE_CREDENTIALS_JSON environment variable")
            cred_dict = json.loads(firebase_creds_json)
            cred = credentials.Certificate(cred_dict)
        else:
            # Local development - try file fallback
            print("Loading Firebase credentials from firebase-credentials.json file")
            cred = credentials.Certificate('firebase-credentials.json')
        firebase_admin.initialize_app(cred)
        # Build a client here so a failure is handled by the handlers below.
        # The result is deliberately not bound: a local named `db` would only
        # shadow the module-level `db` without ever setting it.
        firestore.client()
        print("✅ Firebase initialized successfully!")
        return True
    except FileNotFoundError as e:
        if is_hf_space:
            raise RuntimeError("CRITICAL ERROR: Firebase credentials not found on HF Spaces!") from e
        print("⚠️ firebase-credentials.json not found. Using mock mode for local development.")
        return False
    except json.JSONDecodeError as e:
        if is_hf_space:
            raise RuntimeError(f"CRITICAL ERROR: Invalid JSON in FIREBASE_CREDENTIALS_JSON: {e}") from e
        print(f"⚠️ Invalid JSON in credentials: {e}. Using mock mode.")
        return False
    except Exception as e:
        if is_hf_space:
            raise RuntimeError(f"CRITICAL ERROR: Firebase initialization failed on HF Spaces: {e}") from e
        print(f"⚠️ Firebase initialization failed: {e}. Using mock mode for local development.")
        return False
# Run Firebase setup once at import time; the functions below branch on the result.
FIREBASE_ENABLED = initialize_firebase()

# Firestore client handle, or None when Firebase is not enabled.
if FIREBASE_ENABLED:
    db = firestore.client()
else:
    db = None

# Mock databases for development/testing (in-memory, one dict per store).
mock_users, mock_agents, mock_workspaces = {}, {}, {}

# Default workspace ID (same for all users).
DEFAULT_WORKSPACE_ID = "q7KfX2rP9Wb83hNa64RmTpUiAZ0vL3dS"
def hash_password(password):
    """
    Return the hex-encoded SHA-256 digest of a password string.

    NOTE(review): the digest is unsalted and fast to compute. In the code
    visible here it is only used for the in-memory mock user store; confirm
    before using it for anything persistent.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
def generate_workspace_data(user_id, workspace_id):
    """
    Build the document for a user's workspace, including its shareScript.

    The shareScript is a one-line HTML <script> snippet that sets the client
    and workspace identifiers on `window` and loads the chatbot client.js.
    Returns: dict with chatbotType, id, shareScript and workspaceName
    """
    avatar_url = (
        'https://firebasestorage.googleapis.com/v0/b/curious-learner-3845a.appspot.com'
        '/o/avatars%2Fchatbot.png?alt=media&token=7d937685-197b-4947-b9f9-3c05f0271d20'
    )
    # JavaScript statements of the embed, joined with single spaces below.
    statements = [
        f'w.CLIENT_IDENTIFIER = "{user_id}";',
        f"w.WORKSPACE_IDENTIFIER = '{workspace_id}';",
        f"w.AVATAR_IMAGE_URL = '{avatar_url}';",
        "w.CHATBOT_IDENTIFIER = '';",
        'var h = d.head || d.getElementsByTagName("head")[0];',
        "var s = d.createElement('script');",
        "s.setAttribute('type', 'text/javascript');",
        "s.async = true;",
        "s.setAttribute('src', 'https://dynamic-ui-chatbot.web.app/client.js');",
        "h.appendChild(s);",
    ]
    embed = "<script> (function (w, d) { " + " ".join(statements) + " })(window, document); </script>"

    workspace = {}
    workspace['chatbotType'] = 'widget'
    workspace['id'] = workspace_id
    workspace['shareScript'] = embed
    workspace['workspaceName'] = 'Default Workspace'
    return workspace
def register_user(username, email, password):
    """
    Register a new user, in Firebase when enabled or in the mock store otherwise.

    Firebase mode creates the Auth account, then writes the user document
    (g_users/{uid}) and the default workspace
    (g_users/{uid}/workspaces/{DEFAULT_WORKSPACE_ID}). If either Firestore
    write fails, the freshly created Auth account is removed again so the
    same email can be used for a retry.

    Returns: (success: bool, message: str, user_id: str or None)
    """
    # Reject empty fields up front instead of creating half-filled records.
    if not username or not email or not password:
        return False, "Username, email, and password are required!", None

    try:
        if FIREBASE_ENABLED:
            # Create user in Firebase Auth
            user = firebase_auth.create_user(
                email=email,
                password=password,
                display_name=username
            )
            user_ref = db.collection('g_users').document(user.uid)
            try:
                # Store user data in g_users collection
                user_ref.set({
                    'name': username,
                    'email': email,
                    'created_at': firestore.SERVER_TIMESTAMP
                })
                # Create default workspace as subcollection inside user's document
                workspace_data = generate_workspace_data(user.uid, DEFAULT_WORKSPACE_ID)
                workspace_data['timestamp'] = firestore.SERVER_TIMESTAMP
                user_ref.collection('workspaces').document(DEFAULT_WORKSPACE_ID).set(workspace_data)
            except Exception:
                # Best-effort rollback: without it the Auth account would
                # linger and every retry would fail as "email already exists".
                try:
                    user_ref.delete()
                    firebase_auth.delete_user(user.uid)
                except Exception as cleanup_error:
                    print(f"⚠️ Could not roll back partial registration for {user.uid}: {cleanup_error}")
                raise
            return True, "Registration successful!", user.uid

        # Mock registration
        if email in mock_users:
            return False, "User already exists!", None
        # One timestamp shared by the generated id and both stored dates.
        now = datetime.now()
        user_id = hashlib.sha256(f"{email}{now}".encode()).hexdigest()[:28]
        workspace_data = generate_workspace_data(user_id, DEFAULT_WORKSPACE_ID)
        workspace_data['timestamp'] = now
        mock_users[email] = {
            'user_id': user_id,
            'name': username,
            'email': email,
            'password': hash_password(password),
            'created_at': now,
            'workspaces': {
                DEFAULT_WORKSPACE_ID: workspace_data
            }
        }
        return True, "Registration successful!", user_id
    except Exception as e:
        return False, f"Registration failed: {str(e)}", None
def _verify_firebase_password(email, password, api_key):
    """
    Check an email/password pair against Firebase Auth (Identity Toolkit REST API).

    Returns: the account's UID (localId) when the credentials are accepted,
             None when the service rejects the request with HTTP 400
    Raises: urllib.error.URLError / HTTPError for any other failure
    """
    import urllib.error
    import urllib.parse
    import urllib.request

    url = (
        "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key="
        + urllib.parse.quote(api_key, safe="")
    )
    payload = json.dumps(
        {"email": email, "password": password, "returnSecureToken": True}
    ).encode("utf-8")
    request = urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}, method="POST"
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            body = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Rejected sign-ins (unknown email, wrong password, ...) come back as HTTP 400.
        if exc.code == 400:
            return None
        raise
    return body.get("localId")


def login_user(email, password):
    """
    Login user.

    Firebase mode:
      * If the FIREBASE_WEB_API_KEY env var is set, the password is verified
        through Firebase Auth and the account's UID is returned.
      * Otherwise the legacy behaviour is kept: the user is looked up by
        email in g_users and the password is NOT verified. A warning is
        printed on every such login. SECURITY: configure
        FIREBASE_WEB_API_KEY for any real deployment.
    Mock mode compares the SHA-256 digest stored at registration.

    Returns: (success: bool, message: str, user_id: str or None)
    """
    # Empty credentials can never be valid; answer without touching any backend.
    if not email or not password:
        return False, "Invalid credentials!", None

    try:
        if FIREBASE_ENABLED:
            api_key = os.environ.get('FIREBASE_WEB_API_KEY')
            if api_key:
                user_id = _verify_firebase_password(email, password, api_key)
                if user_id:
                    return True, "Login successful!", user_id
                return False, "Invalid credentials!", None

            print("⚠️ FIREBASE_WEB_API_KEY is not set: login only checks that the email exists, "
                  "the password is NOT verified.")
            # Materialize the result so the emptiness check works whether
            # .get() hands back a list or a lazy iterable.
            matches = list(db.collection('g_users').where('email', '==', email).limit(1).get())
            if matches:
                return True, "Login successful!", matches[0].id
            return False, "Invalid credentials!", None

        # Mock login
        stored = mock_users.get(email)
        if stored is not None and stored['password'] == hash_password(password):
            return True, "Login successful!", stored['user_id']
        return False, "Invalid credentials!", None
    except Exception as e:
        return False, f"Login failed: {str(e)}", None
def generate_chatbot_share_script(user_id, workspace_id, chatbot_id):
    """
    Build the one-line HTML <script> embed for a specific chatbot.

    The snippet sets the client, workspace and chatbot identifiers plus the
    avatar URL on `window`, then loads the chatbot client.js asynchronously.
    Returns: the embed snippet as a string
    """
    avatar_url = (
        'https://firebasestorage.googleapis.com/v0/b/curious-learner-3845a.appspot.com'
        '/o/avatars%2Ffemale_profile.png?alt=media&token=58bf4a74-24c8-41b1-b8bd-e381b798bb8d'
    )
    # Every fragment is separated from the next by exactly one space.
    fragments = (
        '<script> (function (w, d) {',
        f'w.CLIENT_IDENTIFIER = "{user_id}";',
        f"w.WORKSPACE_IDENTIFIER = '{workspace_id}';",
        f"w.AVATAR_IMAGE_URL = '{avatar_url}';",
        f"w.CHATBOT_IDENTIFIER = '{chatbot_id}';",
        'var h = d.head || d.getElementsByTagName("head")[0];',
        "var s = d.createElement('script');",
        "s.setAttribute('type', 'text/javascript');",
        's.async = true;',
        "s.setAttribute('src', 'https://dynamic-ui-chatbot.web.app/client.js');",
        'h.appendChild(s);',
        '})(window, document);',
        '</script>',
    )
    return ' '.join(fragments)
def generate_chatbot_data(user_id, workspace_id, chatbot_id, chatbot_name, provider_name, model_name,
                          mcp_github_enabled, github_token,
                          system_prompt):
    """
    Assemble the chatbot document, with the agent configuration as its only
    chat message model.

    The GitHub MCP server entry is included only when it is enabled and a
    token is given; the provider name is stored lowercased.
    Returns: dict holding the chatbot document fields
    """
    # MCP servers attached to the agent message (empty when GitHub MCP is off).
    if mcp_github_enabled and github_token:
        mcp_servers = {
            "github": {
                "url": "https://api.githubcopilot.com/mcp/",
                "transport": "streamable_http",
                "headers": {
                    "Authorization": f"Bearer {github_token}"
                }
            }
        }
    else:
        mcp_servers = {}

    llm_settings = {
        'provider': provider_name.lower(),  # stored lowercased for consistency
        'model': model_name
    }

    # Both navigation buttons share the same enabled/on state.
    switch_buttons = [
        {'title': button_title, 'value': True, 'isDisabled': False}
        for button_title in ('Back Button', 'Skip Button')
    ]

    agent_message = {
        'chatMessageType': 'agent',
        'chatMessage': 'Hi I am a GitHub MCP server bot, how can I help you today?',
        'llm': llm_settings,
        'mcp': mcp_servers,
        'systemPrompt': system_prompt,
        'errorText': '',
        'replayType': 'multiple',
        'validationRegex': '',
        'switchButtonModels': switch_buttons,
        'optionModels': []
    }

    design = {
        'avatarModel': {
            'avatarType': 'asset',
            'value': 'asset/image/female_profile.png'
        },
        'branding': True,
        'dynamicTheme': '{}',
        'isDynamicTheme': False,
        'position': 'bottomRight',
        'primaryColor': 4287203839,
        'themeColor': 4294967295,
        'welcomeMessage': True
    }

    notifications = {
        "sendTo": [
            "afmjoaa@gmail.com",
            "mprattoy@gmail.com"
        ],
        "subjectHeader": "New Response from Curious Learner",
        "sendIncompleteResponse": True,
        "leadInfo": "notificationCount",
        "emailNotification": True
    }

    return {
        'chatMessageModels': [agent_message],
        'chatbotName': chatbot_name,
        'chatbotType': 'widget',
        'designModel': design,
        'id': chatbot_id,
        "notificationModel": notifications,
        'responseCount': 0,
        'shareScript': generate_chatbot_share_script(user_id, workspace_id, chatbot_id)
    }
def create_agent(user_id, agent_name, provider_name, model_name, mcp_github_enabled, github_token, system_prompt):
    """
    Create a new React agent and its chatbot document.

    Args:
        user_id: User ID creating the agent
        agent_name: Name of the agent
        provider_name: AI provider (OpenAI or Gemini)
        model_name: Model to use
        mcp_github_enabled: Whether GitHub MCP is enabled
        github_token: GitHub token if MCP enabled
        system_prompt: System prompt for the agent

    Firebase mode writes the agent to agents/{agent_id} and the chatbot to
    g_users/{user_id}/workspaces/{DEFAULT_WORKSPACE_ID}/chats/{chatbot_id}.
    Mock mode stores both in memory and fails when the user is unknown,
    instead of reporting success for a chatbot that was never stored.

    Returns: (success: bool, message: str, chatbot_id: str or None, share_script: str or None)
    """
    import uuid

    def _blank(value):
        # Missing or whitespace-only input counts as "not provided".
        return not value or not str(value).strip()

    if not user_id:
        return False, "Please login first!", None, None
    if any(_blank(field) for field in (agent_name, provider_name, model_name, system_prompt)):
        return False, "Agent name, provider, model, and system prompt are required!", None, None
    # Validate GitHub token if GitHub MCP is enabled
    if mcp_github_enabled and _blank(github_token):
        return False, "GitHub token is required when GitHub MCP is enabled!", None, None

    try:
        created = datetime.now()
        agent_id = hashlib.sha256(f"{agent_name}{user_id}{created}".encode()).hexdigest()[:12]
        # Random UUID: uuid1() would embed this host's MAC address and the
        # creation time in an id that ends up in public URLs and embeds.
        chatbot_id = str(uuid.uuid4())

        # Build MCP config
        mcp_config = {}
        if mcp_github_enabled:
            mcp_config["github"] = {
                "url": "https://api.githubcopilot.com/mcp/",
                "transport": "streamable_http",
                "headers": {
                    "Authorization": f"Bearer {github_token}"
                }
            }

        # Generate share script for the agent
        share_script = generate_chatbot_share_script(user_id, DEFAULT_WORKSPACE_ID, chatbot_id)

        # NOTE(review): this URL uses the '#/chats' route while get_user_agents
        # builds '#/individual_chat_screen' links - confirm which is intended.
        agent_data = {
            'agent_id': agent_id,
            'agent_name': agent_name,
            'provider_name': provider_name,
            'model_name': model_name,
            'mcp_config': mcp_config,
            'github_token': github_token if mcp_github_enabled else None,
            'system_prompt': system_prompt,
            'created_by': user_id,
            'created_at': firestore.SERVER_TIMESTAMP if FIREBASE_ENABLED else created.isoformat(),
            'agent_url': f"https://dynamic-ui-chatbot.web.app/#/chats?clientId={user_id}&workspaceId={DEFAULT_WORKSPACE_ID}&chatbotId={chatbot_id}",
            'chatbot_id': chatbot_id,
            'share_script': share_script
        }

        # Generate chatbot document data with agent configuration
        chatbot_data = generate_chatbot_data(
            user_id,
            DEFAULT_WORKSPACE_ID,
            chatbot_id,
            agent_name,
            provider_name,
            model_name,
            mcp_github_enabled,
            github_token,
            system_prompt
        )

        if FIREBASE_ENABLED:
            # Save agent to agents collection
            db.collection('agents').document(agent_id).set(agent_data)
            # Create chatbot document inside workspace
            # Path: g_users/{user_id}/workspaces/{DEFAULT_WORKSPACE_ID}/chats/{chatbot_id}
            chatbot_data['timestamp'] = firestore.SERVER_TIMESTAMP
            db.collection('g_users').document(user_id).collection('workspaces').document(
                DEFAULT_WORKSPACE_ID).collection('chats').document(chatbot_id).set(chatbot_data)
        else:
            # Find the owner first so nothing is stored for an unknown user.
            owner = next((record for record in mock_users.values() if record['user_id'] == user_id), None)
            if owner is None:
                return False, "User not found. Please login again!", None, None
            mock_agents[agent_id] = agent_data
            chatbot_data['timestamp'] = created
            owner['workspaces'][DEFAULT_WORKSPACE_ID].setdefault('chats', {})[chatbot_id] = chatbot_data

        success_msg = f"✅ Agent '{agent_name}' created successfully with {provider_name} {model_name}!"
        return True, success_msg, chatbot_id, share_script
    except Exception as e:
        return False, f"Failed to create agent: {str(e)}", None, None
def _extract_llm_info(chatbot):
    """Return (provider, model) from a chatbot document's first chat message model, "Unknown" where absent."""
    models = chatbot.get("chatMessageModels", [])
    first = models[0] if isinstance(models, list) and models else None
    llm = first.get("llm") if isinstance(first, dict) else None
    if not isinstance(llm, dict):
        llm = {}
    return llm.get("provider", "Unknown"), llm.get("model", "Unknown")


def _format_agent_info(user_id, chatbot_id, chatbot):
    """Render one chatbot document as the multi-line text shown in the agent list."""
    provider, model = _extract_llm_info(chatbot)
    created = chatbot.get('timestamp')

    lines = [
        f"🤖 {chatbot.get('chatbotName', 'Unnamed Agent')}\n",
        f" Provider: {provider} | Model: {model}\n",
    ]
    if created:
        try:
            lines.append(f" Created: {created.strftime('%Y-%m-%d %H:%M')}\n")
        except (AttributeError, TypeError, ValueError):
            # The creation date is optional display info; an unformattable
            # value just leaves the line out.
            pass
    agent_url = f"https://dynamic-ui-chatbot.web.app/#/individual_chat_screen?clientId={user_id}&workspaceId={DEFAULT_WORKSPACE_ID}&chatbotId={chatbot_id}"
    lines.append(f" URL: {agent_url}\n")
    lines.append(f" 📋 Share Script:\n{chatbot.get('shareScript', '')}")
    return "".join(lines)


def get_user_agents(user_id):
    """
    Get all agents/chatbots created by a user, sorted by timestamp descending (newest first).

    Reads g_users/{user_id}/workspaces/{DEFAULT_WORKSPACE_ID}/chats in
    Firebase mode, or the matching in-memory workspace in mock mode. Both
    modes format entries the same way, taking provider and model from the
    first chat message model's 'llm' entry.

    Returns: list of agent info strings ([] when user_id is empty, the user
             has no agents, or fetching fails)
    """
    if not user_id:
        return []
    try:
        if FIREBASE_ENABLED:
            chats_ref = db.collection('g_users').document(user_id).collection('workspaces').document(
                DEFAULT_WORKSPACE_ID).collection('chats')
            snapshots = chats_ref.order_by('timestamp', direction=firestore.Query.DESCENDING).get()
            entries = []
            for snapshot in snapshots:
                data = snapshot.to_dict() or {}
                entries.append((data.get('id', snapshot.id), data))
        else:
            owner = next((record for record in mock_users.values() if record['user_id'] == user_id), None)
            chats = {}
            if owner is not None:
                chats = owner.get('workspaces', {}).get(DEFAULT_WORKSPACE_ID, {}).get('chats', {})
            # Sort chatbots by timestamp descending (newest first)
            entries = sorted(
                chats.items(),
                key=lambda item: item[1].get('timestamp', datetime.min),
                reverse=True
            )
        return [_format_agent_info(user_id, chatbot_id, data) for chatbot_id, data in entries]
    except Exception as e:
        print(f"Error fetching agents: {e}")
        return []
def get_user_by_id(user_id):
    """
    Look up a user's stored data by user ID.

    Firebase mode reads the g_users/{user_id} document; mock mode scans the
    in-memory user store. Any error is printed and reported as "not found".
    Returns: user data dict or None
    """
    try:
        if not FIREBASE_ENABLED:
            # First mock record whose user_id matches, if any.
            candidates = (record for record in mock_users.values() if record['user_id'] == user_id)
            return next(candidates, None)

        snapshot = db.collection('g_users').document(user_id).get()
        return snapshot.to_dict() if snapshot.exists else None
    except Exception as e:
        print(f"Error fetching user: {e}")
        return None
def get_user_workspace(user_id):
    """
    Fetch the default workspace belonging to a user.

    Firebase mode reads g_users/{user_id}/workspaces/{DEFAULT_WORKSPACE_ID};
    mock mode reads the same entry from the in-memory user store. Any error
    is printed and reported as "not found".
    Returns: workspace data dict or None
    """
    try:
        if not FIREBASE_ENABLED:
            for record in mock_users.values():
                # Skip other users and records without any workspaces.
                if record['user_id'] != user_id or 'workspaces' not in record:
                    continue
                return record['workspaces'].get(DEFAULT_WORKSPACE_ID)
            return None

        user_ref = db.collection('g_users').document(user_id)
        snapshot = user_ref.collection('workspaces').document(DEFAULT_WORKSPACE_ID).get()
        if not snapshot.exists:
            return None
        return snapshot.to_dict()
    except Exception as e:
        print(f"Error fetching workspace: {e}")
        return None