Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import os | |
| import time | |
| import tempfile | |
| import subprocess | |
| import re | |
| import requests | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Tuple, Any | |
| from difflib import SequenceMatcher | |
| from huggingface_hub import InferenceClient | |
| import pandas as pd | |
| subprocess.run(["pip", "install", "pypdf", "-q"], capture_output=True) | |
| try: | |
| from pypdf import PdfReader | |
| except Exception: | |
| PdfReader = None | |
| try: | |
| import PyPDF2 | |
| except Exception: | |
| PyPDF2 = None | |
| try: | |
| from gtts import gTTS | |
| except Exception: | |
| gTTS = None | |
| try: | |
| from docx import Document | |
| except Exception: | |
| Document = None | |
| SAVE_DIR = Path("/data/saved_docs") | |
| SAVE_DIR.mkdir(parents=True, exist_ok=True) | |
| MODEL = "meta-llama/Llama-3.3-70B-Instruct" | |
| def get_client(): | |
| token = os.environ.get("HF_TOKEN", "") or os.environ.get("HUGGINGFACE_TOKEN", "") | |
| return InferenceClient(model=MODEL, token=token if token else None) | |
| def call_llama(prompt, max_tokens=2000): | |
| try: | |
| client = get_client() | |
| response = client.chat_completion( | |
| messages=[{"role": "user", "content": prompt}], | |
| max_tokens=max_tokens, | |
| temperature=0.7, | |
| ) | |
| return response.choices[0].message.content | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def extract_pdf_text(path): | |
| if not path: | |
| return "" | |
| if hasattr(path, "name"): | |
| path = path.name | |
| try: | |
| if PdfReader is None: | |
| return "[PDF reader unavailable]" | |
| reader = PdfReader(path) | |
| pages = [] | |
| for i, page in enumerate(reader.pages): | |
| text = page.extract_text() or "" | |
| if text.strip(): | |
| pages.append(f"--- Page {i + 1} ---\n{text.strip()}") | |
| if not pages: | |
| return "[No extractable text – may be a scanned PDF]" | |
| full = "\n\n".join(pages) | |
| return full[:14000] + "\n\n[...truncated]" if len(full) > 14000 else full | |
| except Exception as e: | |
| return f"[PDF read error: {e}]" | |
| def phoenix_analyse(pdf_file): | |
| if not pdf_file: | |
| return "Please upload a PDF.", "", "", "" | |
| doc = extract_pdf_text(pdf_file) | |
| if doc.startswith("["): | |
| return doc, "", "", "" | |
| base = f"DOCUMENT:\n\n{doc}\n\n{'=' * 60}\n\n" | |
| r1 = call_llama( | |
| base | |
| + "You are Introbot – part of the Metabolic Phoenix Framework by Dwayne Anthony Brian Galloway.\n\n" | |
| + "From this document extract:\n" | |
| + "- Every key fact, who said it and when\n" | |
| + "- Every admission, explicit or implied\n" | |
| + "- Every denial\n" | |
| + "- Every contradiction\n" | |
| + "- Every gap – what should have been said but wasn't\n" | |
| + "- Any statutory duties that were triggered\n\n" | |
| + "Format as a clean numbered FACT LEDGER." | |
| ) | |
| r2 = call_llama( | |
| base | |
| + "You are RHB 2.0 + CPF – part of the Metabolic Phoenix Framework by Dwayne Anthony Brian Galloway.\n\n" | |
| + "From this document:\n" | |
| + "- Map every institution to every statutory duty they had\n" | |
| + "- State COMPLIANT or NON-COMPLIANT for each duty with evidence\n" | |
| + "- Identify every legal error made\n" | |
| + "- Trace the full causal chain from first breach to final harm\n" | |
| + "- Identify any Equality Act 2010 breaches especially s.13 and s.20\n" | |
| + "- State what remedy is appropriate for each breach\n\n" | |
| + "Be forensic. Cite statute sections." | |
| ) | |
| r3 = call_llama( | |
| base | |
| + "You are Phoenix Badger – part of the Metabolic Phoenix Framework by Dwayne Anthony Brian Galloway.\n\n" | |
| + "Run all four Ouroboros rings on this document:\n\n" | |
| + "RING A – FORWARD SCAN: What anomalies, evasion tactics and red flags are present?\n\n" | |
| + "RING B – BACKWARD SWEEP: What past decisions in this document would be decided differently under current law?\n\n" | |
| + "RING C – TYPOLOGY MUTATION: What new patterns of institutional misconduct does this reveal?\n\n" | |
| + "RING D – DRIFT CORRECTION: What internal contradictions or inconsistencies exist within the document itself?\n\n" | |
| + "End with ESCALATION MAP: which bodies should receive this in priority order and why." | |
| ) | |
| r4 = call_llama( | |
| f"You are Layoutbot – part of the Metabolic Phoenix Framework by Dwayne Anthony Brian Galloway.\n\n" | |
| f"DOCUMENT EXTRACT:\n{doc[:3000]}\n\n" | |
| f"FACT LEDGER:\n{r1[:1500]}\n\n" | |
| f"COMPLIANCE ANALYSIS:\n{r2[:1500]}\n\n" | |
| f"LOOP OUTPUT:\n{r3[:1500]}\n\n" | |
| f"{'=' * 60}\n\n" | |
| "Produce a complete litigation-ready case summary:\n\n" | |
| "EXECUTIVE SUMMARY – plain English, what happened\n" | |
| "KEY BREACHES – each with statute\n" | |
| "TOP 5 ARGUMENTS – strongest first\n" | |
| "RECOMMENDED NEXT STEPS – in priority order\n" | |
| "ESCALATION MAP – which body gets what and when\n\n" | |
| "Ms. Nala | Injectiv8 | Dwayne Anthony Brian Galloway" | |
| ) | |
| return r1, r2, r3, r4 | |
| def save_phoenix_analysis(r1, r2, r3, r4): | |
| if not r1: | |
| return "Nothing to save – run an analysis first." | |
| ts = time.strftime("%Y-%m-%d %H:%M") | |
| doc_id = str(int(time.time() * 1000)) | |
| content = ( | |
| "METABOLIC PHOENIX FRAMEWORK – FULL ANALYSIS\n" | |
| f"Generated: {ts}\n" | |
| "Ms. Nala | Injectiv8 | Dwayne Anthony Brian Galloway\n\n" | |
| f"{'=' * 60}\n\nCYCLE 1 – FACT LEDGER\n\n{r1}\n\n" | |
| f"{'=' * 60}\n\nCYCLE 2 – COMPLIANCE ANALYSIS\n\n{r2}\n\n" | |
| f"{'=' * 60}\n\nCYCLE 3 – OUROBOROS LOOPS\n\n{r3}\n\n" | |
| f"{'=' * 60}\n\nCYCLE 4 – FULL CASE OUTPUT\n\n{r4}" | |
| ) | |
| try: | |
| index_path = SAVE_DIR / "index.json" | |
| library = json.loads(index_path.read_text()) if index_path.exists() else [] | |
| entry = { | |
| "id": doc_id, | |
| "title": f"Analysis – {ts}", | |
| "content": content, | |
| "saved_at": ts, | |
| } | |
| library.insert(0, entry) | |
| index_path.write_text(json.dumps(library, indent=2), encoding="utf-8") | |
| (SAVE_DIR / f"{doc_id}.txt").write_text(content, encoding="utf-8") | |
| return f"Saved: Analysis – {ts}" | |
| except Exception as e: | |
| return f"Save error: {e}" | |
| def load_library(): | |
| index_path = SAVE_DIR / "index.json" | |
| try: | |
| return json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else [] | |
| except Exception: | |
| return [] | |
| def lib_choices(library): | |
| return [f"{e['title']} ({e['saved_at']})" for e in library] | |
| def choice_to_entry(library, choice): | |
| choices = lib_choices(library) | |
| try: | |
| return library[choices.index(choice)] | |
| except Exception: | |
| return None | |
| def lib_refresh(): | |
| library = load_library() | |
| return gr.update(choices=lib_choices(library), value=None), "" | |
| def lib_view(selection): | |
| library = load_library() | |
| if not selection: | |
| return "" | |
| entry = choice_to_entry(library, selection) | |
| return entry["content"] if entry else "Not found." | |
| def lib_download(selection): | |
| library = load_library() | |
| if not selection: | |
| return gr.update(visible=False), "Select a document first." | |
| entry = choice_to_entry(library, selection) | |
| if not entry: | |
| return gr.update(visible=False), "Not found." | |
| file_path = SAVE_DIR / f"{entry['id']}.txt" | |
| if file_path.exists(): | |
| return gr.update(value=str(file_path), visible=True), "Ready." | |
| return gr.update(visible=False), "File missing." | |
| def lib_delete(selection): | |
| library = load_library() | |
| if not selection: | |
| return gr.update(choices=lib_choices(library), value=None), "Select a document." | |
| entry = choice_to_entry(library, selection) | |
| if not entry: | |
| return gr.update(choices=lib_choices(library), value=None), "Not found." | |
| library = [x for x in library if x["id"] != entry["id"]] | |
| (SAVE_DIR / "index.json").write_text(json.dumps(library, indent=2), encoding="utf-8") | |
| file_path = SAVE_DIR / f"{entry['id']}.txt" | |
| if file_path.exists(): | |
| file_path.unlink() | |
| return gr.update(choices=lib_choices(library), value=None), "Deleted." | |
| class DocumentParser: | |
| def extract_text_from_pdf(file_path: str) -> str: | |
| try: | |
| if PyPDF2 is None: | |
| return "Error reading PDF: PyPDF2 unavailable" | |
| with open(file_path, "rb") as f: | |
| reader = PyPDF2.PdfReader(f) | |
| text = "" | |
| for page in reader.pages: | |
| extracted = page.extract_text() | |
| if extracted: | |
| text += extracted + "\n" | |
| return text | |
| except Exception as e: | |
| return f"Error reading PDF: {str(e)}" | |
| def extract_text_from_docx(file_path: str) -> str: | |
| try: | |
| if Document is None: | |
| return "Error reading DOCX: python-docx not installed" | |
| doc = Document(file_path) | |
| return "\n\n".join([p.text for p in doc.paragraphs if p.text.strip()]) | |
| except Exception as e: | |
| return f"Error reading DOCX: {str(e)}" | |
| def extract_text_from_file(file_path: str) -> str: | |
| if not file_path: | |
| return "Error: No file path provided" | |
| try: | |
| lower = file_path.lower() | |
| if lower.endswith(".pdf"): | |
| return DocumentParser.extract_text_from_pdf(file_path) | |
| if lower.endswith(".docx"): | |
| return DocumentParser.extract_text_from_docx(file_path) | |
| with open(file_path, "r", encoding="utf-8", errors="ignore") as f: | |
| return f.read() | |
| except Exception as e: | |
| return f"Error reading file: {str(e)}" | |
| def parse_document(text: str) -> Dict: | |
| paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()] | |
| sentences = [] | |
| for paragraph in paragraphs: | |
| sentences.extend([s.strip() + "." for s in paragraph.split(".") if s.strip()]) | |
| return { | |
| "full_text": text, | |
| "paragraphs": paragraphs, | |
| "sentences": sentences, | |
| "word_count": len(text.split()), | |
| "char_count": len(text), | |
| } | |
| class ForensicAnalyzer: | |
| RISK_PATTERNS = { | |
| "denial_trap": r"\b(deny|denies|denied|denial|no such|never|not applicable)\b", | |
| "admission_by_silence": r"\b(fail|failed|failure|omit|omitted|omission|no response|no reply)\b", | |
| "procedural_hedging": r"\b(without prejudice|subject to|conditional|provided that|notwithstanding)\b", | |
| "burden_flip": r"\b(claimant must|burden|onus|responsibility|incumbent upon|required to prove)\b", | |
| } | |
| def find_duplicates(docs: List[Dict]) -> List[Dict]: | |
| duplicates = [] | |
| for i, doc1 in enumerate(docs): | |
| for j, doc2 in enumerate(docs[i + 1:], start=i + 1): | |
| for para1 in doc1["paragraphs"]: | |
| for para2 in doc2["paragraphs"]: | |
| if len(para1) < 20 or len(para2) < 20: | |
| continue | |
| ratio = SequenceMatcher(None, para1, para2).ratio() | |
| if ratio > 0.85: | |
| duplicates.append({ | |
| "doc1": i, | |
| "doc2": j, | |
| "similarity": ratio, | |
| "text": para1[:5000] + "...", | |
| "type": "duplicate" if ratio > 0.95 else "near_duplicate", | |
| }) | |
| return duplicates | |
| def find_unique_content(docs: List[Dict]) -> List[Dict]: | |
| unique_items = [] | |
| for i, doc in enumerate(docs): | |
| for para in doc["paragraphs"]: | |
| if len(para) < 20: | |
| continue | |
| is_unique = True | |
| for j, other_doc in enumerate(docs): | |
| if i == j: | |
| continue | |
| for other_para in other_doc["paragraphs"]: | |
| if SequenceMatcher(None, para, other_para).ratio() > 0.8: | |
| is_unique = False | |
| break | |
| if not is_unique: | |
| break | |
| if is_unique: | |
| unique_items.append({ | |
| "doc_index": i, | |
| "content": para, | |
| "length": len(para), | |
| "type": "unique_paragraph", | |
| }) | |
| return unique_items | |
| def detect_risk_language(text: str) -> List[Dict]: | |
| findings = [] | |
| for risk_type, pattern in ForensicAnalyzer.RISK_PATTERNS.items(): | |
| for match in re.finditer(pattern, text, re.IGNORECASE): | |
| start = max(0, match.start() - 50) | |
| end = min(len(text), match.end() + 50) | |
| findings.append({ | |
| "type": risk_type, | |
| "matched_text": match.group(), | |
| "context": text[start:end], | |
| "position": match.start(), | |
| }) | |
| return findings | |
| def analyze_documents(docs: List[Dict]) -> Dict: | |
| return { | |
| "duplicates": ForensicAnalyzer.find_duplicates(docs), | |
| "unique_content": ForensicAnalyzer.find_unique_content(docs), | |
| "risk_language": [ | |
| item | |
| for doc in docs | |
| for item in ForensicAnalyzer.detect_risk_language(doc["full_text"]) | |
| ], | |
| } | |
| class DocumentMerger: | |
| def select_best_content(duplicates, docs): | |
| best_versions = [] | |
| processed = set() | |
| for duplicate in duplicates: | |
| key = tuple(sorted([duplicate["doc1"], duplicate["doc2"]])) | |
| if key in processed: | |
| continue | |
| processed.add(key) | |
| doc1_text = docs[duplicate["doc1"]]["full_text"] | |
| doc2_text = docs[duplicate["doc2"]]["full_text"] | |
| best_versions.append(doc1_text if len(doc1_text) > len(doc2_text) else doc2_text) | |
| return best_versions | |
| def merge_documents(docs, analysis): | |
| output = [] | |
| output.append("MASTER EVIDENCE DOCUMENT") | |
| output.append("=" * 80) | |
| output.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| output.append(f"Source Documents: {len(docs)}") | |
| output.append("=" * 80 + "\n") | |
| output.append("EXECUTIVE SUMMARY") | |
| output.append("-" * 80) | |
| output.append(f"Total Duplicates Found: {len(analysis['duplicates'])}") | |
| output.append(f"Unique Content Items: {len(analysis['unique_content'])}") | |
| output.append(f"Risk Language Instances: {len(analysis['risk_language'])}\n") | |
| output.append("CONSOLIDATED CONTENT") | |
| output.append("-" * 80) | |
| best_versions = DocumentMerger.select_best_content(analysis["duplicates"], docs) | |
| if best_versions: | |
| for i, content in enumerate(best_versions, 1): | |
| output.append(f"\n[Section {i}]") | |
| output.append(content[:50000] + "..." if len(content) > 50000 else content) | |
| else: | |
| output.append("No duplicate content found across documents.") | |
| output.append("\n\nUNIQUE FINDINGS") | |
| output.append("-" * 80) | |
| for item in analysis["unique_content"][:100]: | |
| output.append(f"\n[From Document {item['doc_index'] + 1}]") | |
| content = item["content"] | |
| output.append(content[:50000] + "..." if len(content) > 50000 else content) | |
| output.append("\n\nRISK LANGUAGE ANALYSIS") | |
| output.append("-" * 80) | |
| risk_summary = {} | |
| for risk in analysis["risk_language"]: | |
| risk_summary[risk["type"]] = risk_summary.get(risk["type"], 0) + 1 | |
| if risk_summary: | |
| for risk_type, count in risk_summary.items(): | |
| output.append(f"{risk_type.replace('_', ' ').title()}: {count} instances") | |
| else: | |
| output.append("No risk language patterns detected.") | |
| output.append("\n" + "=" * 80) | |
| output.append("END OF MASTER EVIDENCE DOCUMENT") | |
| return "\n".join(output) | |
| def ai_summarise(merged_doc: str, analysis: Dict) -> str: | |
| try: | |
| hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") | |
| risk_summary = {} | |
| for risk in analysis.get("risk_language", []): | |
| risk_summary[risk["type"]] = risk_summary.get(risk["type"], 0) + 1 | |
| input_text = f""" | |
| Forensic document analysis results: | |
| - Duplicates found: {len(analysis.get('duplicates', []))} | |
| - Unique content items: {len(analysis.get('unique_content', []))} | |
| - Risk language: {json.dumps(risk_summary)} | |
| Document excerpt: | |
| {merged_doc[:50000]} | |
| """.strip() | |
| headers = {"Content-Type": "application/json"} | |
| if hf_token: | |
| headers["Authorization"] = f"Bearer {hf_token}" | |
| response = requests.post( | |
| "https://router.huggingface.co/hf-inference/models/facebook/bart-large-cnn", | |
| headers=headers, | |
| json={"inputs": input_text, "parameters": {"max_length": 400, "min_length": 100, "do_sample": False}}, | |
| timeout=120, | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if isinstance(data, list) and data: | |
| summary = data[0].get("summary_text", "") | |
| risk_lines = "\n".join([f"- {k.replace('_', ' ').title()}: {v} instances" for k, v in risk_summary.items()]) | |
| return f"AI SUMMARY\n{'=' * 60}\n\n{summary}\n\nRisk Language Breakdown:\n{risk_lines}" | |
| return "Summary could not be extracted from model response." | |
| if response.status_code == 503: | |
| return "Model is loading. Try again shortly." | |
| return f"Summarisation unavailable. Status {response.status_code}: {response.text}" | |
| except Exception as e: | |
| return f"Summarisation error: {str(e)}" | |
| def export_txt(merged_doc: str, analysis_report: str) -> str: | |
| try: | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8") | |
| tmp.write("SEAMSTRESSES FRAMEWORK – EXPORT\n") | |
| tmp.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
| tmp.write("=" * 80 + "\n\n") | |
| tmp.write(merged_doc) | |
| tmp.write("\n\n" + "=" * 80 + "\n\n") | |
| tmp.write(analysis_report) | |
| tmp.close() | |
| return tmp.name | |
| except Exception: | |
| return None | |
| def export_pdf(merged_doc: str, analysis_report: str) -> str: | |
| try: | |
| from reportlab.lib.pagesizes import A4 | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib.units import mm | |
| from reportlab.lib import colors | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, HRFlowable | |
| from reportlab.lib.enums import TA_CENTER | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") | |
| tmp.close() | |
| doc = SimpleDocTemplate( | |
| tmp.name, | |
| pagesize=A4, | |
| rightMargin=20 * mm, | |
| leftMargin=20 * mm, | |
| topMargin=20 * mm, | |
| bottomMargin=20 * mm, | |
| ) | |
| styles = getSampleStyleSheet() | |
| title_style = ParagraphStyle( | |
| "Title", | |
| parent=styles["Heading1"], | |
| fontSize=16, | |
| alignment=TA_CENTER, | |
| spaceAfter=6, | |
| ) | |
| heading_style = ParagraphStyle( | |
| "Heading", | |
| parent=styles["Heading2"], | |
| fontSize=11, | |
| textColor=colors.HexColor("#1a1a2e"), | |
| spaceAfter=4, | |
| ) | |
| body_style = ParagraphStyle( | |
| "Body", | |
| parent=styles["Normal"], | |
| fontSize=9, | |
| leading=14, | |
| spaceAfter=3, | |
| ) | |
| story = [] | |
| story.append(Paragraph("Seamstresses Framework", title_style)) | |
| story.append(Paragraph("Forensic Document Analysis Report", heading_style)) | |
| story.append(Paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", body_style)) | |
| story.append(HRFlowable(width="100%", thickness=1, color=colors.grey)) | |
| story.append(Spacer(1, 10)) | |
| story.append(Paragraph("Master Document", heading_style)) | |
| for line in merged_doc.split("\n"): | |
| if line.strip(): | |
| safe = line.replace("&", "&").replace("<", "<").replace(">", ">") | |
| if line.startswith("===") or line.startswith("---"): | |
| story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey)) | |
| elif line.isupper() and len(line) > 3 and not line.startswith("["): | |
| story.append(Paragraph(safe, heading_style)) | |
| else: | |
| story.append(Paragraph(safe, body_style)) | |
| story.append(Spacer(1, 12)) | |
| story.append(Paragraph("Analysis Report", heading_style)) | |
| story.append(HRFlowable(width="100%", thickness=0.5, color=colors.lightgrey)) | |
| for line in analysis_report.split("\n"): | |
| if line.strip(): | |
| safe = line.replace("&", "&").replace("<", "<").replace(">", ">") | |
| if line.isupper() and len(line) > 3: | |
| story.append(Paragraph(safe, heading_style)) | |
| else: | |
| story.append(Paragraph(safe, body_style)) | |
| doc.build(story) | |
| return tmp.name | |
| except Exception: | |
| return None | |
| def export_docx(merged_doc: str, analysis_report: str, summary: str) -> str: | |
| try: | |
| if Document is None: | |
| return None | |
| from docx.enum.text import WD_ALIGN_PARAGRAPH | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".docx") | |
| tmp.close() | |
| doc = Document() | |
| title = doc.add_heading("Seamstresses Framework", 0) | |
| title.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| sub = doc.add_heading("Forensic Document Analysis Report", level=2) | |
| sub.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| doc.add_paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| doc.add_paragraph() | |
| if summary: | |
| doc.add_heading("AI Summary", level=1) | |
| doc.add_paragraph(summary) | |
| doc.add_paragraph() | |
| doc.add_heading("Master Document", level=1) | |
| for line in merged_doc.split("\n"): | |
| if line.strip(): | |
| if line.startswith("===") or line.startswith("---"): | |
| doc.add_paragraph("─" * 60) | |
| elif line.isupper() and len(line) > 3 and not line.startswith("["): | |
| doc.add_heading(line, level=2) | |
| else: | |
| doc.add_paragraph(line) | |
| doc.add_page_break() | |
| doc.add_heading("Analysis Report", level=1) | |
| for line in analysis_report.split("\n"): | |
| if line.strip(): | |
| if line.isupper() and len(line) > 3: | |
| doc.add_heading(line, level=2) | |
| else: | |
| doc.add_paragraph(line) | |
| doc.save(tmp.name) | |
| return tmp.name | |
| except Exception: | |
| return None | |
| def process_documents(file1, file2, file3): | |
| try: | |
| raw_inputs = [file1, file2, file3] | |
| files = [] | |
| for uploaded in raw_inputs: | |
| if uploaded is None: | |
| continue | |
| if isinstance(uploaded, dict): | |
| path = uploaded.get("path") or uploaded.get("name") | |
| elif hasattr(uploaded, "name"): | |
| path = uploaded.name | |
| elif isinstance(uploaded, str): | |
| path = uploaded | |
| else: | |
| path = str(uploaded) | |
| if path: | |
| files.append(path) | |
| if not files: | |
| return "Please upload at least one document.", "", "", None, None, None | |
| docs = [] | |
| file_names = [] | |
| for file_path in files: | |
| text = DocumentParser.extract_text_from_file(file_path) | |
| if not text or text.startswith("Error"): | |
| return f"Error: {text}", "", "", None, None, None | |
| docs.append(DocumentParser.parse_document(text)) | |
| file_names.append(os.path.basename(file_path)) | |
| analysis = ForensicAnalyzer.analyze_documents(docs) | |
| merged_doc = DocumentMerger.merge_documents(docs, analysis) | |
| analysis_report = f"FORENSIC ANALYSIS REPORT\n{'=' * 80}\n\n" | |
| analysis_report += f"FILES ANALYZED: {', '.join(file_names)}\n\n" | |
| analysis_report += f"DUPLICATE ANALYSIS\n{'-' * 80}\n" | |
| analysis_report += f"Total Duplicates Found: {len(analysis['duplicates'])}\n\n" | |
| for duplicate in analysis["duplicates"][:50]: | |
| analysis_report += ( | |
| f"- Documents {duplicate['doc1'] + 1} & {duplicate['doc2'] + 1}: " | |
| f"{duplicate['similarity']:.1%} similarity\n" | |
| ) | |
| analysis_report += f"\nUNIQUE CONTENT\n{'-' * 80}\n" | |
| analysis_report += f"Total Unique Items: {len(analysis['unique_content'])}\n\n" | |
| for item in analysis["unique_content"][:50]: | |
| analysis_report += f"- From Document {item['doc_index'] + 1}: {item['content'][:5000]}...\n" | |
| analysis_report += f"\nRISK LANGUAGE DETECTED\n{'-' * 80}\n" | |
| risk_types = {} | |
| for risk in analysis["risk_language"]: | |
| risk_types[risk["type"]] = risk_types.get(risk["type"], 0) + 1 | |
| if risk_types: | |
| for risk_type, count in risk_types.items(): | |
| analysis_report += f"- {risk_type.replace('_', ' ').title()}: {count} instances\n" | |
| else: | |
| analysis_report += "No risk language patterns detected.\n" | |
| summary = ai_summarise(merged_doc, analysis) | |
| txt_path = export_txt(merged_doc, analysis_report) | |
| pdf_path = export_pdf(merged_doc, analysis_report) | |
| docx_path = export_docx(merged_doc, analysis_report, summary) | |
| return merged_doc, analysis_report, summary, txt_path, pdf_path, docx_path | |
| except Exception as e: | |
| import traceback | |
| return f"Error: {str(e)}\n\n{traceback.format_exc()}", "", "", None, None, None | |
| CSS = """ | |
| @import url('https://fonts.googleapis.com/css2?family=Syne:wght@400;700;800&family=DM+Mono:wght@400;500&display=swap'); | |
| :root { | |
| --bg: #030712; | |
| --surface: #0a0f1e; | |
| --surface2: #0f1629; | |
| --border: #1a2544; | |
| --accent: #00CFFF; | |
| --accent2: #FFE500; | |
| --accent3: #0066FF; | |
| --text: #E8F4FF; | |
| --muted: #8AA0C0; | |
| --r: 8px; | |
| } | |
| body, .gradio-container { | |
| background: var(--bg) !important; | |
| font-family: 'Syne', sans-serif !important; | |
| color: var(--text) !important; | |
| } | |
| .hdr { | |
| text-align: center; | |
| padding: 2.4rem 1rem 1.6rem; | |
| border-bottom: 1px solid var(--border); | |
| background: | |
| radial-gradient(ellipse at 30% 50%, rgba(0,102,255,0.12) 0%, transparent 60%), | |
| radial-gradient(ellipse at 70% 50%, rgba(0,207,255,0.08) 0%, transparent 60%); | |
| } | |
| .hdr h1 { | |
| font-size: 2.2rem; | |
| font-weight: 800; | |
| letter-spacing: -0.04em; | |
| background: linear-gradient(135deg, var(--accent) 0%, var(--accent2) 100%); | |
| -webkit-background-clip: text; | |
| -webkit-text-fill-color: transparent; | |
| margin: 0 0 0.3rem; | |
| } | |
| .hdr .sub { | |
| color: var(--muted); | |
| font-family: 'DM Mono', monospace; | |
| font-size: .75rem; | |
| letter-spacing: .06em; | |
| } | |
| .hdr .own { | |
| color: var(--accent2); | |
| font-family: 'DM Mono', monospace; | |
| font-size: .68rem; | |
| margin-top: .3rem; | |
| letter-spacing: .1em; | |
| text-transform: uppercase; | |
| } | |
| textarea, input[type=text] { | |
| background: var(--surface) !important; | |
| border: 1px solid var(--border) !important; | |
| border-radius: var(--r) !important; | |
| color: var(--text) !important; | |
| font-family: 'DM Mono', monospace !important; | |
| } | |
| button.primary { | |
| background: linear-gradient(135deg, var(--accent3), var(--accent)) !important; | |
| color: #030712 !important; | |
| border: none !important; | |
| border-radius: var(--r) !important; | |
| font-family: 'Syne', sans-serif !important; | |
| font-weight: 800 !important; | |
| } | |
| button.secondary { | |
| background: var(--surface2) !important; | |
| color: var(--accent2) !important; | |
| border: 1px solid var(--accent3) !important; | |
| border-radius: var(--r) !important; | |
| font-family: 'Syne', sans-serif !important; | |
| font-weight: 700 !important; | |
| } | |
| .out textarea { | |
| background: #020510 !important; | |
| border-left: 3px solid var(--accent) !important; | |
| font-family: 'DM Mono', monospace !important; | |
| font-size: .84rem !important; | |
| line-height: 1.7 !important; | |
| } | |
| .foot { | |
| text-align: center; | |
| padding: 1.5rem; | |
| color: var(--muted); | |
| font-family: 'DM Mono', monospace; | |
| font-size: .68rem; | |
| border-top: 1px solid var(--border); | |
| margin-top: 2rem; | |
| } | |
| """ | |
| with gr.Blocks(css=CSS, title="Unified Litigation Framework") as demo: | |
| gr.HTML( | |
| """ | |
| <div class="hdr"> | |
| <h1>Unified Litigation Framework</h1> | |
| <p class="sub">Metabolic Phoenix · Seamstresses Framework · Black Polished Chrome</p> | |
| <p class="own">Invented & Owned by Dwayne Anthony Brian Galloway · 2026</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| with gr.TabItem("Metabolic Phoenix"): | |
| gr.Markdown("### Metabolic Phoenix Framework - Advanced Document Analysis") | |
| pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"], type="filepath") | |
| run_btn = gr.Button("Analyse", variant="primary") | |
| with gr.Accordion("Cycle 1 – Fact Ledger", open=True): | |
| out1 = gr.Textbox(label="", lines=12, elem_classes=["out"]) | |
| with gr.Accordion("Cycle 2 – Compliance Analysis", open=False): | |
| out2 = gr.Textbox(label="", lines=12, elem_classes=["out"]) | |
| with gr.Accordion("Cycle 3 – Ouroboros Loops", open=False): | |
| out3 = gr.Textbox(label="", lines=12, elem_classes=["out"]) | |
| with gr.Accordion("Cycle 4 – Full Case Output", open=False): | |
| out4 = gr.Textbox(label="", lines=12, elem_classes=["out"]) | |
| save_btn = gr.Button("Save Analysis", variant="secondary") | |
| save_msg = gr.Markdown("") | |
| with gr.Accordion("Saved Analyses", open=False): | |
| lib_ref_btn = gr.Button("Refresh List", variant="secondary") | |
| lib_dd = gr.Dropdown( | |
| label="Saved Analyses", | |
| choices=lib_choices(load_library()), | |
| value=None, | |
| interactive=True, | |
| ) | |
| lib_viewer = gr.Textbox( | |
| label="Content", | |
| lines=16, | |
| elem_classes=["out"], | |
| interactive=False, | |
| ) | |
| with gr.Row(): | |
| lib_dl_btn = gr.Button("Download", variant="secondary") | |
| lib_del_btn = gr.Button("Delete", variant="secondary") | |
| lib_dl_file = gr.File(label="Your file", visible=False) | |
| lib_msg = gr.Markdown("") | |
| run_btn.click(phoenix_analyse, inputs=[pdf_input], outputs=[out1, out2, out3, out4]) | |
| save_btn.click(save_phoenix_analysis, inputs=[out1, out2, out3, out4], outputs=[save_msg]) | |
| lib_ref_btn.click(lib_refresh, outputs=[lib_dd, lib_viewer]) | |
| lib_dd.change(lib_view, inputs=[lib_dd], outputs=[lib_viewer]) | |
| lib_dl_btn.click(lib_download, inputs=[lib_dd], outputs=[lib_dl_file, lib_msg]) | |
| lib_del_btn.click(lib_delete, inputs=[lib_dd], outputs=[lib_dd, lib_msg]) | |
| with gr.TabItem("Seamstresses Framework"): | |
| gr.Markdown("### Seamstresses Framework - Forensic Document Analysis") | |
| with gr.Row(): | |
| file1 = gr.File(label="Document 1", file_types=[".pdf", ".docx", ".txt"], type="filepath") | |
| file2 = gr.File(label="Document 2", file_types=[".pdf", ".docx", ".txt"], type="filepath") | |
| file3 = gr.File(label="Document 3", file_types=[".pdf", ".docx", ".txt"], type="filepath") | |
| analyze_btn = gr.Button("Analyze Documents", variant="primary") | |
| with gr.Tabs(): | |
| with gr.TabItem("Master Document"): | |
| merged_output = gr.Textbox(label="Merged Document", lines=30, max_lines=200) | |
| with gr.TabItem("Analysis Report"): | |
| analysis_output = gr.Textbox(label="Forensic Analysis", lines=30, max_lines=200) | |
| with gr.TabItem("AI Summary"): | |
| summary_output = gr.Textbox(label="AI Summary", lines=30, max_lines=200) | |
| with gr.TabItem("Export"): | |
| with gr.Row(): | |
| txt_file = gr.File(label="Plain Text", interactive=False) | |
| pdf_file = gr.File(label="PDF", interactive=False) | |
| docx_file = gr.File(label="Word Document", interactive=False) | |
| analyze_btn.click( | |
| fn=process_documents, | |
| inputs=[file1, file2, file3], | |
| outputs=[merged_output, analysis_output, summary_output, txt_file, pdf_file, docx_file], | |
| ) | |
| gr.Markdown( | |
| """ | |
| ### Risk Language Detected | |
| - Denial Trap | |
| - Admission by Silence | |
| - Procedural Hedging | |
| - Burden Flip | |
| """ | |
| ) | |
| with gr.TabItem("Black Polished Chrome"): | |
| gr.Markdown("### Black Polished Chrome - Litigation Infiltration Engine") | |
| gr.Markdown(""" | |
| **Black Polished Chrome** is a precision methodology for turning institutional wrongdoing into structured, | |
| evidence-led, tribunal-ready force. | |
| **Features:** | |
| - Upload & Build: Extract dates, amounts, smoking guns, admissions, and contradictions | |
| - Master Chronology: Every event extracted and sorted by date | |
| - Fact-Lie-Law-Consequence Matrix: Every breach mapped across four columns | |
| - The Strategy Demon: Tactical engine that reviews your matrix and delivers strategic advice | |
| - Document Generation: Letter Before Claim, SAR, Particulars of Claim, Skeleton Argument, Barrister Brief | |
| - Damages Calculator: Quantify direct loss, injury to feelings, consequential loss, aggravated damages | |
| **Legal Architecture:** | |
| - Consumer Rights Act 2015 | |
| - Equality Act 2010 | |
| - FCA Consumer Duty & PRIN | |
| - UK GDPR & Data Protection Act 2018 | |
| - Defamation & Credit Marker Law | |
| - Hardship & Consequential Loss | |
| Created by Dwayne Anthony Brian Galloway · 2026 | |
| """) | |
| gr.HTML( | |
| """ | |
| <div class="foot"> | |
| THE UNIFIED LITIGATION FRAMEWORK · METABOLIC PHOENIX · SEAMSTRESSES FRAMEWORK · BLACK POLISHED CHROME | |
| · Injectiv8 · Ms. Nala · © Dwayne Anthony Brian Galloway 2026 · All Rights Reserved | |
| </div> | |
| """ | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |