# CiteAudit / app.py
# Wenyu Zhang
# add application file
# a4d5a4d
import gradio as gr
from gradio_pdf import PDF
import fitz
import os
import tempfile
import json
import requests
import xml.etree.ElementTree as ET
import re
import time
import sys
from collections import OrderedDict
import Levenshtein
import jellyfish
from unidecode import unidecode
from venues import VENUE_NAMES, VENUE_ABBREVIATIONS, COMMON_TERMS
from urlextract import URLExtract
# Human-readable descriptions for the HTTP status codes this app handles when
# calling the Semantic Scholar API. check_citations_semantic_scholar looks the
# response status up here to build the error message stored on a citation.
SEMANTIC_SCHOLAR_STATUS_CODES = {
    200: "OK: Request successful",
    400: "Bad Request: Check parameters",
    401: "Unauthorized: Invalid API key",
    403: "Forbidden: No permission",
    404: "Not Found: Endpoint or resource missing",
    429: "Too Many Requests: Rate limited",
    500: "Internal Server Error: Server-side issue"
}
# Shared URLExtract instance, created once at import time; clean_metadata uses
# it to locate URLs inside citation text.
extractor = URLExtract()
def cleanup_old_temp_files(max_age_hours=1):
    """Delete stale temporary files created by this app from the temp directory.

    Only regular files whose names end with one of the app's own suffixes and
    whose modification time is older than ``max_age_hours`` are removed, so
    files belonging to other programs (or to a session still in progress) are
    left untouched.

    Args:
        max_age_hours: Minimum age, in hours, before a file is deleted.

    Returns:
        None. Failures are never raised: a problem with a single file is
        ignored, and a problem listing the directory is printed.
    """
    tmp_root = tempfile.gettempdir()
    if not os.path.exists(tmp_root):
        return

    oldest_allowed_mtime = time.time() - (max_age_hours * 3600)
    # Suffixes given to the NamedTemporaryFile objects this app creates.
    app_suffixes = ("_grobid.pdf", "_ref_subset.pdf", "_verifications.csv")

    try:
        for entry in os.listdir(tmp_root):
            if not entry.endswith(app_suffixes):
                continue
            candidate = os.path.join(tmp_root, entry)
            try:
                is_stale = os.path.getmtime(candidate) < oldest_allowed_mtime
                # Never unlink a directory that happens to match the suffix.
                if is_stale and os.path.isfile(candidate):
                    os.unlink(candidate)
            except Exception:
                # Best effort: the file may have vanished or be owned by
                # another user; skip it and carry on with the rest.
                pass
    except Exception as e:
        print(f"Error during temp file cleanup: {e}")
def normalize_title_for_comparison(title):
    """Return a canonical form of ``title`` for similarity comparison.

    The title is lowercased, every character that is not an ASCII letter,
    digit or whitespace is replaced by a space, and runs of whitespace are
    collapsed to a single space. A falsy title yields an empty string.
    """
    if not title:
        return ""
    lowered = title.lower()
    alnum_and_space_only = re.sub(r'[^a-zA-Z0-9\s]', ' ', lowered)
    # split()/join collapses repeated whitespace and trims both ends.
    return ' '.join(alnum_and_space_only.split())
def normalize_api_author(name):
    """Normalize an API-sourced author name to ``"surname i j"`` form.

    The name is transliterated to ASCII, any "et al"/"etal" marker is removed,
    and the surname is taken from before the comma ("Last, First") or from the
    last word ("First Middle Last"). Given names are reduced to lowercase
    initials separated by spaces.

    Args:
        name: Author name string as returned by the API.

    Returns:
        Lowercase surname followed by space-separated initials, or an empty
        string when ``name`` is falsy or contains no words.
    """
    if not name:
        return ""

    ascii_name = unidecode(name)
    ascii_name = re.sub(r'\b(et\s*al\.?|etal)\b', '', ascii_name, flags=re.IGNORECASE).strip()

    if "," in ascii_name:
        # "Last, First": split(",", 1) always yields exactly two pieces here.
        family_raw, given_raw = (piece.strip() for piece in ascii_name.split(",", 1))
    else:
        tokens = ascii_name.split()
        if not tokens:
            return ""
        # "First Middle Last": the final word is the surname, the rest is given.
        family_raw = tokens[-1]
        given_raw = " ".join(tokens[:-1])

    family = re.sub(r'[^a-zA-Z]', '', family_raw).lower()
    # Non-letters become spaces so compact initials such as 'J.K.' separate.
    given_words = re.sub(r'[^a-zA-Z]', ' ', given_raw).lower().split()
    initials = " ".join(word[0] for word in given_words)
    return f"{family} {initials}".strip()
def normalize_d_author(name):
    """Normalize a PDF-sourced author name to ``"surname i j"`` form.

    After ASCII transliteration, a name containing a comma is read as
    "Last, First ..." (the surname keeps any internal spaces); otherwise the
    last word is the surname and all preceding words are given names. Given
    names are reduced to lowercase initials separated by spaces.

    Args:
        name: Author name string extracted from the citation text.

    Returns:
        Lowercase surname followed by space-separated initials, or an empty
        string when ``name`` is falsy or contains no letters.
    """
    if not name:
        return ""

    ascii_name = unidecode(name).strip()

    if "," in ascii_name:
        before_comma, after_comma = ascii_name.split(",", 1)
        family = re.sub(r'[^a-zA-Z\s]', '', before_comma).strip().lower()
        # Non-letters become spaces so compact initials such as 'J.K.' separate.
        given_words = re.sub(r'[^a-zA-Z]', ' ', after_comma.strip()).split()
    else:
        words = re.sub(r'[^a-zA-Z]', ' ', ascii_name).split()
        if not words:
            return ""
        family = words[-1].lower()
        given_words = words[:-1]

    initials = " ".join(word[0].lower() for word in given_words)
    return f"{family} {initials}".strip()
def calculate_title_similarity(d_title, api_title):
    """Return the Levenshtein ratio (0-1) between two normalized titles.

    Both titles go through normalize_title_for_comparison first; if either
    normalizes to an empty string the similarity is 0.0.
    """
    left = normalize_title_for_comparison(d_title)
    right = normalize_title_for_comparison(api_title)
    if left and right:
        return Levenshtein.ratio(left, right)
    return 0.0
def calculate_citation_recall(candidate_title, raw_citation):
    """Estimate how much of ``candidate_title`` appears inside ``raw_citation``.

    Both strings are normalized, then every window of the raw citation whose
    length is within a margin of the title length (10% of the title, at least
    3 characters; the upper bound is exclusive) is compared with the title.
    For each window the number of matching characters is estimated from the
    Levenshtein ratio and divided by the title length.

    Returns:
        The best recall found, capped at 1.0. Returns 1.0 as soon as a window
        exceeds 0.95, and 0.0 when either input is empty after normalization.
    """
    if not candidate_title or not raw_citation:
        return 0.0

    needle = normalize_title_for_comparison(candidate_title)
    haystack = normalize_title_for_comparison(raw_citation)
    if not needle or not haystack:
        return 0.0

    needle_len = len(needle)
    haystack_len = len(haystack)
    slack = max(3, int(needle_len * 0.1))
    # Window widths must be positive, so clamp the lower bound at 1.
    narrowest = max(1, needle_len - slack)

    best = 0.0
    for start in range(haystack_len):
        for width in range(narrowest, needle_len + slack):
            stop = start + width
            if stop > haystack_len:
                # Widths only grow from here, so no later one can fit either.
                break
            window = haystack[start:stop]
            # ratio = 2 * matches / (len(window) + len(needle)); solve for
            # matches, then express it as a fraction of the title length.
            ratio = Levenshtein.ratio(window, needle)
            estimated_matches = ratio * (len(window) + needle_len) / 2
            recall = estimated_matches / needle_len
            if recall > best:
                best = recall
                if best > 0.95:
                    return 1.0  # Good enough: treat as fully present.
    return min(best, 1.0)
def calculate_author_similarity(authors1, authors2):
    """Score how well the names in ``authors1`` are covered by ``authors2``.

    For every name in ``authors1`` the best Jaro-Winkler similarity against
    any name in ``authors2`` is taken, and those best scores are averaged.
    The measure is asymmetric: extra names in ``authors2`` are not penalized.

    Args:
        authors1: List of author name strings to be matched. Each element is
            passed directly to ``jellyfish.jaro_winkler_similarity``, so the
            names must already be plain (normalized) strings.
        authors2: List of author name strings to match against, in the same
            form as ``authors1`` (strings, not API author dicts).

    Returns:
        Average best-match score in the range 0-1. Returns 0.0 when either
        list is empty. If ``authors1`` has more than one author beyond the
        size of ``authors2``, the score is scaled by
        ``len(authors2) / len(authors1)``.
    """
    if not authors1 or not authors2:
        return 0.0

    # Best partner score for each name in authors1 (scores are never negative,
    # so this equals the running maximum starting from 0.0).
    best_match_scores = [
        max(
            (jellyfish.jaro_winkler_similarity(candidate, reference) for reference in authors2),
            default=0.0,
        )
        for candidate in authors1
    ]
    sys.stdout.flush()

    avg_score = sum(best_match_scores) / len(best_match_scores) if best_match_scores else 0.0

    # Penalize lists that name noticeably more authors than the reference
    # has; a surplus of one is tolerated as a parsing difference.
    if len(authors1) > len(authors2) + 1:
        avg_score *= len(authors2) / len(authors1)
    return avg_score
def _normalize_with_index_map(text):
    """Lowercase ``text`` and collapse non-alphanumeric runs to single spaces.

    Returns:
        ``(normalized, index_map)`` where ``index_map[k]`` is the index in
        ``text`` of the character that produced ``normalized[k]``. The two
        always have the same length, even when lowercasing one character
        yields several (e.g. "İ"). Leading non-alphanumerics are dropped.
    """
    chars = []
    index_map = []
    previous_was_space = True  # Start true so leading separators are ignored.
    for position, original_char in enumerate(text):
        for lowered in original_char.lower():
            if lowered.isalnum():
                chars.append(lowered)
                index_map.append(position)
                previous_was_space = False
            elif not previous_was_space:
                chars.append(' ')
                index_map.append(position)
                previous_was_space = True
    return "".join(chars), index_map


def _best_fuzzy_window(haystack, needle):
    """Find the window of ``haystack`` most similar to ``needle``.

    Windows of length ``len(needle)`` +/- 3 are scored with Levenshtein.ratio.

    Returns:
        ``((start, end), score)`` for the best window, or ``(None, 0.0)`` if
        no window scored above zero. Stops early once a score exceeds 0.99.
    """
    needle_len = len(needle)
    haystack_len = len(haystack)
    best_window = None
    best_score = 0.0
    for start in range(haystack_len):
        if start + needle_len > haystack_len + 5:
            break
        for delta in (0, -1, 1, -2, 2, -3, 3):
            width = needle_len + delta
            if width <= 0 or start + width > haystack_len:
                continue
            score = Levenshtein.ratio(haystack[start:start + width], needle)
            if score > best_score:
                best_score = score
                best_window = (start, start + width)
                if best_score > 0.99:
                    # Effectively a perfect match; no need to keep scanning.
                    return best_window, best_score
    return best_window, best_score


