| | import gradio as gr |
| | from gradio_pdf import PDF |
| |
|
| | import fitz |
| | import os |
| | import tempfile |
| | import json |
| | import requests |
| | import xml.etree.ElementTree as ET |
| | import re |
| | import time |
| | import sys |
| | from collections import OrderedDict |
| | import Levenshtein |
| | import jellyfish |
| | from unidecode import unidecode |
| | from venues import VENUE_NAMES, VENUE_ABBREVIATIONS, COMMON_TERMS |
| | from urlextract import URLExtract |
| |
|
| |
|
| |
|
| | |
# Short human-readable descriptions of HTTP status codes, keyed by the integer
# code. Looked up by check_citations_semantic_scholar when it reports an API
# error for a Semantic Scholar response.
SEMANTIC_SCHOLAR_STATUS_CODES = {
    # Success
    200: "OK: Request successful",
    # Client-side problems
    400: "Bad Request: Check parameters",
    401: "Unauthorized: Invalid API key",
    403: "Forbidden: No permission",
    404: "Not Found: Endpoint or resource missing",
    429: "Too Many Requests: Rate limited",
    # Server-side problems
    500: "Internal Server Error: Server-side issue",
}
| |
|
| | |
# Module-level URL extractor, built once at import time; clean_metadata uses
# it (find_urls / has_urls) to detect URLs inside citation text.
extractor = URLExtract()
| |
|
def cleanup_old_temp_files(max_age_hours=1):
    """Delete stale app-generated files from the system temp directory.

    A file is removed only when all of the following hold: its name ends with
    one of this app's suffixes (``_grobid.pdf``, ``_ref_subset.pdf``,
    ``_verifications.csv``), its modification time is more than
    ``max_age_hours`` hours in the past, and it is a regular file. Anything
    else in the temp directory is left alone.

    Args:
        max_age_hours: Minimum age in hours before a matching file is deleted.

    Returns:
        None. Failures on an individual file are ignored; a failure while
        listing the directory is printed.
    """
    import time

    oldest_allowed_mtime = time.time() - (max_age_hours * 3600)

    temp_root = tempfile.gettempdir()
    if not os.path.exists(temp_root):
        return

    app_suffixes = ("_grobid.pdf", "_ref_subset.pdf", "_verifications.csv")

    try:
        matching_names = [
            entry for entry in os.listdir(temp_root) if entry.endswith(app_suffixes)
        ]
        for entry in matching_names:
            candidate = os.path.join(temp_root, entry)
            try:
                is_stale = os.path.getmtime(candidate) < oldest_allowed_mtime
                if is_stale and os.path.isfile(candidate):
                    os.unlink(candidate)
            except Exception:
                # Best effort: the file may vanish or be locked by another
                # session between listing and deletion.
                pass
    except Exception as e:
        print(f"Error during temp file cleanup: {e}")
| |
|
| |
|
def normalize_title_for_comparison(title):
    """Return a lowercase, punctuation-free form of ``title`` for fuzzy matching.

    Every character that is not an ASCII letter, digit or whitespace is turned
    into a space, and runs of whitespace are collapsed to a single space.
    A falsy ``title`` (``None`` or ``""``) yields ``""``.
    """
    if not title:
        return ""

    lowered = title.lower()
    # Replace rather than delete, so "foo-bar" becomes two words, not "foobar".
    spaced = re.sub(r'[^a-zA-Z0-9\s]', ' ', lowered)
    words = spaced.split()
    return ' '.join(words)
| |
|
def normalize_api_author(name):
    """Normalize an API-sourced author name to ``"surname i1 i2 ..."``.

    The name is transliterated to ASCII, any "et al" marker is removed, and
    the result is reduced to the lowercase surname followed by the lowercase
    initial of each given-name word. Both ``"Last, First"`` and
    ``"First Last"`` layouts are accepted.

    Args:
        name: Author name string; may be empty or ``None``.

    Returns:
        The normalized name, or ``""`` when no usable name is present.
    """
    if not name:
        return ""

    # Transliterate so accented and unaccented spellings compare equal.
    name = unidecode(name)

    # Remove "et al" together with its optional trailing period. Leaving the
    # period behind turned "Smith et al." into "Smith ." and made "." the
    # surname.
    name = re.sub(r'\b(?:et\s*al|etal)\b\.?', '', name, flags=re.IGNORECASE).strip()

    if "," in name:
        # "Last, First Middle"
        surname, _, given_name = name.partition(",")
        surname = surname.strip()
        given_name = given_name.strip()
    else:
        # "First Middle Last": the last token that contains a letter is the
        # surname. Letterless tokens (stray punctuation, numbers) are ignored
        # so they cannot be mistaken for a surname.
        tokens = [tok for tok in name.split() if re.search(r'[a-zA-Z]', tok)]
        if not tokens:
            return ""
        surname = tokens[-1]
        given_name = " ".join(tokens[:-1])

    surname = re.sub(r'[^a-zA-Z]', '', surname).lower()

    # Hyphens and periods split given names, so "Jean-Pierre" yields "j p".
    given_words = re.sub(r'[^a-zA-Z]', ' ', given_name).lower().split()
    initials_str = " ".join(word[0] for word in given_words)

    return f"{surname} {initials_str}".strip()
| |
|
def normalize_d_author(name):
    """Normalize a PDF-sourced author name to ``"surname i1 i2 ..."``.

    The name is transliterated to ASCII. With a comma, the text before the
    first comma is the surname and the rest supplies the given names; without
    one, the last word is the surname. The result is the lowercase surname
    followed by the lowercase initial of every given-name word.

    Args:
        name: Author name string; may be empty or ``None``.

    Returns:
        The normalized name, or ``""`` when no letters remain.
    """
    if not name:
        return ""

    ascii_name = unidecode(name).strip()

    if "," in ascii_name:
        family_raw, given_raw = ascii_name.split(",", 1)
        # Internal whitespace is kept here, so "van der Berg" stays multi-word.
        surname = re.sub(r'[^a-zA-Z\s]', '', family_raw).strip().lower()
        given_words = re.sub(r'[^a-zA-Z]', ' ', given_raw.strip()).split()
    else:
        words = re.sub(r'[^a-zA-Z]', ' ', ascii_name).split()
        if not words:
            return ""
        *given_words, family = words
        surname = family.lower()

    initials_str = " ".join(word[0].lower() for word in given_words)
    return f"{surname} {initials_str}".strip()
| |
|
def calculate_title_similarity(d_title, api_title):
    """Return the Levenshtein ratio between two titles after normalization.

    Both titles go through ``normalize_title_for_comparison`` first. If either
    one normalizes to an empty string the similarity is ``0.0``.
    """
    left = normalize_title_for_comparison(d_title)
    right = normalize_title_for_comparison(api_title)

    if left and right:
        return Levenshtein.ratio(left, right)
    return 0.0
| |
|
def calculate_citation_recall(candidate_title, raw_citation):
    """Estimate how much of ``candidate_title`` appears inside ``raw_citation``.

    Both strings are normalized, then a window slides over the citation. Its
    widths range from ``len(title) - margin`` up to (but not including)
    ``len(title) + margin``, where ``margin = max(3, 10% of the title length)``.
    Each window is scored as::

        Levenshtein.ratio(window, title) * (len(window) + len(title)) / 2 / len(title)

    Returns:
        The best window score capped at ``1.0``; exactly ``1.0`` as soon as a
        window scores above ``0.95``; ``0.0`` when either input is empty
        before or after normalization.
    """
    if not (candidate_title and raw_citation):
        return 0.0

    needle = normalize_title_for_comparison(candidate_title)
    haystack = normalize_title_for_comparison(raw_citation)
    if not (needle and haystack):
        return 0.0

    needle_len = len(needle)
    haystack_len = len(haystack)
    # Window-size tolerance; it depends only on the title, so compute it once.
    slack = max(3, int(needle_len * 0.1))
    best = 0.0

    for start in range(haystack_len):
        for width in range(needle_len - slack, needle_len + slack):
            if width <= 0:
                continue
            if start + width > haystack_len:
                # Widths only grow from here, so none of the rest fit either.
                break

            window = haystack[start:start + width]
            ratio = Levenshtein.ratio(window, needle)
            estimated_matches = ratio * (len(window) + len(needle)) / 2
            recall = estimated_matches / len(needle)

            if recall > best:
                best = recall
                if best > 0.95:
                    # Close enough to a full match; stop searching.
                    return 1.0

    return min(best, 1.0)
| |
|
def calculate_author_similarity(authors1, authors2):
    """Score how well the names in ``authors1`` are covered by ``authors2``.

    Each name in ``authors1`` is paired with its highest Jaro-Winkler match in
    ``authors2``, and those best scores are averaged. If ``authors1`` has more
    than one name beyond the length of ``authors2``, the average is scaled by
    ``len(authors2) / len(authors1)``.

    Args:
        authors1: Sequence of name strings to be matched.
        authors2: Sequence of name strings to match against.

    Returns:
        A float similarity; ``0.0`` when either sequence is empty or ``None``.
    """
    if not authors1 or not authors2:
        return 0.0

    best_per_author = [
        max(0.0, *(jellyfish.jaro_winkler_similarity(left, right) for right in authors2))
        for left in authors1
    ]
    sys.stdout.flush()

    mean_score = sum(best_per_author) / len(best_per_author) if best_per_author else 0.0

    # Penalize when the first list is clearly longer than the second.
    if len(authors1) > len(authors2) + 1:
        mean_score *= len(authors2) / len(authors1)
    return mean_score
