from __future__ import annotations

import io
import os
import sqlite3
import sys
from difflib import SequenceMatcher
from datetime import datetime, timezone
from pathlib import Path

from flask import Flask, jsonify, request
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash

BASE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = BASE_DIR.parent
DB_PATH = Path(os.getenv("DB_PATH", BASE_DIR / "app.db"))

app = Flask(__name__)
CORS(app)


def _bootstrap_site_packages() -> None:
    """
    Make backend resilient when dependencies are split across:
    - project venv site-packages
    - user local site-packages (~/.local)
    """
    py_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
    candidate_paths = [
        PROJECT_ROOT / "venv" / "lib" / f"python{py_ver}" / "site-packages",
        Path.home() / ".local" / "lib" / f"python{py_ver}" / "site-packages",
    ]
    for path in candidate_paths:
        path_str = str(path)
        if path.exists() and path_str not in sys.path:
            sys.path.append(path_str)


_bootstrap_site_packages()


def get_db_connection() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
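# Note: sqlite3.Row lets the handlers below read columns by name (e.g.
# user["email"]) instead of by index; each handler opens a short-lived
# connection via this helper rather than sharing one across requests.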
def init_db() -> None:
    with get_db_connection() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                full_name TEXT NOT NULL,
                email TEXT NOT NULL UNIQUE,
                password_hash TEXT NOT NULL,
                created_at TEXT NOT NULL
            )
            """
        )
        conn.commit()


def _extract_text_data(file_bytes: bytes, file_ext: str):
    if file_ext == "txt":
        return [{"text": file_bytes.decode("utf-8", errors="ignore"), "page": 1}]
    if file_ext == "pdf":
        import pdfplumber

        extracted = []
        with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
            for i, page in enumerate(pdf.pages):
                text = page.extract_text() or ""
                if text.strip():
                    extracted.append({"text": text, "page": i + 1})
        return extracted
    if file_ext == "docx":
        import docx

        doc = docx.Document(io.BytesIO(file_bytes))
        text = "\n".join(p.text for p in doc.paragraphs if p.text is not None)
        return [{"text": text, "page": 1}] if text.strip() else []
    raise ValueError("Unsupported file type. Use PDF, DOCX, or TXT.")
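# Shape note: _extract_text_data returns a list of {"text": ..., "page": ...}
# chunks -- one per PDF page, or a single page-1 chunk for TXT/DOCX input
# (an empty list when nothing could be extracted). Example for a two-page PDF:
#     [{"text": "Clause 1 ...", "page": 1}, {"text": "Clause 7 ...", "page": 2}]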
def _extract_clauses(text_data, source: str = "final", id_start: int = 0):
    import re

    clauses = []
    clause_id = id_start
    for chunk in text_data:
        raw_text = chunk.get("text", "")
        page_num = chunk.get("page", 1)
        pattern = re.compile(r".+?(?:[.!?](?:\s+|$)|$)", re.DOTALL)
        for match in pattern.finditer(raw_text):
            cleaned = " ".join(match.group(0).split())
            if len(cleaned) < 30:
                continue
            start_idx = match.start()
            line_no = raw_text[:start_idx].count("\n") + 1
            clauses.append(
                {
                    "id": clause_id,
                    "text": cleaned,
                    "page": page_num,
                    "line": line_no,
                    "source": source,
                }
            )
            clause_id += 1
    return clauses
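# Shape note: each extracted clause is a dict such as
#     {"id": 0, "text": "The Vendor shall ...", "page": 1, "line": 4, "source": "final"}
# Sentences shorter than 30 characters are dropped, and "line" is the 1-based
# line offset of the sentence within its page chunk.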
def _normalize_person_name(raw: str) -> str:
    import re

    if not raw:
        return ""
    cleaned = " ".join(str(raw).split())
    cleaned = re.sub(r"[^A-Za-z.\s]", " ", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    cleaned = re.sub(r"\b(mr|mrs|ms|miss|shri|smt)\.?\b", "", cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    stop_words = {
        "the",
        "vendor",
        "vendee",
        "party",
        "agreement",
        "hereinafter",
        "called",
        "referred",
        "to",
        "as",
        "and",
        "or",
        "by",
        "of",
    }
    parts = [p for p in cleaned.split(" ") if p and p.lower() not in stop_words]
    if not parts:
        return ""
    parts = parts[:4]
    name = " ".join(p.capitalize() for p in parts if len(p) > 1)
    return name[:80].strip()
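# Illustrative example based on the rules above: titles, stop words, and extra
# whitespace are stripped and the first four remaining tokens are capitalised:
#     _normalize_person_name("MR.  suresh   kumar")  # -> "Suresh Kumar"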
def _extract_party_name(text: str, role: str) -> str:
    import re

    if not text:
        return "Not found"
    compact = " ".join(str(text).split())
    role_l = role.lower()
    patterns = [
        # Role -> Name (e.g., "vendor: suresh kumar")
        rf"\b{role_l}\b\s*[:,-]?\s*(?:is\s+)?(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
        rf"\bthe\s+{role_l}\b\s*[:,-]?\s*(?:is\s+)?(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
        # Name -> role via legal wording
        rf"(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)\s+(?:hereinafter\s+(?:called|referred\s+to\s+as)|called)\s+(?:the\s+)?{role_l}\b",
        # Name (role)
        rf"\b([A-Za-z][A-Za-z.\s]{{1,60}}?)\s*\(\s*{role_l}\s*\)",
    ]
    for pattern in patterns:
        match = re.search(pattern, compact, flags=re.IGNORECASE)
        if not match:
            continue
        candidate = _normalize_person_name(match.group(1))
        if candidate:
            return candidate
    if re.search(rf"\b{role_l}\b", compact, flags=re.IGNORECASE):
        return f"{role.title()} mentioned (name not parsed)"
    return "Not found"
def _extract_document_parties(text_data):
    full_text = "\n".join(chunk.get("text", "") for chunk in (text_data or []))
    vendor = _extract_party_name(full_text, "vendor")
    vendee = _extract_party_name(full_text, "vendee")
    return {"vendor": vendor, "vendee": vendee}


def _similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def _threshold_for_mode(scan_mode: str) -> float:
    mode = (scan_mode or "").lower()
    if "deep" in mode:
        return 0.50
    if "strict" in mode:
        return 0.85
    return 0.60
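# Mode-to-threshold mapping used by the analyze endpoint (case-insensitive
# substring match on the submitted scanMode string):
#     "deep"   -> 0.50 (more pairs flagged)
#     "strict" -> 0.85 (fewer, higher-confidence pairs)
#     anything else, including the default "Standard Scan (Recommended)" -> 0.60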
def _is_supported_ext(file_ext: str) -> bool:
    return file_ext in {"pdf", "docx", "txt"}


def _source_label(source: str) -> str:
    if source == "final":
        return "Final"
    if source.startswith("reference_"):
        idx = source.split("_")[-1]
        return f"Reference {idx}"
    return source.title()


def _normalized_clause_text(text: str) -> str:
    import re

    return re.sub(r"\s+", " ", str(text or "").strip().lower())


def _token_set(text: str) -> set[str]:
    import re

    return set(re.findall(r"[a-z]{3,}", _normalized_clause_text(text)))


def _numeric_tokens(text: str) -> set[str]:
    import re

    return set(re.findall(r"\b\d+(?:[.,]\d+)?%?\b", str(text or "")))


def _rule_based_category(text_a: str, text_b: str, similarity: float):
    a_norm = _normalized_clause_text(text_a)
    b_norm = _normalized_clause_text(text_b)
    tokens_a = _token_set(text_a)
    tokens_b = _token_set(text_b)
    common = len(tokens_a & tokens_b)
    denom = max(len(tokens_a | tokens_b), 1)
    jaccard = common / denom
    if a_norm and b_norm and a_norm == b_norm:
        return ("duplication", "DUPLICATION_EXACT", 0.99, "Exact repeated clause text.")
    if similarity >= 0.94 and jaccard >= 0.88:
        return ("duplication", "DUPLICATION_NEAR", 0.94, "Near-duplicate clause wording.")
    nums_a = _numeric_tokens(text_a)
    nums_b = _numeric_tokens(text_b)
    if jaccard >= 0.45 and nums_a and nums_b and nums_a != nums_b:
        return (
            "inconsistency",
            "NUMERIC_INCONSISTENCY",
            0.9,
            f"Numeric mismatch detected: {sorted(nums_a)} vs {sorted(nums_b)}.",
        )
    neg_words = ("shall not", "will not", "not", "never", "prohibited", "forbidden")
    pos_words = ("shall", "will", "must", "required", "permitted", "allowed")
    a_has_neg = any(w in a_norm for w in neg_words)
    b_has_neg = any(w in b_norm for w in neg_words)
    a_has_pos = any(w in a_norm for w in pos_words)
    b_has_pos = any(w in b_norm for w in pos_words)
    if jaccard >= 0.5 and ((a_has_neg and b_has_pos) or (b_has_neg and a_has_pos)):
        return ("contradiction", "LEGAL_CONFLICT", 0.9, "Opposite obligation/negation polarity.")
    return (None, None, 0.0, "")
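# Decision order above: exact duplicate text first, then near-duplicate wording
# (similarity >= 0.94 and token Jaccard >= 0.88), then numeric mismatch, then
# opposite obligation polarity. A hedged example of the numeric branch: two
# versions of a payment clause that share most wording but quote different
# figures (say 10% versus 12%) fall through the duplicate checks and are
# reported as NUMERIC_INCONSISTENCY, provided their token overlap stays above
# the 0.45 Jaccard floor.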
def _analyze_clauses(clauses, threshold: float, focus_source: str = "final"):
    if str(PROJECT_ROOT) not in sys.path:
        sys.path.append(str(PROJECT_ROOT))
    try:
        from analysis.common_analyzer import analyze_pair
    except Exception as exc:
        raise RuntimeError(f"Analyzer import failed: {exc}") from exc
    findings = []
    line_issues = []
    counts = {"duplication": 0, "inconsistency": 0, "contradiction": 0}
    compared_pairs = 0
    max_pairs = 15000
    seen_findings = set()
    seen_line_issues = set()

    def normalize_category(label: str, reason: str, similarity: float) -> str | None:
        lbl = (label or "").upper()
        rsn = (reason or "").lower()
        if lbl in {"NUMERIC_INCONSISTENCY"}:
            return "inconsistency"
        if lbl in {"LEGAL_CONFLICT", "CONTRADICTION"}:
            return "contradiction"
        if lbl in {"DUPLICATION", "ENTAILMENT"}:
            return "duplication"
        if lbl in {"CANDIDATE", "QUALIFICATION"} and similarity >= 0.92:
            return "duplication"
        if "negation" in rsn or "conflict" in rsn:
            return "contradiction"
        return None

    for i in range(len(clauses)):
        for j in range(i + 1, len(clauses)):
            compared_pairs += 1
            if compared_pairs > max_pairs:
                break
            clause_a = clauses[i]
            clause_b = clauses[j]
            source_a = str(clause_a.get("source", "final"))
            source_b = str(clause_b.get("source", "final"))
            # Compare only pairs involving final/focus doc:
            # - final vs final
            # - final vs reference
            if source_a != focus_source and source_b != focus_source:
                continue
            similarity = _similarity(clause_a["text"], clause_b["text"])
            category, label, confidence, reason = _rule_based_category(
                clause_a["text"], clause_b["text"], similarity
            )
            if category is None:
                label, confidence, reason = analyze_pair(
                    clause_a["text"],
                    clause_b["text"],
                    similarity,
                    threshold=threshold,
                )
                if not label or label == "NO_CONFLICT":
                    continue
                category = normalize_category(label, reason, similarity)
                if category is None:
                    continue
            finding_key = (
                category,
                clause_a["page"],
                clause_a["line"],
                clause_b["page"],
                clause_b["line"],
                label,
            )
            if finding_key in seen_findings:
                continue
            seen_findings.add(finding_key)
            findings.append(
                {
                    "issueType": label,
                    "category": category,
                    "confidence": round(float(confidence), 4),
                    "reason": reason,
                    "clause1": clause_a["text"],
                    "clause2": clause_b["text"],
                    "location1": f"Pg {clause_a['page']}, Ln {clause_a['line']}",
                    "location2": f"Pg {clause_b['page']}, Ln {clause_b['line']}",
                    "source1": source_a,
                    "source2": source_b,
                    "sourceLabel1": _source_label(source_a),
                    "sourceLabel2": _source_label(source_b),
                    "page1": clause_a["page"],
                    "line1": clause_a["line"],
                    "page2": clause_b["page"],
                    "line2": clause_b["line"],
                }
            )
            counts[category] += 1
            for clause in (clause_a, clause_b):
                source = str(clause.get("source", "final"))
                line_key = (category, source, clause["page"], clause["line"], label)
                if line_key in seen_line_issues:
                    continue
                seen_line_issues.add(line_key)
                line_issues.append(
                    {
                        "category": category,
                        "issueType": label,
                        "confidence": round(float(confidence), 4),
                        "page": clause["page"],
                        "line": clause["line"],
                        "source": source,
                        "sourceLabel": _source_label(source),
                        "location": f"{_source_label(source)} - Pg {clause['page']}, Ln {clause['line']}",
                        "reason": reason,
                    }
                )
        if compared_pairs > max_pairs:
            break
    findings.sort(key=lambda item: item["confidence"], reverse=True)
    line_issues.sort(key=lambda item: (item["page"], item["line"]))
    return findings, line_issues, counts, compared_pairs
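# Complexity note: the pair loop above is O(n^2) in clause count and is
# hard-capped at 15,000 comparisons; only pairs involving the focus document
# (source == "final" by default) are scored. Rule-based categories take
# precedence, and analysis.common_analyzer.analyze_pair is consulted only when
# no rule fires. Findings are returned sorted by confidence, line issues by
# (page, line).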
def _build_page_summaries(clauses, line_issues, text_data):
    pages = {}
    page_text_map = {}
    for chunk in text_data or []:
        page = int(chunk.get("page", 1))
        if page in page_text_map:
            continue
        raw = str(chunk.get("text", "") or "")
        lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
        page_text_map[page] = " ".join(lines[:2])[:260]
    for clause in clauses:
        page = int(clause.get("page", 1))
        pages.setdefault(
            page,
            {
                "page": page,
                "clauseCount": 0,
                "duplicationCount": 0,
                "inconsistencyCount": 0,
                "contradictionCount": 0,
                "issueCount": 0,
                "keyLines": [],
                "pageSnippet": page_text_map.get(page, ""),
            },
        )
        pages[page]["clauseCount"] += 1
    for issue in line_issues:
        page = int(issue.get("page", 1))
        pages.setdefault(
            page,
            {
                "page": page,
                "clauseCount": 0,
                "duplicationCount": 0,
                "inconsistencyCount": 0,
                "contradictionCount": 0,
                "issueCount": 0,
                "keyLines": [],
                "pageSnippet": page_text_map.get(page, ""),
            },
        )
        category = issue.get("category")
        if category in {"duplication", "inconsistency", "contradiction"}:
            pages[page][f"{category}Count"] += 1
        pages[page]["issueCount"] += 1
        if len(pages[page]["keyLines"]) < 6:
            line_ref = f"Ln {issue.get('line', '-')}: {issue.get('issueType', '-')}"
            if line_ref not in pages[page]["keyLines"]:
                pages[page]["keyLines"].append(line_ref)
    page_summaries = []
    for page in sorted(pages.keys()):
        item = pages[page]
        item["summaryText"] = (
            f"Page {page} contains {item['clauseCount']} clauses and {item['issueCount']} flagged lines "
            f"(duplication: {item['duplicationCount']}, inconsistency: {item['inconsistencyCount']}, "
            f"contradiction: {item['contradictionCount']})."
        )
        page_summaries.append(item)
    return page_summaries
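# Shape note: each page summary aggregates per-category counts plus up to six
# "keyLines" strings such as "Ln 12: NUMERIC_INCONSISTENCY", a human-readable
# "summaryText", and a short "pageSnippet" taken from the first two non-empty
# lines of that page's text.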
def _shorten_text(text: str, limit: int = 220) -> str:
    s = " ".join(str(text or "").split())
    if len(s) <= limit:
        return s
    return s[: limit - 3].rstrip() + "..."


def _clause_label(text: str, fallback_id: int) -> str:
    import re

    raw = str(text or "")
    m = re.search(r"\bclause\s*(\d+)\s*(?:\(([^)]+)\))?", raw, flags=re.IGNORECASE)
    if m:
        num = m.group(1)
        title = (m.group(2) or "").strip()
        return f"Clause {num}" + (f" ({title})" if title else "")
    return f"Clause {fallback_id}"


def _build_detailed_summary(clauses, page_summaries, findings):
    from collections import defaultdict

    clauses_by_page = defaultdict(list)
    for clause in clauses:
        clauses_by_page[int(clause.get("page", 1))].append(clause)
    lines = ["Here is the detailed summary of the document content:", ""]
    for page_item in page_summaries:
        page = int(page_item.get("page", 1))
        page_clauses = sorted(clauses_by_page.get(page, []), key=lambda c: (c.get("line", 0), c.get("id", 0)))
        lines.append(f"Page {page} Summary:")
        if not page_clauses:
            lines.append(f"- No clauses extracted for Page {page}.")
            lines.append("")
            continue
        for idx, clause in enumerate(page_clauses[:12], start=1):
            label = _clause_label(clause.get("text", ""), idx)
            summary = _shorten_text(clause.get("text", ""), 210)
            lines.append(f"- {label}: {summary} (Page {page}, Line {clause.get('line', '-')})")
        if len(page_clauses) > 12:
            lines.append(f"- Additional clauses on this page: {len(page_clauses) - 12}")
        lines.append("")
    contradictions = [f for f in findings if f.get("category") == "contradiction"]
    inconsistencies = [f for f in findings if f.get("category") == "inconsistency"]
    duplicates = [f for f in findings if f.get("category") == "duplication"]
    lines.append("Summary of Key Contradictions Noted:")
    if contradictions:
        for idx, item in enumerate(contradictions[:10], start=1):
            lines.append(
                f"- {idx}. {item.get('issueType', 'LEGAL_CONFLICT')}: "
                f"{_shorten_text(item.get('reason', ''), 170)} "
                f"({item.get('location1', '-')} vs {item.get('location2', '-')})"
            )
    else:
        lines.append("- No strong contradiction pair detected.")
    lines.append("")
    lines.append("Summary of Key Inconsistencies Noted:")
    if inconsistencies:
        for idx, item in enumerate(inconsistencies[:10], start=1):
            lines.append(
                f"- {idx}. {item.get('issueType', 'INCONSISTENCY')}: "
                f"{_shorten_text(item.get('reason', ''), 170)} "
                f"({item.get('location1', '-')} vs {item.get('location2', '-')})"
            )
    else:
        lines.append("- No strong inconsistency pair detected.")
    lines.append("")
    lines.append("Summary of Key Duplications Noted:")
    if duplicates:
        for idx, item in enumerate(duplicates[:10], start=1):
            lines.append(
                f"- {idx}. {item.get('issueType', 'DUPLICATION')}: "
                f"{_shorten_text(item.get('reason', ''), 170)} "
                f"({item.get('location1', '-')} vs {item.get('location2', '-')})"
            )
    else:
        lines.append("- No major duplication pair detected.")
    return "\n".join(lines)
# Ensure schema exists even when started via `flask run`.
init_db()


# Each endpoint is exposed under /api/... with a bare alias path,
# matching the endpoint list advertised by root().
@app.route("/api/health", methods=["GET"])
def health_check():
    return jsonify({"status": "ok"}), 200


@app.route("/", methods=["GET"])
def root():
    return (
        jsonify(
            {
                "message": "Backend is running.",
                "endpoints": [
                    "GET /api/health",
                    "POST /api/register",
                    "POST /api/login",
                    "POST /api/analyze",
                    "GET /health",
                    "POST /register",
                    "POST /login",
                    "POST /analyze",
                ],
            }
        ),
        200,
    )


@app.route("/health", methods=["GET"])
def health_check_alias():
    return health_check()
@app.route("/api/register", methods=["POST"])
def register():
    data = request.get_json(silent=True) or {}
    full_name = str(data.get("fullName", "")).strip()
    email = str(data.get("email", "")).strip().lower()
    password = str(data.get("password", ""))
    if not full_name or not email or not password:
        return jsonify({"error": "fullName, email, and password are required."}), 400
    if len(password) < 6:
        return jsonify({"error": "Password must be at least 6 characters."}), 400
    password_hash = generate_password_hash(password)
    created_at = datetime.now(timezone.utc).isoformat()
    try:
        with get_db_connection() as conn:
            conn.execute(
                "INSERT INTO users (full_name, email, password_hash, created_at) VALUES (?, ?, ?, ?)",
                (full_name, email, password_hash, created_at),
            )
            conn.commit()
    except sqlite3.IntegrityError:
        return jsonify({"error": "Email already registered."}), 409
    return jsonify({"message": "User created successfully."}), 201


@app.route("/register", methods=["POST"])
def register_alias():
    return register()


@app.route("/api/login", methods=["POST"])
def login():
    data = request.get_json(silent=True) or {}
    email = str(data.get("email", "")).strip().lower()
    password = str(data.get("password", ""))
    if not email or not password:
        return jsonify({"error": "email and password are required."}), 400
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, full_name, email, password_hash FROM users WHERE email = ?",
            (email,),
        ).fetchone()
    if user is None or not check_password_hash(user["password_hash"], password):
        return jsonify({"error": "Invalid email or password."}), 401
    return (
        jsonify(
            {
                "message": "Login successful.",
                "user": {
                    "id": user["id"],
                    "fullName": user["full_name"],
                    "email": user["email"],
                },
            }
        ),
        200,
    )
@app.route("/api/analyze", methods=["POST"])
def analyze():
    uploaded = request.files.get("file")
    reference_uploads = request.files.getlist("referenceFiles")
    scan_mode = request.form.get("scanMode", "Standard Scan (Recommended)")
    threshold = _threshold_for_mode(scan_mode)
    if uploaded is None or uploaded.filename is None or uploaded.filename.strip() == "":
        return jsonify({"error": "Please upload the final document file."}), 400
    file_ext = uploaded.filename.rsplit(".", 1)[-1].lower() if "." in uploaded.filename else ""
    if not _is_supported_ext(file_ext):
        return jsonify({"error": "Unsupported file type. Use PDF, DOCX, or TXT."}), 400
    if len(reference_uploads) > 2:
        return jsonify({"error": "You can upload up to 2 reference documents."}), 400
    try:
        final_file_bytes = uploaded.read()
        final_text_data = _extract_text_data(file_bytes=final_file_bytes, file_ext=file_ext)
        if not final_text_data:
            return jsonify({"error": "Could not extract text from final document."}), 400
        final_clauses = _extract_clauses(final_text_data, source="final", id_start=0)
        if len(final_clauses) < 2:
            return jsonify({"error": "Not enough clauses found in final document for analysis."}), 400
        all_clauses = list(final_clauses)
        for idx, ref in enumerate(reference_uploads, start=1):
            if ref is None or ref.filename is None or ref.filename.strip() == "":
                continue
            ref_ext = ref.filename.rsplit(".", 1)[-1].lower() if "." in ref.filename else ""
            if not _is_supported_ext(ref_ext):
                return jsonify({"error": f"Unsupported reference file type for {ref.filename}."}), 400
            ref_text_data = _extract_text_data(file_bytes=ref.read(), file_ext=ref_ext)
            if not ref_text_data:
                continue
            ref_clauses = _extract_clauses(
                ref_text_data,
                source=f"reference_{idx}",
                id_start=len(all_clauses),
            )
            all_clauses.extend(ref_clauses)
        parties = _extract_document_parties(final_text_data)
        findings, line_issues, counts, compared_pairs = _analyze_clauses(
            clauses=all_clauses, threshold=threshold, focus_source="final"
        )
        final_line_issues = [item for item in line_issues if item.get("source") == "final"]
        page_summaries = _build_page_summaries(
            clauses=final_clauses, line_issues=final_line_issues, text_data=final_text_data
        )
        detailed_summary = _build_detailed_summary(
            clauses=final_clauses,
            page_summaries=page_summaries,
            findings=findings,
        )
    except Exception as exc:
        return jsonify({"error": f"Analysis failed: {exc}"}), 500
    return (
        jsonify(
            {
                "message": "Analysis completed.",
                "summary": {
                    "scanMode": scan_mode,
                    "threshold": threshold,
                    "vendor": parties["vendor"],
                    "vendee": parties["vendee"],
                    "clauses": len(final_clauses),
                    "referenceDocs": len([r for r in reference_uploads if r and r.filename]),
                    "pairsCompared": compared_pairs,
                    "issuesFound": len(findings),
                    "duplicationCount": counts["duplication"],
                    "inconsistencyCount": counts["inconsistency"],
                    "contradictionCount": counts["contradiction"],
                },
                "pageSummaries": page_summaries,
                "detailedSummary": detailed_summary,
                "findings": findings[:50],
                "lineIssues": line_issues[:200],
                "finalLineIssues": final_line_issues[:200],
            }
        ),
        200,
    )
@app.route("/login", methods=["POST"])
def login_alias():
    return login()


@app.route("/analyze", methods=["POST"])
def analyze_alias():
    return analyze()


if __name__ == "__main__":
    # Keep defaults production-safe and compatible with restricted environments.
    debug_mode = os.getenv("FLASK_DEBUG", "0") == "1"
    host = os.getenv("HOST", "127.0.0.1")
    port = int(os.getenv("PORT", "5000"))
    app.run(host=host, port=port, debug=debug_mode, use_reloader=False)
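# Hedged usage sketch (route path assumes the /api/analyze registration above;
# the file names are placeholders):
#     curl -X POST http://127.0.0.1:5000/api/analyze \
#          -F "file=@final_agreement.pdf" \
#          -F "referenceFiles=@draft_v1.pdf" \
#          -F "scanMode=Deep Scan"
# A successful response carries "summary", "pageSummaries", "detailedSummary",
# "findings", "lineIssues", and "finalLineIssues" keys.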