def discover_metadata_in_raw(raw_text, api_title, api_authors, is_exact_match=False):
    """Locate the title and author segments of a raw citation string.

    Args:
        raw_text: The raw citation text.
        api_title: Title to look for (used as the anchor for discovery).
        api_authors: List of author dicts with a 'name' key, or a list of
            name-like values; only used when no title/year boundary is found.
        is_exact_match: When true, ``api_title`` is accepted as the title
            without fuzzy searching.

    Returns:
        ``(title, authors)``. The title is the slice of ``raw_text`` that best
        matches ``api_title`` (similarity > 0.75), otherwise ``api_title``
        itself (or "" when no title was given). The authors string is the
        text before the title or the first year (whichever comes first), or,
        failing that, the text up to the last API author name found in
        ``raw_text``; it is "" when nothing is found. Returns ``("", "")``
        for empty ``raw_text``.
    """
    if not raw_text:
        return "", ""

    discovered_title = ""
    discovered_authors = ""

    # 1. Discover the title segment.
    if is_exact_match:
        discovered_title = api_title
    elif api_title:
        discovered_title = api_title  # Fallback whenever the search fails.
        # Raw text and API title go through the same normalization so the
        # fuzzy comparison is like-for-like.
        norm_raw, raw_index_map = _normalize_with_index_map(raw_text)
        norm_api = _normalize_with_index_map(api_title)[0].strip()
        if norm_api and norm_raw:
            window, score = _best_fuzzy_window(norm_raw, norm_api)
            if score > 0.75 and window:
                start_norm, end_norm = window
                if start_norm < len(raw_index_map) and end_norm <= len(raw_index_map):
                    # Map the normalized window back onto the original text.
                    orig_start = raw_index_map[start_norm]
                    orig_end = raw_index_map[end_norm - 1]
                    discovered_title = raw_text[orig_start:orig_end + 1].strip()

    # 2. Discover the author segment: everything before the title starts.
    author_limit_idx = -1
    # Strategy A: use the start of the discovered title.
    if discovered_title and discovered_title in raw_text:
        author_limit_idx = raw_text.find(discovered_title)
    # Strategy B (fail-safe): a year that appears before the title.
    year_match = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_match:
        year_idx = year_match.start()
        if author_limit_idx == -1 or year_idx < author_limit_idx:
            author_limit_idx = year_idx

    if author_limit_idx > 0:
        discovered_authors = raw_text[:author_limit_idx].strip().rstrip(".,:; ")
    elif api_authors:
        # Strategy C: cut after the last API author name present in the text.
        if isinstance(api_authors[0], dict):
            api_names = [a.get('name', '') for a in api_authors if a.get('name')]
        else:
            api_names = [str(a) for a in api_authors]

        found_ends = []
        for name in api_names:
            parts = name.split()
            if len(parts) >= 2:
                pattern = re.escape(parts[0]) + r'.*?' + re.escape(parts[-1])
                # Match case-insensitively against raw_text itself, so the
                # match offsets are valid indices into raw_text (lowercasing
                # the text first can change its length).
                match = re.search(pattern, raw_text, re.IGNORECASE)
                if match:
                    found_ends.append(match.end())
        if found_ends:
            discovered_authors = raw_text[:max(found_ends)].strip().rstrip(".,;:")

    return discovered_title, discovered_authors
def classify_verification(title_score, author_score, has_error=False, error_msg=""):
    """Turn similarity scores into a verification verdict.

    Confidence is a weighted blend: 70% title score, 30% author score.
    Confidence >= 0.95 is 'verified', >= 0.75 is 'ambiguous', anything lower
    is 'suspected_hallucination'.

    Args:
        title_score: Title similarity (0-1).
        author_score: Author similarity (0-1).
        has_error: When true, the scores are ignored and an 'api_error'
            verdict is returned.
        error_msg: Message stored under 'error' for an 'api_error' verdict.

    Returns:
        Dict with 'status', 'icon', 'title_score', 'author_score' and
        'confidence'. The 'error' key is present only for 'api_error'.
    """
    if has_error:
        return {
            'status': 'api_error',
            'icon': '✗',
            'title_score': 0.0,
            'author_score': 0.0,
            'confidence': 0.0,
            'error': error_msg
        }

    confidence = (title_score * 0.70) + (author_score * 0.30)

    if confidence >= 0.95:
        status, icon = 'verified', '✓'
    elif confidence >= 0.75:
        status, icon = 'ambiguous', '⚠'
    else:
        status, icon = 'suspected_hallucination', '⚠⚠'

    return {
        'status': status,
        'icon': icon,
        'title_score': title_score,
        'author_score': author_score,
        'confidence': confidence
    }
def verify_citation_against_paper(raw_citation, api_paper, extracted_title, name_order="first_last", separator=","):
    """Verify a raw citation against one API paper record.

    Steps: pick the title to anchor on (extracted title vs. API title),
    discover the title/author segments in the raw citation, then score the
    discovered title against the API title and the discovered authors against
    the API authors.

    Args:
        raw_citation: Raw citation text from the PDF.
        api_paper: Paper dict from the API; 'title' and 'authors' are read.
        extracted_title: Title previously extracted for this citation.
        name_order: Passed through to parse_names_by_pattern.
        separator: Passed through to parse_names_by_pattern.

    Returns:
        ``(check_data, (d_title, d_authors))`` where ``check_data`` is the
        classify_verification dict extended with 'semantic_data',
        'title_source' and 'discovery'.
    """
    api_title = api_paper.get('title', '')
    api_authors_list = api_paper.get('authors', [])

    # Normalize the API authors (treated as ground truth). Entries may be
    # dicts with a 'name' key or plain name values.
    api_authors_norm = []
    if api_authors_list:
        if isinstance(api_authors_list[0], dict):
            api_authors_norm = [
                normalize_api_author(entry.get('name', ''))
                for entry in api_authors_list
                if entry.get('name')
            ]
        else:
            api_authors_norm = [
                normalize_api_author(str(entry)) for entry in api_authors_list if entry
            ]

    # --- Title selection ---
    is_exact_match = False
    if extracted_title and api_title:
        normalized_extracted = normalize_title_for_comparison(extracted_title)
        # Very short titles are not trusted as exact matches.
        is_exact_match = (
            len(normalized_extracted) > 10
            and normalized_extracted == normalize_title_for_comparison(api_title)
        )

    if is_exact_match:
        best_title_candidate = extracted_title
        title_source = "exact_match"
    else:
        # Prefer whichever title is better covered by the raw citation.
        recall_extracted = calculate_citation_recall(extracted_title, raw_citation) if extracted_title else 0.0
        recall_api = calculate_citation_recall(api_title, raw_citation)

        if abs(recall_extracted - recall_api) < 1e-7:
            # Tie: take the extracted title only if it has fewer words.
            words_ext = len(extracted_title.split()) if extracted_title else 999
            words_api = len(api_title.split()) if api_title else 999
            if words_ext < words_api:
                best_title_candidate = extracted_title
                title_source = "extracted (tie-breaker shorter)"
            else:
                best_title_candidate = api_title
                title_source = "api (tie-breaker shorter)"
        elif recall_extracted > (recall_api + 0.1):
            best_title_candidate = extracted_title
            title_source = "cleaned/extracted"
        else:
            best_title_candidate = api_title
            title_source = "api"

    # 1. Discovery
    d_title, d_authors = discover_metadata_in_raw(
        raw_citation, best_title_candidate, api_authors_list, is_exact_match=is_exact_match
    )

    # 2. Title score: discovered title vs. API title. No discovered title
    #    means the segment could not be located, hence 0.
    t_score = calculate_title_similarity(d_title, api_title) if d_title else 0.0

    # 3. Author score
    if d_authors:
        has_etal = re.search(r'\bet\s*al\b', d_authors, re.IGNORECASE)
        parsed_d_authors = parse_names_by_pattern(d_authors, name_order, separator)
        score_forward = calculate_author_similarity(parsed_d_authors, api_authors_norm)
        if has_etal:
            # The citation's list is truncated, so only check that the
            # authors it does name are present in the API list.
            a_score = score_forward
        else:
            score_backward = calculate_author_similarity(api_authors_norm, parsed_d_authors)
            a_score = (0.5 * score_forward) + (0.5 * score_backward)
        sys.stdout.flush()
    else:
        a_score = 0.0

    check_data = classify_verification(t_score, a_score)
    check_data['semantic_data'] = api_paper
    check_data['title_source'] = title_source
    check_data['discovery'] = (d_title, d_authors)
    return check_data, (d_title, d_authors)
