# Hugging Face Spaces app — the Space status page previously showed "Runtime error".
# app.py
import subprocess
import sys
import os


def install_packages():
    """Install pinned dependencies with pip at startup (Hugging Face Spaces bootstrap).

    Packages are installed one at a time, in a deliberate order, so pip pins
    websockets/gradio/gradio-client before anything that could pull
    incompatible versions.  Raises subprocess.CalledProcessError on failure.
    """
    print("Starting package installation...")
    pip_install = [sys.executable, "-m", "pip", "install"]
    # Upgrade pip itself first.
    subprocess.check_call(pip_install + ["--upgrade", "pip"])
    # Order matters: compatible websockets/gradio/gradio-client versions must
    # land before the rest of the requirements.
    ordered_packages = [
        "websockets==10.4",
        "gradio==3.44.4",
        "gradio-client==0.6.1",
        "PyPDF2==3.0.1",
        "langchain==0.0.340",
        "faiss-cpu==1.7.4",
        "sentence-transformers==2.3.0",
        "zhipuai>=2.1.0",
        "transformers==4.35.2",
        "torch==2.1.0",
        # huggingface-hub pinned last to resolve a dependency conflict.
        "huggingface-hub==0.24.0",
    ]
    for package in ordered_packages:
        # One pip invocation per package, matching the original install order.
        subprocess.check_call(pip_install + [package])
    print("Package installation completed successfully")


# Run the installation before importing anything that depends on it.
install_packages()
# Now continue with the rest of the app.
# NOTE: these packages were installed at startup by install_packages() above,
# so this import block must run only after that call.
import gradio as gr
import sqlite3
from datetime import datetime
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.llms.base import LLM
from typing import Optional, List, Dict, Any
from zhipuai import ZhipuAI
# Custom LLM wrapper for Zhipu AI
class ZhipuAILLM(LLM):
    """LangChain-compatible wrapper around the ZhipuAI chat-completions API."""

    api_key: str
    # Updated model name to a more commonly available one
    model: str = "glm-4-flash"  # Changed from "chatglm3-6b"
    temperature: float = 0.1
    # Declare client as a field to avoid Pydantic validation error
    client: Optional[ZhipuAI] = None

    def __init__(self, api_key: str, **kwargs: Any):
        # Pass api_key (and any field overrides) to the Pydantic base class.
        super().__init__(api_key=api_key, **kwargs)
        self.model = kwargs.get("model", self.model)
        self.temperature = kwargs.get("temperature", self.temperature)
        # Initialize client after setting attributes
        self.client = ZhipuAI(api_key=self.api_key)

    @property
    def _llm_type(self) -> str:
        # FIX: LangChain's base LLM declares _llm_type as a @property.
        # Without the decorator, self._llm_type evaluates to a bound method
        # wherever LangChain reads it (e.g. serialization / identifying
        # params) instead of the string "zhipuai".
        return "zhipuai"

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
        """Send `prompt` as a single user message and return the model's reply text.

        API failures are mapped to user-facing messages instead of raising,
        so the chat UI keeps working.
        """
        if self.client is None:
            raise ValueError("ZhipuAI client not initialized")
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
            )
            return response.choices[0].message.content
        except Exception as e:
            # Handle API errors gracefully: translate the common HTTP status
            # codes embedded in the exception text into friendly replies.
            error_msg = str(e)
            if "403" in error_msg:
                return "I apologize, but I'm currently unable to access the language model. This could be due to API access restrictions. Please check your API key and model permissions."
            elif "429" in error_msg:
                return "I'm experiencing high demand right now. Please try again in a moment."
            else:
                return f"An error occurred: {error_msg}"
# Database setup
DB_PATH = "chat_history.db"


def init_db():
    """Create the chat_history table if it does not already exist."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                user_message TEXT NOT NULL,
                bot_response TEXT NOT NULL
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even if table creation fails, so the
        # handle never leaks (the original leaked on exception).
        conn.close()
def save_chat(user_message, bot_response):
    """Append one (user, bot) exchange to the history table, timestamped now."""
    conn = sqlite3.connect(DB_PATH)
    try:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn.execute(
            "INSERT INTO chat_history (timestamp, user_message, bot_response) VALUES (?, ?, ?)",
            (timestamp, user_message, bot_response),
        )
        conn.commit()
    finally:
        # Close the connection even if the INSERT fails (original leaked it).
        conn.close()
def get_chat_history():
    """Return all exchanges as (timestamp, user_message, bot_response) rows, newest first."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute(
            "SELECT timestamp, user_message, bot_response FROM chat_history ORDER BY timestamp DESC"
        )
        return cursor.fetchall()
    finally:
        # Close the connection even if the SELECT fails (original leaked it).
        conn.close()
