Spaces:
Running
Running
| # ============================================================ | |
| # PHASE 1 DATA PREPROCESSING (FINAL UPDATED VERSION - PRO READY) | |
| # FILE: models/phase1_data_preprocessing.py | |
| # | |
| # Supports: | |
| # - IEEE + Non-standard journals | |
| # - PDF -> text extraction (PyMuPDF) | |
| # - SAFE cleaning (preserve tables + numbering) | |
| # - Metadata Extraction: | |
| # Title, Authors, Affiliation, DOI, Year, Abstract, Keywords | |
| # - References extraction | |
| # - IMRAD split (heading-based + fallback) | |
| # | |
| # IMPORTANT FIX: | |
| # - DO NOT destroy table structures | |
| # - Preserve line breaks | |
| # - Preserve numeric units and symbols (% , | , : , -) | |
| # | |
| # OUTPUT FORMAT (STRICT COMPATIBLE WITH PHASE 2): | |
| # { | |
| # "paper_id": "...", | |
| # "title": "...", | |
| # "doi": "...", | |
| # "year": "...", | |
| # "keywords": [...], | |
| # "abstract": "...", | |
| # "cleaned_text": "...", | |
| # "imrad_sections": { | |
| # "introduction": "...", | |
| # "methodology": "...", | |
| # "results": "...", | |
| # "conclusion": "..." | |
| # }, | |
| # "references": "...", | |
| # "metadata": {...}, | |
| # "status": "success" | |
| # } | |
| # ============================================================ | |
| import re | |
| import os | |
| from datetime import datetime | |
| # Safe import fitz | |
| try: | |
| import fitz # PyMuPDF | |
| except ImportError: | |
| raise ImportError("❌ PyMuPDF not installed. Run: pip install pymupdf") | |
| # ========================================================== | |
| # SAFE STRING | |
| # ========================================================== | |
def safe_str(value):
    """Return ``value`` as a stripped string, mapping ``None`` to ``""``."""
    return "" if value is None else str(value).strip()
def clean_text(text: str) -> str:
    """Flatten ``text`` onto a single line.

    Non-breaking spaces become ordinary spaces and every run of whitespace
    (newlines included) is collapsed to one space. ``None`` yields ``""``.
    """
    normalized = safe_str(text).replace("\u00a0", " ")
    return re.sub(r"\s+", " ", normalized).strip()
| # ============================================================ | |
| # 1) PDF TEXT EXTRACTION (COLUMN-AWARE) | |
| # ============================================================ | |
def extract_text_from_pdf(pdf_path):
    """Extract the text of a PDF using a simple two-column reading order.

    For every page the text blocks are split into three groups: blocks wider
    than 80% of the page, blocks starting in the left half, and blocks
    starting in the right half. Each group is sorted top-to-bottom and the
    groups are emitted in that order (wide, left, right).

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The block texts joined by blank lines, stripped.

    Raises:
        FileNotFoundError: If ``pdf_path`` does not exist.
    """
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ pdf file not found: {pdf_path}")

    full_text = []
    doc = fitz.open(pdf_path)
    try:
        for page in doc:
            # the page midline separates the left and right columns
            width = page.rect.width
            mid_x = width / 2.0

            full_width = []
            left_col = []
            right_col = []
            for block in page.get_text("blocks"):
                # block layout: (x0, y0, x1, y1, text, block_no, block_type)
                x0, _y0, x1, _y1, _text, _block_no, block_type = block
                # block_type 0 is text; anything else (images) is skipped
                if block_type != 0:
                    continue
                if (x1 - x0) > width * 0.8:
                    # wider than 80% of the page: title / header
                    full_width.append(block)
                elif x0 < mid_x:
                    left_col.append(block)
                else:
                    right_col.append(block)

            # headers first, then left column, then right column,
            # each group ordered top-to-bottom by its y0 coordinate
            for group in (full_width, left_col, right_col):
                group.sort(key=lambda b: b[1])
                for block in group:
                    # blank out control characters left by the extraction
                    text = re.sub(
                        r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', ' ', block[4]
                    )
                    full_text.append(text.strip())
    finally:
        # release the document even when a page fails to parse
        doc.close()

    return "\n\n".join(full_text).strip()
| # ============================================================ | |
| # 2) REMOVE IEEE FOOTER / LICENSE NOISE (SAFE) | |
| # IMPORTANT: DOI MUST BE PRESERVED | |
| # ============================================================ | |
# Case-insensitive patterns for publisher boilerplate; none of them touch DOIs.
_IEEE_NOISE_PATTERNS = (
    # IEEE license
    r"authorized licensed use.*?restrictions apply\.?",
    r"downloaded on.*?from ieee xplore\.?",
    r"personal use is permitted.*?permission\.?",
    # IEEE Access footer
    r"©\s*\d{4}\s*ieee",
    r"ieee xplore",
    r"\$\d+\.\d+",
    r"\bvol\.\s*\d+",
    r"\bno\.\s*\d+",
    r"\bpp\.\s*\d+\s*-\s*\d+",
    # received / accepted block
    r"received\s+\d{1,2}\s+\w+\s+\d{4}.*?",
    r"accepted\s+\d{1,2}\s+\w+\s+\d{4}.*?",
    r"date\s+of\s+publication\s+\d{1,2}\s+\w+\s+\d{4}.*?",
    r"date\s+of\s+current\s+version\s+\d{1,2}\s+\w+\s+\d{4}.*?",
    # associate editor
    r"the associate editor coordinating the review.*?publication.*?",
    # creative commons license
    r"this work is licensed under a creative commons.*?",
    r"for more information,\s*see\s*https?://[^\s]+",
    # page footer
    r"volume\s+\d+,\s*\d{4}",
    r"vol\.\s*\d+,\s*\d{4}",
    # repeated running header
    r"w\.\s*han\s*et\s*al\.\s*:.*?classifier",
)


