| import os |
| import gradio as gr |
| import zipfile |
| import requests |
| import json |
| from pathlib import Path |
| import logging |
| import numpy as np |
| from sentence_transformers import SentenceTransformer |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| import faiss |
| from simple_salesforce import Salesforce |
| from dotenv import load_dotenv |
|
|
| |
# Send INFO-and-above records through the root handler and create the
# logger used by the rest of this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load variables from a .env file (python-dotenv) before they are read below.
load_dotenv()

# Salesforce connection settings taken from the environment; each one is
# None when its variable is not set.
sf_username, sf_password, sf_security_token, sf_instance_url = (
    os.environ.get(env_name)
    for env_name in (
        "SF_USERNAME",
        "SF_PASSWORD",
        "SF_SECURITY_TOKEN",
        "SF_INSTANCE_URL",
    )
)
|
|
| |
# Fail fast at import time when any Salesforce setting is unset or empty.
# The names of the offending variables are logged so the misconfiguration
# can be fixed without guessing; values are never logged.
_missing_sf_settings = [
    env_name
    for env_name, value in (
        ("SF_USERNAME", sf_username),
        ("SF_PASSWORD", sf_password),
        ("SF_SECURITY_TOKEN", sf_security_token),
        ("SF_INSTANCE_URL", sf_instance_url),
    )
    if not value
]
if _missing_sf_settings:
    logger.error("❌ Salesforce credentials are missing from environment variables!")
    logger.error("Missing or empty variables: %s", ", ".join(_missing_sf_settings))
    raise ValueError("Salesforce credentials are not properly set.")
|
|
| |
# Open the Salesforce session that create_salesforce_record() writes through.
# A failure is logged and re-raised, so the module does not finish loading
# without a working connection.
try:
    sf = Salesforce(
        username=sf_username,
        password=sf_password,
        security_token=sf_security_token,
        instance_url=sf_instance_url,
    )
except Exception as e:
    logger.error(f"❌ Salesforce connection failed: {str(e)}")
    raise
else:
    # Logged outside the try body so only the connection attempt is guarded.
    logger.info("✅ Connected to Salesforce")
|
|
| |
def extract_zip(zip_path, extract_to):
    """Unpack the archive at ``zip_path`` into the directory ``extract_to``.

    A success message is logged once extraction completes. Any exception
    raised along the way is logged with the archive path and then re-raised
    unchanged for the caller to handle.
    """
    try:
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(extract_to)
        logger.info(f"Extracted {zip_path} to {extract_to}")
    except Exception as exc:
        logger.error(f"Failed to extract {zip_path}: {exc}")
        raise
|
|
def load_documents(folder_path):
    """Read every ``*.txt`` file under ``folder_path``, recursively.

    Args:
        folder_path: Directory to search (``str`` or ``Path``).

    Returns:
        A ``(documents, sources)`` pair of equal-length lists: the text of
        each file and that file's bare name (no directory part), in sorted
        path order. Both lists are empty when nothing matches.
    """
    documents = []
    sources = []
    # Sorted so the document order is reproducible across runs and
    # filesystems; rglob() yields entries in directory-listing order.
    for file in sorted(Path(folder_path).rglob("*.txt")):
        # rglob() matches on name only, so a directory called "x.txt" would
        # otherwise make read_text() raise.
        if not file.is_file():
            continue
        # Undecodable bytes are dropped instead of raising.
        documents.append(file.read_text(encoding="utf-8", errors="ignore"))
        sources.append(file.name)
    return documents, sources
|
|
| |
# Chunking parameters: chunk_size=300 with chunk_overlap=50.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)

# Embedding model shared by index construction here and by query encoding
# in answer_query().
model = SentenceTransformer("all-MiniLM-L6-v2")

# Directory that receives the extracted archives.
data_dir = Path("./data")
data_dir.mkdir(exist_ok=True)

# (archive file name, extraction sub-folder) pairs. The sub-folder name is
# also used as the path segment of each chunk's source URL.
doc_folders = [
    ("Company_Policies.zip", "Company_Policies"),
    ("HR_Policies.zip", "Hr_Policies"),
    ("Contract_Clauses.zip", "Contract_Clauses"),
]

# Parallel lists: all_chunks[i] is a text chunk and metadata[i] is the URL of
# the document it came from. FAISS row ids index into both.
all_chunks = []
metadata = []

for zip_name, folder in doc_folders:
    zip_path = Path(zip_name)
    if not zip_path.exists():
        logger.error(f"Zip file {zip_name} not found")
        raise FileNotFoundError(f"Zip file {zip_name} not found")
    extract_path = data_dir / folder
    extract_path.mkdir(exist_ok=True)
    extract_zip(zip_path, extract_path)
    docs, sources = load_documents(extract_path)
    if not docs:
        logger.error(f"No documents found in {extract_path}")
        raise ValueError(f"No documents found in {extract_path}")
    for doc, src in zip(docs, sources):
        chunks = text_splitter.split_text(doc)
        all_chunks.extend(chunks)
        src_url = f"https://company.com/{folder}/{src}"
        metadata.extend([src_url] * len(chunks))

# Documents can exist yet yield no chunks (e.g. empty or whitespace-only
# files). Stop with a clear error instead of failing on embeddings.shape[1].
if not all_chunks:
    logger.error("No text chunks were produced from the loaded documents")
    raise ValueError("No text chunks were produced from the loaded documents")