| |
|
def discover_metadata_in_raw(raw_text, api_title, api_authors, is_exact_match=False):
    """Locate the title and author segments of a raw citation string.

    Title: with ``is_exact_match`` the API title is returned unchanged.
    Otherwise the raw text and the API title are both reduced to lowercase
    alphanumerics with single spaces, and a window of the title's length
    (plus or minus up to 3 characters) slides over the raw text. If the best
    Levenshtein ratio exceeds ``0.75``, the matching slice of the original raw
    text is returned; in every other case the API title is returned.

    Authors: everything before the earlier of (a) the first occurrence of the
    discovered title in the raw text and (b) the first four-digit year
    starting with 19, 20 or 21, with trailing punctuation stripped. When no
    such boundary exists past index 0, the raw text is searched for each API
    author's first and last name token, and the text up to the last match end
    is returned.

    Args:
        raw_text: Raw citation string.
        api_title: Title to look for.
        api_authors: List of author dicts with a ``'name'`` key, or a list of
            name-like values; may be empty.
        is_exact_match: Skip the fuzzy title search and trust ``api_title``.

    Returns:
        ``(title, authors)``; both are ``""`` when ``raw_text`` is empty.
    """
    if not raw_text:
        return "", ""

    discovered_title = ""
    discovered_authors = ""

    # Normalized copy of raw_text, plus, for each normalized character, the
    # index in raw_text it came from.
    norm_chars = []
    origin_of = []
    prev_was_gap = True
    for pos, ch in enumerate(raw_text):
        if ch.isalnum():
            norm_chars.append(ch.lower())
            origin_of.append(pos)
            prev_was_gap = False
        elif not prev_was_gap:
            norm_chars.append(' ')
            origin_of.append(pos)
            prev_was_gap = True
    normalized_raw = "".join(norm_chars)

    if is_exact_match:
        discovered_title = api_title
    elif api_title:
        # Same normalization for the API title (no index map needed).
        title_chars = []
        gap = True
        for ch in api_title.lower():
            if ch.isalnum():
                title_chars.append(ch)
                gap = False
            elif not gap:
                title_chars.append(' ')
                gap = True
        normalized_api = "".join(title_chars).strip()

        # Fall back to the API title unless a good slice is found below.
        discovered_title = api_title

        if normalized_api and normalized_raw:
            target_len = len(normalized_api)
            raw_len = len(normalized_raw)
            best_span = None
            best_score = 0.0

            for start in range(raw_len):
                if start + target_len > raw_len + 5:
                    break

                for delta in (0, -1, 1, -2, 2, -3, 3):
                    width = target_len + delta
                    if width <= 0 or start + width > raw_len:
                        continue

                    score = Levenshtein.ratio(
                        normalized_raw[start:start + width], normalized_api
                    )
                    if score > best_score:
                        best_score = score
                        best_span = (start, start + width)

                    if best_score > 0.99:
                        break
                if best_score > 0.99:
                    break

            if best_score > 0.75 and best_span:
                span_start, span_end = best_span
                # The normalized string can be longer than the index map, so
                # guard before translating back to raw_text offsets.
                if span_start < len(origin_of) and span_end <= len(origin_of):
                    first_raw = origin_of[span_start]
                    last_raw = origin_of[span_end - 1]
                    discovered_title = raw_text[first_raw:last_raw + 1].strip()

    # Authors are assumed to sit before the title or the year, whichever
    # comes first.
    boundary = -1
    if discovered_title and discovered_title in raw_text:
        boundary = raw_text.find(discovered_title)

    year_hit = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_hit:
        year_pos = year_hit.start()
        if boundary == -1 or year_pos < boundary:
            boundary = year_pos

    if boundary > 0:
        discovered_authors = raw_text[:boundary].strip().rstrip(".,:; ")
    elif api_authors:
        if isinstance(api_authors[0], dict):
            names = [a.get('name', '') for a in api_authors if a.get('name')]
        else:
            names = [str(a) for a in api_authors]

        lowered_raw = raw_text.lower()
        match_ends = []
        for full_name in names:
            tokens = full_name.lower().split()
            if len(tokens) >= 2:
                pattern = re.escape(tokens[0]) + r'.*?' + re.escape(tokens[-1])
                hit = re.search(pattern, lowered_raw)
                if hit:
                    match_ends.append(hit.end())

        if match_ends:
            discovered_authors = raw_text[:max(match_ends)].strip().rstrip(".,;:")

    return discovered_title, discovered_authors
| |
|
def classify_verification(title_score, author_score, has_error=False, error_msg=""):
    """Turn title and author similarity scores into a verification verdict.

    Confidence is ``0.70 * title_score + 0.30 * author_score``:

    * ``>= 0.95``: ``'verified'``
    * ``>= 0.75``: ``'ambiguous'``
    * otherwise: ``'suspected_hallucination'``

    Args:
        title_score: Title similarity.
        author_score: Author similarity.
        has_error: When true, the scores are ignored and an ``'api_error'``
            verdict with zeroed scores is returned.
        error_msg: Message stored under ``'error'`` in the error verdict.

    Returns:
        Dict with ``'status'``, ``'icon'``, ``'title_score'``,
        ``'author_score'`` and ``'confidence'``; the error verdict also has
        ``'error'``.
    """
    if has_error:
        return {
            'status': 'api_error',
            'icon': '✗',
            'title_score': 0.0,
            'author_score': 0.0,
            'confidence': 0.0,
            'error': error_msg,
        }

    confidence = (title_score * 0.70) + (author_score * 0.30)

    if confidence >= 0.95:
        status, icon = 'verified', '✓'
    elif confidence >= 0.75:
        status, icon = 'ambiguous', '⚠'
    else:
        status, icon = 'suspected_hallucination', '⚠⚠'

    return {
        'status': status,
        'icon': icon,
        'title_score': title_score,
        'author_score': author_score,
        'confidence': confidence,
    }
| |
|
| |
|
def verify_citation_against_paper(raw_citation, api_paper, extracted_title, name_order="first_last", separator=","):
    """Score a raw citation against one API paper record.

    Steps:
      1. Normalize the API authors with ``normalize_api_author``.
      2. Choose the title to look for in the raw text. The extracted title is
         used when it equals the API title after normalization and is longer
         than 10 characters ("exact_match"). Otherwise the choice is made by
         comparing each title's recall against the raw citation.
      3. Run ``discover_metadata_in_raw`` to pull the title and author
         segments out of the raw citation.
      4. Score the discovered title against the API title, and the parsed
         discovered authors against the API authors.
      5. Classify with ``classify_verification``.

    Args:
        raw_citation: Raw citation text.
        api_paper: Dict read via ``.get('title')`` and ``.get('authors')``.
        extracted_title: Title previously extracted for this citation.
        name_order: Passed through to ``parse_names_by_pattern``.
        separator: Passed through to ``parse_names_by_pattern``.

    Returns:
        ``(check_data, (d_title, d_authors))``. ``check_data`` is the
        classification dict extended with ``'semantic_data'``,
        ``'title_source'`` and ``'discovery'``.
    """
    api_title = api_paper.get('title', '')
    api_authors_list = api_paper.get('authors', [])

    # The API may hand back author dicts or plain values; normalize either.
    api_authors_norm = []
    if api_authors_list:
        if isinstance(api_authors_list[0], dict):
            api_authors_norm = [
                normalize_api_author(entry.get('name', ''))
                for entry in api_authors_list
                if entry.get('name')
            ]
        else:
            api_authors_norm = [
                normalize_api_author(str(entry)) for entry in api_authors_list if entry
            ]

    best_title_candidate = None
    title_source = ""
    is_exact_match = False

    if extracted_title and api_title:
        norm_extracted = normalize_title_for_comparison(extracted_title)
        norm_api = normalize_title_for_comparison(api_title)
        if norm_extracted == norm_api and len(norm_extracted) > 10:
            is_exact_match = True

    if is_exact_match:
        best_title_candidate = extracted_title
        title_source = "exact_match"
    else:
        recall_extracted = (
            calculate_citation_recall(extracted_title, raw_citation) if extracted_title else 0.0
        )
        recall_api = calculate_citation_recall(api_title, raw_citation)

        if abs(recall_extracted - recall_api) < 1e-7:
            # Tie: prefer the title with fewer words; a missing title counts
            # as very long.
            words_ext = len(extracted_title.split()) if extracted_title else 999
            words_api = len(api_title.split()) if api_title else 999
            if words_ext < words_api:
                best_title_candidate = extracted_title
                title_source = "extracted (tie-breaker shorter)"
            else:
                best_title_candidate = api_title
                title_source = "api (tie-breaker shorter)"
        elif recall_extracted > (recall_api + 0.1):
            # The extracted title must beat the API title by a clear margin.
            best_title_candidate = extracted_title
            title_source = "cleaned/extracted"
        else:
            best_title_candidate = api_title
            title_source = "api"

    d_title, d_authors = discover_metadata_in_raw(
        raw_citation, best_title_candidate, api_authors_list, is_exact_match=is_exact_match
    )

    t_score = calculate_title_similarity(d_title, api_title) if d_title else 0.0

    if d_authors:
        has_etal = re.search(r'\bet\s*al\b', d_authors, re.IGNORECASE)
        parsed_d_authors = parse_names_by_pattern(d_authors, name_order, separator)

        score_forward = calculate_author_similarity(parsed_d_authors, api_authors_norm)
        if has_etal:
            # A truncated author list cannot cover every API author, so only
            # the citation-to-API direction is scored.
            a_score = score_forward
        else:
            score_backward = calculate_author_similarity(api_authors_norm, parsed_d_authors)
            a_score = (0.5 * score_forward) + (0.5 * score_backward)
        sys.stdout.flush()
    else:
        a_score = 0.0

    discovery = (d_title, d_authors)
    check_data = classify_verification(t_score, a_score)
    check_data['semantic_data'] = api_paper
    check_data['title_source'] = title_source
    check_data['discovery'] = discovery

    return check_data, discovery