def _semantic_scholar_get(url, params, headers, max_retries=3):
    """GET ``url`` with retries for rate limiting and timeouts.

    On HTTP 429 the request is retried with exponential backoff (1s, 2s, 4s);
    on a timeout it is retried immediately.

    Returns:
        The ``requests.Response`` (which may still be a 429 once retries are
        exhausted), or None after repeated timeouts or any other request
        exception.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=10)
        except requests.exceptions.Timeout:
            attempt += 1
            continue
        except Exception:
            return None
        if resp.status_code != 429 or attempt >= max_retries:
            return resp
        time.sleep(2 ** attempt)
        attempt += 1
    return None


def _select_best_candidate(papers, raw_citation):
    """Return the paper whose title is best covered by ``raw_citation``.

    Recall ties (within 1e-7) are broken in favour of the title with fewer
    words. Returns None for an empty list.
    """
    best_paper = None
    best_recall = -1.0
    best_word_count = 999
    for paper in papers:
        title = paper.get('title', '')
        recall = calculate_citation_recall(title, raw_citation)
        word_count = len(title.split()) if title else 999
        if recall > best_recall:
            best_recall, best_word_count, best_paper = recall, word_count, paper
        elif abs(recall - best_recall) < 1e-7 and word_count < best_word_count:
            best_word_count, best_paper = word_count, paper
    return best_paper


def check_citations_semantic_scholar(citations_to_check, api_key=None, name_order="first_last", separator=","):
    """Verify citations against the Semantic Scholar API, as a generator.

    Each citation is first looked up by title with the /match endpoint
    (stage 1). If that gives no usable paper, the /search endpoint is queried
    with the title and with the raw citation, and the best candidate by
    title recall is verified (stage 2). The result is stored on the citation
    under 'verification'; discovered segments are stored under
    'title_after_verification' and 'authors_after_verification'.

    Citations whose existing verification status is 'verified', 'ambiguous'
    or 'suspected_hallucination' are yielded unchanged without any request
    ('api_error' results are retried).

    Args:
        citations_to_check: List of citation dicts ('raw_text', 'title' and
            optionally 'verification' are read).
        api_key: Optional Semantic Scholar API key. Without one, the
            generator sleeps 1 second between citations.
        name_order: Forwarded to verify_citation_against_paper.
        separator: Forwarded to verify_citation_against_paper.

    Yields:
        Each input citation dict, in order, after it has been processed.
    """
    match_url = "https://api.semanticscholar.org/graph/v1/paper/search/match"
    search_url = "https://api.semanticscholar.org/graph/v1/paper/search"
    paper_fields = 'title,authors,year,venue'
    headers = {'x-api-key': api_key} if api_key else {}

    for i, cit in enumerate(citations_to_check):
        raw_citation = cit.get('raw_text', '').strip()
        cleaned_title = cit.get('title', '').strip()

        existing_status = cit.get('verification', {}).get('status')
        if existing_status in ('verified', 'ambiguous', 'suspected_hallucination'):
            yield cit
            continue

        try:
            check_data = {'status': 'not_found', 'semantic_data': None}
            found_stage1 = False
            response = None

            if cleaned_title:
                # --- STAGE 1: direct title match (/match) ---
                response = _semantic_scholar_get(
                    match_url, {'query': cleaned_title, 'fields': paper_fields}, headers
                )
                if response is None:
                    found_stage1 = True
                    check_data = classify_verification(0, 0, has_error=True, error_msg="No Response")
                elif response.status_code == 200:
                    matches = response.json().get('data')
                    paper = matches[0] if matches else None
                    if paper and paper.get('paperId'):
                        found_stage1 = True
                        check_data, discovery = verify_citation_against_paper(
                            raw_citation,
                            paper,
                            cleaned_title,
                            name_order=name_order,
                            separator=separator
                        )
                        d_title, d_authors = discovery
                        cit['title_after_verification'] = d_title
                        cit['authors_after_verification'] = d_authors
                elif response.status_code in [400, 401, 403]:
                    # Client-side problems will not be cured by searching.
                    found_stage1 = True
                    status_desc = SEMANTIC_SCHOLAR_STATUS_CODES.get(
                        response.status_code, f"Unknown ({response.status_code})"
                    )
                    check_data = classify_verification(0, 0, has_error=True, error_msg=status_desc)

            # --- STAGE 2: fallback search (/search) ---
            if not found_stage1:
                # NOTE: compare with `is not None`; a requests.Response is
                # falsy for any 4xx/5xx status, so `if response and ...`
                # would never detect a 429.
                if response is not None and response.status_code == 429:
                    check_data = classify_verification(0, 0, has_error=True, error_msg="Rate Limited (429)")
                else:
                    # Try up to two queries to maximize recall.
                    queries_to_try = []
                    if cleaned_title:
                        queries_to_try.append(("Title", cleaned_title))
                    queries_to_try.append(("Raw Citation", raw_citation))

                    all_candidates = {}  # paperId -> paper data
                    for _query_type, query_string in queries_to_try:
                        s_resp = _semantic_scholar_get(
                            search_url,
                            {'query': query_string, 'limit': 5, 'fields': paper_fields},
                            headers,
                        )
                        if s_resp is None:
                            continue
                        if s_resp.status_code == 200:
                            for paper in s_resp.json().get('data', []) or []:
                                pid = paper.get('paperId')
                                if pid and pid not in all_candidates:
                                    all_candidates[pid] = paper
                        elif s_resp.status_code == 429:
                            break  # Stop querying while rate limited.

                    if all_candidates:
                        best_api_paper = _select_best_candidate(
                            list(all_candidates.values()), raw_citation
                        )
                        if best_api_paper:
                            check_data, discovery = verify_citation_against_paper(
                                raw_citation,
                                best_api_paper,
                                cleaned_title,
                                name_order=name_order,
                                separator=separator
                            )
                            cit['title_after_verification'], cit['authors_after_verification'] = discovery
                            if check_data.get('confidence', 0) < 0.4:
                                check_data = classify_verification(
                                    0, 0, has_error=True, error_msg="Low confidence match"
                                )
                        else:
                            check_data = classify_verification(
                                0, 0, has_error=True, error_msg="No suitable API candidate found"
                            )
                    else:
                        check_data = classify_verification(
                            0, 0, has_error=True, error_msg="No search results found by API"
                        )
        except Exception as e:
            # Any failure for one citation is recorded on it rather than
            # aborting the whole run.
            check_data = classify_verification(0, 0, has_error=True, error_msg=str(e))

        sys.stdout.flush()
        cit['verification'] = check_data
        yield cit
        sys.stdout.flush()

        # Without an API key, pause between citations to avoid 429 errors.
        if not api_key and i < len(citations_to_check) - 1:
            time.sleep(1)
def parse_tei_citations(tei_xml):
    """Parse GROBID TEI XML into a list of citation dicts.

    For every ``biblStruct`` under a ``listBibl`` the following keys are set
    when available: 'title' (article title, else monograph title), 'authors'
    (list of "forename surname" strings), 'year' (the 'when' attribute of the
    published date), and 'venue' (journal, else monograph if not already used
    as the title, else meeting, else publisher). Entries with at least one of
    those fields also get 'raw_text' (the raw reference note, or all text of
    the entry, whitespace-collapsed) and 'grobid_xml' (the serialized entry).
    Entries with none of them are skipped.

    Args:
        tei_xml: TEI XML document as a string.

    Returns:
        List of citation dicts; an empty list if parsing fails for any reason.
    """
    ns = {'tei': 'http://www.tei-c.org/ns/1.0'}

    def stripped_text(element):
        """Return the element's stripped text, or None if missing/empty."""
        if element is not None and element.text:
            return element.text.strip()
        return None

    def collapse_whitespace(raw):
        return re.sub(r'\s+', ' ', raw.strip())

    try:
        root = ET.fromstring(tei_xml)
        citations = []
        for bibl in root.findall('.//tei:listBibl/tei:biblStruct', ns):
            citation = {}

            # Title: article-level first, monograph-level as a fallback.
            title_elem = bibl.find('.//tei:title[@level="a"]', ns)
            used_monograph_as_title = False
            if title_elem is None:
                title_elem = bibl.find('.//tei:title[@level="m"]', ns)
                used_monograph_as_title = title_elem is not None
            title = stripped_text(title_elem)
            if title is not None:
                citation['title'] = title

            # Authors: "forename surname", using whichever parts exist.
            authors = []
            for author in bibl.findall('.//tei:author', ns):
                pers_name = author.find('.//tei:persName', ns)
                if pers_name is None:
                    continue
                pieces = (
                    stripped_text(pers_name.find('.//tei:forename', ns)),
                    stripped_text(pers_name.find('.//tei:surname', ns)),
                )
                name_parts = [piece for piece in pieces if piece is not None]
                if name_parts:
                    authors.append(' '.join(name_parts))
            if authors:
                citation['authors'] = authors

            # Year
            date_elem = bibl.find('.//tei:date[@type="published"]', ns)
            if date_elem is not None and date_elem.get('when'):
                citation['year'] = date_elem.get('when')

            # Venue: journal, then monograph (unless it already served as
            # the title), then meeting.
            venue_elem = bibl.find('.//tei:title[@level="j"]', ns)
            if venue_elem is None and not used_monograph_as_title:
                venue_elem = bibl.find('.//tei:title[@level="m"]', ns)
            if venue_elem is None:
                venue_elem = bibl.find('.//tei:meeting', ns)
            venue = stripped_text(venue_elem)
            if venue is not None:
                citation['venue'] = venue
            if 'venue' not in citation:
                publisher = stripped_text(bibl.find('.//tei:publisher', ns))
                if publisher is not None:
                    citation['venue'] = publisher

            if not citation:
                continue

            # Display text: the raw reference note if present, otherwise all
            # text inside the biblStruct.
            raw_ref_elem = bibl.find('.//tei:note[@type="raw_reference"]', ns)
            text_source = raw_ref_elem if raw_ref_elem is not None else bibl
            citation['raw_text'] = collapse_whitespace("".join(text_source.itertext()))
            # Keep the whole biblStruct XML for later parsing.
            citation['grobid_xml'] = ET.tostring(bibl, encoding='unicode')
            citations.append(citation)
        return citations
    except Exception as e:
        return []
def extract_title_and_authors_from_xml(xml_string):
    """Extract the title from a GROBID ``biblStruct`` XML string.

    Title lookup order: main article title, any article title, monograph
    title, then any title element in the TEI namespace.

    Args:
        xml_string: XML string of a biblStruct element.

    Returns:
        Dict with 'authors' (always an empty list here) and, when a title
        element with text was found, 'title'. An empty dict if the XML cannot
        be processed.
    """
    # One prefix is enough: matching is done on the namespace URI, so a
    # second prefix bound to the same URI could never find anything extra.
    ns = {'tei': 'http://www.tei-c.org/ns/1.0'}
    title_paths = (
        './/tei:title[@level="a"][@type="main"]',
        './/tei:title[@level="a"]',
        './/tei:title[@level="m"]',
        './/tei:title',
    )
    try:
        root = ET.fromstring(xml_string)
        result = {}
        title_elem = None
        for path in title_paths:
            title_elem = root.find(path, ns)
            if title_elem is not None:
                break
        if title_elem is not None and title_elem.text:
            result['title'] = title_elem.text.strip()
        result['authors'] = []
        return result
    except Exception:
        # Malformed or non-string input: callers get an empty dict.
        return {}
def _segment_is_contaminated(segment, extra_patterns):
    """Return True if ``segment`` looks like venue/year/URL text.

    Checks, in order: a known venue name (case-insensitive substring), a venue
    abbreviation (case-sensitive whole word), a common term (case-insensitive
    substring), a year 1900-2199, and finally any URL or DOI/arXiv marker.
    """
    segment_lower = segment.lower()
    if any(venue.lower() in segment_lower for venue in VENUE_NAMES):
        return True
    if any(re.search(r'\b' + re.escape(abbr) + r'\b', segment) for abbr in VENUE_ABBREVIATIONS):
        return True
    if any(term.lower() in segment_lower for term in COMMON_TERMS):
        return True
    # Only year-like numbers count; other digits are allowed.
    if re.search(r'\b(19|20|21)\d{2}\b', segment):
        return True
    # Catch anything the URL placeholder pass missed.
    if extractor.has_urls(segment) or re.search(extra_patterns, segment, re.IGNORECASE):
        return True
    return False


