SciPeerAI-API / src /scipeerai /api /routes.py
Abu-Sameer-66
deploy: SciPeerAI v2.3.0 — 24 modules, 209 tests, Phase 6 complete
a53c25d
# import os
# import tempfile
# from fastapi import APIRouter, HTTPException, UploadFile, File
# from pydantic import BaseModel, Field
# from src.scipeerai.modules.reproducibility_scanner import ReproducibilityScanner
# from src.scipeerai.modules.stat_audit import StatAuditEngine
# from src.scipeerai.modules.figure_forensics import FigureForensicsEngine
# from src.scipeerai.modules.methodology_checker import MethodologyChecker
# from src.scipeerai.modules.citation_analyzer import CitationAnalyzer
# from src.scipeerai.modules.novelty_scorer import NoveltyScorer
# from src.scipeerai.modules.grim_test import GrimTest
# from src.scipeerai.modules.sprite_test import SpriteTest
# from src.scipeerai.modules.granularity_analyzer import GranularityAnalyzer
# from src.scipeerai.modules.pcurve_analyzer import PCurveAnalyzer
# from src.scipeerai.modules.effect_size_validator import EffectSizeValidator
# from src.scipeerai.modules.retraction_checker import RetractionChecker
# from src.scipeerai.modules.citation_cartel import CitationCartelDetector
# from src.scipeerai.modules.llm_detector import LLMDetector
# from src.scipeerai.core.pdf_parser import PDFParser
# from src.scipeerai.modules.fraud_fingerprint import FraudFingerprinter
# from src.scipeerai.modules.temporal_anomaly import TemporalAnomalyDetector
# from src.scipeerai.modules.citation_dna import CitationDNAAnalyzer
# from src.scipeerai.modules.data_fingerprint import DataFingerprintAnalyzer
# from src.scipeerai.modules.peer_review_score import PeerReviewScorer
# from src.scipeerai.modules.ai_human_spectrum import AIHumanSpectrumAnalyzer
# router = APIRouter(prefix="/api/v1", tags=["Analysis"])
# # ── Section-aware text extraction — replaces flat truncation ──────────────────
# _SECTION_MARKERS = [
# "abstract", "introduction", "background", "related work",
# "methods", "methodology", "materials and methods",
# "experimental", "experiments", "procedures",
# "results", "findings", "data analysis",
# "discussion", "conclusion", "conclusions",
# "references", "bibliography", "acknowledgments",
# "supplementary", "appendix",
# ]
# _MODULE_SECTIONS = {
# "statistics": ["abstract", "results", "findings",
# "data analysis", "methods", "methodology"],
# "methodology": ["abstract", "introduction", "methods",
# "methodology", "materials and methods",
# "experimental", "conclusion", "conclusions"],
# "citations": ["introduction", "background",
# "related work", "references", "bibliography"],
# "reproducibility": ["methods", "methodology",
# "materials and methods", "experimental",
# "procedures", "acknowledgments"],
# "novelty": ["abstract", "introduction",
# "background", "related work"],
# "grim": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "sprite": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "granularity": ["results", "findings",
# "methods", "data analysis"],
# "pcurve": ["abstract", "results",
# "findings", "data analysis"],
# "effect_size": ["results", "findings",
# "methods", "discussion"],
# "retraction": ["references", "bibliography", "introduction"],
# "cartel": ["references", "bibliography",
# "introduction", "acknowledgments"],
# "llm": ["abstract", "introduction", "methods",
# "results", "discussion"],
# }
# def _extract_sections(text: str) -> dict:
# """
# Split plain academic text into named sections.
# Looks for short lines matching known heading names.
# Returns dict of section_name -> section_text.
# """
# text_lower = text.lower()
# positions = []
# for marker in _SECTION_MARKERS:
# search_from = 0
# while True:
# idx = text_lower.find(marker, search_from)
# if idx == -1:
# break
# line_start = text.rfind('\n', 0, idx) + 1
# line_end = text.find('\n', idx)
# if line_end == -1:
# line_end = len(text)
# line_content = text[line_start:line_end].strip()
# if len(line_content) <= 60:
# positions.append((idx, marker))
# break
# search_from = idx + 1
# if not positions:
# return {}
# positions.sort(key=lambda x: x[0])
# deduped = [positions[0]]
# for pos in positions[1:]:
# if pos[0] - deduped[-1][0] > 50:
# deduped.append(pos)
# sections = {}
# for i, (start, name) in enumerate(deduped):
# end = deduped[i + 1][0] if i + 1 < len(deduped) else len(text)
# sections[name] = text[start:end].strip()
# return sections
# def _smart_text(text: str, module: str,
# per_section_limit: int = 2500) -> str:
# """
# Route paper text to the sections each module actually needs.
# Statistics module needs Results + Methods.
# Citations module needs References + Introduction.
# LLM detector needs the whole paper spread evenly.
# ...and so on.
# Falls back to flat truncation when no section headers found.
# """
# sections = _extract_sections(text)
# target_keys = _MODULE_SECTIONS.get(module, [])
# if sections and target_keys:
# parts = []
# for key in target_keys:
# if key in sections:
# parts.append(sections[key][:per_section_limit])
# if parts:
# return "\n\n".join(parts)[:12000]
# return _truncate(text)
# def _truncate(text: str, limit: int = 8000) -> str:
# """
# Fallback flat truncation.
# Used when paper has no recognisable section headers.
# Tries to keep Abstract + Methods at minimum.
# """
# if len(text) <= limit:
# return text
# lower = text.lower()
# methods_idx = lower.find('method')
# if 0 < methods_idx < len(text) - 1000:
# start = text[:3000]
# middle = text[methods_idx:methods_idx + 4000]
# return start + " [...] " + middle
# return text[:limit]
# # ── Engine initialization ─────────────────────────────────────────────────────
# _stat_engine = StatAuditEngine()
# _figure_engine = FigureForensicsEngine()
# _method_engine = MethodologyChecker()
# _citation_engine = CitationAnalyzer()
# _repro_engine = ReproducibilityScanner()
# _novelty_engine = NoveltyScorer()
# _grim_engine = GrimTest()
# _sprite_engine = SpriteTest()
# _granularity_engine = GranularityAnalyzer()
# _pcurve_engine = PCurveAnalyzer()
# _effect_size_engine = EffectSizeValidator()
# _retraction_engine = RetractionChecker()
# _cartel_engine = CitationCartelDetector()
# _llm_engine = LLMDetector()
# _pdf_parser = PDFParser()
# _fraud_engine = FraudFingerprinter()
# _temporal_engine = TemporalAnomalyDetector()
# _dna_engine = CitationDNAAnalyzer()
# _dataprint_engine = DataFingerprintAnalyzer()
# _peerreview_engine = PeerReviewScorer()
# _spectrum_engine = AIHumanSpectrumAnalyzer()
# # ── Request / Response Models ─────────────────────────────────────────────────
# class TextAnalysisRequest(BaseModel):
# text: str = Field(..., min_length=50, description="Paper text to analyze")
# class FlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ForensicFlagResponse(BaseModel):
# figures_involved: list
# class StatAuditResponse(BaseModel):
# risk_level: str
# risk_score: float
# summary: str
# flags: list[FlagResponse]
# p_values_found: list[float]
# sample_sizes_found: list[int]
# flags_count: int
# class FigureForensicsResponse(BaseModel):
# figures_found: int
# flags: list[ForensicFlagResponse]
# duplicate_pairs: list
# class MethodologyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# abstract: str = Field("")
# class MethodologyFlagResponse(BaseModel):
# claim: str
# issue: str
# suggestion: str
# class MethodologyResponse(BaseModel):
# flags: list[MethodologyFlagResponse]
# claims_found: list[str]
# methods_found: list[str]
# llm_assessment: str
# llm_available: bool
# class CitationRequest(BaseModel):
# text: str = Field(..., min_length=50)
# author_name: str = Field("")
# class CitationFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class CitationResponse(BaseModel):
# total_citations: int
# self_citations: int
# self_citation_ratio: float
# unsupported_claims: int
# flags: list[CitationFlagResponse]
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class ReproducibilityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class ReproducibilityFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ReproducibilityResponse(BaseModel):
# has_code_link: bool
# has_data_link: bool
# has_software_versions: bool
# has_preregistration: bool
# has_ethics_statement: bool
# reproducibility_score: float
# risk_level: str
# summary: str
# flags: list[ReproducibilityFlagResponse]
# flags_count: int
# class NoveltyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# title: str = Field("")
# class NoveltyFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class RelatedWorkResponse(BaseModel):
# title: str
# year: int
# authors: list
# similarity_signal: str
# class NoveltyResponse(BaseModel):
# novelty_score: float
# novelty_level: str
# risk_level: str
# risk_score: float
# summary: str
# flags: list[NoveltyFlagResponse]
# related_works_found: list[RelatedWorkResponse]
# key_terms_extracted: list[str]
# literature_accessible: bool
# flags_count: int
# class GrimRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GrimFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class GrimResponse(BaseModel):
# impossible_means: list
# possible_means: list
# grim_score: float
# risk_level: str
# summary: str
# flags: list[GrimFlagResponse]
# flags_count: int
# class SpriteRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class SpriteFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class SpriteResponse(BaseModel):
# impossible_combinations: list
# possible_combinations: list
# sprite_score: float
# risk_level: str
# summary: str
# flags: list[SpriteFlagResponse]
# flags_count: int
# class GranularityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GranularityFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class GranularityResponse(BaseModel):
# digit_preference_score: float
# benford_score: float
# round_number_ratio: float
# granularity_score: float
# risk_level: str
# summary: str
# flags: list[GranularityFlagResponse]
# flags_count: int
# class PCurveRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class PCurveFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class PCurveResponse(BaseModel):
# p_values_found: list
# significant_p: list
# right_skew_ratio: float
# clustering_score: float
# pcurve_score: float
# risk_level: str
# summary: str
# flags: list[PCurveFlagResponse]
# flags_count: int
# class EffectSizeRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class EffectSizeFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class EffectSizeResponse(BaseModel):
# effect_sizes_found: list
# power_estimates: list
# inflated_effects: list
# underpowered: list
# effect_score: float
# risk_level: str
# summary: str
# flags: list[EffectSizeFlagResponse]
# flags_count: int
# class RetractionRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class RetractionFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class RetractionResponse(BaseModel):
# dois_found: list
# retracted_found: list
# checked_count: int
# retraction_score: float
# risk_level: str
# summary: str
# flags: list[RetractionFlagResponse]
# flags_count: int
# class CartelRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class CartelFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class CartelResponse(BaseModel):
# authors_found: list
# citation_network: dict
# cartel_score: float
# self_citation_ratio: float
# network_diversity: float
# risk_level: str
# summary: str
# flags: list[CartelFlagResponse]
# flags_count: int
# class LLMRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class LLMFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class LLMResponse(BaseModel):
# burstiness_score: float
# vocabulary_diversity: float
# sentence_uniformity: float
# llm_phrase_count: int
# llm_score: float
# risk_level: str
# summary: str
# flags: list[LLMFlagResponse]
# flags_count: int
# # ── Endpoints ─────────────────────────────────────────────────────────────────
# @router.get("/status")
# def system_status():
# return {
# "modules_ready": {
# "stat_audit": True,
# "figure_forensics": True,
# "methodology_checker": True,
# "citation_analyzer": True,
# "reproducibility": True,
# "novelty_scorer": True,
# "grim_test": True,
# "sprite_test": True,
# "granularity": True,
# "pcurve": True,
# "effect_size": True,
# "retraction": True,
# "citation_cartel": True,
# "llm_detector": True,
# },
# "version": "1.5.0",
# "text_extraction": "section-aware",
# }
# @router.post("/analyze/statistics", response_model=StatAuditResponse)
# def analyze_statistics(request: TextAnalysisRequest):
# """Analyze paper for statistical integrity issues."""
# try:
# result = _stat_engine.analyze(
# _smart_text(request.text, "statistics")
# )
# return StatAuditResponse(
# risk_level = result.risk_level,
# risk_score = result.risk_score,
# summary = result.summary,
# flags = [
# FlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# p_values_found = result.p_values_found,
# sample_sizes_found = result.sample_sizes_found,
# flags_count = len(result.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/figures", response_model=FigureForensicsResponse)
# async def analyze_figures(file: UploadFile = File(...)):
# """Upload PDF and analyze figures for forensic anomalies."""
# if not file.filename.endswith(".pdf"):
# raise HTTPException(status_code=400, detail="Only PDF files accepted.")
# tmp_path = None
# try:
# contents = await file.read()
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(contents)
# tmp_path = tmp.name
# result = _figure_engine.analyze(tmp_path)
# return FigureForensicsResponse(
# figures_found = result.figures_found,
# flags = [
# ForensicFlagResponse(figures_involved=f.figures_involved)
# for f in result.flags
# ],
# duplicate_pairs = result.duplicate_pairs,
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# finally:
# if tmp_path and os.path.exists(tmp_path):
# os.unlink(tmp_path)
# @router.post("/analyze/methodology", response_model=MethodologyResponse)
# def analyze_methodology(request: MethodologyRequest):
# """Analyze paper for methodology logic issues."""
# try:
# result = _method_engine.analyze(
# _smart_text(request.text, "methodology"),
# request.abstract,
# )
# return MethodologyResponse(
# flags = [
# MethodologyFlagResponse(
# claim = f.claim,
# issue = f.issue,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# claims_found = result.claims_found,
# methods_found = result.methods_found,
# llm_assessment = result.llm_assessment,
# llm_available = result.llm_available,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/citations", response_model=CitationResponse)
# def analyze_citations(request: CitationRequest):
# """Analyze citations for integrity issues."""
# try:
# result = _citation_engine.analyze(
# _smart_text(request.text, "citations"),
# request.author_name,
# )
# return CitationResponse(
# total_citations = result.total_citations,
# self_citations = result.self_citations,
# self_citation_ratio = result.self_citation_ratio,
# unsupported_claims = result.unsupported_claims,
# flags = [
# CitationFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = getattr(f, 'suggestion', ''),
# ) for f in result.flags
# ],
# risk_level = result.risk_level,
# risk_score = result.risk_score,
# summary = result.summary,
# flags_count = len(result.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/reproducibility", response_model=ReproducibilityResponse)
# def analyze_reproducibility(request: ReproducibilityRequest):
# """Scan paper for reproducibility indicators."""
# try:
# result = _repro_engine.analyze(
# _smart_text(request.text, "reproducibility")
# )
# return ReproducibilityResponse(
# has_code_link = result.has_code_link,
# has_data_link = result.has_data_link,
# has_software_versions = result.has_software_versions,
# has_preregistration = result.has_preregistration,
# has_ethics_statement = result.has_ethics_statement,
# reproducibility_score = result.reproducibility_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# ReproducibilityFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = getattr(f, 'suggestion', ''),
# ) for f in result.flags
# ],
# flags_count = len(result.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/novelty", response_model=NoveltyResponse)
# def analyze_novelty(request: NoveltyRequest):
# """Estimate paper novelty against existing literature."""
# try:
# result = _novelty_engine.analyze(
# _smart_text(request.text, "novelty", per_section_limit=2000),
# request.title,
# )
# raw_flags = getattr(result, 'flags', []) or []
# return NoveltyResponse(
# novelty_score = result.novelty_score,
# novelty_level = result.novelty_level,
# risk_level = result.risk_level,
# risk_score = getattr(result, 'risk_score', result.novelty_score),
# summary = result.summary,
# flags = [
# NoveltyFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = getattr(f, 'suggestion', ''),
# ) for f in raw_flags
# ],
# related_works_found = [
# RelatedWorkResponse(
# title = w.title,
# year = w.year,
# authors = w.authors,
# similarity_signal = w.similarity_signal,
# ) for w in result.related_works_found
# ],
# key_terms_extracted = result.key_terms_extracted,
# literature_accessible = result.literature_accessible,
# flags_count = len(raw_flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/grim", response_model=GrimResponse)
# def analyze_grim(request: GrimRequest):
# """GRIM Test — detect mathematically impossible means."""
# try:
# result = _grim_engine.analyze(
# _smart_text(request.text, "grim")
# )
# return GrimResponse(
# impossible_means = result.impossible_means,
# possible_means = result.possible_means,
# grim_score = result.grim_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# GrimFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/sprite", response_model=SpriteResponse)
# def analyze_sprite(request: SpriteRequest):
# """SPRITE Test — detect impossible distributions."""
# try:
# result = _sprite_engine.analyze(
# _smart_text(request.text, "sprite")
# )
# return SpriteResponse(
# impossible_combinations = result.impossible_combinations,
# possible_combinations = result.possible_combinations,
# sprite_score = result.sprite_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# SpriteFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/granularity", response_model=GranularityResponse)
# def analyze_granularity(request: GranularityRequest):
# """Granularity Analyzer — Benford's Law + digit preference."""
# try:
# result = _granularity_engine.analyze(
# _smart_text(request.text, "granularity")
# )
# return GranularityResponse(
# digit_preference_score = result.digit_preference_score,
# benford_score = result.benford_score,
# round_number_ratio = result.round_number_ratio,
# granularity_score = result.granularity_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# GranularityFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/pcurve", response_model=PCurveResponse)
# def analyze_pcurve(request: PCurveRequest):
# """P-Curve Analyzer — publication bias detector."""
# try:
# result = _pcurve_engine.analyze(
# _smart_text(request.text, "pcurve")
# )
# return PCurveResponse(
# p_values_found = result.p_values_found,
# significant_p = result.significant_p,
# right_skew_ratio = result.right_skew_ratio,
# clustering_score = result.clustering_score,
# pcurve_score = result.pcurve_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# PCurveFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post('/analyze/effect_size', response_model=EffectSizeResponse)
# def analyze_effect_size(request: EffectSizeRequest):
# """Effect Size Validator — Cohen's d, power analysis."""
# try:
# result = _effect_size_engine.analyze(
# _smart_text(request.text, "effect_size")
# )
# return EffectSizeResponse(
# effect_sizes_found = result.effect_sizes_found,
# power_estimates = result.power_estimates,
# inflated_effects = result.inflated_effects,
# underpowered = result.underpowered,
# effect_score = result.effect_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# EffectSizeFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post('/analyze/retraction', response_model=RetractionResponse)
# def analyze_retraction(request: RetractionRequest):
# """Retraction Checker — live CrossRef API."""
# try:
# result = _retraction_engine.analyze(
# _smart_text(request.text, "retraction")
# )
# return RetractionResponse(
# dois_found = result.dois_found,
# retracted_found = result.retracted_found,
# checked_count = result.checked_count,
# retraction_score = result.retraction_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# RetractionFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post('/analyze/cartel', response_model=CartelResponse)
# def analyze_cartel(request: CartelRequest):
# """Citation Cartel Detector — graph-based ring detection."""
# try:
# result = _cartel_engine.analyze(
# _smart_text(request.text, "cartel")
# )
# return CartelResponse(
# authors_found = result.authors_found,
# citation_network = result.citation_network,
# cartel_score = result.cartel_score,
# self_citation_ratio = result.self_citation_ratio,
# network_diversity = result.network_diversity,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# CartelFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post('/analyze/llm', response_model=LLMResponse)
# def analyze_llm(request: LLMRequest):
# """LLM-Generated Paper Detector — burstiness + TTR."""
# try:
# result = _llm_engine.analyze(
# _smart_text(request.text, "llm")
# )
# return LLMResponse(
# burstiness_score = result.burstiness_score,
# vocabulary_diversity = result.vocabulary_diversity,
# sentence_uniformity = result.sentence_uniformity,
# llm_phrase_count = result.llm_phrase_count,
# llm_score = result.llm_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# LLMFlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Full PDF Analysis — Master Endpoint ──────────────────────────────────────
# class ModuleSummary(BaseModel):
# module: str
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class FullPDFResponse(BaseModel):
# paper_title: str
# page_count: int
# figure_count: int
# file_size_kb: float
# sha256: str
# overall_score: float
# overall_risk: str
# integrity_verdict: str
# modules: list[ModuleSummary]
# top_flags: list[str]
# analyzed_by: str
# def _compute_overall(scores: list[float]) -> tuple[float, str]:
# avg = round(sum(scores) / len(scores), 3) if scores else 0.0
# if avg >= 0.7:
# level = "HIGH"
# elif avg >= 0.4:
# level = "MEDIUM"
# else:
# level = "LOW"
# return avg, level
# def _verdict(risk: str) -> str:
# return {
# "HIGH": "Serious integrity concerns detected. Manual expert review strongly recommended.",
# "MEDIUM": "Some integrity issues found. Careful review advised before publication.",
# "LOW": "No major integrity issues detected. Paper appears scientifically sound.",
# }.get(risk, "Unknown")
# @router.post("/analyze/full-pdf", response_model=FullPDFResponse)
# async def analyze_full_pdf(file: UploadFile = File(...)):
# """
# Master endpoint — Upload a PDF and run all 14 analysis modules at once.
# Returns a unified integrity report with per-module scores and top flags.
# Designed for PhD researchers who want a single comprehensive analysis.
# """
# try:
# file_bytes = await file.read()
# paper = _pdf_parser.parse_bytes(file_bytes, file.filename)
# text = paper.full_text
# if len(text.strip()) < 100:
# raise HTTPException(
# status_code=422,
# detail="PDF text extraction failed or paper is too short. "
# "Ensure the PDF contains selectable text (not a scanned image)."
# )
# modules_run = []
# top_flags = []
# scores = []
# # ── Module 1: Statistical Audit ───────────────────────────
# try:
# r = _stat_engine.analyze(_smart_text(text, "statistics"))
# modules_run.append(ModuleSummary(
# module="Statistical Audit",
# risk_level=r.risk_level,
# risk_score=r.risk_score,
# summary=r.summary,
# flags_count=len(r.flags),
# ))
# scores.append(r.risk_score)
# for f in r.flags[:2]:
# top_flags.append(f"[Statistics] {f.description}")
# except Exception:
# pass
# # ── Module 2: Methodology Checker ─────────────────────────
# try:
# abstract = paper.sections.get("abstract", "")
# r = _method_engine.analyze(_smart_text(text, "methodology"), abstract)
# score = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
# modules_run.append(ModuleSummary(
# module="Methodology Checker",
# risk_level="HIGH" if score >= 0.7 else "MEDIUM" if score >= 0.4 else "LOW",
# risk_score=score,
# summary=r.summary,
# flags_count=len(r.flags),
# ))
# scores.append(score)
# for f in r.flags[:2]:
# top_flags.append(f"[Methodology] {f.issue}")
# except Exception:
# pass
# # ── Module 3: Citation Integrity ──────────────────────────
# try:
# r = _citation_engine.analyze(_smart_text(text, "citations"), "")
# modules_run.append(ModuleSummary(
# module="Citation Integrity",
# risk_level=r.risk_level,
# risk_score=r.risk_score,
# summary=r.summary,
# flags_count=len(r.flags),
# ))
# scores.append(r.risk_score)
# for f in r.flags[:2]:
# top_flags.append(f"[Citations] {f.description}")
# except Exception:
# pass
# # ── Module 4: Reproducibility ─────────────────────────────
# try:
# r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
# modules_run.append(ModuleSummary(
# module="Reproducibility Scanner",
# risk_level=r.risk_level,
# risk_score=1.0 - r.reproducibility_score,
# summary=r.summary,
# flags_count=len(r.flags),
# ))
# scores.append(1.0 - r.reproducibility_score)
# for f in r.flags[:1]:
# top_flags.append(f"[Reproducibility] {f.description}")
# except Exception:
# pass
# # ── Module 5: Novelty ─────────────────────────────────────
# try:
# r = _novelty_engine.analyze(
# _smart_text(text, "novelty", per_section_limit=2000),
# paper.title,
# )
# modules_run.append(ModuleSummary(
# module="Novelty Scorer",
# risk_level=r.risk_level,
# risk_score=getattr(r, "risk_score", 1.0 - r.novelty_score),
# summary=r.summary,
# flags_count=len(getattr(r, "flags", []) or []),
# ))
# scores.append(getattr(r, "risk_score", 1.0 - r.novelty_score))
# except Exception:
# pass
# # ── Module 6: GRIM Test ───────────────────────────────────
# try:
# r = _grim_engine.analyze(_smart_text(text, "grim"))
# modules_run.append(ModuleSummary(
# module="GRIM Test",
# risk_level=r.risk_level,
# risk_score=r.grim_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.grim_score)
# for f in r.flags[:1]:
# top_flags.append(f"[GRIM] {f.description}")
# except Exception:
# pass
# # ── Module 7: SPRITE Test ─────────────────────────────────
# try:
# r = _sprite_engine.analyze(_smart_text(text, "sprite"))
# modules_run.append(ModuleSummary(
# module="SPRITE Test",
# risk_level=r.risk_level,
# risk_score=r.sprite_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.sprite_score)
# except Exception:
# pass
# # ── Module 8: Granularity ─────────────────────────────────
# try:
# r = _granularity_engine.analyze(_smart_text(text, "granularity"))
# modules_run.append(ModuleSummary(
# module="Granularity Analyzer",
# risk_level=r.risk_level,
# risk_score=r.granularity_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.granularity_score)
# except Exception:
# pass
# # ── Module 9: P-Curve ─────────────────────────────────────
# try:
# r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
# modules_run.append(ModuleSummary(
# module="P-Curve Analyzer",
# risk_level=r.risk_level,
# risk_score=r.pcurve_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.pcurve_score)
# for f in r.flags[:1]:
# top_flags.append(f"[P-Curve] {f.description}")
# except Exception:
# pass
# # ── Module 10: Effect Size ────────────────────────────────
# try:
# r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
# modules_run.append(ModuleSummary(
# module="Effect Size Validator",
# risk_level=r.risk_level,
# risk_score=r.effect_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.effect_score)
# except Exception:
# pass
# # ── Module 11: Retraction Checker ─────────────────────────
# try:
# r = _retraction_engine.analyze(_smart_text(text, "retraction"))
# modules_run.append(ModuleSummary(
# module="Retraction Checker",
# risk_level=r.risk_level,
# risk_score=r.retraction_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.retraction_score)
# for f in r.flags[:1]:
# top_flags.append(f"[Retraction] {f.description}")
# except Exception:
# pass
# # ── Module 12: Citation Cartel ────────────────────────────
# try:
# r = _cartel_engine.analyze(_smart_text(text, "cartel"))
# modules_run.append(ModuleSummary(
# module="Citation Cartel Detector",
# risk_level=r.risk_level,
# risk_score=r.cartel_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.cartel_score)
# for f in r.flags[:1]:
# top_flags.append(f"[Cartel] {f.description}")
# except Exception:
# pass
# # ── Module 13: LLM Detector ───────────────────────────────
# try:
# r = _llm_engine.analyze(_smart_text(text, "llm"))
# modules_run.append(ModuleSummary(
# module="LLM Paper Detector",
# risk_level=r.risk_level,
# risk_score=r.llm_score,
# summary=r.summary,
# flags_count=r.flags_count,
# ))
# scores.append(r.llm_score)
# for f in r.flags[:1]:
# top_flags.append(f"[LLM] {f.description}")
# except Exception:
# pass
# # ── Module 14: Figure Forensics ───────────────────────────
# try:
# tmp_path = None
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(file_bytes)
# tmp_path = tmp.name
# r = _figure_engine.analyze(tmp_path)
# fig_score = min(len(r.duplicate_pairs) * 0.3, 1.0)
# modules_run.append(ModuleSummary(
# module="Figure Forensics",
# risk_level="HIGH" if fig_score >= 0.7 else "MEDIUM" if fig_score >= 0.3 else "LOW",
# risk_score=fig_score,
# summary=f"{r.figures_found} figures found. {len(r.duplicate_pairs)} duplicate pairs detected.",
# flags_count=len(r.flags),
# ))
# scores.append(fig_score)
# if tmp_path and os.path.exists(tmp_path):
# os.unlink(tmp_path)
# except Exception:
# pass
# # ── Final Score ───────────────────────────────────────────
# overall_score, overall_risk = _compute_overall(scores)
# return FullPDFResponse(
# paper_title = paper.title,
# page_count = paper.page_count,
# figure_count = paper.figure_count,
# file_size_kb = paper.metadata.get("file_size_kb", 0.0),
# sha256 = paper.metadata.get("sha256", ""),
# overall_score = overall_score,
# overall_risk = overall_risk,
# integrity_verdict = _verdict(overall_risk),
# modules = modules_run,
# top_flags = top_flags[:10],
# analyzed_by = "SciPeerAI v1.5.0 — 14-Module Pipeline",
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# import os
# import tempfile
# from fastapi import APIRouter, HTTPException, UploadFile, File
# from pydantic import BaseModel, Field
# from src.scipeerai.modules.reproducibility_scanner import ReproducibilityScanner
# from src.scipeerai.modules.stat_audit import StatAuditEngine
# from src.scipeerai.modules.figure_forensics import FigureForensicsEngine
# from src.scipeerai.modules.methodology_checker import MethodologyChecker
# from src.scipeerai.modules.citation_analyzer import CitationAnalyzer
# from src.scipeerai.modules.novelty_scorer import NoveltyScorer
# from src.scipeerai.modules.grim_test import GrimTest
# from src.scipeerai.modules.sprite_test import SpriteTest
# from src.scipeerai.modules.granularity_analyzer import GranularityAnalyzer
# from src.scipeerai.modules.pcurve_analyzer import PCurveAnalyzer
# from src.scipeerai.modules.effect_size_validator import EffectSizeValidator
# from src.scipeerai.modules.retraction_checker import RetractionChecker
# from src.scipeerai.modules.citation_cartel import CitationCartelDetector
# from src.scipeerai.modules.llm_detector import LLMDetector
# from src.scipeerai.modules.fraud_fingerprint import FraudFingerprinter
# from src.scipeerai.modules.temporal_anomaly import TemporalAnomalyDetector
# from src.scipeerai.modules.citation_dna import CitationDNAAnalyzer
# from src.scipeerai.modules.data_fingerprint import DataFingerprintAnalyzer
# from src.scipeerai.modules.peer_review_score import PeerReviewScorer
# from src.scipeerai.modules.ai_human_spectrum import AIHumanSpectrumAnalyzer
# from src.scipeerai.modules.replication_predictor import analyze as _predict_replication
# from src.scipeerai.core.pdf_parser import PDFParser
# router = APIRouter(prefix="/api/v1", tags=["Analysis"])
# # ── Section-aware text routing ────────────────────────────────────────────────
# _SECTION_MARKERS = [
# "abstract", "introduction", "background", "related work",
# "methods", "methodology", "materials and methods",
# "experimental", "experiments", "procedures",
# "results", "findings", "data analysis",
# "discussion", "conclusion", "conclusions",
# "references", "bibliography", "acknowledgments",
# "supplementary", "appendix",
# ]
# _MODULE_SECTIONS = {
# "statistics": ["abstract", "results", "findings",
# "data analysis", "methods", "methodology"],
# "methodology": ["abstract", "introduction", "methods",
# "methodology", "materials and methods",
# "experimental", "conclusion", "conclusions"],
# "citations": ["introduction", "background",
# "related work", "references", "bibliography"],
# "reproducibility": ["methods", "methodology",
# "materials and methods", "experimental",
# "procedures", "acknowledgments"],
# "novelty": ["abstract", "introduction",
# "background", "related work"],
# "grim": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "sprite": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "granularity": ["results", "findings",
# "methods", "data analysis"],
# "pcurve": ["abstract", "results",
# "findings", "data analysis"],
# "effect_size": ["results", "findings",
# "methods", "discussion"],
# "retraction": ["references", "bibliography", "introduction"],
# "cartel": ["references", "bibliography",
# "introduction", "acknowledgments"],
# "llm": ["abstract", "introduction", "methods",
# "results", "discussion"],
# "fraud_fingerprint": ["abstract", "introduction", "methods",
# "results", "discussion", "conclusion"],
# "temporal": ["introduction", "methods", "references",
# "bibliography", "background"],
# "citation_dna": ["references", "bibliography",
# "introduction", "acknowledgments"],
# "data_fingerprint": ["results", "findings", "methods",
# "data analysis", "discussion"],
# "peer_review": ["acknowledgments", "introduction",
# "abstract", "references"],
# "ai_spectrum": ["abstract", "introduction", "methods",
# "results", "discussion", "conclusion"],
# }
# def _extract_sections(text: str) -> dict:
# text_lower = text.lower()
# positions = []
# for marker in _SECTION_MARKERS:
# search_from = 0
# while True:
# idx = text_lower.find(marker, search_from)
# if idx == -1:
# break
# line_start = text.rfind('\n', 0, idx) + 1
# line_end = text.find('\n', idx)
# if line_end == -1:
# line_end = len(text)
# line_content = text[line_start:line_end].strip()
# if len(line_content) <= 60:
# positions.append((idx, marker))
# break
# search_from = idx + 1
# if not positions:
# return {}
# positions.sort(key=lambda x: x[0])
# deduped = [positions[0]]
# for pos in positions[1:]:
# if pos[0] - deduped[-1][0] > 50:
# deduped.append(pos)
# sections = {}
# for i, (start, name) in enumerate(deduped):
# end = deduped[i + 1][0] if i + 1 < len(deduped) else len(text)
# sections[name] = text[start:end].strip()
# return sections
# def _smart_text(text: str, module: str, per_section_limit: int = 2500) -> str:
# sections = _extract_sections(text)
# target_keys = _MODULE_SECTIONS.get(module, [])
# if sections and target_keys:
# parts = []
# for key in target_keys:
# if key in sections:
# parts.append(sections[key][:per_section_limit])
# if parts:
# return "\n\n".join(parts)[:12000]
# return _truncate(text)
# def _truncate(text: str, limit: int = 8000) -> str:
# if len(text) <= limit:
# return text
# lower = text.lower()
# methods_idx = lower.find('method')
# if 0 < methods_idx < len(text) - 1000:
# start = text[:3000]
# middle = text[methods_idx:methods_idx + 4000]
# return start + " [...] " + middle
# return text[:limit]
# # ── Engine initialization ─────────────────────────────────────────────────────
# _stat_engine = StatAuditEngine()
# _figure_engine = FigureForensicsEngine()
# _method_engine = MethodologyChecker()
# _citation_engine = CitationAnalyzer()
# _repro_engine = ReproducibilityScanner()
# _novelty_engine = NoveltyScorer()
# _grim_engine = GrimTest()
# _sprite_engine = SpriteTest()
# _granularity_engine = GranularityAnalyzer()
# _pcurve_engine = PCurveAnalyzer()
# _effect_size_engine = EffectSizeValidator()
# _retraction_engine = RetractionChecker()
# _cartel_engine = CitationCartelDetector()
# _llm_engine = LLMDetector()
# _fraud_engine = FraudFingerprinter()
# _temporal_engine = TemporalAnomalyDetector()
# _dna_engine = CitationDNAAnalyzer()
# _dataprint_engine = DataFingerprintAnalyzer()
# _peerreview_engine = PeerReviewScorer()
# _spectrum_engine = AIHumanSpectrumAnalyzer()
# _pdf_parser = PDFParser()
# # ── Shared response models ────────────────────────────────────────────────────
# class TextAnalysisRequest(BaseModel):
# text: str = Field(..., min_length=50, description="Paper text to analyze")
# class FlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ForensicFlagResponse(BaseModel):
# figures_involved: list
# class StatAuditResponse(BaseModel):
# risk_level: str
# risk_score: float
# summary: str
# flags: list[FlagResponse]
# p_values_found: list[float]
# sample_sizes_found: list[int]
# flags_count: int
# class FigureForensicsResponse(BaseModel):
# figures_found: int
# flags: list[ForensicFlagResponse]
# duplicate_pairs: list
# class MethodologyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# abstract: str = Field("")
# class MethodologyFlagResponse(BaseModel):
# claim: str
# issue: str
# suggestion: str
# class MethodologyResponse(BaseModel):
# flags: list[MethodologyFlagResponse]
# claims_found: list[str]
# methods_found: list[str]
# llm_assessment: str
# llm_available: bool
# class CitationRequest(BaseModel):
# text: str = Field(..., min_length=50)
# author_name: str = Field("")
# class CitationFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class CitationResponse(BaseModel):
# total_citations: int
# self_citations: int
# self_citation_ratio: float
# unsupported_claims: int
# flags: list[CitationFlagResponse]
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class ReproducibilityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class ReproducibilityFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ReproducibilityResponse(BaseModel):
# has_code_link: bool
# has_data_link: bool
# has_software_versions: bool
# has_preregistration: bool
# has_ethics_statement: bool
# reproducibility_score: float
# risk_level: str
# summary: str
# flags: list[ReproducibilityFlagResponse]
# flags_count: int
# class NoveltyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# title: str = Field("")
# class NoveltyFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class RelatedWorkResponse(BaseModel):
# title: str
# year: int
# authors: list
# similarity_signal: str
# class NoveltyResponse(BaseModel):
# novelty_score: float
# novelty_level: str
# risk_level: str
# risk_score: float
# summary: str
# flags: list[NoveltyFlagResponse]
# related_works_found: list[RelatedWorkResponse]
# key_terms_extracted: list[str]
# literature_accessible: bool
# flags_count: int
# class GrimRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GrimFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class GrimResponse(BaseModel):
# impossible_means: list; possible_means: list; grim_score: float
# risk_level: str; summary: str; flags: list[GrimFlagResponse]; flags_count: int
# class SpriteRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class SpriteFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class SpriteResponse(BaseModel):
# impossible_combinations: list; possible_combinations: list; sprite_score: float
# risk_level: str; summary: str; flags: list[SpriteFlagResponse]; flags_count: int
# class GranularityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GranularityFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class GranularityResponse(BaseModel):
# digit_preference_score: float; benford_score: float; round_number_ratio: float
# granularity_score: float; risk_level: str; summary: str
# flags: list[GranularityFlagResponse]; flags_count: int
# class PCurveRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class PCurveFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class PCurveResponse(BaseModel):
# p_values_found: list; significant_p: list; right_skew_ratio: float
# clustering_score: float; pcurve_score: float; risk_level: str; summary: str
# flags: list[PCurveFlagResponse]; flags_count: int
# class EffectSizeRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class EffectSizeFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class EffectSizeResponse(BaseModel):
# effect_sizes_found: list; power_estimates: list; inflated_effects: list
# underpowered: list; effect_score: float; risk_level: str; summary: str
# flags: list[EffectSizeFlagResponse]; flags_count: int
# class RetractionRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class RetractionFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class RetractionResponse(BaseModel):
# dois_found: list; retracted_found: list; checked_count: int
# retraction_score: float; risk_level: str; summary: str
# flags: list[RetractionFlagResponse]; flags_count: int
# class CartelRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class CartelFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class CartelResponse(BaseModel):
# authors_found: list; citation_network: dict; cartel_score: float
# self_citation_ratio: float; network_diversity: float; risk_level: str
# summary: str; flags: list[CartelFlagResponse]; flags_count: int
# class LLMRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class LLMFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class LLMResponse(BaseModel):
# burstiness_score: float; vocabulary_diversity: float; sentence_uniformity: float
# llm_phrase_count: int; llm_score: float; risk_level: str; summary: str
# flags: list[LLMFlagResponse]; flags_count: int
# # ── Phase 5 models ────────────────────────────────────────────────────────────
# class Phase5FlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class FraudFingerprintResponse(BaseModel):
# style_shift_score: float
# precision_inconsistency: float
# vocabulary_mixing_score: float
# hedge_overuse_score: float
# fingerprint_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class TemporalAnomalyResponse(BaseModel):
# collection_years: list
# cited_years: list
# future_citations: list
# recency_claim_score: float
# timeline_score: float
# temporal_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class CitationDNAResponse(BaseModel):
# total_citations: int
# unique_authors: int
# author_concentration: float
# journal_concentration: float
# decade_concentration: float
# geographic_diversity: float
# dna_diversity_score: float
# dna_risk_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class DataFingerprintResponse(BaseModel):
# total_numbers: int
# round_number_ratio: float
# terminal_digit_bias: float
# impossible_pairs: list
# suspicious_duplicates: list
# fingerprint_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class PeerReviewResponse(BaseModel):
# days_to_acceptance: int
# has_conflict_statement: bool
# conflict_credible: bool
# reviewer_suggestions: bool
# special_issue: bool
# predatory_signals: int
# manipulation_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class AISpectrumResponse(BaseModel):
# overall_human_ratio: float
# overall_ai_ratio: float
# dominant_ai_model: str
# model_confidence: float
# ai_sections: list
# human_sections: list
# gpt4_signal_count: int
# claude_signal_count: int
# gemini_signal_count: int
# spectrum_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# # ── Replication Predictor model ───────────────────────────────────────────────
# class ReplicationRequest(BaseModel):
# text: str = Field(..., min_length=50, description="Paper text to analyze for replication probability")
# class ReplicationFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class ReplicationResponse(BaseModel):
# replication_probability: float
# fraud_probability: float
# replication_level: str
# verdict: str
# risk_score: float
# risk_level: str
# summary: str
# flags: list[ReplicationFlagResponse]
# flags_count: int
# model_version: str
# # ── Full PDF models ───────────────────────────────────────────────────────────
# class ModuleSummary(BaseModel):
# module: str
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class FullPDFResponse(BaseModel):
# paper_title: str
# page_count: int
# figure_count: int
# file_size_kb: float
# sha256: str
# overall_score: float
# overall_risk: str
# integrity_verdict: str
# replication_probability: float
# replication_level: str
# modules: list[ModuleSummary]
# top_flags: list[str]
# analyzed_by: str
# # ── Helpers ───────────────────────────────────────────────────────────────────
# def _compute_overall(scores: list[float]) -> tuple[float, str]:
# avg = round(sum(scores) / len(scores), 3) if scores else 0.0
# if avg >= 0.7: level = "HIGH"
# elif avg >= 0.4: level = "MEDIUM"
# else: level = "LOW"
# return avg, level
# def _verdict(risk: str) -> str:
# return {
# "HIGH": "Serious integrity concerns detected. Manual expert review strongly recommended.",
# "MEDIUM": "Some integrity issues found. Careful review advised before publication.",
# "LOW": "No major integrity issues detected. Paper appears scientifically sound.",
# }.get(risk, "Unknown")
# def _p5flags(flags) -> list:
# return [
# Phase5FlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in flags
# ]
# # ── Status ────────────────────────────────────────────────────────────────────
# @router.get("/status")
# def system_status():
# return {
# "modules_ready": {
# "stat_audit": True,
# "figure_forensics": True,
# "methodology_checker": True,
# "citation_analyzer": True,
# "reproducibility": True,
# "novelty_scorer": True,
# "grim_test": True,
# "sprite_test": True,
# "granularity": True,
# "pcurve": True,
# "effect_size": True,
# "retraction": True,
# "citation_cartel": True,
# "llm_detector": True,
# "fraud_fingerprint": True,
# "temporal_anomaly": True,
# "citation_dna": True,
# "data_fingerprint": True,
# "peer_review_score": True,
# "ai_human_spectrum": True,
# "replication_predictor": True,
# },
# "version": "2.1.0",
# "modules_count": 21,
# "tests_passing": 194,
# "text_extraction": "section-aware",
# }
# # ── Original 14 endpoints ─────────────────────────────────────────────────────
# @router.post("/analyze/statistics", response_model=StatAuditResponse)
# def analyze_statistics(request: TextAnalysisRequest):
# """Statistical Audit — p-hacking, sample size, round numbers."""
# try:
# r = _stat_engine.analyze(_smart_text(request.text, "statistics"))
# return StatAuditResponse(
# risk_level = r.risk_level,
# risk_score = r.risk_score,
# summary = r.summary,
# flags = [FlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags],
# p_values_found = r.p_values_found,
# sample_sizes_found = r.sample_sizes_found,
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/figures", response_model=FigureForensicsResponse)
# async def analyze_figures(file: UploadFile = File(...)):
# """Figure Forensics — duplicate image detection."""
# if not file.filename.endswith(".pdf"):
# raise HTTPException(status_code=400, detail="Only PDF files accepted.")
# tmp_path = None
# try:
# contents = await file.read()
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(contents)
# tmp_path = tmp.name
# r = _figure_engine.analyze(tmp_path)
# return FigureForensicsResponse(
# figures_found = r.figures_found,
# flags = [ForensicFlagResponse(figures_involved=f.figures_involved) for f in r.flags],
# duplicate_pairs = r.duplicate_pairs,
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# finally:
# if tmp_path and os.path.exists(tmp_path):
# os.unlink(tmp_path)
# @router.post("/analyze/methodology", response_model=MethodologyResponse)
# def analyze_methodology(request: MethodologyRequest):
# """Methodology Checker — causation, control groups, timeframe."""
# try:
# r = _method_engine.analyze(_smart_text(request.text, "methodology"), request.abstract)
# return MethodologyResponse(
# flags = [MethodologyFlagResponse(claim=f.claim, issue=f.issue, suggestion=f.suggestion) for f in r.flags],
# claims_found = r.claims_found,
# methods_found = r.methods_found,
# llm_assessment = r.llm_assessment,
# llm_available = r.llm_available,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/citations", response_model=CitationResponse)
# def analyze_citations(request: CitationRequest):
# """Citation Integrity — self-citation, unsupported claims."""
# try:
# r = _citation_engine.analyze(_smart_text(request.text, "citations"), request.author_name)
# return CitationResponse(
# total_citations = r.total_citations,
# self_citations = r.self_citations,
# self_citation_ratio = r.self_citation_ratio,
# unsupported_claims = r.unsupported_claims,
# flags = [CitationFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in r.flags],
# risk_level = r.risk_level,
# risk_score = r.risk_score,
# summary = r.summary,
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/reproducibility", response_model=ReproducibilityResponse)
# def analyze_reproducibility(request: ReproducibilityRequest):
# """Reproducibility Scanner — code, data, ethics, preregistration."""
# try:
# r = _repro_engine.analyze(_smart_text(request.text, "reproducibility"))
# return ReproducibilityResponse(
# has_code_link = r.has_code_link,
# has_data_link = r.has_data_link,
# has_software_versions = r.has_software_versions,
# has_preregistration = r.has_preregistration,
# has_ethics_statement = r.has_ethics_statement,
# reproducibility_score = r.reproducibility_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = [ReproducibilityFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in r.flags],
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/novelty", response_model=NoveltyResponse)
# def analyze_novelty(request: NoveltyRequest):
# """Novelty Scorer — literature search, novelty estimation."""
# try:
# r = _novelty_engine.analyze(_smart_text(request.text, "novelty", per_section_limit=2000), request.title)
# raw_flags = getattr(r, 'flags', []) or []
# return NoveltyResponse(
# novelty_score = r.novelty_score,
# novelty_level = r.novelty_level,
# risk_level = r.risk_level,
# risk_score = getattr(r, 'risk_score', r.novelty_score),
# summary = r.summary,
# flags = [NoveltyFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in raw_flags],
# related_works_found = [RelatedWorkResponse(title=w.title, year=w.year, authors=w.authors, similarity_signal=w.similarity_signal) for w in r.related_works_found],
# key_terms_extracted = r.key_terms_extracted,
# literature_accessible = r.literature_accessible,
# flags_count = len(raw_flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/grim", response_model=GrimResponse)
# def analyze_grim(request: GrimRequest):
# """GRIM Test — mathematically impossible means."""
# try:
# r = _grim_engine.analyze(_smart_text(request.text, "grim"))
# return GrimResponse(impossible_means=r.impossible_means, possible_means=r.possible_means, grim_score=r.grim_score, risk_level=r.risk_level, summary=r.summary, flags=[GrimFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/sprite", response_model=SpriteResponse)
# def analyze_sprite(request: SpriteRequest):
# """SPRITE Test — impossible distributions."""
# try:
# r = _sprite_engine.analyze(_smart_text(request.text, "sprite"))
# return SpriteResponse(impossible_combinations=r.impossible_combinations, possible_combinations=r.possible_combinations, sprite_score=r.sprite_score, risk_level=r.risk_level, summary=r.summary, flags=[SpriteFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/granularity", response_model=GranularityResponse)
# def analyze_granularity(request: GranularityRequest):
# """Granularity Analyzer — Benford's Law, digit preference."""
# try:
# r = _granularity_engine.analyze(_smart_text(request.text, "granularity"))
# return GranularityResponse(digit_preference_score=r.digit_preference_score, benford_score=r.benford_score, round_number_ratio=r.round_number_ratio, granularity_score=r.granularity_score, risk_level=r.risk_level, summary=r.summary, flags=[GranularityFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/pcurve", response_model=PCurveResponse)
# def analyze_pcurve(request: PCurveRequest):
# """P-Curve Analyzer — publication bias detector."""
# try:
# r = _pcurve_engine.analyze(_smart_text(request.text, "pcurve"))
# return PCurveResponse(p_values_found=r.p_values_found, significant_p=r.significant_p, right_skew_ratio=r.right_skew_ratio, clustering_score=r.clustering_score, pcurve_score=r.pcurve_score, risk_level=r.risk_level, summary=r.summary, flags=[PCurveFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/effect_size", response_model=EffectSizeResponse)
# def analyze_effect_size(request: EffectSizeRequest):
# """Effect Size Validator — Cohen's d, power analysis."""
# try:
# r = _effect_size_engine.analyze(_smart_text(request.text, "effect_size"))
# return EffectSizeResponse(effect_sizes_found=r.effect_sizes_found, power_estimates=r.power_estimates, inflated_effects=r.inflated_effects, underpowered=r.underpowered, effect_score=r.effect_score, risk_level=r.risk_level, summary=r.summary, flags=[EffectSizeFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/retraction", response_model=RetractionResponse)
# def analyze_retraction(request: RetractionRequest):
# """Retraction Checker — live CrossRef API."""
# try:
# r = _retraction_engine.analyze(_smart_text(request.text, "retraction"))
# return RetractionResponse(dois_found=r.dois_found, retracted_found=r.retracted_found, checked_count=r.checked_count, retraction_score=r.retraction_score, risk_level=r.risk_level, summary=r.summary, flags=[RetractionFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/cartel", response_model=CartelResponse)
# def analyze_cartel(request: CartelRequest):
# """Citation Cartel Detector — graph-based ring detection."""
# try:
# r = _cartel_engine.analyze(_smart_text(request.text, "cartel"))
# return CartelResponse(authors_found=r.authors_found, citation_network=r.citation_network, cartel_score=r.cartel_score, self_citation_ratio=r.self_citation_ratio, network_diversity=r.network_diversity, risk_level=r.risk_level, summary=r.summary, flags=[CartelFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/llm", response_model=LLMResponse)
# def analyze_llm(request: LLMRequest):
# """LLM Detector — burstiness, TTR, phrase patterns."""
# try:
# r = _llm_engine.analyze(_smart_text(request.text, "llm"))
# return LLMResponse(burstiness_score=r.burstiness_score, vocabulary_diversity=r.vocabulary_diversity, sentence_uniformity=r.sentence_uniformity, llm_phrase_count=r.llm_phrase_count, llm_score=r.llm_score, risk_level=r.risk_level, summary=r.summary, flags=[LLMFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Phase 5 endpoints ─────────────────────────────────────────────────────────
# @router.post("/analyze/fraud_fingerprint", response_model=FraudFingerprintResponse)
# def analyze_fraud_fingerprint(request: TextAnalysisRequest):
# """Fraud Fingerprinting — writing DNA inconsistency detection."""
# try:
# r = _fraud_engine.analyze(_smart_text(request.text, "fraud_fingerprint"))
# return FraudFingerprintResponse(
# style_shift_score = r.style_shift_score,
# precision_inconsistency = r.precision_inconsistency,
# vocabulary_mixing_score = r.vocabulary_mixing_score,
# hedge_overuse_score = r.hedge_overuse_score,
# fingerprint_score = r.fingerprint_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/temporal_anomaly", response_model=TemporalAnomalyResponse)
# def analyze_temporal_anomaly(request: TextAnalysisRequest):
# """Temporal Anomaly — citation timeline paradox detection."""
# try:
# r = _temporal_engine.analyze(_smart_text(request.text, "temporal"))
# return TemporalAnomalyResponse(
# collection_years = r.collection_years,
# cited_years = r.cited_years,
# future_citations = r.future_citations,
# recency_claim_score = r.recency_claim_score,
# timeline_score = r.timeline_score,
# temporal_score = r.temporal_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/citation_dna", response_model=CitationDNAResponse)
# def analyze_citation_dna(request: TextAnalysisRequest):
# """Citation DNA — network concentration and diversity analysis."""
# try:
# r = _dna_engine.analyze(_smart_text(request.text, "citation_dna"))
# return CitationDNAResponse(
# total_citations = r.total_citations,
# unique_authors = r.unique_authors,
# author_concentration = r.author_concentration,
# journal_concentration = r.journal_concentration,
# decade_concentration = r.decade_concentration,
# geographic_diversity = r.geographic_diversity,
# dna_diversity_score = r.dna_diversity_score,
# dna_risk_score = r.dna_risk_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/data_fingerprint", response_model=DataFingerprintResponse)
# def analyze_data_fingerprint(request: TextAnalysisRequest):
# """Data Fingerprinting — fabrication and numerical cloning detection."""
# try:
# r = _dataprint_engine.analyze(_smart_text(request.text, "data_fingerprint"))
# return DataFingerprintResponse(
# total_numbers = r.total_numbers,
# round_number_ratio = r.round_number_ratio,
# terminal_digit_bias = r.terminal_digit_bias,
# impossible_pairs = r.impossible_pairs,
# suspicious_duplicates = [round(v, 4) for v in r.suspicious_duplicates],
# fingerprint_score = r.fingerprint_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/peer_review", response_model=PeerReviewResponse)
# def analyze_peer_review(request: TextAnalysisRequest):
# """Peer Review Score — process manipulation detection."""
# try:
# r = _peerreview_engine.analyze(_smart_text(request.text, "peer_review"))
# return PeerReviewResponse(
# days_to_acceptance = r.days_to_acceptance,
# has_conflict_statement = r.has_conflict_statement,
# conflict_credible = r.conflict_credible,
# reviewer_suggestions = r.reviewer_suggestions,
# special_issue = r.special_issue,
# predatory_signals = r.predatory_signals,
# manipulation_score = r.manipulation_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/ai_spectrum", response_model=AISpectrumResponse)
# def analyze_ai_spectrum(request: TextAnalysisRequest):
# """AI-Human Spectrum — model attribution and ratio analysis."""
# try:
# r = _spectrum_engine.analyze(_smart_text(request.text, "ai_spectrum"))
# return AISpectrumResponse(
# overall_human_ratio = r.overall_human_ratio,
# overall_ai_ratio = r.overall_ai_ratio,
# dominant_ai_model = r.dominant_ai_model,
# model_confidence = r.model_confidence,
# ai_sections = r.ai_sections,
# human_sections = r.human_sections,
# gpt4_signal_count = r.gpt4_signal_count,
# claude_signal_count = r.claude_signal_count,
# gemini_signal_count = r.gemini_signal_count,
# spectrum_score = r.spectrum_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Replication Predictor endpoint ────────────────────────────────────────────
# @router.post("/analyze/replication", response_model=ReplicationResponse)
# def analyze_replication(request: TextAnalysisRequest):
# """
# Replication Probability Score — ML-based replication likelihood prediction.
# Runs all integrity signals through a trained LogisticRegression model
# and returns the probability that this paper is replicable.
# """
# try:
# text = request.text
# module_scores: dict = {}
# try:
# r = _stat_engine.analyze(_smart_text(text, "statistics"))
# module_scores["score_stat"] = r.risk_score
# except Exception:
# module_scores["score_stat"] = 0.0
# try:
# r = _method_engine.analyze(_smart_text(text, "methodology"), "")
# module_scores["score_method"] = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
# except Exception:
# module_scores["score_method"] = 0.0
# try:
# r = _citation_engine.analyze(_smart_text(text, "citations"), "")
# module_scores["score_citation"] = r.risk_score
# except Exception:
# module_scores["score_citation"] = 0.0
# try:
# r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
# module_scores["score_repro"] = round(1.0 - r.reproducibility_score, 4)
# except Exception:
# module_scores["score_repro"] = 0.0
# try:
# r = _novelty_engine.analyze(_smart_text(text, "novelty", per_section_limit=2000), "")
# module_scores["score_novelty"] = getattr(r, "risk_score", round(1.0 - r.novelty_score, 4))
# except Exception:
# module_scores["score_novelty"] = 0.0
# try:
# r = _grim_engine.analyze(_smart_text(text, "grim"))
# module_scores["score_grim"] = r.grim_score
# except Exception:
# module_scores["score_grim"] = 0.0
# try:
# r = _sprite_engine.analyze(_smart_text(text, "sprite"))
# module_scores["score_sprite"] = r.sprite_score
# except Exception:
# module_scores["score_sprite"] = 0.0
# try:
# r = _granularity_engine.analyze(_smart_text(text, "granularity"))
# module_scores["score_granularity"] = r.granularity_score
# except Exception:
# module_scores["score_granularity"] = 0.0
# try:
# r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
# module_scores["score_pcurve"] = r.pcurve_score
# except Exception:
# module_scores["score_pcurve"] = 0.0
# try:
# r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
# module_scores["score_effect"] = r.effect_score
# except Exception:
# module_scores["score_effect"] = 0.0
# try:
# r = _retraction_engine.analyze(_smart_text(text, "retraction"))
# module_scores["score_retraction"] = r.retraction_score
# except Exception:
# module_scores["score_retraction"] = 0.0
# try:
# r = _cartel_engine.analyze(_smart_text(text, "cartel"))
# module_scores["score_cartel"] = r.cartel_score
# except Exception:
# module_scores["score_cartel"] = 0.0
# try:
# r = _llm_engine.analyze(_smart_text(text, "llm"))
# module_scores["score_llm"] = r.llm_score
# except Exception:
# module_scores["score_llm"] = 0.0
# try:
# r = _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint"))
# module_scores["score_fraud"] = r.fingerprint_score
# except Exception:
# module_scores["score_fraud"] = 0.0
# try:
# r = _temporal_engine.analyze(_smart_text(text, "temporal"))
# module_scores["score_temporal"] = r.temporal_score
# except Exception:
# module_scores["score_temporal"] = 0.0
# try:
# r = _dna_engine.analyze(_smart_text(text, "citation_dna"))
# module_scores["score_dna"] = r.dna_risk_score
# except Exception:
# module_scores["score_dna"] = 0.0
# try:
# r = _dataprint_engine.analyze(_smart_text(text, "data_fingerprint"))
# module_scores["score_dataprint"] = r.fingerprint_score
# except Exception:
# module_scores["score_dataprint"] = 0.0
# try:
# r = _peerreview_engine.analyze(_smart_text(text, "peer_review"))
# module_scores["score_peerreview"] = r.manipulation_score
# except Exception:
# module_scores["score_peerreview"] = 0.0
# try:
# r = _spectrum_engine.analyze(_smart_text(text, "ai_spectrum"))
# module_scores["score_spectrum"] = r.spectrum_score
# except Exception:
# module_scores["score_spectrum"] = 0.0
# result = _predict_replication(module_scores)
# return ReplicationResponse(
# replication_probability = result.replication_probability,
# fraud_probability = result.fraud_probability,
# replication_level = result.replication_level,
# verdict = result.verdict,
# risk_score = result.risk_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# ReplicationFlagResponse(
# flag_type = f["flag_type"],
# severity = f["severity"],
# description = f["description"],
# evidence = f["evidence"],
# suggestion = f["suggestion"],
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# model_version = result.model_version,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Full PDF β€” 21-Module Master Endpoint ─────────────────────────────────────
# @router.post("/analyze/full-pdf", response_model=FullPDFResponse)
# async def analyze_full_pdf(file: UploadFile = File(...)):
# """
# Master endpoint — Upload a PDF and run all 21 analysis modules at once.
# Returns a unified integrity report with per-module scores, top flags,
# and an ML-based replication probability prediction.
# Designed for PhD researchers, journal editors, and integrity committees.
# """
# try:
# file_bytes = await file.read()
# paper = _pdf_parser.parse_bytes(file_bytes, file.filename)
# text = paper.full_text
# if len(text.strip()) < 100:
# raise HTTPException(
# status_code=422,
# detail="PDF text extraction failed or paper is too short. "
# "Ensure the PDF contains selectable text (not a scanned image)."
# )
# modules_run = []
# top_flags = []
# scores = []
# module_scores: dict = {}
# # ── Module 01: Statistical Audit ──────────────────────────
# try:
# r = _stat_engine.analyze(_smart_text(text, "statistics"))
# modules_run.append(ModuleSummary(module="Statistical Audit", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(r.risk_score)
# module_scores["score_stat"] = r.risk_score
# for f in r.flags[:2]: top_flags.append(f"[Statistics] {f.description}")
# except Exception: pass
# # ── Module 02: Methodology Checker ────────────────────────
# try:
# abstract = paper.sections.get("abstract", "")
# r = _method_engine.analyze(_smart_text(text, "methodology"), abstract)
# score = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
# modules_run.append(ModuleSummary(module="Methodology Checker", risk_level="HIGH" if score >= 0.7 else "MEDIUM" if score >= 0.4 else "LOW", risk_score=score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(score)
# module_scores["score_method"] = score
# for f in r.flags[:2]: top_flags.append(f"[Methodology] {f.issue}")
# except Exception: pass
# # ── Module 03: Citation Integrity ─────────────────────────
# try:
# r = _citation_engine.analyze(_smart_text(text, "citations"), "")
# modules_run.append(ModuleSummary(module="Citation Integrity", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(r.risk_score)
# module_scores["score_citation"] = r.risk_score
# for f in r.flags[:2]: top_flags.append(f"[Citations] {f.description}")
# except Exception: pass
# # ── Module 04: Reproducibility ────────────────────────────
# try:
# r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
# repro_risk = round(1.0 - r.reproducibility_score, 4)
# modules_run.append(ModuleSummary(module="Reproducibility Scanner", risk_level=r.risk_level, risk_score=repro_risk, summary=r.summary, flags_count=len(r.flags)))
# scores.append(repro_risk)
# module_scores["score_repro"] = repro_risk
# for f in r.flags[:1]: top_flags.append(f"[Reproducibility] {f.description}")
# except Exception: pass
# # ── Module 05: Novelty ────────────────────────────────────
# try:
# r = _novelty_engine.analyze(_smart_text(text, "novelty", per_section_limit=2000), paper.title)
# novelty_risk = getattr(r, "risk_score", round(1.0 - r.novelty_score, 4))
# modules_run.append(ModuleSummary(module="Novelty Scorer", risk_level=r.risk_level, risk_score=novelty_risk, summary=r.summary, flags_count=len(getattr(r, "flags", []) or [])))
# scores.append(novelty_risk)
# module_scores["score_novelty"] = novelty_risk
# except Exception: pass
# # ── Module 06: GRIM Test ──────────────────────────────────
# try:
# r = _grim_engine.analyze(_smart_text(text, "grim"))
# modules_run.append(ModuleSummary(module="GRIM Test", risk_level=r.risk_level, risk_score=r.grim_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.grim_score)
# module_scores["score_grim"] = r.grim_score
# for f in r.flags[:1]: top_flags.append(f"[GRIM] {f.description}")
# except Exception: pass
# # ── Module 07: SPRITE Test ────────────────────────────────
# try:
# r = _sprite_engine.analyze(_smart_text(text, "sprite"))
# modules_run.append(ModuleSummary(module="SPRITE Test", risk_level=r.risk_level, risk_score=r.sprite_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.sprite_score)
# module_scores["score_sprite"] = r.sprite_score
# except Exception: pass
# # ── Module 08: Granularity ────────────────────────────────
# try:
# r = _granularity_engine.analyze(_smart_text(text, "granularity"))
# modules_run.append(ModuleSummary(module="Granularity Analyzer", risk_level=r.risk_level, risk_score=r.granularity_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.granularity_score)
# module_scores["score_granularity"] = r.granularity_score
# except Exception: pass
# # ── Module 09: P-Curve ────────────────────────────────────
# try:
# r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
# modules_run.append(ModuleSummary(module="P-Curve Analyzer", risk_level=r.risk_level, risk_score=r.pcurve_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.pcurve_score)
# module_scores["score_pcurve"] = r.pcurve_score
# for f in r.flags[:1]: top_flags.append(f"[P-Curve] {f.description}")
# except Exception: pass
# # ── Module 10: Effect Size ────────────────────────────────
# try:
# r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
# modules_run.append(ModuleSummary(module="Effect Size Validator", risk_level=r.risk_level, risk_score=r.effect_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.effect_score)
# module_scores["score_effect"] = r.effect_score
# except Exception: pass
# # ── Module 11: Retraction Checker ─────────────────────────
# try:
# r = _retraction_engine.analyze(_smart_text(text, "retraction"))
# modules_run.append(ModuleSummary(module="Retraction Checker", risk_level=r.risk_level, risk_score=r.retraction_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.retraction_score)
# module_scores["score_retraction"] = r.retraction_score
# for f in r.flags[:1]: top_flags.append(f"[Retraction] {f.description}")
# except Exception: pass
# # ── Module 12: Citation Cartel ────────────────────────────
# try:
# r = _cartel_engine.analyze(_smart_text(text, "cartel"))
# modules_run.append(ModuleSummary(module="Citation Cartel Detector", risk_level=r.risk_level, risk_score=r.cartel_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.cartel_score)
# module_scores["score_cartel"] = r.cartel_score
# for f in r.flags[:1]: top_flags.append(f"[Cartel] {f.description}")
# except Exception: pass
# # ── Module 13: LLM Detector ───────────────────────────────
# try:
# r = _llm_engine.analyze(_smart_text(text, "llm"))
# modules_run.append(ModuleSummary(module="LLM Paper Detector", risk_level=r.risk_level, risk_score=r.llm_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.llm_score)
# module_scores["score_llm"] = r.llm_score
# for f in r.flags[:1]: top_flags.append(f"[LLM] {f.description}")
# except Exception: pass
# # ── Module 14: Figure Forensics ───────────────────────────
# try:
# tmp_path = None
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(file_bytes)
# tmp_path = tmp.name
# r = _figure_engine.analyze(tmp_path)
# fig_score = min(len(r.duplicate_pairs) * 0.3, 1.0)
# modules_run.append(ModuleSummary(module="Figure Forensics", risk_level="HIGH" if fig_score >= 0.7 else "MEDIUM" if fig_score >= 0.3 else "LOW", risk_score=fig_score, summary=f"{r.figures_found} figures found. {len(r.duplicate_pairs)} duplicate pairs detected.", flags_count=len(r.flags)))
# scores.append(fig_score)
# if tmp_path and os.path.exists(tmp_path): os.unlink(tmp_path)
# except Exception: pass
# # ── Module 15: Fraud Fingerprinting ───────────────────────
# try:
# r = _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint"))
# modules_run.append(ModuleSummary(module="Fraud Fingerprinting", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.fingerprint_score)
# module_scores["score_fraud"] = r.fingerprint_score
# for f in r.flags[:1]: top_flags.append(f"[Fingerprint] {f.description}")
# except Exception: pass
# # ── Module 16: Temporal Anomaly ───────────────────────────
# try:
# r = _temporal_engine.analyze(_smart_text(text, "temporal"))
# modules_run.append(ModuleSummary(module="Temporal Anomaly Detector", risk_level=r.risk_level, risk_score=r.temporal_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.temporal_score)
# module_scores["score_temporal"] = r.temporal_score
# for f in r.flags[:1]: top_flags.append(f"[Temporal] {f.description}")
# except Exception: pass
# # ── Module 17: Citation DNA ───────────────────────────────
# try:
# r = _dna_engine.analyze(_smart_text(text, "citation_dna"))
# modules_run.append(ModuleSummary(module="Citation DNA Analyzer", risk_level=r.risk_level, risk_score=r.dna_risk_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.dna_risk_score)
# module_scores["score_dna"] = r.dna_risk_score
# for f in r.flags[:1]: top_flags.append(f"[CitationDNA] {f.description}")
# except Exception: pass
# # ── Module 18: Data Fingerprint ───────────────────────────
# try:
# r = _dataprint_engine.analyze(_smart_text(text, "data_fingerprint"))
# modules_run.append(ModuleSummary(module="Data Fingerprint Analyzer", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.fingerprint_score)
# module_scores["score_dataprint"] = r.fingerprint_score
# for f in r.flags[:1]: top_flags.append(f"[DataFingerprint] {f.description}")
# except Exception: pass
# # ── Module 19: Peer Review Score ──────────────────────────
# try:
# r = _peerreview_engine.analyze(_smart_text(text, "peer_review"))
# modules_run.append(ModuleSummary(module="Peer Review Score", risk_level=r.risk_level, risk_score=r.manipulation_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.manipulation_score)
# module_scores["score_peerreview"] = r.manipulation_score
# for f in r.flags[:1]: top_flags.append(f"[PeerReview] {f.description}")
# except Exception: pass
# # ── Module 20: AI-Human Spectrum ──────────────────────────
# try:
# r = _spectrum_engine.analyze(_smart_text(text, "ai_spectrum"))
# modules_run.append(ModuleSummary(module="AI-Human Spectrum", risk_level=r.risk_level, risk_score=r.spectrum_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.spectrum_score)
# module_scores["score_spectrum"] = r.spectrum_score
# for f in r.flags[:1]: top_flags.append(f"[AISpectrum] {f.description}")
# except Exception: pass
# # ── Module 21: Replication Probability Score ──────────────
# replication_probability = 0.5
# replication_level = "UNKNOWN"
# try:
# rep = _predict_replication(module_scores)
# replication_probability = rep.replication_probability
# replication_level = rep.replication_level
# modules_run.append(ModuleSummary(
# module = "Replication Probability Score",
# risk_level = rep.risk_level,
# risk_score = rep.risk_score,
# summary = rep.summary,
# flags_count = rep.flags_count,
# ))
# scores.append(rep.risk_score)
# if rep.flags_count > 0:
# top_flags.append(f"[Replication] {rep.verdict}")
# except Exception: pass
# # ── Final Score ───────────────────────────────────────────
# overall_score, overall_risk = _compute_overall(scores)
# return FullPDFResponse(
# paper_title = paper.title,
# page_count = paper.page_count,
# figure_count = paper.figure_count,
# file_size_kb = paper.metadata.get("file_size_kb", 0.0),
# sha256 = paper.metadata.get("sha256", ""),
# overall_score = overall_score,
# overall_risk = overall_risk,
# integrity_verdict = _verdict(overall_risk),
# replication_probability = replication_probability,
# replication_level = replication_level,
# modules = modules_run,
# top_flags = top_flags[:10],
# analyzed_by = "SciPeerAI v2.1.0 β€” 21-Module Pipeline",
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# import os
# import tempfile
# from fastapi import APIRouter, HTTPException, UploadFile, File
# from pydantic import BaseModel, Field
# from src.scipeerai.modules.reproducibility_scanner import ReproducibilityScanner
# from src.scipeerai.modules.stat_audit import StatAuditEngine
# from src.scipeerai.modules.figure_forensics import FigureForensicsEngine
# from src.scipeerai.modules.methodology_checker import MethodologyChecker
# from src.scipeerai.modules.citation_analyzer import CitationAnalyzer
# from src.scipeerai.modules.novelty_scorer import NoveltyScorer
# from src.scipeerai.modules.grim_test import GrimTest
# from src.scipeerai.modules.sprite_test import SpriteTest
# from src.scipeerai.modules.granularity_analyzer import GranularityAnalyzer
# from src.scipeerai.modules.pcurve_analyzer import PCurveAnalyzer
# from src.scipeerai.modules.effect_size_validator import EffectSizeValidator
# from src.scipeerai.modules.retraction_checker import RetractionChecker
# from src.scipeerai.modules.citation_cartel import CitationCartelDetector
# from src.scipeerai.modules.llm_detector import LLMDetector
# from src.scipeerai.modules.fraud_fingerprint import FraudFingerprinter
# from src.scipeerai.modules.temporal_anomaly import TemporalAnomalyDetector
# from src.scipeerai.modules.citation_dna import CitationDNAAnalyzer
# from src.scipeerai.modules.data_fingerprint import DataFingerprintAnalyzer
# from src.scipeerai.modules.peer_review_score import PeerReviewScorer
# from src.scipeerai.modules.ai_human_spectrum import AIHumanSpectrumAnalyzer
# from src.scipeerai.modules.replication_predictor import analyze as _predict_replication
# from src.scipeerai.modules.research_genealogy import ResearchGenealogyEngine
# from src.scipeerai.core.pdf_parser import PDFParser
# router = APIRouter(prefix="/api/v1", tags=["Analysis"])
# # ── Section-aware text routing ────────────────────────────────────────────────
# _SECTION_MARKERS = [
# "abstract", "introduction", "background", "related work",
# "methods", "methodology", "materials and methods",
# "experimental", "experiments", "procedures",
# "results", "findings", "data analysis",
# "discussion", "conclusion", "conclusions",
# "references", "bibliography", "acknowledgments",
# "supplementary", "appendix",
# ]
# _MODULE_SECTIONS = {
# "statistics": ["abstract", "results", "findings",
# "data analysis", "methods", "methodology"],
# "methodology": ["abstract", "introduction", "methods",
# "methodology", "materials and methods",
# "experimental", "conclusion", "conclusions"],
# "citations": ["introduction", "background",
# "related work", "references", "bibliography"],
# "reproducibility": ["methods", "methodology",
# "materials and methods", "experimental",
# "procedures", "acknowledgments"],
# "novelty": ["abstract", "introduction",
# "background", "related work"],
# "grim": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "sprite": ["results", "findings", "methods",
# "methodology", "data analysis"],
# "granularity": ["results", "findings",
# "methods", "data analysis"],
# "pcurve": ["abstract", "results",
# "findings", "data analysis"],
# "effect_size": ["results", "findings",
# "methods", "discussion"],
# "retraction": ["references", "bibliography", "introduction"],
# "cartel": ["references", "bibliography",
# "introduction", "acknowledgments"],
# "llm": ["abstract", "introduction", "methods",
# "results", "discussion"],
# "fraud_fingerprint": ["abstract", "introduction", "methods",
# "results", "discussion", "conclusion"],
# "temporal": ["introduction", "methods", "references",
# "bibliography", "background"],
# "citation_dna": ["references", "bibliography",
# "introduction", "acknowledgments"],
# "data_fingerprint": ["results", "findings", "methods",
# "data analysis", "discussion"],
# "peer_review": ["acknowledgments", "introduction",
# "abstract", "references"],
# "ai_spectrum": ["abstract", "introduction", "methods",
# "results", "discussion", "conclusion"],
# "genealogy": ["references", "bibliography",
# "introduction", "acknowledgments"],
# }
# def _extract_sections(text: str) -> dict:
# text_lower = text.lower()
# positions = []
# for marker in _SECTION_MARKERS:
# search_from = 0
# while True:
# idx = text_lower.find(marker, search_from)
# if idx == -1:
# break
# line_start = text.rfind('\n', 0, idx) + 1
# line_end = text.find('\n', idx)
# if line_end == -1:
# line_end = len(text)
# line_content = text[line_start:line_end].strip()
# if len(line_content) <= 60:
# positions.append((idx, marker))
# break
# search_from = idx + 1
# if not positions:
# return {}
# positions.sort(key=lambda x: x[0])
# deduped = [positions[0]]
# for pos in positions[1:]:
# if pos[0] - deduped[-1][0] > 50:
# deduped.append(pos)
# sections = {}
# for i, (start, name) in enumerate(deduped):
# end = deduped[i + 1][0] if i + 1 < len(deduped) else len(text)
# sections[name] = text[start:end].strip()
# return sections
# def _smart_text(text: str, module: str, per_section_limit: int = 2500) -> str:
# sections = _extract_sections(text)
# target_keys = _MODULE_SECTIONS.get(module, [])
# if sections and target_keys:
# parts = []
# for key in target_keys:
# if key in sections:
# parts.append(sections[key][:per_section_limit])
# if parts:
# return "\n\n".join(parts)[:12000]
# return _truncate(text)
# def _truncate(text: str, limit: int = 8000) -> str:
# if len(text) <= limit:
# return text
# lower = text.lower()
# methods_idx = lower.find('method')
# if 0 < methods_idx < len(text) - 1000:
# start = text[:3000]
# middle = text[methods_idx:methods_idx + 4000]
# return start + " [...] " + middle
# return text[:limit]
# # ── Engine initialization ─────────────────────────────────────────────────────
# _stat_engine = StatAuditEngine()
# _figure_engine = FigureForensicsEngine()
# _method_engine = MethodologyChecker()
# _citation_engine = CitationAnalyzer()
# _repro_engine = ReproducibilityScanner()
# _novelty_engine = NoveltyScorer()
# _grim_engine = GrimTest()
# _sprite_engine = SpriteTest()
# _granularity_engine = GranularityAnalyzer()
# _pcurve_engine = PCurveAnalyzer()
# _effect_size_engine = EffectSizeValidator()
# _retraction_engine = RetractionChecker()
# _cartel_engine = CitationCartelDetector()
# _llm_engine = LLMDetector()
# _fraud_engine = FraudFingerprinter()
# _temporal_engine = TemporalAnomalyDetector()
# _dna_engine = CitationDNAAnalyzer()
# _dataprint_engine = DataFingerprintAnalyzer()
# _peerreview_engine = PeerReviewScorer()
# _spectrum_engine = AIHumanSpectrumAnalyzer()
# _genealogy_engine = ResearchGenealogyEngine()
# _pdf_parser = PDFParser()
# # ── Shared response models ────────────────────────────────────────────────────
# class TextAnalysisRequest(BaseModel):
# text: str = Field(..., min_length=50, description="Paper text to analyze")
# class FlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ForensicFlagResponse(BaseModel):
# figures_involved: list
# class StatAuditResponse(BaseModel):
# risk_level: str
# risk_score: float
# summary: str
# flags: list[FlagResponse]
# p_values_found: list[float]
# sample_sizes_found: list[int]
# flags_count: int
# class FigureForensicsResponse(BaseModel):
# figures_found: int
# flags: list[ForensicFlagResponse]
# duplicate_pairs: list
# class MethodologyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# abstract: str = Field("")
# class MethodologyFlagResponse(BaseModel):
# claim: str
# issue: str
# suggestion: str
# class MethodologyResponse(BaseModel):
# flags: list[MethodologyFlagResponse]
# claims_found: list[str]
# methods_found: list[str]
# llm_assessment: str
# llm_available: bool
# class CitationRequest(BaseModel):
# text: str = Field(..., min_length=50)
# author_name: str = Field("")
# class CitationFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class CitationResponse(BaseModel):
# total_citations: int
# self_citations: int
# self_citation_ratio: float
# unsupported_claims: int
# flags: list[CitationFlagResponse]
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class ReproducibilityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class ReproducibilityFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class ReproducibilityResponse(BaseModel):
# has_code_link: bool
# has_data_link: bool
# has_software_versions: bool
# has_preregistration: bool
# has_ethics_statement: bool
# reproducibility_score: float
# risk_level: str
# summary: str
# flags: list[ReproducibilityFlagResponse]
# flags_count: int
# class NoveltyRequest(BaseModel):
# text: str = Field(..., min_length=50)
# title: str = Field("")
# class NoveltyFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str = ""
# class RelatedWorkResponse(BaseModel):
# title: str
# year: int
# authors: list
# similarity_signal: str
# class NoveltyResponse(BaseModel):
# novelty_score: float
# novelty_level: str
# risk_level: str
# risk_score: float
# summary: str
# flags: list[NoveltyFlagResponse]
# related_works_found: list[RelatedWorkResponse]
# key_terms_extracted: list[str]
# literature_accessible: bool
# flags_count: int
# class GrimRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GrimFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class GrimResponse(BaseModel):
# impossible_means: list; possible_means: list; grim_score: float
# risk_level: str; summary: str; flags: list[GrimFlagResponse]; flags_count: int
# class SpriteRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class SpriteFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class SpriteResponse(BaseModel):
# impossible_combinations: list; possible_combinations: list; sprite_score: float
# risk_level: str; summary: str; flags: list[SpriteFlagResponse]; flags_count: int
# class GranularityRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class GranularityFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class GranularityResponse(BaseModel):
# digit_preference_score: float; benford_score: float; round_number_ratio: float
# granularity_score: float; risk_level: str; summary: str
# flags: list[GranularityFlagResponse]; flags_count: int
# class PCurveRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class PCurveFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class PCurveResponse(BaseModel):
# p_values_found: list; significant_p: list; right_skew_ratio: float
# clustering_score: float; pcurve_score: float; risk_level: str; summary: str
# flags: list[PCurveFlagResponse]; flags_count: int
# class EffectSizeRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class EffectSizeFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class EffectSizeResponse(BaseModel):
# effect_sizes_found: list; power_estimates: list; inflated_effects: list
# underpowered: list; effect_score: float; risk_level: str; summary: str
# flags: list[EffectSizeFlagResponse]; flags_count: int
# class RetractionRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class RetractionFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class RetractionResponse(BaseModel):
# dois_found: list; retracted_found: list; checked_count: int
# retraction_score: float; risk_level: str; summary: str
# flags: list[RetractionFlagResponse]; flags_count: int
# class CartelRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class CartelFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class CartelResponse(BaseModel):
# authors_found: list; citation_network: dict; cartel_score: float
# self_citation_ratio: float; network_diversity: float; risk_level: str
# summary: str; flags: list[CartelFlagResponse]; flags_count: int
# class LLMRequest(BaseModel):
# text: str = Field(..., min_length=50)
# class LLMFlagResponse(BaseModel):
# flag_type: str; severity: str; description: str; evidence: str; suggestion: str
# class LLMResponse(BaseModel):
# burstiness_score: float; vocabulary_diversity: float; sentence_uniformity: float
# llm_phrase_count: int; llm_score: float; risk_level: str; summary: str
# flags: list[LLMFlagResponse]; flags_count: int
# # ── Phase 5 models ────────────────────────────────────────────────────────────
# class Phase5FlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class FraudFingerprintResponse(BaseModel):
# style_shift_score: float
# precision_inconsistency: float
# vocabulary_mixing_score: float
# hedge_overuse_score: float
# fingerprint_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class TemporalAnomalyResponse(BaseModel):
# collection_years: list
# cited_years: list
# future_citations: list
# recency_claim_score: float
# timeline_score: float
# temporal_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class CitationDNAResponse(BaseModel):
# total_citations: int
# unique_authors: int
# author_concentration: float
# journal_concentration: float
# decade_concentration: float
# geographic_diversity: float
# dna_diversity_score: float
# dna_risk_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class DataFingerprintResponse(BaseModel):
# total_numbers: int
# round_number_ratio: float
# terminal_digit_bias: float
# impossible_pairs: list
# suspicious_duplicates: list
# fingerprint_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class PeerReviewResponse(BaseModel):
# days_to_acceptance: int
# has_conflict_statement: bool
# conflict_credible: bool
# reviewer_suggestions: bool
# special_issue: bool
# predatory_signals: int
# manipulation_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# class AISpectrumResponse(BaseModel):
# overall_human_ratio: float
# overall_ai_ratio: float
# dominant_ai_model: str
# model_confidence: float
# ai_sections: list
# human_sections: list
# gpt4_signal_count: int
# claude_signal_count: int
# gemini_signal_count: int
# spectrum_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# # ── Replication Predictor models ──────────────────────────────────────────────
# class ReplicationRequest(BaseModel):
# text: str = Field(..., min_length=50, description="Paper text to analyze for replication probability")
# class ReplicationFlagResponse(BaseModel):
# flag_type: str
# severity: str
# description: str
# evidence: str
# suggestion: str
# class ReplicationResponse(BaseModel):
# replication_probability: float
# fraud_probability: float
# replication_level: str
# verdict: str
# risk_score: float
# risk_level: str
# summary: str
# flags: list[ReplicationFlagResponse]
# flags_count: int
# model_version: str
# # ── Phase 6 models ────────────────────────────────────────────────────────────
# class GenealogyResponse(BaseModel):
# doi_count: int
# unique_cited_authors: int
# genealogy_concentration: float
# ring_detected: bool
# ring_members: list[str]
# crossref_checked: int
# retracted_ancestors: int
# lineage_depth_score: float
# genealogy_score: float
# risk_level: str
# summary: str
# flags: list[Phase5FlagResponse]
# flags_count: int
# # ── Full PDF models ───────────────────────────────────────────────────────────
# class ModuleSummary(BaseModel):
# module: str
# risk_level: str
# risk_score: float
# summary: str
# flags_count: int
# class FullPDFResponse(BaseModel):
# paper_title: str
# page_count: int
# figure_count: int
# file_size_kb: float
# sha256: str
# overall_score: float
# overall_risk: str
# integrity_verdict: str
# replication_probability: float
# replication_level: str
# modules: list[ModuleSummary]
# top_flags: list[str]
# analyzed_by: str
# # ── Helpers ───────────────────────────────────────────────────────────────────
# def _compute_overall(scores: list[float]) -> tuple[float, str]:
# avg = round(sum(scores) / len(scores), 3) if scores else 0.0
# if avg >= 0.7: level = "HIGH"
# elif avg >= 0.4: level = "MEDIUM"
# else: level = "LOW"
# return avg, level
# def _verdict(risk: str) -> str:
# return {
# "HIGH": "Serious integrity concerns detected. Manual expert review strongly recommended.",
# "MEDIUM": "Some integrity issues found. Careful review advised before publication.",
# "LOW": "No major integrity issues detected. Paper appears scientifically sound.",
# }.get(risk, "Unknown")
# def _p5flags(flags) -> list:
# return [
# Phase5FlagResponse(
# flag_type = f.flag_type,
# severity = f.severity,
# description = f.description,
# evidence = f.evidence,
# suggestion = f.suggestion,
# ) for f in flags
# ]
# # ── Status ────────────────────────────────────────────────────────────────────
# @router.get("/status")
# def system_status():
# return {
# "modules_ready": {
# "stat_audit": True,
# "figure_forensics": True,
# "methodology_checker": True,
# "citation_analyzer": True,
# "reproducibility": True,
# "novelty_scorer": True,
# "grim_test": True,
# "sprite_test": True,
# "granularity": True,
# "pcurve": True,
# "effect_size": True,
# "retraction": True,
# "citation_cartel": True,
# "llm_detector": True,
# "fraud_fingerprint": True,
# "temporal_anomaly": True,
# "citation_dna": True,
# "data_fingerprint": True,
# "peer_review_score": True,
# "ai_human_spectrum": True,
# "replication_predictor": True,
# "research_genealogy": True,
# },
# "version": "2.2.0",
# "modules_count": 22,
# "tests_passing": 209,
# "text_extraction": "section-aware",
# "ml_ensemble": "LightGBM+XGBoost+RF+LR",
# "benchmark": "SciPeerBench v2.0 — 91,779 papers",
# "ensemble_auc": 0.895,
# }
# # ── Original 14 endpoints ─────────────────────────────────────────────────────
# @router.post("/analyze/statistics", response_model=StatAuditResponse)
# def analyze_statistics(request: TextAnalysisRequest):
# """Statistical Audit — p-hacking, sample size, round numbers."""
# try:
# r = _stat_engine.analyze(_smart_text(request.text, "statistics"))
# return StatAuditResponse(
# risk_level = r.risk_level,
# risk_score = r.risk_score,
# summary = r.summary,
# flags = [FlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags],
# p_values_found = r.p_values_found,
# sample_sizes_found = r.sample_sizes_found,
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/figures", response_model=FigureForensicsResponse)
# async def analyze_figures(file: UploadFile = File(...)):
# """Figure Forensics — duplicate image detection."""
# if not file.filename.endswith(".pdf"):
# raise HTTPException(status_code=400, detail="Only PDF files accepted.")
# tmp_path = None
# try:
# contents = await file.read()
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(contents)
# tmp_path = tmp.name
# r = _figure_engine.analyze(tmp_path)
# return FigureForensicsResponse(
# figures_found = r.figures_found,
# flags = [ForensicFlagResponse(figures_involved=f.figures_involved) for f in r.flags],
# duplicate_pairs = r.duplicate_pairs,
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# finally:
# if tmp_path and os.path.exists(tmp_path):
# os.unlink(tmp_path)
# @router.post("/analyze/methodology", response_model=MethodologyResponse)
# def analyze_methodology(request: MethodologyRequest):
# """Methodology Checker — causation, control groups, timeframe."""
# try:
# r = _method_engine.analyze(_smart_text(request.text, "methodology"), request.abstract)
# return MethodologyResponse(
# flags = [MethodologyFlagResponse(claim=f.claim, issue=f.issue, suggestion=f.suggestion) for f in r.flags],
# claims_found = r.claims_found,
# methods_found = r.methods_found,
# llm_assessment = r.llm_assessment,
# llm_available = r.llm_available,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/citations", response_model=CitationResponse)
# def analyze_citations(request: CitationRequest):
# """Citation Integrity — self-citation, unsupported claims."""
# try:
# r = _citation_engine.analyze(_smart_text(request.text, "citations"), request.author_name)
# return CitationResponse(
# total_citations = r.total_citations,
# self_citations = r.self_citations,
# self_citation_ratio = r.self_citation_ratio,
# unsupported_claims = r.unsupported_claims,
# flags = [CitationFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in r.flags],
# risk_level = r.risk_level,
# risk_score = r.risk_score,
# summary = r.summary,
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/reproducibility", response_model=ReproducibilityResponse)
# def analyze_reproducibility(request: ReproducibilityRequest):
# """Reproducibility Scanner — code, data, ethics, preregistration."""
# try:
# r = _repro_engine.analyze(_smart_text(request.text, "reproducibility"))
# return ReproducibilityResponse(
# has_code_link = r.has_code_link,
# has_data_link = r.has_data_link,
# has_software_versions = r.has_software_versions,
# has_preregistration = r.has_preregistration,
# has_ethics_statement = r.has_ethics_statement,
# reproducibility_score = r.reproducibility_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = [ReproducibilityFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in r.flags],
# flags_count = len(r.flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/novelty", response_model=NoveltyResponse)
# def analyze_novelty(request: NoveltyRequest):
# """Novelty Scorer — literature search, novelty estimation."""
# try:
# r = _novelty_engine.analyze(_smart_text(request.text, "novelty", per_section_limit=2000), request.title)
# raw_flags = getattr(r, 'flags', []) or []
# return NoveltyResponse(
# novelty_score = r.novelty_score,
# novelty_level = r.novelty_level,
# risk_level = r.risk_level,
# risk_score = getattr(r, 'risk_score', r.novelty_score),
# summary = r.summary,
# flags = [NoveltyFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=getattr(f, 'suggestion', '')) for f in raw_flags],
# related_works_found = [RelatedWorkResponse(title=w.title, year=w.year, authors=w.authors, similarity_signal=w.similarity_signal) for w in r.related_works_found],
# key_terms_extracted = r.key_terms_extracted,
# literature_accessible = r.literature_accessible,
# flags_count = len(raw_flags),
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/grim", response_model=GrimResponse)
# def analyze_grim(request: GrimRequest):
# """GRIM Test — mathematically impossible means."""
# try:
# r = _grim_engine.analyze(_smart_text(request.text, "grim"))
# return GrimResponse(impossible_means=r.impossible_means, possible_means=r.possible_means, grim_score=r.grim_score, risk_level=r.risk_level, summary=r.summary, flags=[GrimFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/sprite", response_model=SpriteResponse)
# def analyze_sprite(request: SpriteRequest):
# """SPRITE Test — impossible distributions."""
# try:
# r = _sprite_engine.analyze(_smart_text(request.text, "sprite"))
# return SpriteResponse(impossible_combinations=r.impossible_combinations, possible_combinations=r.possible_combinations, sprite_score=r.sprite_score, risk_level=r.risk_level, summary=r.summary, flags=[SpriteFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/granularity", response_model=GranularityResponse)
# def analyze_granularity(request: GranularityRequest):
# """Granularity Analyzer — Benford's Law, digit preference."""
# try:
# r = _granularity_engine.analyze(_smart_text(request.text, "granularity"))
# return GranularityResponse(digit_preference_score=r.digit_preference_score, benford_score=r.benford_score, round_number_ratio=r.round_number_ratio, granularity_score=r.granularity_score, risk_level=r.risk_level, summary=r.summary, flags=[GranularityFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/pcurve", response_model=PCurveResponse)
# def analyze_pcurve(request: PCurveRequest):
# """P-Curve Analyzer — publication bias detector."""
# try:
# r = _pcurve_engine.analyze(_smart_text(request.text, "pcurve"))
# return PCurveResponse(p_values_found=r.p_values_found, significant_p=r.significant_p, right_skew_ratio=r.right_skew_ratio, clustering_score=r.clustering_score, pcurve_score=r.pcurve_score, risk_level=r.risk_level, summary=r.summary, flags=[PCurveFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/effect_size", response_model=EffectSizeResponse)
# def analyze_effect_size(request: EffectSizeRequest):
# """Effect Size Validator — Cohen's d, power analysis."""
# try:
# r = _effect_size_engine.analyze(_smart_text(request.text, "effect_size"))
# return EffectSizeResponse(effect_sizes_found=r.effect_sizes_found, power_estimates=r.power_estimates, inflated_effects=r.inflated_effects, underpowered=r.underpowered, effect_score=r.effect_score, risk_level=r.risk_level, summary=r.summary, flags=[EffectSizeFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/retraction", response_model=RetractionResponse)
# def analyze_retraction(request: RetractionRequest):
# """Retraction Checker — live CrossRef API."""
# try:
# r = _retraction_engine.analyze(_smart_text(request.text, "retraction"))
# return RetractionResponse(dois_found=r.dois_found, retracted_found=r.retracted_found, checked_count=r.checked_count, retraction_score=r.retraction_score, risk_level=r.risk_level, summary=r.summary, flags=[RetractionFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/cartel", response_model=CartelResponse)
# def analyze_cartel(request: CartelRequest):
# """Citation Cartel Detector — graph-based ring detection."""
# try:
# r = _cartel_engine.analyze(_smart_text(request.text, "cartel"))
# return CartelResponse(authors_found=r.authors_found, citation_network=r.citation_network, cartel_score=r.cartel_score, self_citation_ratio=r.self_citation_ratio, network_diversity=r.network_diversity, risk_level=r.risk_level, summary=r.summary, flags=[CartelFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/llm", response_model=LLMResponse)
# def analyze_llm(request: LLMRequest):
# """LLM Detector — burstiness, TTR, phrase patterns."""
# try:
# r = _llm_engine.analyze(_smart_text(request.text, "llm"))
# return LLMResponse(burstiness_score=r.burstiness_score, vocabulary_diversity=r.vocabulary_diversity, sentence_uniformity=r.sentence_uniformity, llm_phrase_count=r.llm_phrase_count, llm_score=r.llm_score, risk_level=r.risk_level, summary=r.summary, flags=[LLMFlagResponse(flag_type=f.flag_type, severity=f.severity, description=f.description, evidence=f.evidence, suggestion=f.suggestion) for f in r.flags], flags_count=r.flags_count)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Phase 5 endpoints ─────────────────────────────────────────────────────────
# @router.post("/analyze/fraud_fingerprint", response_model=FraudFingerprintResponse)
# def analyze_fraud_fingerprint(request: TextAnalysisRequest):
# """Fraud Fingerprinting — writing DNA inconsistency detection."""
# try:
# r = _fraud_engine.analyze(_smart_text(request.text, "fraud_fingerprint"))
# return FraudFingerprintResponse(
# style_shift_score = r.style_shift_score,
# precision_inconsistency = r.precision_inconsistency,
# vocabulary_mixing_score = r.vocabulary_mixing_score,
# hedge_overuse_score = r.hedge_overuse_score,
# fingerprint_score = r.fingerprint_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/temporal_anomaly", response_model=TemporalAnomalyResponse)
# def analyze_temporal_anomaly(request: TextAnalysisRequest):
# """Temporal Anomaly — citation timeline paradox detection."""
# try:
# r = _temporal_engine.analyze(_smart_text(request.text, "temporal"))
# return TemporalAnomalyResponse(
# collection_years = r.collection_years,
# cited_years = r.cited_years,
# future_citations = r.future_citations,
# recency_claim_score = r.recency_claim_score,
# timeline_score = r.timeline_score,
# temporal_score = r.temporal_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/citation_dna", response_model=CitationDNAResponse)
# def analyze_citation_dna(request: TextAnalysisRequest):
# """Citation DNA — network concentration and diversity analysis."""
# try:
# r = _dna_engine.analyze(_smart_text(request.text, "citation_dna"))
# return CitationDNAResponse(
# total_citations = r.total_citations,
# unique_authors = r.unique_authors,
# author_concentration = r.author_concentration,
# journal_concentration = r.journal_concentration,
# decade_concentration = r.decade_concentration,
# geographic_diversity = r.geographic_diversity,
# dna_diversity_score = r.dna_diversity_score,
# dna_risk_score = r.dna_risk_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/data_fingerprint", response_model=DataFingerprintResponse)
# def analyze_data_fingerprint(request: TextAnalysisRequest):
# """Data Fingerprinting — fabrication and numerical cloning detection."""
# try:
# r = _dataprint_engine.analyze(_smart_text(request.text, "data_fingerprint"))
# return DataFingerprintResponse(
# total_numbers = r.total_numbers,
# round_number_ratio = r.round_number_ratio,
# terminal_digit_bias = r.terminal_digit_bias,
# impossible_pairs = r.impossible_pairs,
# suspicious_duplicates = [round(v, 4) for v in r.suspicious_duplicates],
# fingerprint_score = r.fingerprint_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/peer_review", response_model=PeerReviewResponse)
# def analyze_peer_review(request: TextAnalysisRequest):
# """Peer Review Score — process manipulation detection."""
# try:
# r = _peerreview_engine.analyze(_smart_text(request.text, "peer_review"))
# return PeerReviewResponse(
# days_to_acceptance = r.days_to_acceptance,
# has_conflict_statement = r.has_conflict_statement,
# conflict_credible = r.conflict_credible,
# reviewer_suggestions = r.reviewer_suggestions,
# special_issue = r.special_issue,
# predatory_signals = r.predatory_signals,
# manipulation_score = r.manipulation_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @router.post("/analyze/ai_spectrum", response_model=AISpectrumResponse)
# def analyze_ai_spectrum(request: TextAnalysisRequest):
# """AI-Human Spectrum — model attribution and ratio analysis."""
# try:
# r = _spectrum_engine.analyze(_smart_text(request.text, "ai_spectrum"))
# return AISpectrumResponse(
# overall_human_ratio = r.overall_human_ratio,
# overall_ai_ratio = r.overall_ai_ratio,
# dominant_ai_model = r.dominant_ai_model,
# model_confidence = r.model_confidence,
# ai_sections = r.ai_sections,
# human_sections = r.human_sections,
# gpt4_signal_count = r.gpt4_signal_count,
# claude_signal_count = r.claude_signal_count,
# gemini_signal_count = r.gemini_signal_count,
# spectrum_score = r.spectrum_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Replication Predictor endpoint ────────────────────────────────────────────
# @router.post("/analyze/replication", response_model=ReplicationResponse)
# def analyze_replication(request: TextAnalysisRequest):
# """
# Replication Probability Score — ML ensemble replication likelihood prediction.
# Uses LightGBM + XGBoost + RandomForest + LogisticRegression trained on
# SciPeerBench v2.0 (91,779 papers). Ensemble AUC: 0.895.
# """
# try:
# text = request.text
# module_scores = {}
# try:
# r = _stat_engine.analyze(_smart_text(text, "statistics"))
# module_scores["score_stat"] = r.risk_score
# except Exception:
# module_scores["score_stat"] = 0.0
# try:
# r = _method_engine.analyze(_smart_text(text, "methodology"), "")
# module_scores["score_method"] = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
# except Exception:
# module_scores["score_method"] = 0.0
# try:
# r = _citation_engine.analyze(_smart_text(text, "citations"), "")
# module_scores["score_citation"] = r.risk_score
# except Exception:
# module_scores["score_citation"] = 0.0
# try:
# r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
# module_scores["score_repro"] = round(1.0 - r.reproducibility_score, 4)
# except Exception:
# module_scores["score_repro"] = 0.0
# try:
# r = _novelty_engine.analyze(_smart_text(text, "novelty", per_section_limit=2000), "")
# module_scores["score_novelty"] = getattr(r, "risk_score", round(1.0 - r.novelty_score, 4))
# except Exception:
# module_scores["score_novelty"] = 0.0
# try:
# r = _grim_engine.analyze(_smart_text(text, "grim"))
# module_scores["score_grim"] = r.grim_score
# except Exception:
# module_scores["score_grim"] = 0.0
# try:
# r = _sprite_engine.analyze(_smart_text(text, "sprite"))
# module_scores["score_sprite"] = r.sprite_score
# except Exception:
# module_scores["score_sprite"] = 0.0
# try:
# r = _granularity_engine.analyze(_smart_text(text, "granularity"))
# module_scores["score_granularity"] = r.granularity_score
# except Exception:
# module_scores["score_granularity"] = 0.0
# try:
# r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
# module_scores["score_pcurve"] = r.pcurve_score
# except Exception:
# module_scores["score_pcurve"] = 0.0
# try:
# r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
# module_scores["score_effect"] = r.effect_score
# except Exception:
# module_scores["score_effect"] = 0.0
# try:
# r = _retraction_engine.analyze(_smart_text(text, "retraction"))
# module_scores["score_retraction"] = r.retraction_score
# except Exception:
# module_scores["score_retraction"] = 0.0
# try:
# r = _cartel_engine.analyze(_smart_text(text, "cartel"))
# module_scores["score_cartel"] = r.cartel_score
# except Exception:
# module_scores["score_cartel"] = 0.0
# try:
# r = _llm_engine.analyze(_smart_text(text, "llm"))
# module_scores["score_llm"] = r.llm_score
# except Exception:
# module_scores["score_llm"] = 0.0
# try:
# r = _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint"))
# module_scores["score_fraud"] = r.fingerprint_score
# except Exception:
# module_scores["score_fraud"] = 0.0
# try:
# r = _temporal_engine.analyze(_smart_text(text, "temporal"))
# module_scores["score_temporal"] = r.temporal_score
# except Exception:
# module_scores["score_temporal"] = 0.0
# try:
# r = _dna_engine.analyze(_smart_text(text, "citation_dna"))
# module_scores["score_dna"] = r.dna_risk_score
# except Exception:
# module_scores["score_dna"] = 0.0
# try:
# r = _dataprint_engine.analyze(_smart_text(text, "data_fingerprint"))
# module_scores["score_dataprint"] = r.fingerprint_score
# except Exception:
# module_scores["score_dataprint"] = 0.0
# try:
# r = _peerreview_engine.analyze(_smart_text(text, "peer_review"))
# module_scores["score_peerreview"] = r.manipulation_score
# except Exception:
# module_scores["score_peerreview"] = 0.0
# try:
# r = _spectrum_engine.analyze(_smart_text(text, "ai_spectrum"))
# module_scores["score_spectrum"] = r.spectrum_score
# except Exception:
# module_scores["score_spectrum"] = 0.0
# result = _predict_replication(module_scores, text=text)
# return ReplicationResponse(
# replication_probability = result.replication_probability,
# fraud_probability = result.fraud_probability,
# replication_level = result.replication_level,
# verdict = result.verdict,
# risk_score = result.risk_score,
# risk_level = result.risk_level,
# summary = result.summary,
# flags = [
# ReplicationFlagResponse(
# flag_type = f["flag_type"],
# severity = f["severity"],
# description = f["description"],
# evidence = f["evidence"],
# suggestion = f["suggestion"],
# ) for f in result.flags
# ],
# flags_count = result.flags_count,
# model_version = result.model_version,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Phase 6 endpoints ─────────────────────────────────────────────────────────
# @router.post("/analyze/genealogy", response_model=GenealogyResponse)
# def analyze_genealogy(request: TextAnalysisRequest):
# """
# Research Genealogy Engine — citation ancestry tracing, ring network
# detection, lineage concentration scoring, and retracted ancestor
# verification via CrossRef API. Module 22 β€” SciPeerAI Phase 6.
# """
# try:
# r = _genealogy_engine.analyze(_smart_text(request.text, "genealogy"))
# return GenealogyResponse(
# doi_count = r.doi_count,
# unique_cited_authors = r.unique_cited_authors,
# genealogy_concentration = r.genealogy_concentration,
# ring_detected = r.ring_detected,
# ring_members = r.ring_members,
# crossref_checked = r.crossref_checked,
# retracted_ancestors = r.retracted_ancestors,
# lineage_depth_score = r.lineage_depth_score,
# genealogy_score = r.genealogy_score,
# risk_level = r.risk_level,
# summary = r.summary,
# flags = _p5flags(r.flags),
# flags_count = r.flags_count,
# )
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ── Full PDF — 22-Module Master Endpoint ─────────────────────────────────────
# @router.post("/analyze/full-pdf", response_model=FullPDFResponse)
# async def analyze_full_pdf(file: UploadFile = File(...)):
# """
# Master endpoint β€” Upload a PDF and run all 22 analysis modules at once.
# Returns a unified integrity report with per-module scores, top flags,
# and an ML ensemble replication probability score.
# Trained on SciPeerBench v2.0 (91,779 papers, AUC 0.895).
# Designed for PhD researchers, journal editors, and integrity committees.
# """
# try:
# file_bytes = await file.read()
# paper = _pdf_parser.parse_bytes(file_bytes, file.filename)
# text = paper.full_text
# if len(text.strip()) < 100:
# raise HTTPException(
# status_code=422,
# detail="PDF text extraction failed or paper is too short. "
# "Ensure the PDF contains selectable text (not a scanned image)."
# )
# modules_run = []
# top_flags = []
# scores = []
# module_scores = {}
# # ── Module 01: Statistical Audit ──────────────────────────
# try:
# r = _stat_engine.analyze(_smart_text(text, "statistics"))
# modules_run.append(ModuleSummary(module="Statistical Audit", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(r.risk_score)
# module_scores["score_stat"] = r.risk_score
# for f in r.flags[:2]: top_flags.append(f"[Statistics] {f.description}")
# except Exception: pass
# # ── Module 02: Methodology Checker ────────────────────────
# try:
# abstract = paper.sections.get("abstract", "")
# r = _method_engine.analyze(_smart_text(text, "methodology"), abstract)
# score = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
# modules_run.append(ModuleSummary(module="Methodology Checker", risk_level="HIGH" if score >= 0.7 else "MEDIUM" if score >= 0.4 else "LOW", risk_score=score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(score)
# module_scores["score_method"] = score
# for f in r.flags[:2]: top_flags.append(f"[Methodology] {f.issue}")
# except Exception: pass
# # ── Module 03: Citation Integrity ─────────────────────────
# try:
# r = _citation_engine.analyze(_smart_text(text, "citations"), "")
# modules_run.append(ModuleSummary(module="Citation Integrity", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
# scores.append(r.risk_score)
# module_scores["score_citation"] = r.risk_score
# for f in r.flags[:2]: top_flags.append(f"[Citations] {f.description}")
# except Exception: pass
# # ── Module 04: Reproducibility ────────────────────────────
# try:
# r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
# repro_risk = round(1.0 - r.reproducibility_score, 4)
# modules_run.append(ModuleSummary(module="Reproducibility Scanner", risk_level=r.risk_level, risk_score=repro_risk, summary=r.summary, flags_count=len(r.flags)))
# scores.append(repro_risk)
# module_scores["score_repro"] = repro_risk
# for f in r.flags[:1]: top_flags.append(f"[Reproducibility] {f.description}")
# except Exception: pass
# # ── Module 05: Novelty ────────────────────────────────────
# try:
# r = _novelty_engine.analyze(_smart_text(text, "novelty", per_section_limit=2000), paper.title)
# novelty_risk = getattr(r, "risk_score", round(1.0 - r.novelty_score, 4))
# modules_run.append(ModuleSummary(module="Novelty Scorer", risk_level=r.risk_level, risk_score=novelty_risk, summary=r.summary, flags_count=len(getattr(r, "flags", []) or [])))
# scores.append(novelty_risk)
# module_scores["score_novelty"] = novelty_risk
# except Exception: pass
# # ── Module 06: GRIM Test ──────────────────────────────────
# try:
# r = _grim_engine.analyze(_smart_text(text, "grim"))
# modules_run.append(ModuleSummary(module="GRIM Test", risk_level=r.risk_level, risk_score=r.grim_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.grim_score)
# module_scores["score_grim"] = r.grim_score
# for f in r.flags[:1]: top_flags.append(f"[GRIM] {f.description}")
# except Exception: pass
# # ── Module 07: SPRITE Test ────────────────────────────────
# try:
# r = _sprite_engine.analyze(_smart_text(text, "sprite"))
# modules_run.append(ModuleSummary(module="SPRITE Test", risk_level=r.risk_level, risk_score=r.sprite_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.sprite_score)
# module_scores["score_sprite"] = r.sprite_score
# except Exception: pass
# # ── Module 08: Granularity ────────────────────────────────
# try:
# r = _granularity_engine.analyze(_smart_text(text, "granularity"))
# modules_run.append(ModuleSummary(module="Granularity Analyzer", risk_level=r.risk_level, risk_score=r.granularity_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.granularity_score)
# module_scores["score_granularity"] = r.granularity_score
# except Exception: pass
# # ── Module 09: P-Curve ────────────────────────────────────
# try:
# r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
# modules_run.append(ModuleSummary(module="P-Curve Analyzer", risk_level=r.risk_level, risk_score=r.pcurve_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.pcurve_score)
# module_scores["score_pcurve"] = r.pcurve_score
# for f in r.flags[:1]: top_flags.append(f"[P-Curve] {f.description}")
# except Exception: pass
# # ── Module 10: Effect Size ────────────────────────────────
# try:
# r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
# modules_run.append(ModuleSummary(module="Effect Size Validator", risk_level=r.risk_level, risk_score=r.effect_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.effect_score)
# module_scores["score_effect"] = r.effect_score
# except Exception: pass
# # ── Module 11: Retraction Checker ─────────────────────────
# try:
# r = _retraction_engine.analyze(_smart_text(text, "retraction"))
# modules_run.append(ModuleSummary(module="Retraction Checker", risk_level=r.risk_level, risk_score=r.retraction_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.retraction_score)
# module_scores["score_retraction"] = r.retraction_score
# for f in r.flags[:1]: top_flags.append(f"[Retraction] {f.description}")
# except Exception: pass
# # ── Module 12: Citation Cartel ────────────────────────────
# try:
# r = _cartel_engine.analyze(_smart_text(text, "cartel"))
# modules_run.append(ModuleSummary(module="Citation Cartel Detector", risk_level=r.risk_level, risk_score=r.cartel_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.cartel_score)
# module_scores["score_cartel"] = r.cartel_score
# for f in r.flags[:1]: top_flags.append(f"[Cartel] {f.description}")
# except Exception: pass
# # ── Module 13: LLM Detector ───────────────────────────────
# try:
# r = _llm_engine.analyze(_smart_text(text, "llm"))
# modules_run.append(ModuleSummary(module="LLM Paper Detector", risk_level=r.risk_level, risk_score=r.llm_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.llm_score)
# module_scores["score_llm"] = r.llm_score
# for f in r.flags[:1]: top_flags.append(f"[LLM] {f.description}")
# except Exception: pass
# # ── Module 14: Figure Forensics ───────────────────────────
# try:
# tmp_path = None
# with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
# tmp.write(file_bytes)
# tmp_path = tmp.name
# r = _figure_engine.analyze(tmp_path)
# fig_score = min(len(r.duplicate_pairs) * 0.3, 1.0)
# modules_run.append(ModuleSummary(module="Figure Forensics", risk_level="HIGH" if fig_score >= 0.7 else "MEDIUM" if fig_score >= 0.3 else "LOW", risk_score=fig_score, summary=f"{r.figures_found} figures found. {len(r.duplicate_pairs)} duplicate pairs detected.", flags_count=len(r.flags)))
# scores.append(fig_score)
# if tmp_path and os.path.exists(tmp_path): os.unlink(tmp_path)
# except Exception: pass
# # ── Module 15: Fraud Fingerprinting ───────────────────────
# try:
# r = _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint"))
# modules_run.append(ModuleSummary(module="Fraud Fingerprinting", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.fingerprint_score)
# module_scores["score_fraud"] = r.fingerprint_score
# for f in r.flags[:1]: top_flags.append(f"[Fingerprint] {f.description}")
# except Exception: pass
# # ── Module 16: Temporal Anomaly ───────────────────────────
# try:
# r = _temporal_engine.analyze(_smart_text(text, "temporal"))
# modules_run.append(ModuleSummary(module="Temporal Anomaly Detector", risk_level=r.risk_level, risk_score=r.temporal_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.temporal_score)
# module_scores["score_temporal"] = r.temporal_score
# for f in r.flags[:1]: top_flags.append(f"[Temporal] {f.description}")
# except Exception: pass
# # ── Module 17: Citation DNA ───────────────────────────────
# try:
# r = _dna_engine.analyze(_smart_text(text, "citation_dna"))
# modules_run.append(ModuleSummary(module="Citation DNA Analyzer", risk_level=r.risk_level, risk_score=r.dna_risk_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.dna_risk_score)
# module_scores["score_dna"] = r.dna_risk_score
# for f in r.flags[:1]: top_flags.append(f"[CitationDNA] {f.description}")
# except Exception: pass
# # ── Module 18: Data Fingerprint ───────────────────────────
# try:
# r = _dataprint_engine.analyze(_smart_text(text, "data_fingerprint"))
# modules_run.append(ModuleSummary(module="Data Fingerprint Analyzer", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.fingerprint_score)
# module_scores["score_dataprint"] = r.fingerprint_score
# for f in r.flags[:1]: top_flags.append(f"[DataFingerprint] {f.description}")
# except Exception: pass
# # ── Module 19: Peer Review Score ──────────────────────────
# try:
# r = _peerreview_engine.analyze(_smart_text(text, "peer_review"))
# modules_run.append(ModuleSummary(module="Peer Review Score", risk_level=r.risk_level, risk_score=r.manipulation_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.manipulation_score)
# module_scores["score_peerreview"] = r.manipulation_score
# for f in r.flags[:1]: top_flags.append(f"[PeerReview] {f.description}")
# except Exception: pass
# # ── Module 20: AI-Human Spectrum ──────────────────────────
# try:
# r = _spectrum_engine.analyze(_smart_text(text, "ai_spectrum"))
# modules_run.append(ModuleSummary(module="AI-Human Spectrum", risk_level=r.risk_level, risk_score=r.spectrum_score, summary=r.summary, flags_count=r.flags_count))
# scores.append(r.spectrum_score)
# module_scores["score_spectrum"] = r.spectrum_score
# for f in r.flags[:1]: top_flags.append(f"[AISpectrum] {f.description}")
# except Exception: pass
# # ── Module 21: Replication Probability Score ──────────────
# replication_probability = 0.5
# replication_level = "UNKNOWN"
# try:
# rep = _predict_replication(module_scores, text=text)
# replication_probability = rep.replication_probability
# replication_level = rep.replication_level
# modules_run.append(ModuleSummary(
# module = "Replication Probability Score",
# risk_level = rep.risk_level,
# risk_score = rep.risk_score,
# summary = rep.summary,
# flags_count = rep.flags_count,
# ))
# scores.append(rep.risk_score)
# if rep.flags_count > 0:
# top_flags.append(f"[Replication] {rep.verdict}")
# except Exception: pass
# # ── Module 22: Research Genealogy Engine ──────────────────
# try:
# r = _genealogy_engine.analyze(_smart_text(text, "genealogy"))
# modules_run.append(ModuleSummary(
# module = "Research Genealogy Engine",
# risk_level = r.risk_level.upper(),
# risk_score = r.genealogy_score,
# summary = r.summary,
# flags_count = r.flags_count,
# ))
# scores.append(r.genealogy_score)
# if r.ring_detected:
# top_flags.append(
# f"[Genealogy] Citation ring: {', '.join(r.ring_members[:3])}"
# )
# if r.retracted_ancestors > 0:
# top_flags.append(
# f"[Genealogy] {r.retracted_ancestors} retracted ancestor(s) found"
# )
# except Exception: pass
# # ── Final Score ───────────────────────────────────────────
# overall_score, overall_risk = _compute_overall(scores)
# return FullPDFResponse(
# paper_title = paper.title,
# page_count = paper.page_count,
# figure_count = paper.figure_count,
# file_size_kb = paper.metadata.get("file_size_kb", 0.0),
# sha256 = paper.metadata.get("sha256", ""),
# overall_score = overall_score,
# overall_risk = overall_risk,
# integrity_verdict = _verdict(overall_risk),
# replication_probability = replication_probability,
# replication_level = replication_level,
# modules = modules_run,
# top_flags = top_flags[:10],
# analyzed_by = "SciPeerAI v2.2.0 β€” 22-Module Pipeline",
# )
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
import os
import tempfile
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel, Field
from src.scipeerai.modules.reproducibility_scanner import ReproducibilityScanner
from src.scipeerai.modules.stat_audit import StatAuditEngine
from src.scipeerai.modules.figure_forensics import FigureForensicsEngine
from src.scipeerai.modules.methodology_checker import MethodologyChecker
from src.scipeerai.modules.citation_analyzer import CitationAnalyzer
from src.scipeerai.modules.novelty_scorer import NoveltyScorer
from src.scipeerai.modules.grim_test import GrimTest
from src.scipeerai.modules.sprite_test import SpriteTest
from src.scipeerai.modules.granularity_analyzer import GranularityAnalyzer
from src.scipeerai.modules.pcurve_analyzer import PCurveAnalyzer
from src.scipeerai.modules.effect_size_validator import EffectSizeValidator
from src.scipeerai.modules.retraction_checker import RetractionChecker
from src.scipeerai.modules.citation_cartel import CitationCartelDetector
from src.scipeerai.modules.llm_detector import LLMDetector
from src.scipeerai.modules.fraud_fingerprint import FraudFingerprinter
from src.scipeerai.modules.temporal_anomaly import TemporalAnomalyDetector
from src.scipeerai.modules.citation_dna import CitationDNAAnalyzer
from src.scipeerai.modules.data_fingerprint import DataFingerprintAnalyzer
from src.scipeerai.modules.peer_review_score import PeerReviewScorer
from src.scipeerai.modules.ai_human_spectrum import AIHumanSpectrumAnalyzer
from src.scipeerai.modules.replication_predictor import analyze as _predict_replication
from src.scipeerai.modules.research_genealogy import ResearchGenealogyEngine
from src.scipeerai.modules.field_saturation import FieldSaturationDetector
from src.scipeerai.modules.institutional_conflict import InstitutionalConflictDetector
from src.scipeerai.core.pdf_parser import PDFParser
# Shared router for this module: every route registered on it is served under
# the "/api/v1" prefix and tagged "Analysis".
router = APIRouter(prefix="/api/v1", tags=["Analysis"])
# ── Section-aware text routing ────────────────────────────────────────────────
_SECTION_MARKERS = [
"abstract", "introduction", "background", "related work",
"methods", "methodology", "materials and methods",
"experimental", "experiments", "procedures",
"results", "findings", "data analysis",
"discussion", "conclusion", "conclusions",
"references", "bibliography", "acknowledgments",
"supplementary", "appendix",
]
_MODULE_SECTIONS = {
"statistics": ["abstract", "results", "findings",
"data analysis", "methods", "methodology"],
"methodology": ["abstract", "introduction", "methods",
"methodology", "materials and methods",
"experimental", "conclusion", "conclusions"],
"citations": ["introduction", "background",
"related work", "references", "bibliography"],
"reproducibility": ["methods", "methodology",
"materials and methods", "experimental",
"procedures", "acknowledgments"],
"novelty": ["abstract", "introduction",
"background", "related work"],
"grim": ["results", "findings", "methods",
"methodology", "data analysis"],
"sprite": ["results", "findings", "methods",
"methodology", "data analysis"],
"granularity": ["results", "findings",
"methods", "data analysis"],
"pcurve": ["abstract", "results",
"findings", "data analysis"],
"effect_size": ["results", "findings",
"methods", "discussion"],
"retraction": ["references", "bibliography", "introduction"],
"cartel": ["references", "bibliography",
"introduction", "acknowledgments"],
"llm": ["abstract", "introduction", "methods",
"results", "discussion"],
"fraud_fingerprint": ["abstract", "introduction", "methods",
"results", "discussion", "conclusion"],
"temporal": ["introduction", "methods", "references",
"bibliography", "background"],
"citation_dna": ["references", "bibliography",
"introduction", "acknowledgments"],
"data_fingerprint": ["results", "findings", "methods",
"data analysis", "discussion"],
"peer_review": ["acknowledgments", "introduction",
"abstract", "references"],
"ai_spectrum": ["abstract", "introduction", "methods",
"results", "discussion", "conclusion"],
"genealogy": ["references", "bibliography",
"introduction", "acknowledgments"],
"saturation": ["abstract", "introduction",
"background", "related work", "conclusion"],
"conflict": ["acknowledgments", "abstract",
"introduction", "conclusion", "references"],
}
def _extract_sections(text: str) -> dict:
    """Split *text* into sections keyed by the heading marker that opens them.

    For every entry of ``_SECTION_MARKERS`` the first case-insensitive
    occurrence that sits on a short line (at most 60 characters after
    stripping) is taken as that section's heading. Headings closer than
    51 characters to the previously kept one are discarded. Each section
    runs from its heading to the next kept heading (or the end of the text).

    Returns an empty dict when no heading is found.
    """
    lowered = text.lower()
    hits = []
    for marker in _SECTION_MARKERS:
        idx = lowered.find(marker)
        while idx != -1:
            # Measure the line that contains this occurrence.
            bol = text.rfind('\n', 0, idx) + 1
            eol = text.find('\n', idx)
            if eol == -1:
                eol = len(text)
            if len(text[bol:eol].strip()) <= 60:
                # Short line: treat it as a heading and stop searching
                # for this marker.
                hits.append((idx, marker))
                break
            idx = lowered.find(marker, idx + 1)

    if not hits:
        return {}

    # Stable sort on position only, so ties keep marker-list order.
    hits.sort(key=lambda hit: hit[0])

    kept = [hits[0]]
    for hit in hits[1:]:
        if hit[0] - kept[-1][0] > 50:
            kept.append(hit)

    # Each section ends where the next kept heading begins.
    ends = [start for start, _ in kept[1:]] + [len(text)]
    return {
        name: text[start:end].strip()
        for (start, name), end in zip(kept, ends)
    }
