|
|
from fastapi import APIRouter, UploadFile, File, HTTPException, Depends |
|
|
from typing import List, Tuple, Set |
|
|
from datetime import datetime |
|
|
from fastapi.security import OAuth2PasswordBearer |
|
|
from jose import JWTError, jwt |
|
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
|
|
|
|
from app.schemas.teacher_schemas import ( |
|
|
DocumentInfo, OverlapDetail, ComparisonDetail, |
|
|
InternalReportDetail, InternalReportSummary |
|
|
) |
|
|
from app.utils.file_utils import extract_text_from_file, allowed_file |
|
|
from app.utils.lexical_utils import ( |
|
|
find_partial_phrase_match_for_internal, |
|
|
get_meaningful_sentences, |
|
|
find_exact_matches, |
|
|
find_partial_phrase_match, |
|
|
) |
|
|
from app.config import MONGODB_URI,ALGORITHM, SECRET_KEY |
|
|
|
|
|
# All routes in this module are mounted under /teacher.
router = APIRouter(prefix="/teacher", tags=["teacher-internal"])

# Minimum pairwise similarity on a 0-1 scale; code below compares percentage
# values against LEXICAL_PAIR_THRESHOLD * 100.
LEXICAL_PAIR_THRESHOLD = 0.50

# Minimum number of tokens a partial phrase match must span to be reported.
OVERLAP_MIN_TOKENS = 12

# Similarity tier cut-offs (0-1 scale).
# NOTE(review): these three constants are not referenced anywhere in this
# chunk — presumably used elsewhere or reserved for tiered reporting; confirm.
HIGH_SIMILARITY_THRESHOLD = 0.85

MEDIUM_SIMILARITY_THRESHOLD = 0.70

LOW_SIMILARITY_THRESHOLD = 0.50

