import os
import io

import fitz  # PyMuPDF
import docx
import gradio as gr
import pytesseract
from PIL import Image
from tqdm import tqdm

import chromadb
import torch
import nltk
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# ----------------------------
# ✅ Ensure nltk punkt is available
# ----------------------------
for resource in ("punkt", "punkt_tab"):  # newer nltk releases need punkt_tab as well
    try:
        nltk.data.find(f"tokenizers/{resource}")
    except LookupError:
        nltk.download(resource)
from nltk.tokenize import sent_tokenize

# ----------------------------
# ⚙️ Config
# ----------------------------
MANUAL_DIR = "./Manuals"
CHROMA_DIR = "./chroma_store"
CHUNK_SIZE = 750       # target chunk length, in whitespace-separated words
CHUNK_OVERLAP = 100    # words of context carried over between consecutive chunks
MAX_CONTEXT = 3        # number of retrieved chunks fed to the LLM
DEFAULT_MODEL = "meta-llama/Meta-Llama-3-8B-Instruct"
MODEL_OPTIONS = [
    "meta-llama/Meta-Llama-3-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-1.1-7b-it",
]
HF_TOKEN = os.environ.get("HF_TOKEN")  # needed for gated models such as Llama 3

# ----------------------------
# 🔧 Utility functions
# ----------------------------
def extract_pdf_text(path):
    """Extract text page by page; OCR pages that have no embedded text layer."""
    text_blocks = []
    doc = fitz.open(path)
    for i, page in enumerate(doc):
        text = page.get_text()
        if not text.strip():
            # Scanned page: rasterize it and run Tesseract OCR instead
            img = Image.open(io.BytesIO(page.get_pixmap().tobytes("png")))
            text = pytesseract.image_to_string(img)
        text_blocks.append({"page": i + 1, "text": text})
    return text_blocks

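# Note (assumption, not in the original flow): page.get_pixmap(dpi=300) renders at
# a higher resolution than the default, which usually improves Tesseract accuracy
# on dense manual pages at the cost of extra CPU time.
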
def extract_docx_text(path):
    """Word documents carry no page metadata, so everything is treated as page 1."""
    doc = docx.Document(path)
    full_text = "\n".join(para.text for para in doc.paragraphs)
    return [{"page": 1, "text": full_text}]

def split_sentences(text):
    try:
        return sent_tokenize(text)
    except Exception:
        # Crude fallback if the punkt models are unavailable
        return text.split(". ")

def chunk_text(sentences):
    """Pack sentences into ~CHUNK_SIZE-word chunks with ~CHUNK_OVERLAP words of overlap."""
    chunks = []
    current = []
    count = 0
    for sentence in sentences:
        tokens = len(sentence.split())
        if current and count + tokens > CHUNK_SIZE:
            chunks.append(" ".join(current))
            # Carry trailing sentences forward until roughly CHUNK_OVERLAP words
            # are retained, so context spans the chunk boundary
            overlap, count = [], 0
            for s in reversed(current):
                overlap.insert(0, s)
                count += len(s.split())
                if count >= CHUNK_OVERLAP:
                    break
            current = overlap
        current.append(sentence)
        count += tokens
    if current:
        chunks.append(" ".join(current))
    return chunks

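# Illustrative behaviour (assumption, not executed): with CHUNK_SIZE=750 and
# CHUNK_OVERLAP=100, a 2,000-word page yields roughly three chunks, and the last
# ~100 words of each chunk reappear at the start of the next, so sentences near a
# boundary are retrievable from either side.
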
def embed_all():
    """Rebuild the vector store from every PDF/DOCX manual in MANUAL_DIR."""
    client = chromadb.PersistentClient(path=CHROMA_DIR)
    try:
        client.delete_collection("manual_chunks")  # start from a clean index
    except Exception:
        pass
    collection = client.create_collection("manual_chunks")
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    for fname in tqdm(os.listdir(MANUAL_DIR), desc="Embedding manuals"):
        fpath = os.path.join(MANUAL_DIR, fname)
        if fname.lower().endswith(".pdf"):
            pages = extract_pdf_text(fpath)
        elif fname.lower().endswith(".docx"):
            pages = extract_docx_text(fpath)
        else:
            continue
        for page in pages:
            sents = split_sentences(page["text"])
            chunks = chunk_text(sents)
            for idx, chunk in enumerate(chunks):
                cid = f"{fname}::p{page['page']}::c{idx}"
                collection.add(
                    documents=[chunk],
                    embeddings=[embedder.encode(chunk).tolist()],
                    ids=[cid],
                    metadatas=[{"source": fname, "page": page["page"]}],
                )
    return collection, embedder

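# Note (assumption): collection.add also accepts parallel lists, so the per-chunk
# calls above could be batched into one add per file (all documents, embeddings,
# ids, and metadatas at once) to cut round-trips on large manual sets.
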
_model_cache = {}  # loading weights is the slowest step, so cache one pipeline per model

def get_model(model_id):
    if model_id not in _model_cache:
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
        model = AutoModelForCausalLM.from_pretrained(model_id, token=HF_TOKEN, torch_dtype=torch.float32)
        _model_cache[model_id] = pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)  # CPU
    return _model_cache[model_id]

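# Illustrative call (not executed): get_model(DEFAULT_MODEL)("Hello", max_new_tokens=8)
# returns [{"generated_text": "..."}], the standard transformers text-generation
# pipeline output shape that run_query indexes into below.
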
def run_query(question, model_name):
    # Embed the question with the same model used at indexing time
    q_emb = embedder.encode(question).tolist()
    results = db.query(query_embeddings=[q_emb], n_results=MAX_CONTEXT)
    if not results or not results.get("documents") or not results["documents"][0]:
        return "No matching information found."
    context = "\n\n".join(results["documents"][0])
    prompt = f"""
You are a helpful assistant. Use the following context to answer the question.

Context:
{context}

Question: {question}

Answer:
"""
    model = get_model(model_name)
    res = model(prompt, max_new_tokens=300)[0]["generated_text"]
    # The pipeline echoes the prompt, so keep only the text after the final "Answer:"
    return res.split("Answer:")[-1].strip()

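# Example (illustrative, not executed at import time):
#   run_query("How do I calibrate the incline motor?", DEFAULT_MODEL)
# retrieves the MAX_CONTEXT most similar chunks and returns only the text the
# model generates after the final "Answer:" marker.
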
# ----------------------------
# ✅ Startup: embed manuals
# ----------------------------
db, embedder = embed_all()

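# Optional smoke test (assumption: the SMARTMANUALS_SELFTEST env var is an addition,
# not part of the app's normal flow; set it to any value to enable). One retrieval
# round-trip at startup makes an empty or broken index fail loudly.
if os.environ.get("SMARTMANUALS_SELFTEST"):
    _probe = db.query(query_embeddings=[embedder.encode("installation").tolist()], n_results=1)
    print("Self-test retrieval:", _probe["ids"][0] if _probe["ids"] and _probe["ids"][0] else "index is empty")
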
# ----------------------------
# 🖥️ Gradio UI
# ----------------------------
with gr.Blocks() as demo:
    gr.Markdown("""
    # 📘 SmartManuals-AI (Docker)
    Ask any question from the preloaded manuals (PDF + Word).
    """)
    with gr.Row():
        question = gr.Textbox(label="Ask a Question")
        model = gr.Dropdown(choices=MODEL_OPTIONS, value=DEFAULT_MODEL, label="Choose LLM")
    btn = gr.Button("Ask")
    answer = gr.Textbox(label="Answer", lines=10)
    btn.click(fn=run_query, inputs=[question, model], outputs=answer)

demo.launch(server_name="0.0.0.0", server_port=7860)