| | |
def check_citations_semantic_scholar(citations_to_check, api_key=None, name_order="first_last", separator=","):
    """Verify citations against the Semantic Scholar API, one at a time.

    For each citation:
      1. If it already has a final verification status it is yielded as is.
      2. Stage 1: the title (if any) goes to the ``paper/search/match``
         endpoint; a hit is scored with ``verify_citation_against_paper``.
      3. Stage 2: otherwise the title and the raw citation go to
         ``paper/search``; the candidate whose title has the best recall
         against the raw citation (ties: fewer words) is scored. Matches
         with confidence below 0.4 are reported as errors.

    The result is stored in ``cit['verification']``. When a paper is scored,
    ``cit['title_after_verification']`` and
    ``cit['authors_after_verification']`` are also set.

    Args:
        citations_to_check: List of citation dicts (``'raw_text'``,
            ``'title'``, optionally ``'verification'``).
        api_key: Optional API key, sent as ``x-api-key``. Without a key the
            generator sleeps one second between citations.
        name_order: Passed through to ``verify_citation_against_paper``.
        separator: Passed through to ``verify_citation_against_paper``.

    Yields:
        Each input citation dict, in order, after it has been processed.
    """
    match_url = "https://api.semanticscholar.org/graph/v1/paper/search/match"
    search_url = "https://api.semanticscholar.org/graph/v1/paper/search"
    fields = 'title,authors,year,venue'
    headers = {'x-api-key': api_key} if api_key else {}

    def error_result(message):
        return classify_verification(0, 0, has_error=True, error_msg=message)

    def make_request(url, params):
        """GET with up to 3 retries; returns the response, or None on failure.

        A 429 is retried after 1, 2 then 4 seconds; the last 429 response is
        returned as is. Timeouts are retried immediately. Any other exception
        gives None.
        """
        max_retries = 3
        attempt = 0
        while attempt <= max_retries:
            try:
                resp = requests.get(url, params=params, headers=headers, timeout=10)
            except requests.exceptions.Timeout:
                attempt += 1
                continue
            except Exception:
                return None
            if resp.status_code != 429 or attempt >= max_retries:
                return resp
            time.sleep(2 ** attempt)
            attempt += 1
        return None

    def verify_one(cit, raw_citation, cleaned_title):
        """Run both lookup stages for one citation and return its verdict."""
        response = None

        # Stage 1: title match endpoint.
        if cleaned_title:
            response = make_request(match_url, {'query': cleaned_title, 'fields': fields})
            if response is None:
                return error_result("No Response")

            status_desc = SEMANTIC_SCHOLAR_STATUS_CODES.get(
                response.status_code, f"Unknown ({response.status_code})"
            )
            if response.status_code == 200:
                matches = response.json().get('data')
                if matches:
                    paper = matches[0]
                    if paper and paper.get('paperId'):
                        check_data, discovery = verify_citation_against_paper(
                            raw_citation,
                            paper,
                            cleaned_title,
                            name_order=name_order,
                            separator=separator,
                        )
                        cit['title_after_verification'], cit['authors_after_verification'] = discovery
                        return check_data
            elif response.status_code in (400, 401, 403):
                return error_result(status_desc)

        # NOTE: requests.Response is falsy for 4xx/5xx, so test "is not None"
        # explicitly. "if response and ..." can never be true for a 429.
        if response is not None and response.status_code == 429:
            return error_result("Rate Limited (429)")

        # Stage 2: general search by title, then by the raw citation.
        queries = []
        if cleaned_title:
            queries.append(cleaned_title)
        queries.append(raw_citation)

        candidates = {}
        rate_limited = False
        for query in queries:
            s_resp = make_request(search_url, {'query': query, 'limit': 5, 'fields': fields})
            if s_resp is None:
                continue
            if s_resp.status_code == 200:
                for paper in s_resp.json().get('data') or []:
                    pid = paper.get('paperId')
                    if pid and pid not in candidates:
                        candidates[pid] = paper
            elif s_resp.status_code == 429:
                rate_limited = True
                break

        if not candidates:
            if rate_limited:
                return error_result("Rate Limited (429)")
            return error_result("No search results found by API")

        # Highest recall wins; on a tie the title with fewer words wins.
        best_paper = None
        best_recall = -1.0
        best_words = 999
        for paper in candidates.values():
            cand_title = paper.get('title', '')
            recall = calculate_citation_recall(cand_title, raw_citation)
            word_count = len(cand_title.split()) if cand_title else 999
            if recall > best_recall:
                best_recall, best_words, best_paper = recall, word_count, paper
            elif abs(recall - best_recall) < 1e-7 and word_count < best_words:
                best_words, best_paper = word_count, paper

        if not best_paper:
            return error_result("No suitable API candidate found")

        check_data, discovery = verify_citation_against_paper(
            raw_citation,
            best_paper,
            cleaned_title,
            name_order=name_order,
            separator=separator,
        )
        cit['title_after_verification'], cit['authors_after_verification'] = discovery

        if check_data.get('confidence', 0) < 0.4:
            return error_result("Low confidence match")
        return check_data

    for i, cit in enumerate(citations_to_check):
        # "or ''" tolerates keys that are present but None.
        raw_citation = (cit.get('raw_text') or '').strip()
        cleaned_title = (cit.get('title') or '').strip()

        # Do not re-query citations that already have a final verdict.
        existing_status = (cit.get('verification') or {}).get('status')
        if existing_status in ('verified', 'ambiguous', 'suspected_hallucination'):
            yield cit
            continue

        try:
            check_data = verify_one(cit, raw_citation, cleaned_title)
        except Exception as e:
            check_data = error_result(str(e))
        sys.stdout.flush()

        cit['verification'] = check_data
        yield cit
        sys.stdout.flush()

        # Stay polite to the unauthenticated rate limit.
        if not api_key and i < len(citations_to_check) - 1:
            time.sleep(1)
| |
|
def parse_tei_citations(tei_xml):
    """Extract bibliography entries from a TEI XML document.

    Every ``listBibl/biblStruct`` element becomes a dict with whichever of
    these keys could be filled:

    * ``'title'``: article title (``level="a"``), else monograph title
      (``level="m"``).
    * ``'authors'``: list of ``"forename surname"`` strings.
    * ``'year'``: the ``when`` attribute of the published date.
    * ``'venue'``: journal title (``level="j"``), else the monograph title
      when it was not already used as the title, else the meeting, else the
      publisher.
    * ``'raw_text'``: whitespace-collapsed ``raw_reference`` note, or the
      entry's full text when no such note exists.
    * ``'grobid_xml'``: the serialized ``biblStruct`` element.

    Entries with none of title, authors, year or venue are dropped.

    Args:
        tei_xml: TEI document as a string.

    Returns:
        List of citation dicts; ``[]`` if the XML cannot be parsed or any
        other error occurs.
    """
    ns = {'tei': 'http://www.tei-c.org/ns/1.0'}

    def collapsed_text(element):
        # Join all nested text and squeeze whitespace runs to single spaces.
        return re.sub(r'\s+', ' ', "".join(element.itertext()).strip())

    try:
        root = ET.fromstring(tei_xml)
        parsed = []

        for bibl in root.findall('.//tei:listBibl/tei:biblStruct', ns):
            entry = {}

            # Title: article level first, monograph level as fallback.
            title_el = bibl.find('.//tei:title[@level="a"]', ns)
            title_is_monograph = False
            if title_el is None:
                title_el = bibl.find('.//tei:title[@level="m"]', ns)
                title_is_monograph = title_el is not None
            if title_el is not None and title_el.text:
                entry['title'] = title_el.text.strip()

            # Authors
            names = []
            for author_el in bibl.findall('.//tei:author', ns):
                pers = author_el.find('.//tei:persName', ns)
                if pers is None:
                    continue
                name_elements = (
                    pers.find('.//tei:forename', ns),
                    pers.find('.//tei:surname', ns),
                )
                pieces = [
                    el.text.strip() for el in name_elements if el is not None and el.text
                ]
                if pieces:
                    names.append(' '.join(pieces))
            if names:
                entry['authors'] = names

            # Year
            date_el = bibl.find('.//tei:date[@type="published"]', ns)
            if date_el is not None and date_el.get('when'):
                entry['year'] = date_el.get('when')

            # Venue: journal, then unused monograph title, then meeting.
            venue_el = bibl.find('.//tei:title[@level="j"]', ns)
            if venue_el is None and not title_is_monograph:
                venue_el = bibl.find('.//tei:title[@level="m"]', ns)
            if venue_el is None:
                venue_el = bibl.find('.//tei:meeting', ns)
            if venue_el is not None and venue_el.text:
                entry['venue'] = venue_el.text.strip()

            # Last resort for the venue: the publisher.
            if 'venue' not in entry:
                publisher_el = bibl.find('.//tei:publisher', ns)
                if publisher_el is not None and publisher_el.text:
                    entry['venue'] = publisher_el.text.strip()

            if not entry:
                continue

            raw_ref_el = bibl.find('.//tei:note[@type="raw_reference"]', ns)
            source_el = raw_ref_el if raw_ref_el is not None else bibl
            entry['raw_text'] = collapsed_text(source_el)
            entry['grobid_xml'] = ET.tostring(bibl, encoding='unicode')
            parsed.append(entry)

        return parsed
    except Exception as e:
        return []
| |
|
def extract_title_and_authors_from_xml(xml_string):
    """Pull the title out of a GROBID ``biblStruct`` XML fragment.

    Title elements are tried from most to least specific; the first element
    found is used, and its stripped text becomes ``'title'`` if it has any.

    Args:
        xml_string: XML string of a ``biblStruct`` element.

    Returns:
        Dict with ``'authors'`` (always an empty list) and, when found,
        ``'title'``. An empty dict if the XML cannot be parsed or any other
        error occurs.
    """
    ns = {'ns0': 'http://www.tei-c.org/ns/1.0', 'tei': 'http://www.tei-c.org/ns/1.0'}

    # Most specific first; the search stops at the first path with a match.
    lookup_order = (
        './/ns0:title[@level="a"][@type="main"]',
        './/ns0:title[@level="a"]',
        './/ns0:title[@level="m"]',
        './/ns0:title',
        './/tei:title[@level="a"][@type="main"]',
        './/tei:title[@level="a"]',
        './/tei:title',
    )

    try:
        root = ET.fromstring(xml_string)

        title_el = None
        for path in lookup_order:
            title_el = root.find(path, ns)
            if title_el is not None:
                break

        result = {}
        if title_el is not None and title_el.text:
            result['title'] = title_el.text.strip()
        result['authors'] = []
        return result

    except Exception as e:
        return {}
