final_year / backend /app.py
jayasrees's picture
Updated frontend UI and backend improvements
155dd44
from __future__ import annotations
import io
import os
import sqlite3
import sys
from difflib import SequenceMatcher
from datetime import datetime, timezone
from pathlib import Path
from flask import Flask, jsonify, request
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
BASE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = BASE_DIR.parent
DB_PATH = Path(os.getenv("DB_PATH", BASE_DIR / "app.db"))
app = Flask(__name__)
CORS(app)
def _bootstrap_site_packages() -> None:
"""
Make backend resilient when dependencies are split across:
- project venv site-packages
- user local site-packages (~/.local)
"""
py_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
candidate_paths = [
PROJECT_ROOT / "venv" / "lib" / f"python{py_ver}" / "site-packages",
Path.home() / ".local" / "lib" / f"python{py_ver}" / "site-packages",
]
for path in candidate_paths:
path_str = str(path)
if path.exists() and path_str not in sys.path:
sys.path.append(path_str)
_bootstrap_site_packages()
def get_db_connection() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db() -> None:
with get_db_connection() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
full_name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TEXT NOT NULL
)
"""
)
conn.commit()
def _extract_text_data(file_bytes: bytes, file_ext: str):
if file_ext == "txt":
return [{"text": file_bytes.decode("utf-8", errors="ignore"), "page": 1}]
if file_ext == "pdf":
import pdfplumber
extracted = []
with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
for i, page in enumerate(pdf.pages):
text = page.extract_text() or ""
if text.strip():
extracted.append({"text": text, "page": i + 1})
return extracted
if file_ext == "docx":
import docx
doc = docx.Document(io.BytesIO(file_bytes))
text = "\n".join(p.text for p in doc.paragraphs if p.text is not None)
return [{"text": text, "page": 1}] if text.strip() else []
raise ValueError("Unsupported file type. Use PDF, DOCX, or TXT.")
def _extract_clauses(text_data, source: str = "final", id_start: int = 0):
import re
clauses = []
clause_id = id_start
for chunk in text_data:
raw_text = chunk.get("text", "")
page_num = chunk.get("page", 1)
pattern = re.compile(r".+?(?:[.!?](?:\s+|$)|$)", re.DOTALL)
for match in pattern.finditer(raw_text):
cleaned = " ".join(match.group(0).split())
if len(cleaned) < 30:
continue
start_idx = match.start()
line_no = raw_text[:start_idx].count("\n") + 1
clauses.append(
{
"id": clause_id,
"text": cleaned,
"page": page_num,
"line": line_no,
"source": source,
}
)
clause_id += 1
return clauses
def _normalize_person_name(raw: str) -> str:
import re
if not raw:
return ""
cleaned = " ".join(str(raw).split())
cleaned = re.sub(r"[^A-Za-z.\s]", " ", cleaned)
cleaned = re.sub(r"\s+", " ", cleaned).strip()
cleaned = re.sub(r"\b(mr|mrs|ms|miss|shri|smt)\.?\b", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\s+", " ", cleaned).strip()
stop_words = {
"the",
"vendor",
"vendee",
"party",
"agreement",
"hereinafter",
"called",
"referred",
"to",
"as",
"and",
"or",
"by",
"of",
}
parts = [p for p in cleaned.split(" ") if p and p.lower() not in stop_words]
if not parts:
return ""
parts = parts[:4]
name = " ".join(p.capitalize() for p in parts if len(p) > 1)
return name[:80].strip()
def _extract_party_name(text: str, role: str) -> str:
import re
if not text:
return "Not found"
compact = " ".join(str(text).split())
role_l = role.lower()
patterns = [
# Role -> Name (e.g., "vendor: suresh kumar")
rf"\b{role_l}\b\s*[:,-]?\s*(?:is\s+)?(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
rf"\bthe\s+{role_l}\b\s*[:,-]?\s*(?:is\s+)?(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
# Name -> role via legal wording
rf"(?:mr\.?|mrs\.?|ms\.?|shri|smt\.?)?\s*([A-Za-z][A-Za-z.\s]{{1,80}}?)\s+(?:hereinafter\s+(?:called|referred\s+to\s+as)|called)\s+(?:the\s+)?{role_l}\b",
# Name (role)
rf"\b([A-Za-z][A-Za-z.\s]{{1,60}}?)\s*\(\s*{role_l}\s*\)",
]
for pattern in patterns:
match = re.search(pattern, compact, flags=re.IGNORECASE)
if not match:
continue
candidate = _normalize_person_name(match.group(1))
if candidate:
return candidate
if re.search(rf"\b{role_l}\b", compact, flags=re.IGNORECASE):
return f"{role.title()} mentioned (name not parsed)"
return "Not found"
def _extract_document_parties(text_data):
full_text = "\n".join(chunk.get("text", "") for chunk in (text_data or []))
vendor = _extract_party_name(full_text, "vendor")
vendee = _extract_party_name(full_text, "vendee")
return {"vendor": vendor, "vendee": vendee}
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _threshold_for_mode(scan_mode: str) -> float:
mode = (scan_mode or "").lower()
if "deep" in mode:
return 0.50
if "strict" in mode:
return 0.85
return 0.60
def _is_supported_ext(file_ext: str) -> bool:
return file_ext in {"pdf", "docx", "txt"}
def _source_label(source: str) -> str:
if source == "final":
return "Final"
if source.startswith("reference_"):
idx = source.split("_")[-1]
return f"Reference {idx}"
return source.title()
def _normalized_clause_text(text: str) -> str:
import re
return re.sub(r"\s+", " ", str(text or "").strip().lower())
def _token_set(text: str) -> set[str]:
import re
return set(re.findall(r"[a-z]{3,}", _normalized_clause_text(text)))
def _numeric_tokens(text: str) -> set[str]:
import re
return set(re.findall(r"\b\d+(?:[.,]\d+)?%?\b", str(text or "")))
def _rule_based_category(text_a: str, text_b: str, similarity: float):
a_norm = _normalized_clause_text(text_a)
b_norm = _normalized_clause_text(text_b)
tokens_a = _token_set(text_a)
tokens_b = _token_set(text_b)
common = len(tokens_a & tokens_b)
denom = max(len(tokens_a | tokens_b), 1)
jaccard = common / denom
if a_norm and b_norm and a_norm == b_norm:
return ("duplication", "DUPLICATION_EXACT", 0.99, "Exact repeated clause text.")
if similarity >= 0.94 and jaccard >= 0.88:
return ("duplication", "DUPLICATION_NEAR", 0.94, "Near-duplicate clause wording.")
nums_a = _numeric_tokens(text_a)
nums_b = _numeric_tokens(text_b)
if jaccard >= 0.45 and nums_a and nums_b and nums_a != nums_b:
return (
"inconsistency",
"NUMERIC_INCONSISTENCY",
0.9,
f"Numeric mismatch detected: {sorted(nums_a)} vs {sorted(nums_b)}.",
)
neg_words = ("shall not", "will not", "not", "never", "prohibited", "forbidden")
pos_words = ("shall", "will", "must", "required", "permitted", "allowed")
a_has_neg = any(w in a_norm for w in neg_words)
b_has_neg = any(w in b_norm for w in neg_words)
a_has_pos = any(w in a_norm for w in pos_words)
b_has_pos = any(w in b_norm for w in pos_words)
if jaccard >= 0.5 and ((a_has_neg and b_has_pos) or (b_has_neg and a_has_pos)):
return ("contradiction", "LEGAL_CONFLICT", 0.9, "Opposite obligation/negation polarity.")
return (None, None, 0.0, "")
def _analyze_clauses(clauses, threshold: float, focus_source: str = "final"):
if str(PROJECT_ROOT) not in sys.path:
sys.path.append(str(PROJECT_ROOT))
try:
from analysis.common_analyzer import analyze_pair
except Exception as exc:
raise RuntimeError(f"Analyzer import failed: {exc}") from exc
findings = []
line_issues = []
counts = {"duplication": 0, "inconsistency": 0, "contradiction": 0}
compared_pairs = 0
max_pairs = 15000
seen_findings = set()
seen_line_issues = set()
def normalize_category(label: str, reason: str, similarity: float) -> str | None:
lbl = (label or "").upper()
rsn = (reason or "").lower()
if lbl in {"NUMERIC_INCONSISTENCY"}:
return "inconsistency"
if lbl in {"LEGAL_CONFLICT", "CONTRADICTION"}:
return "contradiction"
if lbl in {"DUPLICATION", "ENTAILMENT"}:
return "duplication"
if lbl in {"CANDIDATE", "QUALIFICATION"} and similarity >= 0.92:
return "duplication"
if "negation" in rsn or "conflict" in rsn:
return "contradiction"
return None
for i in range(len(clauses)):
for j in range(i + 1, len(clauses)):
compared_pairs += 1
if compared_pairs > max_pairs:
break
clause_a = clauses[i]
clause_b = clauses[j]
source_a = str(clause_a.get("source", "final"))
source_b = str(clause_b.get("source", "final"))
# Compare only pairs involving final/focus doc:
# - final vs final
# - final vs reference
if source_a != focus_source and source_b != focus_source:
continue
similarity = _similarity(clause_a["text"], clause_b["text"])
category, label, confidence, reason = _rule_based_category(
clause_a["text"], clause_b["text"], similarity
)
if category is None:
label, confidence, reason = analyze_pair(
clause_a["text"],
clause_b["text"],
similarity,
threshold=threshold,
)
if not label or label == "NO_CONFLICT":
continue
category = normalize_category(label, reason, similarity)
if category is None:
continue
finding_key = (
category,
clause_a["page"],
clause_a["line"],
clause_b["page"],
clause_b["line"],
label,
)
if finding_key in seen_findings:
continue
seen_findings.add(finding_key)
findings.append(
{
"issueType": label,
"category": category,
"confidence": round(float(confidence), 4),
"reason": reason,
"clause1": clause_a["text"],
"clause2": clause_b["text"],
"location1": f"Pg {clause_a['page']}, Ln {clause_a['line']}",
"location2": f"Pg {clause_b['page']}, Ln {clause_b['line']}",
"source1": source_a,
"source2": source_b,
"sourceLabel1": _source_label(source_a),
"sourceLabel2": _source_label(source_b),
"page1": clause_a["page"],
"line1": clause_a["line"],
"page2": clause_b["page"],
"line2": clause_b["line"],
}
)
counts[category] += 1
for clause in (clause_a, clause_b):
source = str(clause.get("source", "final"))
line_key = (category, source, clause["page"], clause["line"], label)
if line_key in seen_line_issues:
continue
seen_line_issues.add(line_key)
line_issues.append(
{
"category": category,
"issueType": label,
"confidence": round(float(confidence), 4),
"page": clause["page"],
"line": clause["line"],
"source": source,
"sourceLabel": _source_label(source),
"location": f"{_source_label(source)} - Pg {clause['page']}, Ln {clause['line']}",
"reason": reason,
}
)
if compared_pairs > max_pairs:
break
findings.sort(key=lambda item: item["confidence"], reverse=True)
line_issues.sort(key=lambda item: (item["page"], item["line"]))
return findings, line_issues, counts, compared_pairs
def _build_page_summaries(clauses, line_issues, text_data):
pages = {}
page_text_map = {}
for chunk in text_data or []:
page = int(chunk.get("page", 1))
if page in page_text_map:
continue
raw = str(chunk.get("text", "") or "")
lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
page_text_map[page] = " ".join(lines[:2])[:260]
for clause in clauses:
page = int(clause.get("page", 1))
pages.setdefault(
page,
{
"page": page,
"clauseCount": 0,
"duplicationCount": 0,
"inconsistencyCount": 0,
"contradictionCount": 0,
"issueCount": 0,
"keyLines": [],
"pageSnippet": page_text_map.get(page, ""),
},
)
pages[page]["clauseCount"] += 1
for issue in line_issues:
page = int(issue.get("page", 1))
pages.setdefault(
page,
{
"page": page,
"clauseCount": 0,
"duplicationCount": 0,
"inconsistencyCount": 0,
"contradictionCount": 0,
"issueCount": 0,
"keyLines": [],
"pageSnippet": page_text_map.get(page, ""),
},
)
category = issue.get("category")
if category in {"duplication", "inconsistency", "contradiction"}:
pages[page][f"{category}Count"] += 1
pages[page]["issueCount"] += 1
if len(pages[page]["keyLines"]) < 6:
line_ref = f"Ln {issue.get('line', '-')}: {issue.get('issueType', '-')}"
if line_ref not in pages[page]["keyLines"]:
pages[page]["keyLines"].append(line_ref)
page_summaries = []
for page in sorted(pages.keys()):
item = pages[page]
item["summaryText"] = (
f"Page {page} contains {item['clauseCount']} clauses and {item['issueCount']} flagged lines "
f"(duplication: {item['duplicationCount']}, inconsistency: {item['inconsistencyCount']}, "
f"contradiction: {item['contradictionCount']})."
)
page_summaries.append(item)
return page_summaries
def _shorten_text(text: str, limit: int = 220) -> str:
s = " ".join(str(text or "").split())
if len(s) <= limit:
return s
return s[: limit - 3].rstrip() + "..."
def _clause_label(text: str, fallback_id: int) -> str:
import re
raw = str(text or "")
m = re.search(r"\bclause\s*(\d+)\s*(?:\(([^)]+)\))?", raw, flags=re.IGNORECASE)
if m:
num = m.group(1)
title = (m.group(2) or "").strip()
return f"Clause {num}" + (f" ({title})" if title else "")
return f"Clause {fallback_id}"
def _build_detailed_summary(clauses, page_summaries, findings):
from collections import defaultdict
clauses_by_page = defaultdict(list)
for clause in clauses:
clauses_by_page[int(clause.get("page", 1))].append(clause)
lines = ["Here is the detailed summary of the document content:", ""]
for page_item in page_summaries:
page = int(page_item.get("page", 1))
page_clauses = sorted(clauses_by_page.get(page, []), key=lambda c: (c.get("line", 0), c.get("id", 0)))
lines.append(f"Page {page} Summary:")
if not page_clauses:
lines.append(f"- No clauses extracted for Page {page}.")
lines.append("")
continue
for idx, clause in enumerate(page_clauses[:12], start=1):
label = _clause_label(clause.get("text", ""), idx)
summary = _shorten_text(clause.get("text", ""), 210)
lines.append(f"- {label}: {summary} (Page {page}, Line {clause.get('line', '-')})")
if len(page_clauses) > 12:
lines.append(f"- Additional clauses on this page: {len(page_clauses) - 12}")
lines.append("")
contradictions = [f for f in findings if f.get("category") == "contradiction"]
inconsistencies = [f for f in findings if f.get("category") == "inconsistency"]
duplicates = [f for f in findings if f.get("category") == "duplication"]
lines.append("Summary of Key Contradictions Noted:")
if contradictions:
for idx, item in enumerate(contradictions[:10], start=1):
lines.append(
f"- {idx}. {item.get('issueType', 'LEGAL_CONFLICT')}: "
f"{_shorten_text(item.get('reason', ''), 170)} "
f"({item.get('location1', '-') } vs {item.get('location2', '-')})"
)
else:
lines.append("- No strong contradiction pair detected.")
lines.append("")
lines.append("Summary of Key Inconsistencies Noted:")
if inconsistencies:
for idx, item in enumerate(inconsistencies[:10], start=1):
lines.append(
f"- {idx}. {item.get('issueType', 'INCONSISTENCY')}: "
f"{_shorten_text(item.get('reason', ''), 170)} "
f"({item.get('location1', '-') } vs {item.get('location2', '-')})"
)
else:
lines.append("- No strong inconsistency pair detected.")
lines.append("")
lines.append("Summary of Key Duplications Noted:")
if duplicates:
for idx, item in enumerate(duplicates[:10], start=1):
lines.append(
f"- {idx}. {item.get('issueType', 'DUPLICATION')}: "
f"{_shorten_text(item.get('reason', ''), 170)} "
f"({item.get('location1', '-') } vs {item.get('location2', '-')})"
)
else:
lines.append("- No major duplication pair detected.")
return "\n".join(lines)
# Ensure schema exists even when started via `flask run`.
init_db()
@app.get("/api/health")
def health_check():
return jsonify({"status": "ok"}), 200
@app.get("/")
def root():
return (
jsonify(
{
"message": "Backend is running.",
"endpoints": [
"GET /api/health",
"POST /api/register",
"POST /api/login",
"POST /api/analyze",
"GET /health",
"POST /register",
"POST /login",
"POST /analyze",
],
}
),
200,
)
@app.get("/health")
def health_check_alias():
return health_check()
@app.post("/api/register")
def register():
data = request.get_json(silent=True) or {}
full_name = str(data.get("fullName", "")).strip()
email = str(data.get("email", "")).strip().lower()
password = str(data.get("password", ""))
if not full_name or not email or not password:
return jsonify({"error": "fullName, email, and password are required."}), 400
if len(password) < 6:
return jsonify({"error": "Password must be at least 6 characters."}), 400
password_hash = generate_password_hash(password)
created_at = datetime.now(timezone.utc).isoformat()
try:
with get_db_connection() as conn:
conn.execute(
"INSERT INTO users (full_name, email, password_hash, created_at) VALUES (?, ?, ?, ?)",
(full_name, email, password_hash, created_at),
)
conn.commit()
except sqlite3.IntegrityError:
return jsonify({"error": "Email already registered."}), 409
return jsonify({"message": "User created successfully."}), 201
@app.post("/register")
def register_alias():
return register()
@app.post("/api/login")
def login():
data = request.get_json(silent=True) or {}
email = str(data.get("email", "")).strip().lower()
password = str(data.get("password", ""))
if not email or not password:
return jsonify({"error": "email and password are required."}), 400
with get_db_connection() as conn:
user = conn.execute(
"SELECT id, full_name, email, password_hash FROM users WHERE email = ?",
(email,),
).fetchone()
if user is None or not check_password_hash(user["password_hash"], password):
return jsonify({"error": "Invalid email or password."}), 401
return (
jsonify(
{
"message": "Login successful.",
"user": {
"id": user["id"],
"fullName": user["full_name"],
"email": user["email"],
},
}
),
200,
)
@app.post("/api/analyze")
def analyze():
uploaded = request.files.get("file")
reference_uploads = request.files.getlist("referenceFiles")
scan_mode = request.form.get("scanMode", "Standard Scan (Recommended)")
threshold = _threshold_for_mode(scan_mode)
if uploaded is None or uploaded.filename is None or uploaded.filename.strip() == "":
return jsonify({"error": "Please upload the final document file."}), 400
file_ext = uploaded.filename.rsplit(".", 1)[-1].lower() if "." in uploaded.filename else ""
if not _is_supported_ext(file_ext):
return jsonify({"error": "Unsupported file type. Use PDF, DOCX, or TXT."}), 400
if len(reference_uploads) > 2:
return jsonify({"error": "You can upload up to 2 reference documents."}), 400
try:
final_file_bytes = uploaded.read()
final_text_data = _extract_text_data(file_bytes=final_file_bytes, file_ext=file_ext)
if not final_text_data:
return jsonify({"error": "Could not extract text from final document."}), 400
final_clauses = _extract_clauses(final_text_data, source="final", id_start=0)
if len(final_clauses) < 2:
return jsonify({"error": "Not enough clauses found in final document for analysis."}), 400
all_clauses = list(final_clauses)
for idx, ref in enumerate(reference_uploads, start=1):
if ref is None or ref.filename is None or ref.filename.strip() == "":
continue
ref_ext = ref.filename.rsplit(".", 1)[-1].lower() if "." in ref.filename else ""
if not _is_supported_ext(ref_ext):
return jsonify({"error": f"Unsupported reference file type for {ref.filename}."}), 400
ref_text_data = _extract_text_data(file_bytes=ref.read(), file_ext=ref_ext)
if not ref_text_data:
continue
ref_clauses = _extract_clauses(
ref_text_data,
source=f"reference_{idx}",
id_start=len(all_clauses),
)
all_clauses.extend(ref_clauses)
parties = _extract_document_parties(final_text_data)
findings, line_issues, counts, compared_pairs = _analyze_clauses(
clauses=all_clauses, threshold=threshold, focus_source="final"
)
final_line_issues = [item for item in line_issues if item.get("source") == "final"]
page_summaries = _build_page_summaries(
clauses=final_clauses, line_issues=final_line_issues, text_data=final_text_data
)
detailed_summary = _build_detailed_summary(
clauses=final_clauses,
page_summaries=page_summaries,
findings=findings,
)
except Exception as exc:
return jsonify({"error": f"Analysis failed: {exc}"}), 500
return (
jsonify(
{
"message": "Analysis completed.",
"summary": {
"scanMode": scan_mode,
"threshold": threshold,
"vendor": parties["vendor"],
"vendee": parties["vendee"],
"clauses": len(final_clauses),
"referenceDocs": len([r for r in reference_uploads if r and r.filename]),
"pairsCompared": compared_pairs,
"issuesFound": len(findings),
"duplicationCount": counts["duplication"],
"inconsistencyCount": counts["inconsistency"],
"contradictionCount": counts["contradiction"],
},
"pageSummaries": page_summaries,
"detailedSummary": detailed_summary,
"findings": findings[:50],
"lineIssues": line_issues[:200],
"finalLineIssues": final_line_issues[:200],
}
),
200,
)
@app.post("/login")
def login_alias():
return login()
@app.post("/analyze")
def analyze_alias():
return analyze()
if __name__ == "__main__":
# Keep defaults production-safe and compatible with restricted environments.
debug_mode = os.getenv("FLASK_DEBUG", "0") == "1"
host = os.getenv("HOST", "127.0.0.1")
port = int(os.getenv("PORT", "5000"))
app.run(host=host, port=port, debug=debug_mode, use_reloader=False)