| import logging |
| import os |
| import docx |
| import PyPDF2 |
| from docx.shared import RGBColor, Pt |
| from io import BytesIO, IOBase |
| import tempfile |
| import re |
| import datetime |
| import gradio as gr |
| from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM |
|
|
|
|
| |
| |
| |
# Application-wide logging: timestamped, level-tagged records; everything in
# this module logs through the named "LLM-Legal-App" logger.
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("LLM-Legal-App")
|
|
| |
| |
| |
|
|
| |
# Pull the Hugging Face token from the environment. Abort import early with a
# clear message when it is missing, since the model download below requires it.
api_key = os.environ.get("HUGGINGFACE_API_KEY")

if api_key:
    logger.info("Successfully retrieved Hugging Face API key.")
else:
    logger.error("Hugging Face API key not found in environment variables.")
    raise ValueError("Hugging Face API key not found. Set it with `os.environ['HUGGINGFACE_API_KEY'] = 'your_api_key'`")
|
|
|
|
| |
| |
| |
|
|
| |
# Load model + tokenizer once at import time; device_map="auto" lets
# accelerate place the weights on whatever hardware is available.
model_name = "Daemontatox/DocumentCogito"
# Authenticate the tokenizer download as well as the model download; the
# original passed the token only to the model, which fails for gated/private
# repositories.
tokenizer = AutoTokenizer.from_pretrained(model_name, token=api_key)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name, token=api_key)
generator = pipeline("text2text-generation", model=model, tokenizer=tokenizer, device_map="auto")
|
|
|
|
def generate_legal_document(doc_type, party_a, party_b, context, country):
    """Generate a draft legal document with the Hugging Face pipeline.

    Args:
        doc_type: Kind of agreement to draft (e.g. "MOU", "NDA").
        party_a: Name of the first party; a placeholder is substituted when empty.
        party_b: Name of the second party; a placeholder is substituted when empty.
        context: Free-text brief describing the agreement.
        country: Governing-law country inserted into the prompt.

    Returns:
        The generated document text, or an "Error generating document: ..."
        string on failure (callers detect errors via str.startswith("Error")).
    """
    logger.info(f"Starting generation for doc_type={doc_type!r}.")
    # Substitute explicit placeholders so the prompt never contains blanks.
    party_a = party_a if party_a else "[Party A Not Provided]"
    party_b = party_b if party_b else "[Party B Not Provided]"
    context = context if context else "[Context Not Provided]"

    prompt = f"""
    Generate a {doc_type} for:
    1) {party_a}
    2) {party_b}

    Context/brief of the agreement:
    {context}.

    The document should include:
    - Purpose of the {doc_type}
    - Responsibilities and obligations of each party
    - Confidentiality terms
    - Payment terms (use [To Be Determined] if not specified)
    - Term (duration) and termination
    - Governing law: {country}
    - Jurisdiction: [Appropriate region in {country} if not provided]
    - Signature blocks

    Use formal language, but keep it relatively clear and readable.
    For any missing information, use placeholders like [To Be Determined].
    Include a disclaimer that this is a draft and not legally binding until reviewed and signed.
    """
    logger.debug(f"Generated prompt:\n{prompt}")

    try:
        generated_text = generator(
            prompt,
            max_length=1400,
            num_return_sequences=1,
            # transformers ignores `temperature` under greedy decoding; enable
            # sampling so the 0.3 setting actually takes effect.
            do_sample=True,
            temperature=0.3,
        )[0]['generated_text']

        logger.info("Document generation complete.")
        return generated_text

    except Exception as e:
        logger.exception("Error generating legal document.")
        return f"Error generating document: {e}"
