Spaces:
Sleeping
Sleeping
import os
import re
import zipfile

import faiss
import gradio as gr
import numpy as np
import pdfplumber
from sentence_transformers import SentenceTransformer
from transformers import T5Tokenizer, T5ForConditionalGeneration
# -------------------------
# Step 1: Unzip documents.zip if needed
# -------------------------
def unzip_docs():
    """Extract documents.zip into ./docs if the archive exists and docs/ does not.

    No-op when there is nothing to unzip or the target folder already exists,
    so it is safe to call on every startup.
    """
    if os.path.exists("documents.zip") and not os.path.exists("docs"):
        # original print strings were mojibake-garbled; restored to sensible emoji
        print("📦 Unzipping documents.zip...")
        with zipfile.ZipFile("documents.zip", "r") as zip_ref:
            zip_ref.extractall("docs")
        print("✅ Extracted to /docs")
# -------------------------
# Step 2: Load PDF Content
# -------------------------
def load_docs(folder="docs"):
    """Concatenate the text of every PDF found recursively under *folder*.

    Pages with no extractable text are skipped. Returns a single string with
    a newline after each extracted page ("" when no PDFs are found). Logs the
    number of PDFs loaded and a short sample of the text.
    """
    page_texts = []
    files_found = []
    for root, _, files in os.walk(folder):
        for fname in files:
            if fname.lower().endswith(".pdf"):
                path = os.path.join(root, fname)
                files_found.append(path)
                with pdfplumber.open(path) as pdf:
                    for page in pdf.pages:
                        text = page.extract_text()
                        if text:
                            page_texts.append(text)
    # Join once instead of repeated `+=` (quadratic on large corpora);
    # each page is still followed by a newline, as before.
    all_text = "\n".join(page_texts) + "\n" if page_texts else ""
    print(f"📄 Loaded {len(files_found)} PDF(s).")
    print("🧾 Sample Text:\n", all_text[:500])
    return all_text
# -------------------------
# Step 3: Chunk text semantically
# -------------------------
def chunk_text(text, max_words=300):
    """Split *text* into roughly max_words-sized chunks along paragraph breaks.

    Paragraph boundaries are blank lines or a standalone 1-4 digit line
    (assumed to be a page number). Chunks of 30 words or fewer are discarded
    as noise (headers, footers, fragments).
    """
    raw_paragraphs = re.split(r'\n{2,}|\n\s*\d{1,4}\s*\n', text)
    chunks = []
    current = ""
    for para in raw_paragraphs:
        if len((current + para).split()) < max_words:
            current += " " + para.strip()
        else:
            # Guard against pushing an empty accumulator (e.g. when the very
            # first paragraph alone exceeds max_words).
            if current.strip():
                chunks.append(current.strip())
            current = para
    if current:
        chunks.append(current.strip())
    # Drop tiny fragments that would only add noise to retrieval.
    clean_chunks = [chunk for chunk in chunks if len(chunk.split()) > 30]
    print(f"📦 Total Chunks: {len(clean_chunks)}")
    for i, chunk in enumerate(clean_chunks[:3]):
        print(f"🔹 Chunk {i+1}:\n{chunk[:300]}\n")
    return clean_chunks
# -------------------------
# Step 4: Build FAISS RAG Index
# -------------------------
def build_index():
    """Prepare the corpus and return a FAISS inner-product index over it.

    Side effect: repopulates the module-level ``doc_chunks`` list, which
    ``generate_answer`` later uses to map search hits back to their text.
    """
    global doc_chunks
    unzip_docs()
    doc_chunks = chunk_text(load_docs("docs"))
    vectors = embedder.encode(
        doc_chunks, convert_to_numpy=True, normalize_embeddings=True
    )
    # With normalized embeddings, inner product equals cosine similarity.
    faiss_index = faiss.IndexFlatIP(vectors.shape[1])
    faiss_index.add(vectors)
    return faiss_index
# -------------------------
# Step 5: RAG-based Answer Generator
# -------------------------
def generate_answer(question):
    """Answer *question* by retrieving top-k chunks and prompting FLAN-T5.

    Embeds the question, pulls the ``top_k`` most similar chunks from the
    FAISS ``index``, builds a grounded prompt, and returns the decoded
    model output as a string.
    """
    q_embed = embedder.encode([question], normalize_embeddings=True)
    D, I = index.search(np.array(q_embed), top_k)
    # Label passages by retrieval rank (1..k) rather than by corpus chunk id,
    # and skip FAISS's -1 padding ids (returned when the index holds fewer
    # than top_k vectors) which would otherwise index doc_chunks[-1].
    retrieved = [
        f"Passage {rank + 1}:\n{doc_chunks[idx]}"
        for rank, idx in enumerate(i for i in I[0] if i >= 0)
    ]
    context = "\n\n".join(retrieved)
    prompt = (
        "You are KatibaGPT, an expert legal assistant trained on the Constitution of Kenya. "
        "Use ONLY the following legal text to answer the question. Your response must be accurate, structured, and clearly reference the Constitution.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n\nAnswer:"
    )
    input_ids = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).input_ids
    output_ids = model.generate(
        input_ids,
        max_length=700,
        num_beams=6,
        temperature=0.6,  # NOTE(review): ignored without do_sample=True — confirm intent
        repetition_penalty=1.2,
        early_stopping=True,
    )
    return tokenizer.decode(output_ids[0], skip_special_tokens=True)
# -------------------------
# Step 6: Load Everything Once
# -------------------------
print("⏳ Loading KatibaGPT backend...")
# Retrieval encoder (cosine-tuned MiniLM) and the FLAN-T5 generator,
# loaded once at process start so每 request only runs encode + generate.
embedder = SentenceTransformer("multi-qa-MiniLM-L6-cos-v1")
tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large")
top_k = 5        # number of chunks retrieved per question
doc_chunks = []  # repopulated by build_index()
index = build_index()
print("✅ KatibaGPT is ready.")
# -------------------------
# Step 7: Gradio UI
# -------------------------
# Single-textbox interface; `examples` may be plain strings because the
# interface has exactly one input component.
demo = gr.Interface(
    fn=generate_answer,
    inputs=gr.Textbox(
        label="Ask KatibaGPT",
        placeholder="e.g. What does Article 43 say about the right to housing?",
    ),
    outputs=gr.Textbox(label="Answer"),
    title="⚖️ KatibaGPT – Kenyan Constitution Legal Assistant",
    description="Ask questions about the Constitution of Kenya. KatibaGPT retrieves specific clauses and answers clearly using grounded legal text.",
    examples=[
        "What is the process of impeaching a president in Kenya?",
        "Explain the right to education under the Constitution.",
        "Which article talks about the role of the judiciary?",
    ],
)
demo.launch()