def _smart_text(text: str, module: str, per_section_limit: int = 2500) -> str:
    """Return the part of *text* most relevant to *module*.

    The sections listed for *module* in ``_MODULE_SECTIONS`` are looked up in
    the output of ``_extract_sections``; each one found is cut to
    *per_section_limit* characters, the excerpts are joined with blank lines
    and the result is capped at 12000 characters. If no section can be
    routed (unknown module, no headings detected, or none of the wanted
    sections present) the text is passed through ``_truncate`` instead.
    """
    sections = _extract_sections(text)
    wanted = _MODULE_SECTIONS.get(module, [])
    if not (sections and wanted):
        return _truncate(text)

    excerpts = [
        sections[name][:per_section_limit]
        for name in wanted
        if name in sections
    ]
    if not excerpts:
        return _truncate(text)
    return "\n\n".join(excerpts)[:12000]
def _truncate(text: str, limit: int = 8000) -> str:
if len(text) <= limit:
return text
lower = text.lower()
methods_idx = lower.find('method')
if 0 < methods_idx < len(text) - 1000:
start = text[:3000]
middle = text[methods_idx:methods_idx + 4000]
return start + " [...] " + middle
return text[:limit]
# ── Engine initialization ─────────────────────────────────────────────────────
# Module-level singletons: each engine (and the PDF parser) is constructed
# once, when this module is imported, and reused by the route handlers below.
# NOTE(review): construction order is kept as-is in case any constructor has
# import-time side effects — confirm before reordering.
_stat_engine = StatAuditEngine()
_figure_engine = FigureForensicsEngine()
_method_engine = MethodologyChecker()
_citation_engine = CitationAnalyzer()
_repro_engine = ReproducibilityScanner()
_novelty_engine = NoveltyScorer()
_grim_engine = GrimTest()
_sprite_engine = SpriteTest()
_granularity_engine = GranularityAnalyzer()
_pcurve_engine = PCurveAnalyzer()
_effect_size_engine = EffectSizeValidator()
_retraction_engine = RetractionChecker()
_cartel_engine = CitationCartelDetector()
_llm_engine = LLMDetector()
_fraud_engine = FraudFingerprinter()
_temporal_engine = TemporalAnomalyDetector()
_dna_engine = CitationDNAAnalyzer()
_dataprint_engine = DataFingerprintAnalyzer()
_peerreview_engine = PeerReviewScorer()
_spectrum_engine = AIHumanSpectrumAnalyzer()
_genealogy_engine = ResearchGenealogyEngine()
_saturation_engine = FieldSaturationDetector()
_conflict_engine = InstitutionalConflictDetector()
_pdf_parser = PDFParser()
# ── Shared response models ────────────────────────────────────────────────────
class TextAnalysisRequest(BaseModel):
    """Request body carrying the paper text for a text-based analysis."""

    # Required; rejected by validation when shorter than 50 characters.
    text: str = Field(..., min_length=50, description="Paper text to analyze")