| |
|
def clean_metadata(text):
    """Trim a title or author string at the first segment that looks like
    venue, year or URL information.

    Processing:
      1. Parentheses are removed.
      2. URLs found by the shared ``extractor`` and a few URL-like markers
         (``arxiv.org``, ``doi.org``, ``doi:``, ``url``) are masked with
         placeholders, so their dots do not split the text.
      3. The text is split into sentence-like segments at ``.``, ``?`` or
         ``!`` followed by whitespace or the end of the string.
      4. Segments are kept in order until one is contaminated. A segment is
         contaminated if it holds a masked URL, a name from ``VENUE_NAMES``
         or ``COMMON_TERMS`` (case-insensitive substring), an entry of
         ``VENUE_ABBREVIATIONS`` (case-sensitive whole word), a four-digit
         year starting with 19, 20 or 21, or a URL. That segment and all
         later ones are dropped.
      5. Whitespace is collapsed and surrounding punctuation is stripped.

    Args:
        text: Title or author string; may be empty or ``None``.

    Returns:
        The cleaned string (``""`` for empty input).
    """
    if not text:
        return ""

    text = text.replace('(', '').replace(')', '')

    extra_patterns = r'arxiv\.org|doi\.org|\bdoi:|\burl\b'

    # placeholders[i] is the original text hidden behind __URL_PH_i__.
    placeholders = []
    masked = text

    found_urls = extractor.find_urls(text, True)

    # Longest first, so a URL that is a prefix of another one does not
    # clobber part of the longer one.
    for url in sorted(set(found_urls), key=len, reverse=True):
        token = f"__URL_PH_{len(placeholders)}__"
        placeholders.append(url)
        masked = masked.replace(url, token)

    def mask_extra(match):
        token = f"__URL_PH_{len(placeholders)}__"
        placeholders.append(match.group(0))
        return token

    masked = re.sub(extra_patterns, mask_extra, masked, flags=re.IGNORECASE)

    def unmask(fragment):
        for idx, original in enumerate(placeholders):
            fragment = fragment.replace(f"__URL_PH_{idx}__", original)
        return fragment

    # Rebuild segments so that each one keeps its terminating punctuation.
    segments = []
    pending = ""
    for piece in re.split(r'([.?!]\s|[.?!]$)', masked):
        is_terminator = piece and (
            piece.strip() in ['.', '?', '!'] or re.match(r'[.?!]\s', piece)
        )
        if is_terminator:
            segments.append(pending + piece)
            pending = ""
        else:
            pending += piece
    if pending:
        segments.append(pending)

    kept = []
    for seg in segments:
        # A masked URL marks the start of non-title material.
        if "__URL_PH_" in seg:
            break

        check_seg = unmask(seg)
        seg_lower = check_seg.lower()

        contaminated = (
            any(venue.lower() in seg_lower for venue in VENUE_NAMES)
            or any(
                re.search(r'\b' + re.escape(abbr) + r'\b', check_seg)
                for abbr in VENUE_ABBREVIATIONS
            )
            or any(term.lower() in seg_lower for term in COMMON_TERMS)
            or re.search(r'\b(19|20|21)\d{2}\b', check_seg)
            or extractor.has_urls(check_seg)
            or re.search(extra_patterns, check_seg, re.IGNORECASE)
        )
        if contaminated:
            break

        kept.append(unmask(seg))

    text = "".join(kept).strip()

    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'\(\s*\)', '', text)
    text = re.sub(r'\[\s*\]', '', text)
    text = text.strip(".,;: -()[]")

    return text
| |
|
def find_reference_pages(pdf_path):
    """Locate the page where the reference section of a PDF starts.

    Pages are scanned in order. A page is a candidate when it has a line of
    at most five words containing "references" or "bibliography"
    (case-insensitive). The first candidate for which
    ``_get_grobid_boundaries`` returns at least one citation becomes the
    start page.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        ``(ref_pages, start_page, end_page, ref_text)``:

        * found: ``([start_page], start_page, start_page + 1,
          <text of that page> + "\\n")``
        * not found: ``([], None, <page count>, "")``
    """
    doc = fitz.open(pdf_path)
    # try/finally so the document is closed even if text extraction or the
    # GROBID probe raises.
    try:
        start_page = None
        end_page = len(doc)
        ref_text = ""
        ref_pages = []

        for page_num, page in enumerate(doc):
            page_text = page.get_text("text")
            lines = [ln.strip().lower() for ln in page_text.splitlines() if ln.strip()]

            has_heading = any(
                len(line.split()) <= 5 and ("references" in line or "bibliography" in line)
                for line in lines
            )
            # Accept the page only if citations can actually be extracted
            # from it.
            if has_heading and _get_grobid_boundaries(pdf_path, [page_num]):
                start_page = page_num
                break

        if start_page is not None:
            # Only the first reference page is recorded here.
            end_page = start_page + 1
            ref_pages = [start_page]
            ref_text = doc[start_page].get_text("text") + "\n"

        return ref_pages, start_page, end_page, ref_text
    finally:
        doc.close()
| |
|
def process_pdf_initial(pdf_file, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text):
    """Handle a newly uploaded PDF: locate its references and reset the UI.

    Old temp files are cleaned up first. If no file was uploaded, a "reset"
    tuple is returned. Otherwise ``find_reference_pages`` is run on the
    uploaded file and a status message is built.

    Args:
        pdf_file: Uploaded file object (its ``.name`` is the path), or ``None``.
        state_*: Previous state values; not read by this function.

    Returns:
        A 23-element tuple of values and ``gr.update(...)`` objects. Both
        branches return their elements in the same positional order.
    """
    cleanup_old_temp_files(max_age_hours=1)

    # Fresh update objects per slot, so no two outputs share one object.
    def hidden():
        return gr.update(visible=False)

    def hidden_disabled():
        return gr.update(interactive=False, visible=False)

    if pdf_file is None:
        return (
            None, "No PDF uploaded",
            hidden(), hidden(),
            hidden(),
            hidden_disabled(),
            hidden_disabled(),
            None, [], [], [], None, "",
            hidden(), hidden(), hidden(), hidden(),
            False,
            hidden(),
            None,
            "",
            hidden(),
            hidden(),
        )

    new_pdf_path = pdf_file.name
    new_citations = []
    new_removed_citations = []
    new_appendix_header = None

    new_ref_pages, start_page, end_page, new_ref_text = find_reference_pages(new_pdf_path)

    basename = os.path.basename(new_pdf_path)

    status_parts = [f"✓ Loaded PDF: {os.path.basename(new_pdf_path)}\n"]
    if new_ref_pages:
        status_parts.append(f"\n✓ Identified reference section start: page {start_page + 1}")
    else:
        status_parts.append("\n⚠ No reference section found")
    status_parts.append("\n⏳ Starting automatic extraction... Please wait.")
    status = "".join(status_parts)

    return (
        new_pdf_path, status,
        gr.update(value=new_pdf_path, visible=True),
        gr.update(visible=True, value="Show Full PDF"),
        hidden(),
        hidden_disabled(),
        hidden_disabled(),
        new_pdf_path, new_ref_pages, new_citations, new_removed_citations, new_appendix_header, new_ref_text,
        hidden(),
        hidden(),
        hidden(),
        hidden(),
        False,
        gr.update(visible=False, value=""),
        None,
        basename,
        hidden(),
        gr.update(visible=False, value=None),
    )
| |
|
| | def _get_grobid_boundaries(pdf_path, page_indices): |
| | """Helper to get GROBID citation boundaries for specific pages.""" |
| | if not page_indices: |
| | return [] |
| | |
| | output_path = None |
| | try: |
| | doc = fitz.open(pdf_path) |
| | temp_grobid = tempfile.NamedTemporaryFile(delete=False, suffix="_grobid.pdf") |
| | output_path = temp_grobid.name |
| | temp_grobid.close() |
| | |
| | ref_doc = fitz.open() |
| | for page_idx in page_indices: |
| | ref_doc.insert_pdf(doc, from_page=page_idx, to_page=page_idx) |
| | |
| | ref_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True) |
| | ref_doc.close() |
| | doc.close() |
| | |
| | with open(output_path, 'rb') as f: |
| | files = {'input': (os.path.basename(output_path), f, 'application/pdf')} |
| | data = {'consolidateCitations': '0', 'includeRawCitations': '1'} |
| | response = requests.post( |
| | 'http://localhost:8070/api/processFulltextDocument', |
| | files=files, |
| | data=data, |
| | timeout=120 |
| | ) |
| | |
| | if response.status_code == 200: |
| | return parse_tei_citations(response.text) |
| | else: |
| | return [] |
| | except Exception: |
| | return [] |
| | finally: |
| | if output_path and os.path.exists(output_path): |
| | try: |
| | os.unlink(output_path) |
| | except: |
| | pass |
| |
|
def _detect_trailing_appendix_header(page_text, page_citations):
    """Return the first appendix-like heading on a page that no citation follows.

    Args:
        page_text: Plain text of the page.
        page_citations: Citation dicts for that page; only ``raw_text`` is read.

    Returns:
        The heading text (joined with following lines while it is shorter than
        five characters), or ``None`` when no candidate qualifies.
    """
    lines = [ln.strip() for ln in page_text.splitlines() if ln.strip()]
    appendix_keywords = ["appendix", "appendices", "supplement", "limitation", "checklist", "statement"]

    # Line index at which each citation starts, located by its first 30 characters.
    citation_start_line_indices = []
    for cit in page_citations:
        cit_text = cit.get('raw_text', '').strip()
        if not cit_text:
            continue
        cit_prefix = cit_text[:30].strip().lower()
        for k, line in enumerate(lines):
            if cit_prefix in line.lower():
                citation_start_line_indices.append(k)
                break

    for i, line in enumerate(lines):
        words = line.split()
        if len(words) > 5:
            continue  # headings are short

        line_lower = line.lower()
        is_match = any(k in line_lower for k in appendix_keywords) or bool(
            re.match(r'^A[\.\:]?$', words[0] if words else "")
        )
        if not is_match:
            continue

        # A heading that still has citations below it is not the end of the list.
        if any(start_idx > i for start_idx in citation_start_line_indices):
            continue

        candidate = line
        curr_idx = i + 1
        while len(candidate) < 5 and curr_idx < len(lines):
            candidate += " " + lines[curr_idx]
            curr_idx += 1
        return candidate

    return None