# Initialize database at import time (creates the table on first run;
# CREATE TABLE IF NOT EXISTS makes this a no-op afterwards).
init_db()
# Initialize RAG system
def initialize_system(pdf_path):
    """Build a RetrievalQA chain over the text extracted from `pdf_path`.

    Raises FileNotFoundError if the PDF is missing and ValueError if the
    ZHIPU_API_KEY environment variable is not set.
    """
    # Check if PDF file exists
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF file not found: {pdf_path}")
    # Extract text from PDF.  FIX: PyPDF2's extract_text() can return None
    # for pages with no extractable text, which would make `text +=` raise
    # TypeError — fall back to "" for such pages.
    pdf_reader = PdfReader(pdf_path)
    text = ""
    for page in pdf_reader.pages:
        text += page.extract_text() or ""
    # Split text into overlapping chunks for retrieval.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )
    chunks = text_splitter.split_text(text)
    # Create embeddings
    try:
        embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    except Exception as e:
        print(f"Error with HuggingFaceEmbeddings: {e}")
        # Fallback so the app can still start; retrieval quality will be poor.
        from langchain.embeddings import FakeEmbeddings
        embeddings = FakeEmbeddings(size=384)
        print("Using FakeEmbeddings as fallback")
    # Create vector store
    vector_store = FAISS.from_texts(chunks, embeddings)
    # Check if API key is available before constructing the LLM.
    if "ZHIPU_API_KEY" not in os.environ:
        raise ValueError("ZHIPU_API_KEY environment variable is not set")
    # Initialize Zhipu LLM
    llm = ZhipuAILLM(
        api_key=os.environ["ZHIPU_API_KEY"],
        model="glm-4",  # Updated model name
        temperature=0.1
    )
    # Prompt keeps answers grounded in the retrieved context, in first person.
    prompt_template = """
You are a personal avatar representing me. Answer the question based only on the provided context.
If the information is not in the context, politely say you don't have that information.
Always answer in first person as if you are me.
Context: {context}
Question: {question}
Answer:
"""
    prompt = PromptTemplate(
        template=prompt_template,
        input_variables=["context", "question"]
    )
    # Create RAG chain ("stuff" = concatenate retrieved chunks into one prompt).
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=True
    )
    return qa_chain
# Initialize on startup
qa_chain = None
try:
    qa_chain = initialize_system("Henry_Linkedin_Profile.pdf")
    print("System initialized successfully")
except Exception as e:
    print(f"Error initializing system: {e}")
    # FIX: capture the message now.  Python deletes the `except` variable
    # (`e`) when the handler exits, so referencing it later from inside
    # DummyChain.__call__ would raise NameError at chat time instead of
    # returning this message.
    _init_error = str(e)

    # Fallback chain so the UI still comes up and reports the failure.
    class DummyChain:
        def __call__(self, inputs):
            return {"result": f"System initialization failed: {_init_error}"}

    qa_chain = DummyChain()
# Chat function
def chat(message, history):
    """Run one user message through the QA chain, log the exchange, and return the reply."""
    try:
        answer = qa_chain({"query": message})["result"]
        reply = f"{answer}\n\n*(Information from your profile)*"
        # Persist the successful exchange before returning it to the UI.
        save_chat(message, reply)
        return reply
    except Exception as exc:
        failure = f"Error processing your request: {str(exc)}"
        # Failures are logged to the database too, then shown to the user.
        save_chat(message, failure)
        return failure
# Function to display chat history
def display_history():
    """Render the stored chat history as one Markdown string, newest first."""
    rows = get_chat_history()
    if not rows:
        return "No chat history yet."
    lines = []
    for when, question, answer in rows:
        # Each exchange becomes a timestamped Markdown section with a divider.
        lines.extend([
            f"**[{when}]**",
            f"**You:** {question}",
            f"**Avatar:** {answer}",
            "---",
        ])
    return "\n".join(lines)
# Function to clear chat history
def clear_history():
    """Delete every row from the history table and return a confirmation string."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("DELETE FROM chat_history")
        conn.commit()
    finally:
        # Close the connection even if the DELETE fails (original leaked it).
        conn.close()
    return "Chat history cleared."
# Create Gradio interface: two tabs (live chat + persisted history viewer).
with gr.Blocks() as demo:
    gr.Markdown("# My Personal Avatar")
    gr.Markdown("Ask me anything about my background, skills, or experience!")
    with gr.Tabs():
        # Chat tab
        with gr.TabItem("Chat"):
            # Using a simpler chat interface
            chatbot = gr.Chatbot(height=500)
            msg = gr.Textbox(label="Your Question", placeholder="Type your question here...")
            clear = gr.Button("Clear Conversation")

            def respond(message, chat_history):
                # Ignore empty submissions; otherwise append the (user, bot)
                # pair and clear the textbox by returning "" for it.
                if not message:
                    return "", chat_history
                bot_message = chat(message, chat_history)
                chat_history.append((message, bot_message))
                return "", chat_history

            msg.submit(respond, [msg, chatbot], [msg, chatbot])
            # Resets the visible transcript only — the DB history is untouched.
            clear.click(lambda: None, None, chatbot, queue=False)
        # History tab
        with gr.TabItem("Chat History"):
            history_output = gr.Markdown()
            refresh_button = gr.Button("Refresh History")
            clear_button = gr.Button("Clear History")
            refresh_button.click(display_history, outputs=history_output)
            clear_button.click(clear_history, outputs=history_output)
    # Initialize history display when the page first loads.
    demo.load(display_history, outputs=history_output)

if __name__ == "__main__":
    demo.launch()