# FAISS expects a 2-D float32 matrix; make the dtype explicit.
embeddings = np.asarray(model.encode(all_chunks), dtype="float32")
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(embeddings)
logger.info("FAISS index built successfully")
|
|
| |
def create_salesforce_record(query, answer, confidence_percentage, source_link):
    """Store one question/answer exchange as a ``Chat_Query_Log__c`` record.

    Args:
        query: The user's question text.
        answer: The answer text returned to the user.
        confidence_percentage: Confidence value; converted with ``float()``
            before it is sent.
        source_link: Link to the document the answer came from.

    Returns:
        The ``id`` from the Salesforce response on success, otherwise
        ``None``. Failures are logged and never raised to the caller.
    """
    # Initialised up front so the except branch can tell whether the API
    # call got far enough to produce a response.
    response = None
    try:
        data = {
            "User_Query__c": query,
            "Answer__c": answer,
            "Confidence_score__c": float(confidence_percentage),
            "Document_Link__c": source_link,
        }

        response = sf.Chat_Query_Log__c.create(data)

        if 'id' in response:
            record_id = response['id']
            logger.info(f"✅ Record created successfully in Salesforce with ID: {record_id}")
            return record_id

        logger.error(f"❌ Failed to create Salesforce record. Response: {response}")
        return None
    except Exception as e:
        logger.error(f"Error creating Salesforce record: {str(e)}")
        if response is not None:
            logger.error(f"Salesforce API Response: {str(response)}")
        return None
|
|
| |
def answer_query(query, *, top_k=3, max_distance=0.8):
    """Answer ``query`` from the indexed chunks and log the exchange.

    The query is embedded, the ``top_k`` nearest chunks are fetched from the
    FAISS index, and hits whose distance is not below ``max_distance`` are
    discarded. The closest remaining chunk becomes the answer and the
    exchange is written to Salesforce via ``create_salesforce_record``.

    Args:
        query: The user's question text.
        top_k: Number of nearest neighbours requested from the index.
        max_distance: Exclusive upper bound on the index distance for a
            chunk to count as relevant.

    Returns:
        Always a 4-tuple of display strings
        ``(answer, confidence, source link, Salesforce status)`` so it lines
        up with the four UI outputs. On an unexpected exception the first
        element is ``"Error: ..."`` and the rest are empty strings.
    """
    try:
        logger.info(f"Processing query: {query}")
        query_embedding = model.encode([query])
        D, I = index.search(np.array(query_embedding), k=top_k)

        # FAISS pads the result with label -1 when fewer than top_k vectors
        # are available; skip those so they never index the chunk list.
        hits = [
            (dist, all_chunks[i], metadata[i])
            for dist, i in zip(D[0], I[0])
            if i >= 0 and dist < max_distance
        ]

        if not hits:
            return (
                "No relevant information found.",
                "Confidence: 0%",
                "Source Link: None",
                "Salesforce Record ID: None",
            )

        # Closest surviving hit supplies the answer, its source and the score.
        min_distance, best_chunk, source_link = min(hits, key=lambda hit: hit[0])
        answer = best_chunk.strip()
        # Confidence is 100 - 100 * distance, floored at 0.
        confidence_percentage = max(0, 100 - (min_distance * 100))

        record_id = create_salesforce_record(query, answer, confidence_percentage, source_link)
        if record_id:
            record_status = f"Salesforce Record ID: {record_id}"
        else:
            record_status = "Failed to create record in Salesforce"

        return (
            answer,
            f"Confidence: {confidence_percentage:.2f}%",
            f"Source Link: {source_link}",
            record_status,
        )
    except Exception as e:
        logger.error(f"Error in answer_query: {str(e)}")
        return f"Error: {str(e)}", "", "", ""
|
|
| |
def process_question(q):
    """Handle a click on Submit: validate the question, then answer it.

    Args:
        q: Text from the question box; may be ``None`` or blank.

    Returns:
        A 4-tuple of strings, one per UI output. For a missing or
        whitespace-only question it is a prompt followed by three empty
        strings; otherwise it is whatever ``answer_query(q)`` returns.
    """
    # Four values are required because the click handler binds four outputs.
    if q is None or not q.strip():
        return "Please enter a question.", "", "", ""
    return answer_query(q)
|
|
# Gradio front end: a question box with a Submit button, and four Markdown
# panes that show the values returned by process_question().
# NOTE(review): the "π" in the heading looks like a mis-decoded emoji;
# confirm the intended glyph.
with gr.Blocks(title="Company Documents Q&A", theme=gr.themes.Soft()) as demo:
    gr.Markdown("## π Company Documents Q&A System")

    # Input row: wide question box beside a narrow submit button.
    with gr.Row():
        with gr.Column(scale=3):
            question = gr.Textbox(
                label="Ask a Question",
                placeholder="What are the conditions for permanent employment status?",
                lines=1,
            )
        with gr.Column(scale=1):
            submit_btn = gr.Button("Submit", variant="primary")

    # Output row: a single column holding the four result panes.
    with gr.Row(), gr.Column():
        answer_out = gr.Markdown(label="Answer")
        conf_out = gr.Markdown(label="Confidence")
        source_out = gr.Markdown(label="Source Link")
        record_out = gr.Markdown(label="Salesforce Record ID")

    submit_btn.click(
        fn=process_question,
        inputs=question,
        outputs=[answer_out, conf_out, source_out, record_out],
    )

# Listen on all interfaces at port 7860 and request a public share link.
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
|
|