def _merge_page_and_context_citations(per_page_citations, context_citations):
    """Combine per-page GROBID results with the full-range GROBID result.

    The last citation of each page may have been cut at the page break. When
    the full-range pass holds a longer citation containing it (and not
    containing the first two citations of the next page), that longer
    citation replaces it. The two sequences are then aligned with
    ``difflib.SequenceMatcher``: citations only present in the full-range
    pass are inserted, everything else comes from the per-page pass.

    Args:
        per_page_citations: One list of citation dicts per page.
        context_citations: Citation dicts from the full-range pass.

    Returns:
        The merged list of citation dicts.
    """
    import difflib

    def get_text(cit):
        return cit.get('raw_text', '').strip()

    # id(per-page citation) -> full-range citation that extends it.
    replacements = {}
    for p_idx, page in enumerate(per_page_citations):
        if not page:
            continue

        cit_x = page[-1]
        cit_x_text = get_text(cit_x)
        if not cit_x_text:
            # An empty string is a substring of everything; without this guard
            # an empty citation would be "extended" by an unrelated one.
            continue

        next_page = []
        if p_idx + 1 < len(per_page_citations):
            next_page = per_page_citations[p_idx + 1] or []
        cit_y = next_page[0] if next_page else None
        cit_z = next_page[1] if len(next_page) > 1 else None
        cit_y_text = get_text(cit_y) if cit_y else ""
        cit_z_text = get_text(cit_z) if cit_z else ""

        for candidate in context_citations:
            candidate_text = get_text(candidate)
            if cit_x_text not in candidate_text:
                continue
            # Skip candidates that swallowed citations from the next page.
            if cit_z and cit_z_text in candidate_text:
                continue
            if cit_y and cit_y_text in candidate_text:
                continue
            if len(candidate_text) > len(cit_x_text):
                replacements[id(cit_x)] = candidate
                break

    flat_page_citations = [
        replacements.get(id(cit), cit)
        for page in per_page_citations
        for cit in page
    ]

    texts_i = [get_text(c) for c in flat_page_citations]
    texts_c = [get_text(c) for c in context_citations]
    matcher = difflib.SequenceMatcher(None, texts_i, texts_c)

    merged = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'insert':
            merged.extend(context_citations[j1:j2])
        else:
            # 'equal', 'delete' and 'replace' all keep the per-page version.
            merged.extend(flat_page_citations[i1:i2])
    return merged


def _fold_url_only_citations(citations):
    """Append short URL/DOI-only entries to the citation that precedes them.

    An entry counts as URL-only when it contains a URL marker and has at most
    six whitespace-separated tokens. The preceding dict's ``raw_text`` is
    modified in place.
    """
    merged = []
    for cit in citations:
        raw_text = cit.get('raw_text', '').strip()
        has_url = extractor.has_urls(raw_text) or re.search(r'arxiv\.org|doi\.org|\bdoi:|\burl\b', raw_text, re.IGNORECASE)
        is_url_only = has_url and len(raw_text.split()) <= 6

        if merged and is_url_only:
            prev_cit = merged[-1]
            prev_cit['raw_text'] = (prev_cit.get('raw_text', '') + " " + raw_text).strip()
        else:
            merged.append(cit)
    return merged


def _build_parsed_citation(cit, appendix_header=None):
    """Turn one GROBID citation dict into the app's citation record.

    Args:
        cit: Dict providing ``raw_text``, ``grobid_xml``, ``year``, ``venue``.
        appendix_header: When given, text from this header onwards is cut off
            ``raw_text`` and the shortened reference is re-parsed through
            GROBID's ``processCitation`` endpoint (best effort).

    Returns:
        Dict with ``raw_text``, ``title``, ``authors``, ``year``, ``venue``.
    """
    raw_text = cit.get('raw_text', '')
    grobid_xml = cit.get('grobid_xml', '')

    if appendix_header:
        clean_header = re.sub(r'\s+', ' ', appendix_header.strip()[:10].strip().lower())
        # Search the original string (whitespace-tolerant, case-insensitive)
        # so the match offset is a valid index into raw_text itself.
        header_pattern = r'\s+'.join(re.escape(tok) for tok in clean_header.split(' '))
        header_match = re.search(header_pattern, raw_text, flags=re.IGNORECASE)
        if header_match and header_match.start() > 0:
            cleaned_raw_reference = raw_text[:header_match.start()].strip()
            cleaned_raw_reference = re.sub(r'(\.\s*See\s*|\s*See\s*|\.\s*)$', '', cleaned_raw_reference, flags=re.IGNORECASE).strip()
            raw_text = cleaned_raw_reference
            try:
                response = requests.post(
                    'http://localhost:8070/api/processCitation',
                    data={'citations': cleaned_raw_reference, 'includeRawCitations': '1'},
                    timeout=30
                )
                if response.status_code == 200:
                    grobid_xml = response.text
            except requests.RequestException:
                # Keep the XML from the first pass when the re-parse fails.
                pass

    parsed_fields = extract_title_and_authors_from_xml(grobid_xml)
    title = parsed_fields.get('title', '')
    authors = parsed_fields.get('authors', [])

    # Undo line-break hyphenation artifacts.
    raw_text = raw_text.replace("- ", "")
    title = title.replace("- ", "")

    if title and len(title) > 5:
        clean_title_prefix = re.sub(r'\W+', '', title.lower()[:40])
        if clean_title_prefix:
            fuzzy_pattern = ''.join(re.escape(c) + r'[\W]*' for c in clean_title_prefix)
            t_match = re.search(fuzzy_pattern, raw_text.lower())
            if t_match:
                # Recover title words between the previous punctuation mark
                # and the place where the parsed title starts.
                match_start = t_match.start()
                boundary_idx = max(raw_text.rfind(ch, 0, match_start) for ch in '.?!,')
                missed_prefix = raw_text[boundary_idx + 1:match_start].strip()
                if missed_prefix:
                    title = f"{missed_prefix} {title}".strip()

    title = clean_metadata(title)

    refined_authors = refine_author_string(raw_text, authors, title)
    refined_authors = clean_metadata(refined_authors)

    if title and len(title) > 8 and title in refined_authors:
        refined_authors = refined_authors.split(title)[0].strip()

    refined_authors = refined_authors.strip(".,;: -()")

    return {
        'raw_text': raw_text,
        'title': title,
        'authors': refined_authors,
        'year': cit.get('year', ''),
        'venue': cit.get('venue', '')
    }


def _split_valid_and_rejected(parsed_citations):
    """Separate usable citations from noise and near-duplicates.

    Rejected dicts get a ``rejection_reason`` key added in place.

    Returns:
        ``(kept, removed)`` lists of citation dicts.
    """
    kept = []
    removed = []

    for cit in parsed_citations:
        title = cit.get('title', '').strip()
        raw_text_clean = cit.get('raw_text', '').strip()
        alpha_chars = sum(c.isalnum() for c in raw_text_clean)
        alpha_density = alpha_chars / len(raw_text_clean) if raw_text_clean else 0

        rejection_reason = None
        if title.lower().startswith(("fig.", "figure")):
            rejection_reason = "Figure caption detected"
        elif not title and not cit.get('authors') and not cit.get('year'):
            rejection_reason = "Missing title, authors, and year"
        elif raw_text_clean.lower() in ["references", "bibliography", "works cited"]:
            rejection_reason = "Section header detected"
        elif len(raw_text_clean) > 5 and alpha_density < 0.3:
            rejection_reason = "Likely noise or artifact (low text density)"

        if rejection_reason is None:
            is_dup = any(
                jellyfish.jaro_winkler_similarity(raw_text_clean, existing.get('raw_text', '').strip()) >= 0.95
                for existing in kept
            )
            if is_dup:
                rejection_reason = "Duplicate (95%+ similarity)"

        if rejection_reason:
            cit['rejection_reason'] = rejection_reason
            removed.append(cit)
        else:
            kept.append(cit)

    return kept, removed


