Spaces:
Running
Running
| import streamlit as st | |
| import requests | |
| from supabase import create_client | |
| import time | |
| import fitz | |
| import numpy as np | |
| from urllib.parse import quote | |
| from PIL import Image | |
| from io import BytesIO | |
| import uuid | |
| import os | |
# Streamlit tweaks for the Hugging Face Spaces sandbox: disable features that
# try to write to read-only paths, stop usage tracking, and point all config
# lookups at a writable temp directory.
_ENV_OVERRIDES = {
    "STREAMLIT_SERVER_ENABLE_STATIC": "false",
    "STREAMLIT_SERVER_ENABLE_WEBSOCKET_COMPRESSION": "false",
    "STREAMLIT_SERVER_FILE_WATCHER": "false",
    # Prevent usage tracking and file writing in HF Spaces
    "STREAMLIT_BROWSER_GATHER_USAGE_STATS": "false",
    "STREAMLIT_METRICS_ENABLED": "false",
    # Redirect config path to a writable temp directory
    "HOME": "/tmp",
    "XDG_CONFIG_HOME": "/tmp",
}
os.environ.update(_ENV_OVERRIDES)
# Supabase and OpenRouter configuration, all sourced from the Space's secrets.
OR_API_KEY = os.environ.get("OR_API_KEY")
OR_API_URL = os.environ.get("OR_API_URL")
MODEL = os.environ.get("MODEL")
PROJECT_URL = os.environ.get("PROJECT_URL")
DB_API_KEY = os.environ.get("DB_API_KEY")

# Abort early with a visible error when any secret is missing or empty.
_required_settings = (OR_API_KEY, OR_API_URL, MODEL, PROJECT_URL, DB_API_KEY)
if not all(_required_settings):
    st.error("Missing required environment variables. Please check your Space's secrets.")
    st.stop()

# Single shared Supabase client used by all chat-memory helpers below.
supabase = create_client(PROJECT_URL, DB_API_KEY)
# Canned FAQ answers about the portfolio owner. Lookup is substring-based
# (see the response step in chatbot_page): if any key appears anywhere in the
# user's message, the mapped answer is returned without calling the LLM.
portfolio_faq = {
    "who are you": "I'm X.A.N.E. β Ezz Eldin Ahmed's assistant. He's a statistics major passionate about data science, automation, and machine learning. He built me using Python, Streamlit, and Django.",
    "what do you do": "I support users by answering questions about Ezz Eldin's work, skills, and projects. Think of me as a smart, interactive portfolio guide.",
    "skills": "Ezz is skilled in Python, R, SQL, statistical modeling, automation, and full-stack development with Streamlit and Django. He also works with Supabase, Excel, and Figma.",
    "projects": "His projects include a regression tool, time series forecaster, OCR scanner, AI chatbot (that's me), and a custom platform replacing many third-party tools.",
    "tools": "He primarily uses Python, Streamlit, Django, and Supabase. He also works with Excel, R, and Figma β and is currently exploring Power BI.",
    "education": "Ezz studies Statistics and Economics at the Faculty of Economics and Political Science β blending theory, data, and real-world application.",
    "experience": "He's coordinated a data science scholarship with EMAM, co-founded a research center, and led student-driven tools for learning and analytics.",
    "favorite project": "His favorite project is this portfolio β a central hub for his tools, chatbot, and regression models, all seamlessly embedded into one platform.",
    "what does xane stand for": "X.A.N.E. stands for: eXtended Artificial Neural Entity. I'm more than code β I'm a part of his creative process.",
    "how can i reach him": "You can reach Ezz via LinkedIn or the contact form on this website. He's always open to opportunities and collaboration.",
    "can i see the source code": "Some projects are public on his GitHub, while others are private or under development. You can ask about a specific project.",
    "is this chatbot ai-powered": "Yes, partially. I'm built on a rule-based system with optional LLM integration for advanced answers and search tasks.",
    "what's special about this site": "Unlike typical portfolios, this site is dynamic β combining tools, models, and a living assistant into one seamless interface.",
    "why streamlit": "Because it allows rapid, elegant development of interactive apps β perfect for building tools quickly without compromising UX.",
    "what's next": "More ML engineering projects, improved explainability using SHAP/SHAPASH, and diving deeper into NLP and generative AI."
}
| # Common functions | |
def chatbot(prompt):
    """Send the conversation to OpenRouter and return the assistant's reply.

    Args:
        prompt: List of ``{"role": ..., "content": ...}`` message dicts
            (the full conversation so far; the last entry is the user turn).

    Returns:
        The model's reply text. On any failure (non-200 status, network
        error, malformed response) falls back to Pollinations using only
        the last message's content.
    """
    headers = {"Authorization": f"Bearer {OR_API_KEY}", "Content-Type": "application/json"}
    payload = {"model": MODEL, "messages": prompt}
    try:
        # Bounded timeout: without it a stalled API call hangs the Streamlit
        # script (and the user's session) indefinitely.
        response = requests.post(OR_API_URL, json=payload, headers=headers, timeout=60)
        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"]
        return fallback_pollinations(prompt[-1]["content"])
    except Exception as e:
        st.error(f"OpenRouter error: {e}")
        return fallback_pollinations(prompt[-1]["content"])
def fallback_pollinations(message):
    """Best-effort text generation via the free Pollinations endpoint.

    Args:
        message: The user's last message, sent as a URL path segment.

    Returns:
        The generated text, or a human-readable error string — this function
        never raises, so the chat UI always gets something to display.
    """
    try:
        # Percent-encode the message: it travels as a URL path segment, so raw
        # spaces, '#', '?', or non-ASCII would corrupt the request.
        fallback_url = f"https://text.pollinations.ai/{quote(message)}"
        response = requests.get(fallback_url, timeout=60)
        if response.status_code == 200:
            return response.text.strip()
        return "Pollinations also failed to respond. Please try again later."
    except Exception as e:
        return f"Text fallback failed: {e}"
def save_memory(chat_name, role, content):
    """Persist a single chat message to Supabase, tagged with the session id.

    The session id keeps each browser session's history isolated from
    other visitors sharing the same table.
    """
    record = {
        "chat_name": chat_name,
        "role": role,
        "content": content,
        "session_id": st.session_state.xane_id,  # per-session isolation key
    }
    supabase.table("chats").insert(record).execute()
def load_memory(chat_name):
    """Fetch this session's stored messages for one named chat.

    Returns:
        A list of row dicts (role/content/...) belonging to the current
        session only.
    """
    filters = {
        "chat_name": chat_name,
        "session_id": st.session_state.xane_id,  # never leak other sessions
    }
    result = supabase.table("chats").select("*").match(filters).execute()
    return result.data
def load_all_memory():
    """Load every chat belonging to the current session.

    Returns:
        Mapping of chat name -> message rows; falls back to a single empty
        "Default" chat when the session has no stored chats at all.
    """
    query = supabase.table("chats").select("chat_name").eq(
        "session_id", st.session_state.xane_id
    )
    names = {row["chat_name"] for row in query.execute().data}
    sessions = {name: load_memory(name) for name in names}
    return sessions or {"Default": []}
def delete_chat(chat_name):
    """Delete a named chat's rows, restricted to the current session."""
    criteria = {
        "chat_name": chat_name,
        "session_id": st.session_state.xane_id,  # only this session's rows
    }
    supabase.table("chats").delete().match(criteria).execute()
def gradual_display(text, placeholder):
    """Render *text* into *placeholder* with a typewriter animation.

    Re-renders the growing prefix of the text one character at a time,
    sleeping briefly between frames.
    """
    for end in range(1, len(text) + 1):
        placeholder.markdown(text[:end])
        time.sleep(0.0005)
def extract_pdf_text(uploaded_file):
    """Extract the plain text of every page of an uploaded PDF.

    Args:
        uploaded_file: A file-like object positioned at the start of a
            PDF byte stream (Streamlit UploadedFile).

    Returns:
        The concatenated text of all pages.
    """
    doc = fitz.open(stream=uploaded_file.read(), filetype="pdf")
    try:
        # join() avoids quadratic string concatenation across many pages.
        return "".join(page.get_text() for page in doc)
    finally:
        # The original leaked the document handle; always release it.
        doc.close()