def remove_ieee_noise(text: str):
    """Delete IEEE license, footer and header boilerplate from ``text``.

    Each pattern is removed case-insensitively in list order, then runs of
    four or more newlines are reduced to one blank line.

    Args:
        text: Raw or partially cleaned document text; falsy input is allowed.

    Returns:
        The stripped text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""

    scrubbed = text
    for noise in _IEEE_NOISE_PATTERNS:
        scrubbed = re.sub(noise, "", scrubbed, flags=re.IGNORECASE)

    # collapse the gaps left behind by the deletions
    return re.sub(r"\n{4,}", "\n\n", scrubbed).strip()
| # ============================================================ | |
| # 3) SAFE CLEANING (PRESERVE TABLES + BULLETS) | |
| # ============================================================ | |
def clean_extracted_text(raw_text):
    """Clean raw PDF text while keeping its line structure.

    Steps, in order: normalise non-breaking spaces and tabs, strip publisher
    boilerplate, drop lines holding only a number, re-join words hyphenated
    across a line break, stitch lines that end mid-sentence, fix a few known
    extraction typos, and squeeze repeated spaces and long blank-line runs.

    Args:
        raw_text: Text as produced by the PDF extractor; falsy input is allowed.

    Returns:
        The cleaned, stripped text, or ``""`` when ``raw_text`` is falsy.
    """
    if not raw_text:
        return ""

    cleaned = remove_ieee_noise(
        raw_text.replace("\u00a0", " ").replace("\t", " ")
    )

    regex_passes = (
        # a line holding only digits is treated as a page number
        (r"^\s*\d+\s*$", "", re.MULTILINE),
        # "classifi-\ncation" -> "classification"
        (r"([a-zA-Z])-\s*\n\s*([a-zA-Z])", r"\1\2", 0),
        # a line ending in a lowercase letter or comma continues the sentence
        (r"([a-z,])\n([a-zA-Z])", r"\1 \2", 0),
    )
    for pattern, replacement, flags in regex_passes:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)

    # known artefacts of the extraction and of the hyphen re-joining above
    typo_fixes = (
        ("face-toface", "face-to-face"),
        ("IoTbased", "IoT-based"),
        ("pre- processing", "preprocessing"),
        ("machinelearning", "machine learning"),
    )
    for broken, fixed in typo_fixes:
        cleaned = cleaned.replace(broken, fixed)

    cleaned = re.sub(r" {2,}", " ", cleaned)
    return re.sub(r"\n{4,}", "\n\n", cleaned).strip()
| # ============================================================ | |
| # 4) DOI EXTRACTION | |
| # ============================================================ | |
def extract_doi(cleaned_text):
    """Return the first DOI found in ``cleaned_text``, or ``""``.

    The patterns are tried in order (bare DOI, ``doi:`` prefix, ``Digital
    Object Identifier`` prefix) and matching is case-insensitive.
    """
    haystack = cleaned_text or ""

    doi_patterns = (
        # bare DOI
        r"\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b",
        # "doi: 10.xxxx/..."
        r"\bdoi\s*:\s*(10\.\d{4,9}/[-._;()/:A-Z0-9]+)\b",
        # "Digital Object Identifier 10.xxxx/..."
        r"\bdigital\s+object\s+identifier\s+(10\.\d{4,9}/[-._;()/:A-Z0-9]+)\b",
    )

    for pattern in doi_patterns:
        found = re.search(pattern, haystack, flags=re.IGNORECASE)
        if found is None:
            continue
        # prefer the captured DOI when the pattern has a prefix group
        candidate = found.group(1) if found.lastindex else found.group(0)
        return re.sub(
            r"^(doi\s*:?)", "", candidate.strip(), flags=re.IGNORECASE
        )

    return ""
| # ============================================================ | |
| # 5) YEAR EXTRACTION | |
| # ============================================================ | |
def extract_year(cleaned_text):
    """Guess the publication year of a paper from its text.

    Priority order:
      1. An explicit date after "date of publication", "accepted",
         "received" or "current version". Both ``12 March 2024`` and
         ``March 12, 2024`` are accepted.
      2. The latest plausible year in the first 3000 characters.
      3. The latest plausible year anywhere in the text.

    A year is plausible when it lies between 1990 and next calendar year.

    Args:
        cleaned_text: Document text; ``None`` is treated as empty.

    Returns:
        The year as a four-digit string, or ``""`` if none is found.
    """
    cleaned_text = cleaned_text or ""
    latest_allowed = datetime.now().year + 1

    def is_plausible(year):
        return 1990 <= year <= latest_allowed

    # day-month-year is the order the noise patterns elsewhere in this file
    # assume; month-day-year is kept for backward compatibility
    date = r"\s+(?:\w+\s+\d{1,2},?|\d{1,2}\s+\w+)\s+(20\d{2})"
    publication_patterns = (
        r"date of publication" + date,
        r"accepted" + date,
        r"received" + date,
        r"current version" + date,
    )
    for pattern in publication_patterns:
        m = re.search(pattern, cleaned_text, flags=re.IGNORECASE)
        if m and is_plausible(int(m.group(1))):
            return m.group(1)

    def latest_year(fragment):
        """Return the largest plausible year in ``fragment`` or ``""``."""
        plausible = [
            int(y)
            for y in re.findall(r"\b(19\d{2}|20\d{2})\b", fragment)
            if is_plausible(int(y))
        ]
        return str(max(plausible)) if plausible else ""

    # the head of the document is more likely to carry the paper's own year
    # than the reference list at the end
    return latest_year(cleaned_text[:3000]) or latest_year(cleaned_text)
| # ============================================================ | |
| # 6) TITLE EXTRACTION | |
| # ============================================================ | |
# Whole-word markers of journal headers and front-matter labels. Matching on
# word boundaries keeps "issue" from rejecting "tissue" or "doi" from
# rejecting "pseudoinverse". Applied to lowercased text.
_TITLE_REJECT_RE = re.compile(
    r"\b(?:abstract|keywords|index terms|references|received|accepted"
    r"|date of publication|date of current version"
    r"|digital object identifier|doi|volume|issue|ieee|transactions"
    r"|journal|proceedings|conference|issn|copyright)s?\b"
    r"|\b(?:vol|no|pp)\."
    r"|@"
)

# Lines naming an institution are affiliations, not titles.
_TITLE_AFFILIATION_RE = re.compile(
    r"\b(university|faculty|department|school|college|institute)\b"
)


def extract_title(cleaned_text):
    """Pick the most title-like line from the first 120 non-empty lines.

    Lines carrying journal metadata, front-matter labels, e-mail addresses
    or affiliation words are skipped. The rest are scored on word count
    (4-25), length (<= 180), number of capitals (capped at 5) and absence
    of a four-digit number; the first line with the highest score wins.

    Args:
        cleaned_text: Document text; ``None`` is treated as empty.

    Returns:
        The chosen title line; ``"untitled paper"`` when there is no text;
        or a fixed error string when more than 20% of the chosen line is
        outside ``[a-zA-Z0-9\\s:,.-]``.
    """
    lines = [
        line.strip()
        for line in (cleaned_text or "").split("\n")
        if line.strip()
    ]
    if not lines:
        return "untitled paper"

    best_title = ""
    best_score = 0
    for line in lines[:120]:
        low = line.lower()
        if _TITLE_REJECT_RE.search(low) or _TITLE_AFFILIATION_RE.search(low):
            continue

        score = 0
        # titles are usually between 4 and 25 words
        if 4 <= len(line.split()) <= 25:
            score += 5
        if len(line) <= 180:
            score += 2
        # titles usually contain capitals
        score += min(sum(1 for c in line if c.isupper()), 5)
        # running headers often carry a year such as 2025
        if not re.search(r"\b\d{4}\b", line):
            score += 3

        if score > best_score:
            best_score = score
            best_title = line

    final_title = best_title if best_title else lines[0]

    # a line dominated by symbols is most likely an encoding failure
    weird_chars = len(re.findall(r"[^a-zA-Z0-9\s:,\.\-]", final_title))
    if weird_chars > len(final_title) * 0.2:
        return "title extraction error (please enter manually)"
    return final_title
| # ============================================================ | |
| # 7) ABSTRACT EXTRACTION (IEEE ROBUST VERSION) | |
| # ============================================================ | |
def extract_abstract(cleaned_text):
    """Return the abstract of a paper as a single line of text.

    The abstract starts after the first "Abstract" label (also matched when
    letter-spaced) and ends at the first keywords / index-terms /
    introduction marker. With no label, the last paragraph of more than 40
    words before the introduction is used instead.

    Args:
        cleaned_text: Document text; falsy input yields ``""``.

    Returns:
        The whitespace-collapsed abstract, at most 2500 characters, or ``""``.
    """
    if not cleaned_text:
        return ""

    document = safe_str(cleaned_text)

    # plain and letter-spaced spellings of the label
    heading_patterns = (
        r"\bA\s*B\s*S\s*T\s*R\s*A\s*C\s*T\b\s*[—\-:\.]?\s*",
        r"\bAbstract\b\s*[—\-:\.]?\s*",
    )
    heading = None
    for pattern in heading_patterns:
        heading = re.search(pattern, document, flags=re.IGNORECASE)
        if heading:
            break

    if heading is None:
        # no label: fall back to the last chunky paragraph before the intro
        intro = re.search(
            r"\b(1\.|I\.)?\s*INTRODUCTION\b", document, flags=re.IGNORECASE
        )
        if intro is None:
            return ""
        for paragraph in reversed(document[:intro.start()].split("\n\n")):
            if len(paragraph.split()) > 40:
                return re.sub(r"\s+", " ", paragraph).strip()
        return ""

    remainder = document[heading.end():]

    # the earliest of these markers ends the abstract
    stop_markers = (
        r"\bKeywords\b", r"\bIndex Terms\b",
        r"\bI\.\s*INTRODUCTION\b", r"\b1\.\s*INTRODUCTION\b",
        r"\n\s*INTRODUCTION\b",
    )
    cut_points = [
        hit.start()
        for hit in (
            re.search(marker, remainder, flags=re.IGNORECASE)
            for marker in stop_markers
        )
        if hit
    ]
    abstract = remainder[:min(cut_points, default=len(remainder))]
    abstract = re.sub(r"\s+", " ", abstract).strip()

    # drop the editorial note some journals print inside the abstract block
    abstract = re.sub(
        r"The associate editor.*?publication.*?\.",
        "",
        abstract,
        flags=re.IGNORECASE,
    )
    return abstract[:2500].strip()
| # ============================================================ | |
| # 8) AUTHORS + AFFILIATION EXTRACTION (IEEE + GENERAL HEURISTIC) | |
| # ============================================================ | |
# Whole-word affiliation markers, applied to lowercased text. Word boundaries
# stop "lab" from matching inside "available" or "collaborative".
_AFFILIATION_KEYWORD_RE = re.compile(
    r"\b(?:universit(?:y|ies)|facult(?:y|ies)|departments?|schools?"
    r"|colleges?|institutes?|research centers?|centres?"
    r"|laborator(?:y|ies)|labs?|malaysian?|campus(?:es)?)\b"
)


def extract_authors_affiliation(cleaned_text, paper_title=""):
    """Heuristically extract the author line and the first affiliation line.

    Only the first 120 non-empty lines are inspected. The affiliation is the
    first line shorter than 250 characters that contains an affiliation
    word. The author line is the first line before the abstract that is not
    the title, metadata, an e-mail line, an affiliation or a section
    heading, and whose name-likeness score reaches 6.

    Args:
        cleaned_text: Document text; ``None`` is treated as empty.
        paper_title: Title of the paper; a line equal to it
            (case-insensitively) is never taken as the author line.

    Returns:
        ``(authors, affiliation)``; each falls back to a fixed
        "... not found" string when nothing usable is detected.
    """
    lines = [
        line.strip()
        for line in (cleaned_text or "").split("\n")
        if line.strip()
    ]
    if not lines:
        return "author information not found", "affiliation data not found"

    reject_keywords = (
        "abstract", "keywords", "index terms", "received",
        "accepted", "date of publication", "date of current version",
        "digital object identifier", "doi", "copyright",
        "volume", "issue", "ieee",
    )
    head_lines = lines[:120]
    title_key = paper_title.lower() if paper_title else ""

    # affiliation: first reasonably short line naming an institution
    affiliation = next(
        (
            line
            for line in head_lines
            if len(line) < 250 and _AFFILIATION_KEYWORD_RE.search(line.lower())
        ),
        "",
    )

    authors = ""
    for line in head_lines:
        low = line.lower()

        # the title itself can look like a list of names
        if title_key and low == title_key:
            continue
        # authors are printed before the abstract
        if "abstract" in low:
            break
        if any(k in low for k in reject_keywords):
            continue
        if "@" in line:
            continue
        if _AFFILIATION_KEYWORD_RE.search(low):
            continue
        # skip section headings such as "II." or "3."
        if re.match(r"^[IVX]{1,6}\.", line) or re.match(r"^\d+\.", line):
            continue

        # name-likeness: capitalised words, initials, commas, short line
        capital_words = len(re.findall(r"\b[A-Z][a-z]+\b", line))
        initials = len(re.findall(r"\b[A-Z]\.", line))
        score = capital_words + (initials * 2)
        if "," in line:
            score += 3
        if 2 <= len(line.split()) <= 20:
            score += 2

        if score >= 6:
            authors = line
            break

    authors = re.sub(r"[^a-zA-Z0-9,\.\-\s]", "", authors).strip()
    affiliation = re.sub(r"[^a-zA-Z0-9,\.\-\s\(\)]", "", affiliation).strip()

    if not authors or len(authors) < 3:
        authors = "author information not found"
    if not affiliation or len(affiliation) < 3:
        affiliation = "affiliation data not found"
    return authors, affiliation
| # ============================================================ | |
| # 9) KEYWORDS EXTRACTION (ROBUST IEEE VERSION) | |
| # ============================================================ | |
def extract_keywords(cleaned_text):
    """Extract up to 12 keywords from a "Keywords" / "Index Terms" block.

    The block starts at the first label match, is limited to 1200
    characters, and ends at the earliest stop marker (introduction,
    abstract, references, received/accepted, DOI label). It is split on
    commas and semicolons; entries are stripped of characters other than
    letters, digits, hyphens, spaces and parentheses, kept when 3-60
    characters long, and de-duplicated in order.

    Args:
        cleaned_text: Document text; ``None`` or empty yields ``[]``.

    Returns:
        A list of keyword strings (possibly empty).
    """
    document = safe_str(cleaned_text)
    if not document:
        return []

    label_patterns = (
        r"\bKeywords\s*[:\-]?\s*(.+)",
        r"\bIndex Terms\s*[:\-]?\s*(.+)",
        r"\bKeywords\s*[—–-]\s*(.+)",
        r"\bIndex Terms\s*[—–-]\s*(.+)",
    )
    stop_patterns = (
        r"\bI\.\s*INTRODUCTION\b",
        r"\b1\.\s*INTRODUCTION\b",
        r"\bINTRODUCTION\b",
        r"\bABSTRACT\b",
        r"\bREFERENCES\b",
        r"\bReceived\b",
        r"\bAccepted\b",
        r"\bDigital Object Identifier\b",
    )

    block = ""
    for label in label_patterns:
        hit = re.search(label, document, flags=re.IGNORECASE)
        if hit is None:
            continue

        # look at a bounded window starting at the label
        window = document[hit.start():hit.start() + 1200]
        cut_points = [
            stop.start()
            for stop in (
                re.search(marker, window, flags=re.IGNORECASE)
                for marker in stop_patterns
            )
            if stop
        ]
        block = window[:min(cut_points, default=len(window))]
        # drop the label and its separator
        block = re.sub(
            r"^(Keywords|Index Terms)\s*[:\-–—]?\s*",
            "",
            block,
            flags=re.IGNORECASE,
        )
        break

    if not block:
        return []

    block = re.sub(r"\s+", " ", block.replace("\n", " "))

    collected = []
    for piece in re.split(r",|;", block):
        term = re.sub(r"[^A-Za-z0-9\-\s\(\)]", "", clean_text(piece)).strip()
        if 3 <= len(term) <= 60:
            collected.append(term)

    # dict keys keep first-seen order while removing duplicates
    return list(dict.fromkeys(collected))[:12]
| # ============================================================ | |
| # 10) REFERENCES EXTRACTION | |
| # ============================================================ | |
def extract_references(cleaned_text):
    """Return the text of the references section, or ``""`` if absent.

    The section starts after the first line beginning with "REFERENCES"
    (case-insensitive) and ends at the earliest appendix, acknowledgment or
    biography heading. Publisher boilerplate is removed and the result is
    capped at 15000 characters.

    Args:
        cleaned_text: Document text; ``None`` is treated as empty.
    """
    document = cleaned_text or ""
    line_flags = re.IGNORECASE | re.MULTILINE

    heading = re.search(r"^\s*REFERENCES\b", document, flags=line_flags)
    if heading is None:
        return ""

    tail = document[heading.end():].strip()

    # headings that can follow the reference list
    terminators = (
        r"^\s*APPENDIX\b",
        r"^\s*ACKNOWLEDGMENT\b",
        r"^\s*ACKNOWLEDGEMENTS\b",
        r"^\s*AUTHOR BIOGRAPHY\b",
        r"^\s*AUTHOR BIOGRAPHIES\b",
        r"^\s*BIOGRAPHY\b",
        r"^\s*BIOGRAPHIES\b",
        r"^\s*ABOUT THE AUTHORS\b",
    )
    cut_points = [
        hit.start()
        for hit in (
            re.search(marker, tail, flags=line_flags) for marker in terminators
        )
        if hit
    ]
    tail = tail[:min(cut_points, default=len(tail))]

    tail = remove_ieee_noise(tail)
    tail = re.sub(r"\n{4,}", "\n\n", tail).strip()

    # keep the payload bounded
    return tail[:15000]
| # ============================================================ | |
| # 11) REMOVE KEYWORDS + REFERENCES FROM MAIN BODY | |
| # ============================================================ | |
def remove_keywords_and_references(cleaned_text):
    """Strip the keyword block and the references section from the body.

    The keyword block runs from the first line starting with "Keywords" or
    "Index Terms" to the next blank line. Everything from the first line
    starting with "REFERENCES" onward is discarded.

    Args:
        cleaned_text: Document text; ``None`` is treated as empty.

    Returns:
        The remaining body text, stripped, with runs of four or more
        newlines reduced to three.
    """
    text = cleaned_text or ""

    # re.MULTILINE is required so "^" anchors at the keyword line wherever it
    # sits in the document, not only at the very start of the text;
    # count=1 limits the removal to the first (front-matter) block
    text = re.sub(
        r"^[ \t]*(Keywords|Index Terms)\b\s*[:\-]?.*?(\n\s*\n)",
        "\n\n",
        text,
        count=1,
        flags=re.IGNORECASE | re.DOTALL | re.MULTILINE,
    )

    # cut before REFERENCES
    text = re.split(
        r"^\s*REFERENCES\b",
        text,
        maxsplit=1,
        flags=re.IGNORECASE | re.MULTILINE,
    )[0]

    # remove extra blank lines
    return re.sub(r"\n{4,}", "\n\n\n", text).strip()
| # ============================================================ | |
| # 12) DETECT SECTION HEADINGS | |
| # IEEE + GENERAL JOURNAL SUPPORT | |
| # ============================================================ | |
def detect_section_headings(text):
    """Find candidate section headings in ``text``.

    Four line-anchored shapes are recognised: roman-numbered ("III. RESULTS"),
    arabic-numbered ("3 RESULTS"), letter-numbered ("A. Experimental
    Results") and unnumbered all-caps ("RESULTS AND DISCUSSION"). Captions,
    journal metadata and running headers are filtered out.

    Args:
        text: Body text; ``None`` is treated as empty.

    Returns:
        A list of ``{"label", "title", "start"}`` dicts ordered by position,
        keeping only the first heading for each case-insensitive title.
    """
    source = safe_str(text)

    heading_shapes = (
        # III. RESULTS
        re.compile(r"^\s*([IVX]{1,8})\.\s+(.+?)\s*$", re.MULTILINE),
        # 3 RESULTS
        re.compile(r"^\s*(\d{1,2})\.?\s+([A-Za-z].+?)\s*$", re.MULTILINE),
        # A. Experimental Results
        re.compile(r"^\s*([A-Z])\.\s+(.+?)\s*$", re.MULTILINE),
        # RESULTS AND DISCUSSION
        re.compile(r"^\s*([A-Z][A-Z0-9 \-\(\)/]{4,})\s*$", re.MULTILINE),
    )
    rejected_prefixes = (
        "table", "fig", "figure", "volume", "received", "accepted",
    )
    rejected_fragments = (
        "copyright", "creative commons", "digital object identifier",
    )

    candidates = []
    visited_offsets = set()
    for shape in heading_shapes:
        for hit in shape.finditer(source):
            offset = hit.start()
            # an offset claimed by an earlier shape is not reconsidered
            if offset in visited_offsets:
                continue
            visited_offsets.add(offset)

            if hit.lastindex >= 2:
                label = hit.group(1).strip()
                heading = hit.group(2).strip()
            else:
                label = ""
                heading = hit.group(1).strip()

            heading = re.sub(r"\s{2,}", " ", heading).strip()
            lowered = heading.lower()

            # filter garbage
            if not 4 <= len(heading) <= 120:
                continue
            if lowered.startswith(rejected_prefixes):
                continue
            if any(fragment in lowered for fragment in rejected_fragments):
                continue
            # running header of the form "w. <name> ..."
            if re.match(r"^w\.\s*[a-z]", lowered):
                continue

            candidates.append({
                "label": label,
                "title": heading,
                "start": offset,
            })

    candidates.sort(key=lambda item: item["start"])

    # keep only the first occurrence of each title
    unique = []
    seen_titles = set()
    for item in candidates:
        key = item["title"].lower().strip()
        if key in seen_titles:
            continue
        seen_titles.add(key)
        unique.append(item)
    return unique
| # ============================================================ | |
| # 13) MAP HEADING INTO IMRAD CATEGORY | |
| # ============================================================ | |
def map_heading_to_imrad(heading_title):
    """Classify a heading title into an IMRAD bucket.

    The lowercased title is tested for cue substrings, category by category
    in the order introduction, methodology, results, conclusion; the first
    category with a hit wins.

    Args:
        heading_title: Heading text; ``None`` is treated as empty.

    Returns:
        ``"introduction"``, ``"methodology"``, ``"results"``,
        ``"conclusion"`` or ``"other"``.
    """
    heading = safe_str(heading_title).lower().strip()

    category_cues = (
        ("introduction", (
            "introduction",
            "background",
            "motivation",
            "overview",
            "preliminaries",
            "related work",
            "literature review",
            "state of the art",
            "problem statement",
            "research gap",
        )),
        ("methodology", (
            "method",
            "methodology",
            "materials",
            "implementation",
            "framework",
            "architecture",
            "design",
            "approach",
            "system model",
            "proposed system",
            "proposed method",
            "proposed framework",
            "procedure",
            "development",
            "algorithm",
            "workflow",
            # IEEE common
            "dataset",
            "data collection",
            "data preprocessing",
            "training",
            "testing setup",
            "experimental setup",
            "feature extraction",
            "model construction",
            "network structure",
            "network model",
            "model architecture",
            "fasternet",
            "yolov5",
            "cnn",
            "resnet",
            "classifier",
        )),
        ("results", (
            "results",
            "evaluation",
            "experiment",
            "analysis",
            "performance",
            "validation",
            "discussion",
            "findings",
            "testing",
            "comparison",
            # IEEE common
            "experimental results",
            "performance comparison",
            "ablation study",
            "benchmark",
            "case study",
            "accuracy analysis",
            "result analysis",
        )),
        ("conclusion", (
            "conclusion",
            "conclusions",
            "future work",
            "future research",
            "summary",
            "limitations",
            "recommendation",
            "recommendations",
            "closing remarks",
        )),
    )

    for category, cues in category_cues:
        if any(cue in heading for cue in cues):
            return category
    return "other"
| # ============================================================ | |
| # 14) FALLBACK SPLIT BY KEYWORDS (IMPROVED) | |
| # ============================================================ | |
def fallback_split_by_keywords(text):
    """Split ``text`` into IMRAD sections without relying on headings.

    For each section the earliest occurrence of any of its cue words in the
    lowercased text is taken as the section start; a section runs to the
    start of the next one. When no cue is found at all, the text is cut at
    30% / 60% / 85% of its length. An empty introduction or conclusion is
    filled with the first or last 1500 characters respectively.

    Args:
        text: Body text to split.

    Returns:
        A dict with the keys ``introduction``, ``methodology``, ``results``
        and ``conclusion``.
    """
    lowered = text.lower()

    section_cues = (
        ("introduction", (
            "introduction",
            "background",
            "motivation",
            "overview",
        )),
        ("methodology", (
            "methodology",
            "methods",
            "materials and methods",
            "proposed method",
            "proposed framework",
            "proposed system",
            "system design",
            "framework",
            "architecture",
            "implementation",
            "algorithm",
            "workflow",
        )),
        ("results", (
            "results",
            "experimental results",
            "evaluation",
            "performance evaluation",
            "experiment",
            "experiments",
            "analysis",
            "discussion",
            "findings",
            "testing",
        )),
        ("conclusion", (
            "conclusion",
            "conclusions",
            "future work",
            "summary",
            "concluding remarks",
            "final remarks",
            "limitations",
        )),
    )

    def earliest(cues):
        """Return the smallest offset of any cue, or -1 when none occurs."""
        offsets = [idx for idx in map(lowered.find, cues) if idx != -1]
        return min(offsets) if offsets else -1

    anchors = [(name, earliest(cues)) for name, cues in section_cues]
    anchors = sorted(
        (pair for pair in anchors if pair[1] != -1),
        key=lambda pair: pair[1],
    )

    # no cue word anywhere: cut by fixed proportions
    if not anchors:
        total = len(text)
        first_cut = int(total * 0.30)
        second_cut = int(total * 0.60)
        third_cut = int(total * 0.85)
        return {
            "introduction": text[:first_cut].strip(),
            "methodology": text[first_cut:second_cut].strip(),
            "results": text[second_cut:third_cut].strip(),
            "conclusion": text[third_cut:].strip(),
        }

    sections = dict.fromkeys(
        ("introduction", "methodology", "results", "conclusion"), ""
    )
    # each section ends where the next anchor begins
    ends = [offset for _, offset in anchors[1:]] + [len(text)]
    for (name, begin), end in zip(anchors, ends):
        sections[name] = text[begin:end].strip()

    # safety fallback
    if not sections["introduction"]:
        sections["introduction"] = text[:1500]
    if not sections["conclusion"]:
        sections["conclusion"] = text[-1500:]
    return sections
| # ============================================================ | |
| # 15) SPLIT IMRAD USING HEADINGS | |
| # ============================================================ | |
def split_into_imrad_sections(clean_body_text):
    """Split the paper body into IMRAD sections using detected headings.

    Every detected heading owns the text up to the next heading. Its chunk
    is appended to the IMRAD bucket chosen by ``map_heading_to_imrad``;
    chunks mapped to "other" are used to fill a still-empty methodology,
    results or conclusion bucket by looking for cue words. When no heading
    is found, or three or more buckets end up empty, the keyword-based
    fallback splitter is used instead.

    Args:
        clean_body_text: Body text; ``None`` is treated as empty.

    Returns:
        A dict with the keys ``introduction``, ``methodology``, ``results``
        and ``conclusion``.
    """
    clean_body_text = safe_str(clean_body_text)
    headings = detect_section_headings(clean_body_text)
    if not headings:
        return fallback_split_by_keywords(clean_body_text)

    # a section ends where the next heading starts
    ends = [h["start"] for h in headings[1:]] + [len(clean_body_text)]

    imrad = {
        "introduction": "",
        "methodology": "",
        "results": "",
        "conclusion": ""
    }
    other_chunks = []

    for h, end in zip(headings, ends):
        title = safe_str(h.get("title", ""))
        label = safe_str(h.get("label", ""))
        chunk = clean_body_text[h["start"]:end].strip()

        # Remove the heading line from the chunk. The detected label is
        # optional so unlabelled and letter-labelled headings are stripped
        # too, and runs of whitespace inside the title are tolerated because
        # the detector collapses them.
        label_pattern = (re.escape(label) + r"\.?\s*") if label else ""
        title_pattern = r"\s+".join(re.escape(word) for word in title.split())
        chunk = re.sub(
            r"^\s*" + label_pattern + title_pattern + r"\s*",
            "",
            chunk,
            flags=re.IGNORECASE
        ).strip()

        category = map_heading_to_imrad(title)
        if category in imrad:
            if imrad[category]:
                imrad[category] += "\n\n"
            imrad[category] += chunk
        else:
            other_chunks.append(chunk)

    # clean up extra blank lines
    for key in imrad:
        imrad[key] = re.sub(r"\n{3,}", "\n\n", imrad[key]).strip()

    def first_chunk_mentioning(chunks, cues):
        """Return the first chunk containing any cue, or ``""``."""
        for candidate in chunks:
            lowered = candidate.lower()
            if any(cue in lowered for cue in cues):
                return candidate
        return ""

    # content recovery for sections no heading mapped to
    if not imrad["methodology"]:
        imrad["methodology"] = first_chunk_mentioning(
            other_chunks,
            ["proposed method", "framework", "architecture", "dataset", "training"],
        )
    if not imrad["results"]:
        imrad["results"] = first_chunk_mentioning(
            other_chunks,
            ["accuracy", "experiment", "evaluation", "performance"],
        )
    if not imrad["conclusion"]:
        # conclusions sit at the end, so search from the last chunk backwards
        imrad["conclusion"] = first_chunk_mentioning(
            reversed(other_chunks),
            ["conclusion", "future work", "summary", "limitation"],
        )

    # hard fallback if the mapping completely failed
    empty_count = sum(1 for v in imrad.values() if not v.strip())
    if empty_count >= 3:
        return fallback_split_by_keywords(clean_body_text)

    # a very long introduction next to a near-empty methodology means the
    # methodology heading was missed; split the introduction in half
    intro_len = len(imrad["introduction"])
    if intro_len > 6000 and len(imrad["methodology"]) < 500:
        half = intro_len // 2
        imrad["methodology"] = imrad["introduction"][half:]
        imrad["introduction"] = imrad["introduction"][:half]

    return imrad
| # ============================================================ | |
| # 16) MAIN PIPELINE FUNCTION (PHASE 1) | |
| # ============================================================ | |
def run_phase1_pipeline(pdf_path):
    """Run the full Phase 1 preprocessing on one PDF.

    Extracts and cleans the text, pulls out the metadata fields (title,
    DOI, year, abstract, keywords, references, authors, affiliation),
    removes the keyword block and references from the body, and splits the
    body into IMRAD sections.

    Args:
        pdf_path: Path of the PDF file to process.

    Returns:
        A dict with ``paper_id``, ``title``, ``doi``, ``year``,
        ``keywords``, ``abstract``, ``cleaned_text`` (the body without
        keywords and references), ``imrad_sections``, ``references``,
        ``metadata`` and ``status``.

    Raises:
        ValueError: If no text could be extracted from the PDF.
    """
    raw_text = extract_text_from_pdf(pdf_path)
    if not raw_text.strip():
        raise ValueError("❌ Extracted PDF text is empty (This PDF may be scanned, OCR is required).")

    cleaned_text = clean_extracted_text(raw_text)
    source_file = os.path.basename(pdf_path)
    paper_id = os.path.splitext(source_file)[0]

    title = extract_title(cleaned_text)
    doi = extract_doi(cleaned_text)
    # NOTE(review): clean_extracted_text has already removed the
    # "received/accepted/date of publication D Month YYYY" lines, so the
    # year usually comes from extract_year's generic fallback here.
    year = extract_year(cleaned_text)
    abstract = extract_abstract(cleaned_text)
    keywords = extract_keywords(cleaned_text)
    references = extract_references(cleaned_text)
    # pass the title so the title line is never mistaken for the author line
    authors, affiliation = extract_authors_affiliation(
        cleaned_text, paper_title=title
    )

    # remove keywords + references before IMRAD split
    clean_body_text = remove_keywords_and_references(cleaned_text)

    # split IMRAD and normalise to exactly the four expected keys
    detected = split_into_imrad_sections(clean_body_text)
    imrad_sections = {
        name: detected.get(name, "").strip()
        for name in ("introduction", "methodology", "results", "conclusion")
    }

    metadata = {
        "paper_id": paper_id,
        "source_file": source_file,
        "processed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "title": title,
        "authors": authors,
        "affiliation": affiliation,
        "doi": doi,
        "year": year,
        "raw_text_length": len(raw_text),
        "cleaned_text_length": len(cleaned_text),
        "body_length": len(clean_body_text),
        "imrad_detected": {
            "intro_len": len(imrad_sections["introduction"]),
            "method_len": len(imrad_sections["methodology"]),
            "results_len": len(imrad_sections["results"]),
            "conclusion_len": len(imrad_sections["conclusion"])
        },
        "notes": {
            "preserve_tables": True,
            "preserve_linebreaks": True,
            "preserve_numbering": True,
            "abstract_detected": bool(abstract),
            "doi_detected": bool(doi)
        }
    }

    return {
        "paper_id": paper_id,
        "title": title,
        "doi": doi,
        "year": year,
        "keywords": keywords,
        "abstract": abstract,
        "cleaned_text": clean_body_text,
        "imrad_sections": imrad_sections,
        "references": references,
        "metadata": metadata,
        "status": "success"
    }
| # ============================================================ | |
| # QUICK TEST | |
| # ============================================================ | |
if __name__ == "__main__":
    # Smoke test: process sample.pdf from the working directory, if present,
    # and print the extracted fields and section lengths.
    test_pdf = "sample.pdf"
    if not os.path.exists(test_pdf):
        print("❌ sample.pdf not found for testing.")
    else:
        out = run_phase1_pipeline(test_pdf)
        sections = out["imrad_sections"]
        report = (
            ("PAPER ID:", out["paper_id"]),
            ("TITLE:", out["title"]),
            ("AUTHORS:", out["metadata"]["authors"]),
            ("AFFILIATION:", out["metadata"]["affiliation"]),
            ("DOI:", out["doi"]),
            ("YEAR:", out["year"]),
            ("KEYWORDS:", out["keywords"]),
            ("ABSTRACT LEN:", len(out["abstract"])),
            ("INTRO LEN:", len(sections["introduction"])),
            ("METHOD LEN:", len(sections["methodology"])),
            ("RESULTS LEN:", len(sections["results"])),
            ("CONCLUSION LEN:", len(sections["conclusion"])),
            ("REFERENCES LEN:", len(out["references"])),
        )
        print("\n========== PHASE 1 OUTPUT TEST ==========")
        for label, value in report:
            print(label, value)
        print("========================================\n")