def extract_citations_auto(view_mode, previous_status, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done):
    """Extract citations using triple-pass hybrid pipeline to improve recall.

    Generator: every ``yield`` is a 19-item tuple of UI updates and state
    values produced by the nested ``gen_update``.

    Pipeline:
        1. Starting at ``state_ref_pages[0]``, send one page at a time to
           GROBID until a page yields no citation with a title, authors or
           year; the pages before it form the confirmed reference range.
        2. Re-read the text of that range and look for an appendix heading on
           its last page.
        3. Send the whole range to GROBID and merge that result with the
           per-page results.
        4. Fold URL-only fragments, parse metadata for each citation, then
           filter noise and near-duplicates.

    ``view_mode``, ``state_citations``, ``state_removed_citations`` and
    ``state_extraction_done`` are accepted but not read.
    """

    def gen_update(status_txt, done=False, final_cits=None, final_rem=None, final_pages=None, final_text=None, final_header=None):
        """Build one output tuple; ``done`` reveals the result widgets."""
        # Citation lists default to a fresh empty list on every call; the
        # incoming citation state is intentionally not echoed back.
        cits = [] if final_cits is None else final_cits
        rem = [] if final_rem is None else final_rem
        # Pages, text and header fall back to the enclosing function's
        # current values, which are rebound as extraction progresses.
        pages = final_pages if final_pages is not None else state_ref_pages
        text = final_text if final_text is not None else state_ref_text
        header = final_header if final_header is not None else state_appendix_header

        loading_update = gr.update(visible=False) if done else gr.update()

        slider_max = len(cits) if cits else 1
        slider_val = min(1, slider_max)

        if done:
            display_text = format_citations_display(cits)
            if rem:
                display_text += "\n\nREMOVED CITATIONS ({})\n\n".format(len(rem))
                display_text += format_citations_display(rem, show_reason=True)
            citations_html_update = gr.update(value=display_text, visible=done)
        else:
            citations_html_update = gr.update()

        return (status_txt,
                citations_html_update,
                gr.update(interactive=done, visible=done),
                gr.update(interactive=done, maximum=slider_max, value=slider_val, visible=done),
                cits, rem, pages, text, header,
                gr.update(),
                loading_update,
                gr.update(visible=done),
                gr.update(visible=done),
                gr.update(visible=done),
                gr.update(visible=done),
                done,
                gr.update(visible=done),
                gr.update(visible=done),
                gr.update(visible=False, value=None))

    if not state_ref_pages or not state_pdf_path:
        yield gen_update(previous_status + "\n⚠ No reference pages to process", done=True)
        return

    try:
        start_page_idx = state_ref_pages[0]
        confirmed_ref_pages = []
        per_page_citations = []

        yield gen_update(previous_status + f"\n⏳ Scanning pages starting from {start_page_idx + 1}...")

        with fitz.open(state_pdf_path) as doc_temp:
            total_pages = len(doc_temp)

        # Pass 1: page-by-page scan until a page has no usable citation.
        current_page = start_page_idx
        while current_page < total_pages:
            yield gen_update(previous_status + f"\n⏳ Scanning Page {current_page + 1}... Citations will be displayed once finished.")

            page_cits = _get_grobid_boundaries(state_pdf_path, [current_page])
            if not any(c.get('title') or c.get('authors') or c.get('year') for c in page_cits):
                break

            confirmed_ref_pages.append(current_page)
            per_page_citations.append(page_cits)
            current_page += 1

        if not confirmed_ref_pages:
            yield gen_update(previous_status + "\n⚠ No valid citations extracted from start page.", done=True)
            return

        yield gen_update(previous_status + f"\n✓ Range confirmed: {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1}. Merging...", final_pages=confirmed_ref_pages)

        previous_status += f"\n✓ Confirmed Reference Range: Pages {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1} ({len(confirmed_ref_pages)} pages)"

        state_ref_pages = confirmed_ref_pages

        # Re-read the confirmed range; the document is closed even on failure.
        with fitz.open(state_pdf_path) as doc_temp:
            updated_ref_text = "".join(
                doc_temp[p_idx].get_text("text") + "\n" for p_idx in state_ref_pages
            )
            last_page_text = doc_temp[state_ref_pages[-1]].get_text("text")

        state_appendix_header = _detect_trailing_appendix_header(last_page_text, per_page_citations[-1])
        state_ref_text = updated_ref_text

        # Pass 2: the whole range in one request, then merge with pass 1.
        yield gen_update(previous_status + "\n⏳ Sending full context to GROBID...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
        grobid_citations_a = _get_grobid_boundaries(state_pdf_path, confirmed_ref_pages)

        grobid_citations = _merge_page_and_context_citations(per_page_citations, grobid_citations_a)
        grobid_citations = _fold_url_only_citations(grobid_citations)

        yield gen_update(previous_status + f"\n⏳ Parsing metadata for {len(grobid_citations)} citations...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)

        # Pass 3: per-citation metadata; progress is reported every 5 items.
        parsed_citations = []
        last_idx = len(grobid_citations) - 1
        for idx, cit in enumerate(grobid_citations):
            if idx % 5 == 0:
                yield gen_update(previous_status + f"\n⏳ Parsing citation {idx+1}/{len(grobid_citations)}...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)

            # Only the last citation can run into the appendix heading.
            header_for_cit = state_appendix_header if idx == last_idx else None
            parsed_citations.append(_build_parsed_citation(cit, header_for_cit))

        final_citations, final_removed_citations = _split_valid_and_rejected(parsed_citations)

        status = previous_status + f"\n✓ Hybrid extraction: {len(final_citations)} citations (+{len(final_removed_citations)} filtered)"

        yield gen_update(status, done=True, final_cits=final_citations, final_rem=final_removed_citations, final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)

    except Exception as e:
        # Top-level boundary: surface the failure in the status box and
        # finish with empty results instead of breaking the UI stream.
        yield gen_update(previous_status + f"\n❌ Error: {str(e)}", done=True, final_cits=[], final_rem=[])
| |
|
def run_citation_check(num_to_check, previous_status, api_key, state_citations):
    """Run citation check with per-user state.

    Generator: every ``yield`` is a 4-tuple
    ``(status text, verifications HTML, citations HTML, citations state)``.

    Args:
        num_to_check: How many citations from the start of the list to verify.
            ``None`` verifies all of them.
        previous_status: Status text that progress lines are appended to.
        api_key: Optional API key; surrounding whitespace is removed and an
            empty value becomes ``None``.
        state_citations: List of citation dicts; it is not modified.
    """
    if not state_citations:
        # Same arity as every other yield of this generator.
        yield (previous_status + "\n⚠ No citations to verify.",
               gr.update(), gr.update(), state_citations)
        return

    # The author naming pattern is inferred from the first ten citations.
    sample_author_strings = [
        cit.get('authors', '')
        for cit in state_citations[:10]
        if cit.get('authors') and isinstance(cit.get('authors'), str)
    ]
    name_order, separator = identify_author_pattern(sample_author_strings)

    import copy
    # A float count (e.g. 5.0) cannot be used as a slice bound.
    limit = int(num_to_check) if num_to_check is not None else None
    # Verify copies so the incoming state objects stay untouched.
    to_check = copy.deepcopy(state_citations[:limit])

    api_key_clean = api_key.strip() if api_key else None

    updated_citations = list(state_citations)
    total = len(to_check)

    results = check_citations_semantic_scholar(to_check, api_key=api_key_clean, name_order=name_order, separator=separator)
    for i, verified_cit in enumerate(results):
        if i < len(updated_citations):
            updated_citations[i] = verified_cit

        status_msg = f"{previous_status}\n⏳ Verifying citation {i+1}/{total}... Results will be displayed once finished."
        updated_cit_html = format_citations_display(updated_citations)
        yield (status_msg, gr.update(), updated_cit_html, updated_citations)

    final_ver_html = format_verifications_display(updated_citations)
    final_cit_html = format_citations_display(updated_citations)

    def count_status(name):
        """Count checked citations whose verification status equals ``name``."""
        return sum(1 for c in updated_citations[:total] if c.get('verification', {}).get('status') == name)

    v_count = count_status('verified')
    a_count = count_status('ambiguous')
    h_count = count_status('suspected_hallucination')
    e_count = count_status('api_error')
    status_msg = f"Verification Complete: ✅ {v_count} | ⚠️ {a_count} | ❌ {h_count} | 🔌 {e_count}"

    yield (status_msg, final_ver_html, final_cit_html, updated_citations)
| |
|
def format_citations_display(citations, show_reason=False):
    """Format citations for display as HTML.

    Args:
        citations: Iterable of citation dicts. Keys read: ``raw_text``,
            ``title``, ``title_after_verification``,
            ``authors_after_verification`` and, when ``show_reason`` is set,
            ``rejection_reason``.
        show_reason: Append the rejection reason after the raw text.

    Returns:
        An HTML string, or ``""`` when ``citations`` is empty. All values
        taken from the citations are HTML-escaped.
    """
    if not citations:
        return ""

    import html as html_lib

    def esc(value):
        # str() keeps escaping from failing on non-string field values.
        return html_lib.escape(str(value))

    parts = ["<div class='citations-container'>"]

    for i, cit in enumerate(citations, 1):
        raw_text = cit.get('raw_text', 'No citation text')
        if raw_text is None:
            # Key present but explicitly None: same placeholder as a missing key.
            raw_text = 'No citation text'

        parts.append("<div class='citation-item'>")
        parts.append(f"<div><strong>[{i}]</strong> {esc(raw_text)}")
        if show_reason and 'rejection_reason' in cit:
            parts.append(f" <span class='rejection-reason'>[REASON: {esc(cit['rejection_reason'])}]</span>")
        parts.append("</div>")

        title = cit.get('title', '')
        if title:
            parts.append("<div class='citation-metadata'>")
            parts.append(f"<div style='margin-bottom: 2px;'>Title: {esc(title)}</div>")
            parts.append("</div>")

        title_after = cit.get('title_after_verification', '')
        authors_after = cit.get('authors_after_verification', '')

        if title_after or authors_after:
            parts.append("<div class='ver-verified'>")
            if title_after:
                parts.append(f"<div style='margin-bottom: 2px;'><strong>Title:</strong> {esc(title_after)}</div>")
            if authors_after:
                if isinstance(authors_after, list):
                    auth_str_after = ", ".join(str(a) for a in authors_after)
                else:
                    auth_str_after = str(authors_after)
                parts.append(f"<div><strong>Authors:</strong> {esc(auth_str_after)}</div>")
            parts.append("</div>")

        parts.append("</div>")

    parts.append("</div>")
    # A single join avoids repeated string concatenation in the loop.
    return "".join(parts)
| |
|
def refine_author_string(raw_text, grobid_authors, title=None):
    """Cut the leading author portion out of a raw citation string.

    The author portion ends where the citation's metadata begins: at the
    first 4-digit year (19xx/20xx/21xx) or at the first fuzzy occurrence of
    the title's opening characters, whichever comes first. The result is
    trimmed back to the last period or comma before that point.

    Args:
        raw_text: Raw citation text.
        grobid_authors: Accepted for interface compatibility; not read.
        title: Optional title; only used when longer than five characters.

    Returns:
        The author segment without trailing ``.,:;`` or spaces; the stripped
        ``raw_text`` when neither a year nor the title is found; ``""`` for
        empty input.
    """
    if not raw_text:
        return ""

    anchors = []

    year_hit = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_hit is not None:
        anchors.append(year_hit.start())

    if title and len(title) > 5:
        # First 20 characters of the title, reduced to word characters, then
        # matched while tolerating any non-word characters in between.
        compact = re.sub(r'\W+', '', title.lower()[:20])
        if compact:
            loose = ''.join(re.escape(ch) + r'[\W]*' for ch in compact)
            title_hit = re.search(loose, raw_text.lower())
            if title_hit is not None:
                anchors.append(title_hit.start())

    if not anchors:
        return raw_text.strip()

    cut = min(anchors)
    head = raw_text[:cut]
    split_at = max(head.rfind('.'), head.rfind(','))

    # Keep up to and including the delimiter when there is one; otherwise
    # everything before the metadata start.
    author_part = head if split_at == -1 else raw_text[:split_at + 1]
    return author_part.strip().rstrip(".,:; ")
| |
|
def identify_author_pattern(author_strings):
    """Infer how a bibliography writes its author lists.

    Args:
        author_strings: Sample author strings (the caller passes up to ten).

    Returns:
        ``(name_order, separator)`` where ``name_order`` is ``"first_last"``
        or ``"last_first"`` and ``separator`` is ``";"`` or ``","``. Empty
        input gives ``("first_last", ",")``.
    """
    if not author_strings:
        return "first_last", ","

    semicolons = sum(s.count(";") for s in author_strings)
    commas = sum(s.count(",") for s in author_strings)
    # Semicolons win once they outnumber half of the commas.
    main_sep = ";" if semicolons > (commas // 2) else ","

    # "and" / "&" between names count as one more separator.
    conjunction = re.compile(r'\s+(?:and|&)\s+', flags=re.IGNORECASE)
    pieces = [
        chunk.strip()
        for s in author_strings
        for chunk in conjunction.sub(main_sep + ' ', s).split(main_sep)
        if chunk.strip()
    ]

    if main_sep == ";":
        # "Last, First; Last, First": most pieces carry an inner comma.
        hits = sum(1 for p in pieces if "," in p)
        ratio = 0.5
    else:
        # "Last, F., Last, F.": most comma-separated pieces are single words.
        hits = sum(1 for p in pieces if len(p.split(" ")) == 1)
        ratio = 0.7

    is_last_first = bool(pieces) and hits >= len(pieces) * ratio
    return ("last_first" if is_last_first else "first_last"), main_sep
| |
|
def parse_names_by_pattern(author_string, order, separator):
    """
    Robustly parses author string using a global pattern and divider.

    Args:
        author_string: Raw author list of one citation.
        order: ``"last_first"`` or ``"first_last"``.
        separator: Divider between names (``","`` or ``";"``).

    Returns:
        List of the truthy results of ``normalize_d_author`` for each name;
        ``[]`` for empty input.
    """
    if not author_string:
        return []

    # Drop "et al" / "etal" together with its trailing period, so no lone
    # "." is left behind as a fake name segment.
    author_string = re.sub(r'\b(?:et\s*al|etal)\b\.?', '', author_string, flags=re.IGNORECASE)

    # "&" is not a word character, so r'\b&\b' never matches " & " written
    # with spaces; match the ampersand on its own instead.
    s = re.sub(r'\band\b|&', lambda _m: separator, author_string, flags=re.IGNORECASE)

    # Collapse runs of separators (possibly with whitespace between them).
    sep_esc = re.escape(separator)
    s = re.sub(sep_esc + r'[\s' + sep_esc + r']*' + sep_esc, separator, s)
    s = s.strip().strip(separator).strip()

    segments = [p.strip() for p in s.split(separator) if p.strip()]

    if order == "last_first" and separator == ",":
        # "Last, First, Last, First": the comma also splits each name, so
        # segments are re-joined in pairs; an odd trailing segment stands alone.
        raw_names = [", ".join(segments[i:i + 2]) for i in range(0, len(segments), 2)]
    else:
        raw_names = segments

    authors = []
    for name in raw_names:
        norm = normalize_d_author(name)
        if norm:
            authors.append(norm)

    return authors
| |
|
def format_verifications_display(citations):
    """Format citations with verification status badges.

    Args:
        citations: Iterable of citation dicts. ``raw_text`` and the optional
            ``verification`` dict (``status``, ``icon``, ``confidence``,
            ``title_score``, ``author_score``, ``error``) are read.

    Returns:
        An HTML string. Raw citation text and API error messages are
        HTML-escaped. A citation whose status is none of the known values
        gets no badge.
    """
    if not citations:
        return "<p>No citations extracted yet.</p>"

    import html as html_lib

    # status -> (CSS class, label) for the badges that show similarity scores.
    scored_badges = {
        'verified': ('ver-status-verified', 'Verified'),
        'ambiguous': ('ver-status-ambiguous', 'Ambiguous'),
        'suspected_hallucination': ('ver-status-hallucination', 'Suspected Hallucination'),
    }

    html_parts = ["<div class='ver-badge-container'>"]

    for i, cit in enumerate(citations, 1):
        raw_text = cit.get('raw_text', 'No citation text')
        if raw_text is None:
            raw_text = 'No citation text'
        safe_raw = html_lib.escape(str(raw_text))

        html_parts.append("<div class='ver-item'>")
        html_parts.append(f"<div><strong>[{i}]</strong> {safe_raw}</div>")

        # A stored None is treated like a missing verification.
        verification = cit.get('verification') or {}
        status = verification.get('status', 'not_verified')
        icon = verification.get('icon', '')

        if status in scored_badges:
            css_class, label = scored_badges[status]
            confidence = verification.get('confidence', 0)
            title_score = verification.get('title_score', 0)
            author_score = verification.get('author_score', 0)
            html_parts.append(f"<div class='{css_class}'>")
            html_parts.append(f"<strong>{icon} {label} (Confidence: {confidence:.2%})</strong>")
            html_parts.append(f"<br/><small>Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}</small>")
            html_parts.append("</div>")

        elif status == 'api_error':
            error_msg = verification.get('error', 'Unknown error')
            is_no_result = error_msg == "No search results found by API"
            label = "Verification Note" if is_no_result else "API Error"

            html_parts.append("<div class='ver-status-error'>")
            html_parts.append(f"<strong>{icon} {label}</strong><br/>")
            # The message may carry text from a remote response: escape it.
            html_parts.append(f"<small>{html_lib.escape(str(error_msg))}</small>")
            html_parts.append("</div>")

        elif status == 'not_verified':
            html_parts.append("<div class='ver-status-unverified'>")
            html_parts.append("<strong>Not Verified</strong>")
            html_parts.append("</div>")

        html_parts.append("</div>")

    html_parts.append("</div>")
    return ''.join(html_parts)
| |
|
def export_verifications_csv(state_citations, pdf_name):
    """Export citation verifications to a CSV file.

    The file is named ``<pdf name without extension>_verifications.csv`` and
    written into a fresh temporary directory.

    Args:
        state_citations: List of citation dicts (with optional
            ``verification`` data).
        pdf_name: Name of the source PDF; only its final path component is
            used. Falsy values give ``verifications_verifications.csv``.

    Returns:
        Path of the written CSV, or ``None`` when there are no citations or
        writing fails.
    """
    if not state_citations:
        return None

    import csv
    import shutil

    # Keep only the final path component so a name such as "../x.pdf"
    # cannot place the file outside the temporary directory.
    safe_name = os.path.basename(pdf_name) if pdf_name else ""
    basename = os.path.splitext(safe_name)[0] if safe_name else "verifications"
    csv_filename = f"{basename}_verifications.csv"

    temp_dir = tempfile.mkdtemp()
    filepath = os.path.join(temp_dir, csv_filename)

    def as_percent(value, status):
        """Format a score as a percentage; 'N/A' when unverified or not numeric."""
        if status == 'not_verified':
            return 'N/A'
        try:
            return f"{value:.2%}"
        except (TypeError, ValueError):
            return 'N/A'

    try:
        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'ID', 'Status', 'Confidence', 'Title Similarity', 'Author Similarity',
                'Raw Citation', 'Title', 'Authors',
                'API Title', 'API Authors'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for i, cit in enumerate(state_citations, 1):
                verification = cit.get('verification') or {}
                status = verification.get('status', 'not_verified')

                semantic_data = verification.get('semantic_data', {})
                api_title = semantic_data.get('title', '') if semantic_data else ''
                api_authors_list = semantic_data.get('authors', []) if semantic_data else []
                if api_authors_list:
                    # Authors arrive either as dicts with a 'name' key or as
                    # plain values.
                    if isinstance(api_authors_list[0], dict):
                        api_authors = ", ".join([a.get('name', '') for a in api_authors_list if a.get('name')])
                    else:
                        api_authors = ", ".join([str(a) for a in api_authors_list if a])
                else:
                    api_authors = ""

                ver_authors = cit.get('authors_after_verification', '')
                if isinstance(ver_authors, list):
                    ver_authors = ", ".join(ver_authors)
                elif not isinstance(ver_authors, str):
                    ver_authors = str(ver_authors)

                writer.writerow({
                    'ID': i,
                    'Status': status,
                    'Confidence': as_percent(verification.get('confidence', 0), status),
                    'Title Similarity': as_percent(verification.get('title_score', 0), status),
                    'Author Similarity': as_percent(verification.get('author_score', 0), status),
                    'Raw Citation': cit.get('raw_text', ''),
                    'Title': cit.get('title_after_verification', ''),
                    'Authors': ver_authors,
                    'API Title': api_title,
                    'API Authors': api_authors
                })
        return filepath
    except Exception as e:
        # Report the failure and do not leave a half-written export behind.
        print(f"Error exporting verifications CSV: {e}")
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None
| |
|
def update_view(view_mode, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path):
    """Update the view based on selected mode. Controls GROUP visibility.

    Generator: yields one 9-tuple
    ``(full view, reference view, citations view, verifications view,
    reference PDF update, citations display update, verifications display
    update, loading indicator update, reference-subset PDF path)``.
    An unrecognised ``view_mode`` yields nothing.

    While extraction is not finished, every mode except "Show Full PDF" only
    shows the loading indicator.
    """
    vis_full = gr.update(visible=False)
    vis_ref = gr.update(visible=False)
    vis_cit = gr.update(visible=False)
    vis_ver = gr.update(visible=False)

    upd_ref_pdf = gr.update()
    upd_cit_disp = gr.update()
    upd_ver_disp = gr.update()
    upd_load = gr.update(visible=False)

    if not state_extraction_done and view_mode != "Show Full PDF":
        upd_load = gr.update(visible=True)
        # This function is a generator: the tuple has to be yielded. A plain
        # `return value` would end the generator without emitting anything.
        yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
        return

    if view_mode == "Show Full PDF":
        vis_full = gr.update(visible=True)

    elif view_mode == "Show Reference Pages":
        vis_ref = gr.update(visible=True)

        if state_ref_pdf_path and os.path.exists(state_ref_pdf_path):
            # Reuse the subset PDF built on an earlier call.
            upd_ref_pdf = gr.update(value=state_ref_pdf_path)
        elif state_ref_pages and state_pdf_path:
            # Build a PDF holding only the reference page range. Context
            # managers close both documents even when saving fails.
            with fitz.open(state_pdf_path) as doc, fitz.open() as new_doc:
                new_doc.insert_pdf(doc, from_page=state_ref_pages[0], to_page=state_ref_pages[-1])
                temp_preview = tempfile.NamedTemporaryFile(delete=False, suffix="_ref_subset.pdf")
                output_path = temp_preview.name
                temp_preview.close()
                new_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True)

            state_ref_pdf_path = output_path
            upd_ref_pdf = gr.update(value=output_path)

    elif view_mode == "Show Citations":
        vis_cit = gr.update(visible=True)

    elif view_mode == "Show Verifications":
        vis_ver = gr.update(visible=True)
        formatted_ver = format_verifications_display(state_citations)
        upd_ver_disp = gr.update(value=formatted_ver)

    else:
        return

    yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
| |
|
| | |
# Stylesheet for the CiteAudit UI, kept in a named constant so the layout code
# below stays readable. Each status style comes as a pair: a base rule and a
# `.dark`-prefixed variant (presumably matched when Gradio's dark theme is on).
_APP_CSS = """
/* Container Styles */
#pdf-viewer-full, #pdf-viewer-ref {
    height: 700px;
    width: 100%;
}

#view-citations, #view-verifications {
    border: none !important;
    box-shadow: none !important;
    background-color: transparent !important;
}

#citations-list, #view-verifications .gr-html {
    background-color: transparent !important;
}

#main-display-area {
    min-height: 700px;
    border-radius: 8px;
    background-color: var(--background-fill-primary);
}

/* Citation List */
.citations-container {
    font-family: sans-serif;
    font-size: 14px;
    line-height: 1.5;
    color: var(--body-text-color);
    max-height: 600px;
    overflow-y: auto;
    padding: 12px;
    border: 1px solid var(--border-color-primary);
    border-radius: 4px;
    background-color: var(--background-fill-secondary);
}

.citation-item {
    margin-bottom: 16px;
    padding-bottom: 8px;
    border-bottom: 1px solid var(--border-color-primary);
}

.rejection-reason {
    color: #ef5350; /* Red 400 */
    font-weight: bold;
    margin-left: 8px;
}
.dark .rejection-reason {
    color: #ef9a9a; /* Red 200 */
}

.citation-metadata {
    color: var(--body-text-color-subdued);
    margin-left: 24px;
    font-size: 0.95em;
    margin-top: 4px;
}

/* Verification Styles */
.ver-verified {
    color: #1b5e20; /* Green 900 */
    margin-left: 24px;
    font-size: 0.95em;
    margin-top: 6px;
    padding: 4px;
    background-color: #e8f5e9; /* Green 50 */
    border-left: 3px solid #4caf50; /* Green 500 */
}
.dark .ver-verified {
    color: #a5d6a7; /* Green 200 */
    background-color: rgba(27, 94, 32, 0.4); /* Dark Green alpha */
    border-left-color: #66bb6a; /* Green 400 */
}

/* Status Badges in format_verifications_display */
.ver-badge-container {
    font-family: monospace;
    font-size: 14px;
    background-color: var(--background-fill-secondary);
    padding: 15px;
    border-radius: 5px;
    color: var(--body-text-color);
}

.ver-item {
    margin-bottom: 20px;
    padding: 10px;
    border: 1px solid var(--border-color-primary);
    border-radius: 5px;
}

.ver-status-verified {
    margin-top: 8px;
    padding: 6px;
    background-color: #e8f5e9;
    border-left: 3px solid #4caf50;
    color: #1b5e20; /* Darker Text */
}
.dark .ver-status-verified {
    background-color: rgba(27, 94, 32, 0.4);
    border-left-color: #66bb6a;
    color: #e8f5e9; /* Light Text */
}
.ver-status-verified strong, .ver-verified strong { color: inherit; }


.ver-status-ambiguous {
    margin-top: 8px;
    padding: 6px;
    background-color: #fff3e0;
    border-left: 3px solid #ff9800;
    color: #e65100;
}
.dark .ver-status-ambiguous {
    background-color: rgba(230, 81, 0, 0.3);
    border-left-color: #ffb74d;
    color: #ffe0b2;
}

.ver-status-hallucination {
    margin-top: 8px;
    padding: 6px;
    background-color: #ffebee;
    border-left: 3px solid #f44336;
    color: #c62828;
}
.dark .ver-status-hallucination {
    background-color: rgba(183, 28, 28, 0.3);
    border-left-color: #e57373;
    color: #ffcdd2;
}

.ver-status-error {
    margin-top: 8px;
    padding: 6px;
    background-color: #fafafa;
    border-left: 3px solid #9e9e9e;
    color: #424242;
}
.dark .ver-status-error {
    background-color: rgba(66, 66, 66, 0.4);
    border-left-color: #bdbdbd;
    color: #e0e0e0;
}

.ver-status-unverified {
    margin-top: 8px;
    padding: 6px;
    background-color: #f5f5f5;
    border-left: 3px solid #bdbdbd;
    color: #757575;
}
.dark .ver-status-unverified {
    background-color: rgba(97, 97, 97, 0.3);
    border-left-color: #9e9e9e;
    color: #bdbdbd;
}
"""

# Build the CiteAudit interface: a control column on the left, a display area
# on the right that shows exactly one of four views, and the event chains that
# connect the widgets to the processing functions defined earlier in the file.
# NOTE(review): widget nesting below was reconstructed from statement order;
# confirm that the disclaimer Markdown belongs in the left-hand column.
with gr.Blocks(title="CiteAudit", css=_APP_CSS) as demo:
    # ---- State holders read and written by the handlers ----
    state_pdf_path = gr.State(None)
    state_ref_pages = gr.State([])
    state_citations = gr.State([])
    state_removed_citations = gr.State([])
    state_appendix_header = gr.State(None)
    state_ref_text = gr.State("")
    state_extraction_done = gr.State(False)
    state_ref_pdf_path = gr.State(None)
    state_pdf_name = gr.State("")

    gr.Markdown("# CiteAudit")

    with gr.Row():
        # ---- Left column: upload, status and verification controls ----
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
            status_text = gr.Textbox(label="Status", interactive=False, lines=6)

            # Everything from here down to the disclaimer starts hidden; the
            # handlers wired below list these widgets among their outputs.
            view_toggle = gr.Radio(
                choices=["Show Full PDF", "Show Reference Pages", "Show Citations", "Show Verifications"],
                value="Show Full PDF",
                label="View Mode",
                interactive=True,
                visible=False,
            )

            verification_divider = gr.Markdown("---", visible=False)
            verification_header = gr.Markdown("### Citation Verification", visible=False)

            api_key_input = gr.Textbox(
                label="Semantic Scholar API Key (Optional)",
                placeholder="Leave empty for free tier (with rate limits)",
                type="password",
                interactive=True,
                visible=False,
            )

            verify_btn = gr.Button("✅ Verify Citations", variant="secondary", interactive=False, visible=False)

            check_count_slider = gr.Slider(
                minimum=1,
                maximum=50,
                value=1,
                step=1,
                label="Number of citations to check",
                interactive=False,
                visible=False,
            )

            export_btn = gr.Button("📊 Download Verifications (CSV)", visible=False)
            download_file = gr.File(label="Download CSV", visible=False)

            gr.Markdown("<br/><small style='color: var(--body-text-color-subdued);'>* Automated verification may have mistakes and are restricted to returns from Semantic Scholar API. Please check all your citations.</small>")

        # ---- Right column: one group per view mode ----
        with gr.Column(scale=2, elem_id="main-display-area"):
            loading_indicator = gr.Markdown("## ⏳ Extracting content...", visible=False)

            # Only the full-PDF group is visible initially.
            with gr.Group(visible=True) as view_full_pdf:
                pdf_viewer_full = PDF(label="Full PDF", elem_id="pdf-viewer-full", interactive=False)

            with gr.Group(visible=False) as view_ref_pages:
                pdf_viewer_ref = PDF(label="Reference Pages", elem_id="pdf-viewer-ref", interactive=False)

            with gr.Group(visible=False, elem_id="view-citations") as view_citations:
                citations_header = gr.Markdown("### Extracted Citations")
                citations_display = gr.HTML(elem_id="citations-list")

            with gr.Group(visible=False, elem_id="view-verifications") as view_verifications:
                corrected_display = gr.HTML(label="Corrected Citations")

    # update_view is registered on two events (the end of the upload chain and
    # the view-mode radio) with identical wiring, so the lists are defined once.
    # The output order mirrors the 9-tuple that update_view yields.
    view_inputs = [
        view_toggle, state_pdf_path, state_ref_pages, state_citations,
        state_removed_citations, state_extraction_done, state_ref_pdf_path,
    ]
    view_outputs = [
        view_full_pdf, view_ref_pages, view_citations, view_verifications,
        pdf_viewer_ref, citations_display, corrected_display, loading_indicator,
        state_ref_pdf_path,
    ]

    def _begin_verification(status):
        """Return the UI updates applied the moment "Verify Citations" is clicked.

        Args:
            status: Current text of the Status box. A Gradio Textbox may hand
                over ``None``; that is treated as an empty string so the
                concatenation below cannot raise ``TypeError``.

        Returns:
            A 5-tuple matching ``[view_toggle, status_text, corrected_display,
            download_file, export_btn]``: switch the radio to "Show
            Verifications", append a progress line to the status text, leave
            the verification display untouched, and hide/clear the download
            file and hide the export button.
        """
        return (
            gr.update(value="Show Verifications"),
            (status or "") + "\n⏳ Starting verification process... Please wait.",
            gr.update(),
            gr.update(visible=False, value=None),
            gr.update(visible=False),
        )

    # ---- Upload chain: initial processing -> citation extraction -> view refresh ----
    file_input.upload(
        fn=process_pdf_initial,
        inputs=[file_input, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text],
        outputs=[
            file_input, status_text, pdf_viewer_full, view_toggle, citations_display, verify_btn, check_count_slider,
            state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text,
            citations_header, verification_header, verification_divider, api_key_input, state_extraction_done,
            corrected_display, state_ref_pdf_path, state_pdf_name, export_btn, download_file,
        ],
    ).then(
        fn=extract_citations_auto,
        inputs=[view_toggle, status_text, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done],
        outputs=[
            status_text, citations_display, verify_btn, check_count_slider, state_citations, state_removed_citations,
            state_ref_pages, state_ref_text, state_appendix_header, pdf_viewer_ref, loading_indicator, citations_header,
            verification_header, verification_divider, api_key_input, state_extraction_done, corrected_display,
            export_btn, download_file,
        ],
        show_progress="hidden",
    ).then(
        fn=update_view,
        inputs=view_inputs,
        outputs=view_outputs,
    )

    # ---- Verify chain: prepare the UI -> run the check -> reveal the export button ----
    verify_btn.click(
        fn=_begin_verification,
        inputs=[status_text],
        outputs=[view_toggle, status_text, corrected_display, download_file, export_btn],
    ).then(
        fn=run_citation_check,
        inputs=[check_count_slider, status_text, api_key_input, state_citations],
        outputs=[status_text, corrected_display, citations_display, state_citations],
        show_progress="hidden",
    ).then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[export_btn],
    )

    # ---- Export chain: build the CSV, then reveal the download widget ----
    export_btn.click(
        fn=export_verifications_csv,
        inputs=[state_citations, state_pdf_name],
        outputs=[download_file],
    ).then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[download_file],
    )

    # ---- Changing the view mode re-renders the display area ----
    view_toggle.change(
        fn=update_view,
        inputs=view_inputs,
        outputs=view_outputs,
        concurrency_limit=None,
        show_progress="hidden",
    )
| |
|
# Script entry point: start the Gradio server only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    # Listen on every interface (0.0.0.0), port 7860, with show_api turned off.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_api": False,
    }
    demo.launch(**launch_options)
| |
|