Spaces:
Build error
Build error
| import os | |
| import gradio as gr | |
| import torch | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| # Configuration | |
| DOCS_DIR = ".business_docs" | |
| EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| MODEL_NAME = "microsoft/phi-3-mini-4k-instruct" # CPU-optimized model | |
| # System Initialization | |
| def initialize_system(): | |
| # Validate documents folder | |
| if not os.path.exists(DOCS_DIR): | |
| raise FileNotFoundError(f"Missing documents folder: {DOCS_DIR}") | |
| # Process PDFs | |
| pdf_files = [os.path.join(DOCS_DIR, f) for f in os.listdir(DOCS_DIR) if f.endswith(".pdf")] | |
| if not pdf_files: | |
| raise ValueError(f"No PDFs found in {DOCS_DIR}") | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=512, # Optimized for CPU | |
| chunk_overlap=50 | |
| ) | |
| documents = [] | |
| for pdf_path in pdf_files: | |
| try: | |
| loader = PyPDFLoader(pdf_path) | |
| documents.extend(loader.load_and_split(text_splitter)) | |
| except Exception as e: | |
| print(f"Error processing {pdf_path}: {str(e)}") | |
| # Create embeddings | |
| embeddings = HuggingFaceEmbeddings( | |
| model_name=EMBEDDING_MODEL, | |
| model_kwargs={'device': 'cpu'}, | |
| encode_kwargs={'normalize_embeddings': True} | |
| ) | |
| vector_store = FAISS.from_documents(documents, embeddings) | |
| # Load CPU-optimized model | |
| try: | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| MODEL_NAME, | |
| trust_remote_code=True, | |
| torch_dtype=torch.float32, | |
| device_map="cpu" | |
| ) | |
| except Exception as e: | |
| raise RuntimeError(f"Model loading failed: {str(e)}") | |
| return vector_store, model, tokenizer | |
| # Initialize system | |
| try: | |
| vector_store, model, tokenizer = initialize_system() | |
| print("β System ready with business documents") | |
| except Exception as e: | |
| print(f"β Initialization failed: {str(e)}") | |
| raise | |
| # Response Generation | |
| def generate_response(query): | |
| try: | |
| # Context retrieval | |
| docs = vector_store.similarity_search(query, k=2) | |
| context = "\n".join([d.page_content for d in docs]) | |
| # Phi-3 prompt template | |
| prompt = f"""<|system|> | |
| Answer ONLY using the business documents. Respond to unknown queries with: "This information is not available in our current documentation." | |
| Context: {context}</s> | |
| <|user|> | |
| {query}</s> | |
| <|assistant|> | |
| """ | |
| # Generate response | |
| inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False) | |
| outputs = model.generate( | |
| inputs.input_ids, | |
| max_new_tokens=200, | |
| temperature=0.1, | |
| do_sample=True, | |
| pad_token_id=tokenizer.eos_token_id | |
| ) | |
| response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| return response.split("<|assistant|>")[-1].strip() | |
| except Exception as e: | |
| return f"Error: Please try again. ({str(e)[:50]})" | |
| # Gradio Interface | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π Business Documentation Assistant") | |
| chatbot = gr.Chatbot(height=300) | |
| msg = gr.Textbox(placeholder="Ask about our services...", label="") | |
| clear = gr.Button("Clear History") | |
| def respond(message, history): | |
| response = generate_response(message) | |
| history.append((message, response)) | |
| return "", history | |
| msg.submit(respond, [msg, chatbot], [msg, chatbot]) | |
| clear.click(lambda: None, None, chatbot, queue=False) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |