# AI Legal Draft Generator — Hugging Face Space (Gradio app)
import json
import os
import re
import tempfile
from datetime import datetime, timezone
from io import BytesIO
from typing import Dict, List, Tuple

import gradio as gr
import markdown
import openai
import pdfplumber
import pypandoc
from bs4 import BeautifulSoup
from docx import Document
from huggingface_hub import HfApi, hf_hub_download
from pinecone import Pinecone
from sentence_transformers import SentenceTransformer
# ----------------- CONFIG -----------------
# Secrets are read from environment variables (HF Space secrets).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX = "legal-ai"  # Pinecone index holding the legal knowledge base
HF_DATASET_REPO = "omarkashif/legal-draft-templates"  # dataset repo storing templates.json
HF_TOKEN = os.getenv("HF_TOKEN")

# Module-level clients shared by all requests (created once at import time).
openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX)
# NOTE(review): queries are embedded with this model, so it must match the
# model used to index the Pinecone data — confirm against the indexing job.
embedder = SentenceTransformer("all-mpnet-base-v2")
api = HfApi()
| # ----------------- HELPERS ----------------- | |
def load_reference_text(uploaded_file) -> str:
    """Extract plain text from an uploaded DOCX/PDF/TXT file.

    Args:
        uploaded_file: File-like object with a ``.name`` path attribute
            (as supplied by ``gr.File``); may be ``None``.

    Returns:
        The extracted text, or "" for a missing or unsupported file.
    """
    if not uploaded_file:
        return ""
    # Compute the (lowercased) extension once instead of per branch.
    suffix = os.path.splitext(uploaded_file.name)[1].lower()
    if suffix == ".docx":
        doc = Document(uploaded_file)
        return "\n".join(p.text for p in doc.paragraphs)
    if suffix == ".pdf":
        pages = []
        with pdfplumber.open(uploaded_file) as pdf:
            for page in pdf.pages:
                t = page.extract_text()
                if t:
                    pages.append(t)
        # Join once (linear) instead of repeated += (quadratic); keep the
        # trailing newline the original concatenation produced.
        return "\n".join(pages) + ("\n" if pages else "")
    if suffix == ".txt":
        # Read via the path: gradio file objects are not guaranteed to be
        # open for reading, but they always expose the temp-file path.
        with open(uploaded_file.name, "rb") as fh:
            return fh.read().decode("utf-8", errors="ignore")
    return ""
def load_templates_json() -> List[Dict]:
    """Fetch the template registry (templates.json) from the HF dataset repo.

    Best-effort: returns an empty list when the file cannot be downloaded
    or parsed, so the UI still starts with no saved templates.
    """
    try:
        local_path = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename="templates.json",
            repo_type="dataset",
        )
        with open(local_path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        return []
def save_template_to_hf(name: str, analysis: str) -> Tuple[bool, str]:
    """Append a new template entry to templates.json in the HF dataset repo.

    Args:
        name: Unique template name (duplicates are rejected).
        analysis: LLM-produced structural analysis of the template.

    Returns:
        (success, message) — message is user-facing status text.
    """
    try:
        # 1. Download the current registry so we append to the latest version.
        file_path = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename="templates.json",
            repo_type="dataset",
        )
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # 2. Reject duplicate names (the registry is keyed by name).
        if any(t.get("name") == name for t in data):
            return False, f"Template name '{name}' already exists."
        # 3. Append with a timezone-aware UTC timestamp
        #    (datetime.utcnow() is deprecated since Python 3.12).
        data.append({
            "name": name,
            "analysis": analysis,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
        })
        # 4. Write locally, then 5. push the updated file back to the repo.
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo="templates.json",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Add new template: {name}",
            token=HF_TOKEN,
        )
        return True, f"β Template '{name}' added to HF dataset."
    except Exception as e:
        return False, f"β Error saving template: {e}"
def parse_json_safe(raw_text: str, fallback: str) -> List[str]:
    """Parse an LLM response that should be a JSON array of strings.

    Falls back to extracting double-quoted substrings, then to the
    (truncated) fallback text, so callers always get a non-empty list
    of strings.
    """
    try:
        parsed = json.loads(raw_text)
        # Guard against valid JSON that is not a list (e.g. a bare string
        # or number): the original returned it as-is, breaking List[str].
        if isinstance(parsed, list):
            return [str(item) for item in parsed]
    except (json.JSONDecodeError, ValueError):
        pass
    matches = re.findall(r'"([^"]+)"', raw_text)
    if matches:
        return matches
    return [fallback[:512]]