|
|
|
|
def review_legal_document(doc_text, doc_type, party_a, party_b):
    """Run the two-stage LLM review (rule-based, then wording) over doc_text.

    Args:
        doc_text: Full text of the document to review.
        doc_type, party_a, party_b: Currently unused here; accepted to match
            the Gradio callback signature. NOTE(review): consider feeding them
            into the prompts.

    Returns:
        Combined review text ("Rule-Based Analysis:\\n\\n... Wording
        Analysis:\\n\\n..."), or an "Error during ..." string on the first
        failure (callers detect errors via str.startswith("Error")).
    """
    logger.info("Starting document review (rule-based and wording).")

    rule_based_prompt = f"""
    Review the following document and provide feedback based on these rules:

    Document text:
    \"\"\"
    {doc_text}
    \"\"\"
    1) Parties and Authority: ... (rest of prompt from previous turns) ...
    """
    logger.debug(f"Generated rule-based review prompt:\n{rule_based_prompt}")

    try:
        rule_based_review = generator(
            rule_based_prompt,
            max_length=2000,
            num_return_sequences=1,
            # Sampling must be enabled for `temperature` to take effect.
            do_sample=True,
            temperature=0.3,
        )[0]['generated_text']
    except Exception as e:
        logger.exception("Error during rule-based review.")
        return f"Error during rule-based review: {e}"

    wording_analysis_prompt = f"""
    Analyze the wording of the following legal document:

    Document text:
    \"\"\"
    {doc_text}
    \"\"\"

    Provide an analysis covering: ... (rest of prompt from previous turns) ...
    """
    logger.debug(f"Generated wording analysis prompt:\n{wording_analysis_prompt}")

    try:
        wording_analysis = generator(
            wording_analysis_prompt,
            max_length=1400,
            num_return_sequences=1,
            # Same rationale as above: temperature requires sampling.
            do_sample=True,
            temperature=0.3,
        )[0]['generated_text']
    except Exception as e:
        logger.exception("Error during wording analysis.")
        return f"Error during wording analysis: {e}"

    combined_review = f"Rule-Based Analysis:\n\n{rule_based_review}\n\nWording Analysis:\n\n{wording_analysis}"
    return combined_review
|
|
|
|
| |
| |
| |
|
|
def parse_bytesio(file_data: BytesIO) -> str:
    """Extract plain text from an in-memory upload.

    Tries DOCX first, then falls back to PDF when python-docx rejects the
    stream. Returns the extracted text, or an "Error ..." string on failure
    (callers detect errors via str.startswith("Error")).
    """
    logger.info("Parsing BytesIO object...")
    # Rewind in case the caller already consumed part of the stream.
    file_data.seek(0)
    try:
        try:
            doc_obj = docx.Document(file_data)
            return "\n".join(para.text for para in doc_obj.paragraphs).strip()
        except docx.opc.exceptions.PackageNotFoundError:
            # Not a DOCX package; rewind and retry as PDF.
            logger.info("BytesIO is not DOCX, trying PDF.")
            file_data.seek(0)
            try:
                pdf_reader = PyPDF2.PdfReader(file_data)
                # Extract each page exactly once (the original called
                # extract_text() twice per page: once to filter, once to join).
                page_texts = (page.extract_text() for page in pdf_reader.pages)
                return "\n".join(text for text in page_texts if text).strip()
            except Exception as e:
                logger.exception(f"Error parsing BytesIO as PDF: {e}")
                return f"Error parsing BytesIO as PDF: {e}"
    except Exception as e:
        # Any non-DOCX-specific failure (corrupt stream, docx internals, ...).
        logger.exception(f"Error processing BytesIO: {e}")
        return f"Error processing file content: {e}"
|
|
def parse_uploaded_file_path(file_data) -> str:
    """Normalize Gradio upload input and extract its text.

    Accepts a filepath string, a dict with a 'name' key (older Gradio file
    payloads), or an in-memory stream (delegated to parse_bytesio).

    Returns the extracted text, "" when no file was provided, or an
    "Error ..." / "Unsupported file format." string on failure (callers check
    both prefixes).
    """
    if not file_data:
        logger.warning("No file provided.")
        return ""
    if isinstance(file_data, str):
        file_path = file_data
        logger.info(f"Received filepath: {file_path}")
    elif isinstance(file_data, dict) and 'name' in file_data:
        file_path = file_data['name']
        logger.info(f"Received file object with name: {file_path}")
    elif isinstance(file_data, (BytesIO, IOBase)):
        return parse_bytesio(file_data)
    else:
        logger.error(f"Unexpected file_data type: {type(file_data)}")
        return "Error: Unexpected file data format."

    logger.info(f"Attempting to parse file at {file_path}")
    try:
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if ext == ".pdf":
            with open(file_path, "rb") as f:
                pdf_reader = PyPDF2.PdfReader(f)
                # Extract each page exactly once (the original called
                # extract_text() twice per page: filter then join).
                page_texts = (page.extract_text() for page in pdf_reader.pages)
                return "\n".join(text for text in page_texts if text).strip()
        elif ext == ".docx":
            doc_obj = docx.Document(file_path)
            return "\n".join(para.text for para in doc_obj.paragraphs).strip()
        else:
            return "Unsupported file format."
    except Exception as e:
        logger.exception(f"Error parsing file: {e}")
        return f"Error parsing file: {e}"
|
|
| |
| |
| |
|
|
def clean_markdown(text):
    """Strip common Markdown syntax from *text*, returning plain text.

    Handles ATX headings, bold/italic emphasis, bulleted and numbered list
    markers, horizontal rules, images (dropped entirely) and links (label
    kept). Returns "" for falsy input.
    """
    if not text:
        return ""
    # Headings: drop leading '#' runs at the start of a line.
    text = re.sub(r'^#+\s+', '', text, flags=re.MULTILINE)
    # Emphasis: bold first, then italic. A backreference requires the closing
    # delimiter to match the opening one (the original alternation also
    # stripped mismatched pairs such as "**text__").
    text = re.sub(r'(\*\*|__)(.*?)\1', r'\2', text)
    text = re.sub(r'(\*|_)(.*?)\1', r'\2', text)
    # Bulleted and numbered list markers.
    text = re.sub(r'^[\-\+\*]\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\d+\.\s+', '', text, flags=re.MULTILINE)
    # Horizontal rules (---, ***, ___ and longer).
    text = re.sub(r'^[-_*]{3,}$', '', text, flags=re.MULTILINE)
    # Images are removed outright; links keep only their label text.
    text = re.sub(r'!\[(.*?)\]\((.*?)\)', '', text)
    text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1', text)
    return text.strip()
|
|
def create_and_save_docx(doc_text, review_text=None, doc_type="Unknown", party_a="Party A", party_b="Party B"):
    """Assemble the analysis DOCX and save it to a temporary file.

    Args:
        doc_text: Generated document text (may be None when only a review is
            packaged); Markdown is stripped before insertion.
        review_text: Combined review produced by review_legal_document, of the
            form "Rule-Based Analysis:\\n\\n<body>\\n\\nWording Analysis:\\n\\n<body>".
        doc_type, party_a, party_b: Used in the title and the file name.

    Returns:
        Path of the saved temporary .docx file.
    """
    logger.debug("Creating and saving DOCX.")

    document = docx.Document()

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"GEN_AI_Review_{doc_type}_{timestamp}.docx"

    title = f"Gen AI Analysis of {doc_type} between companies {party_a} and {party_b}"
    document.add_heading(title, level=1)

    if doc_text:
        document.add_heading("Generated Document", level=2)
        for para in clean_markdown(doc_text).split("\n"):
            document.add_paragraph(para)

    if review_text:
        document.add_heading("LLM Review", level=2)
        red = RGBColor(0xFF, 0x00, 0x00)
        # review_text separates the section labels from their bodies with
        # "\n\n", so split() yields the bare labels AND the body chunks as
        # distinct sections. Track which analysis we are inside so the body
        # chunks receive the intended styling. (The original startswith checks
        # only ever matched the bare labels, so the red numbered formatting was
        # never applied to actual content.)
        mode = None
        for section in review_text.split("\n\n"):
            if section.startswith("Rule-Based Analysis:"):
                mode = "rule"
                _add_review_heading(document, "Rule-Based Analysis", red)
                section = section[len("Rule-Based Analysis:"):]
            elif section.startswith("Wording Analysis:"):
                mode = "wording"
                _add_review_heading(document, "Wording Analysis", red)
                section = section[len("Wording Analysis:"):]
            for para in section.split("\n"):
                if mode == "rule" and re.match(r"^\d+\)", para):
                    # Numbered rule findings become red list items.
                    p = document.add_paragraph(style='List Number')
                    p.add_run(para).font.color.rgb = red
                else:
                    document.add_paragraph(para)

    # Create the temp file, then close the handle BEFORE python-docx writes to
    # its path -- saving while the handle is still open fails on Windows.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{file_name}") as tmpfile:
        temp_path = tmpfile.name
    document.save(temp_path)
    logger.debug(f"DOCX saved to: {temp_path}")
    return temp_path


def _add_review_heading(document, text, color):
    """Add a 14pt colored pseudo-heading paragraph for a review section."""
    run = document.add_paragraph().add_run(text)
    run.font.size = Pt(14)
    run.font.color.rgb = color
|
|
| |
| |
| |
|
|
def generate_document_interface(doc_type, party_a, party_b, context, country):
    """Gradio callback: produce a draft document plus a downloadable DOCX.

    Returns a (document_text, docx_path) pair; docx_path is None when
    generation failed (the error message is surfaced in the first slot).
    """
    logger.info(f"User requested doc generation: {doc_type}, {country}")
    draft = generate_legal_document(doc_type, party_a, party_b, context, country)
    if draft.startswith("Error"):
        return draft, None
    output_path = create_and_save_docx(draft, doc_type=doc_type, party_a=party_a, party_b=party_b)
    return draft, output_path