# OAuth2 bearer-token scheme; clients obtain tokens from the /token endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
|
|
|
|
|
|
|
|
def verify_token(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return its payload.

    Raises HTTPException 401 when the token is malformed, has a bad
    signature, or is expired.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return payload
|
|
|
|
|
|
|
|
# Cached singleton client; Motor/PyMongo clients own a connection pool and
# are designed to be created once and shared for the process lifetime.
_mongo_client = None


async def get_mongo_client():
    """FastAPI dependency returning a shared AsyncIOMotorClient.

    The original implementation constructed a brand-new client (and a brand-new
    connection pool) on every request, which leaks connections under load.
    This version lazily creates one client on first use and reuses it.
    """
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = AsyncIOMotorClient(MONGODB_URI)
    return _mongo_client
|
|
|
|
|
|
|
|
def _percent(x: float) -> float: |
|
|
return round(float(x) * 100.0, 1) |
|
|
|
|
|
|
|
|
def _ordered_pair_key(i: int, j: int) -> str: |
|
|
a, b = (i, j) if i < j else (j, i) |
|
|
return f"{a}-{b}" |
|
|
|
|
|
|
|
|
def _aggregate_pair_score(overlaps: List[OverlapDetail]) -> float: |
|
|
return max((o.similarity for o in overlaps), default=0.0) |
|
|
|
|
|
|
|
|
def _create_overlap_key(name_a: str, name_b: str, text: str, similarity: float, context: str) -> str: |
|
|
"""Create unique key for overlap deduplication - includes context to distinguish different match types""" |
|
|
|
|
|
text_normalized = ' '.join(text.split()) |
|
|
return f"{name_a}|{name_b}|{text_normalized}|{similarity}|{context}" |
|
|
|
|
|
|
|
|
def _extract_matched_text_from_sentence(sent_b: str, phrase: str) -> str: |
|
|
"""Extract the actual text from sent_b that matches the phrase""" |
|
|
if not sent_b or not phrase: |
|
|
return phrase |
|
|
|
|
|
|
|
|
phrase_normalized = ' '.join(phrase.split()).lower() |
|
|
sent_normalized = ' '.join(sent_b.split()).lower() |
|
|
sent_b_normalized = ' '.join(sent_b.split()) |
|
|
|
|
|
|
|
|
if phrase_normalized in sent_normalized: |
|
|
start_idx = sent_normalized.find(phrase_normalized) |
|
|
end_idx = start_idx + len(phrase_normalized) |
|
|
return sent_b_normalized[start_idx:end_idx].strip() |
|
|
|
|
|
|
|
|
|
|
|
phrase_words = phrase_normalized.split() |
|
|
sent_words = sent_normalized.split() |
|
|
|
|
|
|
|
|
for i in range(len(sent_words) - len(phrase_words) + 1): |
|
|
if sent_words[i:i+len(phrase_words)] == phrase_words: |
|
|
return ' '.join(sent_b_normalized.split()[i:i+len(phrase_words)]) |
|
|
|
|
|
|
|
|
return phrase |
|
|
|
|
|
|
|
|
def _find_overlaps_for_pair(
    name_a: str, sents_a: List[str],
    name_b: str, sents_b: List[str],
    seen_overlaps: Set[str]
) -> List[OverlapDetail]:
    """Find all overlapping passages between two documents' sentences.

    For every sentence of document A, two detectors run against document B:
      1. exact/near-exact sentence matching (one OverlapDetail per B-sentence
         that scores), and
      2. partial phrase matching, keeping only the single best-scoring phrase
         per A-sentence, and only when it spans at least OVERLAP_MIN_TOKENS.

    Hits whose percentage similarity is below LEXICAL_PAIR_THRESHOLD * 100 are
    discarded. ``seen_overlaps`` is read and updated in place so duplicates
    are suppressed across calls for different pairs.

    Fix vs. original: removed a leftover debug ``print`` that dumped matched
    document text to stdout for every partial phrase match.
    """
    overlaps: List[OverlapDetail] = []

    for sent_a in sents_a:
        # --- Detector 1: exact/near-exact sentence overlap ---------------
        for sent_b in sents_b:
            exact_score = find_exact_matches(sent_a, sent_b)
            if exact_score is None:
                continue
            sim_pct = _percent(exact_score)
            if sim_pct < LEXICAL_PAIR_THRESHOLD * 100:
                continue
            context = "Exact/near-exact sentence overlap"
            overlap_key = _create_overlap_key(name_a, name_b, sent_a, sim_pct, context)
            if overlap_key in seen_overlaps:
                continue
            seen_overlaps.add(overlap_key)
            overlaps.append(OverlapDetail(
                fromDoc=name_a,
                toDoc=name_b,
                text=sent_a,
                similarity=sim_pct,
                sectionA=sent_a,
                sectionB=sent_b,
                context=context,
            ))

        # --- Detector 2: best partial phrase match for this A-sentence ---
        best_partial = None
        best_score = 0.0
        best_sent_b = None

        for sent_b in sents_b:
            partial_result = find_partial_phrase_match_for_internal(sent_a, sent_b)
            if partial_result:
                phrase, score = partial_result
                if score > best_score:
                    best_score = score
                    best_partial = phrase
                    best_sent_b = sent_b

        # Report the best phrase only when it is long enough to be meaningful
        # and clears the similarity threshold.
        if best_partial and best_sent_b and len(best_partial.split()) >= OVERLAP_MIN_TOKENS:
            sim_pct = _percent(best_score)
            if sim_pct >= LEXICAL_PAIR_THRESHOLD * 100:
                context = "High-overlap phrase (shingle/containment)"
                overlap_key = _create_overlap_key(name_a, name_b, best_partial, sim_pct, context)
                if overlap_key not in seen_overlaps:
                    seen_overlaps.add(overlap_key)
                    overlaps.append(OverlapDetail(
                        fromDoc=name_a,
                        toDoc=name_b,
                        text=best_partial,
                        similarity=sim_pct,
                        sectionA=sent_a,
                        sectionB=best_sent_b,
                        context=context,
                    ))

    return overlaps
|
|
|
|
|
@router.post("/internal-analysis", response_model=InternalReportDetail)
async def internal_analysis(
    files: List[UploadFile] = File(...),
    token_payload: dict = Depends(verify_token),
    mongo: AsyncIOMotorClient = Depends(get_mongo_client),
):
    """Compare every uploaded document against every other one and return an
    internal (document-vs-document) plagiarism report.

    Pipeline:
      1. validate each upload and extract its text and meaningful sentences,
      2. run pairwise overlap detection on all C(n, 2) document pairs,
      3. roll pair results up into per-document and report-level summaries,
      4. best-effort persist the report to MongoDB — persistence failures
         are logged and swallowed so the analysis is still returned.

    Raises HTTPException 400 for fewer than 2 files or a disallowed file
    type, and 401 (via verify_token) for a missing/invalid token.
    """
    if len(files) < 2:
        raise HTTPException(status_code=400, detail="Upload at least 2 files")

    # Wall-clock start used only for the human-readable processingTime field.
    t0 = datetime.utcnow()

    # ---- 1. Extraction --------------------------------------------------
    docs: List[Tuple[str, List[str]]] = []   # (filename, sentences) per upload
    doc_infos: List[DocumentInfo] = []       # schema objects for the response
    doc_texts = {}                           # filename -> full extracted text

    for idx, f in enumerate(files, start=1):
        if not allowed_file(f.filename):
            raise HTTPException(status_code=400, detail=f"Invalid file type: {f.filename}")
        raw = await f.read()
        text = extract_text_from_file(raw, f.filename) or ""
        sents = get_meaningful_sentences(text)
        doc_infos.append(DocumentInfo(id=idx, name=f.filename, author=None))
        docs.append((f.filename, sents))
        # NOTE(review): duplicate filenames would silently overwrite each
        # other here and in the doc_texts lookups below — confirm uploads
        # are guaranteed unique.
        doc_texts[f.filename] = text

    # ---- 2. Pairwise comparison -----------------------------------------
    comparisons: List[ComparisonDetail] = []
    seen_overlaps: Set[str] = set()   # deduplicates overlaps across all pairs

    for i in range(len(docs)):
        for j in range(i + 1, len(docs)):
            name_a, sents_a = docs[i]
            name_b, sents_b = docs[j]

            overlaps = _find_overlaps_for_pair(
                name_a, sents_a,
                name_b, sents_b,
                seen_overlaps
            )

            # Pair score is the highest overlap similarity (a percentage).
            pair_score = _aggregate_pair_score(overlaps)
            flagged = pair_score >= LEXICAL_PAIR_THRESHOLD * 100

            comp = ComparisonDetail(
                id=_ordered_pair_key(i + 1, j + 1),
                docA=name_a,
                docB=name_b,
                similarity=round(pair_score, 1),
                flagged=flagged,
                overlaps=overlaps,
                contentA=doc_texts[name_a],
                contentB=doc_texts[name_b],
            )
            # Only flagged pairs are included in the report.
            if flagged:
                comparisons.append(comp)

    # ---- 3. Per-document rollup -----------------------------------------
    doc_results = []
    total_matches = 0
    flagged_count = 0

    for d_idx, d in enumerate(doc_infos, start=1):
        name = d.name
        word_count = len(doc_texts[name].split())
        # All overlaps (from flagged comparisons only) touching this document.
        matches = [o for c in comparisons for o in c.overlaps if o.fromDoc == name or o.toDoc == name]
        highest_similarity = max((o.similarity for o in matches), default=0.0)
        flagged = highest_similarity >= LEXICAL_PAIR_THRESHOLD * 100
        if flagged:
            flagged_count += 1
        # NOTE(review): each overlap involves two documents, so summing per
        # document counts every overlap twice — confirm this is intended.
        total_matches += len(matches)

        doc_results.append({
            "id": d.id,
            "name": d.name,
            "similarity": round(highest_similarity, 1),
            "flagged": flagged,
            "wordCount": word_count,
            "matchCount": len(matches),
            "matches": matches
        })

    # ---- Report-level aggregates ----------------------------------------
    highest_any = max(d['similarity'] for d in doc_results) if doc_results else 0.0
    avg_similarity = round(sum(d['similarity'] for d in doc_results) / len(doc_results), 1) if doc_results else 0.0
    elapsed = (datetime.utcnow() - t0).total_seconds()
    processing = f"{int(elapsed // 60)}m {int(elapsed % 60):02d}s"

    report = InternalReportDetail(
        # Placeholder id; replaced with the Mongo _id after a successful save.
        id="internal_report",
        name="Internal Plagiarism Check",
        uploadDate=datetime.utcnow(),
        processingTime=processing,
        documents=doc_infos,
        comparisons=comparisons,
        summary=InternalReportSummary(
            totalDocuments=len(doc_results),
            totalComparisons=(len(docs) * (len(docs) - 1)) // 2,
            flaggedComparisons=flagged_count,
            highestSimilarity=round(highest_any, 1),
            averageSimilarity=avg_similarity,
        ),
    )

    # ---- 4. Best-effort persistence -------------------------------------
    try:
        db = mongo.sluethink
        reports_collection = db.reports

        # Every document appearing on the "to" side of any reported overlap.
        all_sources = set()
        for comp in comparisons:
            for o in comp.overlaps:
                all_sources.add(o.toDoc)

        report_doc = {
            "name": f"Internal_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}",
            "analysisType": "internal",
            "submittedBy": token_payload.get("name", "System"),
            "uploadDate": datetime.utcnow().strftime("%Y-%m-%d"),
            "similarity": highest_any,
            "status": "completed",
            "flagged": flagged_count > 0,
            "fileCount": len(doc_results),
            "processingTime": processing,
            "avgSimilarity": avg_similarity,
            "totalMatches": total_matches,
            "sources": list(all_sources),
            "createdAt": datetime.utcnow(),
            # Token payloads vary: prefer "sub", fall back to "user_id".
            "userId": token_payload.get("sub") or token_payload.get("user_id"),
            "documents": [
                {
                    "id": d['id'],
                    "name": d['name'],
                    "similarity": d['similarity'],
                    "flagged": d['flagged'],
                    "wordCount": d['wordCount'],
                    "matchCount": d['matchCount'],
                    # Flatten OverlapDetail objects into plain dicts for Mongo.
                    "matches": [
                        {
                            "matched_text": m.text,
                            "similarity": m.similarity,
                            "source_url": m.toDoc,
                            "source_title": m.toDoc,
                            "source_type": "internal",
                        } for m in d['matches']
                    ]
                } for d in doc_results
            ],
            "summary": {
                "totalDocuments": len(doc_results),
                "flaggedDocuments": flagged_count,
                "highestSimilarity": highest_any,
                "averageSimilarity": avg_similarity,
                "totalMatches": total_matches,
            }
        }

        insert_result = await reports_collection.insert_one(report_doc)
        # NOTE(review): log prefix below appears mojibake-encoded (was likely
        # an emoji) — left byte-identical on purpose.
        print(f"๐พ Report saved to MongoDB with ID: {insert_result.inserted_id}")
        # Expose the persisted id to the client instead of the placeholder.
        report.id = str(insert_result.inserted_id)

    except Exception as e:
        # Deliberate best-effort: a DB outage must not fail the analysis.
        print(f"โ Error saving to MongoDB: {str(e)}")

    return report