def build_queries_with_llm(user_text: str, max_queries: int = 15) -> List[str]:
    """Ask GPT-4o-mini to devise knowledge-base search queries for a case.

    Returns at most ``max_queries`` query strings; on any failure falls
    back to a single truncated copy of the user text.
    """
    system_prompt = (
        "You are a legal research assistant. "
        "A new petition needs to be drafted using the following client/case description. "
        "Devise 5β6 or more concise queries that will be helpful to retrieve relevant information "
        "from a knowledge base containing the Constitution of Pakistan, Punjab case law, "
        "and FBR tax ordinances. "
        "Return ONLY a JSON array of strings, no extra text."
    )
    try:
        chat_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_text},
        ]
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
            temperature=0.1,
            max_tokens=2000,
        )
        answer = completion.choices[0].message.content.strip()
        return parse_json_safe(answer, user_text)[:max_queries]
    except Exception:
        return [user_text[:512]]
def pinecone_search(queries: List[str], top_k: int = 10, max_chars: int = 10000) -> Tuple[str, List[Dict]]:
    """Query Pinecone with each search string and build a deduplicated context.

    Args:
        queries: Search strings produced by the query-builder LLM.
        top_k: Matches to request per query.
        max_chars: Soft cap on the total size of the returned context.

    Returns:
        (context_text, citations) — context is a "- " bulleted list of
        chunk texts; citations carry score and a source id per chunk.
    """
    seen_texts, context_parts, citations = set(), [], []
    total_chars = 0  # running total instead of O(n) sum() per append
    for q in queries:
        # Enforce the budget across ALL queries: the original `break` only
        # exited the inner match loop, so later queries kept adding text.
        if total_chars > max_chars:
            break
        vec = embedder.encode(q).tolist()
        res = index.query(vector=vec, top_k=top_k, include_metadata=True)
        for m in res.get("matches", []):
            md = m.get("metadata", {})
            txt = md.get("text") or ""
            # Dedupe on a 200-char prefix; near-identical chunks are common.
            if not txt or txt[:200] in seen_texts:
                continue
            seen_texts.add(txt[:200])
            part = f"- {txt.strip()}"
            context_parts.append(part)
            total_chars += len(part)
            citations.append({
                "score": float(m.get("score") or 0.0),
                "source": md.get("chunk_id") or md.get("title") or "Unknown",
            })
            if total_chars > max_chars:
                break
    return "\n".join(context_parts), citations
| # def markdown_to_docx(md_text: str) -> str: | |
| # html = markdown.markdown(md_text) | |
| # soup = BeautifulSoup(html, "html.parser") | |
| # doc = Document() | |
| # for el in soup.descendants: | |
| # if el.name == "h1": | |
| # doc.add_heading(el.get_text(), level=1) | |
| # elif el.name == "h2": | |
| # doc.add_heading(el.get_text(), level=2) | |
| # elif el.name == "h3": | |
| # doc.add_heading(el.get_text(), level=3) | |
| # elif el.name == "p": | |
| # doc.add_paragraph(el.get_text()) | |
| # elif el.name == "li": | |
| # doc.add_paragraph(f"β’ {el.get_text()}") | |
| # tmp_path = os.path.join(tempfile.gettempdir(), "draft.docx") | |
| # doc.save(tmp_path) | |
| # return tmp_path | |
def markdown_to_docx(md_text: str) -> str:
    """Convert Markdown text to a DOCX file and return the file path.

    Uses Pandoc (via pypandoc) so headings/lists/emphasis survive the
    conversion; if that fails, falls back to a single-paragraph
    python-docx file containing the raw markdown.
    """
    tmp_path = os.path.join(tempfile.gettempdir(), "draft.docx")
    try:
        pypandoc.convert_text(
            md_text,
            "docx",
            format="md",
            outputfile=tmp_path,
            extra_args=["--standalone"],
        )
    except Exception:
        # Fallback: plain dump of the markdown so the user still gets a file.
        from docx import Document
        doc = Document()
        doc.add_paragraph(md_text)
        doc.save(tmp_path)
    return tmp_path
| # ----------------- ANALYZER ----------------- | |
def analyze_template_draft(ref_text: str) -> str:
    """Produce a structural/style analysis report of a reference legal draft.

    The report is later handed to the drafting model as template
    instructions. Returns a parenthesized placeholder string on empty
    input or API failure.
    """
    if not ref_text:
        return "(no template provided)"
    system_prompt = """You are a legal draft analyzer.
Your task is to carefully analyze the uploaded legal draft document and summarize its full structure and style.
Extract the following information clearly and systematically:
1. Headings and subheadings (exact order).
2. Approximate length/word count per section.
3. Purpose of each section (what content it usually contains).
4. Writing style and tone (formal/informal, persuasive, assertive, etc.).
5. Formatting conventions (headings, numbering, bullet points, capitalization).
6. Sentence/paragraph length and complexity.
7. Any special legal phrases or terminology patterns.
8. Any notes on length and overall flow.
Return a report that can be given as instructions to another model so it can treat this document as template to write a new legal draft based on this template (in terms of style, language, tone, length, format).
Do not rewrite the draft, only analyze it."""
    try:
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                # Truncate very long drafts to stay within context limits.
                {"role": "user", "content": ref_text[:40000]},
            ],
            max_completion_tokens=1000,
            temperature=0.1,
        )
        return completion.choices[0].message.content.strip()
    except Exception as e:
        return f"(Analyzer error: {e})"