class FlagResponse(BaseModel):
    """One flag reported by the statistical audit."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    # Optional: defaults to an empty string.
    suggestion: str = ""
class ForensicFlagResponse(BaseModel):
    """One flag reported by figure forensics."""

    figures_involved: list
class StatAuditResponse(BaseModel):
    """Response schema of POST /analyze/statistics."""

    # Overall assessment
    risk_level: str
    risk_score: float
    summary: str
    # Individual findings and the raw values they were derived from
    flags: list[FlagResponse]
    p_values_found: list[float]
    sample_sizes_found: list[int]
    flags_count: int
class FigureForensicsResponse(BaseModel):
    """Response schema of POST /analyze/figures."""

    figures_found: int
    flags: list[ForensicFlagResponse]
    duplicate_pairs: list
class MethodologyRequest(BaseModel):
    """Request body of POST /analyze/methodology."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
    # Optional abstract; empty string when omitted.
    abstract: str = Field("")
class MethodologyFlagResponse(BaseModel):
    """One flag reported by the methodology checker."""

    claim: str
    issue: str
    suggestion: str
class MethodologyResponse(BaseModel):
    """Response schema of POST /analyze/methodology."""

    flags: list[MethodologyFlagResponse]
    claims_found: list[str]
    methods_found: list[str]
    llm_assessment: str
    llm_available: bool