# Welcome messages: one is picked at random (np.random.choice) and shown as
# the page header the first time a session opens the chatbot page.
welcome_messages = [
    "Greetings, warrior. Ready to unlock some secrets?",
    "Hello, I am XANE, your digital ninja assistant.",
    "Hey, apprentice. Ready to master the art of knowledge?",
    "XANE here. How can I assist you on your quest?",
    "Welcome, ninja. Let's crack the code together.",
    "Hi there, ready to unleash your inner ninja?",
    "Step into the dojo. Ask anything, learn everything.",
    "XANE at your service. What's your mission today?",
    "Greetings, young ninja. The path to insight awaits.",
    "Hey! Time to sharpen your skills and knowledge.",
    "Hello, ninja-in-training! How can I guide you?",
    "Welcome back, warrior. Let's conquer your questions.",
    "Greetings from the digital dojo. What's next on your path?",
    "Hey, warrior! Let's hack through your toughest problems.",
    "Welcome, ninja master in the making. What's your next move?"
]
# Page 1: Chatbot
def chatbot_page():
    """Render the chatbot page.

    Handles session/chat bootstrapping, the chat-management sidebar,
    message history display, and the chat input (text + file uploads).
    Responses come from the FAQ table when a key matches, otherwise from
    the LLM via chatbot().
    """
    # Initialize session ID (isolates this visitor's chats in Supabase)
    if "xane_id" not in st.session_state:
        st.session_state.xane_id = str(uuid.uuid4())
    # Load all sessions from Supabase
    if "chat_sessions" not in st.session_state:
        st.session_state.chat_sessions = load_all_memory()
    # Set default chat if not present
    if "current_chat" not in st.session_state:
        st.session_state.current_chat = (
            list(st.session_state.chat_sessions.keys())[0] if st.session_state.chat_sessions else "Default"
        )
    # Load messages from selected chat
    st.session_state.messages = st.session_state.chat_sessions.get(st.session_state.current_chat, [])
    if "show_greeting" not in st.session_state:
        st.session_state.show_greeting = True
    # Display greeting at the top (only once per session)
    if st.session_state.show_greeting:
        greeting = np.random.choice(welcome_messages)
        st.header(greeting)
        st.session_state.show_greeting = False
    st.sidebar.title("π¬ Chats")
    # Chat management: existing chats plus a sentinel entry for creating one
    chat_options = list(st.session_state.chat_sessions.keys()) + ["β New Chat"]
    selected_chat = st.sidebar.selectbox("Choose a chat:", chat_options, key="chat_selector")
    if selected_chat == "β New Chat":
        new_chat_name = st.sidebar.text_input("Enter chat name:", key="new_chat_name")
        if st.sidebar.button("Create"):
            # Ignore empty names and duplicates silently
            if new_chat_name and new_chat_name not in st.session_state.chat_sessions:
                st.session_state.chat_sessions[new_chat_name] = []
                st.session_state.current_chat = new_chat_name
                st.session_state.messages = []
                st.rerun()
    elif selected_chat != st.session_state.current_chat:
        # Switch to the chosen chat and rerun so history re-renders
        st.session_state.current_chat = selected_chat
        st.session_state.messages = st.session_state.chat_sessions.get(st.session_state.current_chat, [])
        st.rerun()
    if st.sidebar.button("π§Ή Clear Current Chat"):
        # Deletes server-side rows too, not just the local copy
        delete_chat(st.session_state.current_chat)
        st.session_state.chat_sessions[st.session_state.current_chat] = []
        st.session_state.messages = []
        st.rerun()
    # Display chat messages
    for msg in st.session_state.messages:
        with st.chat_message(msg['role']):
            st.markdown(msg['content'])

    # Chat input and processing
    def send_message():
        """Read the chat input widget and process text and file submissions."""
        chat_input = st.chat_input(
            "Ask me anything or upload files",
            key="chat_input",
            max_chars=None,
            accept_file="multiple",
            file_type=["jpg", "jpeg", "png", "pdf", "txt"],
            disabled=False
        )
        if chat_input:
            # Process text input
            if chat_input.text and chat_input.text.strip():
                with st.chat_message("user"):
                    st.markdown(chat_input.text)
                save_memory(st.session_state.current_chat, "user", chat_input.text)
                st.session_state.messages.append({"role": "user", "content": chat_input.text})
            # Process file uploads
            if chat_input.files:
                with st.chat_message("user"):
                    for uploaded_file in chat_input.files:
                        if uploaded_file.type.startswith('image/'):
                            st.image(uploaded_file)
                            # BUG FIX: was an empty f-string, so image uploads
                            # were stored as blank messages; store a placeholder
                            # describing the upload instead.
                            file_content = f"[Image file: {uploaded_file.name}]"
                        elif uploaded_file.type == "application/pdf":
                            try:
                                uploaded_file.seek(0)
                                text = extract_pdf_text(uploaded_file)
                                file_content = text
                                st.warning(f"PDF file uploaded: {uploaded_file.name}")
                            except Exception as e:
                                st.error(f"Failed to extract PDF text: {e}")
                                file_content = f"[PDF file: {uploaded_file.name}]"
                        elif uploaded_file.type == "text/plain":
                            try:
                                uploaded_file.seek(0)
                                text = uploaded_file.read().decode("utf-8")
                                file_content = text
                                st.warning(f"Text file uploaded: {uploaded_file.name}")
                            except Exception as e:
                                st.error(f"Failed to read text file: {e}")
                                file_content = f"[Text file: {uploaded_file.name}]"
                        else:
                            st.warning(f"Unsupported file type: {uploaded_file.type}")
                            continue
                        save_memory(st.session_state.current_chat, "user", file_content)
                        st.session_state.messages.append({"role": "user", "content": file_content})
            # Generate response: FAQ substring match first, LLM as fallback
            response = ""
            if chat_input.text and isinstance(chat_input.text, str) and chat_input.text.strip():
                for question, answer in portfolio_faq.items():
                    if question.lower() in chat_input.text.lower():
                        response = answer
                        break
            if not response:
                with st.spinner("XANE is thinking... π€"):
                    response = chatbot(st.session_state.messages)
            st.session_state.messages.append({"role": "assistant", "content": response})
            save_memory(st.session_state.current_chat, "assistant", response)
            with st.chat_message("assistant"):
                placeholder = st.empty()
                gradual_display(response, placeholder)
            # Keep the in-memory session cache in sync with what was displayed
            st.session_state.chat_sessions[st.session_state.current_chat] = st.session_state.messages

    send_message()
