GopalKrushnaMahapatra's picture
Update app.py
f3f2bd2 verified
raw
history blame
48.4 kB
# app.py (was: backend/main.py)
import os
import re
import io
import sqlite3
from datetime import datetime, timezone
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, status, Header, Depends, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, EmailStr
from passlib.context import CryptContext
import jwt
# File parsing libs
from docx import Document as DocxDocument
import PyPDF2
# ML / NLP libs
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
# TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Semantic embeddings for plagiarism (combined approach)
try:
from sentence_transformers import SentenceTransformer
except Exception:
SentenceTransformer = None
# LanguageTool (may require Java)
try:
import language_tool_python
except Exception:
language_tool_python = None
# GECToR (neural grammatical error correction)
try:
# This is the official import path from gotutiyan/gector README
from gector import GECToR, predict as gector_predict, load_verb_dict
except Exception:
GECToR = None
gector_predict = None
load_verb_dict = None
# PDF report libs
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import mm
from reportlab.lib.utils import ImageReader
from reportlab.lib import colors
# ------------------ ENV & DB SETUP ------------------
load_dotenv()
JWT_SECRET = os.getenv("JWT_SECRET", "super_secret_key_change_this")
JWT_ALGO = os.getenv("JWT_ALGO", "HS256")
DB_PATH = os.getenv("DB_PATH", "truewrite.db")
CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")
CORPUS_RAW = os.getenv("CORPUS_RAW", "corpus_raw")
# Combined plagiarism weights
PLAG_ALPHA = float(os.getenv("PLAG_ALPHA", "0.4")) # TF-IDF weight; (1-alpha) for embeddings
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# SQLite DB (simple demo)
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
# Create tables if not exist
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
tool TEXT NOT NULL,
input_text TEXT,
result_summary TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""")
conn.commit()
# ------------------ FASTAPI APP ------------------
app = FastAPI(title="TrueWrite Scan (Python Backend)")
app.add_middleware(
CORSMiddleware,
# This regex allows ANY URL (HTTP or HTTPS) to connect
allow_origin_regex=r"https?://.*",
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ------------------ MODELS ------------------
class SignupRequest(BaseModel):
name: str
email: EmailStr
password: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
class TextRequest(BaseModel):
text: str
# ------------------ AUTH HELPERS ------------------
def hash_password(pw: str) -> str:
return pwd_context.hash(pw)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_token(user_id: int, email: str) -> str:
payload = {"user_id": user_id, "email": email}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
if isinstance(token, bytes):
token = token.decode("utf-8")
return token
def decode_token(token: str):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
return payload
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
def get_current_user(authorization: str = Header(None)):
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing token"
)
token = authorization.split(" ", 1)[1]
payload = decode_token(token)
user_id = payload.get("user_id")
cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cur.fetchone()
if not row:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return {"id": row["id"], "name": row["name"], "email": row["email"]}
def now_iso():
return datetime.now(timezone.utc).isoformat()
def save_history(user_id: int, tool: str, input_text: str, summary: str):
trimmed = (input_text[:500] + "...") if len(input_text) > 500 else input_text
cur.execute(
"INSERT INTO history (user_id, tool, input_text, result_summary, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, tool, trimmed, summary, now_iso()),
)
conn.commit()
# ------------------ TEXT HELPERS ------------------
def count_words(text: str) -> int:
tokens = text.strip().split()
return len(tokens) if text.strip() else 0
def simple_grammar_correct(text: str):
"""Old heuristic grammar fixer (kept as fallback)."""
corrections = 0
original_words = count_words(text)
before = text
text = re.sub(r"\s{2,}", " ", text)
if text != before:
corrections += 1
before = text
text = re.sub(r"\bi\b", "I", text)
if text != before:
corrections += 1
def cap_match(m):
return m.group(0).upper()
before = text
text = re.sub(r"(^\s*\w|[.!?]\s+\w)", cap_match, text)
if text != before:
corrections += 1
if text.strip() and not re.search(r"[.!?]\s*$", text.strip()):
text = text.strip() + "."
corrections += 1
return text, corrections, original_words
# ------------------ CORPUS BUILDING (from corpus_raw -> corpus) ------------------
def extract_from_docx_path(path: str) -> str:
doc = DocxDocument(path)
paragraphs = [p.text for p in doc.paragraphs]
return "\n".join(paragraphs)
def extract_from_pdf_path(path: str) -> str:
with open(path, "rb") as f:
reader = PyPDF2.PdfReader(f)
texts = []
for pg in range(len(reader.pages)):
try:
texts.append(reader.pages[pg].extract_text() or "")
except Exception:
texts.append("")
return "\n".join(texts)
def build_corpus_from_raw(raw_dir: str = CORPUS_RAW, out_dir: str = CORPUS_DIR):
"""
Convert any .pdf / .docx / .txt files from corpus_raw/ into .txt files in corpus/.
This mirrors your build_corpus.py logic but is called automatically at startup.
"""
os.makedirs(raw_dir, exist_ok=True)
os.makedirs(out_dir, exist_ok=True)
for fname in os.listdir(raw_dir):
inpath = os.path.join(raw_dir, fname)
if not os.path.isfile(inpath):
continue
outname = os.path.splitext(fname)[0] + ".txt"
outpath = os.path.join(out_dir, outname)
try:
ext = fname.lower()
if ext.endswith(".docx"):
text = extract_from_docx_path(inpath)
elif ext.endswith(".pdf"):
text = extract_from_pdf_path(inpath)
elif ext.endswith(".txt"):
with open(inpath, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
else:
print("[CorpusRaw] Skipping unsupported:", fname)
continue
text = text.strip()
with open(outpath, "w", encoding="utf-8") as fo:
fo.write(text)
print("[CorpusRaw] Wrote:", outpath)
except Exception as e:
print("[CorpusRaw] Failed", fname, "->", e)
# ------------------ TF-IDF CORPUS LOADING ------------------
vectorizer = None
corpus_tfidf = None
corpus_titles = []
corpus_texts = []
def load_corpus(corpus_dir=CORPUS_DIR):
"""
Load .txt corpus files from CORPUS_DIR, build TF-IDF index.
Semantic embeddings are built separately in load_embeddings().
"""
global vectorizer, corpus_tfidf, corpus_titles, corpus_texts
corpus_titles = []
corpus_texts = []
if not os.path.isdir(corpus_dir):
os.makedirs(corpus_dir, exist_ok=True)
print("[Corpus] Created empty corpus directory:", corpus_dir)
vectorizer = None
corpus_tfidf = None
return
for fname in os.listdir(corpus_dir):
if fname.lower().endswith(".txt"):
path = os.path.join(corpus_dir, fname)
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
txt = f.read()
corpus_titles.append(fname)
corpus_texts.append(txt)
except Exception as e:
print(f"[Corpus] Failed to read {path}: {e}")
if corpus_texts:
try:
vectorizer = TfidfVectorizer(
ngram_range=(1, 3),
stop_words="english",
max_features=50000
)
corpus_tfidf = vectorizer.fit_transform(corpus_texts)
print(f"[Corpus] Loaded {len(corpus_texts)} documents into TF-IDF index")
except Exception as e:
print("[Corpus] TF-IDF build failed:", e)
vectorizer = None
corpus_tfidf = None
else:
vectorizer = None
corpus_tfidf = None
print("[Corpus] No .txt documents found in", corpus_dir)
# ------------------ SEMANTIC EMBEDDINGS (SentenceTransformers) ------------------
emb_model = None
corpus_emb = None
EMB_MODEL_NAME = os.getenv("PLAG_EMB_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
def load_embeddings():
"""
Build semantic embedding index for plagiarism using sentence-transformers.
"""
global emb_model, corpus_emb
if SentenceTransformer is None:
print("[Embeddings] sentence-transformers not installed; skipping semantic index.")
emb_model = None
corpus_emb = None
return
if not corpus_texts:
print("[Embeddings] No corpus texts available; semantic index not built.")
emb_model = None
corpus_emb = None
return
try:
emb_model = SentenceTransformer(EMB_MODEL_NAME)
corpus_emb = emb_model.encode(
corpus_texts,
convert_to_numpy=True,
show_progress_bar=False,
normalize_embeddings=True,
)
print(f"[Embeddings] Loaded '{EMB_MODEL_NAME}' and encoded {len(corpus_texts)} corpus docs.")
except Exception as e:
emb_model = None
corpus_emb = None
print("[Embeddings] Failed to load or encode corpus:", e)
# Build corpus & embeddings at startup
build_corpus_from_raw()
load_corpus()
load_embeddings()
# ------------------ HF MODEL LOADING (AI Detector) ------------------
AI_DETECTOR_MODEL = "openai-community/roberta-base-openai-detector"
tokenizer = None
model = None
device = None
try:
tokenizer = AutoTokenizer.from_pretrained(AI_DETECTOR_MODEL)
model = AutoModelForSequenceClassification.from_pretrained(AI_DETECTOR_MODEL)
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"[AI Detector] Loaded {AI_DETECTOR_MODEL} on {device}")
except Exception as e:
tokenizer = None
model = None
device = None
print("[AI Detector] Failed to load HF model — using heuristic fallback. Error:", e)
# ------------------ GECToR LOADING (Neural GEC) ------------------
GEC_MODEL = None
GEC_TOKENIZER = None
GEC_ENCODE = None
GEC_DECODE = None
GEC_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if GECToR is not None and gector_predict is not None and load_verb_dict is not None:
try:
print("[GECToR] Initializing model... (This may take a bit on first run)")
GEC_MODEL_ID = os.getenv("GEC_MODEL_ID", "gotutiyan/gector-roberta-base-5k")
VERB_DICT_PATH = os.getenv("GEC_VERB_DICT", "/app/data/verb-form-vocab.txt")
GEC_MODEL = GECToR.from_pretrained(GEC_MODEL_ID).to(GEC_DEVICE)
GEC_TOKENIZER = AutoTokenizer.from_pretrained(GEC_MODEL_ID)
GEC_ENCODE, GEC_DECODE = load_verb_dict(VERB_DICT_PATH)
print(f"[GECToR] Model & verb dict loaded: {GEC_MODEL_ID}")
except Exception as e:
print(f"[GECToR] Failed to load. Error: {e}")
GEC_MODEL = None
GEC_TOKENIZER = None
GEC_ENCODE = None
GEC_DECODE = None
else:
print("[GECToR] Library not available; skipping neural GEC.")
def gector_correct(text: str):
"""
Run neural grammatical error correction using GECToR (gotutiyan implementation).
"""
if GEC_MODEL is None or GEC_TOKENIZER is None or GEC_ENCODE is None or GEC_DECODE is None:
print("[GECToR] Model not loaded, skipping.")
return text, 0, len(text.split()) if text.strip() else 0
parts = text.strip().split()
# Safety truncate (protect server)
if len(parts) > 1000:
text_proc = " ".join(parts[:1000])
else:
text_proc = text.strip()
if not text_proc:
return text_proc, 0, 0
srcs = [text_proc]
try:
corrected_list = gector_predict(
GEC_MODEL,
GEC_TOKENIZER,
srcs,
GEC_ENCODE,
GEC_DECODE,
keep_confidence=0.0,
min_error_prob=0.0,
n_iteration=5,
batch_size=2,
)
corrected_text = corrected_list[0]
orig_tokens = text_proc.split()
corr_tokens = corrected_text.split()
corrections = sum(1 for a, b in zip(orig_tokens, corr_tokens) if a != b)
original_words = len(orig_tokens)
return corrected_text, corrections, original_words
except Exception as e:
print(f"[GECToR] Prediction error: {e}")
return text_proc, 0, len(text_proc.split())
# ------------------ FILE EXTRACTION HELPERS ------------------
MAX_FILE_SIZE = 15 * 1024 * 1024 # 15 MB
def extract_text_from_upload(upload: UploadFile) -> str:
filename = (upload.filename or "").lower()
content_type = (upload.content_type or "").lower()
data = upload.file.read()
try:
upload.file.seek(0)
except Exception:
pass
if len(data) > MAX_FILE_SIZE:
raise HTTPException(status_code=413, detail="File too large (max 15MB)")
# TXT
if filename.endswith(".txt") or content_type == "text/plain":
try:
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return data.decode("latin-1")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to decode text file: {e}")
# DOCX
if filename.endswith(".docx") or "wordprocessingml" in content_type:
# Basic sanity check: valid .docx is a ZIP (PK header)
if not data.startswith(b"PK"):
raise HTTPException(
status_code=400,
detail="Uploaded file is not a valid .docx package (it might be an old .doc file or a corrupted document). "
"Please open it in Word/Google Docs and re-save as .docx or export as PDF, then upload again."
)
try:
f = io.BytesIO(data)
doc = DocxDocument(f)
paragraphs = [p.text for p in doc.paragraphs]
text = "\n".join(paragraphs).strip()
if not text:
raise ValueError("DOCX contained no readable text.")
return text
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Failed to parse docx file: {e}. Try opening it in Word/Google Docs and exporting again as .docx or PDF."
)
# PDF
if filename.endswith(".pdf") or "pdf" in content_type:
try:
f = io.BytesIO(data)
reader = PyPDF2.PdfReader(f)
texts = []
for pg in range(len(reader.pages)):
try:
txt = reader.pages[pg].extract_text() or ""
except Exception:
txt = ""
texts.append(txt)
return "\n".join(texts)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse PDF file: {e}")
raise HTTPException(
status_code=415,
detail="Unsupported file type. Use .txt, .pdf, or .docx",
)
# ------------------ GRAMMAR (LANGUAGETOOL INTEGRATION) ------------------
lt_tool = None
if language_tool_python is not None:
try:
lt_tool = language_tool_python.LanguageTool("en-US")
print("[LanguageTool] Loaded (local Java-backed checker)")
except Exception as e:
lt_tool = None
print("[LanguageTool] Could not start local LanguageTool — falling back. Error:", e)
else:
print("[LanguageTool] library not installed; falling back to heuristics.")
def grammar_with_languagetool(text: str):
parts = text.strip().split()
if len(parts) > 1000:
text_proc = " ".join(parts[:1000])
else:
text_proc = text.strip()
matches = lt_tool.check(text_proc)
corrected = language_tool_python.utils.correct(text_proc, matches)
corrections = len(matches)
return corrected, corrections, len(text_proc.split())
# ------------------ PLAGIARISM HELPERS (COMBINED ENGINE) ------------------
def _clean_for_jaccard(t: str):
t = t.lower()
t = re.sub(r"[^a-z0-9\s]", " ", t)
return [w for w in t.split() if w]
def _jaccard_similarity(a, b):
sa = set(a)
sb = set(b)
if not sa or not sb:
return 0.0
return len(sa & sb) / len(sa | sb)
def demo_plagiarism_fallback(text: str):
"""
Simple Jaccard-based fallback using a tiny built-in sample set.
Used when no TF-IDF / semantic corpus is available.
"""
SAMPLE_DOCS = [
{"title": "AI for Social Good",
"text": "Artificial intelligence is transforming multiple industries by automating routine tasks and enabling data driven decision making for social impact and efficiency."},
{"title": "IoT in Smart Cities",
"text": "The Internet of Things connects sensors, devices, and cloud platforms to enable real time monitoring and control in smart cities including lighting, traffic, and waste management."},
{"title": "Climate & Renewable Energy",
"text": "Climate change is a critical global challenge that demands renewable energy, efficient resource management, and international cooperation to ensure a sustainable future."},
]
input_words = _clean_for_jaccard(text)
best_score = 0.0
matches = []
for doc in SAMPLE_DOCS:
doc_words = _clean_for_jaccard(doc["text"])
score = _jaccard_similarity(input_words, doc_words)
matches.append({"title": doc["title"], "score": round(score * 100, 2)})
if score > best_score:
best_score = score
matches.sort(key=lambda x: x["score"], reverse=True)
plagiarism_percent = round(best_score * 100, 2)
summary = f"Plagiarism estimate (demo Jaccard): {plagiarism_percent}%"
return {"plagiarism_percent": plagiarism_percent, "matches": matches[:5], "summary": summary}
def corpus_plagiarism_combined(text: str):
"""
Combined plagiarism score using:
- TF-IDF cosine similarity
- Semantic embedding cosine similarity (SentenceTransformers)
Returns dict matching API schema:
{ plagiarism_percent, matches, summary }
"""
if not corpus_texts:
raise ValueError("No corpus texts loaded")
sims_tfidf = None
sims_emb = None
words = text.split()
if len(words) > 3000:
text_proc = " ".join(words[:3000])
else:
text_proc = text
# TF-IDF similarity
if vectorizer is not None and corpus_tfidf is not None:
q = vectorizer.transform([text_proc])
sims_tfidf = cosine_similarity(q, corpus_tfidf)[0]
# Semantic similarity
if emb_model is not None and corpus_emb is not None:
q_emb = emb_model.encode(
[text_proc],
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=False,
)[0]
sims_emb = corpus_emb @ q_emb # normalized → dot = cosine
if sims_tfidf is None and sims_emb is None:
raise ValueError("No plagiarism backends (TF-IDF / embeddings) are available")
n_docs = len(corpus_texts)
combined_rows = []
alpha = PLAG_ALPHA # TF-IDF weight
for i in range(n_docs):
tf = float(sims_tfidf[i]) if sims_tfidf is not None else None
se = float(sims_emb[i]) if sims_emb is not None else None
if tf is None and se is None:
continue
if tf is not None and se is not None:
score = alpha * tf + (1.0 - alpha) * se
elif tf is not None:
score = tf
else:
score = se
combined_rows.append({
"index": i,
"combined": score,
"tfidf": tf,
"semantic": se,
})
if not combined_rows:
raise ValueError("No scores computed for corpus documents")
combined_rows.sort(key=lambda x: x["combined"], reverse=True)
top = combined_rows[:10]
best = top[0]["combined"]
plagiarism_percent = round(best * 100, 2)
matches = []
for row in top:
matches.append({
"title": corpus_titles[row["index"]],
"score": round(row["combined"] * 100, 2),
"tfidf_score": round(row["tfidf"] * 100, 2) if row["tfidf"] is not None else None,
"semantic_score": round(row["semantic"] * 100, 2) if row["semantic"] is not None else None,
})
components = []
if sims_tfidf is not None:
components.append("TF-IDF")
if sims_emb is not None:
components.append("semantic embeddings")
comp_str = " + ".join(components)
summary = f"Plagiarism estimate (combined {comp_str}): {plagiarism_percent}%"
return {"plagiarism_percent": plagiarism_percent, "matches": matches, "summary": summary}
# ------------------ PDF HELPERS (COMMON STYLE) ------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PDF_LOGO_PATH = os.path.join(BASE_DIR, "logo.png") # ensure logo.png is here
BRAND_NAME = "TrueWrite Scan"
BRAND_FONT_SIZE = 18
TITLE_FONT_SIZE = 18
LOGO_SIZE_MM = 15 # logo height in mm
def _wrap_text(text: str, max_chars: int):
"""
Simple word-wrap: yields lines with at most max_chars characters.
"""
words = text.split()
line = []
length = 0
for w in words:
if length + len(w) + (1 if line else 0) > max_chars:
yield " ".join(line)
line = [w]
length = len(w)
else:
line.append(w)
length += len(w) + (1 if line else 0)
if line:
yield " ".join(line)
def _format_checked_on():
# e.g. "Checked On: Dec 08, 2025"
local_now = datetime.now(timezone.utc).astimezone()
return "Checked On: " + local_now.strftime("%b %d, %Y")
def _get_logo_reader():
try:
return ImageReader(PDF_LOGO_PATH)
except Exception as e:
print(f"[PDF] Failed to load logo at {PDF_LOGO_PATH}: {e}")
return None
def _draw_header_footer(c: canvas.Canvas, page_num: int):
"""
Draws the common header + footer for all reports.
Returns (x_margin, content_start_y, page_width, page_height)
"""
width, height = A4
x_margin = 20 * mm
top_y = height - 22 * mm
footer_y = 15 * mm
brand_spacing = 5 * mm
logo_size = LOGO_SIZE_MM
logo = _get_logo_reader()
# --- HEADER ---
if logo is not None:
c.drawImage(
logo,
x_margin,
top_y - logo_size,
width=logo_size,
height=logo_size,
mask="auto",
)
# Brand name
c.setFont("Helvetica-Bold", BRAND_FONT_SIZE)
text_y = top_y - logo_size / 2 - 1 * mm
c.drawString(x_margin + logo_size + brand_spacing, text_y, BRAND_NAME)
# Header right: Checked On: ...
checked_date = _format_checked_on()
c.setFont("Helvetica", 10)
text_width = c.stringWidth(checked_date, "Helvetica", 10)
c.drawString(width - x_margin - text_width, text_y, checked_date)
# Thin line under header
c.setLineWidth(0.5)
c.setStrokeColor(colors.black)
c.line(
x_margin,
top_y - logo_size - 5 * mm,
width - x_margin,
top_y - logo_size - 5 * mm,
)
# --- FOOTER ---
# Thin footer line
c.setLineWidth(0.35)
c.line(x_margin, footer_y + 8 * mm, width - x_margin, footer_y + 8 * mm)
# Page number centered
c.setFont("Helvetica", 9)
page_label = f"Page {page_num}"
label_width = c.stringWidth(page_label, "Helvetica", 9)
c.drawString((width - label_width) / 2, footer_y + 2 * mm, page_label)
content_start_y = top_y - logo_size - 18 * mm
return x_margin, content_start_y, width, height
# ------------------ PDF GENERATORS ------------------
def generate_plagiarism_pdf(user: dict, text: str, result: dict) -> StreamingResponse:
"""
Generate plagiarism PDF in TrueWrite Scan style.
"""
buf = io.BytesIO()
c = canvas.Canvas(buf, pagesize=A4)
report_title = "Plagiarism Scan Report"
plagiarism_percent = float(result.get("plagiarism_percent", 0.0))
unique_percent = max(0.0, round(100.0 - plagiarism_percent, 2))
# ---------- PAGE 1 ----------
page_num = 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
# Title
c.setFont("Helvetica-Bold", TITLE_FONT_SIZE)
title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE)
c.drawString((width - title_width) / 2, y, report_title)
y -= 18 * mm
# Overall similarity big number
c.setFont("Helvetica-Bold", 22)
c.setFillColor(colors.red if plagiarism_percent >= 1.0 else colors.green)
c.drawString(x_margin, y, f"{plagiarism_percent:.0f}%")
c.setFont("Helvetica", 11)
c.setFillColor(colors.black)
c.drawString(x_margin + 22 * mm, y + 2 * mm, "Overall Similarity")
y -= 12 * mm
# Additional info
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, f"Estimated Unique Content: {unique_percent:.0f}%")
y -= 6 * mm
summary = result.get("summary", "")
if summary:
for line in _wrap_text("Summary: " + summary, 110):
c.drawString(x_margin, y, line)
y -= 5 * mm
else:
y -= 5 * mm
y -= 10 * mm
# Body text: original text (truncated)
c.setFont("Helvetica", 10)
truncated = text.strip()
if len(truncated) > 8000:
truncated = truncated[:8000] + "\n...\n[Content truncated for report]"
for line in _wrap_text(truncated, 110):
if y < 40 * mm:
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, line)
y -= 5 * mm
# ---------- NEXT PAGE: MATCHED SOURCES ----------
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica-Bold", 12)
c.drawString(x_margin, y, "Matched Sources")
y -= 10 * mm
c.setFont("Helvetica", 10)
matches = result.get("matches", []) or []
if not matches:
c.drawString(x_margin, y, "No specific sources recorded. Content appears mostly unique.")
else:
for idx, m in enumerate(matches[:10], start=1):
title = m.get("title", "Source")
score = m.get("score", m.get("tfidf_score", 0.0) or 0.0)
line = f"{idx}. {title}{score:.2f}% match"
for part in _wrap_text(line, 110):
c.drawString(x_margin, y, part)
y -= 5 * mm
if y < 40 * mm:
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica", 10)
c.save()
buf.seek(0)
return StreamingResponse(
buf,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=plagiarism-report.pdf"},
)
def generate_ai_pdf(user: dict, text: str, result: dict) -> StreamingResponse:
"""
AI content analysis PDF.
result from ai-check logic.
"""
buf = io.BytesIO()
c = canvas.Canvas(buf, pagesize=A4)
report_title = "AI Content Analysis Report"
ai_percent = float(result.get("ai_percent", 0.0))
human_percent = float(result.get("human_percent", 100.0))
word_count = int(result.get("word_count", 0))
avg_len = float(result.get("avg_sentence_length", 0.0))
# PAGE 1
page_num = 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
# Title
c.setFont("Helvetica-Bold", TITLE_FONT_SIZE)
title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE)
c.drawString((width - title_width) / 2, y, report_title)
y -= 18 * mm
# Big AI probability
c.setFont("Helvetica-Bold", 22)
if ai_percent >= 50:
c.setFillColor(colors.red)
else:
c.setFillColor(colors.green)
c.drawString(x_margin, y, f"{ai_percent:.0f}%")
c.setFont("Helvetica", 11)
c.setFillColor(colors.black)
c.drawString(x_margin + 22 * mm, y + 2 * mm, "Estimated AI Probability")
y -= 12 * mm
# Extra stats
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, f"Estimated Human Probability: {human_percent:.0f}%")
y -= 6 * mm
c.drawString(x_margin, y, f"Word Count: {word_count}")
y -= 6 * mm
c.drawString(x_margin, y, f"Average Sentence Length: {avg_len:.2f} words")
y -= 6 * mm
summary = result.get("summary", "")
if summary:
for line in _wrap_text("Summary: " + summary, 110):
c.drawString(x_margin, y, line)
y -= 5 * mm
y -= 5 * mm
else:
y -= 10 * mm
# Body text
c.setFont("Helvetica", 10)
truncated = text.strip()
if len(truncated) > 8000:
truncated = truncated[:8000] + "\n...\n[Content truncated for report]"
for line in _wrap_text(truncated, 110):
if y < 40 * mm:
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, line)
y -= 5 * mm
c.save()
buf.seek(0)
return StreamingResponse(
buf,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=truewrite-ai-report.pdf"},
)
def generate_grammar_pdf(user: dict, original_text: str, corrected_text: str, result: dict) -> StreamingResponse:
"""
Grammar correction PDF.
result from grammar-check logic.
"""
buf = io.BytesIO()
c = canvas.Canvas(buf, pagesize=A4)
report_title = "Grammar Correction Report"
corrections = int(result.get("corrections", 0))
original_words = int(result.get("original_words", 0))
summary = result.get("summary", "")
# PAGE 1: Metrics + Original
page_num = 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
# Title
c.setFont("Helvetica-Bold", TITLE_FONT_SIZE)
title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE)
c.drawString((width - title_width) / 2, y, report_title)
y -= 18 * mm
# Big metric: corrections
c.setFont("Helvetica-Bold", 22)
c.setFillColor(colors.blue if corrections > 0 else colors.green)
c.drawString(x_margin, y, f"{corrections}")
c.setFont("Helvetica", 11)
c.setFillColor(colors.black)
c.drawString(x_margin + 22 * mm, y + 2 * mm, "Corrections Applied")
y -= 12 * mm
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, f"Words Analysed: {original_words}")
y -= 6 * mm
if summary:
for line in _wrap_text("Summary: " + summary, 110):
c.drawString(x_margin, y, line)
y -= 5 * mm
y -= 5 * mm
else:
y -= 10 * mm
# Original text
c.setFont("Helvetica-Bold", 11)
c.drawString(x_margin, y, "Original Text")
y -= 7 * mm
c.setFont("Helvetica", 10)
orig = original_text.strip()
if len(orig) > 4000:
orig = orig[:4000] + "\n...\n[Content truncated for report]"
for line in _wrap_text(orig, 110):
if y < 40 * mm:
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, line)
y -= 5 * mm
# PAGE 2: Corrected text
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica-Bold", 11)
c.drawString(x_margin, y, "Corrected Text")
y -= 7 * mm
c.setFont("Helvetica", 10)
corr = corrected_text.strip()
if len(corr) > 4000:
corr = corr[:4000] + "\n...\n[Content truncated for report]"
for line in _wrap_text(corr, 110):
if y < 40 * mm:
c.showPage()
page_num += 1
x_margin, y, width, height = _draw_header_footer(c, page_num)
c.setFont("Helvetica", 10)
c.drawString(x_margin, y, line)
y -= 5 * mm
c.save()
buf.seek(0)
return StreamingResponse(
buf,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=truewrite-grammar-report.pdf"},
)
# ------------------ ENDPOINTS ------------------
@app.post("/api/signup")
def signup(req: SignupRequest):
cur.execute("SELECT id FROM users WHERE email = ?", (req.email,))
if cur.fetchone():
raise HTTPException(status_code=400, detail="Email already registered")
pw_hash = hash_password(req.password)
created_at = now_iso()
cur.execute(
"INSERT INTO users (name, email, password_hash, created_at) VALUES (?, ?, ?, ?)",
(req.name, req.email, pw_hash, created_at),
)
conn.commit()
user_id = cur.lastrowid
token = create_token(user_id, req.email)
return {
"message": "Signup successful",
"token": token,
"name": req.name,
"email": req.email,
}
@app.post("/api/login")
def login(req: LoginRequest):
cur.execute("SELECT * FROM users WHERE email = ?", (req.email,))
row = cur.fetchone()
if not row or not verify_password(req.password, row["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid email or password")
token = create_token(row["id"], row["email"])
return {
"message": "Login successful",
"token": token,
"name": row["name"],
"email": row["email"],
}
@app.post("/api/grammar-check")
def api_grammar_check(req: TextRequest, user=Depends(get_current_user)):
text = req.text or ""
if not text.strip():
raise HTTPException(status_code=400, detail="Text is required")
# Prefer GECToR → LanguageTool → heuristics
if GEC_MODEL is not None:
corrected, corrections, original_words = gector_correct(text)
summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
elif lt_tool is not None:
corrected, corrections, original_words = grammar_with_languagetool(text)
summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
else:
corrected, corrections, original_words = simple_grammar_correct(text)
summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"
save_history(user["id"], "grammar", text, summary)
return {
"original_words": original_words,
"corrections": corrections,
"corrected_text": corrected,
"summary": summary,
}
@app.post("/api/grammar-check-file")
def api_grammar_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
if GEC_MODEL is not None:
corrected, corrections, original_words = gector_correct(text)
summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
elif lt_tool is not None:
corrected, corrections, original_words = grammar_with_languagetool(text)
summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
else:
parts = text.strip().split()
if len(parts) > 1000:
text = " ".join(parts[:1000])
corrected, corrections, original_words = simple_grammar_correct(text)
summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"
save_history(user["id"], "grammar", text, summary)
return {
"original_words": original_words,
"corrections": corrections,
"corrected_text": corrected,
"summary": summary,
}
# ------------------ PLAGIARISM ENDPOINTS (COMBINED) ------------------
@app.post("/api/plagiarism-check")
def api_plagiarism_check(req: TextRequest, user=Depends(get_current_user)):
text = req.text or ""
if not text.strip():
raise HTTPException(status_code=400, detail="Text is required")
# First try full combined engine (TF-IDF + embeddings) with corpus
try:
result = corpus_plagiarism_combined(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
except Exception as e:
print("[Plagiarism] Combined corpus engine failed, falling back to demo:", e)
# Fallback: small Jaccard demo
result = demo_plagiarism_fallback(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
@app.post("/api/plagiarism-check-file")
def api_plagiarism_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
try:
result = corpus_plagiarism_combined(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
except Exception as e:
print("[Plagiarism-file] Combined corpus engine failed, falling back to demo:", e)
# Fallback to demo if corpus/engines unavailable
result = demo_plagiarism_fallback(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
# ------------------ AI CHECK (TEXT & FILE) ------------------
def heuristic_ai_score(text: str):
words = re.sub(r"[^a-z0-9\s]", " ", text.lower()).split()
word_count = len(words)
unique_ratio = len(set(words)) / (word_count or 1)
sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
avg_sentence_length = word_count / (len(sentences) or 1)
ai_score = 0
if unique_ratio < 0.45:
ai_score += 40
elif unique_ratio < 0.6:
ai_score += 20
if avg_sentence_length > 25:
ai_score += 40
elif avg_sentence_length > 18:
ai_score += 25
if word_count > 400:
ai_score += 10
ai_score = min(100, round(ai_score))
human_score = 100 - ai_score
return ai_score, human_score, word_count, avg_sentence_length, unique_ratio
@app.post("/api/ai-check")
def api_ai_check(req: TextRequest, user=Depends(get_current_user)):
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
if model is not None and tokenizer is not None:
try:
max_len = getattr(tokenizer, "model_max_length", 512)
if max_len is None or max_len > 1024:
max_len = 512
words = text.split()
chunk_size = min(400, max_len - 10)
chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
probs = []
for chunk in chunks:
inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
p = torch.softmax(logits, dim=1).cpu().numpy()[0]
ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
probs.append(ai_prob)
avg_ai_prob = float(np.mean(probs)) if probs else 0.0
ai_percent = round(avg_ai_prob * 100, 2)
human_percent = round(100 - ai_percent, 2)
words_count = len(words)
sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
save_history(user["id"], "ai", text, summary)
return {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": words_count,
"avg_sentence_length": avg_sentence_len,
"summary": summary,
}
except Exception as e:
print("[AI-check] model inference failed:", e)
ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
summary = f"HEURISTIC fallback — AI probability: {ai_percent}%"
save_history(user["id"], "ai", text, summary)
return {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": wc,
"avg_sentence_length": avg_len,
"unique_ratio": round(uniq, 3),
"summary": summary,
}
@app.post("/api/ai-check-file")
def api_ai_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
return api_ai_check.__wrapped__(TextRequest(text=text), user)
# ------------------ PDF REPORT ENDPOINTS ------------------
@app.post("/api/plagiarism-report")
def api_plagiarism_report(req: TextRequest, user=Depends(get_current_user)):
"""
Generate a PDF plagiarism report in the TrueWrite Scan style.
"""
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
try:
result = corpus_plagiarism_combined(text)
except Exception as e:
print("[Plagiarism-Report] Combined engine failed, falling back:", e)
result = demo_plagiarism_fallback(text)
save_history(user["id"], "plagiarism_report", text, result.get("summary", ""))
user_info = {
"name": user.get("name"),
"email": user.get("email"),
}
return generate_plagiarism_pdf(user_info, text, result)
@app.post("/api/ai-report")
def api_ai_report(req: TextRequest, user=Depends(get_current_user)):
"""
Generate a PDF AI analysis report in the TrueWrite Scan style.
"""
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
result = None
if model is not None and tokenizer is not None:
try:
max_len = getattr(tokenizer, "model_max_length", 512)
if max_len is None or max_len > 1024:
max_len = 512
words = text.split()
chunk_size = min(400, max_len - 10)
chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
probs = []
for chunk in chunks:
inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
p = torch.softmax(logits, dim=1).cpu().numpy()[0]
ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
probs.append(ai_prob)
avg_ai_prob = float(np.mean(probs)) if probs else 0.0
ai_percent = round(avg_ai_prob * 100, 2)
human_percent = round(100 - ai_percent, 2)
words_count = len(words)
sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
result = {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": words_count,
"avg_sentence_length": avg_sentence_len,
"summary": summary,
}
except Exception as e:
print("[AI-report] model inference failed:", e)
if result is None:
ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
summary = f"HEURISTIC fallback — AI probability: {ai_percent}%"
result = {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": wc,
"avg_sentence_length": avg_len,
"unique_ratio": round(uniq, 3),
"summary": summary,
}
save_history(user["id"], "ai_report", text, result.get("summary", ""))
user_info = {
"name": user.get("name"),
"email": user.get("email"),
}
return generate_ai_pdf(user_info, text, result)
@app.post("/api/grammar-report")
def api_grammar_report(req: TextRequest, user=Depends(get_current_user)):
"""
Generate a PDF grammar correction report in the TrueWrite Scan style.
"""
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
if GEC_MODEL is not None:
corrected, corrections, original_words = gector_correct(text)
summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
elif lt_tool is not None:
corrected, corrections, original_words = grammar_with_languagetool(text)
summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
else:
corrected, corrections, original_words = simple_grammar_correct(text)
summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"
result = {
"original_words": original_words,
"corrections": corrections,
"summary": summary,
}
save_history(user["id"], "grammar_report", text, summary)
user_info = {
"name": user.get("name"),
"email": user.get("email"),
}
return generate_grammar_pdf(user_info, text, corrected, result)
# ------------------ HISTORY ------------------
@app.get("/api/history")
def api_history(user=Depends(get_current_user)):
cur.execute(
"SELECT id, tool, input_text, result_summary, created_at "
"FROM history WHERE user_id = ? "
"ORDER BY created_at DESC LIMIT 50",
(user["id"],),
)
rows = cur.fetchall()
items = []
for r in rows:
items.append(
{
"id": r["id"],
"tool": r["tool"],
"input_text": r["input_text"],
"summary": r["result_summary"],
"created_at": r["created_at"],
}
)
return {"items": items}
@app.get("/")
def read_root():
return {"status": "Backend is running with GECToR + 16GB RAM + PDF reports!"}