class CitationRequest(BaseModel):
    """Request body of POST /analyze/citations."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
    # Optional author name; empty string when omitted.
    author_name: str = Field("")
class CitationFlagResponse(BaseModel):
    """One flag reported by the citation analyzer."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    # Optional: defaults to an empty string.
    suggestion: str = ""
class CitationResponse(BaseModel):
    """Response schema of POST /analyze/citations."""

    # Citation counts
    total_citations: int
    self_citations: int
    self_citation_ratio: float
    unsupported_claims: int
    # Findings and overall assessment
    flags: list[CitationFlagResponse]
    risk_level: str
    risk_score: float
    summary: str
    flags_count: int
class ReproducibilityRequest(BaseModel):
    """Request body of POST /analyze/reproducibility."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class ReproducibilityFlagResponse(BaseModel):
    """One flag reported by the reproducibility scanner."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    # Optional: defaults to an empty string.
    suggestion: str = ""
class ReproducibilityResponse(BaseModel):
    """Response schema of POST /analyze/reproducibility."""

    # Presence indicators
    has_code_link: bool
    has_data_link: bool
    has_software_versions: bool
    has_preregistration: bool
    has_ethics_statement: bool
    # Overall assessment and findings
    reproducibility_score: float
    risk_level: str
    summary: str
    flags: list[ReproducibilityFlagResponse]
    flags_count: int