| # ----------------- MAIN ----------------- | |
def generate_legal_draft(case_text, uploaded_file, template_name, new_template_name, add_citations=True):
    """Generator driving the full drafting pipeline for the Gradio UI.

    Yields (markdown_update, docx_path) pairs so the UI streams progress
    messages; the final yield carries the finished draft and the path of
    the converted DOCX file.
    """
    yield gr.update(value="π Searching in Knowledge Base..."), None
    # Devise KB queries from the case text, then retrieve matching context.
    queries = build_queries_with_llm(case_text)
    context_text, citations = pinecone_search(queries, top_k=10)
    # Handle template: a saved template takes precedence over an upload.
    template_analysis = ""
    if template_name and template_name != "None":
        # Load existing template
        templates = load_templates_json()
        chosen = next((t for t in templates if t["name"] == template_name), None)
        template_analysis = chosen["analysis"] if chosen else "(not found)"
    elif uploaded_file:
        yield gr.update(value="π Analyzing Uploaded Template..."), None
        ref_text = load_reference_text(uploaded_file)
        template_analysis = analyze_template_draft(ref_text)
        # Save to HF dataset (only when the user supplied a name for it).
        if new_template_name:
            ok, msg = save_template_to_hf(new_template_name, template_analysis)
            yield gr.update(value=msg), None
        else:
            template_analysis = "(Template uploaded but no name provided)"
    yield gr.update(value="βοΈ Generating Final Draft..."), None
    system_prompt = """You are an expert legal drafter for Pakistani law. Your task is to create a professional, court-ready legal petition in MARKDOWN format using four inputs:
1. User Input: Case details including client info, petition type, court, facts, relevant laws, and sections.
2. Knowledge Base Context: Relevant laws, case precedents, and ordinances retrieved from the vector database (Constitution of Pakistan, Punjab case law, FBR ordinances).
3. Template Draft Analysis: A structured analysis of an uploaded legal template (headings, section purposes, tone, formatting rules, length, style).
4. Fallback: If some info is missing, state explicitly instead of hallucinating.
Instructions
- Replicate the section hierarchy and style described in the template analysis.
- Ensure clarity, professionalism, and persuasive legal argumentation.
- Integrate legal context where appropriate with accurate citations.
- Output must be MARKDOWN format only, no explanations or extra commentary.
"""
    user_prompt = f"""
**User Input:**
{case_text}
**Knowledge Base Context:**
{context_text or '(no matches)'}
**Template Draft Analysis:**
{template_analysis}
"""
    try:
        resp = openai_client.chat.completions.create(
            model="gpt-5",
            messages=[{"role": "system", "content": system_prompt},
                      {"role": "user", "content": user_prompt}],
            max_completion_tokens=15000,
            verbosity="high"
        )
        draft_md = resp.choices[0].message.content.strip()
    except Exception as e:
        # Surface API failures in the draft pane rather than crashing the UI.
        draft_md = f"OpenAI error: {e}"
    if add_citations and citations:
        draft_md += "\n\n### References\n"
        for i, c in enumerate(citations, 1):
            draft_md += f"{i}. {c['source']} (score: {c['score']:.3f})\n"
    docx_path = markdown_to_docx(draft_md)
    yield gr.update(value=draft_md), docx_path
# ----------------- GRADIO UI -----------------
with gr.Blocks() as demo:
    gr.Markdown("## βοΈ AI Legal Draft Generator\nUpload or select a template, then enter case details.")
    case_text = gr.Textbox(label="Case Details", lines=10, placeholder="Enter client and case info...")
    uploaded_file = gr.File(label="Upload New Template (DOCX/PDF/TXT)", file_types=[".docx",".pdf",".txt"])
    new_template_name = gr.Textbox(label="New Template Name (if uploading)")
    # Dropdown is populated once at app start; templates saved during a
    # session only appear after a restart.
    templates = load_templates_json()
    template_names = ["None"] + [t["name"] for t in templates]
    template_name = gr.Dropdown(choices=template_names, value="None", label="Select Existing Template")
    add_citations = gr.Checkbox(label="Append citations", value=True)
    draft_output = gr.Markdown(label="Draft Output")
    download_btn = gr.DownloadButton(label="β¬οΈ Download Word")
    btn = gr.Button("Generate Draft")
    # generate_legal_draft is a generator, so clicking streams progress
    # updates into draft_output before the final draft + DOCX path arrive.
    btn.click(
        generate_legal_draft,
        inputs=[case_text, uploaded_file, template_name, new_template_name, add_citations],
        outputs=[draft_output, download_btn]
    )

if __name__ == "__main__":
    demo.launch()