# Page 2: Image Generator
def image_generator_page():
    """Render the image-generation page.

    Prompt/model/size controls live in the sidebar; the generated image is
    kept in st.session_state so it survives Streamlit reruns and is shown
    (with a download button) in the main area.
    """
    st.title("π¨ Pollinations β Free MultiβModel Generator")
    # All controls inside the sidebar
    with st.sidebar:
        prompt = st.text_area("ποΈ Prompt", "A futuristic cyberpunk cityscape at night, neon lights reflecting on wet streets, towering skyscrapers with holographic advertisements, flying cars zooming between buildings, a mysterious figure in a high-tech cloak walking through the rain, ultra-detailed 4K cinematic lighting, Blade Runner meets Ghost in the Shell style", height=100)
        model = st.selectbox("βοΈ Model", ["flux", "flux-pro", "flux-cablyai", "turbo"])
        width = st.slider("Width", 512, 1536, 1024, 128)
        height = st.slider("Height", 512, 1536, 1024, 128)
        seed = st.number_input("Seed (optional)", value=42)
        # Generate button stays inside the sidebar
        if st.button("β¨ Generate Image"):
            if not prompt.strip():
                st.warning("Enter a prompt first.")
            else:
                # Prompt travels as a URL path segment, so it must be percent-encoded
                prompt_enc = quote(prompt.strip())
                url = (
                    f"https://image.pollinations.ai/prompt/{prompt_enc}"
                    f"?model={model}&width={width}&height={height}&seed={seed}"
                )
                with st.spinner("Generating image..."):
                    try:
                        resp = requests.get(url, timeout=60)
                        resp.raise_for_status()
                        img = Image.open(BytesIO(resp.content))
                        # Store image (and the settings used) in session state
                        # so the result persists across reruns
                        st.session_state.generated_image = img
                        st.session_state.image_model = model
                        st.session_state.image_width = width
                        st.session_state.image_height = height
                    except Exception as e:
                        st.error(f"β Error generating image: {e}")
    # Display generated image in the main area
    if "generated_image" in st.session_state:
        st.image(
            st.session_state.generated_image,
            caption=f"{st.session_state.image_model} β {st.session_state.image_width}Γ{st.session_state.image_height}",
            use_container_width=True
        )
        st.download_button(
            "π₯ Download Image",
            data=convert_image_to_bytes(st.session_state.generated_image),
            file_name=f"pollinations_{st.session_state.image_model}.jpg",
            mime="image/jpeg"
        )
# Helper function to convert PIL Image to bytes
def convert_image_to_bytes(img):
    """Serialize a PIL image to JPEG-encoded bytes for the download button.

    Args:
        img: A PIL.Image.Image in any mode.

    Returns:
        JPEG bytes of the image.
    """
    # JPEG cannot encode alpha/palette modes (RGBA, P, LA): Pillow raises
    # "cannot write mode ... as JPEG". Convert to RGB first so e.g. PNG
    # responses from the API do not crash the download button.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")
    img_byte_arr = BytesIO()
    img.save(img_byte_arr, format='JPEG')
    return img_byte_arr.getvalue()
# Main app
def main():
    """App entry point: route between the two pages via sidebar navigation.

    When switching pages, the other page's heavyweight session-state entry
    is dropped so stale data never bleeds between pages.
    """
    # Every visitor gets a stable per-session id before any page runs
    if "xane_id" not in st.session_state:
        st.session_state.xane_id = str(uuid.uuid4())

    st.sidebar.title("Navigation")
    page = st.sidebar.radio("Go to", ["Chatbot", "Image Generator"], key="nav_radio")

    if page == "Chatbot":
        if "generated_image" in st.session_state:
            del st.session_state["generated_image"]
        chatbot_page()
    elif page == "Image Generator":
        if "messages" in st.session_state:
            del st.session_state["messages"]
        image_generator_page()
# Script entry point: page config must be the first Streamlit call.
if __name__ == "__main__":
    st.set_page_config(page_title="XANE - AI Assistant", layout="wide")
    main()