class NoveltyRequest(BaseModel):
    """Request body of POST /analyze/novelty."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
    # Optional paper title; empty string when omitted.
    title: str = Field("")
class NoveltyFlagResponse(BaseModel):
    """One flag reported by the novelty scorer."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    # Optional: defaults to an empty string.
    suggestion: str = ""
class RelatedWorkResponse(BaseModel):
    """One related work returned as part of a novelty response."""

    title: str
    year: int
    authors: list
    similarity_signal: str
class NoveltyResponse(BaseModel):
    """Response schema of POST /analyze/novelty."""

    # Scores and overall assessment
    novelty_score: float
    novelty_level: str
    risk_level: str
    risk_score: float
    summary: str
    # Findings and supporting material
    flags: list[NoveltyFlagResponse]
    related_works_found: list[RelatedWorkResponse]
    key_terms_extracted: list[str]
    literature_accessible: bool
    flags_count: int
class GrimRequest(BaseModel):
    """Request body of POST /analyze/grim."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class GrimFlagResponse(BaseModel):
    """One flag inside a GRIM response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class GrimResponse(BaseModel):
    """Response schema of POST /analyze/grim."""

    impossible_means: list
    possible_means: list
    grim_score: float
    risk_level: str
    summary: str
    flags: list[GrimFlagResponse]
    flags_count: int