def clean_metadata(text):
    """Truncate a title/author string at the first venue, year or URL segment.

    Parentheses are removed (their content is kept), then the text is split
    into sentence-like segments on '.', '?' or '!' followed by whitespace or
    end of string. URLs are protected from splitting. Segments are kept from
    the start until the first one that contains a URL, a DOI/arXiv marker, a
    known venue name or abbreviation, a common term, or a year; that segment
    and everything after it are dropped.

    Args:
        text: Title or author string to clean.

    Returns:
        The cleaned string with whitespace collapsed and surrounding
        punctuation stripped; "" for falsy input.
    """
    if not text:
        return ""

    # Drop the parenthesis characters but keep what they enclose.
    text = text.replace('(', '').replace(')', '')

    # DOI/arXiv markers that URLExtract may not report.
    extra_patterns = r'arxiv\.org|doi\.org|\bdoi:|\burl\b'

    # 1. Replace URLs and markers with a placeholder so their dots do not
    #    split segments. The placeholder is never restored: any segment that
    #    contains one is where the text gets truncated.
    url_marker = "__URL_PH_0__"
    temp_text = text
    # Longest first, so a URL that is a prefix of another is not replaced early.
    for url in sorted(set(extractor.find_urls(text, True)), key=len, reverse=True):
        temp_text = temp_text.replace(url, url_marker)
    temp_text = re.sub(extra_patterns, url_marker, temp_text, flags=re.IGNORECASE)

    # 2. Split on sentence punctuation, keeping each separator attached to
    #    the content that precedes it.
    parts = re.split(r'([.?!]\s|[.?!]$)', temp_text)
    segments = []
    current_segment = ""
    for part in parts:
        if part and (part.strip() in ['.', '?', '!'] or re.match(r'[.?!]\s', part)):
            segments.append(current_segment + part)
            current_segment = ""
        else:
            current_segment += part
    if current_segment:
        segments.append(current_segment)

    # 3. Keep leading clean segments; stop at the first contaminated one.
    final_segments = []
    for seg in segments:
        if "__URL_PH_" in seg or _segment_is_contaminated(seg, extra_patterns):
            break
        final_segments.append(seg)

    # 4. Final cleanup.
    text = "".join(final_segments).strip()
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'\[\s*\]', '', text)
    text = text.strip(".,;: -()[]")
    return text
def find_reference_pages(pdf_path):
    """Find the page where the reference section starts and return its text.

    A page is a candidate when one of its lines has at most five words and
    contains "references" or "bibliography" (case-insensitive). A candidate
    is accepted only if GROBID extracts at least one citation from that page,
    which filters out tables of contents and similar pages.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        ``(ref_pages, start_page, end_page, ref_text)``. When a start page is
        found: ``ref_pages`` is ``[start_page]``, ``end_page`` is
        ``start_page + 1`` and ``ref_text`` is that page's text plus a
        newline. Otherwise ``ref_pages`` is empty, ``start_page`` is None,
        ``end_page`` is the page count and ``ref_text`` is "".
    """
    doc = fitz.open(pdf_path)
    try:
        start_page = None
        end_page = len(doc)
        ref_text = ""

        for page_num, page in enumerate(doc):
            text = page.get_text("text")
            lines = [l.strip().lower() for l in text.splitlines() if l.strip()]
            has_heading = any(
                len(line.split()) <= 5 and ("references" in line or "bibliography" in line)
                for line in lines
            )
            # Confirm the page really holds citations before accepting it.
            if has_heading and _get_grobid_boundaries(pdf_path, [page_num]):
                start_page = page_num
                break

        if start_page is not None:
            # Only the start page for now; later passes may extend the range.
            end_page = start_page + 1
            ref_pages = [start_page]
            ref_text = doc[start_page].get_text("text") + "\n"
        else:
            ref_pages = []
        return ref_pages, start_page, end_page, ref_text
    finally:
        # Close the document even if text extraction raised.
        doc.close()
def process_pdf_initial(pdf_file, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text):
    """Handle a PDF upload: locate the reference section and reset the UI.

    Old temp files are cleaned up on every call. With no file, everything is
    reset and hidden. With a file, the reference start page is located and
    the PDF is shown immediately; citation extraction itself is not done here.

    Args:
        pdf_file: Uploaded file object (its ``name`` is used as the path), or
            None.
        state_*: Previous state values; not read by this function.

    Returns:
        A 23-element tuple of values and ``gr.update`` objects (presumably
        bound, in order, to the outputs of the upload event; confirm against
        the event wiring).
    """
    cleanup_old_temp_files(max_age_hours=1)

    def hidden():
        return gr.update(visible=False)

    def disabled_hidden():
        return gr.update(interactive=False, visible=False)

    if pdf_file is None:
        return (
            None, "No PDF uploaded",
            hidden(), hidden(),
            hidden(),
            disabled_hidden(),
            disabled_hidden(),
            None, [], [], [], None, "",
            hidden(), hidden(), hidden(), hidden(),
            False,
            hidden(),
            None,      # reset state_ref_pdf_path
            "",        # reset state_pdf_name
            hidden(),  # reset export_btn
            hidden(),  # reset download_file
        )

    new_pdf_path = pdf_file.name
    basename = os.path.basename(new_pdf_path)
    new_citations = []
    new_removed_citations = []
    new_appendix_header = None  # Determined later by the iterative detection.

    new_ref_pages, start_page, end_page, new_ref_text = find_reference_pages(new_pdf_path)

    # Initial status log
    status = f"✓ Loaded PDF: {basename}\n"
    if new_ref_pages:
        status += f"\n✓ Identified reference section start: page {start_page + 1}"
    else:
        status += "\n⚠ No reference section found"
    status += "\n⏳ Starting automatic extraction... Please wait."

    # Return right away so the PDF is displayed without waiting for extraction.
    return (
        new_pdf_path, status,
        gr.update(value=new_pdf_path, visible=True),
        gr.update(visible=True, value="Show Full PDF"),
        hidden(),           # Citations display
        disabled_hidden(),  # Verify Button
        disabled_hidden(),  # Slider
        new_pdf_path, new_ref_pages, new_citations, new_removed_citations, new_appendix_header, new_ref_text,
        hidden(),  # citations_header
        hidden(),  # verification_header
        hidden(),  # verification_divider
        hidden(),  # api_key_input
        False,     # state_extraction_done
        gr.update(visible=False, value=""),    # corrected_display cleared completely
        None,      # reset state_ref_pdf_path
        basename,  # state_pdf_name
        hidden(),  # export_btn
        gr.update(visible=False, value=None),  # download_file
    )