|
|
def review_document_interface(file_data, doc_type, party_a, party_b):
    """Gradio callback: parse an uploaded file, review it, package as DOCX.

    Returns a (review_text, docx_path) pair; docx_path is None when any stage
    failed (the failure message is surfaced in the first slot).
    """
    logger.info("User requested review.")
    if not file_data:
        return "No file uploaded.", None

    extracted_text = parse_uploaded_file_path(file_data)
    if extracted_text.startswith(("Error", "Unsupported")):
        return extracted_text, None

    review = review_legal_document(extracted_text, doc_type, party_a, party_b)
    if review.startswith("Error"):
        return review, None

    report_path = create_and_save_docx(None, review, doc_type, party_a, party_b)
    return review, report_path
|
|
| |
| |
| |
# CSS injected into the Gradio Blocks app (see build_app); classes are applied
# to tabs via `elem_classes`. NOTE(review): only "tab-one" is referenced in
# build_app -- "tab-two" appears unused; confirm before removing.
custom_css = """
.tab-one {
    background-color: #D1EEFC; /* Light blue */
    color: #333;
}
.tab-two {
    background-color: #FCEED1; /* Light orange */
    color: #333;
}
"""
|
|
def build_app():
    """Build the two-tab Gradio Blocks UI and return it (not launched).

    The "Generate Document" tab exists but is hidden (visible=False); the
    "Review Document" tab carries id=1 and is pre-selected via
    gr.Tabs(selected=1), so users land directly on the review workflow.
    """
    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown(
            """
            # UST Global LLM-based Legal Reviewer

            **Review an Existing MOU, SOW, MSA in PDF/DOCX format**: Upload a document for analysis.

            **Disclaimer**: This tool provides assistance but is not a substitute for professional legal advice.
            """
        )
        # selected=1 opens the tab whose id=1 ("Review Document") on load.
        with gr.Tabs(selected=1):
            # Hidden generation tab: its components still exist so their values
            # can be mirrored into the review tab (second click handler below).
            with gr.Tab("Generate Document",visible=False):
                doc_type = gr.Dropdown(label="Document Type", choices=["MOU", "MSA", "SoW", "NDA"], value="MOU")
                party_a = gr.Textbox(label="Party A Name", placeholder="e.g., Tech Innovations LLC")
                party_b = gr.Textbox(label="Party B Name", placeholder="e.g., Global Consulting Corp")
                context = gr.Textbox(label="Context/Brief", placeholder="Short summary of the agreement...")
                country = gr.Dropdown(label="Governing Law (Country)", choices=["India", "Malaysia", "US", "UK", "Singapore", "Japan"], value="India")
                gen_button = gr.Button("Generate Document")
                gen_output_text = gr.Textbox(label="Generated Document", lines=15, placeholder="Generated document will appear here...")
                gen_output_file = gr.File(label="Download DOCX", type="filepath")
                gen_button.click(
                    generate_document_interface,
                    inputs=[doc_type, party_a, party_b, context, country],
                    outputs=[gen_output_text, gen_output_file]
                )

            with gr.Tab("Review Document",elem_classes="tab-one", id=1):
                # Hidden inputs mirror the generation tab's metadata so the
                # review pipeline receives doc type / party names.
                doc_type_review = gr.Dropdown(label="Document Type", choices=["MOU", "MSA", "SoW", "NDA"], value="MOU", visible=False)
                party_a_review = gr.Textbox(label="Party A Name", visible=False)
                party_b_review = gr.Textbox(label="Party B Name", visible=False)

                file_input = gr.File(label="Upload PDF/DOCX for Review", type="filepath")
                review_button = gr.Button("Review Document")
                review_output_text = gr.Textbox(label="Review", lines=15, placeholder="Review will appear here...")
                review_output_file = gr.File(label="Download Reviewed DOCX", type="filepath")
                review_button.click(
                    review_document_interface,
                    inputs=[file_input, doc_type_review, party_a_review, party_b_review],
                    outputs=[review_output_text, review_output_file]
                )
            # Second handler on the same button: copy the generation tab's
            # doc type / party names into the hidden review-tab components.
            gen_button.click(lambda x, y, z: (x, y, z), [doc_type, party_a, party_b], [doc_type_review, party_a_review, party_b_review])

        gr.Markdown("**Note:** Scanned PDFs may not parse correctly. .docx is generally preferred.")

    return demo
|
|
logger.info("Initializing Gradio interface...")
# Build at import time so hosting platforms (e.g. Hugging Face Spaces) can
# find the module-level `demo` object.
demo = build_app()

# Only auto-launch when run as a script; importing this module should not
# start a web server (the original launched unconditionally on import).
if __name__ == "__main__":
    logger.info("Launching Gradio app.")
    demo.launch(debug=True)