import gradio as gr from gradio_pdf import PDF import fitz import os import tempfile import json import requests import xml.etree.ElementTree as ET import re import time import sys from collections import OrderedDict import Levenshtein import jellyfish from unidecode import unidecode from venues import VENUE_NAMES, VENUE_ABBREVIATIONS, COMMON_TERMS from urlextract import URLExtract # Semantic Scholar Status Codes SEMANTIC_SCHOLAR_STATUS_CODES = { 200: "OK: Request successful", 400: "Bad Request: Check parameters", 401: "Unauthorized: Invalid API key", 403: "Forbidden: No permission", 404: "Not Found: Endpoint or resource missing", 429: "Too Many Requests: Rate limited", 500: "Internal Server Error: Server-side issue" } # Initialize URL extractor extractor = URLExtract() def cleanup_old_temp_files(max_age_hours=1): """Clean up old temporary files from /tmp to save disk space. Safe for multi-user: Only deletes files that match our specific app patterns and are reliably 'old' (default > 1 hour). """ import time now = time.time() cutoff = now - (max_age_hours * 3600) temp_dir = tempfile.gettempdir() if not os.path.exists(temp_dir): return # patterns to look for (created by NamedTemporaryFile in our app) # We look for files ending with our specific suffixes target_suffixes = ("_grobid.pdf", "_ref_subset.pdf", "_verifications.csv") try: for filename in os.listdir(temp_dir): if filename.endswith(target_suffixes): file_path = os.path.join(temp_dir, filename) try: # Check age if os.path.getmtime(file_path) < cutoff: # Double check it's a file, not a directory if os.path.isfile(file_path): os.unlink(file_path) except Exception: pass except Exception as e: print(f"Error during temp file cleanup: {e}") def normalize_title_for_comparison(title): """Normalize title for similarity comparison: lowercase, remove punctuation.""" if not title: return "" # Lowercase and remove all non-alphanumeric/space characters normalized = re.sub(r'[^a-zA-Z0-9\s]', ' ', title.lower()) # Collapse multiple spaces return ' '.join(normalized.split()) def normalize_api_author(name): """Normalize author name strictly for API-sourced strings. Handles 'Last, First' vs 'First Last' robustly. """ if not name: return "" # 1. ASCII normalization name = unidecode(name) # 2. Remove "et al" and "etal" name = re.sub(r'\b(et\s*al\.?|etal)\b', '', name, flags=re.IGNORECASE).strip() # 3. Detect "Last, First" vs "First Last" if "," in name: parts = name.split(",", 1) surname = parts[0].strip() given_name = parts[1].strip() if len(parts) > 1 else "" else: parts = name.split() if not parts: return "" if len(parts) == 1: surname = parts[0] given_name = "" else: surname = parts[-1] # Everything before the last word is given name metadata given_name = " ".join(parts[:-1]) # 4. Clean up the parts and generate initials surname = re.sub(r'[^a-zA-Z]', '', surname).lower() # Process given_name for initials # Replace non-alpha with spaces to separate compact initials like 'J.K.' given_clean = re.sub(r'[^a-zA-Z]', ' ', given_name).lower() given_parts = given_clean.split() initials = [g[0] for g in given_parts if g] initials_str = " ".join(initials) result = f"{surname} {initials_str}".strip() return result def normalize_d_author(name): """Normalize author name for PDF-sourced strings (simpler logic). Takes last word as surname + first initial of first word. """ if not name: return "" # 1. ASCII normalization & strip n = unidecode(name).strip() # 2. Check for "Last, First" comma (from parse_names_by_pattern regrouping) if "," in n: parts = n.split(",", 1) surname = re.sub(r'[^a-zA-Z\s]', '', parts[0]).strip().lower() if len(parts) > 1: # Split the part after comma into words (First Middle) given_raw = parts[1].strip() # Replace non-alpha with spaces to separate compact initials like 'J.K.' given_clean = re.sub(r'[^a-zA-Z]', ' ', given_raw) given_parts = given_clean.split() # Abbreviate each word initials = [g[0].lower() for g in given_parts if g] initials_str = " ".join(initials) else: initials_str = "" else: # 3. Fallback: Last word is surname (First Middle Last format) # Replace non-alpha with spaces to separate compact initials like 'J.K.' n_clean = re.sub(r'[^a-zA-Z]', ' ', n) parts = n_clean.split() if not parts: return "" if len(parts) == 1: surname = parts[0].lower() initials_str = "" else: surname = parts[-1].lower() # All words before the last one are treated as First/Middle names # We take the first letter of each to form initials initials = [p[0].lower() for p in parts[:-1] if p] initials_str = " ".join(initials) result = f"{surname} {initials_str}".strip() return result def calculate_title_similarity(d_title, api_title): """Calculate the similarity between two titles.""" norm_raw = normalize_title_for_comparison(d_title) norm_api = normalize_title_for_comparison(api_title) if not norm_raw or not norm_api: return 0.0 return Levenshtein.ratio(norm_raw, norm_api) def calculate_citation_recall(candidate_title, raw_citation): """ Calculate recall: roughly, how much of the candidate title is present in the raw citation? We use fuzz matching to find the best substring in raw_citation that matches candidate_title. Recall = (Length of Matched Substring) / (Length of Candidate Title) Note: Ideally this should be close to 1.0 if the title is fully present. """ if not candidate_title or not raw_citation: return 0.0 norm_cand = normalize_title_for_comparison(candidate_title) norm_raw = normalize_title_for_comparison(raw_citation) if not norm_cand or not norm_raw: return 0.0 # Standard fuzzy substring search logic (similar to calculate_title_similarity but focus on length coverage) cand_len = len(norm_cand) max_score = 0.0 # We want to know if norm_cand exists in norm_raw. # We search windows of approx size of cand in raw for i in range(len(norm_raw)): # Check window sizes +/- 10% margin = max(3, int(cand_len * 0.1)) for window_size in range(cand_len - margin, cand_len + margin): if window_size <= 0: continue if i + window_size > len(norm_raw): break substring = norm_raw[i : i + window_size] # Use Levenshtein.ratio -> gives 2*matches / (len1 + len2) # We want to approximate recall: (matches / len_cand) # ratio * (len1 + len2) = 2 * matches # matches = ratio * (len1 + len2) / 2 # Recall = matches / len_cand ratio = Levenshtein.ratio(substring, norm_cand) estimated_matches = ratio * (len(substring) + len(norm_cand)) / 2 recall = estimated_matches / len(norm_cand) if recall > max_score: max_score = recall if max_score > 0.95: return 1.0 # Early exit return min(max_score, 1.0) def calculate_author_similarity(authors1, authors2): """Calculate Jaro-Winkler similarity for author lists (0-1). z Args: authors1: List of author names from original citation (PDF) authors2: List of author dicts from Semantic Scholar [{'name': ...}, ...] (API) Returns: Refined Jaro-Winkler score (0-1) """ norm1 = authors1 norm2 = authors2 if not norm1 or not norm2: return 0.0 # Asymmetric Best-Match: For each PDF author, find the best partner in API list best_match_scores = [] for n1 in norm1: max_score = 0.0 best_partner = None for n2 in norm2: score = jellyfish.jaro_winkler_similarity(n1, n2) if score > max_score: max_score = score best_partner = n2 best_match_scores.append(max_score) sys.stdout.flush() # Average best matches avg_score = sum(best_match_scores) / len(best_match_scores) if best_match_scores else 0.0 # Hallucination Penalty: If PDF lists more authors than API has returned # (Allow a small buffer of 1 for minor parsing differences) if len(norm1) > len(norm2) + 1: penalty = len(norm2) / len(norm1) avg_score *= penalty return avg_score def discover_metadata_in_raw(raw_text, api_title, api_authors, is_exact_match=False): """ Search for the title and author segments in the raw text based on API results. Returns: (title_after_verification, authors_after_verification) strings or empty. """ if not raw_text: return "", "" discovered_title = "" discovered_authors = "" # We create a normalized string AND a mapping from normalized index to original index norm_raw = [] norm_to_orig = [] last_was_space = True # Start true to ignore leading non-alnum for i, char in enumerate(raw_text): if char.isalnum(): norm_raw.append(char.lower()) norm_to_orig.append(i) last_was_space = False else: if not last_was_space: norm_raw.append(' ') norm_to_orig.append(i) last_was_space = True norm_raw_str = "".join(norm_raw) # 1. Discover Title Segment if is_exact_match: discovered_title = api_title elif api_title: # Also clean API title with spaces api_dirty = api_title.lower() norm_api_list = [] last_space = True for c in api_dirty: if c.isalnum(): norm_api_list.append(c) last_space = False else: if not last_space: norm_api_list.append(' ') last_space = True norm_api = "".join(norm_api_list).strip() if norm_api and norm_raw_str: api_len = len(norm_api) best_window = None max_score = 0.0 for i in range(len(norm_raw_str)): if i + api_len > len(norm_raw_str) + 5: break for delta in [0, -1, 1, -2, 2, -3, 3]: window_size = api_len + delta if window_size <= 0: continue if i + window_size > len(norm_raw_str): continue substring = norm_raw_str[i : i + window_size] score = Levenshtein.ratio(substring, norm_api) if score > max_score: max_score = score best_window = (i, i + window_size) # Perfect match optimization if max_score > 0.99: break if max_score > 0.99: break # If we found a good match (> 0.75) if max_score > 0.75 and best_window: start_norm, end_norm = best_window if start_norm < len(norm_to_orig) and end_norm <= len(norm_to_orig): orig_start_idx = norm_to_orig[start_norm] orig_end_idx = norm_to_orig[end_norm - 1] raw_slice = raw_text[orig_start_idx : orig_end_idx + 1] discovered_title = raw_slice.strip() else: discovered_title = api_title else: discovered_title = api_title else: discovered_title = api_title # 2. Discover Author Segment # We take everything from the beginning until the start of the title author_limit_idx = -1 # Strategy A: Use Discovered Title Start if discovered_title and discovered_title in raw_text: author_limit_idx = raw_text.find(discovered_title) # Strategy B: Use Year (Fail-safe) year_match = re.search(r'\b(19|20|21)\d{2}\b', raw_text) if year_match: year_idx = year_match.start() if author_limit_idx == -1 or year_idx < author_limit_idx: author_limit_idx = year_idx if author_limit_idx > 0: segment = raw_text[:author_limit_idx] discovered_authors = segment.strip().rstrip(".,:; ") else: if api_authors: api_names = [] if isinstance(api_authors[0], dict): api_names = [a.get('name', '') for a in api_authors if a.get('name')] else: api_names = [str(a) for a in api_authors] found_indices = [] norm_raw_str_full = raw_text.lower() for name in api_names: parts = name.lower().split() if len(parts) >= 2: p = re.escape(parts[0]) + r'.*?' + re.escape(parts[-1]) m = re.search(p, norm_raw_str_full) if m: found_indices.append(m.end()) if found_indices: last_author_end = max(found_indices) discovered_authors = raw_text[:last_author_end].strip().rstrip(".,;:") return discovered_title, discovered_authors def classify_verification(title_score, author_score, has_error=False, error_msg=""): """Classify verification status based on weighted similarity scores. Weights: 70% Title, 30% Authors Returns: dict with 'status', 'icon', 'title_score', 'author_score', 'confidence', 'error' """ if has_error: return { 'status': 'api_error', 'icon': '✗', 'title_score': 0.0, 'author_score': 0.0, 'confidence': 0.0, 'error': error_msg } # Weighted Hybrid Score confidence = (title_score * 0.70) + (author_score * 0.30) # Threshold classification if confidence >= 0.95: return { 'status': 'verified', 'icon': '✓', 'title_score': title_score, 'author_score': author_score, 'confidence': confidence } elif confidence >= 0.75: return { 'status': 'ambiguous', 'icon': '⚠', 'title_score': title_score, 'author_score': author_score, 'confidence': confidence } else: return { 'status': 'suspected_hallucination', 'icon': '⚠⚠', 'title_score': title_score, 'author_score': author_score, 'confidence': confidence } def verify_citation_against_paper(raw_citation, api_paper, extracted_title, name_order="first_last", separator=","): """ Verify a citation against a paper using discovery with global pattern awareness. """ api_title = api_paper.get('title', '') api_authors_list = api_paper.get('authors', []) # Pre-normalize API authors (Ground Truth) api_authors_norm = [] if api_authors_list: # SS API returns [{'name': ...}, ...] or just list of names if isinstance(api_authors_list[0], dict): api_authors_norm = [normalize_api_author(a.get('name', '')) for a in api_authors_list if a.get('name')] else: api_authors_norm = [normalize_api_author(str(a)) for a in api_authors_list if a] # --- TITLE SELECTION LOGIC --- best_title_candidate = None title_source = "" is_exact_match = False if extracted_title and api_title: norm_extracted = normalize_title_for_comparison(extracted_title) norm_api = normalize_title_for_comparison(api_title) if norm_extracted == norm_api and len(norm_extracted) > 10: is_exact_match = True best_title_candidate = extracted_title title_source = "exact_match" if not is_exact_match: # Compare extracted_title vs api_title based on RECALL of raw_citation recall_extracted = calculate_citation_recall(extracted_title, raw_citation) if extracted_title else 0.0 recall_api = calculate_citation_recall(api_title, raw_citation) # Tie-breaker: If recall is the same, pick the one with fewer words if abs(recall_extracted - recall_api) < 1e-7: # Tie case words_ext = len(extracted_title.split()) if extracted_title else 999 words_api = len(api_title.split()) if api_title else 999 if words_ext < words_api: best_title_candidate = extracted_title title_source = "extracted (tie-breaker shorter)" else: best_title_candidate = api_title title_source = "api (tie-breaker shorter)" elif recall_extracted > (recall_api + 0.1): best_title_candidate = extracted_title title_source = "cleaned/extracted" else: best_title_candidate = api_title title_source = "api" # 1. Discovery Step d_title, d_authors = discover_metadata_in_raw(raw_citation, best_title_candidate, api_authors_list, is_exact_match=is_exact_match) # 2. Scoring Step: Compare the DISCOVERED title against the API title (Ground Truth) if d_title: t_score = calculate_title_similarity(d_title, api_title) else: # Fallback if discovery failed # If discovery failed, score is 0 as we couldn't find the title segment t_score = 0.0 # 3. Author Scoring Step if d_authors: # Detect "et al" in original segments (case-insensitive) has_etal = re.search(r'\bet\s*al\b', d_authors, re.IGNORECASE) # Use the global pattern and separator for surgery parsing parsed_d_authors = parse_names_by_pattern(d_authors, name_order, separator) score_forward = calculate_author_similarity(parsed_d_authors, api_authors_norm) if has_etal: a_score = score_forward else: score_backward = calculate_author_similarity(api_authors_norm, parsed_d_authors) a_score = (0.5 * score_forward) + (0.5 * score_backward) sys.stdout.flush() else: # If discovery failed to find an author segment, score is 0.0 a_score = 0.0 check_data = classify_verification(t_score, a_score) check_data['semantic_data'] = api_paper check_data['title_source'] = title_source # Enhance check_data with discovery info check_data['discovery'] = (d_title, d_authors) return check_data, (d_title, d_authors) def check_citations_semantic_scholar(citations_to_check, api_key=None, name_order="first_last", separator=","): """Check citations using Semantic Scholar API as a generator. Args: citations_to_check: List of citations to verify api_key: Optional Semantic Scholar API key for higher rate limits Yields: Verified citation dictionary for each input citation """ for i, cit in enumerate(citations_to_check): raw_text = cit.get('raw_text', '').strip() title = cit.get('title', '').strip() # Use the original PDF strings for verification raw_citation = cit.get('raw_text', '').strip() cleaned_title = title # OPTIMIZATION: correct skipping of already verified citations # If the citation is already verified/checked (has a determined status), skip it. # relevant statuses: 'verified', 'ambiguous', 'suspected_hallucination', 'api_error' # We might want to retry 'api_error', but definitely skip the others. existing_status = cit.get('verification', {}).get('status') if existing_status in ['verified', 'ambiguous', 'suspected_hallucination']: yield cit continue try: check_data = {'status': 'not_found', 'semantic_data': None} found_stage1 = False response = None def make_request(url, p, h): max_retries = 3 retry_cnt = 0 while retry_cnt <= max_retries: try: resp = requests.get(url, params=p, headers=h, timeout=10) if resp.status_code == 429: if retry_cnt < max_retries: w_time = 2 ** retry_cnt time.sleep(w_time) retry_cnt += 1 else: return resp else: return resp except requests.exceptions.Timeout: retry_cnt += 1 except Exception as e: return None return None headers = {} if api_key: headers['x-api-key'] = api_key if cleaned_title: # --- STAGE 1: Direct Match (/match) by Title --- match_url = "https://api.semanticscholar.org/graph/v1/paper/search/match" params = { 'query': cleaned_title, 'fields': 'title,authors,year,venue' } response = make_request(match_url, params, headers) if response is not None: status_desc = SEMANTIC_SCHOLAR_STATUS_CODES.get(response.status_code, f"Unknown ({response.status_code})") if response.status_code == 200: resp_json = response.json() if resp_json.get('data') and len(resp_json['data']) > 0: paper = resp_json['data'][0] if paper and paper.get('paperId'): found_stage1 = True # --- UNIFIED VERIFICATION LOGIC --- check_data, discovery = verify_citation_against_paper( raw_citation, paper, cleaned_title, # extracted_title name_order=name_order, separator=separator ) d_title, d_authors = discovery # Store discovery results cit['title_after_verification'] = d_title cit['authors_after_verification'] = d_authors elif response.status_code in [400, 401, 403]: found_stage1 = True check_data = classify_verification(0, 0, has_error=True, error_msg=status_desc) else: found_stage1 = True check_data = classify_verification(0, 0, has_error=True, error_msg="No Response") # --- STAGE 2: Fallback Search (/search) if Stage 1 failed --- if not found_stage1: if response and response.status_code == 429: check_data = classify_verification(0, 0, has_error=True, error_msg="Rate Limited (429)") else: search_url = "https://api.semanticscholar.org/graph/v1/paper/search" # We try up to two different search queries to maximize recall queries_to_try = [] if cleaned_title: queries_to_try.append(("Title", cleaned_title)) queries_to_try.append(("Raw Citation", raw_citation)) all_candidates = {} # paperId -> paper_data for q_type, q_string in queries_to_try: search_params = { 'query': q_string, 'limit': 5, 'fields': 'title,authors,year,venue' } s_resp = make_request(search_url, search_params, headers) if s_resp and s_resp.status_code == 200: data = s_resp.json().get('data', []) for paper in data: pid = paper.get('paperId') if pid and pid not in all_candidates: all_candidates[pid] = paper elif s_resp and s_resp.status_code == 429: break # Stop trying queries if rate limited if all_candidates: results_list = list(all_candidates.values()) # --- STAGE 2 OPTIMIZATION: SELECT BEST API GROUND TRUTH BY RECALL --- # 1. Find the API paper whose title has the highest recall against raw citation best_api_paper = None max_api_recall = -1.0 min_word_count = 999 for paper in results_list: title = paper.get('title', '') rec = calculate_citation_recall(title, raw_citation) word_count = len(title.split()) if title else 999 if rec > max_api_recall: max_api_recall = rec min_word_count = word_count best_api_paper = paper elif abs(rec - max_api_recall) < 1e-7: # Tie in recall, check word count if word_count < min_word_count: min_word_count = word_count best_api_paper = paper if best_api_paper: # 2. Verify using this Best API Paper # The helper function will automatically decide whether to use the # Best API Title OR the Extracted Title as the 'Anchor' for discovery. check_data, discovery = verify_citation_against_paper( raw_citation, best_api_paper, cleaned_title, name_order=name_order, separator=separator ) # Finalize discovery data on the citation object cit['title_after_verification'], cit['authors_after_verification'] = discovery if check_data.get('confidence', 0) < 0.4: check_data = classify_verification(0, 0, has_error=True, error_msg="Low confidence match") else: check_data = classify_verification(0, 0, has_error=True, error_msg="No suitable API candidate found") else: check_data = classify_verification(0, 0, has_error=True, error_msg="No search results found by API") sys.stdout.flush() cit['verification'] = check_data yield cit except Exception as e: cit['verification'] = classify_verification(0, 0, has_error=True, error_msg=str(e)) yield cit sys.stdout.flush() # Rate limiting: wait 1 second between requests to avoid 429 errors (only if no API key) if not api_key and i < len(citations_to_check) - 1: time.sleep(1) def parse_tei_citations(tei_xml): """Parse TEI XML and extract citations.""" try: root = ET.fromstring(tei_xml) citations = [] ns = {'tei': 'http://www.tei-c.org/ns/1.0'} for bibl in root.findall('.//tei:listBibl/tei:biblStruct', ns): citation = {} # Extract title title_elem = bibl.find('.//tei:title[@level="a"]', ns) used_monograph_as_title = False if title_elem is None: title_elem = bibl.find('.//tei:title[@level="m"]', ns) if title_elem is not None: used_monograph_as_title = True if title_elem is not None and title_elem.text: citation['title'] = title_elem.text.strip() # Extract authors authors = [] for author in bibl.findall('.//tei:author', ns): persName = author.find('.//tei:persName', ns) if persName is not None: forename = persName.find('.//tei:forename', ns) surname = persName.find('.//tei:surname', ns) name_parts = [] if forename is not None and forename.text: name_parts.append(forename.text.strip()) if surname is not None and surname.text: name_parts.append(surname.text.strip()) if name_parts: authors.append(' '.join(name_parts)) if authors: citation['authors'] = authors # Extract year date_elem = bibl.find('.//tei:date[@type="published"]', ns) if date_elem is not None and date_elem.get('when'): citation['year'] = date_elem.get('when') # Extract venue/journal - check multiple possible locations venue_elem = bibl.find('.//tei:title[@level="j"]', ns) # Journal if venue_elem is None and not used_monograph_as_title: venue_elem = bibl.find('.//tei:title[@level="m"]', ns) # Monograph/Book if venue_elem is None: venue_elem = bibl.find('.//tei:meeting', ns) # Conference if venue_elem is not None and venue_elem.text: citation['venue'] = venue_elem.text.strip() # Also try to get publisher if no venue found if 'venue' not in citation: publisher_elem = bibl.find('.//tei:publisher', ns) if publisher_elem is not None and publisher_elem.text: citation['venue'] = publisher_elem.text.strip() if citation: # Extract raw_reference text - this becomes the display text raw_ref_elem = bibl.find('.//tei:note[@type="raw_reference"]', ns) if raw_ref_elem is not None: raw_ref_text = "".join(raw_ref_elem.itertext()).strip() raw_ref_text = re.sub(r'\s+', ' ', raw_ref_text) citation['raw_text'] = raw_ref_text else: # Fallback to biblStruct text if no raw_reference raw_text = "".join(bibl.itertext()).strip() raw_text = re.sub(r'\s+', ' ', raw_text) citation['raw_text'] = raw_text # Store entire biblStruct XML for parsing citation['grobid_xml'] = ET.tostring(bibl, encoding='unicode') citations.append(citation) return citations except Exception as e: return [] def extract_title_and_authors_from_xml(xml_string): """Extract title and authors from GROBID biblStruct XML. Args: xml_string: XML string of biblStruct element Returns: Dictionary with 'title' and 'authors' fields """ try: root = ET.fromstring(xml_string) ns = {'ns0': 'http://www.tei-c.org/ns/1.0', 'tei': 'http://www.tei-c.org/ns/1.0'} result = {} # Extract title - try multiple paths title_elem = root.find('.//ns0:title[@level="a"][@type="main"]', ns) if title_elem is None: title_elem = root.find('.//ns0:title[@level="a"]', ns) if title_elem is None: title_elem = root.find('.//ns0:title[@level="m"]', ns) if title_elem is None: title_elem = root.find('.//ns0:title', ns) if title_elem is None: title_elem = root.find('.//tei:title[@level="a"][@type="main"]', ns) if title_elem is None: title_elem = root.find('.//tei:title[@level="a"]', ns) if title_elem is None: title_elem = root.find('.//tei:title', ns) if title_elem is not None and title_elem.text: result['title'] = title_elem.text.strip() result['authors'] = [] return result except Exception as e: return {} def clean_metadata(text): """Clean title or author string specifically by removing segments that contain known publication venues or URLs. Splits text by common punctuation (.,:;?!), checks each segment for venue names (case-insensitive), abbreviations (case-sensitive), or URLs, and removes contaminated segments. """ if not text: return "" # Pre-cleaning: Remove parentheses symbols but keep the content text = text.replace('(', '').replace(')', '') # Define additional DOI/Arxiv extraction terms that might not be caught by URLExtract extra_patterns = r'arxiv\.org|doi\.org|\bdoi:|\burl\b' # 1. Protect URLs during splitting using URLExtract # We find all URL matches and replace them with placeholders placeholders = [] temp_text = text # Get all URLs from the text urls = extractor.find_urls(text, True) # Sort by length descending to avoid partial replacement issues for url in sorted(list(set(urls)), key=len, reverse=True): placeholder = f"__URL_PH_{len(placeholders)}__" placeholders.append(url) temp_text = temp_text.replace(url, placeholder) # Also handle the explicitly requested labels like doi: def replace_extra(match): placeholder = f"__URL_PH_{len(placeholders)}__" placeholders.append(match.group(0)) return placeholder temp_text = re.sub(extra_patterns, replace_extra, temp_text, flags=re.IGNORECASE) # 2. Split by punctuation (period, question mark, exclamation mark) # We split on . ? or ! followed by space or end of string parts = re.split(r'([.?!]\s|[.?!]$)', temp_text) # Re-group content and its trailing separator segments = [] current_segment = "" for part in parts: if part and (part.strip() in ['.', '?', '!'] or re.match(r'[.?!]\s', part)): segments.append(current_segment + part) current_segment = "" else: current_segment += part if current_segment: segments.append(current_segment) final_segments = [] for seg in segments: # Check if this segment contains a URL placeholder if "__URL_PH_" in seg: # Entire segment contains a URL, TRUNCATE HERE break # Restore placeholders just for this segment to check for venues check_seg = seg for i, val in enumerate(placeholders): check_seg = check_seg.replace(f"__URL_PH_{i}__", val) seg_lower = check_seg.lower() found_contamination = False # Check for Venues (Case-Insensitive names, Case-Sensitive abbrs) for venue in VENUE_NAMES: if venue.lower() in seg_lower: found_contamination = True break if not found_contamination: for abbr in VENUE_ABBREVIATIONS: if re.search(r'\b' + re.escape(abbr) + r'\b', check_seg): found_contamination = True break if not found_contamination: for term in COMMON_TERMS: if term.lower() in seg_lower: found_contamination = True break if not found_contamination: # Check for Years (19xx-21xx) - Truncate if found # User requested to remove segments with years, but NOT all digits if re.search(r'\b(19|20|21)\d{2}\b', check_seg): found_contamination = True if not found_contamination: # Double check for any missed URLs just in case if extractor.has_urls(check_seg) or re.search(extra_patterns, check_seg, re.IGNORECASE): found_contamination = True if found_contamination: # TRUNCATE HERE break # Reconstruct the segment with URLs restored restored_seg = seg for i, val in enumerate(placeholders): restored_seg = restored_seg.replace(f"__URL_PH_{i}__", val) final_segments.append(restored_seg) # Join remaining segments text = "".join(final_segments).strip() # Final cleanup text = re.sub(r'\s+', ' ', text).strip() text = re.sub(r'\(\s*\)', '', text) text = re.sub(r'\[\s*\]', '', text) text = text.strip(".,;: -()[]") return text def find_reference_pages(pdf_path): """Find reference section pages in the PDF and extract their text.""" doc = fitz.open(pdf_path) start_page = None end_page = len(doc) ref_text = "" # Will store concatenated reference section text # Find the start page for page_num, page in enumerate(doc): text = page.get_text("text") lines = [l.strip().lower() for l in text.splitlines() if l.strip()] found_candidate = False for line in lines: if len(line.split()) <= 5 and ("references" in line or "bibliography" in line): found_candidate = True break if found_candidate: # Verify if this page actually contains citations # This filters out TOCs or other non-reference sections cits = _get_grobid_boundaries(pdf_path, [page_num]) if cits: start_page = page_num break if start_page is not None: # Initial guess is JUST the start page. # The iterative GROBID pass in extract_citations_auto will expand this. end_page = start_page + 1 ref_pages = [start_page] # Extract text for visibility (just the first page for now) ref_text = doc[start_page].get_text("text") + "\n" else: ref_pages = [] doc.close() return ref_pages, start_page, end_page, ref_text def process_pdf_initial(pdf_file, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text): """Initial PDF processing - find references and show PDF immediately.""" # Clean up old temp files whenever a new PDF is uploaded cleanup_old_temp_files(max_age_hours=1) if pdf_file is None: return (None, "No PDF uploaded", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(interactive=False, visible=False), gr.update(interactive=False, visible=False), None, [], [], [], None, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), False, gr.update(visible=False), None, # reset state_ref_pdf_path "", # reset state_pdf_name gr.update(visible=False), # reset export_btn gr.update(visible=False)) # reset download_file new_pdf_path = pdf_file.name new_citations = [] new_removed_citations = [] # Find reference pages new_ref_pages, start_page, end_page, new_ref_text = find_reference_pages(new_pdf_path) new_appendix_header = None # Initialize empty logic for iterative detection # Initial status log status = f"✓ Loaded PDF: {os.path.basename(new_pdf_path)}\n" if new_ref_pages: status += f"\n✓ Identified reference section start: page {start_page + 1}" else: status += "\n⚠ No reference section found" status += "\n⏳ Starting automatic extraction... Please wait." basename = os.path.basename(new_pdf_path) # Return immediately - show PDF right away, extraction starts automatically via event chain return (new_pdf_path, status, gr.update(value=new_pdf_path, visible=True), gr.update(visible=True, value="Show Full PDF"), gr.update(visible=False), # Citations display gr.update(interactive=False, visible=False), # Verify Button gr.update(interactive=False, visible=False), # Slider new_pdf_path, new_ref_pages, new_citations, new_removed_citations, new_appendix_header, new_ref_text, gr.update(visible=False), # citations_header gr.update(visible=False), # verification_header gr.update(visible=False), # verification_divider gr.update(visible=False), # api_key_input False, # state_extraction_done gr.update(visible=False, value=""), # corrected_display cleared completely None, # reset state_ref_pdf_path basename, # state_pdf_name gr.update(visible=False), # export_btn gr.update(visible=False, value=None)) # download_file def _get_grobid_boundaries(pdf_path, page_indices): """Helper to get GROBID citation boundaries for specific pages.""" if not page_indices: return [] output_path = None try: doc = fitz.open(pdf_path) temp_grobid = tempfile.NamedTemporaryFile(delete=False, suffix="_grobid.pdf") output_path = temp_grobid.name temp_grobid.close() ref_doc = fitz.open() for page_idx in page_indices: ref_doc.insert_pdf(doc, from_page=page_idx, to_page=page_idx) ref_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True) ref_doc.close() doc.close() with open(output_path, 'rb') as f: files = {'input': (os.path.basename(output_path), f, 'application/pdf')} data = {'consolidateCitations': '0', 'includeRawCitations': '1'} response = requests.post( 'http://localhost:8070/api/processFulltextDocument', files=files, data=data, timeout=120 ) if response.status_code == 200: return parse_tei_citations(response.text) else: return [] except Exception: return [] finally: if output_path and os.path.exists(output_path): try: os.unlink(output_path) except: pass def extract_citations_auto(view_mode, previous_status, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done): """Extract citations using triple-pass hybrid pipeline to improve recall.""" # Helper for intermediate updates def gen_update(status_txt, done=False, final_cits=[], final_rem=[], final_pages=None, final_text=None, final_header=None): # Use current state or provided finals cits = final_cits if final_cits is not None else state_citations rem = final_rem if final_rem is not None else state_removed_citations pages = final_pages if final_pages is not None else state_ref_pages text = final_text if final_text is not None else state_ref_text header = final_header if final_header is not None else state_appendix_header loading_update = gr.update(visible=False) if done else gr.update() verify_vis = done slider_vis = done headers_vis = done slider_max = len(cits) if cits else 1 slider_val = min(1, slider_max) # Logic to pre-generate Citation HTML when done citations_html_update = gr.update(visible=headers_vis) if done: display_text = format_citations_display(cits) if rem: display_text += "\n\nREMOVED CITATIONS ({})\n\n".format(len(rem)) display_text += format_citations_display(rem, show_reason=True) citations_html_update = gr.update(value=display_text, visible=headers_vis) else: citations_html_update = gr.update(visible=headers_vis) if done else gr.update() return (status_txt, citations_html_update, # citations_display (Populated when done) gr.update(interactive=verify_vis, visible=verify_vis), # verify_btn gr.update(interactive=slider_vis, maximum=slider_max, value=slider_val, visible=slider_vis), # slider cits, rem, pages, text, header, gr.update(), # pdf_viewer (handled by update_view, we just update state) loading_update, # Loading Indicator gr.update(visible=headers_vis), # citations_header gr.update(visible=headers_vis), # verification_header gr.update(visible=headers_vis), # verification_divider gr.update(visible=headers_vis), # api_key_input done, # state_extraction_done gr.update(visible=headers_vis), # corrected_display gr.update(visible=done), # export_btn gr.update(visible=False, value=None)) # download_file if not state_ref_pages or not state_pdf_path: yield gen_update(previous_status + "\n⚠ No reference pages to process", done=True) return try: start_page_idx = state_ref_pages[0] confirmed_ref_pages = [] per_page_citations = [] yield gen_update(previous_status + f"\n⏳ Scanning pages starting from {start_page_idx + 1}...") doc_temp = fitz.open(state_pdf_path) total_pages = len(doc_temp) doc_temp.close() current_page = start_page_idx while current_page < total_pages: yield gen_update(previous_status + f"\n⏳ Scanning Page {current_page + 1}... Citations will be displayed once finished.") page_cits = _get_grobid_boundaries(state_pdf_path, [current_page]) valid_count = 0 for c in page_cits: if c.get('title') or c.get('authors') or c.get('year'): valid_count += 1 if valid_count == 0: break else: confirmed_ref_pages.append(current_page) per_page_citations.append(page_cits) current_page += 1 if not confirmed_ref_pages: yield gen_update(previous_status + "\n⚠ No valid citations extracted from start page.", done=True) return yield gen_update(previous_status + f"\n✓ Range confirmed: {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1}. Merging...", final_pages=confirmed_ref_pages) # Update status log with the confirmed range status_update = f"\n✓ Confirmed Reference Range: Pages {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1} ({len(confirmed_ref_pages)} pages)" previous_status += status_update state_ref_pages = confirmed_ref_pages # Re-extract text for the full confirmed range updated_ref_text = "" doc_temp = fitz.open(state_pdf_path) for p_idx in state_ref_pages: updated_ref_text += doc_temp[p_idx].get_text("text") + "\n" # --- DYNAMIC HEADER DETECTION --- last_page_text = doc_temp[state_ref_pages[-1]].get_text("text") lines = [l.strip() for l in last_page_text.splitlines() if l.strip()] appendix_keywords = ["appendix", "appendices", "supplement", "limitation", "checklist", "statement"] last_page_citations = per_page_citations[-1] citation_start_line_indices = [] for cit in last_page_citations: cit_text = cit.get('raw_text', '').strip() if not cit_text: continue cit_prefix = cit_text[:30].strip().lower() for k, line in enumerate(lines): if cit_prefix in line.lower(): citation_start_line_indices.append(k) break header_candidates = [] for i, line in enumerate(lines): line_lower = line.lower() if len(line.split()) <= 5: is_match = False if any(k in line_lower for k in appendix_keywords): is_match = True elif re.match(r'^A[\.\:]?$', line.split()[0] if line.split() else ""): is_match = True if is_match: candidate = line curr_idx = i + 1 while len(candidate) < 5 and curr_idx < len(lines): candidate += " " + lines[curr_idx] curr_idx += 1 has_citations_after = any(start_idx > i for start_idx in citation_start_line_indices) if not has_citations_after: header_candidates.append(candidate) if header_candidates: found_header = header_candidates[0] state_appendix_header = found_header else: state_appendix_header = None doc_temp.close() state_ref_text = updated_ref_text # 2. Get Consolidated List (LIST C) yield gen_update(previous_status + "\n⏳ Sending full context to GROBID...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header) grobid_citations_a = _get_grobid_boundaries(state_pdf_path, confirmed_ref_pages) # 3. Span Detection & Merging import difflib list_i_pages = per_page_citations list_c = grobid_citations_a def get_text(cit): return cit.get('raw_text', '').strip() refined_list_i = [] actions = {} for p_idx in range(len(list_i_pages)): current_page = list_i_pages[p_idx] if not current_page: continue cit_x = current_page[-1] cit_x_text = get_text(cit_x) cit_y = None cit_y_text = "" cit_z = None cit_z_text = "" if p_idx + 1 < len(list_i_pages) and list_i_pages[p_idx+1]: cit_y = list_i_pages[p_idx+1][0] cit_y_text = get_text(cit_y) if len(list_i_pages[p_idx+1]) > 1: cit_z = list_i_pages[p_idx+1][1] cit_z_text = get_text(cit_z) matches = [] for c_item in list_c: c_text = get_text(c_item) if cit_x_text in c_text: matches.append(c_item) best_action = None for cit_match in matches: match_text = get_text(cit_match) if cit_z and cit_z_text in match_text: continue if cit_y and cit_y_text in match_text: continue if len(match_text) > len(cit_x_text): best_action = {'type': 'extension', 'target': cit_match} break if best_action: actions[id(cit_x)] = best_action flat_list_i = [] skip_ids = set() for p_list in list_i_pages: for cit in p_list: if id(cit) in skip_ids: continue if id(cit) in actions: act = actions[id(cit)] if act['type'] == 'extension': flat_list_i.append(act['target']) else: flat_list_i.append(cit) texts_i = [get_text(c) for c in flat_list_i] texts_c = [get_text(c) for c in list_c] matcher = difflib.SequenceMatcher(None, texts_i, texts_c) final_merged_list = [] for tag, i1, i2, j1, j2 in matcher.get_opcodes(): if tag == 'equal': final_merged_list.extend(flat_list_i[i1:i2]) elif tag == 'delete': final_merged_list.extend(flat_list_i[i1:i2]) elif tag == 'insert': final_merged_list.extend(list_c[j1:j2]) elif tag == 'replace': final_merged_list.extend(flat_list_i[i1:i2]) grobid_citations = final_merged_list merged_citations = [] for cit in grobid_citations: raw_text = cit.get('raw_text', '').strip() has_url = extractor.has_urls(raw_text) or re.search(r'arxiv\.org|doi\.org|\bdoi:|\burl\b', raw_text, re.IGNORECASE) is_url_only = has_url and len(raw_text.split()) <= 6 if merged_citations and is_url_only: prev_cit = merged_citations[-1] prev_cit['raw_text'] = (prev_cit.get('raw_text', '') + " " + raw_text).strip() else: merged_citations.append(cit) grobid_citations = merged_citations yield gen_update(previous_status + f"\n⏳ Parsing metadata for {len(grobid_citations)} citations...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header) # Stage 2: Extract title and authors parsed_citations = [] for idx, cit in enumerate(grobid_citations): # Frequent yields during heavy parsing loop (every 5) if idx % 5 == 0: yield gen_update(previous_status + f"\n⏳ Parsing citation {idx+1}/{len(grobid_citations)}...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header) raw_text = cit.get('raw_text', '') grobid_xml = cit.get('grobid_xml', '') if idx == len(grobid_citations) - 1 and state_appendix_header: clean_header = state_appendix_header.strip()[:10].strip().lower() clean_header = re.sub(r'\s+', ' ', clean_header) raw_lower = re.sub(r'\s+', ' ', raw_text.lower()) cutoff_index = raw_lower.find(clean_header) if cutoff_index > 0: cleaned_raw_reference = raw_text[:cutoff_index].strip() cleaned_raw_reference = re.sub(r'(\.\s*See\s*|\s*See\s*|\.\s*)$', '', cleaned_raw_reference, flags=re.IGNORECASE).strip() raw_text = cleaned_raw_reference try: response = requests.post( 'http://localhost:8070/api/processCitation', data={'citations': cleaned_raw_reference, 'includeRawCitations': '1'}, timeout=30 ) if response.status_code == 200: grobid_xml = response.text raw_text = cleaned_raw_reference except Exception: pass parsed_fields = extract_title_and_authors_from_xml(grobid_xml) title = parsed_fields.get('title', '') authors = parsed_fields.get('authors', []) raw_text = raw_text.replace("- ", "") title = title.replace("- ", "") if title and len(title) > 5: clean_title_prefix = re.sub(r'\W+', '', title.lower()[:40]) if clean_title_prefix: pattern_parts = [re.escape(c) + r'[\W]*' for c in clean_title_prefix] fuzzy_pattern = r''.join(pattern_parts) raw_lower = raw_text.lower() t_match = re.search(fuzzy_pattern, raw_lower) if t_match: match_start = t_match.start() prev_dot = raw_text.rfind('.', 0, match_start) prev_q = raw_text.rfind('?', 0, match_start) prev_ex = raw_text.rfind('!', 0, match_start) prev_comma = raw_text.rfind(',', 0, match_start) boundary_idx = max(prev_dot, prev_q, prev_ex, prev_comma) start_idx = boundary_idx + 1 if boundary_idx != -1 else 0 missed_prefix = raw_text[start_idx:match_start].strip() if missed_prefix: title = f"{missed_prefix} {title}".strip() title = clean_metadata(title) refined_authors = refine_author_string(raw_text, authors, title) refined_authors = clean_metadata(refined_authors) if title and len(title) > 8: if title in refined_authors: refined_authors = refined_authors.split(title)[0].strip() refined_authors = refined_authors.strip(".,;: -()") citation = { 'raw_text': raw_text, 'title': title, 'authors': refined_authors, 'year': cit.get('year', ''), 'venue': cit.get('venue', '') } parsed_citations.append(citation) final_citations = [] final_removed_citations = [] for cit in parsed_citations: title = cit.get('title', '').strip() rejection_reason = None raw_text_clean = cit.get('raw_text', '').strip() alpha_chars = sum(c.isalnum() for c in raw_text_clean) alpha_density = alpha_chars / len(raw_text_clean) if raw_text_clean else 0 if title.lower().startswith("fig.") or title.lower().startswith("figure"): rejection_reason = "Figure caption detected" elif not title and not cit.get('authors') and not cit.get('year'): rejection_reason = "Missing title, authors, and year" elif raw_text_clean.lower() in ["references", "bibliography", "works cited"]: rejection_reason = "Section header detected" elif len(raw_text_clean) > 5 and alpha_density < 0.3: rejection_reason = "Likely noise or artifact (low text density)" if rejection_reason: cit['rejection_reason'] = rejection_reason final_removed_citations.append(cit) continue is_dup = False for existing in final_citations: existing_text = existing.get('raw_text', '').strip() if jellyfish.jaro_winkler_similarity(raw_text_clean, existing_text) >= 0.95: is_dup = True break if not is_dup: final_citations.append(cit) else: cit['rejection_reason'] = "Duplicate (95%+ similarity)" final_removed_citations.append(cit) status = previous_status + f"\n✓ Hybrid extraction: {len(final_citations)} citations (+{len(final_removed_citations)} filtered)" # FINAL YIELD yield gen_update(status, done=True, final_cits=final_citations, final_rem=final_removed_citations, final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header) except Exception as e: # Error Update yield gen_update(previous_status + f"\n❌ Error: {str(e)}", done=True, final_cits=[], final_rem=[]) def run_citation_check(num_to_check, previous_status, api_key, state_citations): """Run citation check with per-user state.""" if not state_citations: # Match the multi-output signature: [status_text, corrected_display, state_citations] yield (previous_status + "\n⚠ No citations to verify.", gr.update(), state_citations) return # 1. Identify Author Pattern from the top 10 citations sample_author_strings = [cit.get('authors', '') for cit in state_citations[:10] if cit.get('authors') and isinstance(cit.get('authors'), str)] name_order, separator = identify_author_pattern(sample_author_strings) # Identifies pattern, then creates work list import copy to_check = copy.deepcopy(state_citations[:num_to_check]) # Use API key if provided api_key_clean = api_key.strip() if api_key else None # Process updated_citations = list(state_citations) total = len(to_check) # Iterate through the generator to process citations for i, verified_cit in enumerate(check_citations_semantic_scholar(to_check, api_key=api_key_clean, name_order=name_order, separator=separator)): # Update the citation in the list if i < len(updated_citations): updated_citations[i] = verified_cit # Yield status update to show progress # We also yield the updated citations display so "Show Citations" reflects progress status_msg = f"{previous_status}\n⏳ Verifying citation {i+1}/{total}... Results will be displayed once finished." updated_cit_html = format_citations_display(updated_citations) yield (status_msg, gr.update(), updated_cit_html, updated_citations) # Final return with final view final_ver_html = format_verifications_display(updated_citations) final_cit_html = format_citations_display(updated_citations) v_count = sum(1 for c in updated_citations[:total] if c.get('verification', {}).get('status') == 'verified') a_count = sum(1 for c in updated_citations[:total] if c.get('verification', {}).get('status') == 'ambiguous') h_count = sum(1 for c in updated_citations[:total] if c.get('verification', {}).get('status') == 'suspected_hallucination') e_count = sum(1 for c in updated_citations[:total] if c.get('verification', {}).get('status') == 'api_error') status_msg = f"Verification Complete: ✅ {v_count} | ⚠️ {a_count} | ❌ {h_count} | 🔌 {e_count}" yield (status_msg, final_ver_html, final_cit_html, updated_citations) def format_citations_display(citations, show_reason=False): """Format citations for display as HTML.""" if not citations: return "" import html as html_lib html_output = "
" for i, cit in enumerate(citations, 1): # Display the raw_text directly raw_text = cit.get('raw_text', 'No citation text') safe_raw = html_lib.escape(raw_text) cit_block = f"
" cit_block += f"
[{i}] {safe_raw}" if show_reason and 'rejection_reason' in cit: reason = html_lib.escape(cit['rejection_reason']) cit_block += f" [REASON: {reason}]" cit_block += "
" # Add Extracted Fields indented for visibility - Styled in Gray title = cit.get('title', '') if title: cit_block += "" # Add "After Verification" fields if present (from discovery mapping) title_after = cit.get('title_after_verification', '') authors_after = cit.get('authors_after_verification', '') if title_after or authors_after: cit_block += "
" if title_after: safe_title_after = html_lib.escape(title_after) cit_block += f"
Title: {safe_title_after}
" if authors_after: if isinstance(authors_after, list): auth_str_after = ", ".join(authors_after) else: auth_str_after = str(authors_after) safe_authors_after = html_lib.escape(auth_str_after) cit_block += f"
Authors: {safe_authors_after}
" cit_block += "
" cit_block += "
" html_output += cit_block html_output += "
" return html_output def refine_author_string(raw_text, grobid_authors, title=None): """ Simplified Author Extraction: Starts at index 0 and extracts up until the segment (separated by period or comma) that contains a 4-digit Year or the Title. """ if not raw_text: return "" raw_lower = raw_text.lower() # 1. Identify "Metadata Start" candidates (Year or Title) possible_starts = [] # Candidate A: Year (19xx, 20xx, 21xx) year_match = re.search(r'\b(19|20|21)\d{2}\b', raw_text) if year_match: possible_starts.append(year_match.start()) # Candidate B: Title (fuzzy-matched prefix) if title and len(title) > 5: # Match the first substantial chunk of the title clean_title_prefix = re.sub(r'\W+', '', title.lower()[:20]) if clean_title_prefix: pattern_parts = [re.escape(c) + r'[\W]*' for c in clean_title_prefix] fuzzy_pattern = r''.join(pattern_parts) t_match = re.search(fuzzy_pattern, raw_lower) if t_match: possible_starts.append(t_match.start()) # 2. Determine the earliest metadata point if not possible_starts: # Fallback: keep the full text and let clean_metadata handle it later return raw_text.strip() metadata_begin = min(possible_starts) # 3. Handle the "Discard entire segment containing metadata" rule # We find the nearest period or comma BEFORE the metadata_begin preceding_text = raw_text[:metadata_begin] last_period = preceding_text.rfind('.') last_comma = preceding_text.rfind(',') boundary_idx = max(last_period, last_comma) if boundary_idx != -1: # Extract everything from the beginning up-to-and-including the separator # This excludes the entire segment that contains the year/title segment = raw_text[0:boundary_idx + 1].strip() else: # If no separator found (e.g. metadata is in the first sentence), # cut precisely at the start of the metadata segment = raw_text[0:metadata_begin].strip() # Clean up trailing punctuation (e.g. "Author, Author.") segment = segment.rstrip(".,:; ") return segment def identify_author_pattern(author_strings): """ Analyzes a list of author strings (top 10) to identify the naming pattern. Returns: (name_order, separator) """ if not author_strings: return "first_last", "," # 1. Determine the Divider (Separator) # Rule: Sum total semicolons across all strings. If >= 5, use semicolon. total_semicolons = sum(s.count(";") for s in author_strings) total_commas = sum(s.count(",") for s in author_strings) main_sep = ";" if total_semicolons > (total_commas // 2) else "," # 2. Analyze Name Order (First Last vs Last, First) order = None if main_sep == ";": # If using semicolon, we check if many segments HAVE a comma inside internal_comma_count = 0 total_parts = 0 for s in author_strings: # Replace "and" with our sep for logic test s_clean = re.sub(r'\s+(?:and|&)\s+', '; ', s, flags=re.IGNORECASE) parts = [p.strip() for p in s_clean.split(';') if p.strip()] for p in parts: total_parts += 1 if "," in p: internal_comma_count += 1 if total_parts > 0 and internal_comma_count >= (total_parts * 0.5): order = "last_first" else: order = "first_last" else: # main_sep is "," # Logic: If chunks are mostly single words (after replacing 'and' with comma), it's Last, First single_word_parts = 0 total_parts = 0 for s in author_strings: # Normalize 'and' to comma for the heuristic s_clean = re.sub(r'\s+(?:and|&)\s+', ', ', s, flags=re.IGNORECASE) parts = [p.strip() for p in s_clean.split(",") if p.strip()] for p in parts: total_parts += 1 if len(p.split(" ")) == 1: single_word_parts += 1 if total_parts > 0 and single_word_parts >= (total_parts * 0.7): order = "last_first" else: order = "first_last" if order is None: order = "first_last" # Final fallback if both heuristics fail return order, main_sep def parse_names_by_pattern(author_string, order, separator): """ Robustly parses author string using a global pattern and divider. """ if not author_string: return [] author_string = re.sub(r'\b(et\s*al\.?|etal)\b', '', author_string, flags=re.IGNORECASE) s = re.sub(r'\b(?:and|&)\b', separator, author_string, flags=re.IGNORECASE) sep_esc = re.escape(separator) # This regex collapses multiple separators and any whitespace/separators between them s = re.sub(sep_esc + r'[\s' + sep_esc + r']*' + sep_esc, separator, s) # Remove leading/trailing dividers s = s.strip().strip(separator).strip() # 3. Split by the divider segments = [p.strip() for p in s.split(separator) if p.strip()] # 4. Regroup based on logic raw_names = [] if order == "last_first" and separator == ",": # Comma divider with Last, First order: join every two segments to get a name i = 0 while i < len(segments): p1 = segments[i] if i + 1 < len(segments): p2 = segments[i+1] raw_names.append(f"{p1}, {p2}") i += 2 else: raw_names.append(p1) i += 1 else: # For first_last OR semicolon separator: each segment is treated as a full name raw_names = segments # 5. Final normalization to standardized format (using PDF-specific logic) authors = [] for name in raw_names: norm = normalize_d_author(name) if norm: authors.append(norm) return authors def format_verifications_display(citations): """Format citations with verification status badges.""" if not citations: return "

No citations extracted yet.

" html_parts = ["
"] for i, cit in enumerate(citations, 1): verification = cit.get('verification', {}) import html as html_lib raw_text = cit.get('raw_text', 'No citation text') safe_raw = html_lib.escape(raw_text) html_parts.append(f"
") html_parts.append(f"
[{i}] {safe_raw}
") # Add verification status badge verification = cit.get('verification', {}) status = verification.get('status', 'not_verified') icon = verification.get('icon', '') if status == 'verified': confidence = verification.get('confidence', 0) title_score = verification.get('title_score', 0) author_score = verification.get('author_score', 0) html_parts.append(f"
") html_parts.append(f"{icon} Verified (Confidence: {confidence:.2%})") html_parts.append(f"
Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}") html_parts.append("
") elif status == 'ambiguous': confidence = verification.get('confidence', 0) title_score = verification.get('title_score', 0) author_score = verification.get('author_score', 0) html_parts.append(f"
") html_parts.append(f"{icon} Ambiguous (Confidence: {confidence:.2%})") html_parts.append(f"
Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}") html_parts.append("
") elif status == 'suspected_hallucination': confidence = verification.get('confidence', 0) title_score = verification.get('title_score', 0) author_score = verification.get('author_score', 0) html_parts.append(f"
") html_parts.append(f"{icon} Suspected Hallucination (Confidence: {confidence:.2%})") html_parts.append(f"
Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}") html_parts.append("
") elif status == 'api_error': error_msg = verification.get('error', 'Unknown error') is_no_result = error_msg == "No search results found by API" label = "Verification Note" if is_no_result else "API Error" html_parts.append(f"
") html_parts.append(f"{icon} {label}
") html_parts.append(f"{error_msg}") html_parts.append("
") elif status == 'not_verified' or not verification: html_parts.append(f"
") html_parts.append(f"Not Verified") html_parts.append("
") html_parts.append("
") html_parts.append("
") return ''.join(html_parts) def export_verifications_csv(state_citations, pdf_name): """Export citation verifications to a CSV file.""" if not state_citations: return None import csv # Use the original PDF name for the CSV filename basename = os.path.splitext(pdf_name)[0] if pdf_name else "verifications" csv_filename = f"{basename}_verifications.csv" # Create a temp directory to hold the specifically named file temp_dir = tempfile.mkdtemp() filepath = os.path.join(temp_dir, csv_filename) try: with open(filepath, 'w', newline='', encoding='utf-8') as csvfile: fieldnames = [ 'ID', 'Status', 'Confidence', 'Title Similarity', 'Author Similarity', 'Raw Citation', 'Title', 'Authors', 'API Title', 'API Authors' ] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for i, cit in enumerate(state_citations, 1): verification = cit.get('verification', {}) status = verification.get('status', 'not_verified') confidence = verification.get('confidence', 0) t_score = verification.get('title_score', 0) a_score = verification.get('author_score', 0) semantic_data = verification.get('semantic_data', {}) api_title = semantic_data.get('title', '') if semantic_data else '' api_authors_list = semantic_data.get('authors', []) if semantic_data else [] if api_authors_list: if isinstance(api_authors_list[0], dict): api_authors = ", ".join([a.get('name', '') for a in api_authors_list if a.get('name')]) else: api_authors = ", ".join([str(a) for a in api_authors_list if a]) else: api_authors = "" raw_text = cit.get('raw_text', '') ver_title = cit.get('title_after_verification', '') ver_authors = cit.get('authors_after_verification', '') if isinstance(ver_authors, list): ver_authors = ", ".join(ver_authors) elif not isinstance(ver_authors, str): ver_authors = str(ver_authors) writer.writerow({ 'ID': i, 'Status': status, 'Confidence': f"{confidence:.2%}" if status != 'not_verified' else 'N/A', 'Title Similarity': f"{t_score:.2%}" if status != 'not_verified' else 'N/A', 'Author Similarity': f"{a_score:.2%}" if status != 'not_verified' else 'N/A', 'Raw Citation': raw_text, 'Title': ver_title, 'Authors': ver_authors, 'API Title': api_title, 'API Authors': api_authors }) return filepath except Exception: return None def update_view(view_mode, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path): """Update the view based on selected mode. Controls GROUP visibility.""" # OUTPUTS: # 1. view_full_pdf (Group) # 2. view_ref_pages (Group) # 3. view_citations (Group) # 4. view_verifications (Group) # 5. pdf_viewer_ref (PDF Component - Update content if Ref Pages) # 6. citations_display (HTML - Update content if Citations) # 7. corrected_display (HTML - Update content if Verifications) # 8. loading_indicator (Markdown) # 9. state_ref_pdf_path (str) -- New Cache! vis_full = gr.update(visible=False) vis_ref = gr.update(visible=False) vis_cit = gr.update(visible=False) vis_ver = gr.update(visible=False) upd_ref_pdf = gr.update() upd_cit_disp = gr.update() upd_ver_disp = gr.update() upd_load = gr.update(visible=False) # Default hidden if not state_extraction_done and view_mode != "Show Full PDF": # Extraction in progress -> Show Loading (unless Full PDF) upd_load = gr.update(visible=True) # And keep all views hidden? return (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path) if view_mode == "Show Full PDF": vis_full = gr.update(visible=True) # pdf_viewer_full should already have content from process_pdf_initial yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path) elif view_mode == "Show Reference Pages": vis_ref = gr.update(visible=True) # Check cache first if state_ref_pdf_path and os.path.exists(state_ref_pdf_path): # Return path upd_ref_pdf = gr.update(value=state_ref_pdf_path) else: # Generate the Subset PDF if needed. if state_ref_pages and state_pdf_path: doc = fitz.open(state_pdf_path) new_doc = fitz.open() new_doc.insert_pdf(doc, from_page=state_ref_pages[0], to_page=state_ref_pages[-1]) temp_preview = tempfile.NamedTemporaryFile(delete=False, suffix="_ref_subset.pdf") output_path = temp_preview.name temp_preview.close() new_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True) new_doc.close() doc.close() state_ref_pdf_path = output_path # Return path upd_ref_pdf = gr.update(value=output_path) yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path) elif view_mode == "Show Citations": vis_cit = gr.update(visible=True) # Content is pre-filled by extract_citations_auto yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path) elif view_mode == "Show Verifications": vis_ver = gr.update(visible=True) # Always render the list. Unverified items will show "Not Verified". formatted_ver = format_verifications_display(state_citations) upd_ver_disp = gr.update(value=formatted_ver) # Content is pre-filled by run_citation_check yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path) # Build the UI with gr.Blocks(title="CiteAudit", css=""" /* Container Styles */ #pdf-viewer-full, #pdf-viewer-ref { height: 700px; width: 100%; } #view-citations, #view-verifications { border: none !important; box-shadow: none !important; background-color: transparent !important; } #citations-list, #view-verifications .gr-html { background-color: transparent !important; } #main-display-area { min-height: 700px; border-radius: 8px; background-color: var(--background-fill-primary); } /* Citation List */ .citations-container { font-family: sans-serif; font-size: 14px; line-height: 1.5; color: var(--body-text-color); max-height: 600px; overflow-y: auto; padding: 12px; border: 1px solid var(--border-color-primary); border-radius: 4px; background-color: var(--background-fill-secondary); } .citation-item { margin-bottom: 16px; padding-bottom: 8px; border-bottom: 1px solid var(--border-color-primary); } .rejection-reason { color: #ef5350; /* Red 400 */ font-weight: bold; margin-left: 8px; } .dark .rejection-reason { color: #ef9a9a; /* Red 200 */ } .citation-metadata { color: var(--body-text-color-subdued); margin-left: 24px; font-size: 0.95em; margin-top: 4px; } /* Verification Styles */ .ver-verified { color: #1b5e20; /* Green 900 */ margin-left: 24px; font-size: 0.95em; margin-top: 6px; padding: 4px; background-color: #e8f5e9; /* Green 50 */ border-left: 3px solid #4caf50; /* Green 500 */ } .dark .ver-verified { color: #a5d6a7; /* Green 200 */ background-color: rgba(27, 94, 32, 0.4); /* Dark Green alpha */ border-left-color: #66bb6a; /* Green 400 */ } /* Status Badges in format_verifications_display */ .ver-badge-container { font-family: monospace; font-size: 14px; background-color: var(--background-fill-secondary); padding: 15px; border-radius: 5px; color: var(--body-text-color); } .ver-item { margin-bottom: 20px; padding: 10px; border: 1px solid var(--border-color-primary); border-radius: 5px; } .ver-status-verified { margin-top: 8px; padding: 6px; background-color: #e8f5e9; border-left: 3px solid #4caf50; color: #1b5e20; /* Darker Text */ } .dark .ver-status-verified { background-color: rgba(27, 94, 32, 0.4); border-left-color: #66bb6a; color: #e8f5e9; /* Light Text */ } .ver-status-verified strong, .ver-verified strong { color: inherit; } .ver-status-ambiguous { margin-top: 8px; padding: 6px; background-color: #fff3e0; border-left: 3px solid #ff9800; color: #e65100; } .dark .ver-status-ambiguous { background-color: rgba(230, 81, 0, 0.3); border-left-color: #ffb74d; color: #ffe0b2; } .ver-status-hallucination { margin-top: 8px; padding: 6px; background-color: #ffebee; border-left: 3px solid #f44336; color: #c62828; } .dark .ver-status-hallucination { background-color: rgba(183, 28, 28, 0.3); border-left-color: #e57373; color: #ffcdd2; } .ver-status-error { margin-top: 8px; padding: 6px; background-color: #fafafa; border-left: 3px solid #9e9e9e; color: #424242; } .dark .ver-status-error { background-color: rgba(66, 66, 66, 0.4); border-left-color: #bdbdbd; color: #e0e0e0; } .ver-status-unverified { margin-top: 8px; padding: 6px; background-color: #f5f5f5; border-left: 3px solid #bdbdbd; color: #757575; } .dark .ver-status-unverified { background-color: rgba(97, 97, 97, 0.3); border-left-color: #9e9e9e; color: #bdbdbd; } """) as demo: # Per-user session state state_pdf_path = gr.State(None) state_ref_pages = gr.State([]) state_citations = gr.State([]) state_removed_citations = gr.State([]) state_appendix_header = gr.State(None) state_ref_text = gr.State("") state_extraction_done = gr.State(False) state_ref_pdf_path = gr.State(None) # Cache for Reference Pages PDF state_pdf_name = gr.State("") # Original PDF filename gr.Markdown("# CiteAudit") with gr.Row(): with gr.Column(scale=1): file_input = gr.File(label="Upload PDF", file_types=[".pdf"]) status_text = gr.Textbox(label="Status", interactive=False, lines=6) view_toggle = gr.Radio( choices=["Show Full PDF", "Show Reference Pages", "Show Citations", "Show Verifications"], value="Show Full PDF", label="View Mode", interactive=True, visible=False ) verification_divider = gr.Markdown("---", visible=False) verification_header = gr.Markdown("### Citation Verification", visible=False) api_key_input = gr.Textbox( label="Semantic Scholar API Key (Optional)", placeholder="Leave empty for free tier (with rate limits)", type="password", interactive=True, visible=False ) verify_btn = gr.Button("✅ Verify Citations", variant="secondary", interactive=False, visible=False) check_count_slider = gr.Slider( minimum=1, maximum=50, value=1, step=1, label="Number of citations to check", interactive=False, visible=False ) export_btn = gr.Button("📊 Download Verifications (CSV)", visible=False) download_file = gr.File(label="Download CSV", visible=False) gr.Markdown("
* Automated verification may have mistakes and are restricted to returns from Semantic Scholar API. Please check all your citations.") with gr.Column(scale=2, elem_id="main-display-area"): # Loading indicator loading_indicator = gr.Markdown("## ⏳ Extracting content...", visible=False) # 1. Full PDF View with gr.Group(visible=True) as view_full_pdf: # Use gradio_pdf for better compatibility pdf_viewer_full = PDF(label="Full PDF", elem_id="pdf-viewer-full", interactive=False) # 2. Reference Pages View with gr.Group(visible=False) as view_ref_pages: # Use gradio_pdf pdf_viewer_ref = PDF(label="Reference Pages", elem_id="pdf-viewer-ref", interactive=False) # 3. Citations View with gr.Group(visible=False, elem_id="view-citations") as view_citations: citations_header = gr.Markdown("### Extracted Citations") citations_display = gr.HTML(elem_id="citations-list") # 4. Verifications View with gr.Group(visible=False, elem_id="view-verifications") as view_verifications: corrected_display = gr.HTML(label="Corrected Citations") file_input.upload( fn=process_pdf_initial, inputs=[file_input, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text], outputs=[file_input, status_text, pdf_viewer_full, view_toggle, citations_display, verify_btn, check_count_slider, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text, citations_header, verification_header, verification_divider, api_key_input, state_extraction_done, corrected_display, state_ref_pdf_path, state_pdf_name, export_btn, download_file] ).then( fn=extract_citations_auto, inputs=[view_toggle, status_text, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done], outputs=[status_text, citations_display, verify_btn, check_count_slider, state_citations, state_removed_citations, state_ref_pages, state_ref_text, state_appendix_header, pdf_viewer_ref, loading_indicator, citations_header, verification_header, verification_divider, api_key_input, state_extraction_done, corrected_display, export_btn, download_file], show_progress="hidden" ).then( fn=update_view, inputs=[view_toggle, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path], outputs=[view_full_pdf, view_ref_pages, view_citations, view_verifications, pdf_viewer_ref, citations_display, corrected_display, loading_indicator, state_ref_pdf_path] ) verify_btn.click( fn=lambda status: ( gr.update(value="Show Verifications"), status + "\n⏳ Starting verification process... Please wait.", gr.update(), # Do not wipe previous content with a loading message gr.update(visible=False, value=None), # Reset download button gr.update(visible=False) # Hide export trigger button while processing ), inputs=[status_text], outputs=[view_toggle, status_text, corrected_display, download_file, export_btn] ).then( fn=run_citation_check, inputs=[check_count_slider, status_text, api_key_input, state_citations], outputs=[status_text, corrected_display, citations_display, state_citations], show_progress="hidden" ).then( fn=lambda: gr.update(visible=True), inputs=None, outputs=[export_btn] ) export_btn.click( fn=export_verifications_csv, inputs=[state_citations, state_pdf_name], outputs=[download_file] ).then( fn=lambda: gr.update(visible=True), inputs=None, outputs=[download_file] ) view_toggle.change( fn=update_view, inputs=[view_toggle, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path], outputs=[view_full_pdf, view_ref_pages, view_citations, view_verifications, pdf_viewer_ref, citations_display, corrected_display, loading_indicator, state_ref_pdf_path], concurrency_limit=None, show_progress="hidden" ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860, show_api=False)