class SpriteRequest(BaseModel):
    """Request body for a SPRITE analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class SpriteFlagResponse(BaseModel):
    """One flag inside a SPRITE response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class SpriteResponse(BaseModel):
    """Response schema for a SPRITE analysis."""

    impossible_combinations: list
    possible_combinations: list
    sprite_score: float
    risk_level: str
    summary: str
    flags: list[SpriteFlagResponse]
    flags_count: int
class GranularityRequest(BaseModel):
    """Request body for a granularity analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class GranularityFlagResponse(BaseModel):
    """One flag inside a granularity response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class GranularityResponse(BaseModel):
    """Response schema for a granularity analysis."""

    digit_preference_score: float
    benford_score: float
    round_number_ratio: float
    granularity_score: float
    risk_level: str
    summary: str
    flags: list[GranularityFlagResponse]
    flags_count: int
class PCurveRequest(BaseModel):
    """Request body for a p-curve analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class PCurveFlagResponse(BaseModel):
    """One flag inside a p-curve response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class PCurveResponse(BaseModel):
    """Response schema for a p-curve analysis."""

    p_values_found: list
    significant_p: list
    right_skew_ratio: float
    clustering_score: float
    pcurve_score: float
    risk_level: str
    summary: str
    flags: list[PCurveFlagResponse]
    flags_count: int
class EffectSizeRequest(BaseModel):
    """Request body for an effect-size analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class EffectSizeFlagResponse(BaseModel):
    """One flag inside an effect-size response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class EffectSizeResponse(BaseModel):
    """Response schema for an effect-size analysis."""

    effect_sizes_found: list
    power_estimates: list
    inflated_effects: list
    underpowered: list
    effect_score: float
    risk_level: str
    summary: str
    flags: list[EffectSizeFlagResponse]
    flags_count: int
class RetractionRequest(BaseModel):
    """Request body for a retraction check."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class RetractionFlagResponse(BaseModel):
    """One flag inside a retraction-check response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class RetractionResponse(BaseModel):
    """Response schema for a retraction check."""

    dois_found: list
    retracted_found: list
    checked_count: int
    retraction_score: float
    risk_level: str
    summary: str
    flags: list[RetractionFlagResponse]
    flags_count: int
class CartelRequest(BaseModel):
    """Request body for a citation-cartel analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class CartelFlagResponse(BaseModel):
    """One flag inside a citation-cartel response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class CartelResponse(BaseModel):
    """Response schema for a citation-cartel analysis."""

    authors_found: list
    citation_network: dict
    cartel_score: float
    self_citation_ratio: float
    network_diversity: float
    risk_level: str
    summary: str
    flags: list[CartelFlagResponse]
    flags_count: int
class LLMRequest(BaseModel):
    """Request body for an LLM-detection analysis."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class LLMFlagResponse(BaseModel):
    """One flag inside an LLM-detection response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class LLMResponse(BaseModel):
    """Response schema for an LLM-detection analysis."""

    burstiness_score: float
    vocabulary_diversity: float
    sentence_uniformity: float
    llm_phrase_count: int
    llm_score: float
    risk_level: str
    summary: str
    flags: list[LLMFlagResponse]
    flags_count: int
