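# NOTE: the next seven lines are file-viewer page residue captured with the
# source; they are kept as comments so the module remains valid Python.
# ispg-backend / models / phase1_data_preprocessing.py
# urestrange's picture
# Upload 162 files
# cc2c355 verified
# Raw
# History Blame Contribute Delete
# 38.9 kB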
# ============================================================
# PHASE 1 DATA PREPROCESSING (FINAL UPDATED VERSION - PRO READY)
# FILE: models/phase1_data_preprocessing.py
#
# Supports:
# - IEEE + Non-standard journals
# - PDF -> text extraction (PyMuPDF)
# - SAFE cleaning (preserve tables + numbering)
# - Metadata Extraction:
# Title, Authors, Affiliation, DOI, Year, Abstract, Keywords
# - References extraction
# - IMRAD split (heading-based + fallback)
#
# IMPORTANT FIX:
# - DO NOT destroy table structures
# - Preserve line breaks
# - Preserve numeric units and symbols (% , | , : , -)
#
# OUTPUT FORMAT (STRICT COMPATIBLE WITH PHASE 2):
# {
# "paper_id": "...",
# "title": "...",
# "keywords": [...],
# "abstract": "...",
# "cleaned_text": "...",
# "imrad_sections": {
# "introduction": "...",
# "methodology": "...",
# "results": "...",
# "conclusion": "..."
# },
# "references": "...",
# "metadata": {...}
# }
# ============================================================
import re
import os
from datetime import datetime
# PyMuPDF (imported as ``fitz``) is a hard requirement of this module: fail at
# import time with install instructions, keeping the original error chained.
try:
    import fitz  # PyMuPDF
except ImportError as exc:
    raise ImportError("❌ PyMuPDF not installed. Run: pip install pymupdf") from exc
# ==========================================================
# SAFE STRING
# ==========================================================
def safe_str(value):
    """Return ``value`` converted to a stripped string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()
def clean_text(text: str) -> str:
    """Flatten ``text`` to a single line with single-space separators.

    ``None`` is treated as an empty string (via ``safe_str``), non-breaking
    spaces are turned into regular spaces, and every run of whitespace
    (including newlines) is collapsed to one space.
    """
    normalized = safe_str(text).replace("\u00a0", " ")
    return re.sub(r"\s+", " ", normalized).strip()
# ============================================================
# 1) PDF TEXT EXTRACTION (COLUMN-AWARE)
# ============================================================
def extract_text_from_pdf(pdf_path):
    """Extract the text of a PDF in two-column reading order.

    For every page, the text blocks returned by ``page.get_text("blocks")``
    are bucketed into full-width blocks (wider than 80% of the page),
    left-column blocks (starting left of the page midline) and right-column
    blocks. Each bucket is sorted top-to-bottom and the page is emitted as
    full-width blocks first, then the left column, then the right column.

    NOTE(review): full-width blocks are always emitted before the columns,
    whatever their vertical position, so a wide block in the middle of a page
    is moved to the top of that page's text.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The stripped text of every block, joined by blank lines.

    Raises:
        FileNotFoundError: If ``pdf_path`` does not exist.
    """
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ pdf file not found: {pdf_path}")

    doc = fitz.open(pdf_path)
    full_text = []
    try:
        for page in doc:
            # The page midline decides which column a narrow block belongs to.
            width = page.rect.width
            mid_x = width / 2.0

            # Each block is indexed as (x0, y0, x1, y1, text, block_no,
            # block_type); only block_type == 0 (text) is kept.
            text_blocks = [b for b in page.get_text("blocks") if b[6] == 0]

            full_width, left_col, right_col = [], [], []
            for block in text_blocks:
                x0, x1 = block[0], block[2]
                if x1 - x0 > width * 0.8:
                    # Spans most of the page: treated as a title/header block.
                    full_width.append(block)
                elif x0 < mid_x:
                    left_col.append(block)
                else:
                    right_col.append(block)

            # Headers first, then left column, then right column; each
            # bucket is ordered top-to-bottom by its y0 coordinate.
            for bucket in (full_width, left_col, right_col):
                bucket.sort(key=lambda b: b[1])
                for block in bucket:
                    # Replace control characters (except \t, \n, \r) with spaces.
                    text = re.sub(
                        r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', ' ', block[4]
                    )
                    full_text.append(text.strip())
    finally:
        # Release the document handle even if a page fails to parse.
        doc.close()

    return "\n\n".join(full_text).strip()
# ============================================================
# 2) REMOVE IEEE FOOTER / LICENSE NOISE (SAFE)
# IMPORTANT: DOI MUST BE PRESERVED
# ============================================================
def remove_ieee_noise(text: str):
    """Delete IEEE footer/licence boilerplate from ``text``.

    Every pattern below is removed case-insensitively, then runs of four or
    more newlines are collapsed to a blank line and the result is stripped.
    Falsy input yields ``""``.

    NOTE(review): ``.`` does not cross newlines here (no DOTALL), and the
    patterns ending in a lazy ``.*?`` remove only the text matched before it.
    The last pattern targets one specific running header ("W. Han et al.").
    """
    if not text:
        return ""

    noise_patterns = (
        # IEEE licence lines
        r"authorized licensed use.*?restrictions apply\.?",
        r"downloaded on.*?from ieee xplore\.?",
        r"personal use is permitted.*?permission\.?",
        # IEEE Access footer fragments
        r"©\s*\d{4}\s*ieee",
        r"ieee xplore",
        r"\$\d+\.\d+",
        r"\bvol\.\s*\d+",
        r"\bno\.\s*\d+",
        r"\bpp\.\s*\d+\s*-\s*\d+",
        # received / accepted / publication dates
        r"received\s+\d{1,2}\s+\w+\s+\d{4}.*?",
        r"accepted\s+\d{1,2}\s+\w+\s+\d{4}.*?",
        r"date\s+of\s+publication\s+\d{1,2}\s+\w+\s+\d{4}.*?",
        r"date\s+of\s+current\s+version\s+\d{1,2}\s+\w+\s+\d{4}.*?",
        # associate editor notice
        r"the associate editor coordinating the review.*?publication.*?",
        # Creative Commons licence notice
        r"this work is licensed under a creative commons.*?",
        r"for more information,\s*see\s*https?://[^\s]+",
        # page footer
        r"volume\s+\d+,\s*\d{4}",
        r"vol\.\s*\d+,\s*\d{4}",
        # repeated running header
        r"w\.\s*han\s*et\s*al\.\s*:.*?classifier",
    )

    stripped = text
    for noise in noise_patterns:
        stripped = re.compile(noise, re.IGNORECASE).sub("", stripped)

    # Collapse the gaps left behind by the removals.
    collapsed = re.sub(r"\n{4,}", "\n\n", stripped)
    return collapsed.strip()
# ============================================================
# 3) SAFE CLEANING (PRESERVE TABLES + BULLETS)
# ============================================================
def clean_extracted_text(raw_text):
    """Clean raw PDF text while keeping paragraph breaks.

    Steps, in order: normalise non-breaking spaces and tabs, strip IEEE
    boilerplate (``remove_ieee_noise``), blank out lines holding only digits,
    re-join words hyphenated across a line break, stitch lines that end in a
    lowercase letter or comma to the next line, repair a few known extraction
    typos, and squeeze repeated spaces / four-or-more newlines.

    Returns ``""`` for falsy input.
    """
    if not raw_text:
        return ""

    text = remove_ieee_noise(raw_text.replace("\u00a0", " ").replace("\t", " "))

    regex_passes = (
        # lines that contain nothing but a number (page numbers)
        (r"^\s*\d+\s*$", "", re.MULTILINE),
        # "classifi-\ncation" -> "classification"
        (r"([a-zA-Z])-\s*\n\s*([a-zA-Z])", r"\1\2", 0),
        # a line ending in a lowercase letter or comma continues on the next line
        (r"([a-z,])\n([a-zA-Z])", r"\1 \2", 0),
    )
    for pattern, replacement, flags in regex_passes:
        text = re.sub(pattern, replacement, text, flags=flags)

    # Known extraction typos, applied in this order.
    typo_fixes = (
        ("face-toface", "face-to-face"),
        ("IoTbased", "IoT-based"),
        ("pre- processing", "preprocessing"),
        ("machinelearning", "machine learning"),
    )
    for broken, fixed in typo_fixes:
        text = text.replace(broken, fixed)

    # Squeeze spaces; keep blank lines between sections.
    text = re.sub(r" {2,}", " ", text)
    return re.sub(r"\n{4,}", "\n\n", text).strip()
# ============================================================
# 4) DOI EXTRACTION
# ============================================================
def extract_doi(cleaned_text):
    """Return the first DOI found in ``cleaned_text``, or ``""`` if none.

    A DOI is matched case-insensitively as ``10.<4-9 digits>/<suffix>``. The
    trailing word boundary trims punctuation that follows the identifier.

    A single pattern is sufficient: any text matched by a "doi:" or
    "Digital Object Identifier" prefixed pattern also contains a bare DOI, so
    the bare pattern always wins, and its match can never start with "doi".

    Args:
        cleaned_text: Text to search; ``None`` is treated as empty.

    Returns:
        The DOI string (without any label), or ``""``.
    """
    match = re.search(
        r"\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b",
        cleaned_text or "",
        flags=re.IGNORECASE,
    )
    return match.group(0).strip() if match else ""
# ============================================================
# 5) YEAR EXTRACTION
# ============================================================
def _latest_plausible_year(text, max_year):
    """Return the largest 4-digit year in ``text`` within [1990, max_year], else ``""``."""
    valid_years = [
        int(y)
        for y in re.findall(r"\b(19\d{2}|20\d{2})\b", text)
        if 1990 <= int(y) <= max_year
    ]
    return str(max(valid_years)) if valid_years else ""


def extract_year(cleaned_text):
    """Guess the publication year of a paper from its text.

    Priority order:
      1. An explicit "date of publication" / "accepted" / "received" /
         "current version" date. The publication date is accepted both as
         "Month D, YYYY" and as "D Month YYYY".
      2. The latest plausible year in the first 3000 characters.
      3. The latest plausible year anywhere in the text.

    A year is plausible when it lies between 1990 and next calendar year.

    Args:
        cleaned_text: Text to search; ``None`` is treated as empty.

    Returns:
        The year as a string, or ``""`` when nothing plausible is found.
    """
    cleaned_text = cleaned_text or ""
    max_year = datetime.now().year + 1

    publication_patterns = [
        r"date of publication\s+(?:\w+\s+\d{1,2},?|\d{1,2}\s+\w+)\s+(20\d{2})",
        r"accepted\s+\d{1,2}\s+\w+\s+(20\d{2})",
        r"received\s+\d{1,2}\s+\w+\s+(20\d{2})",
        r"current version\s+\d{1,2}\s+\w+\s+(20\d{2})",
    ]
    for pattern in publication_patterns:
        m = re.search(pattern, cleaned_text, flags=re.IGNORECASE)
        if m:
            year = int(m.group(1))
            if 1990 <= year <= max_year:
                return str(year)

    # Fall back to the document head first: years deeper in the text are
    # searched only if the head has none.
    return (
        _latest_plausible_year(cleaned_text[:3000], max_year)
        or _latest_plausible_year(cleaned_text, max_year)
    )
# ============================================================
# 6) TITLE EXTRACTION
# ============================================================
def extract_title(cleaned_text):
    """Pick the most title-like line among the first 120 non-empty lines.

    Lines that look like journal metadata or affiliations are skipped. The
    remaining lines are scored (word count 4-25, length <= 180, up to five
    capital letters, no standalone 4-digit number) and the first line with
    the highest score wins. If every line is rejected, the first line is used.

    The short metadata tokens "doi", "issue", "vol.", "no." and "pp." are
    matched on word boundaries so that ordinary words containing them
    ("Doing", "Issues", "tissue") do not disqualify a real title.

    Args:
        cleaned_text: Cleaned paper text; ``None`` is treated as empty.

    Returns:
        The chosen title, ``"untitled paper"`` for empty input, or an error
        placeholder when more than 20% of the chosen line is made of
        characters outside ``[a-zA-Z0-9\\s:,.-]``.
    """
    lines = [ln.strip() for ln in (cleaned_text or "").split("\n") if ln.strip()]
    if not lines:
        return "untitled paper"

    # Distinctive metadata phrases: plain substring match on the lowercased line.
    reject_phrases = (
        "abstract", "keywords", "index terms", "references",
        "received", "accepted", "date of publication",
        "date of current version", "digital object identifier",
        "volume", "@", "ieee", "transactions",
        "journal", "proceedings", "conference",
        "issn", "copyright",
    )
    # Short tokens that also occur inside ordinary words: need a word boundary.
    reject_tokens = re.compile(r"\b(?:doi\b|issue\b|vol\.|no\.|pp\.)")
    affiliation_words = re.compile(
        r"\b(university|faculty|department|school|college|institute)\b"
    )

    best_title = ""
    best_score = 0
    for line in lines[:120]:
        low = line.lower()

        if any(phrase in low for phrase in reject_phrases):
            continue
        if reject_tokens.search(low):
            continue
        if affiliation_words.search(low):
            continue

        score = 0
        # Titles are usually between 4 and 25 words.
        if 4 <= len(line.split()) <= 25:
            score += 5
        if len(line) <= 180:
            score += 2
        # Titles usually contain capitals (capped so ALL-CAPS lines don't dominate).
        score += min(sum(1 for c in line if c.isupper()), 5)
        # Headers often carry a year; reward lines without a 4-digit number.
        if not re.search(r"\b\d{4}\b", line):
            score += 3

        if score > best_score:
            best_score = score
            best_title = line

    final_title = best_title if best_title else lines[0]

    # Guard against encoding gibberish.
    weird_chars = len(re.findall(r"[^a-zA-Z0-9\s:,\.\-]", final_title))
    if weird_chars > len(final_title) * 0.2:
        return "title extraction error (please enter manually)"
    return final_title
# ============================================================
# 7) ABSTRACT EXTRACTION (IEEE ROBUST VERSION)
# ============================================================
def extract_abstract(cleaned_text):
    """Return the paper's abstract as one whitespace-normalised string.

    The abstract starts after the first "Abstract" heading (plain or
    letter-spaced, case-insensitive) and ends at the earliest of "Keywords",
    "Index Terms" or an "Introduction" heading. The result is capped at 2500
    characters.

    When no "Abstract" heading exists, the last paragraph of more than 40
    words that precedes the first "introduction" match is returned instead;
    if there is none, the result is ``""``.
    """
    if not cleaned_text:
        return ""

    text = safe_str(cleaned_text)

    # Letter-spaced form first, then the plain word.
    heading_patterns = (
        r"\bA\s*B\s*S\s*T\s*R\s*A\s*C\s*T\b\s*[—\-:\.]?\s*",
        r"\bAbstract\b\s*[—\-:\.]?\s*",
    )
    heading = None
    for pattern in heading_patterns:
        heading = re.search(pattern, text, flags=re.IGNORECASE)
        if heading:
            break

    if heading is None:
        # No heading: take the last chunky paragraph before the introduction.
        intro = re.search(
            r"\b(1\.|I\.)?\s*INTRODUCTION\b", text, flags=re.IGNORECASE
        )
        if intro:
            for paragraph in reversed(text[:intro.start()].split("\n\n")):
                if len(paragraph.split()) > 40:
                    return re.sub(r"\s+", " ", paragraph).strip()
        return ""

    tail = text[heading.end():]

    # The earliest stop marker wins, so the abstract never bleeds into the body.
    stop_markers = (
        r"\bKeywords\b", r"\bIndex Terms\b",
        r"\bI\.\s*INTRODUCTION\b", r"\b1\.\s*INTRODUCTION\b",
        r"\n\s*INTRODUCTION\b",
    )
    hits = (re.search(marker, tail, flags=re.IGNORECASE) for marker in stop_markers)
    stop_pos = min([len(tail)] + [hit.start() for hit in hits if hit])

    abstract = re.sub(r"\s+", " ", tail[:stop_pos]).strip()

    # Drop the associate-editor sentence if it leaked into the abstract.
    abstract = re.sub(
        r"The associate editor.*?publication.*?\.",
        "",
        abstract,
        flags=re.IGNORECASE,
    )
    return abstract[:2500].strip()
# ============================================================
# 8) AUTHORS + AFFILIATION EXTRACTION (IEEE + GENERAL HEURISTIC)
# ============================================================
def extract_authors_affiliation(cleaned_text, paper_title=""):
    """Heuristically extract the author line and the affiliation line.

    Only the first 120 non-empty lines are inspected.

    * Affiliation: the first line shorter than 250 characters that contains
      an institution keyword.
    * Authors: the first line before the "abstract" line that is not the
      paper title, not metadata, not an e-mail line, not an affiliation and
      not a numbered heading, and whose name-likeness score is at least 6.

    Institution keywords are matched at word boundaries, so words such as
    "scalable" or "available" no longer count as the keyword "lab".

    Args:
        cleaned_text: Cleaned paper text; ``None`` is treated as empty.
        paper_title: Title of the paper. A line equal to it (ignoring case)
            is never returned as the author line.

    Returns:
        ``(authors, affiliation)``; each falls back to a "... not found"
        placeholder string.
    """
    lines = [ln.strip() for ln in (cleaned_text or "").split("\n") if ln.strip()]
    if not lines:
        return "author information not found", "affiliation data not found"

    # Applied to lowercased lines. "lab"/"labs" must be a whole word; the
    # other keywords only need to start at a word boundary.
    affiliation_re = re.compile(
        r"\b(?:university|faculty|department|school|college|institute|"
        r"research center|centre|laboratory|labs?\b|malaysia|campus)"
    )
    reject_keywords = (
        "abstract", "keywords", "index terms", "received",
        "accepted", "date of publication", "date of current version",
        "digital object identifier", "doi", "copyright",
        "volume", "issue", "ieee",
    )

    head_lines = lines[:120]
    title_key = paper_title.lower() if paper_title else ""

    # --- affiliation: first short line naming an institution ---
    affiliation = ""
    for line in head_lines:
        if affiliation_re.search(line.lower()) and len(line) < 250:
            affiliation = line
            break

    # --- authors: first sufficiently name-like line before the abstract ---
    authors = ""
    for line in head_lines:
        low = line.lower()

        # Never mistake the paper title for the author list.
        if title_key and low == title_key:
            continue
        if "abstract" in low:
            break
        if any(k in low for k in reject_keywords):
            continue
        if "@" in line:
            continue
        if affiliation_re.search(low):
            continue
        # Skip numbered section headings ("II." / "2.").
        if re.match(r"^[IVX]{1,6}\.", line) or re.match(r"^\d+\.", line):
            continue

        # Capitalised words and initials ("J.") suggest a list of names.
        capital_words = len(re.findall(r"\b[A-Z][a-z]+\b", line))
        initials = len(re.findall(r"\b[A-Z]\.", line))
        score = capital_words + (initials * 2)
        if "," in line:
            score += 3
        if 2 <= len(line.split()) <= 20:
            score += 2

        if score >= 6:
            authors = line
            break

    # Strip footnote markers and other symbols.
    authors = re.sub(r"[^a-zA-Z0-9,\.\-\s]", "", authors).strip()
    affiliation = re.sub(r"[^a-zA-Z0-9,\.\-\s\(\)]", "", affiliation).strip()

    if not authors or len(authors) < 3:
        authors = "author information not found"
    if not affiliation or len(affiliation) < 3:
        affiliation = "affiliation data not found"
    return authors, affiliation
# ============================================================
# 9) KEYWORDS EXTRACTION (ROBUST IEEE VERSION)
# ============================================================
def extract_keywords(cleaned_text):
    """Extract up to 12 keywords from a "Keywords" / "Index Terms" block.

    The first label match opens a window of at most 1200 characters, which
    is cut at the earliest stop marker (introduction, abstract, references,
    received/accepted, DOI label). The window is split on commas and
    semicolons; each piece is cleaned and kept when 3-60 characters long.
    Duplicates are dropped, keeping first-seen order.

    Returns an empty list when no label or no keyword text is found.
    """
    text = safe_str(cleaned_text)
    if not text:
        return []

    label_patterns = (
        r"\bKeywords\s*[:\-]?\s*(.+)",
        r"\bIndex Terms\s*[:\-]?\s*(.+)",
        r"\bKeywords\s*[—–-]\s*(.+)",
        r"\bIndex Terms\s*[—–-]\s*(.+)",
    )
    stop_patterns = (
        r"\bI\.\s*INTRODUCTION\b",
        r"\b1\.\s*INTRODUCTION\b",
        r"\bINTRODUCTION\b",
        r"\bABSTRACT\b",
        r"\bREFERENCES\b",
        r"\bReceived\b",
        r"\bAccepted\b",
        r"\bDigital Object Identifier\b",
    )

    # Only the first pattern that matches is used.
    label_match = None
    for pattern in label_patterns:
        label_match = re.search(pattern, text, flags=re.IGNORECASE)
        if label_match:
            break
    if label_match is None:
        return []

    window = text[label_match.start():label_match.start() + 1200]
    cut = len(window)
    for stop in stop_patterns:
        hit = re.search(stop, window, flags=re.IGNORECASE)
        if hit:
            cut = min(cut, hit.start())

    # Drop the label itself from the front of the block.
    block = re.sub(
        r"^(Keywords|Index Terms)\s*[:\-–—]?\s*",
        "",
        window[:cut],
        flags=re.IGNORECASE,
    )
    if not block:
        return []

    block = re.sub(r"\s+", " ", block.replace("\n", " "))

    collected = []
    for piece in re.split(r",|;", block):
        candidate = re.sub(
            r"[^A-Za-z0-9\-\s\(\)]", "", clean_text(piece)
        ).strip()
        if 3 <= len(candidate) <= 60:
            collected.append(candidate)

    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(collected))[:12]
# ============================================================
# 10) REFERENCES EXTRACTION
# ============================================================
def extract_references(cleaned_text):
    """Return the text of the REFERENCES section, or ``""`` if there is none.

    The section starts after the first line that begins with "REFERENCES"
    (case-insensitive) and ends at the earliest appendix, acknowledgment or
    biography heading. The result is passed through ``remove_ieee_noise``,
    has long newline runs collapsed, and is capped at 15000 characters.
    """
    source = cleaned_text or ""

    heading = re.search(
        r"^\s*REFERENCES\b", source, flags=re.IGNORECASE | re.MULTILINE
    )
    if heading is None:
        return ""

    refs = source[heading.end():].strip()

    # Headings that mark the end of the reference list.
    trailing_sections = (
        r"^\s*APPENDIX\b",
        r"^\s*ACKNOWLEDGMENT\b",
        r"^\s*ACKNOWLEDGEMENTS\b",
        r"^\s*AUTHOR BIOGRAPHY\b",
        r"^\s*AUTHOR BIOGRAPHIES\b",
        r"^\s*BIOGRAPHY\b",
        r"^\s*BIOGRAPHIES\b",
        r"^\s*ABOUT THE AUTHORS\b",
    )
    cut = len(refs)
    for marker in trailing_sections:
        hit = re.search(marker, refs, flags=re.IGNORECASE | re.MULTILINE)
        if hit and hit.start() < cut:
            cut = hit.start()

    refs = remove_ieee_noise(refs[:cut])
    refs = re.sub(r"\n{4,}", "\n\n", refs).strip()

    # Keep the output bounded.
    return refs[:15000]
# ============================================================
# 11) REMOVE KEYWORDS + REFERENCES FROM MAIN BODY
# ============================================================
def remove_keywords_and_references(cleaned_text):
    """Strip the keyword block and everything from REFERENCES onward.

    * The first line starting with "Keywords" or "Index Terms"
      (case-insensitive) is removed together with everything up to the next
      blank line. ``re.MULTILINE`` lets ``^`` match at any line start, so the
      block is found wherever it sits, not only at the start of the text.
    * The text is cut at the first line beginning with "REFERENCES".
    * Runs of four or more newlines are reduced to three.

    Args:
        cleaned_text: Cleaned paper text; ``None`` is treated as empty.

    Returns:
        The body text with surrounding whitespace stripped.
    """
    text = cleaned_text or ""

    # count=1: only the first keyword block is removed, never later lines
    # that merely begin with the same word.
    text = re.sub(
        r"^\s*(Keywords|Index Terms)\s*[:\-]?.*?(\n\s*\n)",
        "\n\n",
        text,
        count=1,
        flags=re.IGNORECASE | re.DOTALL | re.MULTILINE,
    )

    # Keep only what precedes the REFERENCES heading.
    text = re.split(
        r"^\s*REFERENCES\b", text, flags=re.IGNORECASE | re.MULTILINE
    )[0]

    return re.sub(r"\n{4,}", "\n\n\n", text).strip()
# ============================================================
# 12) DETECT SECTION HEADINGS
# IEEE + GENERAL JOURNAL SUPPORT
# ============================================================
def detect_section_headings(text):
    """Find candidate section headings in ``text``.

    Four line shapes are recognised, tried in this order:
    roman-numbered ("III. RESULTS"), digit-numbered ("3 RESULTS"),
    letter-numbered ("A. Experimental Results") and unnumbered ALL-CAPS
    ("RESULTS AND DISCUSSION"). A given start offset is considered only once,
    by whichever shape reaches it first.

    Returns:
        A list of ``{"label", "title", "start"}`` dicts sorted by ``start``,
        keeping only the first heading for each case-insensitive title.
    """
    text = safe_str(text)

    heading_shapes = tuple(
        re.compile(shape, re.MULTILINE)
        for shape in (
            r"^\s*([IVX]{1,8})\.\s+(.+?)\s*$",        # III. RESULTS
            r"^\s*(\d{1,2})\.?\s+([A-Za-z].+?)\s*$",  # 3 RESULTS
            r"^\s*([A-Z])\.\s+(.+?)\s*$",             # A. Experimental Results
            r"^\s*([A-Z][A-Z0-9 \-\(\)/]{4,})\s*$",   # RESULTS AND DISCUSSION
        )
    )

    # Titles that are captions or journal metadata, not headings.
    junk_prefixes = ("table", "fig", "figure", "volume", "received", "accepted")
    junk_fragments = ("copyright", "creative commons", "digital object identifier")

    candidates = []
    visited_offsets = set()

    for shape in heading_shapes:
        for match in shape.finditer(text):
            offset = match.start()
            if offset in visited_offsets:
                continue
            visited_offsets.add(offset)

            # Numbered shapes capture (label, title); ALL-CAPS only a title.
            if match.lastindex >= 2:
                label = match.group(1).strip()
                title = match.group(2).strip()
            else:
                label = ""
                title = match.group(1).strip()

            title = re.sub(r"\s{2,}", " ", title).strip()
            lowered = title.lower()

            if not 4 <= len(title) <= 120:
                continue
            if lowered.startswith(junk_prefixes):
                continue
            if any(fragment in lowered for fragment in junk_fragments):
                continue
            # Running headers such as "W. Han et al."
            if re.match(r"^w\.\s*[a-z]", lowered):
                continue

            candidates.append({"label": label, "title": title, "start": offset})

    candidates.sort(key=lambda item: item["start"])

    # Keep the first occurrence of every title.
    unique = []
    seen_titles = set()
    for heading in candidates:
        key = heading["title"].lower().strip()
        if key not in seen_titles:
            seen_titles.add(key)
            unique.append(heading)
    return unique
# ============================================================
# 13) MAP HEADING INTO IMRAD CATEGORY
# ============================================================
def map_heading_to_imrad(heading_title):
    """Classify a heading title into an IMRAD bucket.

    The lowercased title is tested for keyword substrings. Categories are
    checked in the order introduction, methodology, results, conclusion, and
    the first category with a matching keyword is returned; ``"other"`` is
    returned when nothing matches.
    """
    heading = safe_str(heading_title).lower().strip()

    category_keywords = (
        ("introduction", (
            "introduction",
            "background",
            "motivation",
            "overview",
            "preliminaries",
            "related work",
            "literature review",
            "state of the art",
            "problem statement",
            "research gap",
        )),
        ("methodology", (
            "method",
            "methodology",
            "materials",
            "implementation",
            "framework",
            "architecture",
            "design",
            "approach",
            "system model",
            "proposed system",
            "proposed method",
            "proposed framework",
            "procedure",
            "development",
            "algorithm",
            "workflow",
            # common in IEEE papers
            "dataset",
            "data collection",
            "data preprocessing",
            "training",
            "testing setup",
            "experimental setup",
            "feature extraction",
            "model construction",
            "network structure",
            "network model",
            "model architecture",
            "fasternet",
            "yolov5",
            "cnn",
            "resnet",
            "classifier",
        )),
        ("results", (
            "results",
            "evaluation",
            "experiment",
            "analysis",
            "performance",
            "validation",
            "discussion",
            "findings",
            "testing",
            "comparison",
            # common in IEEE papers
            "experimental results",
            "performance comparison",
            "ablation study",
            "benchmark",
            "case study",
            "accuracy analysis",
            "result analysis",
        )),
        ("conclusion", (
            "conclusion",
            "conclusions",
            "future work",
            "future research",
            "summary",
            "limitations",
            "recommendation",
            "recommendations",
            "closing remarks",
        )),
    )

    for category, keywords in category_keywords:
        for keyword in keywords:
            if keyword in heading:
                return category
    return "other"
# ============================================================
# 14) FALLBACK SPLIT BY KEYWORDS (IMPROVED)
# ============================================================
def fallback_split_by_keywords(text):
    """Split ``text`` into IMRAD sections using keyword positions only.

    For each section, the earliest occurrence of any of its keywords in the
    lowercased text is its anchor. Sections run from their anchor to the next
    anchor (or the end of the text). Without any anchor the text is cut
    proportionally at 30% / 60% / 85%. An empty introduction falls back to
    the first 1500 characters and an empty conclusion to the last 1500.

    Returns:
        A dict with keys "introduction", "methodology", "results" and
        "conclusion".
    """
    lowered = text.lower()

    def earliest(keywords):
        # Smallest offset of any keyword, or -1 if none occurs.
        offsets = [idx for idx in map(lowered.find, keywords) if idx != -1]
        return min(offsets, default=-1)

    section_keywords = (
        ("introduction", (
            "introduction",
            "background",
            "motivation",
            "overview",
        )),
        ("methodology", (
            "methodology",
            "methods",
            "materials and methods",
            "proposed method",
            "proposed framework",
            "proposed system",
            "system design",
            "framework",
            "architecture",
            "implementation",
            "algorithm",
            "workflow",
        )),
        ("results", (
            "results",
            "experimental results",
            "evaluation",
            "performance evaluation",
            "experiment",
            "experiments",
            "analysis",
            "discussion",
            "findings",
            "testing",
        )),
        ("conclusion", (
            "conclusion",
            "conclusions",
            "future work",
            "summary",
            "concluding remarks",
            "final remarks",
            "limitations",
        )),
    )

    anchors = []
    for name, keywords in section_keywords:
        offset = earliest(keywords)
        if offset != -1:
            anchors.append((name, offset))
    anchors.sort(key=lambda pair: pair[1])

    # Nothing recognisable: proportional split.
    if not anchors:
        total = len(text)
        first_cut = int(total * 0.30)
        second_cut = int(total * 0.60)
        third_cut = int(total * 0.85)
        return {
            "introduction": text[:first_cut].strip(),
            "methodology": text[first_cut:second_cut].strip(),
            "results": text[second_cut:third_cut].strip(),
            "conclusion": text[third_cut:].strip(),
        }

    sections = dict.fromkeys(
        ("introduction", "methodology", "results", "conclusion"), ""
    )
    # Each section ends where the next anchor starts.
    boundaries = [offset for _, offset in anchors[1:]] + [len(text)]
    for (name, begin), finish in zip(anchors, boundaries):
        sections[name] = text[begin:finish].strip()

    # Safety fallback for the two outer sections.
    if not sections["introduction"]:
        sections["introduction"] = text[:1500]
    if not sections["conclusion"]:
        sections["conclusion"] = text[-1500:]
    return sections
# ============================================================
# 15) SPLIT IMRAD USING HEADINGS
# ============================================================
def _strip_heading_line(chunk, label, title):
    """Remove the leading "<label>. <title>" heading from a section chunk."""
    # The detected title has its inner whitespace collapsed, so allow any
    # whitespace run between its words when matching the raw chunk.
    title_pattern = r"\s+".join(re.escape(word) for word in title.split())
    label_pattern = re.escape(label) + r"\.?\s*" if label else ""
    return re.sub(
        r"^\s*" + label_pattern + title_pattern + r"\s*",
        "",
        chunk,
        count=1,
        flags=re.IGNORECASE,
    ).strip()


def split_into_imrad_sections(clean_body_text):
    """Split body text into IMRAD sections using detected headings.

    Each detected heading owns the text up to the next heading. The heading
    line itself is removed (for roman, numeric, lettered and unnumbered
    headings alike) and the chunk is appended to the IMRAD category chosen by
    ``map_heading_to_imrad``. Chunks mapped to "other" are used to recover an
    empty methodology / results / conclusion by keyword.

    Falls back to ``fallback_split_by_keywords`` when no heading is found or
    when at least three of the four sections end up empty. Text that precedes
    the first heading is not assigned to any section.

    Returns:
        A dict with keys "introduction", "methodology", "results" and
        "conclusion".
    """
    clean_body_text = safe_str(clean_body_text)
    headings = detect_section_headings(clean_body_text)
    if not headings:
        return fallback_split_by_keywords(clean_body_text)

    # A heading's chunk ends where the next heading starts.
    for i, heading in enumerate(headings):
        if i < len(headings) - 1:
            heading["end"] = headings[i + 1]["start"]
        else:
            heading["end"] = len(clean_body_text)

    imrad = {
        "introduction": "",
        "methodology": "",
        "results": "",
        "conclusion": "",
    }
    other_chunks = []

    for h in headings:
        title = safe_str(h.get("title", "")).strip()
        label = safe_str(h.get("label", ""))
        chunk = clean_body_text[h["start"]:h["end"]].strip()
        chunk = _strip_heading_line(chunk, label, title)

        category = map_heading_to_imrad(title)
        if category in imrad:
            if imrad[category]:
                imrad[category] += "\n\n"
            imrad[category] += chunk
        else:
            other_chunks.append(chunk)

    # Normalise blank-line runs.
    for key in imrad:
        imrad[key] = re.sub(r"\n{3,}", "\n\n", imrad[key]).strip()

    # Recover missing sections from chunks whose heading was not recognised.
    if not imrad["methodology"]:
        for chunk in other_chunks:
            if any(k in chunk.lower() for k in ["proposed method", "framework", "architecture", "dataset", "training"]):
                imrad["methodology"] = chunk
                break
    if not imrad["results"]:
        for chunk in other_chunks:
            if any(k in chunk.lower() for k in ["accuracy", "experiment", "evaluation", "performance"]):
                imrad["results"] = chunk
                break
    if not imrad["conclusion"]:
        for chunk in reversed(other_chunks):
            if any(k in chunk.lower() for k in ["conclusion", "future work", "summary", "limitation"]):
                imrad["conclusion"] = chunk
                break

    # Hard fallback if the heading mapping mostly failed.
    empty_count = sum(1 for v in imrad.values() if not v.strip())
    if empty_count >= 3:
        return fallback_split_by_keywords(clean_body_text)

    # A huge introduction next to a near-empty methodology means the
    # methodology heading was missed: split the introduction in half.
    intro_len = len(imrad["introduction"])
    if intro_len > 6000 and len(imrad["methodology"]) < 500:
        half = intro_len // 2
        imrad["methodology"] = imrad["introduction"][half:]
        imrad["introduction"] = imrad["introduction"][:half]

    return imrad
# ============================================================
# 16) MAIN PIPELINE FUNCTION (PHASE 1)
# ============================================================
def run_phase1_pipeline(pdf_path):
    """Run Phase 1 preprocessing on one PDF and return the Phase 2 payload.

    Steps: extract text, clean it, extract metadata (title, DOI, year,
    abstract, keywords, references, authors, affiliation), remove the keyword
    block and the references from the body, then split the body into IMRAD
    sections.

    Args:
        pdf_path: Path of the PDF to process. The file name without its
            extension becomes ``paper_id``.

    Returns:
        A dict with keys ``paper_id``, ``title``, ``doi``, ``year``,
        ``keywords``, ``abstract``, ``cleaned_text`` (the body without
        keywords/references), ``imrad_sections``, ``references``,
        ``metadata`` and ``status``.

    Raises:
        FileNotFoundError: If the PDF does not exist (raised by
            ``extract_text_from_pdf``).
        ValueError: If no text could be extracted from the PDF.
    """
    raw_text = extract_text_from_pdf(pdf_path)
    if not raw_text.strip():
        raise ValueError("❌ Extracted PDF text is empty (This PDF may be scanned, OCR is required).")

    cleaned_text = clean_extracted_text(raw_text)
    paper_id = os.path.splitext(os.path.basename(pdf_path))[0]

    title = extract_title(cleaned_text)
    doi = extract_doi(cleaned_text)
    year = extract_year(cleaned_text)
    abstract = extract_abstract(cleaned_text)
    keywords = extract_keywords(cleaned_text)
    references = extract_references(cleaned_text)
    # Pass the title so the title line is never mistaken for the author list.
    authors, affiliation = extract_authors_affiliation(
        cleaned_text, paper_title=title
    )

    # Remove keywords + references before the IMRAD split.
    clean_body_text = remove_keywords_and_references(cleaned_text)

    imrad_sections = split_into_imrad_sections(clean_body_text)
    imrad_sections = {
        "introduction": imrad_sections.get("introduction", "").strip(),
        "methodology": imrad_sections.get("methodology", "").strip(),
        "results": imrad_sections.get("results", "").strip(),
        "conclusion": imrad_sections.get("conclusion", "").strip(),
    }

    metadata = {
        "paper_id": paper_id,
        "source_file": os.path.basename(pdf_path),
        "processed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "title": title,
        "authors": authors,
        "affiliation": affiliation,
        "doi": doi,
        "year": year,
        "raw_text_length": len(raw_text),
        "cleaned_text_length": len(cleaned_text),
        "body_length": len(clean_body_text),
        "imrad_detected": {
            "intro_len": len(imrad_sections["introduction"]),
            "method_len": len(imrad_sections["methodology"]),
            "results_len": len(imrad_sections["results"]),
            "conclusion_len": len(imrad_sections["conclusion"]),
        },
        "notes": {
            "preserve_tables": True,
            "preserve_linebreaks": True,
            "preserve_numbering": True,
            "abstract_detected": bool(abstract),
            "doi_detected": bool(doi),
        },
    }

    return {
        "paper_id": paper_id,
        "title": title,
        "doi": doi,
        "year": year,
        "keywords": keywords,
        "abstract": abstract,
        "cleaned_text": clean_body_text,
        "imrad_sections": imrad_sections,
        "references": references,
        "metadata": metadata,
        "status": "success",
    }
# ============================================================
# QUICK TEST
# ============================================================
if __name__ == "__main__":
    # Manual smoke test: process ./sample.pdf and print a summary.
    test_pdf = "sample.pdf"
    if not os.path.exists(test_pdf):
        print("❌ sample.pdf not found for testing.")
    else:
        out = run_phase1_pipeline(test_pdf)
        sections = out["imrad_sections"]
        summary = (
            ("PAPER ID:", out["paper_id"]),
            ("TITLE:", out["title"]),
            ("AUTHORS:", out["metadata"]["authors"]),
            ("AFFILIATION:", out["metadata"]["affiliation"]),
            ("DOI:", out["doi"]),
            ("YEAR:", out["year"]),
            ("KEYWORDS:", out["keywords"]),
            ("ABSTRACT LEN:", len(out["abstract"])),
            ("INTRO LEN:", len(sections["introduction"])),
            ("METHOD LEN:", len(sections["methodology"])),
            ("RESULTS LEN:", len(sections["results"])),
            ("CONCLUSION LEN:", len(sections["conclusion"])),
            ("REFERENCES LEN:", len(out["references"])),
        )
        print("\n========== PHASE 1 OUTPUT TEST ==========")
        for label, value in summary:
            print(label, value)
        print("========================================\n")