def _get_grobid_boundaries(pdf_path, page_indices):
"""Helper to get GROBID citation boundaries for specific pages."""
if not page_indices:
return []
output_path = None
try:
doc = fitz.open(pdf_path)
temp_grobid = tempfile.NamedTemporaryFile(delete=False, suffix="_grobid.pdf")
output_path = temp_grobid.name
temp_grobid.close()
ref_doc = fitz.open()
for page_idx in page_indices:
ref_doc.insert_pdf(doc, from_page=page_idx, to_page=page_idx)
ref_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True)
ref_doc.close()
doc.close()
with open(output_path, 'rb') as f:
files = {'input': (os.path.basename(output_path), f, 'application/pdf')}
data = {'consolidateCitations': '0', 'includeRawCitations': '1'}
response = requests.post(
'http://localhost:8070/api/processFulltextDocument',
files=files,
data=data,
timeout=120
)
if response.status_code == 200:
return parse_tei_citations(response.text)
else:
return []
except Exception:
return []
finally:
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except:
pass
def extract_citations_auto(view_mode, previous_status, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done):
    """Extract citations from the reference pages and stream UI updates.

    This is a generator: every ``yield`` produces the 19-slot tuple built by
    the nested ``gen_update`` helper (status text, component updates and the
    new state values).

    Pipeline, as implemented below:
      1. Starting at ``state_ref_pages[0]``, send one page at a time to GROBID
         and stop at the first page that yields no citation with a title,
         authors or year. The pages accepted so far become the confirmed
         reference range.
      2. Re-extract the text of the confirmed pages and look on the last page
         for a short "appendix"-like header line with no citation after it.
      3. Send all confirmed pages to GROBID in one request and merge that
         consolidated list with the per-page lists.
      4. Attach short URL-only fragments to the preceding citation.
      5. Parse title/authors for each citation, then filter out figure
         captions, empty entries, section headers, low-density noise and
         near-duplicates (Jaro-Winkler similarity >= 0.95 on the raw text).

    ``view_mode`` and ``state_extraction_done`` are accepted but not read in
    this function.

    Any exception raised inside the pipeline is reported through a final
    ``done=True`` update carrying the error message and empty citation lists.
    """
    # Helper for intermediate updates
    def gen_update(status_txt, done=False, final_cits=[], final_rem=[], final_pages=None, final_text=None, final_header=None):
        # Use current state or provided finals
        # NOTE(review): final_cits / final_rem default to [] rather than None,
        # so when they are omitted the `is not None` test is still true and the
        # fallback to state_citations / state_removed_citations never runs.
        # Confirm whether intermediate updates are meant to clear those states.
        cits = final_cits if final_cits is not None else state_citations
        rem = final_rem if final_rem is not None else state_removed_citations
        pages = final_pages if final_pages is not None else state_ref_pages
        text = final_text if final_text is not None else state_ref_text
        header = final_header if final_header is not None else state_appendix_header
        # Hide the loading indicator only on the final update; otherwise leave it untouched.
        loading_update = gr.update(visible=False) if done else gr.update()
        verify_vis = done
        slider_vis = done
        headers_vis = done
        slider_max = len(cits) if cits else 1
        slider_val = min(1, slider_max)
        # Logic to pre-generate Citation HTML when done
        citations_html_update = gr.update(visible=headers_vis)
        if done:
            display_text = format_citations_display(cits)
            if rem:
                display_text += "\n\nREMOVED CITATIONS ({})\n\n".format(len(rem))
                display_text += format_citations_display(rem, show_reason=True)
            citations_html_update = gr.update(value=display_text, visible=headers_vis)
        else:
            # `done` is False on this branch, so this always evaluates to a no-op gr.update().
            citations_html_update = gr.update(visible=headers_vis) if done else gr.update()
        return (status_txt,
                citations_html_update, # citations_display (Populated when done)
                gr.update(interactive=verify_vis, visible=verify_vis), # verify_btn
                gr.update(interactive=slider_vis, maximum=slider_max, value=slider_val, visible=slider_vis), # slider
                cits, rem, pages, text, header,
                gr.update(), # pdf_viewer (handled by update_view, we just update state)
                loading_update, # Loading Indicator
                gr.update(visible=headers_vis), # citations_header
                gr.update(visible=headers_vis), # verification_header
                gr.update(visible=headers_vis), # verification_divider
                gr.update(visible=headers_vis), # api_key_input
                done, # state_extraction_done
                gr.update(visible=headers_vis), # corrected_display
                gr.update(visible=done), # export_btn
                gr.update(visible=False, value=None)) # download_file
    # Nothing to do without a PDF and at least one candidate reference page.
    if not state_ref_pages or not state_pdf_path:
        yield gen_update(previous_status + "\n⚠ No reference pages to process", done=True)
        return
    try:
        # --- 1. Page-by-page scan to confirm the reference range ---
        start_page_idx = state_ref_pages[0]
        confirmed_ref_pages = []
        per_page_citations = []
        yield gen_update(previous_status + f"\n⏳ Scanning pages starting from {start_page_idx + 1}...")
        # Open the PDF only to read its page count.
        doc_temp = fitz.open(state_pdf_path)
        total_pages = len(doc_temp)
        doc_temp.close()
        current_page = start_page_idx
        while current_page < total_pages:
            yield gen_update(previous_status + f"\n⏳ Scanning Page {current_page + 1}... Citations will be displayed once finished.")
            page_cits = _get_grobid_boundaries(state_pdf_path, [current_page])
            # A citation counts as valid when it carries a title, authors or a year.
            valid_count = 0
            for c in page_cits:
                if c.get('title') or c.get('authors') or c.get('year'):
                    valid_count += 1
            # The first page without any valid citation ends the reference range.
            if valid_count == 0:
                break
            else:
                confirmed_ref_pages.append(current_page)
                per_page_citations.append(page_cits)
                current_page += 1
        if not confirmed_ref_pages:
            yield gen_update(previous_status + "\n⚠ No valid citations extracted from start page.", done=True)
            return
        yield gen_update(previous_status + f"\n✓ Range confirmed: {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1}. Merging...", final_pages=confirmed_ref_pages)
        # Update status log with the confirmed range
        status_update = f"\n✓ Confirmed Reference Range: Pages {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1} ({len(confirmed_ref_pages)} pages)"
        previous_status += status_update
        state_ref_pages = confirmed_ref_pages
        # Re-extract text for the full confirmed range
        updated_ref_text = ""
        doc_temp = fitz.open(state_pdf_path)
        for p_idx in state_ref_pages:
            updated_ref_text += doc_temp[p_idx].get_text("text") + "\n"
        # --- DYNAMIC HEADER DETECTION ---
        # Look on the last confirmed page for a short header-like line
        # (keyword match, or a first token like "A", "A." or "A:") that has no
        # citation starting after it.
        last_page_text = doc_temp[state_ref_pages[-1]].get_text("text")
        lines = [l.strip() for l in last_page_text.splitlines() if l.strip()]
        appendix_keywords = ["appendix", "appendices", "supplement", "limitation", "checklist", "statement"]
        last_page_citations = per_page_citations[-1]
        # Line index at which each last-page citation starts, located by
        # searching for the first 30 characters of its raw text.
        citation_start_line_indices = []
        for cit in last_page_citations:
            cit_text = cit.get('raw_text', '').strip()
            if not cit_text: continue
            cit_prefix = cit_text[:30].strip().lower()
            for k, line in enumerate(lines):
                if cit_prefix in line.lower():
                    citation_start_line_indices.append(k)
                    break
        header_candidates = []
        for i, line in enumerate(lines):
            line_lower = line.lower()
            # Only lines of at most five words are considered as headers.
            if len(line.split()) <= 5:
                is_match = False
                if any(k in line_lower for k in appendix_keywords):
                    is_match = True
                elif re.match(r'^A[\.\:]?$', line.split()[0] if line.split() else ""):
                    is_match = True
                if is_match:
                    candidate = line
                    curr_idx = i + 1
                    # Very short candidates (fewer than 5 characters) absorb the following lines.
                    while len(candidate) < 5 and curr_idx < len(lines):
                        candidate += " " + lines[curr_idx]
                        curr_idx += 1
                    # Reject the candidate if any citation starts below it on the page.
                    has_citations_after = any(start_idx > i for start_idx in citation_start_line_indices)
                    if not has_citations_after:
                        header_candidates.append(candidate)
        # The first surviving candidate wins.
        if header_candidates:
            found_header = header_candidates[0]
            state_appendix_header = found_header
        else:
            state_appendix_header = None
        doc_temp.close()
        state_ref_text = updated_ref_text
        # 2. Get Consolidated List (LIST C)
        yield gen_update(previous_status + "\n⏳ Sending full context to GROBID...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
        grobid_citations_a = _get_grobid_boundaries(state_pdf_path, confirmed_ref_pages)
        # 3. Span Detection & Merging
        import difflib
        list_i_pages = per_page_citations
        list_c = grobid_citations_a
        def get_text(cit):
            return cit.get('raw_text', '').strip()
        # refined_list_i is assigned here but not used afterwards.
        refined_list_i = []
        # Maps id(last citation of a page) -> replacement action.
        actions = {}
        for p_idx in range(len(list_i_pages)):
            current_page = list_i_pages[p_idx]
            if not current_page: continue
            # cit_x: last citation on this page; cit_y / cit_z: first and second
            # citations on the next page (when present).
            cit_x = current_page[-1]
            cit_x_text = get_text(cit_x)
            cit_y = None
            cit_y_text = ""
            cit_z = None
            cit_z_text = ""
            if p_idx + 1 < len(list_i_pages) and list_i_pages[p_idx+1]:
                cit_y = list_i_pages[p_idx+1][0]
                cit_y_text = get_text(cit_y)
                if len(list_i_pages[p_idx+1]) > 1:
                    cit_z = list_i_pages[p_idx+1][1]
                    cit_z_text = get_text(cit_z)
            # Consolidated entries whose raw text contains cit_x's raw text.
            matches = []
            for c_item in list_c:
                c_text = get_text(c_item)
                if cit_x_text in c_text:
                    matches.append(c_item)
            best_action = None
            for cit_match in matches:
                match_text = get_text(cit_match)
                # Skip consolidated entries that also contain the next page's citations.
                if cit_z and cit_z_text in match_text: continue
                if cit_y and cit_y_text in match_text: continue
                # A strictly longer match replaces cit_x ("extension").
                if len(match_text) > len(cit_x_text):
                    best_action = {'type': 'extension', 'target': cit_match}
                    break
            if best_action:
                actions[id(cit_x)] = best_action
        # Flatten the per-page lists, substituting extended citations.
        flat_list_i = []
        # skip_ids is never populated, so the membership test below never skips anything.
        skip_ids = set()
        for p_list in list_i_pages:
            for cit in p_list:
                if id(cit) in skip_ids: continue
                if id(cit) in actions:
                    act = actions[id(cit)]
                    if act['type'] == 'extension':
                        flat_list_i.append(act['target'])
                else:
                    flat_list_i.append(cit)
        # Align the flattened per-page list with the consolidated list by raw
        # text. Per-page entries are kept for equal/delete/replace spans;
        # consolidated entries are added only for insert spans.
        texts_i = [get_text(c) for c in flat_list_i]
        texts_c = [get_text(c) for c in list_c]
        matcher = difflib.SequenceMatcher(None, texts_i, texts_c)
        final_merged_list = []
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'equal': final_merged_list.extend(flat_list_i[i1:i2])
            elif tag == 'delete': final_merged_list.extend(flat_list_i[i1:i2])
            elif tag == 'insert': final_merged_list.extend(list_c[j1:j2])
            elif tag == 'replace': final_merged_list.extend(flat_list_i[i1:i2])
        grobid_citations = final_merged_list
        # Fold short URL-only fragments (at most six tokens) into the preceding citation.
        merged_citations = []
        for cit in grobid_citations:
            raw_text = cit.get('raw_text', '').strip()
            has_url = extractor.has_urls(raw_text) or re.search(r'arxiv\.org|doi\.org|\bdoi:|\burl\b', raw_text, re.IGNORECASE)
            is_url_only = has_url and len(raw_text.split()) <= 6
            if merged_citations and is_url_only:
                # Mutates the previous citation dict in place.
                prev_cit = merged_citations[-1]
                prev_cit['raw_text'] = (prev_cit.get('raw_text', '') + " " + raw_text).strip()
            else:
                merged_citations.append(cit)
        grobid_citations = merged_citations
        yield gen_update(previous_status + f"\n⏳ Parsing metadata for {len(grobid_citations)} citations...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
        # Stage 2: Extract title and authors
        parsed_citations = []
        for idx, cit in enumerate(grobid_citations):
            # Frequent yields during heavy parsing loop (every 5)
            if idx % 5 == 0:
                yield gen_update(previous_status + f"\n⏳ Parsing citation {idx+1}/{len(grobid_citations)}...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
            raw_text = cit.get('raw_text', '')
            grobid_xml = cit.get('grobid_xml', '')
            # Only the last citation can have swallowed the appendix header:
            # cut its raw text at the header and re-parse it with GROBID.
            if idx == len(grobid_citations) - 1 and state_appendix_header:
                clean_header = state_appendix_header.strip()[:10].strip().lower()
                clean_header = re.sub(r'\s+', ' ', clean_header)
                raw_lower = re.sub(r'\s+', ' ', raw_text.lower())
                # NOTE(review): cutoff_index is an offset into the
                # whitespace-collapsed raw_lower but is applied to the
                # uncollapsed raw_text below; the two can disagree when
                # raw_text contains runs of whitespace. Confirm.
                cutoff_index = raw_lower.find(clean_header)
                if cutoff_index > 0:
                    cleaned_raw_reference = raw_text[:cutoff_index].strip()
                    # Drop a dangling "See" / trailing period left before the header.
                    cleaned_raw_reference = re.sub(r'(\.\s*See\s*|\s*See\s*|\.\s*)$', '', cleaned_raw_reference, flags=re.IGNORECASE).strip()
                    raw_text = cleaned_raw_reference
                    try:
                        response = requests.post(
                            'http://localhost:8070/api/processCitation',
                            data={'citations': cleaned_raw_reference, 'includeRawCitations': '1'},
                            timeout=30
                        )
                        if response.status_code == 200:
                            grobid_xml = response.text
                            raw_text = cleaned_raw_reference
                    except Exception:
                        # Best effort: on failure the original grobid_xml is kept.
                        pass
            parsed_fields = extract_title_and_authors_from_xml(grobid_xml)
            title = parsed_fields.get('title', '')
            authors = parsed_fields.get('authors', [])
            # Remove every "- " sequence (presumably line-break hyphenation — confirm).
            raw_text = raw_text.replace("- ", "")
            title = title.replace("- ", "")
            # Recover title words that precede the parsed title in the raw
            # text: locate the title loosely, then prepend whatever sits
            # between the previous '.', '?', '!' or ',' and the match.
            if title and len(title) > 5:
                clean_title_prefix = re.sub(r'\W+', '', title.lower()[:40])
                if clean_title_prefix:
                    # Allow any non-word characters between consecutive title characters.
                    pattern_parts = [re.escape(c) + r'[\W]*' for c in clean_title_prefix]
                    fuzzy_pattern = r''.join(pattern_parts)
                    raw_lower = raw_text.lower()
                    t_match = re.search(fuzzy_pattern, raw_lower)
                    if t_match:
                        match_start = t_match.start()
                        prev_dot = raw_text.rfind('.', 0, match_start)
                        prev_q = raw_text.rfind('?', 0, match_start)
                        prev_ex = raw_text.rfind('!', 0, match_start)
                        prev_comma = raw_text.rfind(',', 0, match_start)
                        boundary_idx = max(prev_dot, prev_q, prev_ex, prev_comma)
                        start_idx = boundary_idx + 1 if boundary_idx != -1 else 0
                        missed_prefix = raw_text[start_idx:match_start].strip()
                        if missed_prefix:
                            title = f"{missed_prefix} {title}".strip()
            title = clean_metadata(title)
            refined_authors = refine_author_string(raw_text, authors, title)
            refined_authors = clean_metadata(refined_authors)
            # If the title leaked into the author string, keep only what precedes it.
            if title and len(title) > 8:
                if title in refined_authors:
                    refined_authors = refined_authors.split(title)[0].strip()
            # NOTE(review): applied to every citation here, not only when the
            # title was cut out above — confirm that is the intended scope.
            refined_authors = refined_authors.strip(".,;: -()")
            citation = {
                'raw_text': raw_text,
                'title': title,
                'authors': refined_authors,
                'year': cit.get('year', ''),
                'venue': cit.get('venue', '')
            }
            parsed_citations.append(citation)
        # --- Filtering and de-duplication ---
        final_citations = []
        final_removed_citations = []
        for cit in parsed_citations:
            title = cit.get('title', '').strip()
            rejection_reason = None
            raw_text_clean = cit.get('raw_text', '').strip()
            # Share of alphanumeric characters in the raw text.
            alpha_chars = sum(c.isalnum() for c in raw_text_clean)
            alpha_density = alpha_chars / len(raw_text_clean) if raw_text_clean else 0
            if title.lower().startswith("fig.") or title.lower().startswith("figure"): rejection_reason = "Figure caption detected"
            elif not title and not cit.get('authors') and not cit.get('year'): rejection_reason = "Missing title, authors, and year"
            elif raw_text_clean.lower() in ["references", "bibliography", "works cited"]: rejection_reason = "Section header detected"
            elif len(raw_text_clean) > 5 and alpha_density < 0.3: rejection_reason = "Likely noise or artifact (low text density)"
            if rejection_reason:
                cit['rejection_reason'] = rejection_reason
                final_removed_citations.append(cit)
                continue
            # Compare against every citation accepted so far.
            is_dup = False
            for existing in final_citations:
                existing_text = existing.get('raw_text', '').strip()
                if jellyfish.jaro_winkler_similarity(raw_text_clean, existing_text) >= 0.95:
                    is_dup = True
                    break
            if not is_dup: final_citations.append(cit)
            else:
                cit['rejection_reason'] = "Duplicate (95%+ similarity)"
                final_removed_citations.append(cit)
        status = previous_status + f"\n✓ Hybrid extraction: {len(final_citations)} citations (+{len(final_removed_citations)} filtered)"
        # FINAL YIELD
        yield gen_update(status, done=True, final_cits=final_citations, final_rem=final_removed_citations, final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
    except Exception as e:
        # Error Update
        yield gen_update(previous_status + f"\n❌ Error: {str(e)}", done=True, final_cits=[], final_rem=[])
def run_citation_check(num_to_check, previous_status, api_key, state_citations):
    """Verify the first ``num_to_check`` citations and stream progress.

    Generator. Every item it yields is a 4-tuple:
    ``(status_text, verification_html, citations_html, citations)``.
    While verification is running, ``verification_html`` is a no-op
    ``gr.update()``; the final item carries the rendered verification list
    and a summary line with per-status counts.

    Args:
        num_to_check: How many citations (from the start of the list) to verify.
        previous_status: Status log text that progress lines are appended to.
        api_key: Optional Semantic Scholar API key; blank/None means no key.
        state_citations: Per-user list of citation dicts.

    Yields:
        4-tuples as described above. When ``state_citations`` is empty a
        single warning tuple is yielded and the generator ends.
    """
    if not state_citations:
        # Must have the same four slots as every other update below; the two
        # HTML displays are left untouched.
        yield (previous_status + "\n⚠ No citations to verify.",
               gr.update(), gr.update(), state_citations)
        return
    # 1. Identify Author Pattern from the top 10 citations
    sample_author_strings = [
        cit.get('authors', '')
        for cit in state_citations[:10]
        if cit.get('authors') and isinstance(cit.get('authors'), str)
    ]
    name_order, separator = identify_author_pattern(sample_author_strings)
    # Work on deep copies so verification does not mutate the incoming state.
    import copy
    to_check = copy.deepcopy(state_citations[:num_to_check])
    # Use API key if provided
    api_key_clean = api_key.strip() if api_key else None
    updated_citations = list(state_citations)
    total = len(to_check)
    verified_stream = check_citations_semantic_scholar(
        to_check, api_key=api_key_clean, name_order=name_order, separator=separator
    )
    for i, verified_cit in enumerate(verified_stream):
        # Results arrive in the same order as to_check, i.e. list positions 0..total-1.
        if i < len(updated_citations):
            updated_citations[i] = verified_cit
        # Refresh the citations display on every step so it reflects progress.
        status_msg = f"{previous_status}\n⏳ Verifying citation {i+1}/{total}... Results will be displayed once finished."
        updated_cit_html = format_citations_display(updated_citations)
        yield (status_msg, gr.update(), updated_cit_html, updated_citations)
    # Final update with the rendered verification view and summary counts.
    final_ver_html = format_verifications_display(updated_citations)
    final_cit_html = format_citations_display(updated_citations)
    statuses = [c.get('verification', {}).get('status') for c in updated_citations[:total]]
    v_count = statuses.count('verified')
    a_count = statuses.count('ambiguous')
    h_count = statuses.count('suspected_hallucination')
    e_count = statuses.count('api_error')
    status_msg = f"Verification Complete: ✅ {v_count} | ⚠️ {a_count} | ❌ {h_count} | 🔌 {e_count}"
    yield (status_msg, final_ver_html, final_cit_html, updated_citations)
def format_citations_display(citations, show_reason=False):
    """Render a list of citation dicts as an HTML fragment.

    Each citation becomes a numbered ``citation-item`` block showing its
    escaped ``raw_text``. Optional parts, in order:
      * the rejection reason (only when ``show_reason`` is true and the
        citation has a ``rejection_reason`` key),
      * the extracted ``title``,
      * the ``title_after_verification`` / ``authors_after_verification``
        values when either is present.

    Returns an empty string when ``citations`` is empty or None.
    """
    if not citations:
        return ""
    import html as html_lib

    chunks = ["<div class='citations-container'>"]
    for number, entry in enumerate(citations, 1):
        escaped_raw = html_lib.escape(entry.get('raw_text', 'No citation text'))
        chunks.append("<div class='citation-item'>")
        chunks.append(f"<div><strong>[{number}]</strong> {escaped_raw}")
        if show_reason and 'rejection_reason' in entry:
            escaped_reason = html_lib.escape(entry['rejection_reason'])
            chunks.append(f" <span class='rejection-reason'>[REASON: {escaped_reason}]</span>")
        chunks.append("</div>")

        # Extracted title, shown in the subdued metadata style.
        extracted_title = entry.get('title', '')
        if extracted_title:
            escaped_title = html_lib.escape(extracted_title)
            chunks.append("<div class='citation-metadata'>")
            chunks.append(f"<div style='margin-bottom: 2px;'>Title: {escaped_title}</div>")
            chunks.append("</div>")

        # Values filled in by verification, if any.
        verified_title = entry.get('title_after_verification', '')
        verified_authors = entry.get('authors_after_verification', '')
        if verified_title or verified_authors:
            chunks.append("<div class='ver-verified'>")
            if verified_title:
                escaped_verified_title = html_lib.escape(verified_title)
                chunks.append(f"<div style='margin-bottom: 2px;'><strong>Title:</strong> {escaped_verified_title}</div>")
            if verified_authors:
                joined = (
                    ", ".join(verified_authors)
                    if isinstance(verified_authors, list)
                    else str(verified_authors)
                )
                escaped_verified_authors = html_lib.escape(joined)
                chunks.append(f"<div><strong>Authors:</strong> {escaped_verified_authors}</div>")
            chunks.append("</div>")

        chunks.append("</div>")
    chunks.append("</div>")
    return "".join(chunks)
def refine_author_string(raw_text, grobid_authors, title=None):
    """
    Cut the author portion off the front of a raw citation string.

    The author portion runs from index 0 up to the segment (delimited by a
    period or comma) that contains the first 4-digit year (19xx/20xx/21xx)
    or the start of the title, whichever comes first. That segment and
    everything after it are discarded. When neither a year nor the title is
    found, the stripped raw text is returned unchanged.

    ``grobid_authors`` is accepted but not used.
    """
    if not raw_text:
        return ""
    lowered = raw_text.lower()

    # Offsets where non-author metadata may begin.
    cut_points = []

    year_hit = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_hit:
        cut_points.append(year_hit.start())

    if title and len(title) > 5:
        # Match the first 20 characters of the title, tolerating any
        # non-word characters between consecutive title characters.
        title_chars = re.sub(r'\W+', '', title.lower()[:20])
        if title_chars:
            loose_pattern = r''.join(re.escape(ch) + r'[\W]*' for ch in title_chars)
            title_hit = re.search(loose_pattern, lowered)
            if title_hit:
                cut_points.append(title_hit.start())

    if not cut_points:
        # No anchor found: hand back the whole text for later cleanup.
        return raw_text.strip()

    cut = min(cut_points)
    head = raw_text[:cut]
    separator_at = max(head.rfind('.'), head.rfind(','))
    if separator_at == -1:
        # Metadata sits in the very first segment: cut right at its start.
        authors_part = head.strip()
    else:
        # Keep everything up to and including the last separator, which
        # drops the whole segment containing the year/title.
        authors_part = raw_text[:separator_at + 1].strip()

    # Remove trailing punctuation such as the final "." in "Author, Author."
    return authors_part.rstrip(".,:; ")
def identify_author_pattern(author_strings):
    """
    Guess how author lists are written in a sample of author strings.

    Returns: (name_order, separator) where name_order is "first_last" or
    "last_first" and separator is "," or ";". An empty sample yields
    ("first_last", ",").
    """
    if not author_strings:
        return "first_last", ","

    # 1. Divider: semicolon wins when semicolons outnumber half the commas
    #    (integer division), counted over the whole sample.
    semicolon_total = 0
    comma_total = 0
    for text in author_strings:
        semicolon_total += text.count(";")
        comma_total += text.count(",")
    divider = ";" if semicolon_total > (comma_total // 2) else ","

    # 2. Name order. Split every string on the divider (treating "and"/"&"
    #    as a divider too) and count the parts that look like "Last, First":
    #      ";" divider -> a part containing a comma; needs >= 50% of parts
    #      "," divider -> a part that is a single word; needs >= 70% of parts
    semicolon_mode = divider == ";"
    required_share = 0.5 if semicolon_mode else 0.7
    matching_parts = 0
    all_parts = 0
    for text in author_strings:
        normalized = re.sub(r'\s+(?:and|&)\s+', divider + ' ', text, flags=re.IGNORECASE)
        for piece in normalized.split(divider):
            piece = piece.strip()
            if not piece:
                continue
            all_parts += 1
            if semicolon_mode:
                if "," in piece:
                    matching_parts += 1
            elif " " not in piece:
                matching_parts += 1

    if all_parts > 0 and matching_parts >= (all_parts * required_share):
        return "last_first", divider
    return "first_last", divider
def parse_names_by_pattern(author_string, order, separator):
    """
    Split an author string into individual names using a known pattern.

    Args:
        author_string: Raw author list, e.g. "Smith, J. & Doe, A. et al."
        order: "first_last" or "last_first" (see identify_author_pattern).
        separator: The divider between names, "," or ";".

    Returns:
        List of names as returned by normalize_d_author, skipping names it
        maps to a falsy value. Empty list for an empty/None author string.
    """
    if not author_string:
        return []
    # 1. Drop "et al" together with its optional trailing period. The period
    #    is matched after the word boundary: a "\b" placed after "\.?" can
    #    never succeed when the period is followed by a space, punctuation or
    #    end of string, which left a stray "." behind.
    author_string = re.sub(r'\b(?:et\s*al|etal)\b\.?', '', author_string, flags=re.IGNORECASE)
    # 2. Treat "and" and "&" as dividers. "&" is matched on its own because
    #    "\b&\b" only matches when word characters touch both sides (e.g.
    #    "A&B"), so the usual "Smith & Doe" was never split.
    s = re.sub(r'\band\b|&', separator, author_string, flags=re.IGNORECASE)
    sep_esc = re.escape(separator)
    # This regex collapses multiple separators and any whitespace/separators between them
    s = re.sub(sep_esc + r'[\s' + sep_esc + r']*' + sep_esc, separator, s)
    # Remove leading/trailing dividers
    s = s.strip().strip(separator).strip()
    # 3. Split by the divider
    segments = [p.strip() for p in s.split(separator) if p.strip()]
    # 4. Regroup based on logic
    if order == "last_first" and separator == ",":
        # Comma divider with Last, First order: every two segments form one
        # name; an odd trailing segment stands alone.
        raw_names = []
        i = 0
        while i < len(segments):
            if i + 1 < len(segments):
                raw_names.append(f"{segments[i]}, {segments[i + 1]}")
                i += 2
            else:
                raw_names.append(segments[i])
                i += 1
    else:
        # For first_last OR semicolon separator: each segment is treated as a full name
        raw_names = segments
    # 5. Final normalization to the standardized format
    authors = []
    for name in raw_names:
        norm = normalize_d_author(name)
        if norm:
            authors.append(norm)
    return authors
def format_verifications_display(citations):
    """Render citations with a verification status badge as HTML.

    Each citation dict contributes a ``ver-item`` block with its escaped
    ``raw_text`` followed by one badge chosen from ``verification['status']``:
      * ``verified`` / ``ambiguous`` / ``suspected_hallucination``: label,
        confidence and title/author similarity as percentages,
      * ``api_error``: the (escaped) error text, labelled "Verification Note"
        when the API simply found nothing and "API Error" otherwise,
      * ``not_verified`` or no verification data: "Not Verified".
    Any other status with verification data present gets no badge.

    Returns a placeholder paragraph when ``citations`` is empty or None.
    """
    if not citations:
        return "<p>No citations extracted yet.</p>"
    import html as html_lib

    # status -> (CSS class, label) for the outcomes that carry scores.
    scored_badges = {
        'verified': ('ver-status-verified', 'Verified'),
        'ambiguous': ('ver-status-ambiguous', 'Ambiguous'),
        'suspected_hallucination': ('ver-status-hallucination', 'Suspected Hallucination'),
    }

    html_parts = ["<div class='ver-badge-container'>"]
    for i, cit in enumerate(citations, 1):
        # A missing or None entry is treated the same as "no verification yet".
        verification = cit.get('verification') or {}
        safe_raw = html_lib.escape(cit.get('raw_text', 'No citation text'))
        html_parts.append("<div class='ver-item'>")
        html_parts.append(f"<div><strong>[{i}]</strong> {safe_raw}</div>")

        status = verification.get('status', 'not_verified')
        icon = verification.get('icon', '')
        if status in scored_badges:
            css_class, label = scored_badges[status]
            confidence = verification.get('confidence', 0)
            title_score = verification.get('title_score', 0)
            author_score = verification.get('author_score', 0)
            html_parts.append(f"<div class='{css_class}'>")
            html_parts.append(f"<strong>{icon} {label} (Confidence: {confidence:.2%})</strong>")
            html_parts.append(f"<br/><small>Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}</small>")
            html_parts.append("</div>")
        elif status == 'api_error':
            error_msg = verification.get('error', 'Unknown error')
            is_no_result = error_msg == "No search results found by API"
            label = "Verification Note" if is_no_result else "API Error"
            # Error text can contain markup-like content (e.g. "<... object at
            # 0x...>" in exception messages), so escape it like raw_text.
            safe_error = html_lib.escape(str(error_msg))
            html_parts.append("<div class='ver-status-error'>")
            html_parts.append(f"<strong>{icon} {label}</strong><br/>")
            html_parts.append(f"<small>{safe_error}</small>")
            html_parts.append("</div>")
        elif status == 'not_verified' or not verification:
            html_parts.append("<div class='ver-status-unverified'>")
            html_parts.append("<strong>Not Verified</strong>")
            html_parts.append("</div>")
        html_parts.append("</div>")
    html_parts.append("</div>")
    return ''.join(html_parts)
def export_verifications_csv(state_citations, pdf_name):
    """Write the verification results to a CSV file and return its path.

    The file is named ``<pdf stem>_verifications.csv`` (or
    ``verifications_verifications.csv`` when no PDF name is known) and is
    placed in a freshly created temporary directory so the download keeps
    that exact name.

    Returns:
        The path of the written file, or None when there are no citations
        or when writing fails for any reason.
    """
    if not state_citations:
        return None
    import csv

    stem = os.path.splitext(pdf_name)[0] if pdf_name else "verifications"
    # A dedicated directory lets the file carry the exact download name.
    target_path = os.path.join(tempfile.mkdtemp(), f"{stem}_verifications.csv")
    columns = [
        'ID', 'Status', 'Confidence', 'Title Similarity', 'Author Similarity',
        'Raw Citation', 'Title', 'Authors',
        'API Title', 'API Authors'
    ]

    try:
        with open(target_path, 'w', newline='', encoding='utf-8') as handle:
            sheet = csv.DictWriter(handle, fieldnames=columns)
            sheet.writeheader()
            for row_id, entry in enumerate(state_citations, 1):
                outcome = entry.get('verification', {})
                status = outcome.get('status', 'not_verified')
                was_checked = status != 'not_verified'

                def as_percent(score):
                    # Scores are only meaningful once a citation was checked.
                    return f"{score:.2%}" if was_checked else 'N/A'

                # Data returned by the API for the matched paper, if any.
                api_record = outcome.get('semantic_data', {})
                if api_record:
                    api_title = api_record.get('title', '')
                    api_author_entries = api_record.get('authors', [])
                else:
                    api_title = ''
                    api_author_entries = []
                if not api_author_entries:
                    api_authors = ""
                elif isinstance(api_author_entries[0], dict):
                    api_authors = ", ".join([a.get('name', '') for a in api_author_entries if a.get('name')])
                else:
                    api_authors = ", ".join([str(a) for a in api_author_entries if a])

                authors_cell = entry.get('authors_after_verification', '')
                if isinstance(authors_cell, list):
                    authors_cell = ", ".join(authors_cell)
                elif not isinstance(authors_cell, str):
                    authors_cell = str(authors_cell)

                sheet.writerow({
                    'ID': row_id,
                    'Status': status,
                    'Confidence': as_percent(outcome.get('confidence', 0)),
                    'Title Similarity': as_percent(outcome.get('title_score', 0)),
                    'Author Similarity': as_percent(outcome.get('author_score', 0)),
                    'Raw Citation': entry.get('raw_text', ''),
                    'Title': entry.get('title_after_verification', ''),
                    'Authors': authors_cell,
                    'API Title': api_title,
                    'API Authors': api_authors
                })
    except Exception:
        return None
    return target_path
def update_view(view_mode, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path):
    """Update the view based on selected mode. Controls GROUP visibility.

    Generator that yields at most one 9-tuple of updates, in this order:
      1. view_full_pdf (Group)
      2. view_ref_pages (Group)
      3. view_citations (Group)
      4. view_verifications (Group)
      5. pdf_viewer_ref (PDF component; new value only for "Show Reference Pages")
      6. citations_display (HTML; left untouched here)
      7. corrected_display (HTML; re-rendered for "Show Verifications")
      8. loading_indicator (Markdown)
      9. state_ref_pdf_path (cached path of the reference-pages subset PDF)

    While extraction is still running, every mode except "Show Full PDF"
    hides all four groups and shows the loading indicator instead. An
    unrecognised ``view_mode`` yields nothing.

    ``state_removed_citations`` is accepted but not read.
    """
    vis_full = gr.update(visible=False)
    vis_ref = gr.update(visible=False)
    vis_cit = gr.update(visible=False)
    vis_ver = gr.update(visible=False)
    upd_ref_pdf = gr.update()
    upd_cit_disp = gr.update()
    upd_ver_disp = gr.update()
    upd_load = gr.update(visible=False)  # Default hidden

    if not state_extraction_done and view_mode != "Show Full PDF":
        # Extraction in progress -> show the loading indicator, keep all views hidden.
        # This function is a generator, so the update has to be yielded: a
        # plain `return <tuple>` would end the generator without emitting it.
        upd_load = gr.update(visible=True)
        yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
        return

    if view_mode == "Show Full PDF":
        # pdf_viewer_full should already have content from process_pdf_initial
        vis_full = gr.update(visible=True)
    elif view_mode == "Show Reference Pages":
        vis_ref = gr.update(visible=True)
        if state_ref_pdf_path and os.path.exists(state_ref_pdf_path):
            # Reuse the cached subset PDF.
            upd_ref_pdf = gr.update(value=state_ref_pdf_path)
        elif state_ref_pages and state_pdf_path:
            # Build a subset PDF covering the first through last reference page.
            doc = fitz.open(state_pdf_path)
            try:
                new_doc = fitz.open()
                try:
                    new_doc.insert_pdf(doc, from_page=state_ref_pages[0], to_page=state_ref_pages[-1])
                    temp_preview = tempfile.NamedTemporaryFile(delete=False, suffix="_ref_subset.pdf")
                    output_path = temp_preview.name
                    temp_preview.close()
                    new_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True)
                finally:
                    # Close exactly once, even when insert_pdf/save raised.
                    new_doc.close()
            finally:
                doc.close()
            state_ref_pdf_path = output_path
            upd_ref_pdf = gr.update(value=output_path)
    elif view_mode == "Show Citations":
        # Content is pre-filled by extract_citations_auto
        vis_cit = gr.update(visible=True)
    elif view_mode == "Show Verifications":
        vis_ver = gr.update(visible=True)
        # Always render the list. Unverified items will show "Not Verified".
        upd_ver_disp = gr.update(value=format_verifications_display(state_citations))
    else:
        return

    yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
# ---------------------------------------------------------------------------
# UI definition
# ---------------------------------------------------------------------------
# Stylesheet passed to gr.Blocks. Covers the PDF viewers, the citation list
# and the coloured verification status boxes, each with a `.dark` variant.
_APP_CSS = """
/* Container Styles */
#pdf-viewer-full, #pdf-viewer-ref {
height: 700px;
width: 100%;
}
#view-citations, #view-verifications {
border: none !important;
box-shadow: none !important;
background-color: transparent !important;
}
#citations-list, #view-verifications .gr-html {
background-color: transparent !important;
}
#main-display-area {
min-height: 700px;
border-radius: 8px;
background-color: var(--background-fill-primary);
}
/* Citation List */
.citations-container {
font-family: sans-serif;
font-size: 14px;
line-height: 1.5;
color: var(--body-text-color);
max-height: 600px;
overflow-y: auto;
padding: 12px;
border: 1px solid var(--border-color-primary);
border-radius: 4px;
background-color: var(--background-fill-secondary);
}
.citation-item {
margin-bottom: 16px;
padding-bottom: 8px;
border-bottom: 1px solid var(--border-color-primary);
}
.rejection-reason {
color: #ef5350; /* Red 400 */
font-weight: bold;
margin-left: 8px;
}
.dark .rejection-reason {
color: #ef9a9a; /* Red 200 */
}
.citation-metadata {
color: var(--body-text-color-subdued);
margin-left: 24px;
font-size: 0.95em;
margin-top: 4px;
}
/* Verification Styles */
.ver-verified {
color: #1b5e20; /* Green 900 */
margin-left: 24px;
font-size: 0.95em;
margin-top: 6px;
padding: 4px;
background-color: #e8f5e9; /* Green 50 */
border-left: 3px solid #4caf50; /* Green 500 */
}
.dark .ver-verified {
color: #a5d6a7; /* Green 200 */
background-color: rgba(27, 94, 32, 0.4); /* Dark Green alpha */
border-left-color: #66bb6a; /* Green 400 */
}
/* Status Badges in format_verifications_display */
.ver-badge-container {
font-family: monospace;
font-size: 14px;
background-color: var(--background-fill-secondary);
padding: 15px;
border-radius: 5px;
color: var(--body-text-color);
}
.ver-item {
margin-bottom: 20px;
padding: 10px;
border: 1px solid var(--border-color-primary);
border-radius: 5px;
}
.ver-status-verified {
margin-top: 8px;
padding: 6px;
background-color: #e8f5e9;
border-left: 3px solid #4caf50;
color: #1b5e20; /* Darker Text */
}
.dark .ver-status-verified {
background-color: rgba(27, 94, 32, 0.4);
border-left-color: #66bb6a;
color: #e8f5e9; /* Light Text */
}
.ver-status-verified strong, .ver-verified strong { color: inherit; }
.ver-status-ambiguous {
margin-top: 8px;
padding: 6px;
background-color: #fff3e0;
border-left: 3px solid #ff9800;
color: #e65100;
}
.dark .ver-status-ambiguous {
background-color: rgba(230, 81, 0, 0.3);
border-left-color: #ffb74d;
color: #ffe0b2;
}
.ver-status-hallucination {
margin-top: 8px;
padding: 6px;
background-color: #ffebee;
border-left: 3px solid #f44336;
color: #c62828;
}
.dark .ver-status-hallucination {
background-color: rgba(183, 28, 28, 0.3);
border-left-color: #e57373;
color: #ffcdd2;
}
.ver-status-error {
margin-top: 8px;
padding: 6px;
background-color: #fafafa;
border-left: 3px solid #9e9e9e;
color: #424242;
}
.dark .ver-status-error {
background-color: rgba(66, 66, 66, 0.4);
border-left-color: #bdbdbd;
color: #e0e0e0;
}
.ver-status-unverified {
margin-top: 8px;
padding: 6px;
background-color: #f5f5f5;
border-left: 3px solid #bdbdbd;
color: #757575;
}
.dark .ver-status-unverified {
background-color: rgba(97, 97, 97, 0.3);
border-left-color: #9e9e9e;
color: #bdbdbd;
}
"""

with gr.Blocks(title="CiteAudit", css=_APP_CSS) as demo:
    # ----- Session state (one set of values per user session) -----
    state_pdf_path = gr.State(None)
    state_ref_pages = gr.State([])
    state_citations = gr.State([])
    state_removed_citations = gr.State([])
    state_appendix_header = gr.State(None)
    state_ref_text = gr.State("")
    state_extraction_done = gr.State(False)
    # Path of the already-generated "reference pages" subset PDF (cache).
    state_ref_pdf_path = gr.State(None)
    # File name of the PDF as uploaded by the user.
    state_pdf_name = gr.State("")

    gr.Markdown("# CiteAudit")

    with gr.Row():
        # ----- Left column: upload, status and verification controls -----
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
            status_text = gr.Textbox(label="Status", interactive=False, lines=6)

            # All controls below start hidden; later callbacks list them as
            # outputs and can therefore reveal them.
            view_toggle = gr.Radio(
                label="View Mode",
                choices=[
                    "Show Full PDF",
                    "Show Reference Pages",
                    "Show Citations",
                    "Show Verifications",
                ],
                value="Show Full PDF",
                interactive=True,
                visible=False,
            )

            verification_divider = gr.Markdown("---", visible=False)
            verification_header = gr.Markdown("### Citation Verification", visible=False)

            api_key_input = gr.Textbox(
                label="Semantic Scholar API Key (Optional)",
                placeholder="Leave empty for free tier (with rate limits)",
                type="password",
                interactive=True,
                visible=False,
            )

            verify_btn = gr.Button(
                "✅ Verify Citations",
                variant="secondary",
                interactive=False,
                visible=False,
            )

            check_count_slider = gr.Slider(
                label="Number of citations to check",
                minimum=1,
                maximum=50,
                step=1,
                value=1,
                interactive=False,
                visible=False,
            )

            export_btn = gr.Button("📊 Download Verifications (CSV)", visible=False)
            download_file = gr.File(label="Download CSV", visible=False)

            gr.Markdown("<br/><small style='color: var(--body-text-color-subdued);'>* Automated verification may have mistakes and are restricted to returns from Semantic Scholar API. Please check all your citations.</small>")

        # ----- Right column: the main display area (one view at a time) -----
        with gr.Column(scale=2, elem_id="main-display-area"):
            # Placeholder shown while extraction is running.
            loading_indicator = gr.Markdown("## ⏳ Extracting content...", visible=False)

            # View 1: the complete uploaded PDF (gradio_pdf component).
            with gr.Group(visible=True) as view_full_pdf:
                pdf_viewer_full = PDF(
                    label="Full PDF",
                    elem_id="pdf-viewer-full",
                    interactive=False,
                )

            # View 2: only the reference pages (gradio_pdf component).
            with gr.Group(visible=False) as view_ref_pages:
                pdf_viewer_ref = PDF(
                    label="Reference Pages",
                    elem_id="pdf-viewer-ref",
                    interactive=False,
                )

            # View 3: extracted citations rendered as HTML.
            with gr.Group(visible=False, elem_id="view-citations") as view_citations:
                citations_header = gr.Markdown("### Extracted Citations")
                citations_display = gr.HTML(elem_id="citations-list")

            # View 4: verification results rendered as HTML.
            with gr.Group(visible=False, elem_id="view-verifications") as view_verifications:
                corrected_display = gr.HTML(label="Corrected Citations")

    # ----- Event wiring -----
    # update_view is registered twice (end of the upload chain and on radio
    # change) with the same component lists, so they are spelled out once and
    # a fresh copy is passed to each registration.
    _view_inputs = [
        view_toggle,
        state_pdf_path,
        state_ref_pages,
        state_citations,
        state_removed_citations,
        state_extraction_done,
        state_ref_pdf_path,
    ]
    _view_outputs = [
        view_full_pdf,
        view_ref_pages,
        view_citations,
        view_verifications,
        pdf_viewer_ref,
        citations_display,
        corrected_display,
        loading_indicator,
        state_ref_pdf_path,
    ]

    # Upload pipeline: initial processing -> automatic citation extraction ->
    # refresh of whichever view is currently selected.
    _upload_event = file_input.upload(
        fn=process_pdf_initial,
        inputs=[
            file_input,
            state_pdf_path,
            state_ref_pages,
            state_citations,
            state_removed_citations,
            state_appendix_header,
            state_ref_text,
        ],
        outputs=[
            file_input,
            status_text,
            pdf_viewer_full,
            view_toggle,
            citations_display,
            verify_btn,
            check_count_slider,
            state_pdf_path,
            state_ref_pages,
            state_citations,
            state_removed_citations,
            state_appendix_header,
            state_ref_text,
            citations_header,
            verification_header,
            verification_divider,
            api_key_input,
            state_extraction_done,
            corrected_display,
            state_ref_pdf_path,
            state_pdf_name,
            export_btn,
            download_file,
        ],
    )
    _extract_event = _upload_event.then(
        fn=extract_citations_auto,
        inputs=[
            view_toggle,
            status_text,
            state_pdf_path,
            state_ref_pages,
            state_ref_text,
            state_citations,
            state_removed_citations,
            state_appendix_header,
            state_extraction_done,
        ],
        outputs=[
            status_text,
            citations_display,
            verify_btn,
            check_count_slider,
            state_citations,
            state_removed_citations,
            state_ref_pages,
            state_ref_text,
            state_appendix_header,
            pdf_viewer_ref,
            loading_indicator,
            citations_header,
            verification_header,
            verification_divider,
            api_key_input,
            state_extraction_done,
            corrected_display,
            export_btn,
            download_file,
        ],
        show_progress="hidden",
    )
    _extract_event.then(
        fn=update_view,
        inputs=list(_view_inputs),
        outputs=list(_view_outputs),
    )

    # Verification pipeline: switch to the verifications view and announce the
    # start, run the check, then show the export button again.
    _verify_start_event = verify_btn.click(
        fn=lambda prev_status: (
            # Select the verifications view.
            gr.update(value="Show Verifications"),
            prev_status + "\n⏳ Starting verification process... Please wait.",
            # No-op update: keep whatever results are already displayed.
            gr.update(),
            # Clear and hide any previously generated CSV download.
            gr.update(visible=False, value=None),
            # Keep the export button hidden while the check is running.
            gr.update(visible=False),
        ),
        inputs=[status_text],
        outputs=[view_toggle, status_text, corrected_display, download_file, export_btn],
    )
    _verify_run_event = _verify_start_event.then(
        fn=run_citation_check,
        inputs=[check_count_slider, status_text, api_key_input, state_citations],
        outputs=[status_text, corrected_display, citations_display, state_citations],
        show_progress="hidden",
    )
    _verify_run_event.then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[export_btn],
    )

    # CSV export: produce the file, then make the download component visible.
    _export_event = export_btn.click(
        fn=export_verifications_csv,
        inputs=[state_citations, state_pdf_name],
        outputs=[download_file],
    )
    _export_event.then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[download_file],
    )

    # Changing the radio selection re-renders the main display area.
    view_toggle.change(
        fn=update_view,
        inputs=list(_view_inputs),
        outputs=list(_view_outputs),
        concurrency_limit=None,
        show_progress="hidden",
    )
if __name__ == "__main__":
    # Script entry point: serve the Blocks app on all network interfaces,
    # port 7860, with show_api turned off.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_api=False,
    )