# ── Phase 5 models ────────────────────────────────────────────────────────────
class Phase5FlagResponse(BaseModel):
    """Flag schema shared by the Phase 5 and Phase 6 response models."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    # Required here (no default).
    suggestion: str
class FraudFingerprintResponse(BaseModel):
    """Response schema for a fraud-fingerprint analysis."""

    # Component scores
    style_shift_score: float
    precision_inconsistency: float
    vocabulary_mixing_score: float
    hedge_overuse_score: float
    # Overall assessment and findings
    fingerprint_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class TemporalAnomalyResponse(BaseModel):
    """Response schema for a temporal-anomaly analysis."""

    # Extracted values
    collection_years: list
    cited_years: list
    future_citations: list
    # Scores, overall assessment and findings
    recency_claim_score: float
    timeline_score: float
    temporal_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class CitationDNAResponse(BaseModel):
    """Response schema for a citation-DNA analysis."""

    # Counts
    total_citations: int
    unique_authors: int
    # Concentration and diversity measures
    author_concentration: float
    journal_concentration: float
    decade_concentration: float
    geographic_diversity: float
    dna_diversity_score: float
    # Overall assessment and findings
    dna_risk_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class DataFingerprintResponse(BaseModel):
    """Response schema for a data-fingerprint analysis."""

    # Measurements
    total_numbers: int
    round_number_ratio: float
    terminal_digit_bias: float
    impossible_pairs: list
    suspicious_duplicates: list
    # Overall assessment and findings
    fingerprint_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class PeerReviewResponse(BaseModel):
    """Response schema for a peer-review score analysis."""

    # Extracted signals
    days_to_acceptance: int
    has_conflict_statement: bool
    conflict_credible: bool
    reviewer_suggestions: bool
    special_issue: bool
    predatory_signals: int
    # Overall assessment and findings
    manipulation_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class AISpectrumResponse(BaseModel):
    """Response schema for an AI-human spectrum analysis."""

    # Ratios and model attribution
    overall_human_ratio: float
    overall_ai_ratio: float
    dominant_ai_model: str
    model_confidence: float
    # Per-section breakdown
    ai_sections: list
    human_sections: list
    # Per-model signal counts
    gpt4_signal_count: int
    claude_signal_count: int
    gemini_signal_count: int
    # Overall assessment and findings
    spectrum_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
# ── Replication Predictor models ──────────────────────────────────────────────
class ReplicationRequest(BaseModel):
    """Request body for a replication-probability prediction."""

    # Required paper text, at least 50 characters.
    text: str = Field(..., min_length=50)
class ReplicationFlagResponse(BaseModel):
    """One flag inside a replication-prediction response."""

    flag_type: str
    severity: str
    description: str
    evidence: str
    suggestion: str
class ReplicationResponse(BaseModel):
    """Response schema for a replication-probability prediction."""

    # Predictions
    replication_probability: float
    fraud_probability: float
    replication_level: str
    verdict: str
    # Overall assessment and findings
    risk_score: float
    risk_level: str
    summary: str
    flags: list[ReplicationFlagResponse]
    flags_count: int
    # NOTE(review): under pydantic v2 a field starting with "model_" may
    # trigger a protected-namespace warning — confirm the pydantic version.
    model_version: str
# ── Phase 6 models ────────────────────────────────────────────────────────────
class GenealogyResponse(BaseModel):
    """Response schema for a research-genealogy analysis."""

    # Citation lineage measurements
    doi_count: int
    unique_cited_authors: int
    genealogy_concentration: float
    ring_detected: bool
    ring_members: list[str]
    crossref_checked: int
    retracted_ancestors: int
    lineage_depth_score: float
    # Overall assessment and findings
    genealogy_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class SaturationResponse(BaseModel):
    """Response schema for a field-saturation analysis."""

    # Measurements
    topic_keywords: list[str]
    keyword_density: float
    redundancy_score: float
    novelty_claim_score: float
    contribution_vagueness: float
    overcrowding_signals: int
    # Overall assessment and findings
    saturation_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
class ConflictResponse(BaseModel):
    """Response schema for an institutional-conflict analysis."""

    # Measurements
    institution_mentions: int
    unique_institutions: int
    funding_sources: list[str]
    conflict_disclosures: int
    undisclosed_signals: int
    self_serving_claims: int
    industry_funding_signals: int
    conclusion_bias_score: float
    # Overall assessment and findings
    conflict_score: float
    risk_level: str
    summary: str
    flags: list[Phase5FlagResponse]
    flags_count: int
# ── Full PDF models ───────────────────────────────────────────────────────────
class ModuleSummary(BaseModel):
    """Per-module entry in the ``modules`` list of FullPDFResponse."""

    module: str
    risk_level: str
    risk_score: float
    summary: str
    flags_count: int
class FullPDFResponse(BaseModel):
    """Aggregate response schema for a full-PDF analysis."""

    # Document facts
    paper_title: str
    page_count: int
    figure_count: int
    file_size_kb: float
    sha256: str
    # Aggregate assessment
    overall_score: float
    overall_risk: str
    integrity_verdict: str
    replication_probability: float
    replication_level: str
    # Per-module detail
    modules: list[ModuleSummary]
    top_flags: list[str]
    analyzed_by: str
# ── Helpers ───────────────────────────────────────────────────────────────────
def _compute_overall(scores: list[float]) -> tuple[float, str]:
avg = round(sum(scores) / len(scores), 3) if scores else 0.0
if avg >= 0.7: level = "HIGH"
elif avg >= 0.4: level = "MEDIUM"
else: level = "LOW"
return avg, level
def _verdict(risk: str) -> str:
return {
"HIGH": "Serious integrity concerns detected. Manual expert review strongly recommended.",
"MEDIUM": "Some integrity issues found. Careful review advised before publication.",
"LOW": "No major integrity issues detected. Paper appears scientifically sound.",
}.get(risk, "Unknown")
def _p5flags(flags) -> list:
    """Convert engine flag objects into ``Phase5FlagResponse`` models.

    Each item of *flags* must expose ``flag_type``, ``severity``,
    ``description``, ``evidence`` and ``suggestion`` attributes. The
    result preserves input order.
    """
    converted = []
    for flag in flags:
        converted.append(
            Phase5FlagResponse(
                flag_type=flag.flag_type,
                severity=flag.severity,
                description=flag.description,
                evidence=flag.evidence,
                suggestion=flag.suggestion,
            )
        )
    return converted
# ── Status ────────────────────────────────────────────────────────────────────
@router.get("/status")
def system_status():
    """Report static service metadata and the per-module readiness map."""
    module_names = (
        "stat_audit",
        "figure_forensics",
        "methodology_checker",
        "citation_analyzer",
        "reproducibility",
        "novelty_scorer",
        "grim_test",
        "sprite_test",
        "granularity",
        "pcurve",
        "effect_size",
        "retraction",
        "citation_cartel",
        "llm_detector",
        "fraud_fingerprint",
        "temporal_anomaly",
        "citation_dna",
        "data_fingerprint",
        "peer_review_score",
        "ai_human_spectrum",
        "replication_predictor",
        "research_genealogy",
        "field_saturation",
        "institutional_conflict",
    )
    # Every listed module is reported as ready; key order follows the tuple.
    modules_ready = dict.fromkeys(module_names, True)
    return {
        "modules_ready": modules_ready,
        "version": "2.3.0",
        # Derived from the map so the count cannot drift from the list (24).
        "modules_count": len(modules_ready),
        "tests_passing": 209,
        "text_extraction": "section-aware",
        "ml_ensemble": "LightGBM+XGBoost+RF+LR",
        # The separator is an em dash; it was previously mis-encoded.
        "benchmark": "SciPeerBench v2.0 — 91,779 papers",
        "ensemble_auc": 0.895,
    }
# ── Original 14 endpoints ─────────────────────────────────────────────────────
@router.post("/analyze/statistics", response_model=StatAuditResponse)
def analyze_statistics(request: TextAnalysisRequest):
    """Statistical Audit — p-hacking, sample size, round numbers."""
    try:
        routed_text = _smart_text(request.text, "statistics")
        audit = _stat_engine.analyze(routed_text)

        flag_models = []
        for flag in audit.flags:
            flag_models.append(
                FlagResponse(
                    flag_type=flag.flag_type,
                    severity=flag.severity,
                    description=flag.description,
                    evidence=flag.evidence,
                    suggestion=flag.suggestion,
                )
            )

        return StatAuditResponse(
            risk_level=audit.risk_level,
            risk_score=audit.risk_score,
            summary=audit.summary,
            flags=flag_models,
            p_values_found=audit.p_values_found,
            sample_sizes_found=audit.sample_sizes_found,
            flags_count=len(audit.flags),
        )
    except Exception as e:
        # Any failure is reported to the client as a 500 carrying the
        # exception text.
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/analyze/figures", response_model=FigureForensicsResponse)
async def analyze_figures(file: UploadFile = File(...)):
    """Figure Forensics — duplicate image detection.

    The uploaded PDF is written to a temporary file, analysed by the figure
    engine, and the temporary file is removed afterwards. Responds with 400
    when the upload is not named ``*.pdf`` (case-insensitive) and with 500
    when the analysis fails.
    """
    # filename may be missing on some uploads; treat that as "not a PDF"
    # instead of crashing with an AttributeError.
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files accepted.")

    tmp_path = None
    try:
        contents = await file.read()
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            # Record the path before writing so the finally-clause can
            # clean up even if the write itself fails.
            tmp_path = tmp.name
            tmp.write(contents)
        # The file is closed (and flushed) before the engine opens it.
        r = _figure_engine.analyze(tmp_path)
        return FigureForensicsResponse(
            figures_found=r.figures_found,
            flags=[
                ForensicFlagResponse(figures_involved=f.figures_involved)
                for f in r.flags
            ],
            duplicate_pairs=r.duplicate_pairs,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
@router.post("/analyze/methodology", response_model=MethodologyResponse)
def analyze_methodology(request: MethodologyRequest):
    """Methodology Checker — causation, control groups, timeframe."""
    try:
        routed_text = _smart_text(request.text, "methodology")
        report = _method_engine.analyze(routed_text, request.abstract)

        flag_models = []
        for flag in report.flags:
            flag_models.append(
                MethodologyFlagResponse(
                    claim=flag.claim,
                    issue=flag.issue,
                    suggestion=flag.suggestion,
                )
            )

        return MethodologyResponse(
            flags=flag_models,
            claims_found=report.claims_found,
            methods_found=report.methods_found,
            llm_assessment=report.llm_assessment,
            llm_available=report.llm_available,
        )
    except Exception as e:
        # Any failure is reported to the client as a 500 carrying the
        # exception text.
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/analyze/citations", response_model=CitationResponse)
def analyze_citations(request: CitationRequest):
    """Citation Integrity — self-citation, unsupported claims."""
    try:
        routed_text = _smart_text(request.text, "citations")
        report = _citation_engine.analyze(routed_text, request.author_name)

        flag_models = []
        for flag in report.flags:
            flag_models.append(
                CitationFlagResponse(
                    flag_type=flag.flag_type,
                    severity=flag.severity,
                    description=flag.description,
                    evidence=flag.evidence,
                    # Engine flags may lack a suggestion; fall back to "".
                    suggestion=getattr(flag, 'suggestion', ''),
                )
            )

        return CitationResponse(
            total_citations=report.total_citations,
            self_citations=report.self_citations,
            self_citation_ratio=report.self_citation_ratio,
            unsupported_claims=report.unsupported_claims,
            flags=flag_models,
            risk_level=report.risk_level,
            risk_score=report.risk_score,
            summary=report.summary,
            flags_count=len(report.flags),
        )
    except Exception as e:
        # Any failure is reported to the client as a 500 carrying the
        # exception text.
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/analyze/reproducibility", response_model=ReproducibilityResponse)
def analyze_reproducibility(request: ReproducibilityRequest):
    """Reproducibility Scanner — code, data, ethics, preregistration."""
    try:
        routed_text = _smart_text(request.text, "reproducibility")
        scan = _repro_engine.analyze(routed_text)

        flag_models = []
        for flag in scan.flags:
            flag_models.append(
                ReproducibilityFlagResponse(
                    flag_type=flag.flag_type,
                    severity=flag.severity,
                    description=flag.description,
                    evidence=flag.evidence,
                    # Engine flags may lack a suggestion; fall back to "".
                    suggestion=getattr(flag, 'suggestion', ''),
                )
            )

        return ReproducibilityResponse(
            has_code_link=scan.has_code_link,
            has_data_link=scan.has_data_link,
            has_software_versions=scan.has_software_versions,
            has_preregistration=scan.has_preregistration,
            has_ethics_statement=scan.has_ethics_statement,
            reproducibility_score=scan.reproducibility_score,
            risk_level=scan.risk_level,
            summary=scan.summary,
            flags=flag_models,
            flags_count=len(scan.flags),
        )
    except Exception as e:
        # Any failure is reported to the client as a 500 carrying the
        # exception text.
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/analyze/novelty", response_model=NoveltyResponse)
def analyze_novelty(request: NoveltyRequest):
    """Novelty Scorer: literature search and novelty estimation.

    Runs the novelty engine on the ``_smart_text``-prepared text (with
    ``per_section_limit=2000``) plus ``request.title``. A result without a
    ``flags`` attribute (or with a falsy one) is treated as having no flags,
    and ``risk_score`` falls back to ``novelty_score`` when the result does
    not provide one. Any exception becomes an HTTP 500.
    """
    try:
        section_text = _smart_text(request.text, "novelty", per_section_limit=2000)
        outcome = _novelty_engine.analyze(section_text, request.title)
        flag_items = getattr(outcome, 'flags', []) or []
        return NoveltyResponse(
            novelty_score=outcome.novelty_score,
            novelty_level=outcome.novelty_level,
            risk_level=outcome.risk_level,
            risk_score=getattr(outcome, 'risk_score', outcome.novelty_score),
            summary=outcome.summary,
            flags=[
                NoveltyFlagResponse(
                    flag_type=flag.flag_type,
                    severity=flag.severity,
                    description=flag.description,
                    evidence=flag.evidence,
                    suggestion=getattr(flag, 'suggestion', ''),
                )
                for flag in flag_items
            ],
            related_works_found=[
                RelatedWorkResponse(
                    title=work.title,
                    year=work.year,
                    authors=work.authors,
                    similarity_signal=work.similarity_signal,
                )
                for work in outcome.related_works_found
            ],
            key_terms_extracted=outcome.key_terms_extracted,
            literature_accessible=outcome.literature_accessible,
            flags_count=len(flag_items),
        )
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.post("/analyze/grim", response_model=GrimResponse)
def analyze_grim(request: GrimRequest):
    """GRIM Test: mathematically impossible means.

    Runs the GRIM engine on the ``_smart_text``-prepared text and copies its
    result fields into the response. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "grim")
        outcome = _grim_engine.analyze(prepared)
        return GrimResponse(
            impossible_means=outcome.impossible_means,
            possible_means=outcome.possible_means,
            grim_score=outcome.grim_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                GrimFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/sprite", response_model=SpriteResponse)
def analyze_sprite(request: SpriteRequest):
    """SPRITE Test: impossible distributions.

    Runs the SPRITE engine on the ``_smart_text``-prepared text and copies
    its result fields into the response. Any exception becomes an HTTP 500
    whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "sprite")
        outcome = _sprite_engine.analyze(prepared)
        return SpriteResponse(
            impossible_combinations=outcome.impossible_combinations,
            possible_combinations=outcome.possible_combinations,
            sprite_score=outcome.sprite_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                SpriteFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/granularity", response_model=GranularityResponse)
def analyze_granularity(request: GranularityRequest):
    """Granularity Analyzer: Benford's Law and digit preference.

    Runs the granularity engine on the ``_smart_text``-prepared text and
    copies its result fields into the response. Any exception becomes an
    HTTP 500 whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "granularity")
        outcome = _granularity_engine.analyze(prepared)
        return GranularityResponse(
            digit_preference_score=outcome.digit_preference_score,
            benford_score=outcome.benford_score,
            round_number_ratio=outcome.round_number_ratio,
            granularity_score=outcome.granularity_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                GranularityFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/pcurve", response_model=PCurveResponse)
def analyze_pcurve(request: PCurveRequest):
    """P-Curve Analyzer: publication bias detector.

    Runs the p-curve engine on the ``_smart_text``-prepared text and copies
    its result fields into the response. Any exception becomes an HTTP 500
    whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "pcurve")
        outcome = _pcurve_engine.analyze(prepared)
        return PCurveResponse(
            p_values_found=outcome.p_values_found,
            significant_p=outcome.significant_p,
            right_skew_ratio=outcome.right_skew_ratio,
            clustering_score=outcome.clustering_score,
            pcurve_score=outcome.pcurve_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                PCurveFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/effect_size", response_model=EffectSizeResponse)
def analyze_effect_size(request: EffectSizeRequest):
    """Effect Size Validator: Cohen's d and power analysis.

    Runs the effect-size engine on the ``_smart_text``-prepared text and
    copies its result fields into the response. Any exception becomes an
    HTTP 500 whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "effect_size")
        outcome = _effect_size_engine.analyze(prepared)
        return EffectSizeResponse(
            effect_sizes_found=outcome.effect_sizes_found,
            power_estimates=outcome.power_estimates,
            inflated_effects=outcome.inflated_effects,
            underpowered=outcome.underpowered,
            effect_score=outcome.effect_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                EffectSizeFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/retraction", response_model=RetractionResponse)
def analyze_retraction(request: RetractionRequest):
    """Retraction Checker: live CrossRef API.

    Runs the retraction engine on the ``_smart_text``-prepared text and
    copies its result fields into the response. Any exception becomes an
    HTTP 500 whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "retraction")
        outcome = _retraction_engine.analyze(prepared)
        return RetractionResponse(
            dois_found=outcome.dois_found,
            retracted_found=outcome.retracted_found,
            checked_count=outcome.checked_count,
            retraction_score=outcome.retraction_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                RetractionFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/cartel", response_model=CartelResponse)
def analyze_cartel(request: CartelRequest):
    """Citation Cartel Detector: graph-based ring detection.

    Runs the cartel engine on the ``_smart_text``-prepared text and copies
    its result fields into the response. Any exception becomes an HTTP 500
    whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "cartel")
        outcome = _cartel_engine.analyze(prepared)
        return CartelResponse(
            authors_found=outcome.authors_found,
            citation_network=outcome.citation_network,
            cartel_score=outcome.cartel_score,
            self_citation_ratio=outcome.self_citation_ratio,
            network_diversity=outcome.network_diversity,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                CartelFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/llm", response_model=LLMResponse)
def analyze_llm(request: LLMRequest):
    """LLM Detector: burstiness, TTR, phrase patterns.

    Runs the LLM-detection engine on the ``_smart_text``-prepared text and
    copies its result fields into the response. Any exception becomes an
    HTTP 500 whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "llm")
        outcome = _llm_engine.analyze(prepared)
        return LLMResponse(
            burstiness_score=outcome.burstiness_score,
            vocabulary_diversity=outcome.vocabulary_diversity,
            sentence_uniformity=outcome.sentence_uniformity,
            llm_phrase_count=outcome.llm_phrase_count,
            llm_score=outcome.llm_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=[
                LLMFlagResponse(
                    flag_type=item.flag_type,
                    severity=item.severity,
                    description=item.description,
                    evidence=item.evidence,
                    suggestion=item.suggestion,
                )
                for item in outcome.flags
            ],
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ── Phase 5 endpoints ─────────────────────────────────────────────────────────
@router.post("/analyze/fraud_fingerprint", response_model=FraudFingerprintResponse)
def analyze_fraud_fingerprint(request: TextAnalysisRequest):
    """Fraud Fingerprinting: writing-DNA inconsistency detection.

    Runs the fraud-fingerprint engine on the ``_smart_text``-prepared text;
    flags are converted with ``_p5flags``. Any exception becomes an HTTP 500
    whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "fraud_fingerprint")
        outcome = _fraud_engine.analyze(prepared)
        return FraudFingerprintResponse(
            style_shift_score=outcome.style_shift_score,
            precision_inconsistency=outcome.precision_inconsistency,
            vocabulary_mixing_score=outcome.vocabulary_mixing_score,
            hedge_overuse_score=outcome.hedge_overuse_score,
            fingerprint_score=outcome.fingerprint_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/temporal_anomaly", response_model=TemporalAnomalyResponse)
def analyze_temporal_anomaly(request: TextAnalysisRequest):
    """Temporal Anomaly: citation timeline paradox detection.

    Runs the temporal engine on the ``_smart_text``-prepared text; flags are
    converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "temporal")
        outcome = _temporal_engine.analyze(prepared)
        return TemporalAnomalyResponse(
            collection_years=outcome.collection_years,
            cited_years=outcome.cited_years,
            future_citations=outcome.future_citations,
            recency_claim_score=outcome.recency_claim_score,
            timeline_score=outcome.timeline_score,
            temporal_score=outcome.temporal_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/citation_dna", response_model=CitationDNAResponse)
def analyze_citation_dna(request: TextAnalysisRequest):
    """Citation DNA: network concentration and diversity analysis.

    Runs the citation-DNA engine on the ``_smart_text``-prepared text; flags
    are converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "citation_dna")
        outcome = _dna_engine.analyze(prepared)
        return CitationDNAResponse(
            total_citations=outcome.total_citations,
            unique_authors=outcome.unique_authors,
            author_concentration=outcome.author_concentration,
            journal_concentration=outcome.journal_concentration,
            decade_concentration=outcome.decade_concentration,
            geographic_diversity=outcome.geographic_diversity,
            dna_diversity_score=outcome.dna_diversity_score,
            dna_risk_score=outcome.dna_risk_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/data_fingerprint", response_model=DataFingerprintResponse)
def analyze_data_fingerprint(request: TextAnalysisRequest):
    """Data Fingerprinting: fabrication and numerical cloning detection.

    Runs the data-fingerprint engine on the ``_smart_text``-prepared text.
    Suspicious duplicate values are rounded to four decimal places before
    being returned; flags are converted with ``_p5flags``. Any exception
    becomes an HTTP 500 whose detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "data_fingerprint")
        outcome = _dataprint_engine.analyze(prepared)
        return DataFingerprintResponse(
            total_numbers=outcome.total_numbers,
            round_number_ratio=outcome.round_number_ratio,
            terminal_digit_bias=outcome.terminal_digit_bias,
            impossible_pairs=outcome.impossible_pairs,
            suspicious_duplicates=[
                round(value, 4) for value in outcome.suspicious_duplicates
            ],
            fingerprint_score=outcome.fingerprint_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/peer_review", response_model=PeerReviewResponse)
def analyze_peer_review(request: TextAnalysisRequest):
    """Peer Review Score: process manipulation detection.

    Runs the peer-review engine on the ``_smart_text``-prepared text; flags
    are converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "peer_review")
        outcome = _peerreview_engine.analyze(prepared)
        return PeerReviewResponse(
            days_to_acceptance=outcome.days_to_acceptance,
            has_conflict_statement=outcome.has_conflict_statement,
            conflict_credible=outcome.conflict_credible,
            reviewer_suggestions=outcome.reviewer_suggestions,
            special_issue=outcome.special_issue,
            predatory_signals=outcome.predatory_signals,
            manipulation_score=outcome.manipulation_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/ai_spectrum", response_model=AISpectrumResponse)
def analyze_ai_spectrum(request: TextAnalysisRequest):
    """AI-Human Spectrum: model attribution and ratio analysis.

    Runs the spectrum engine on the ``_smart_text``-prepared text; flags are
    converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "ai_spectrum")
        outcome = _spectrum_engine.analyze(prepared)
        return AISpectrumResponse(
            overall_human_ratio=outcome.overall_human_ratio,
            overall_ai_ratio=outcome.overall_ai_ratio,
            dominant_ai_model=outcome.dominant_ai_model,
            model_confidence=outcome.model_confidence,
            ai_sections=outcome.ai_sections,
            human_sections=outcome.human_sections,
            gpt4_signal_count=outcome.gpt4_signal_count,
            claude_signal_count=outcome.claude_signal_count,
            gemini_signal_count=outcome.gemini_signal_count,
            spectrum_score=outcome.spectrum_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ── Replication Predictor endpoint ────────────────────────────────────────────
@router.post("/analyze/replication", response_model=ReplicationResponse)
def analyze_replication(request: TextAnalysisRequest):
    """
    Replication Probability Score: ML ensemble replication likelihood prediction.
    LightGBM + XGBoost + RandomForest + LogisticRegression.
    Trained on SciPeerBench v2.0 (91,779 papers). Ensemble AUC: 0.895.

    Each text-based engine is run on ``request.text`` and contributes one
    ``score_*`` feature. An engine that raises contributes 0.0 instead of
    aborting the request. The collected features and the raw text are then
    passed to ``_predict_replication``.
    """
    try:
        text = request.text

        def _method_score():
            # Methodology has no numeric score; derive one from the flag count.
            res = _method_engine.analyze(_smart_text(text, "methodology"), "")
            return 0.7 if len(res.flags) > 2 else 0.3 if res.flags else 0.1

        def _novelty_score():
            # Prefer an explicit risk_score; otherwise invert novelty_score.
            res = _novelty_engine.analyze(
                _smart_text(text, "novelty", per_section_limit=2000), ""
            )
            return getattr(res, "risk_score", round(1.0 - res.novelty_score, 4))

        # (feature name, zero-argument callable producing its value), in the
        # order the features are inserted into the score dict.
        scorers = (
            ("score_stat", lambda: _stat_engine.analyze(_smart_text(text, "statistics")).risk_score),
            ("score_method", _method_score),
            ("score_citation", lambda: _citation_engine.analyze(_smart_text(text, "citations"), "").risk_score),
            ("score_repro", lambda: round(1.0 - _repro_engine.analyze(_smart_text(text, "reproducibility")).reproducibility_score, 4)),
            ("score_novelty", _novelty_score),
            ("score_grim", lambda: _grim_engine.analyze(_smart_text(text, "grim")).grim_score),
            ("score_sprite", lambda: _sprite_engine.analyze(_smart_text(text, "sprite")).sprite_score),
            ("score_granularity", lambda: _granularity_engine.analyze(_smart_text(text, "granularity")).granularity_score),
            ("score_pcurve", lambda: _pcurve_engine.analyze(_smart_text(text, "pcurve")).pcurve_score),
            ("score_effect", lambda: _effect_size_engine.analyze(_smart_text(text, "effect_size")).effect_score),
            ("score_retraction", lambda: _retraction_engine.analyze(_smart_text(text, "retraction")).retraction_score),
            ("score_cartel", lambda: _cartel_engine.analyze(_smart_text(text, "cartel")).cartel_score),
            ("score_llm", lambda: _llm_engine.analyze(_smart_text(text, "llm")).llm_score),
            ("score_fraud", lambda: _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint")).fingerprint_score),
            ("score_temporal", lambda: _temporal_engine.analyze(_smart_text(text, "temporal")).temporal_score),
            ("score_dna", lambda: _dna_engine.analyze(_smart_text(text, "citation_dna")).dna_risk_score),
            ("score_dataprint", lambda: _dataprint_engine.analyze(_smart_text(text, "data_fingerprint")).fingerprint_score),
            ("score_peerreview", lambda: _peerreview_engine.analyze(_smart_text(text, "peer_review")).manipulation_score),
            ("score_spectrum", lambda: _spectrum_engine.analyze(_smart_text(text, "ai_spectrum")).spectrum_score),
        )

        module_scores = {}
        for feature, compute in scorers:
            try:
                module_scores[feature] = compute()
            except Exception:
                # Best-effort: a broken engine must not sink the prediction.
                module_scores[feature] = 0.0

        prediction = _predict_replication(module_scores, text=text)
        return ReplicationResponse(
            replication_probability=prediction.replication_probability,
            fraud_probability=prediction.fraud_probability,
            replication_level=prediction.replication_level,
            verdict=prediction.verdict,
            risk_score=prediction.risk_score,
            risk_level=prediction.risk_level,
            summary=prediction.summary,
            flags=[
                ReplicationFlagResponse(
                    flag_type=entry["flag_type"],
                    severity=entry["severity"],
                    description=entry["description"],
                    evidence=entry["evidence"],
                    suggestion=entry["suggestion"],
                )
                for entry in prediction.flags
            ],
            flags_count=prediction.flags_count,
            model_version=prediction.model_version,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ── Phase 6 endpoints ─────────────────────────────────────────────────────────
@router.post("/analyze/genealogy", response_model=GenealogyResponse)
def analyze_genealogy(request: TextAnalysisRequest):
    """Research Genealogy Engine: citation ancestry, ring detection,
    lineage concentration, retracted ancestor checking. Module 22.

    Runs the genealogy engine on the ``_smart_text``-prepared text; flags
    are converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "genealogy")
        outcome = _genealogy_engine.analyze(prepared)
        return GenealogyResponse(
            doi_count=outcome.doi_count,
            unique_cited_authors=outcome.unique_cited_authors,
            genealogy_concentration=outcome.genealogy_concentration,
            ring_detected=outcome.ring_detected,
            ring_members=outcome.ring_members,
            crossref_checked=outcome.crossref_checked,
            retracted_ancestors=outcome.retracted_ancestors,
            lineage_depth_score=outcome.lineage_depth_score,
            genealogy_score=outcome.genealogy_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/field_saturation", response_model=SaturationResponse)
def analyze_field_saturation(request: TextAnalysisRequest):
    """Field Saturation Detector: topic overcrowding, redundancy,
    novelty claim analysis, contribution vagueness scoring. Module 23.

    Runs the saturation engine on the ``_smart_text``-prepared text; flags
    are converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "saturation")
        outcome = _saturation_engine.analyze(prepared)
        return SaturationResponse(
            topic_keywords=outcome.topic_keywords,
            keyword_density=outcome.keyword_density,
            redundancy_score=outcome.redundancy_score,
            novelty_claim_score=outcome.novelty_claim_score,
            contribution_vagueness=outcome.contribution_vagueness,
            overcrowding_signals=outcome.overcrowding_signals,
            saturation_score=outcome.saturation_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/analyze/institutional_conflict", response_model=ConflictResponse)
def analyze_institutional_conflict(request: TextAnalysisRequest):
    """Institutional Conflict Score: undisclosed COI, industry funding bias,
    self-serving citations, overstated conclusions. Module 24.

    Runs the conflict engine on the ``_smart_text``-prepared text; flags are
    converted with ``_p5flags``. Any exception becomes an HTTP 500 whose
    detail is the exception message.
    """
    try:
        prepared = _smart_text(request.text, "conflict")
        outcome = _conflict_engine.analyze(prepared)
        return ConflictResponse(
            institution_mentions=outcome.institution_mentions,
            unique_institutions=outcome.unique_institutions,
            funding_sources=outcome.funding_sources,
            conflict_disclosures=outcome.conflict_disclosures,
            undisclosed_signals=outcome.undisclosed_signals,
            self_serving_claims=outcome.self_serving_claims,
            industry_funding_signals=outcome.industry_funding_signals,
            conclusion_bias_score=outcome.conclusion_bias_score,
            conflict_score=outcome.conflict_score,
            risk_level=outcome.risk_level,
            summary=outcome.summary,
            flags=_p5flags(outcome.flags),
            flags_count=outcome.flags_count,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# ── Full PDF — 24-Module Master Endpoint ──────────────────────────────────────
@router.post("/analyze/full-pdf", response_model=FullPDFResponse)
async def analyze_full_pdf(file: UploadFile = File(...)):
    """
    Master endpoint: upload a PDF and run all 24 analysis modules at once.
    Returns a unified integrity report with per-module scores, top flags,
    and an ML ensemble replication probability score.
    Trained on SciPeerBench v2.0 (91,779 papers, AUC 0.895).
    Designed for PhD researchers, journal editors, and integrity committees.

    Behaviour notes:
    - The PDF is parsed with ``_pdf_parser.parse_bytes``; if fewer than 100
      non-whitespace-trimmed characters of text come back, the request is
      rejected with HTTP 422.
    - Every module runs best-effort: a module that raises is logged and
      left out of the report, and the remaining modules still run.
    - The temporary PDF written for figure forensics is always removed,
      including when the figure engine fails.
    - At most the first 10 collected flags are returned in ``top_flags``.

    Raises:
        HTTPException: 422 when text extraction yields too little text;
            500 (with the exception message as detail) for any other
            failure outside the per-module guards.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import logging

    log = logging.getLogger(__name__)

    try:
        file_bytes = await file.read()
        paper = _pdf_parser.parse_bytes(file_bytes, file.filename)
        text = paper.full_text
        if len(text.strip()) < 100:
            raise HTTPException(status_code=422, detail="PDF text extraction failed or paper is too short.")

        modules_run = []      # one ModuleSummary per module that succeeded
        top_flags = []        # human-readable highlights, truncated at the end
        scores = []           # per-module risk scores fed to _compute_overall
        module_scores = {}    # named features fed to _predict_replication

        # ── Module 01: Statistical Audit ──────────────────────────
        try:
            r = _stat_engine.analyze(_smart_text(text, "statistics"))
            modules_run.append(ModuleSummary(module="Statistical Audit", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
            scores.append(r.risk_score)
            module_scores["score_stat"] = r.risk_score
            for f in r.flags[:2]:
                top_flags.append(f"[Statistics] {f.description}")
        except Exception:
            log.warning("full-pdf: Statistical Audit module failed", exc_info=True)

        # ── Module 02: Methodology Checker ────────────────────────
        try:
            abstract = paper.sections.get("abstract", "")
            r = _method_engine.analyze(_smart_text(text, "methodology"), abstract)
            # No numeric score from this engine; derive one from flag count.
            score = 0.7 if len(r.flags) > 2 else 0.3 if r.flags else 0.1
            modules_run.append(ModuleSummary(module="Methodology Checker", risk_level="HIGH" if score >= 0.7 else "MEDIUM" if score >= 0.4 else "LOW", risk_score=score, summary=r.summary, flags_count=len(r.flags)))
            scores.append(score)
            module_scores["score_method"] = score
            for f in r.flags[:2]:
                top_flags.append(f"[Methodology] {f.issue}")
        except Exception:
            log.warning("full-pdf: Methodology Checker module failed", exc_info=True)

        # ── Module 03: Citation Integrity ─────────────────────────
        try:
            r = _citation_engine.analyze(_smart_text(text, "citations"), "")
            modules_run.append(ModuleSummary(module="Citation Integrity", risk_level=r.risk_level, risk_score=r.risk_score, summary=r.summary, flags_count=len(r.flags)))
            scores.append(r.risk_score)
            module_scores["score_citation"] = r.risk_score
            for f in r.flags[:2]:
                top_flags.append(f"[Citations] {f.description}")
        except Exception:
            log.warning("full-pdf: Citation Integrity module failed", exc_info=True)

        # ── Module 04: Reproducibility ────────────────────────────
        try:
            r = _repro_engine.analyze(_smart_text(text, "reproducibility"))
            # Higher reproducibility means lower risk, so invert the score.
            repro_risk = round(1.0 - r.reproducibility_score, 4)
            modules_run.append(ModuleSummary(module="Reproducibility Scanner", risk_level=r.risk_level, risk_score=repro_risk, summary=r.summary, flags_count=len(r.flags)))
            scores.append(repro_risk)
            module_scores["score_repro"] = repro_risk
        except Exception:
            log.warning("full-pdf: Reproducibility Scanner module failed", exc_info=True)

        # ── Module 05: Novelty ────────────────────────────────────
        try:
            r = _novelty_engine.analyze(_smart_text(text, "novelty", per_section_limit=2000), paper.title)
            novelty_risk = getattr(r, "risk_score", round(1.0 - r.novelty_score, 4))
            modules_run.append(ModuleSummary(module="Novelty Scorer", risk_level=r.risk_level, risk_score=novelty_risk, summary=r.summary, flags_count=len(getattr(r, "flags", []) or [])))
            scores.append(novelty_risk)
            module_scores["score_novelty"] = novelty_risk
        except Exception:
            log.warning("full-pdf: Novelty Scorer module failed", exc_info=True)

        # ── Module 06: GRIM Test ──────────────────────────────────
        try:
            r = _grim_engine.analyze(_smart_text(text, "grim"))
            modules_run.append(ModuleSummary(module="GRIM Test", risk_level=r.risk_level, risk_score=r.grim_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.grim_score)
            module_scores["score_grim"] = r.grim_score
        except Exception:
            log.warning("full-pdf: GRIM Test module failed", exc_info=True)

        # ── Module 07: SPRITE Test ────────────────────────────────
        try:
            r = _sprite_engine.analyze(_smart_text(text, "sprite"))
            modules_run.append(ModuleSummary(module="SPRITE Test", risk_level=r.risk_level, risk_score=r.sprite_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.sprite_score)
            module_scores["score_sprite"] = r.sprite_score
        except Exception:
            log.warning("full-pdf: SPRITE Test module failed", exc_info=True)

        # ── Module 08: Granularity ────────────────────────────────
        try:
            r = _granularity_engine.analyze(_smart_text(text, "granularity"))
            modules_run.append(ModuleSummary(module="Granularity Analyzer", risk_level=r.risk_level, risk_score=r.granularity_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.granularity_score)
            module_scores["score_granularity"] = r.granularity_score
        except Exception:
            log.warning("full-pdf: Granularity Analyzer module failed", exc_info=True)

        # ── Module 09: P-Curve ────────────────────────────────────
        try:
            r = _pcurve_engine.analyze(_smart_text(text, "pcurve"))
            modules_run.append(ModuleSummary(module="P-Curve Analyzer", risk_level=r.risk_level, risk_score=r.pcurve_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.pcurve_score)
            module_scores["score_pcurve"] = r.pcurve_score
        except Exception:
            log.warning("full-pdf: P-Curve Analyzer module failed", exc_info=True)

        # ── Module 10: Effect Size ────────────────────────────────
        try:
            r = _effect_size_engine.analyze(_smart_text(text, "effect_size"))
            modules_run.append(ModuleSummary(module="Effect Size Validator", risk_level=r.risk_level, risk_score=r.effect_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.effect_score)
            module_scores["score_effect"] = r.effect_score
        except Exception:
            log.warning("full-pdf: Effect Size Validator module failed", exc_info=True)

        # ── Module 11: Retraction Checker ─────────────────────────
        try:
            r = _retraction_engine.analyze(_smart_text(text, "retraction"))
            modules_run.append(ModuleSummary(module="Retraction Checker", risk_level=r.risk_level, risk_score=r.retraction_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.retraction_score)
            module_scores["score_retraction"] = r.retraction_score
            for f in r.flags[:1]:
                top_flags.append(f"[Retraction] {f.description}")
        except Exception:
            log.warning("full-pdf: Retraction Checker module failed", exc_info=True)

        # ── Module 12: Citation Cartel ────────────────────────────
        try:
            r = _cartel_engine.analyze(_smart_text(text, "cartel"))
            modules_run.append(ModuleSummary(module="Citation Cartel Detector", risk_level=r.risk_level, risk_score=r.cartel_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.cartel_score)
            module_scores["score_cartel"] = r.cartel_score
            for f in r.flags[:1]:
                top_flags.append(f"[Cartel] {f.description}")
        except Exception:
            log.warning("full-pdf: Citation Cartel Detector module failed", exc_info=True)

        # ── Module 13: LLM Detector ───────────────────────────────
        try:
            r = _llm_engine.analyze(_smart_text(text, "llm"))
            modules_run.append(ModuleSummary(module="LLM Paper Detector", risk_level=r.risk_level, risk_score=r.llm_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.llm_score)
            module_scores["score_llm"] = r.llm_score
        except Exception:
            log.warning("full-pdf: LLM Paper Detector module failed", exc_info=True)

        # ── Module 14: Figure Forensics ───────────────────────────
        # The figure engine takes a file path, so the upload is spilled to a
        # temp file. Cleanup lives in `finally` so a failing engine cannot
        # leave the file behind.
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
                tmp.write(file_bytes)
                tmp_path = tmp.name
            # Analyse only after the handle is closed so the bytes are flushed.
            r = _figure_engine.analyze(tmp_path)
            fig_score = min(len(r.duplicate_pairs) * 0.3, 1.0)
            modules_run.append(ModuleSummary(module="Figure Forensics", risk_level="HIGH" if fig_score >= 0.7 else "MEDIUM" if fig_score >= 0.3 else "LOW", risk_score=fig_score, summary=f"{r.figures_found} figures found. {len(r.duplicate_pairs)} duplicate pairs detected.", flags_count=len(r.flags)))
            scores.append(fig_score)
        except Exception:
            log.warning("full-pdf: Figure Forensics module failed", exc_info=True)
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Cleanup is best-effort; never fail the request over it.
                    log.warning("full-pdf: could not remove temp file %s", tmp_path, exc_info=True)

        # ── Module 15: Fraud Fingerprinting ───────────────────────
        try:
            r = _fraud_engine.analyze(_smart_text(text, "fraud_fingerprint"))
            modules_run.append(ModuleSummary(module="Fraud Fingerprinting", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.fingerprint_score)
            module_scores["score_fraud"] = r.fingerprint_score
        except Exception:
            log.warning("full-pdf: Fraud Fingerprinting module failed", exc_info=True)

        # ── Module 16: Temporal Anomaly ───────────────────────────
        try:
            r = _temporal_engine.analyze(_smart_text(text, "temporal"))
            modules_run.append(ModuleSummary(module="Temporal Anomaly Detector", risk_level=r.risk_level, risk_score=r.temporal_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.temporal_score)
            module_scores["score_temporal"] = r.temporal_score
        except Exception:
            log.warning("full-pdf: Temporal Anomaly Detector module failed", exc_info=True)

        # ── Module 17: Citation DNA ───────────────────────────────
        try:
            r = _dna_engine.analyze(_smart_text(text, "citation_dna"))
            modules_run.append(ModuleSummary(module="Citation DNA Analyzer", risk_level=r.risk_level, risk_score=r.dna_risk_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.dna_risk_score)
            module_scores["score_dna"] = r.dna_risk_score
        except Exception:
            log.warning("full-pdf: Citation DNA Analyzer module failed", exc_info=True)

        # ── Module 18: Data Fingerprint ───────────────────────────
        try:
            r = _dataprint_engine.analyze(_smart_text(text, "data_fingerprint"))
            modules_run.append(ModuleSummary(module="Data Fingerprint Analyzer", risk_level=r.risk_level, risk_score=r.fingerprint_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.fingerprint_score)
            module_scores["score_dataprint"] = r.fingerprint_score
        except Exception:
            log.warning("full-pdf: Data Fingerprint Analyzer module failed", exc_info=True)

        # ── Module 19: Peer Review Score ──────────────────────────
        try:
            r = _peerreview_engine.analyze(_smart_text(text, "peer_review"))
            modules_run.append(ModuleSummary(module="Peer Review Score", risk_level=r.risk_level, risk_score=r.manipulation_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.manipulation_score)
            module_scores["score_peerreview"] = r.manipulation_score
        except Exception:
            log.warning("full-pdf: Peer Review Score module failed", exc_info=True)

        # ── Module 20: AI-Human Spectrum ──────────────────────────
        try:
            r = _spectrum_engine.analyze(_smart_text(text, "ai_spectrum"))
            modules_run.append(ModuleSummary(module="AI-Human Spectrum", risk_level=r.risk_level, risk_score=r.spectrum_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.spectrum_score)
            module_scores["score_spectrum"] = r.spectrum_score
        except Exception:
            log.warning("full-pdf: AI-Human Spectrum module failed", exc_info=True)

        # ── Module 21: Replication Probability Score ──────────────
        # Neutral defaults are reported when the predictor itself fails.
        replication_probability = 0.5
        replication_level = "UNKNOWN"
        try:
            rep = _predict_replication(module_scores, text=text)
            replication_probability = rep.replication_probability
            replication_level = rep.replication_level
            modules_run.append(ModuleSummary(module="Replication Probability Score", risk_level=rep.risk_level, risk_score=rep.risk_score, summary=rep.summary, flags_count=rep.flags_count))
            scores.append(rep.risk_score)
            if rep.flags_count > 0:
                top_flags.append(f"[Replication] {rep.verdict}")
        except Exception:
            log.warning("full-pdf: Replication Probability Score module failed", exc_info=True)

        # ── Module 22: Research Genealogy Engine ──────────────────
        try:
            r = _genealogy_engine.analyze(_smart_text(text, "genealogy"))
            modules_run.append(ModuleSummary(module="Research Genealogy Engine", risk_level=r.risk_level.upper(), risk_score=r.genealogy_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.genealogy_score)
            if r.ring_detected:
                top_flags.append(f"[Genealogy] Citation ring: {', '.join(r.ring_members[:3])}")
            if r.retracted_ancestors > 0:
                top_flags.append(f"[Genealogy] {r.retracted_ancestors} retracted ancestor(s) found")
        except Exception:
            log.warning("full-pdf: Research Genealogy Engine module failed", exc_info=True)

        # ── Module 23: Field Saturation Detector ──────────────────
        try:
            r = _saturation_engine.analyze(_smart_text(text, "saturation"))
            modules_run.append(ModuleSummary(module="Field Saturation Detector", risk_level=r.risk_level.upper(), risk_score=r.saturation_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.saturation_score)
            if r.overcrowding_signals >= 3:
                top_flags.append(f"[Saturation] {r.overcrowding_signals} field overcrowding signals detected")
        except Exception:
            log.warning("full-pdf: Field Saturation Detector module failed", exc_info=True)

        # ── Module 24: Institutional Conflict Score ───────────────
        try:
            r = _conflict_engine.analyze(_smart_text(text, "conflict"))
            modules_run.append(ModuleSummary(module="Institutional Conflict Score", risk_level=r.risk_level.upper(), risk_score=r.conflict_score, summary=r.summary, flags_count=r.flags_count))
            scores.append(r.conflict_score)
            if r.undisclosed_signals > 0:
                top_flags.append(f"[Conflict] {r.undisclosed_signals} undisclosed conflict signal(s) detected")
        except Exception:
            log.warning("full-pdf: Institutional Conflict Score module failed", exc_info=True)

        # ── Final Score ───────────────────────────────────────────
        overall_score, overall_risk = _compute_overall(scores)
        return FullPDFResponse(
            paper_title=paper.title,
            page_count=paper.page_count,
            figure_count=paper.figure_count,
            file_size_kb=paper.metadata.get("file_size_kb", 0.0),
            sha256=paper.metadata.get("sha256", ""),
            overall_score=overall_score,
            overall_risk=overall_risk,
            integrity_verdict=_verdict(overall_risk),
            replication_probability=replication_probability,
            replication_level=replication_level,
            modules=modules_run,
            top_flags=top_flags[:10],
            analyzed_by="SciPeerAI v2.3.0 β€” 24-Module Pipeline",
        )
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 422 above) pass through intact.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e