| |
| import os |
| import re |
| import io |
| import sqlite3 |
| from datetime import datetime, timezone |
|
|
| from dotenv import load_dotenv |
| from fastapi import FastAPI, HTTPException, status, Header, Depends, File, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import StreamingResponse |
| from pydantic import BaseModel, EmailStr |
| from passlib.context import CryptContext |
| import jwt |
|
|
| |
| from docx import Document as DocxDocument |
| import PyPDF2 |
|
|
| |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| import torch |
| import numpy as np |
|
|
| |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| |
| try: |
| from sentence_transformers import SentenceTransformer |
| except Exception: |
| SentenceTransformer = None |
|
|
| |
| try: |
| import language_tool_python |
| except Exception: |
| language_tool_python = None |
|
|
| |
| try: |
| |
| from gector import GECToR, predict as gector_predict, load_verb_dict |
| except Exception: |
| GECToR = None |
| gector_predict = None |
| load_verb_dict = None |
|
|
| |
| from reportlab.lib.pagesizes import A4 |
| from reportlab.pdfgen import canvas |
| from reportlab.lib.units import mm |
| from reportlab.lib.utils import ImageReader |
| from reportlab.lib import colors |
|
|
| |
| load_dotenv() |
|
|
| JWT_SECRET = os.getenv("JWT_SECRET", "super_secret_key_change_this") |
| JWT_ALGO = os.getenv("JWT_ALGO", "HS256") |
| DB_PATH = os.getenv("DB_PATH", "truewrite.db") |
| CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus") |
| CORPUS_RAW = os.getenv("CORPUS_RAW", "corpus_raw") |
|
|
| |
| PLAG_ALPHA = float(os.getenv("PLAG_ALPHA", "0.4")) |
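| # Blend weight used by corpus_plagiarism_combined(): 0.4 keeps 40% of the TF-IDF similarity and 60% of the semantic similarity. |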
|
|
| pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") |
|
|
| |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False) |
| conn.row_factory = sqlite3.Row |
| cur = conn.cursor() |
|
|
| |
| cur.execute(""" |
| CREATE TABLE IF NOT EXISTS users ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| name TEXT NOT NULL, |
| email TEXT NOT NULL UNIQUE, |
| password_hash TEXT NOT NULL, |
| created_at TEXT NOT NULL |
| ) |
| """) |
|
|
| cur.execute(""" |
| CREATE TABLE IF NOT EXISTS history ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER NOT NULL, |
| tool TEXT NOT NULL, |
| input_text TEXT, |
| result_summary TEXT, |
| created_at TEXT NOT NULL, |
| FOREIGN KEY (user_id) REFERENCES users(id) |
| ) |
| """) |
|
|
| conn.commit() |
|
|
| |
| app = FastAPI(title="TrueWrite Scan (Python Backend)") |
|
|
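| # CORS: the regex below matches any http/https origin, so cross-origin requests are effectively unrestricted (with credentials allowed). |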
| app.add_middleware( |
| CORSMiddleware, |
| |
| allow_origin_regex=r"https?://.*", |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| class SignupRequest(BaseModel): |
| name: str |
| email: EmailStr |
| password: str |
|
|
|
|
| class LoginRequest(BaseModel): |
| email: EmailStr |
| password: str |
|
|
|
|
| class TextRequest(BaseModel): |
| text: str |
|
|
|
|
| |
| def hash_password(pw: str) -> str: |
| return pwd_context.hash(pw) |
|
|
|
|
| def verify_password(plain: str, hashed: str) -> bool: |
| return pwd_context.verify(plain, hashed) |
|
|
|
|
| def create_token(user_id: int, email: str) -> str: |
| payload = {"user_id": user_id, "email": email} |
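| # Note: the payload carries no "exp" claim, so issued tokens never expire. |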
| token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO) |
| if isinstance(token, bytes): |
| token = token.decode("utf-8") |
| return token |
|
|
|
|
| def decode_token(token: str): |
| try: |
| payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO]) |
| return payload |
| except jwt.PyJWTError: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Invalid token" |
| ) |
|
|
|
|
| def get_current_user(authorization: str = Header(None)): |
| if not authorization or not authorization.startswith("Bearer "): |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Missing token" |
| ) |
| token = authorization.split(" ", 1)[1] |
| payload = decode_token(token) |
| user_id = payload.get("user_id") |
| cur.execute("SELECT * FROM users WHERE id = ?", (user_id,)) |
| row = cur.fetchone() |
| if not row: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="User not found" |
| ) |
| return {"id": row["id"], "name": row["name"], "email": row["email"]} |
|
|
|
|
| def now_iso(): |
| return datetime.now(timezone.utc).isoformat() |
|
|
|
|
| def save_history(user_id: int, tool: str, input_text: str, summary: str): |
| trimmed = (input_text[:500] + "...") if len(input_text) > 500 else input_text |
| cur.execute( |
| "INSERT INTO history (user_id, tool, input_text, result_summary, created_at) VALUES (?, ?, ?, ?, ?)", |
| (user_id, tool, trimmed, summary, now_iso()), |
| ) |
| conn.commit() |
|
|
|
|
| |
| def count_words(text: str) -> int: |
| tokens = text.strip().split() |
| return len(tokens) if text.strip() else 0 |
|
|
|
|
| def simple_grammar_correct(text: str): |
| """Old heuristic grammar fixer (kept as fallback).""" |
| corrections = 0 |
| original_words = count_words(text) |
|
|
| before = text |
| text = re.sub(r"\s{2,}", " ", text) |
| if text != before: |
| corrections += 1 |
|
|
| before = text |
| text = re.sub(r"\bi\b", "I", text) |
| if text != before: |
| corrections += 1 |
|
|
| def cap_match(m): |
| return m.group(0).upper() |
|
|
| before = text |
| text = re.sub(r"(^\s*\w|[.!?]\s+\w)", cap_match, text) |
| if text != before: |
| corrections += 1 |
|
|
| if text.strip() and not re.search(r"[.!?]\s*$", text.strip()): |
| text = text.strip() + "." |
| corrections += 1 |
|
|
| return text, corrections, original_words |
|
|
|
|
| |
| def extract_from_docx_path(path: str) -> str: |
| doc = DocxDocument(path) |
| paragraphs = [p.text for p in doc.paragraphs] |
| return "\n".join(paragraphs) |
|
|
|
|
| def extract_from_pdf_path(path: str) -> str: |
| with open(path, "rb") as f: |
| reader = PyPDF2.PdfReader(f) |
| texts = [] |
| for pg in range(len(reader.pages)): |
| try: |
| texts.append(reader.pages[pg].extract_text() or "") |
| except Exception: |
| texts.append("") |
| return "\n".join(texts) |
|
|
|
|
| def build_corpus_from_raw(raw_dir: str = CORPUS_RAW, out_dir: str = CORPUS_DIR): |
| """ |
| Convert any .pdf / .docx / .txt files from corpus_raw/ into .txt files in corpus/. |
| Mirrors the standalone build_corpus.py logic, but runs automatically at startup. |
| """ |
| os.makedirs(raw_dir, exist_ok=True) |
| os.makedirs(out_dir, exist_ok=True) |
|
|
| for fname in os.listdir(raw_dir): |
| inpath = os.path.join(raw_dir, fname) |
| if not os.path.isfile(inpath): |
| continue |
| outname = os.path.splitext(fname)[0] + ".txt" |
| outpath = os.path.join(out_dir, outname) |
| try: |
| lower_name = fname.lower() |
| if lower_name.endswith(".docx"): |
| text = extract_from_docx_path(inpath) |
| elif lower_name.endswith(".pdf"): |
| text = extract_from_pdf_path(inpath) |
| elif lower_name.endswith(".txt"): |
| with open(inpath, "r", encoding="utf-8", errors="ignore") as f: |
| text = f.read() |
| else: |
| print("[CorpusRaw] Skipping unsupported:", fname) |
| continue |
|
|
| text = text.strip() |
| with open(outpath, "w", encoding="utf-8") as fo: |
| fo.write(text) |
| print("[CorpusRaw] Wrote:", outpath) |
| except Exception as e: |
| print("[CorpusRaw] Failed", fname, "->", e) |
|
|
|
|
| |
| vectorizer = None |
| corpus_tfidf = None |
| corpus_titles = [] |
| corpus_texts = [] |
|
|
|
|
| def load_corpus(corpus_dir=CORPUS_DIR): |
| """ |
| Load .txt corpus files from CORPUS_DIR, build TF-IDF index. |
| Semantic embeddings are built separately in load_embeddings(). |
| """ |
| global vectorizer, corpus_tfidf, corpus_titles, corpus_texts |
| corpus_titles = [] |
| corpus_texts = [] |
| if not os.path.isdir(corpus_dir): |
| os.makedirs(corpus_dir, exist_ok=True) |
| print("[Corpus] Created empty corpus directory:", corpus_dir) |
| vectorizer = None |
| corpus_tfidf = None |
| return |
|
|
| for fname in os.listdir(corpus_dir): |
| if fname.lower().endswith(".txt"): |
| path = os.path.join(corpus_dir, fname) |
| try: |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: |
| txt = f.read() |
| corpus_titles.append(fname) |
| corpus_texts.append(txt) |
| except Exception as e: |
| print(f"[Corpus] Failed to read {path}: {e}") |
|
|
| if corpus_texts: |
| try: |
| vectorizer = TfidfVectorizer( |
| ngram_range=(1, 3), |
| stop_words="english", |
| max_features=50000 |
| ) |
| corpus_tfidf = vectorizer.fit_transform(corpus_texts) |
| print(f"[Corpus] Loaded {len(corpus_texts)} documents into TF-IDF index") |
| except Exception as e: |
| print("[Corpus] TF-IDF build failed:", e) |
| vectorizer = None |
| corpus_tfidf = None |
| else: |
| vectorizer = None |
| corpus_tfidf = None |
| print("[Corpus] No .txt documents found in", corpus_dir) |
|
|
|
|
| |
| emb_model = None |
| corpus_emb = None |
| EMB_MODEL_NAME = os.getenv("PLAG_EMB_MODEL", "sentence-transformers/all-MiniLM-L6-v2") |
|
|
|
|
| def load_embeddings(): |
| """ |
| Build semantic embedding index for plagiarism using sentence-transformers. |
| """ |
| global emb_model, corpus_emb |
| if SentenceTransformer is None: |
| print("[Embeddings] sentence-transformers not installed; skipping semantic index.") |
| emb_model = None |
| corpus_emb = None |
| return |
|
|
| if not corpus_texts: |
| print("[Embeddings] No corpus texts available; semantic index not built.") |
| emb_model = None |
| corpus_emb = None |
| return |
|
|
| try: |
| emb_model = SentenceTransformer(EMB_MODEL_NAME) |
| corpus_emb = emb_model.encode( |
| corpus_texts, |
| convert_to_numpy=True, |
| show_progress_bar=False, |
| normalize_embeddings=True, |
| ) |
| print(f"[Embeddings] Loaded '{EMB_MODEL_NAME}' and encoded {len(corpus_texts)} corpus docs.") |
| except Exception as e: |
| emb_model = None |
| corpus_emb = None |
| print("[Embeddings] Failed to load or encode corpus:", e) |
|
|
|
|
| |
| build_corpus_from_raw() |
| load_corpus() |
| load_embeddings() |
|
|
| |
| AI_DETECTOR_MODEL = "openai-community/roberta-base-openai-detector" |
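| # Note: this detector was fine-tuned to separate GPT-2 output from human text, so treat its |
| # scores on text produced by newer models as a rough signal rather than a definitive verdict. |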
| tokenizer = None |
| model = None |
| device = None |
|
|
| try: |
| tokenizer = AutoTokenizer.from_pretrained(AI_DETECTOR_MODEL) |
| model = AutoModelForSequenceClassification.from_pretrained(AI_DETECTOR_MODEL) |
| model.eval() |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| model.to(device) |
| print(f"[AI Detector] Loaded {AI_DETECTOR_MODEL} on {device}") |
| except Exception as e: |
| tokenizer = None |
| model = None |
| device = None |
| print("[AI Detector] Failed to load HF model — using heuristic fallback. Error:", e) |
|
|
| |
| GEC_MODEL = None |
| GEC_TOKENIZER = None |
| GEC_ENCODE = None |
| GEC_DECODE = None |
| GEC_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
| if GECToR is not None and gector_predict is not None and load_verb_dict is not None: |
| try: |
| print("[GECToR] Initializing model... (This may take a bit on first run)") |
| GEC_MODEL_ID = os.getenv("GEC_MODEL_ID", "gotutiyan/gector-roberta-base-5k") |
| VERB_DICT_PATH = os.getenv("GEC_VERB_DICT", "/app/data/verb-form-vocab.txt") |
|
|
| GEC_MODEL = GECToR.from_pretrained(GEC_MODEL_ID).to(GEC_DEVICE) |
| GEC_TOKENIZER = AutoTokenizer.from_pretrained(GEC_MODEL_ID) |
| GEC_ENCODE, GEC_DECODE = load_verb_dict(VERB_DICT_PATH) |
|
|
| print(f"[GECToR] Model & verb dict loaded: {GEC_MODEL_ID}") |
| except Exception as e: |
| print(f"[GECToR] Failed to load. Error: {e}") |
| GEC_MODEL = None |
| GEC_TOKENIZER = None |
| GEC_ENCODE = None |
| GEC_DECODE = None |
| else: |
| print("[GECToR] Library not available; skipping neural GEC.") |
|
|
|
|
| def gector_correct(text: str): |
| """ |
| Run neural grammatical error correction using GECToR (gotutiyan implementation). |
| """ |
| if GEC_MODEL is None or GEC_TOKENIZER is None or GEC_ENCODE is None or GEC_DECODE is None: |
| print("[GECToR] Model not loaded, skipping.") |
| return text, 0, len(text.split()) if text.strip() else 0 |
|
|
| parts = text.strip().split() |
| |
| if len(parts) > 1000: |
| text_proc = " ".join(parts[:1000]) |
| else: |
| text_proc = text.strip() |
|
|
| if not text_proc: |
| return text_proc, 0, 0 |
|
|
| srcs = [text_proc] |
|
|
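| # GECToR tags token-level edits and re-applies them iteratively (up to n_iteration passes); |
| # keep_confidence / min_error_prob of 0.0 leave the paper's keep-bias and edit-probability threshold disabled. |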
| try: |
| corrected_list = gector_predict( |
| GEC_MODEL, |
| GEC_TOKENIZER, |
| srcs, |
| GEC_ENCODE, |
| GEC_DECODE, |
| keep_confidence=0.0, |
| min_error_prob=0.0, |
| n_iteration=5, |
| batch_size=2, |
| ) |
| corrected_text = corrected_list[0] |
|
|
| orig_tokens = text_proc.split() |
| corr_tokens = corrected_text.split() |
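| # Rough edit count: position-wise token differences only; insertions/deletions that change the length are not counted. |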
| corrections = sum(1 for a, b in zip(orig_tokens, corr_tokens) if a != b) |
| original_words = len(orig_tokens) |
|
|
| return corrected_text, corrections, original_words |
|
|
| except Exception as e: |
| print(f"[GECToR] Prediction error: {e}") |
| return text_proc, 0, len(text_proc.split()) |
|
|
|
|
| |
| MAX_FILE_SIZE = 15 * 1024 * 1024 |
|
|
|
|
| def extract_text_from_upload(upload: UploadFile) -> str: |
| filename = (upload.filename or "").lower() |
| content_type = (upload.content_type or "").lower() |
| data = upload.file.read() |
| try: |
| upload.file.seek(0) |
| except Exception: |
| pass |
|
|
| if len(data) > MAX_FILE_SIZE: |
| raise HTTPException(status_code=413, detail="File too large (max 15MB)") |
|
|
| |
| if filename.endswith(".txt") or content_type == "text/plain": |
| try: |
| try: |
| return data.decode("utf-8") |
| except UnicodeDecodeError: |
| return data.decode("latin-1") |
| except Exception as e: |
| raise HTTPException(status_code=400, detail=f"Failed to decode text file: {e}") |
|
|
| |
| if filename.endswith(".docx") or "wordprocessingml" in content_type: |
| |
| if not data.startswith(b"PK"): |
| raise HTTPException( |
| status_code=400, |
| detail="Uploaded file is not a valid .docx package (it might be an old .doc file or a corrupted document). " |
| "Please open it in Word/Google Docs and re-save as .docx or export as PDF, then upload again." |
| ) |
| try: |
| f = io.BytesIO(data) |
| doc = DocxDocument(f) |
| paragraphs = [p.text for p in doc.paragraphs] |
| text = "\n".join(paragraphs).strip() |
| if not text: |
| raise ValueError("DOCX contained no readable text.") |
| return text |
| except Exception as e: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Failed to parse docx file: {e}. Try opening it in Word/Google Docs and exporting again as .docx or PDF." |
| ) |
|
|
| |
| if filename.endswith(".pdf") or "pdf" in content_type: |
| try: |
| f = io.BytesIO(data) |
| reader = PyPDF2.PdfReader(f) |
| texts = [] |
| for pg in range(len(reader.pages)): |
| try: |
| txt = reader.pages[pg].extract_text() or "" |
| except Exception: |
| txt = "" |
| texts.append(txt) |
| return "\n".join(texts) |
| except Exception as e: |
| raise HTTPException(status_code=400, detail=f"Failed to parse PDF file: {e}") |
|
|
| raise HTTPException( |
| status_code=415, |
| detail="Unsupported file type. Use .txt, .pdf, or .docx", |
| ) |
|
|
|
|
| |
| lt_tool = None |
| if language_tool_python is not None: |
| try: |
| lt_tool = language_tool_python.LanguageTool("en-US") |
| print("[LanguageTool] Loaded (local Java-backed checker)") |
| except Exception as e: |
| lt_tool = None |
| print("[LanguageTool] Could not start local LanguageTool — falling back. Error:", e) |
| else: |
| print("[LanguageTool] library not installed; falling back to heuristics.") |
|
|
|
|
| def grammar_with_languagetool(text: str): |
| parts = text.strip().split() |
| if len(parts) > 1000: |
| text_proc = " ".join(parts[:1000]) |
| else: |
| text_proc = text.strip() |
|
|
| matches = lt_tool.check(text_proc) |
| corrected = language_tool_python.utils.correct(text_proc, matches) |
| corrections = len(matches) |
| return corrected, corrections, len(text_proc.split()) |
|
|
|
|
| |
| def _clean_for_jaccard(t: str): |
| t = t.lower() |
| t = re.sub(r"[^a-z0-9\s]", " ", t) |
| return [w for w in t.split() if w] |
|
|
|
|
| def _jaccard_similarity(a, b): |
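| # e.g. _jaccard_similarity(["a", "b", "c"], ["b", "c", "d"]) == 2 / 4 == 0.5 |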
| sa = set(a) |
| sb = set(b) |
| if not sa or not sb: |
| return 0.0 |
| return len(sa & sb) / len(sa | sb) |
|
|
|
|
| def demo_plagiarism_fallback(text: str): |
| """ |
| Simple Jaccard-based fallback using a tiny built-in sample set. |
| Used when no TF-IDF / semantic corpus is available. |
| """ |
| SAMPLE_DOCS = [ |
| {"title": "AI for Social Good", |
| "text": "Artificial intelligence is transforming multiple industries by automating routine tasks and enabling data driven decision making for social impact and efficiency."}, |
| {"title": "IoT in Smart Cities", |
| "text": "The Internet of Things connects sensors, devices, and cloud platforms to enable real time monitoring and control in smart cities including lighting, traffic, and waste management."}, |
| {"title": "Climate & Renewable Energy", |
| "text": "Climate change is a critical global challenge that demands renewable energy, efficient resource management, and international cooperation to ensure a sustainable future."}, |
| ] |
|
|
| input_words = _clean_for_jaccard(text) |
| best_score = 0.0 |
| matches = [] |
| for doc in SAMPLE_DOCS: |
| doc_words = _clean_for_jaccard(doc["text"]) |
| score = _jaccard_similarity(input_words, doc_words) |
| matches.append({"title": doc["title"], "score": round(score * 100, 2)}) |
| if score > best_score: |
| best_score = score |
|
|
| matches.sort(key=lambda x: x["score"], reverse=True) |
| plagiarism_percent = round(best_score * 100, 2) |
| summary = f"Plagiarism estimate (demo Jaccard): {plagiarism_percent}%" |
| return {"plagiarism_percent": plagiarism_percent, "matches": matches[:5], "summary": summary} |
|
|
|
|
| def corpus_plagiarism_combined(text: str): |
| """ |
| Combined plagiarism score using: |
| - TF-IDF cosine similarity |
| - Semantic embedding cosine similarity (SentenceTransformers) |
| |
| Returns dict matching API schema: |
| { plagiarism_percent, matches, summary } |
| """ |
| if not corpus_texts: |
| raise ValueError("No corpus texts loaded") |
|
|
| sims_tfidf = None |
| sims_emb = None |
|
|
| words = text.split() |
| if len(words) > 3000: |
| text_proc = " ".join(words[:3000]) |
| else: |
| text_proc = text |
|
|
| |
| if vectorizer is not None and corpus_tfidf is not None: |
| q = vectorizer.transform([text_proc]) |
| sims_tfidf = cosine_similarity(q, corpus_tfidf)[0] |
|
|
| |
| if emb_model is not None and corpus_emb is not None: |
| q_emb = emb_model.encode( |
| [text_proc], |
| convert_to_numpy=True, |
| normalize_embeddings=True, |
| show_progress_bar=False, |
| )[0] |
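| # corpus_emb rows and q_emb are L2-normalised, so this dot product equals cosine similarity. |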
| sims_emb = corpus_emb @ q_emb |
|
|
| if sims_tfidf is None and sims_emb is None: |
| raise ValueError("No plagiarism backends (TF-IDF / embeddings) are available") |
|
|
| n_docs = len(corpus_texts) |
| combined_rows = [] |
| alpha = PLAG_ALPHA |
|
|
| for i in range(n_docs): |
| tf = float(sims_tfidf[i]) if sims_tfidf is not None else None |
| se = float(sims_emb[i]) if sims_emb is not None else None |
| if tf is None and se is None: |
| continue |
|
|
| if tf is not None and se is not None: |
| score = alpha * tf + (1.0 - alpha) * se |
| elif tf is not None: |
| score = tf |
| else: |
| score = se |
|
|
| combined_rows.append({ |
| "index": i, |
| "combined": score, |
| "tfidf": tf, |
| "semantic": se, |
| }) |
|
|
| if not combined_rows: |
| raise ValueError("No scores computed for corpus documents") |
|
|
| combined_rows.sort(key=lambda x: x["combined"], reverse=True) |
| top = combined_rows[:10] |
|
|
| best = top[0]["combined"] |
| plagiarism_percent = round(best * 100, 2) |
|
|
| matches = [] |
| for row in top: |
| matches.append({ |
| "title": corpus_titles[row["index"]], |
| "score": round(row["combined"] * 100, 2), |
| "tfidf_score": round(row["tfidf"] * 100, 2) if row["tfidf"] is not None else None, |
| "semantic_score": round(row["semantic"] * 100, 2) if row["semantic"] is not None else None, |
| }) |
|
|
| components = [] |
| if sims_tfidf is not None: |
| components.append("TF-IDF") |
| if sims_emb is not None: |
| components.append("semantic embeddings") |
| comp_str = " + ".join(components) |
|
|
| summary = f"Plagiarism estimate (combined {comp_str}): {plagiarism_percent}%" |
| return {"plagiarism_percent": plagiarism_percent, "matches": matches, "summary": summary} |
|
|
|
|
| |
|
|
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| PDF_LOGO_PATH = os.path.join(BASE_DIR, "logo.png") |
|
|
| BRAND_NAME = "TrueWrite Scan" |
| BRAND_FONT_SIZE = 18 |
| TITLE_FONT_SIZE = 18 |
| LOGO_SIZE_MM = 15 |
|
|
|
|
| def _wrap_text(text: str, max_chars: int): |
| """ |
| Simple word-wrap: yields lines with at most max_chars characters. |
| """ |
| words = text.split() |
| line = [] |
| length = 0 |
| for w in words: |
| if length + len(w) + (1 if line else 0) > max_chars: |
| yield " ".join(line) |
| line = [w] |
| length = len(w) |
| else: |
| sep = 1 if line else 0  # compute the separator before appending so the first word on a line adds no +1 |
| line.append(w) |
| length += len(w) + sep |
| if line: |
| yield " ".join(line) |
|
|
|
|
| def _format_checked_on(): |
| |
| local_now = datetime.now(timezone.utc).astimezone() |
| return "Checked On: " + local_now.strftime("%b %d, %Y") |
|
|
|
|
| def _get_logo_reader(): |
| try: |
| return ImageReader(PDF_LOGO_PATH) |
| except Exception as e: |
| print(f"[PDF] Failed to load logo at {PDF_LOGO_PATH}: {e}") |
| return None |
|
|
|
|
| def _draw_header_footer(c: canvas.Canvas, page_num: int): |
| """ |
| Draws the common header + footer for all reports. |
| Returns (x_margin, content_start_y, page_width, page_height) |
| """ |
| width, height = A4 |
| x_margin = 20 * mm |
| top_y = height - 22 * mm |
| footer_y = 15 * mm |
| brand_spacing = 5 * mm |
| logo_size = LOGO_SIZE_MM * mm  # LOGO_SIZE_MM is in millimetres; convert to points for ReportLab |
|
|
| logo = _get_logo_reader() |
|
|
| |
| if logo is not None: |
| c.drawImage( |
| logo, |
| x_margin, |
| top_y - logo_size, |
| width=logo_size, |
| height=logo_size, |
| mask="auto", |
| ) |
|
|
| |
| c.setFont("Helvetica-Bold", BRAND_FONT_SIZE) |
| text_y = top_y - logo_size / 2 - 1 * mm |
| c.drawString(x_margin + logo_size + brand_spacing, text_y, BRAND_NAME) |
|
|
| |
| checked_date = _format_checked_on() |
| c.setFont("Helvetica", 10) |
| text_width = c.stringWidth(checked_date, "Helvetica", 10) |
| c.drawString(width - x_margin - text_width, text_y, checked_date) |
|
|
| |
| c.setLineWidth(0.5) |
| c.setStrokeColor(colors.black) |
| c.line( |
| x_margin, |
| top_y - logo_size - 5 * mm, |
| width - x_margin, |
| top_y - logo_size - 5 * mm, |
| ) |
|
|
| |
| |
| c.setLineWidth(0.35) |
| c.line(x_margin, footer_y + 8 * mm, width - x_margin, footer_y + 8 * mm) |
|
|
| |
| c.setFont("Helvetica", 9) |
| page_label = f"Page {page_num}" |
| label_width = c.stringWidth(page_label, "Helvetica", 9) |
| c.drawString((width - label_width) / 2, footer_y + 2 * mm, page_label) |
|
|
| content_start_y = top_y - logo_size - 18 * mm |
| return x_margin, content_start_y, width, height |
|
|
|
|
| |
|
|
| def generate_plagiarism_pdf(user: dict, text: str, result: dict) -> StreamingResponse: |
| """ |
| Generate plagiarism PDF in TrueWrite Scan style. |
| """ |
| buf = io.BytesIO() |
| c = canvas.Canvas(buf, pagesize=A4) |
|
|
| report_title = "Plagiarism Scan Report" |
| plagiarism_percent = float(result.get("plagiarism_percent", 0.0)) |
| unique_percent = max(0.0, round(100.0 - plagiarism_percent, 2)) |
|
|
| |
| page_num = 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
|
|
| |
| c.setFont("Helvetica-Bold", TITLE_FONT_SIZE) |
| title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE) |
| c.drawString((width - title_width) / 2, y, report_title) |
| y -= 18 * mm |
|
|
| |
| c.setFont("Helvetica-Bold", 22) |
| c.setFillColor(colors.red if plagiarism_percent >= 1.0 else colors.green) |
| c.drawString(x_margin, y, f"{plagiarism_percent:.0f}%") |
|
|
| c.setFont("Helvetica", 11) |
| c.setFillColor(colors.black) |
| c.drawString(x_margin + 22 * mm, y + 2 * mm, "Overall Similarity") |
| y -= 12 * mm |
|
|
| |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, f"Estimated Unique Content: {unique_percent:.0f}%") |
| y -= 6 * mm |
| summary = result.get("summary", "") |
| if summary: |
| for line in _wrap_text("Summary: " + summary, 110): |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
| else: |
| y -= 5 * mm |
| y -= 10 * mm |
|
|
| |
| c.setFont("Helvetica", 10) |
| truncated = text.strip() |
| if len(truncated) > 8000: |
| truncated = truncated[:8000] + "\n...\n[Content truncated for report]" |
|
|
| for line in _wrap_text(truncated, 110): |
| if y < 40 * mm: |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
|
|
| |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica-Bold", 12) |
| c.drawString(x_margin, y, "Matched Sources") |
| y -= 10 * mm |
| c.setFont("Helvetica", 10) |
|
|
| matches = result.get("matches", []) or [] |
| if not matches: |
| c.drawString(x_margin, y, "No specific sources recorded. Content appears mostly unique.") |
| else: |
| for idx, m in enumerate(matches[:10], start=1): |
| title = m.get("title", "Source") |
| score = m.get("score", m.get("tfidf_score", 0.0) or 0.0) |
| line = f"{idx}. {title} — {score:.2f}% match" |
| for part in _wrap_text(line, 110): |
| c.drawString(x_margin, y, part) |
| y -= 5 * mm |
| if y < 40 * mm: |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica", 10) |
|
|
| c.save() |
| buf.seek(0) |
| return StreamingResponse( |
| buf, |
| media_type="application/pdf", |
| headers={"Content-Disposition": "attachment; filename=plagiarism-report.pdf"}, |
| ) |
|
|
|
|
| def generate_ai_pdf(user: dict, text: str, result: dict) -> StreamingResponse: |
| """ |
| Generate the AI content analysis PDF. |
| The result dict comes from the ai-check endpoint logic. |
| """ |
| buf = io.BytesIO() |
| c = canvas.Canvas(buf, pagesize=A4) |
|
|
| report_title = "AI Content Analysis Report" |
| ai_percent = float(result.get("ai_percent", 0.0)) |
| human_percent = float(result.get("human_percent", 100.0)) |
| word_count = int(result.get("word_count", 0)) |
| avg_len = float(result.get("avg_sentence_length", 0.0)) |
|
|
| |
| page_num = 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
|
|
| |
| c.setFont("Helvetica-Bold", TITLE_FONT_SIZE) |
| title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE) |
| c.drawString((width - title_width) / 2, y, report_title) |
| y -= 18 * mm |
|
|
| |
| c.setFont("Helvetica-Bold", 22) |
| if ai_percent >= 50: |
| c.setFillColor(colors.red) |
| else: |
| c.setFillColor(colors.green) |
| c.drawString(x_margin, y, f"{ai_percent:.0f}%") |
| c.setFont("Helvetica", 11) |
| c.setFillColor(colors.black) |
| c.drawString(x_margin + 22 * mm, y + 2 * mm, "Estimated AI Probability") |
| y -= 12 * mm |
|
|
| |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, f"Estimated Human Probability: {human_percent:.0f}%") |
| y -= 6 * mm |
| c.drawString(x_margin, y, f"Word Count: {word_count}") |
| y -= 6 * mm |
| c.drawString(x_margin, y, f"Average Sentence Length: {avg_len:.2f} words") |
| y -= 6 * mm |
|
|
| summary = result.get("summary", "") |
| if summary: |
| for line in _wrap_text("Summary: " + summary, 110): |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
| y -= 5 * mm |
| else: |
| y -= 10 * mm |
|
|
| |
| c.setFont("Helvetica", 10) |
| truncated = text.strip() |
| if len(truncated) > 8000: |
| truncated = truncated[:8000] + "\n...\n[Content truncated for report]" |
|
|
| for line in _wrap_text(truncated, 110): |
| if y < 40 * mm: |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
|
|
| c.save() |
| buf.seek(0) |
| return StreamingResponse( |
| buf, |
| media_type="application/pdf", |
| headers={"Content-Disposition": "attachment; filename=truewrite-ai-report.pdf"}, |
| ) |
|
|
|
|
| def generate_grammar_pdf(user: dict, original_text: str, corrected_text: str, result: dict) -> StreamingResponse: |
| """ |
| Generate the grammar correction PDF. |
| The result dict comes from the grammar-check endpoint logic. |
| """ |
| buf = io.BytesIO() |
| c = canvas.Canvas(buf, pagesize=A4) |
|
|
| report_title = "Grammar Correction Report" |
| corrections = int(result.get("corrections", 0)) |
| original_words = int(result.get("original_words", 0)) |
| summary = result.get("summary", "") |
|
|
| |
| page_num = 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
|
|
| |
| c.setFont("Helvetica-Bold", TITLE_FONT_SIZE) |
| title_width = c.stringWidth(report_title, "Helvetica-Bold", TITLE_FONT_SIZE) |
| c.drawString((width - title_width) / 2, y, report_title) |
| y -= 18 * mm |
|
|
| |
| c.setFont("Helvetica-Bold", 22) |
| c.setFillColor(colors.blue if corrections > 0 else colors.green) |
| c.drawString(x_margin, y, f"{corrections}") |
| c.setFont("Helvetica", 11) |
| c.setFillColor(colors.black) |
| c.drawString(x_margin + 22 * mm, y + 2 * mm, "Corrections Applied") |
| y -= 12 * mm |
|
|
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, f"Words Analysed: {original_words}") |
| y -= 6 * mm |
|
|
| if summary: |
| for line in _wrap_text("Summary: " + summary, 110): |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
| y -= 5 * mm |
| else: |
| y -= 10 * mm |
|
|
| |
| c.setFont("Helvetica-Bold", 11) |
| c.drawString(x_margin, y, "Original Text") |
| y -= 7 * mm |
| c.setFont("Helvetica", 10) |
|
|
| orig = original_text.strip() |
| if len(orig) > 4000: |
| orig = orig[:4000] + "\n...\n[Content truncated for report]" |
|
|
| for line in _wrap_text(orig, 110): |
| if y < 40 * mm: |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
|
|
| |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
|
|
| c.setFont("Helvetica-Bold", 11) |
| c.drawString(x_margin, y, "Corrected Text") |
| y -= 7 * mm |
| c.setFont("Helvetica", 10) |
|
|
| corr = corrected_text.strip() |
| if len(corr) > 4000: |
| corr = corr[:4000] + "\n...\n[Content truncated for report]" |
|
|
| for line in _wrap_text(corr, 110): |
| if y < 40 * mm: |
| c.showPage() |
| page_num += 1 |
| x_margin, y, width, height = _draw_header_footer(c, page_num) |
| c.setFont("Helvetica", 10) |
| c.drawString(x_margin, y, line) |
| y -= 5 * mm |
|
|
| c.save() |
| buf.seek(0) |
| return StreamingResponse( |
| buf, |
| media_type="application/pdf", |
| headers={"Content-Disposition": "attachment; filename=truewrite-grammar-report.pdf"}, |
| ) |
|
|
|
|
| |
|
|
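| # Example request (assuming the API is served at http://localhost:8000): |
| #   curl -X POST http://localhost:8000/api/signup \ |
| #        -H "Content-Type: application/json" \ |
| #        -d '{"name": "Ada", "email": "ada@example.com", "password": "secret123"}' |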
| @app.post("/api/signup") |
| def signup(req: SignupRequest): |
| cur.execute("SELECT id FROM users WHERE email = ?", (req.email,)) |
| if cur.fetchone(): |
| raise HTTPException(status_code=400, detail="Email already registered") |
|
|
| pw_hash = hash_password(req.password) |
| created_at = now_iso() |
| cur.execute( |
| "INSERT INTO users (name, email, password_hash, created_at) VALUES (?, ?, ?, ?)", |
| (req.name, req.email, pw_hash, created_at), |
| ) |
| conn.commit() |
| user_id = cur.lastrowid |
| token = create_token(user_id, req.email) |
|
|
| return { |
| "message": "Signup successful", |
| "token": token, |
| "name": req.name, |
| "email": req.email, |
| } |
|
|
|
|
| @app.post("/api/login") |
| def login(req: LoginRequest): |
| cur.execute("SELECT * FROM users WHERE email = ?", (req.email,)) |
| row = cur.fetchone() |
| if not row or not verify_password(req.password, row["password_hash"]): |
| raise HTTPException(status_code=401, detail="Invalid email or password") |
|
|
| token = create_token(row["id"], row["email"]) |
| return { |
| "message": "Login successful", |
| "token": token, |
| "name": row["name"], |
| "email": row["email"], |
| } |
|
|
|
|
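| # Example request (assuming the same local host; use the token returned by /api/login or /api/signup): |
| #   curl -X POST http://localhost:8000/api/grammar-check \ |
| #        -H "Authorization: Bearer <JWT>" -H "Content-Type: application/json" \ |
| #        -d '{"text": "i has went to the store"}' |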
| @app.post("/api/grammar-check") |
| def api_grammar_check(req: TextRequest, user=Depends(get_current_user)): |
| text = req.text or "" |
| if not text.strip(): |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| |
| if GEC_MODEL is not None: |
| corrected, corrections, original_words = gector_correct(text) |
| summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}" |
| elif lt_tool is not None: |
| corrected, corrections, original_words = grammar_with_languagetool(text) |
| summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}" |
| else: |
| corrected, corrections, original_words = simple_grammar_correct(text) |
| summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}" |
|
|
| save_history(user["id"], "grammar", text, summary) |
|
|
| return { |
| "original_words": original_words, |
| "corrections": corrections, |
| "corrected_text": corrected, |
| "summary": summary, |
| } |
|
|
|
|
| @app.post("/api/grammar-check-file") |
| def api_grammar_check_file(file: UploadFile = File(...), user=Depends(get_current_user)): |
| text = extract_text_from_upload(file).strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Uploaded file contains no text") |
|
|
| if GEC_MODEL is not None: |
| corrected, corrections, original_words = gector_correct(text) |
| summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}" |
| elif lt_tool is not None: |
| corrected, corrections, original_words = grammar_with_languagetool(text) |
| summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}" |
| else: |
| parts = text.strip().split() |
| if len(parts) > 1000: |
| text = " ".join(parts[:1000]) |
| corrected, corrections, original_words = simple_grammar_correct(text) |
| summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}" |
|
|
| save_history(user["id"], "grammar", text, summary) |
|
|
| return { |
| "original_words": original_words, |
| "corrections": corrections, |
| "corrected_text": corrected, |
| "summary": summary, |
| } |
|
|
|
|
| |
| @app.post("/api/plagiarism-check") |
| def api_plagiarism_check(req: TextRequest, user=Depends(get_current_user)): |
| text = req.text or "" |
| if not text.strip(): |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| |
| try: |
| result = corpus_plagiarism_combined(text) |
| save_history(user["id"], "plagiarism", text, result["summary"]) |
| return result |
| except Exception as e: |
| print("[Plagiarism] Combined corpus engine failed, falling back to demo:", e) |
|
|
| |
| result = demo_plagiarism_fallback(text) |
| save_history(user["id"], "plagiarism", text, result["summary"]) |
| return result |
|
|
|
|
| @app.post("/api/plagiarism-check-file") |
| def api_plagiarism_check_file(file: UploadFile = File(...), user=Depends(get_current_user)): |
| text = extract_text_from_upload(file).strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Uploaded file contains no text") |
|
|
| try: |
| result = corpus_plagiarism_combined(text) |
| save_history(user["id"], "plagiarism", text, result["summary"]) |
| return result |
| except Exception as e: |
| print("[Plagiarism-file] Combined corpus engine failed, falling back to demo:", e) |
|
|
| |
| result = demo_plagiarism_fallback(text) |
| save_history(user["id"], "plagiarism", text, result["summary"]) |
| return result |
|
|
|
|
| |
| def heuristic_ai_score(text: str): |
| words = re.sub(r"[^a-z0-9\s]", " ", text.lower()).split() |
| word_count = len(words) |
| unique_ratio = len(set(words)) / (word_count or 1) |
| sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()] |
| avg_sentence_length = word_count / (len(sentences) or 1) |
|
|
| ai_score = 0 |
| if unique_ratio < 0.45: |
| ai_score += 40 |
| elif unique_ratio < 0.6: |
| ai_score += 20 |
|
|
| if avg_sentence_length > 25: |
| ai_score += 40 |
| elif avg_sentence_length > 18: |
| ai_score += 25 |
|
|
| if word_count > 400: |
| ai_score += 10 |
|
|
| ai_score = min(100, round(ai_score)) |
| human_score = 100 - ai_score |
| return ai_score, human_score, word_count, avg_sentence_length, unique_ratio |
|
|
|
|
| @app.post("/api/ai-check") |
| def api_ai_check(req: TextRequest, user=Depends(get_current_user)): |
| text = (req.text or "").strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| if model is not None and tokenizer is not None: |
| try: |
| max_len = getattr(tokenizer, "model_max_length", 512) |
| if max_len is None or max_len > 1024: |
| max_len = 512 |
|
|
| words = text.split() |
| chunk_size = min(400, max_len - 10) |
| chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)] |
| probs = [] |
| for chunk in chunks: |
| inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len) |
| inputs = {k: v.to(device) for k, v in inputs.items()} |
| with torch.no_grad(): |
| outputs = model(**inputs) |
| logits = outputs.logits |
| p = torch.softmax(logits, dim=1).cpu().numpy()[0] |
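| # Assumes the second logit is the machine-generated ("Fake") class; if scores look inverted, |
| # check model.config.id2label for this detector's actual label order. |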
| ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0]) |
| probs.append(ai_prob) |
| avg_ai_prob = float(np.mean(probs)) if probs else 0.0 |
| ai_percent = round(avg_ai_prob * 100, 2) |
| human_percent = round(100 - ai_percent, 2) |
| words_count = len(words) |
| sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()] |
| avg_sentence_len = round(words_count / (len(sentences) or 1), 2) |
| summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%" |
| save_history(user["id"], "ai", text, summary) |
| return { |
| "ai_percent": ai_percent, |
| "human_percent": human_percent, |
| "word_count": words_count, |
| "avg_sentence_length": avg_sentence_len, |
| "summary": summary, |
| } |
| except Exception as e: |
| print("[AI-check] model inference failed:", e) |
|
|
| ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text) |
| summary = f"HEURISTIC fallback — AI probability: {ai_percent}%" |
| save_history(user["id"], "ai", text, summary) |
| return { |
| "ai_percent": ai_percent, |
| "human_percent": human_percent, |
| "word_count": wc, |
| "avg_sentence_length": avg_len, |
| "unique_ratio": round(uniq, 3), |
| "summary": summary, |
| } |
|
|
|
|
| @app.post("/api/ai-check-file") |
| def api_ai_check_file(file: UploadFile = File(...), user=Depends(get_current_user)): |
| text = extract_text_from_upload(file).strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Uploaded file contains no text") |
| # FastAPI's route decorator returns the original function, so it can be called directly here. |
| return api_ai_check(TextRequest(text=text), user) |
|
|
|
|
| |
|
|
| @app.post("/api/plagiarism-report") |
| def api_plagiarism_report(req: TextRequest, user=Depends(get_current_user)): |
| """ |
| Generate a PDF plagiarism report in the TrueWrite Scan style. |
| """ |
| text = (req.text or "").strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| try: |
| result = corpus_plagiarism_combined(text) |
| except Exception as e: |
| print("[Plagiarism-Report] Combined engine failed, falling back:", e) |
| result = demo_plagiarism_fallback(text) |
|
|
| save_history(user["id"], "plagiarism_report", text, result.get("summary", "")) |
|
|
| user_info = { |
| "name": user.get("name"), |
| "email": user.get("email"), |
| } |
| return generate_plagiarism_pdf(user_info, text, result) |
|
|
|
|
| @app.post("/api/ai-report") |
| def api_ai_report(req: TextRequest, user=Depends(get_current_user)): |
| """ |
| Generate a PDF AI analysis report in the TrueWrite Scan style. |
| """ |
| text = (req.text or "").strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| result = None |
| if model is not None and tokenizer is not None: |
| try: |
| max_len = getattr(tokenizer, "model_max_length", 512) |
| if max_len is None or max_len > 1024: |
| max_len = 512 |
|
|
| words = text.split() |
| chunk_size = min(400, max_len - 10) |
| chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)] |
| probs = [] |
| for chunk in chunks: |
| inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len) |
| inputs = {k: v.to(device) for k, v in inputs.items()} |
| with torch.no_grad(): |
| outputs = model(**inputs) |
| logits = outputs.logits |
| p = torch.softmax(logits, dim=1).cpu().numpy()[0] |
| ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0]) |
| probs.append(ai_prob) |
| avg_ai_prob = float(np.mean(probs)) if probs else 0.0 |
| ai_percent = round(avg_ai_prob * 100, 2) |
| human_percent = round(100 - ai_percent, 2) |
| words_count = len(words) |
| sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()] |
| avg_sentence_len = round(words_count / (len(sentences) or 1), 2) |
| summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%" |
| result = { |
| "ai_percent": ai_percent, |
| "human_percent": human_percent, |
| "word_count": words_count, |
| "avg_sentence_length": avg_sentence_len, |
| "summary": summary, |
| } |
| except Exception as e: |
| print("[AI-report] model inference failed:", e) |
|
|
| if result is None: |
| ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text) |
| summary = f"HEURISTIC fallback — AI probability: {ai_percent}%" |
| result = { |
| "ai_percent": ai_percent, |
| "human_percent": human_percent, |
| "word_count": wc, |
| "avg_sentence_length": avg_len, |
| "unique_ratio": round(uniq, 3), |
| "summary": summary, |
| } |
|
|
| save_history(user["id"], "ai_report", text, result.get("summary", "")) |
|
|
| user_info = { |
| "name": user.get("name"), |
| "email": user.get("email"), |
| } |
| return generate_ai_pdf(user_info, text, result) |
|
|
|
|
| @app.post("/api/grammar-report") |
| def api_grammar_report(req: TextRequest, user=Depends(get_current_user)): |
| """ |
| Generate a PDF grammar correction report in the TrueWrite Scan style. |
| """ |
| text = (req.text or "").strip() |
| if not text: |
| raise HTTPException(status_code=400, detail="Text is required") |
|
|
| if GEC_MODEL is not None: |
| corrected, corrections, original_words = gector_correct(text) |
| summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}" |
| elif lt_tool is not None: |
| corrected, corrections, original_words = grammar_with_languagetool(text) |
| summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}" |
| else: |
| corrected, corrections, original_words = simple_grammar_correct(text) |
| summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}" |
|
|
| result = { |
| "original_words": original_words, |
| "corrections": corrections, |
| "summary": summary, |
| } |
|
|
| save_history(user["id"], "grammar_report", text, summary) |
|
|
| user_info = { |
| "name": user.get("name"), |
| "email": user.get("email"), |
| } |
| return generate_grammar_pdf(user_info, text, corrected, result) |
|
|
|
|
| |
| @app.get("/api/history") |
| def api_history(user=Depends(get_current_user)): |
| cur.execute( |
| "SELECT id, tool, input_text, result_summary, created_at " |
| "FROM history WHERE user_id = ? " |
| "ORDER BY created_at DESC LIMIT 50", |
| (user["id"],), |
| ) |
| rows = cur.fetchall() |
| items = [] |
| for r in rows: |
| items.append( |
| { |
| "id": r["id"], |
| "tool": r["tool"], |
| "input_text": r["input_text"], |
| "summary": r["result_summary"], |
| "created_at": r["created_at"], |
| } |
| ) |
| return {"items": items} |
|
|
|
|
| @app.get("/") |
| def read_root(): |
| return {"status": "Backend is running with GECToR + 16GB RAM + PDF reports!"} |
|
|