import gradio as gr
from gradio_pdf import PDF
import fitz
import os
import tempfile
import json
import requests
import xml.etree.ElementTree as ET
import re
import time
import sys
from collections import OrderedDict
import Levenshtein
import jellyfish
from unidecode import unidecode
from venues import VENUE_NAMES, VENUE_ABBREVIATIONS, COMMON_TERMS
from urlextract import URLExtract
# Semantic Scholar Status Codes
# Maps HTTP status codes to the human-readable descriptions used as error
# messages when a Semantic Scholar request does not succeed.
SEMANTIC_SCHOLAR_STATUS_CODES = {
    200: "OK: Request successful",
    400: "Bad Request: Check parameters",
    401: "Unauthorized: Invalid API key",
    403: "Forbidden: No permission",
    404: "Not Found: Endpoint or resource missing",
    429: "Too Many Requests: Rate limited",
    500: "Internal Server Error: Server-side issue"
}
# Initialize URL extractor
# A single module-level instance, created once at import time.
extractor = URLExtract()
def cleanup_old_temp_files(max_age_hours=1):
    """Delete stale temporary files created by this app from the temp directory.

    Safe for multi-user use: only regular files whose names end with one of
    the app-specific suffixes AND whose modification time is older than
    ``max_age_hours`` are removed.

    Args:
        max_age_hours: Minimum age, in hours, before a matching file is
            deleted. Defaults to 1.

    Returns:
        None. Cleanup is best-effort; filesystem errors are never raised.
    """
    cutoff = time.time() - (max_age_hours * 3600)
    temp_dir = tempfile.gettempdir()
    # Suffixes of the temporary files this app creates.
    target_suffixes = ("_grobid.pdf", "_ref_subset.pdf", "_verifications.csv")

    try:
        filenames = os.listdir(temp_dir)
    except FileNotFoundError:
        # No temp directory means there is nothing to clean up.
        return
    except OSError as e:
        print(f"Error during temp file cleanup: {e}")
        return

    for filename in filenames:
        if not filename.endswith(target_suffixes):
            continue
        file_path = os.path.join(temp_dir, filename)
        try:
            # Only plain files are removed; a directory that happens to
            # carry one of the suffixes is left untouched.
            if os.path.isfile(file_path) and os.path.getmtime(file_path) < cutoff:
                os.unlink(file_path)
        except OSError:
            # The file may have vanished (another worker cleaned it up) or
            # be owned by someone else; skip it and keep going.
            continue
def normalize_title_for_comparison(title):
    """Return *title* in a canonical form suitable for similarity scoring.

    The text is lowercased, every character that is not an ASCII letter,
    digit or whitespace is turned into a space, and runs of whitespace are
    collapsed to single spaces.

    Args:
        title: Title string; may be empty or None.

    Returns:
        The normalized title, or ``""`` when *title* is falsy.
    """
    if title:
        lowered = title.lower()
        # Punctuation (and any non-ASCII character) becomes a separator.
        spaced = re.sub(r'[^a-zA-Z0-9\s]', ' ', lowered)
        words = spaced.split()
        return ' '.join(words)
    return ""
def normalize_api_author(name):
    """Reduce an API-supplied author name to ``"<surname> <initials>"``.

    Both ``"Last, First"`` and ``"First Middle Last"`` spellings are
    accepted. The name is transliterated to ASCII, an "et al" marker is
    dropped, the surname is lowercased with every non-letter removed, and
    each given-name word is reduced to its lowercase first letter
    (e.g. ``"Smith, John K."`` -> ``"smith j k"``).

    Args:
        name: Author name string; may be empty or None.

    Returns:
        The normalized name, or ``""`` when there is nothing to normalize.
    """
    if not name:
        return ""

    # ASCII transliteration, then drop "et al" / "etal".
    ascii_name = unidecode(name)
    ascii_name = re.sub(r'\b(et\s*al\.?|etal)\b', '', ascii_name, flags=re.IGNORECASE).strip()

    if "," in ascii_name:
        # "Last, First": everything before the first comma is the surname.
        family, _, given = ascii_name.partition(",")
        family = family.strip()
        given = given.strip()
    else:
        # "First Middle Last": the final word is the surname.
        tokens = ascii_name.split()
        if not tokens:
            return ""
        *given_tokens, family = tokens
        given = " ".join(given_tokens)

    family = re.sub(r'[^a-zA-Z]', '', family).lower()
    # Non-letters become spaces so compact initials like 'J.K.' split apart.
    given_words = re.sub(r'[^a-zA-Z]', ' ', given).lower().split()
    initials = " ".join(word[0] for word in given_words)
    return f"{family} {initials}".strip()
def normalize_d_author(name):
    """Reduce a PDF-sourced author name to ``"<surname> <initials>"``.

    With a comma present the text before the first comma is the surname
    (letters and whitespace kept) and the remainder supplies the given
    names. Otherwise the last word is the surname and all preceding words
    are given names. Every given-name word contributes its lowercase first
    letter as an initial.

    Args:
        name: Author name string; may be empty or None.

    Returns:
        The normalized name, or ``""`` when there is nothing to normalize.
    """
    if not name:
        return ""

    text = unidecode(name).strip()

    if "," in text:
        # "Last, First Middle"
        family_raw, _, given_raw = text.partition(",")
        surname = re.sub(r'[^a-zA-Z\s]', '', family_raw).strip().lower()
        # Non-letters become spaces so compact initials like 'J.K.' split apart.
        given_words = re.sub(r'[^a-zA-Z]', ' ', given_raw.strip()).split()
    else:
        # "First Middle Last"
        words = re.sub(r'[^a-zA-Z]', ' ', text).split()
        if not words:
            return ""
        *given_words, last_word = words
        surname = last_word.lower()

    initials_str = " ".join(word[0].lower() for word in given_words)
    return f"{surname} {initials_str}".strip()
def calculate_title_similarity(d_title, api_title):
    """Return the Levenshtein ratio between two titles after normalization.

    Args:
        d_title: Title discovered in the citation text.
        api_title: Title reported by the API.

    Returns:
        The value of ``Levenshtein.ratio`` for the two normalized titles, or
        ``0.0`` when either title normalizes to an empty string.
    """
    left = normalize_title_for_comparison(d_title)
    right = normalize_title_for_comparison(api_title)
    if left and right:
        return Levenshtein.ratio(left, right)
    return 0.0
def calculate_citation_recall(candidate_title, raw_citation):
    """Estimate how much of ``candidate_title`` appears in ``raw_citation``.

    Both strings are normalized, then windows of the raw citation whose
    length is close to the title's length are compared against the title.
    For each window the number of matching characters is estimated from
    ``Levenshtein.ratio`` and divided by the title length; the best value
    over all windows is the recall.

    Args:
        candidate_title: Title to look for; may be empty or None.
        raw_citation: Raw citation text to search in; may be empty or None.

    Returns:
        A float in ``[0.0, 1.0]``. ``0.0`` when either input is empty after
        normalization or when no window of a suitable size fits in the raw
        citation; ``1.0`` as soon as any window scores above 0.95.
    """
    if not candidate_title or not raw_citation:
        return 0.0
    norm_cand = normalize_title_for_comparison(candidate_title)
    norm_raw = normalize_title_for_comparison(raw_citation)
    if not norm_cand or not norm_raw:
        return 0.0

    cand_len = len(norm_cand)
    raw_len = len(norm_raw)

    # Window sizes depend only on the title length, so compute them once.
    # The upper bound is exclusive (sizes run from -margin to +margin-1),
    # kept as-is so existing scores do not shift.
    margin = max(3, int(cand_len * 0.1))
    window_sizes = range(max(1, cand_len - margin), cand_len + margin)
    smallest_window = window_sizes[0]

    max_score = 0.0
    # Past this point not even the smallest window fits in the raw text.
    for start in range(raw_len - smallest_window + 1):
        for window_size in window_sizes:
            end = start + window_size
            if end > raw_len:
                break  # larger windows will not fit either
            substring = norm_raw[start:end]
            # ratio ~= 2 * matches / (len1 + len2)
            #   => matches ~= ratio * (len1 + len2) / 2
            #   => recall   = matches / len(title)
            ratio = Levenshtein.ratio(substring, norm_cand)
            estimated_matches = ratio * (window_size + cand_len) / 2
            recall = estimated_matches / cand_len
            if recall > max_score:
                max_score = recall
                if max_score > 0.95:
                    return 1.0  # near-complete match: stop scanning
    return min(max_score, 1.0)
def calculate_author_similarity(authors1, authors2):
    """Score how well the names in ``authors1`` are covered by ``authors2``.

    The comparison is asymmetric: for every name in ``authors1`` the best
    Jaro-Winkler similarity against any name in ``authors2`` is taken, and
    those best scores are averaged. Callers that need a symmetric score
    call this function twice with the arguments swapped.

    Args:
        authors1: List of author-name strings to be matched.
        authors2: List of author-name strings to match against. These must
            be plain strings (not API author dicts); they are compared as
            given, without further normalization.

    Returns:
        A float in ``[0.0, 1.0]``; ``0.0`` when either list is empty.
    """
    if not authors1 or not authors2:
        return 0.0

    # Best partner score for each name in authors1.
    best_match_scores = [
        max(jellyfish.jaro_winkler_similarity(n1, n2) for n2 in authors2)
        for n1 in authors1
    ]
    sys.stdout.flush()

    avg_score = sum(best_match_scores) / len(best_match_scores)

    # Hallucination penalty: authors1 lists noticeably more names than
    # authors2 (a buffer of 1 tolerates minor parsing differences).
    if len(authors1) > len(authors2) + 1:
        avg_score *= len(authors2) / len(authors1)
    return avg_score
def _find_best_title_window(norm_raw_str, norm_api):
    """Return ``(score, (start, end))`` of the window closest to ``norm_api``.

    Windows of ``len(norm_api)`` +/- 3 characters are tried at each start
    offset of ``norm_raw_str``; the search stops at the first near-perfect
    score (> 0.99). The window is ``None`` when nothing scored above 0.
    """
    api_len = len(norm_api)
    raw_len = len(norm_raw_str)
    best_score = 0.0
    best_window = None
    for start in range(raw_len):
        if start + api_len > raw_len + 5:
            break
        for delta in (0, -1, 1, -2, 2, -3, 3):
            size = api_len + delta
            if size <= 0 or start + size > raw_len:
                continue
            score = Levenshtein.ratio(norm_raw_str[start:start + size], norm_api)
            if score > best_score:
                best_score = score
                best_window = (start, start + size)
            if best_score > 0.99:
                # Perfect match optimization
                return best_score, best_window
    return best_score, best_window


def discover_metadata_in_raw(raw_text, api_title, api_authors, is_exact_match=False):
    """Locate the title and author segments of a raw citation string.

    Args:
        raw_text: Raw citation text.
        api_title: Title used as the anchor to search for in ``raw_text``.
        api_authors: List of author dicts (``{'name': ...}``) or author
            strings; only consulted when no author boundary can be derived
            from the title or a year.
        is_exact_match: When true, ``api_title`` is returned as the title
            without any fuzzy search.

    Returns:
        ``(title_after_verification, authors_after_verification)``. Both
        are ``""`` for empty ``raw_text``; the author part is ``""`` when no
        author segment could be identified.
    """
    if not raw_text:
        return "", ""

    # Normalized copy of raw_text (lowercase alphanumerics, every run of
    # other characters collapsed to one space) plus, for each normalized
    # position, the index in raw_text it came from.
    norm_chars = []
    norm_to_orig = []
    prev_is_alnum = False  # start false to ignore leading non-alnum
    for pos, ch in enumerate(raw_text):
        if ch.isalnum():
            norm_chars.append(ch.lower())
            norm_to_orig.append(pos)
            prev_is_alnum = True
        elif prev_is_alnum:
            norm_chars.append(' ')
            norm_to_orig.append(pos)
            prev_is_alnum = False
    norm_raw_str = "".join(norm_chars)

    # 1. Discover Title Segment
    if is_exact_match:
        discovered_title = api_title
    elif not api_title:
        discovered_title = ""
    else:
        # Fall back to the API title unless a good window is found below.
        discovered_title = api_title

        # Normalize the API title the same way (spaces between words).
        api_chars = []
        api_prev_is_alnum = False
        for ch in api_title.lower():
            if ch.isalnum():
                api_chars.append(ch)
                api_prev_is_alnum = True
            elif api_prev_is_alnum:
                api_chars.append(' ')
                api_prev_is_alnum = False
        norm_api = "".join(api_chars).strip()

        if norm_api and norm_raw_str:
            max_score, best_window = _find_best_title_window(norm_raw_str, norm_api)
            # If we found a good match (> 0.75)
            if max_score > 0.75 and best_window:
                start_norm, end_norm = best_window
                map_len = len(norm_to_orig)
                if start_norm < map_len and end_norm <= map_len:
                    first = norm_to_orig[start_norm]
                    last = norm_to_orig[end_norm - 1]
                    discovered_title = raw_text[first:last + 1].strip()

    # 2. Discover Author Segment: everything before the title start ...
    author_limit_idx = raw_text.find(discovered_title) if discovered_title else -1

    # ... or before the first year, whichever comes first (fail-safe).
    year_match = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_match:
        year_idx = year_match.start()
        if author_limit_idx == -1 or year_idx < author_limit_idx:
            author_limit_idx = year_idx

    if author_limit_idx > 0:
        discovered_authors = raw_text[:author_limit_idx].strip().rstrip(".,:; ")
        return discovered_title, discovered_authors

    # No usable boundary: take the text up to the end of the last API
    # author name that can be found in the raw text.
    discovered_authors = ""
    if api_authors:
        if isinstance(api_authors[0], dict):
            api_names = [a.get('name', '') for a in api_authors if a.get('name')]
        else:
            api_names = [str(a) for a in api_authors]

        lowered_raw = raw_text.lower()
        match_ends = []
        for name in api_names:
            tokens = name.lower().split()
            if len(tokens) < 2:
                continue
            pattern = re.escape(tokens[0]) + r'.*?' + re.escape(tokens[-1])
            hit = re.search(pattern, lowered_raw)
            if hit:
                match_ends.append(hit.end())

        if match_ends:
            discovered_authors = raw_text[:max(match_ends)].strip().rstrip(".,;:")

    return discovered_title, discovered_authors
def classify_verification(title_score, author_score, has_error=False, error_msg=""):
    """Turn title/author similarity scores into a verification verdict.

    The confidence is a weighted sum: 70% title score, 30% author score.
    Confidence >= 0.95 is 'verified', >= 0.75 is 'ambiguous', anything
    lower is 'suspected_hallucination'.

    Args:
        title_score: Title similarity score.
        author_score: Author similarity score.
        has_error: When true, the scores are ignored and an 'api_error'
            result with all scores set to 0.0 is returned.
        error_msg: Message stored under 'error' in the error result.

    Returns:
        dict with 'status', 'icon', 'title_score', 'author_score' and
        'confidence'; error results additionally carry 'error'.
    """
    if has_error:
        return {
            'status': 'api_error',
            'icon': '✗',
            'title_score': 0.0,
            'author_score': 0.0,
            'confidence': 0.0,
            'error': error_msg,
        }

    # Weighted hybrid score
    confidence = (title_score * 0.70) + (author_score * 0.30)

    # Tiers ordered from strictest to loosest; the first one met wins.
    tiers = (
        (0.95, 'verified', '✓'),
        (0.75, 'ambiguous', '⚠'),
    )
    status, icon = 'suspected_hallucination', '⚠⚠'
    for floor, tier_status, tier_icon in tiers:
        if confidence >= floor:
            status, icon = tier_status, tier_icon
            break

    return {
        'status': status,
        'icon': icon,
        'title_score': title_score,
        'author_score': author_score,
        'confidence': confidence,
    }
def verify_citation_against_paper(raw_citation, api_paper, extracted_title, name_order="first_last", separator=","):
    """Verify a raw citation against one API paper record.

    A title anchor is chosen (the extracted title or the API title), the
    title and author segments are located in the raw citation, and both
    are scored against the API record.

    Args:
        raw_citation: Raw citation text.
        api_paper: Paper dict from the API; 'title' and 'authors' are read.
        extracted_title: Title previously extracted for this citation.
        name_order: Passed through to ``parse_names_by_pattern``.
        separator: Passed through to ``parse_names_by_pattern``.

    Returns:
        ``(check_data, (d_title, d_authors))`` where ``check_data`` is the
        ``classify_verification`` result extended with 'semantic_data',
        'title_source' and 'discovery'.
    """
    api_title = api_paper.get('title', '')
    api_authors_list = api_paper.get('authors', [])

    # Pre-normalize API authors (ground truth). The list holds either
    # {'name': ...} dicts or plain names.
    api_authors_norm = []
    if api_authors_list:
        if isinstance(api_authors_list[0], dict):
            raw_names = [entry.get('name', '') for entry in api_authors_list if entry.get('name')]
        else:
            raw_names = [str(entry) for entry in api_authors_list if entry]
        api_authors_norm = [normalize_api_author(raw_name) for raw_name in raw_names]

    # --- TITLE SELECTION LOGIC ---
    is_exact_match = False
    if extracted_title and api_title:
        norm_extracted = normalize_title_for_comparison(extracted_title)
        norm_api = normalize_title_for_comparison(api_title)
        is_exact_match = norm_extracted == norm_api and len(norm_extracted) > 10

    if is_exact_match:
        best_title_candidate = extracted_title
        title_source = "exact_match"
    else:
        # Prefer whichever title is better covered by the raw citation.
        recall_extracted = calculate_citation_recall(extracted_title, raw_citation) if extracted_title else 0.0
        recall_api = calculate_citation_recall(api_title, raw_citation)

        if abs(recall_extracted - recall_api) < 1e-7:
            # Tie: pick the title with fewer words (API title on equal count).
            words_ext = len(extracted_title.split()) if extracted_title else 999
            words_api = len(api_title.split()) if api_title else 999
            if words_ext < words_api:
                best_title_candidate = extracted_title
                title_source = "extracted (tie-breaker shorter)"
            else:
                best_title_candidate = api_title
                title_source = "api (tie-breaker shorter)"
        elif recall_extracted > (recall_api + 0.1):
            best_title_candidate = extracted_title
            title_source = "cleaned/extracted"
        else:
            best_title_candidate = api_title
            title_source = "api"

    # 1. Discovery step
    d_title, d_authors = discover_metadata_in_raw(
        raw_citation,
        best_title_candidate,
        api_authors_list,
        is_exact_match=is_exact_match,
    )

    # 2. Title scoring: discovered title vs. API title (ground truth);
    #    0.0 when no title segment was discovered.
    t_score = calculate_title_similarity(d_title, api_title) if d_title else 0.0

    # 3. Author scoring; 0.0 when no author segment was discovered.
    a_score = 0.0
    if d_authors:
        # Detect "et al" in the original segment (case-insensitive).
        has_etal = re.search(r'\bet\s*al\b', d_authors, re.IGNORECASE)
        parsed_d_authors = parse_names_by_pattern(d_authors, name_order, separator)
        score_forward = calculate_author_similarity(parsed_d_authors, api_authors_norm)
        if has_etal:
            # A truncated list cannot cover every API author, so only the
            # citation -> API direction is scored.
            a_score = score_forward
        else:
            score_backward = calculate_author_similarity(api_authors_norm, parsed_d_authors)
            a_score = (0.5 * score_forward) + (0.5 * score_backward)
        sys.stdout.flush()

    discovery = (d_title, d_authors)
    check_data = classify_verification(t_score, a_score)
    check_data['semantic_data'] = api_paper
    check_data['title_source'] = title_source
    check_data['discovery'] = discovery
    return check_data, discovery
_S2_MATCH_URL = "https://api.semanticscholar.org/graph/v1/paper/search/match"
_S2_SEARCH_URL = "https://api.semanticscholar.org/graph/v1/paper/search"
_S2_FIELDS = 'title,authors,year,venue'


def _semantic_scholar_get(url, params, headers, max_retries=3):
    """GET ``url``, retrying on HTTP 429 (exponential backoff) and timeouts.

    Returns the last response received, or None when every attempt timed
    out or the request failed for another reason.
    """
    retry_cnt = 0
    while retry_cnt <= max_retries:
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=10)
        except requests.exceptions.Timeout:
            retry_cnt += 1
            continue
        except requests.exceptions.RequestException:
            return None
        if resp.status_code != 429 or retry_cnt >= max_retries:
            return resp
        # Rate limited: back off 1s, 2s, 4s before trying again.
        time.sleep(2 ** retry_cnt)
        retry_cnt += 1
    return None


def _pick_best_candidate(candidates, raw_citation):
    """Return the paper whose title is best covered by ``raw_citation``.

    Ties in recall are broken in favour of the title with fewer words.
    """
    best_paper = None
    best_recall = -1.0
    best_word_count = 999
    for paper in candidates:
        cand_title = paper.get('title', '')
        recall = calculate_citation_recall(cand_title, raw_citation)
        word_count = len(cand_title.split()) if cand_title else 999
        if recall > best_recall:
            best_recall = recall
            best_word_count = word_count
            best_paper = paper
        elif abs(recall - best_recall) < 1e-7 and word_count < best_word_count:
            best_word_count = word_count
            best_paper = paper
    return best_paper


def _verify_single_citation(cit, headers, name_order, separator):
    """Run the two-stage lookup for one citation and return its check data.

    On a successful match the discovered title/author segments are stored
    on ``cit`` under 'title_after_verification' and
    'authors_after_verification'.
    """
    # Tolerate missing or None fields rather than failing on .strip().
    raw_citation = (cit.get('raw_text') or '').strip()
    cleaned_title = (cit.get('title') or '').strip()

    if cleaned_title:
        # --- STAGE 1: Direct Match (/match) by Title ---
        params = {'query': cleaned_title, 'fields': _S2_FIELDS}
        response = _semantic_scholar_get(_S2_MATCH_URL, params, headers)
        if response is None:
            return classify_verification(0, 0, has_error=True, error_msg="No Response")

        status_code = response.status_code
        if status_code == 200:
            data = response.json().get('data')
            paper = data[0] if data else None
            if paper and paper.get('paperId'):
                check_data, discovery = verify_citation_against_paper(
                    raw_citation,
                    paper,
                    cleaned_title,  # extracted_title
                    name_order=name_order,
                    separator=separator,
                )
                cit['title_after_verification'], cit['authors_after_verification'] = discovery
                return check_data
        elif status_code in (400, 401, 403):
            status_desc = SEMANTIC_SCHOLAR_STATUS_CODES.get(status_code, f"Unknown ({status_code})")
            return classify_verification(0, 0, has_error=True, error_msg=status_desc)
        elif status_code == 429:
            # NOTE: the status code is compared directly. A requests
            # Response is falsy for any 4xx/5xx status, so a truthiness
            # test on the response can never detect a 429.
            return classify_verification(0, 0, has_error=True, error_msg="Rate Limited (429)")
        # Any other outcome (no match, 404, 5xx) falls through to stage 2.

    # --- STAGE 2: Fallback Search (/search) ---
    # Up to two different queries are tried to maximize recall.
    queries_to_try = []
    if cleaned_title:
        queries_to_try.append(cleaned_title)
    queries_to_try.append(raw_citation)

    all_candidates = {}  # paperId -> paper_data
    for q_string in queries_to_try:
        search_params = {'query': q_string, 'limit': 5, 'fields': _S2_FIELDS}
        s_resp = _semantic_scholar_get(_S2_SEARCH_URL, search_params, headers)
        if s_resp is None:
            continue
        if s_resp.status_code == 200:
            for paper in s_resp.json().get('data') or []:
                pid = paper.get('paperId')
                if pid and pid not in all_candidates:
                    all_candidates[pid] = paper
        elif s_resp.status_code == 429:
            break  # stop trying queries if rate limited

    if not all_candidates:
        return classify_verification(0, 0, has_error=True, error_msg="No search results found by API")

    best_api_paper = _pick_best_candidate(all_candidates.values(), raw_citation)
    if not best_api_paper:
        return classify_verification(0, 0, has_error=True, error_msg="No suitable API candidate found")

    # verify_citation_against_paper decides whether the best API title or
    # the extracted title is used as the anchor for discovery.
    check_data, discovery = verify_citation_against_paper(
        raw_citation,
        best_api_paper,
        cleaned_title,
        name_order=name_order,
        separator=separator,
    )
    cit['title_after_verification'], cit['authors_after_verification'] = discovery
    if check_data.get('confidence', 0) < 0.4:
        return classify_verification(0, 0, has_error=True, error_msg="Low confidence match")
    return check_data


def check_citations_semantic_scholar(citations_to_check, api_key=None, name_order="first_last", separator=","):
    """Check citations using Semantic Scholar API as a generator.

    Each citation is first looked up with the title-match endpoint; if that
    yields no paper, the search endpoint is queried and the candidate whose
    title is best covered by the raw citation is verified instead.

    Args:
        citations_to_check: List of citation dicts ('raw_text', 'title' and
            optionally an existing 'verification' are read).
        api_key: Optional Semantic Scholar API key for higher rate limits.
            Without a key, the generator sleeps 1 second between citations.
        name_order: Passed through to ``verify_citation_against_paper``.
        separator: Passed through to ``verify_citation_against_paper``.

    Yields:
        Each input citation dict, in order, with 'verification' set.
        Citations whose existing status is 'verified', 'ambiguous' or
        'suspected_hallucination' are yielded unchanged without any request;
        'api_error' results are retried. Any exception raised while
        checking a citation is reported as an 'api_error' verification.
    """
    headers = {'x-api-key': api_key} if api_key else {}
    last_index = len(citations_to_check) - 1

    for i, cit in enumerate(citations_to_check):
        existing_status = (cit.get('verification') or {}).get('status')
        if existing_status in ('verified', 'ambiguous', 'suspected_hallucination'):
            yield cit
            continue

        try:
            cit['verification'] = _verify_single_citation(cit, headers, name_order, separator)
        except Exception as e:
            # Top-level boundary: one bad citation must not stop the run.
            cit['verification'] = classify_verification(0, 0, has_error=True, error_msg=str(e))
        yield cit
        sys.stdout.flush()

        # Rate limiting: wait 1 second between requests to avoid 429
        # errors (only if no API key).
        if not api_key and i < last_index:
            time.sleep(1)
def parse_tei_citations(tei_xml):
    """Turn a GROBID TEI response into a list of citation dicts.

    Every ``biblStruct`` found under a ``listBibl`` is inspected. An entry is
    kept only when at least one of title / authors / year / venue could be
    read; kept entries additionally carry ``raw_text`` (the reference text with
    whitespace collapsed) and ``grobid_xml`` (the serialized ``biblStruct``).

    Args:
        tei_xml: TEI document as a string.

    Returns:
        A list of citation dicts, or an empty list when anything goes wrong
        while parsing.
    """
    tei_ns = {'tei': 'http://www.tei-c.org/ns/1.0'}

    def stripped_text(node):
        # Direct text of an element, or None when the element or its text is missing.
        if node is not None and node.text:
            return node.text.strip()
        return None

    try:
        parsed = []
        document_root = ET.fromstring(tei_xml)
        for entry in document_root.findall('.//tei:listBibl/tei:biblStruct', tei_ns):
            record = {}

            # Title: article-level first, monograph-level as a fallback.
            title_node = entry.find('.//tei:title[@level="a"]', tei_ns)
            monograph_is_title = False
            if title_node is None:
                title_node = entry.find('.//tei:title[@level="m"]', tei_ns)
                monograph_is_title = title_node is not None
            title_value = stripped_text(title_node)
            if title_value is not None:
                record['title'] = title_value

            # Authors: "<forename> <surname>" built from each persName.
            names = []
            for author_node in entry.findall('.//tei:author', tei_ns):
                person = author_node.find('.//tei:persName', tei_ns)
                if person is None:
                    continue
                name_nodes = (
                    person.find('.//tei:forename', tei_ns),
                    person.find('.//tei:surname', tei_ns),
                )
                pieces = [
                    node.text.strip()
                    for node in name_nodes
                    if node is not None and node.text
                ]
                if pieces:
                    names.append(' '.join(pieces))
            if names:
                record['authors'] = names

            # Year: the ``when`` attribute of the published date.
            date_node = entry.find('.//tei:date[@type="published"]', tei_ns)
            when = date_node.get('when') if date_node is not None else None
            if when:
                record['year'] = when

            # Venue: journal, then monograph (unless it already served as the
            # title), then meeting; publisher is the last resort.
            venue_node = entry.find('.//tei:title[@level="j"]', tei_ns)
            if venue_node is None and not monograph_is_title:
                venue_node = entry.find('.//tei:title[@level="m"]', tei_ns)
            if venue_node is None:
                venue_node = entry.find('.//tei:meeting', tei_ns)
            venue_value = stripped_text(venue_node)
            if venue_value is not None:
                record['venue'] = venue_value
            if 'venue' not in record:
                publisher_value = stripped_text(entry.find('.//tei:publisher', tei_ns))
                if publisher_value is not None:
                    record['venue'] = publisher_value

            if not record:
                # Nothing usable was extracted; drop the entry.
                continue

            # Display text: prefer GROBID's raw_reference note, otherwise
            # flatten the whole biblStruct.
            raw_node = entry.find('.//tei:note[@type="raw_reference"]', tei_ns)
            text_source = raw_node if raw_node is not None else entry
            flattened = "".join(text_source.itertext()).strip()
            record['raw_text'] = re.sub(r'\s+', ' ', flattened)

            # Keep the serialized element so later stages can re-parse it.
            record['grobid_xml'] = ET.tostring(entry, encoding='unicode')
            parsed.append(record)
        return parsed
    except Exception:
        return []
def extract_title_and_authors_from_xml(xml_string):
    """Extract the title from a GROBID biblStruct XML string.

    Titles are looked up from most to least specific: main article title,
    any article title, monograph title, then any title element at all.

    Args:
        xml_string: XML string of a biblStruct element.

    Returns:
        ``{'title': <str>, 'authors': []}`` when a title with text is found,
        ``{'authors': []}`` when it is not, and ``{}`` when the XML cannot be
        processed. ``authors`` is always an empty list here; callers derive
        authors from the raw reference text instead.
    """
    # The previous version repeated these lookups under a second prefix bound
    # to the same namespace URI; ElementTree resolves prefixes to URIs, so the
    # repeats could never find anything the first pass had missed.
    ns = {'tei': 'http://www.tei-c.org/ns/1.0'}
    title_paths = (
        './/tei:title[@level="a"][@type="main"]',
        './/tei:title[@level="a"]',
        './/tei:title[@level="m"]',
        './/tei:title',
    )
    try:
        root = ET.fromstring(xml_string)
        title_elem = None
        for path in title_paths:
            title_elem = root.find(path, ns)
            if title_elem is not None:
                break

        result = {}
        if title_elem is not None and title_elem.text:
            result['title'] = title_elem.text.strip()
        result['authors'] = []
        return result
    except Exception:
        # Best effort: malformed or empty XML simply yields no fields.
        return {}
def clean_metadata(text):
    """Trim a title or author string at the first sentence that looks like venue/URL/year noise.

    The text is split into sentences on ``.``, ``?`` or ``!`` (followed by
    whitespace or end of string). Sentences are kept in order until one of them
    contains a URL or DOI/arXiv label, a known venue name (case-insensitive), a
    venue abbreviation (case-sensitive, whole word), a common term, or a
    19xx-21xx year; that sentence and everything after it is dropped.

    Returns the cleaned string ("" for empty input).
    """
    if not text:
        return ""

    # Labels that mark a DOI/arXiv/URL even when the URL extractor misses them.
    url_label_pattern = r'arxiv\.org|doi\.org|\bdoi:|\burl\b'

    # Drop parenthesis characters but keep what they enclose.
    text = text.replace('(', '').replace(')', '')

    # Mask URLs first so the dots inside them are not taken for sentence ends.
    # Longest URLs are masked first to avoid partial replacements.
    masked = text
    masked_values = []
    unique_urls = sorted(list(set(extractor.find_urls(text, True))), key=len, reverse=True)
    for found_url in unique_urls:
        token = f"__URL_PH_{len(masked_values)}__"
        masked_values.append(found_url)
        masked = masked.replace(found_url, token)

    def mask_label(match):
        # Same masking for the explicit doi:/arxiv.org/url labels.
        token = f"__URL_PH_{len(masked_values)}__"
        masked_values.append(match.group(0))
        return token

    masked = re.sub(url_label_pattern, mask_label, masked, flags=re.IGNORECASE)

    def unmask(segment):
        # Put the original URL/label text back in place of its token.
        for index, value in enumerate(masked_values):
            segment = segment.replace(f"__URL_PH_{index}__", value)
        return segment

    # Split into sentences, gluing each terminator back onto its sentence.
    sentences = []
    pending = ""
    for piece in re.split(r'([.?!]\s|[.?!]$)', masked):
        is_terminator = bool(piece) and (
            piece.strip() in ['.', '?', '!'] or re.match(r'[.?!]\s', piece)
        )
        if is_terminator:
            sentences.append(pending + piece)
            pending = ""
        else:
            pending += piece
    if pending:
        sentences.append(pending)

    kept = []
    for sentence in sentences:
        # A masked URL/label means everything from here on is not title/author text.
        if "__URL_PH_" in sentence:
            break

        visible = unmask(sentence)
        lowered = visible.lower()
        contaminated = (
            any(venue.lower() in lowered for venue in VENUE_NAMES)
            or any(
                re.search(r'\b' + re.escape(abbr) + r'\b', visible)
                for abbr in VENUE_ABBREVIATIONS
            )
            or any(term.lower() in lowered for term in COMMON_TERMS)
            # Years only (19xx-21xx), not arbitrary digits.
            or re.search(r'\b(19|20|21)\d{2}\b', visible)
            # Last line of defence against URLs the masking step missed.
            or extractor.has_urls(visible)
            or re.search(url_label_pattern, visible, re.IGNORECASE)
        )
        if contaminated:
            break
        kept.append(unmask(sentence))

    # Re-assemble and tidy up whitespace, emptied brackets and edge punctuation.
    cleaned = "".join(kept).strip()
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    cleaned = re.sub(r'\(\s*\)', '', cleaned)
    cleaned = re.sub(r'\[\s*\]', '', cleaned)
    return cleaned.strip(".,;: -()[]")
def find_reference_pages(pdf_path):
    """Locate the first page of the reference section and return its text.

    A page is a candidate when one of its short lines (five words or fewer)
    contains "references" or "bibliography". A candidate is accepted only if
    GROBID extracts at least one citation from that page, which filters out
    tables of contents and similar false positives.

    Args:
        pdf_path: Path of the PDF to scan.

    Returns:
        Tuple ``(ref_pages, start_page, end_page, ref_text)``:
        ``ref_pages`` is ``[start_page]`` or ``[]``; ``start_page`` is the
        0-based page index or ``None``; ``end_page`` is ``start_page + 1`` when
        found, otherwise the page count; ``ref_text`` is the start page's text
        followed by a newline, or "" when nothing was found.
    """
    doc = fitz.open(pdf_path)
    try:
        start_page = None
        end_page = len(doc)

        for page_num, page in enumerate(doc):
            page_lines = [
                l.strip().lower()
                for l in page.get_text("text").splitlines()
                if l.strip()
            ]
            has_heading = any(
                len(line.split()) <= 5 and ("references" in line or "bibliography" in line)
                for line in page_lines
            )
            # Only call GROBID for pages that actually show a heading.
            if has_heading and _get_grobid_boundaries(pdf_path, [page_num]):
                start_page = page_num
                break

        if start_page is None:
            return [], None, end_page, ""

        # Initial guess is just the start page; the iterative GROBID pass in
        # extract_citations_auto expands the range afterwards.
        ref_text = doc[start_page].get_text("text") + "\n"
        return [start_page], start_page, start_page + 1, ref_text
    finally:
        # Release the document even when text extraction raises.
        doc.close()
def process_pdf_initial(pdf_file, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_appendix_header, state_ref_text):
    """Handle a freshly uploaded PDF: locate references and show the PDF at once.

    Old temp files are cleaned up on every call. The return value is the
    23-item tuple of component updates and state values that the upload event
    expects; actual citation extraction is started afterwards by the event
    chain. The incoming ``state_*`` arguments are not read; fresh values are
    returned for all of them.
    """
    # Clean up stale temp files whenever a new PDF is uploaded.
    cleanup_old_temp_files(max_age_hours=1)

    def hide():
        return gr.update(visible=False)

    def hide_and_lock():
        return gr.update(interactive=False, visible=False)

    if pdf_file is None:
        return (
            None, "No PDF uploaded",
            hide(), hide(),
            hide(),
            hide_and_lock(),
            hide_and_lock(),
            None, [], [], [], None, "",
            hide(), hide(), hide(), hide(),
            False,
            hide(),
            None,    # reset state_ref_pdf_path
            "",      # reset state_pdf_name
            hide(),  # reset export_btn
            hide(),  # reset download_file
        )

    uploaded_path = pdf_file.name
    file_label = os.path.basename(uploaded_path)

    # Only the first reference page is identified here.
    ref_pages, first_ref_page, _end_page, ref_text = find_reference_pages(uploaded_path)

    if ref_pages:
        section_note = f"\n✓ Identified reference section start: page {first_ref_page + 1}"
    else:
        section_note = "\n⚠ No reference section found"
    status = "".join([
        f"✓ Loaded PDF: {file_label}\n",
        section_note,
        "\n⏳ Starting automatic extraction... Please wait.",
    ])

    return (
        uploaded_path, status,
        gr.update(value=uploaded_path, visible=True),
        gr.update(visible=True, value="Show Full PDF"),
        hide(),           # citations display
        hide_and_lock(),  # verify button
        hide_and_lock(),  # slider
        uploaded_path,    # state_pdf_path
        ref_pages,        # state_ref_pages
        [],               # state_citations
        [],               # state_removed_citations
        None,             # state_appendix_header (detected later, iteratively)
        ref_text,         # state_ref_text
        hide(),           # citations_header
        hide(),           # verification_header
        hide(),           # verification_divider
        hide(),           # api_key_input
        False,            # state_extraction_done
        gr.update(visible=False, value=""),    # corrected_display cleared completely
        None,             # reset state_ref_pdf_path
        file_label,       # state_pdf_name
        hide(),           # export_btn
        gr.update(visible=False, value=None),  # download_file
    )
def _get_grobid_boundaries(pdf_path, page_indices):
"""Helper to get GROBID citation boundaries for specific pages."""
if not page_indices:
return []
output_path = None
try:
doc = fitz.open(pdf_path)
temp_grobid = tempfile.NamedTemporaryFile(delete=False, suffix="_grobid.pdf")
output_path = temp_grobid.name
temp_grobid.close()
ref_doc = fitz.open()
for page_idx in page_indices:
ref_doc.insert_pdf(doc, from_page=page_idx, to_page=page_idx)
ref_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True)
ref_doc.close()
doc.close()
with open(output_path, 'rb') as f:
files = {'input': (os.path.basename(output_path), f, 'application/pdf')}
data = {'consolidateCitations': '0', 'includeRawCitations': '1'}
response = requests.post(
'http://localhost:8070/api/processFulltextDocument',
files=files,
data=data,
timeout=120
)
if response.status_code == 200:
return parse_tei_citations(response.text)
else:
return []
except Exception:
return []
finally:
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except:
pass
def extract_citations_auto(view_mode, previous_status, state_pdf_path, state_ref_pages, state_ref_text, state_citations, state_removed_citations, state_appendix_header, state_extraction_done):
    """Extract citations with a multi-pass GROBID pipeline, yielding UI updates.

    Generator. Every yielded value is the 19-item tuple built by the inner
    ``gen_update`` (status text, component updates and state values).

    Pipeline:
      1. Starting at ``state_ref_pages[0]``, send pages to GROBID one by one
         until a page yields no citation with a title, authors or year.
      2. Re-read the text of the confirmed pages and look on the last page for
         a short heading (appendix, supplement, ...) that follows the last
         citation.
      3. Send all confirmed pages to GROBID at once and merge that list with
         the per-page lists, so citations split across a page break can be
         replaced by their complete form.
      4. Derive title and author strings for each citation.
      5. Filter out noise and near-duplicates.

    ``view_mode``, ``state_citations``, ``state_removed_citations`` and
    ``state_extraction_done`` are accepted for the event signature but not read.
    Any exception is reported through a final "Error" update rather than raised.
    """
    # Function-scope import, as in the original (which imported it mid-body).
    import difflib

    def gen_update(status_txt, done=False, final_cits=None, final_rem=None, final_pages=None, final_text=None, final_header=None):
        """Build one output tuple; lists default to fresh empties, other
        values fall back to the enclosing function's current state."""
        # The defaults used to be shared mutable lists ([]), so the same list
        # object was handed out as state on every call. Fresh lists keep the
        # same visible behavior without the aliasing.
        cits = [] if final_cits is None else final_cits
        rem = [] if final_rem is None else final_rem
        pages = final_pages if final_pages is not None else state_ref_pages
        text = final_text if final_text is not None else state_ref_text
        header = final_header if final_header is not None else state_appendix_header

        loading_update = gr.update(visible=False) if done else gr.update()
        slider_max = len(cits) if cits else 1
        slider_val = min(1, slider_max)

        # The citation HTML is only generated once extraction has finished.
        if done:
            display_text = format_citations_display(cits)
            if rem:
                display_text += "\n\nREMOVED CITATIONS ({})\n\n".format(len(rem))
                display_text += format_citations_display(rem, show_reason=True)
            citations_html_update = gr.update(value=display_text, visible=done)
        else:
            citations_html_update = gr.update()

        return (status_txt,
                citations_html_update,  # citations_display (populated when done)
                gr.update(interactive=done, visible=done),  # verify_btn
                gr.update(interactive=done, maximum=slider_max, value=slider_val, visible=done),  # slider
                cits, rem, pages, text, header,
                gr.update(),  # pdf_viewer (handled by update_view, we just update state)
                loading_update,  # loading indicator
                gr.update(visible=done),  # citations_header
                gr.update(visible=done),  # verification_header
                gr.update(visible=done),  # verification_divider
                gr.update(visible=done),  # api_key_input
                done,  # state_extraction_done
                gr.update(visible=done),  # corrected_display
                gr.update(visible=done),  # export_btn
                gr.update(visible=False, value=None))  # download_file

    def get_text(cit):
        return cit.get('raw_text', '').strip()

    if not state_ref_pages or not state_pdf_path:
        yield gen_update(previous_status + "\n⚠ No reference pages to process", done=True)
        return

    try:
        start_page_idx = state_ref_pages[0]
        confirmed_ref_pages = []
        per_page_citations = []

        yield gen_update(previous_status + f"\n⏳ Scanning pages starting from {start_page_idx + 1}...")

        doc_temp = fitz.open(state_pdf_path)
        try:
            total_pages = len(doc_temp)
        finally:
            doc_temp.close()

        # --- 1. Grow the reference range page by page ---
        current_page = start_page_idx
        while current_page < total_pages:
            yield gen_update(previous_status + f"\n⏳ Scanning Page {current_page + 1}... Citations will be displayed once finished.")
            page_cits = _get_grobid_boundaries(state_pdf_path, [current_page])
            has_valid = any(
                c.get('title') or c.get('authors') or c.get('year')
                for c in page_cits
            )
            if not has_valid:
                break
            confirmed_ref_pages.append(current_page)
            per_page_citations.append(page_cits)
            current_page += 1

        if not confirmed_ref_pages:
            yield gen_update(previous_status + "\n⚠ No valid citations extracted from start page.", done=True)
            return

        yield gen_update(previous_status + f"\n✓ Range confirmed: {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1}. Merging...", final_pages=confirmed_ref_pages)

        # Update status log with the confirmed range
        previous_status += f"\n✓ Confirmed Reference Range: Pages {confirmed_ref_pages[0]+1}-{confirmed_ref_pages[-1]+1} ({len(confirmed_ref_pages)} pages)"
        state_ref_pages = confirmed_ref_pages

        # --- 2. Re-extract text for the full range and detect a trailing header ---
        doc_temp = fitz.open(state_pdf_path)
        try:
            updated_ref_text = "".join(
                doc_temp[p_idx].get_text("text") + "\n" for p_idx in state_ref_pages
            )
            last_page_text = doc_temp[state_ref_pages[-1]].get_text("text")
        finally:
            # Previously the document stayed open if get_text raised.
            doc_temp.close()

        lines = [l.strip() for l in last_page_text.splitlines() if l.strip()]
        appendix_keywords = ["appendix", "appendices", "supplement", "limitation", "checklist", "statement"]

        # Line index at which each last-page citation starts (matched by its first 30 chars).
        citation_start_line_indices = []
        for cit in per_page_citations[-1]:
            cit_text = cit.get('raw_text', '').strip()
            if not cit_text:
                continue
            cit_prefix = cit_text[:30].strip().lower()
            for k, line in enumerate(lines):
                if cit_prefix in line.lower():
                    citation_start_line_indices.append(k)
                    break

        header_candidates = []
        for i, line in enumerate(lines):
            words = line.split()
            if len(words) > 5:
                continue
            line_lower = line.lower()
            is_match = any(k in line_lower for k in appendix_keywords) or bool(
                re.match(r'^A[\.\:]?$', words[0] if words else "")
            )
            if not is_match:
                continue
            # Very short headings ("A", "A.") are padded with the following lines.
            candidate = line
            curr_idx = i + 1
            while len(candidate) < 5 and curr_idx < len(lines):
                candidate += " " + lines[curr_idx]
                curr_idx += 1
            # A real appendix heading has no citations after it on the page.
            if not any(start_idx > i for start_idx in citation_start_line_indices):
                header_candidates.append(candidate)

        state_appendix_header = header_candidates[0] if header_candidates else None
        state_ref_text = updated_ref_text

        # --- 3. Consolidated GROBID pass over the whole range (list C) ---
        yield gen_update(previous_status + "\n⏳ Sending full context to GROBID...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
        list_c = _get_grobid_boundaries(state_pdf_path, confirmed_ref_pages)
        list_i_pages = per_page_citations

        # Span detection: the last citation of a page may continue on the next
        # page. If list C holds a longer citation containing it (and not the
        # first two citations of the next page), use the longer one.
        actions = {}
        for p_idx, page_list in enumerate(list_i_pages):
            if not page_list:
                continue
            cit_x = page_list[-1]
            cit_x_text = get_text(cit_x)
            if not cit_x_text:
                # "" is a substring of every string; without this guard an
                # empty citation would be "extended" into an unrelated one.
                continue

            cit_y = None
            cit_y_text = ""
            cit_z = None
            cit_z_text = ""
            if p_idx + 1 < len(list_i_pages) and list_i_pages[p_idx + 1]:
                next_page = list_i_pages[p_idx + 1]
                cit_y = next_page[0]
                cit_y_text = get_text(cit_y)
                if len(next_page) > 1:
                    cit_z = next_page[1]
                    cit_z_text = get_text(cit_z)

            for cit_match in list_c:
                match_text = get_text(cit_match)
                if cit_x_text not in match_text:
                    continue
                if cit_z and cit_z_text in match_text:
                    continue
                if cit_y and cit_y_text in match_text:
                    continue
                if len(match_text) > len(cit_x_text):
                    actions[id(cit_x)] = {'type': 'extension', 'target': cit_match}
                    break

        flat_list_i = []
        for p_list in list_i_pages:
            for cit in p_list:
                act = actions.get(id(cit))
                if act is None:
                    flat_list_i.append(cit)
                elif act['type'] == 'extension':
                    flat_list_i.append(act['target'])

        # Align both lists by raw text; per-page entries win, entries that only
        # exist in list C are inserted.
        texts_i = [get_text(c) for c in flat_list_i]
        texts_c = [get_text(c) for c in list_c]
        matcher = difflib.SequenceMatcher(None, texts_i, texts_c)
        final_merged_list = []
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'insert':
                final_merged_list.extend(list_c[j1:j2])
            else:  # 'equal', 'delete', 'replace'
                final_merged_list.extend(flat_list_i[i1:i2])

        # Fold short URL-only fragments into the preceding citation.
        merged_citations = []
        for cit in final_merged_list:
            raw_text = cit.get('raw_text', '').strip()
            has_url = extractor.has_urls(raw_text) or re.search(r'arxiv\.org|doi\.org|\bdoi:|\burl\b', raw_text, re.IGNORECASE)
            is_url_only = has_url and len(raw_text.split()) <= 6
            if merged_citations and is_url_only:
                prev_cit = merged_citations[-1]
                prev_cit['raw_text'] = (prev_cit.get('raw_text', '') + " " + raw_text).strip()
            else:
                merged_citations.append(cit)
        grobid_citations = merged_citations

        yield gen_update(previous_status + f"\n⏳ Parsing metadata for {len(grobid_citations)} citations...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)

        # --- 4. Extract title and authors ---
        parsed_citations = []
        for idx, cit in enumerate(grobid_citations):
            # Frequent yields during the heavy parsing loop (every 5)
            if idx % 5 == 0:
                yield gen_update(previous_status + f"\n⏳ Parsing citation {idx+1}/{len(grobid_citations)}...", final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)

            raw_text = cit.get('raw_text', '')
            grobid_xml = cit.get('grobid_xml', '')

            # The last citation may have swallowed the appendix heading:
            # cut it off there and ask GROBID to re-parse the shortened text.
            if idx == len(grobid_citations) - 1 and state_appendix_header:
                clean_header = state_appendix_header.strip()[:10].strip().lower()
                clean_header = re.sub(r'\s+', ' ', clean_header)
                raw_lower = re.sub(r'\s+', ' ', raw_text.lower())
                cutoff_index = raw_lower.find(clean_header)
                if cutoff_index > 0:
                    cleaned_raw_reference = raw_text[:cutoff_index].strip()
                    cleaned_raw_reference = re.sub(r'(\.\s*See\s*|\s*See\s*|\.\s*)$', '', cleaned_raw_reference, flags=re.IGNORECASE).strip()
                    raw_text = cleaned_raw_reference
                    try:
                        response = requests.post(
                            'http://localhost:8070/api/processCitation',
                            data={'citations': cleaned_raw_reference, 'includeRawCitations': '1'},
                            timeout=30
                        )
                        if response.status_code == 200:
                            grobid_xml = response.text
                    except Exception:
                        # Best effort: keep the original XML if the re-parse fails.
                        pass

            parsed_fields = extract_title_and_authors_from_xml(grobid_xml)
            title = parsed_fields.get('title', '')
            authors = parsed_fields.get('authors', [])

            # Undo line-break hyphenation left over from PDF text extraction.
            raw_text = raw_text.replace("- ", "")
            title = title.replace("- ", "")

            # Recover title words GROBID dropped: locate the title in the raw
            # text and prepend whatever sits between the preceding punctuation
            # mark and the match.
            if title and len(title) > 5:
                clean_title_prefix = re.sub(r'\W+', '', title.lower()[:40])
                if clean_title_prefix:
                    fuzzy_pattern = ''.join(re.escape(c) + r'[\W]*' for c in clean_title_prefix)
                    t_match = re.search(fuzzy_pattern, raw_text.lower())
                    if t_match:
                        match_start = t_match.start()
                        boundary_idx = max(
                            raw_text.rfind(mark, 0, match_start) for mark in '.?!,'
                        )
                        start_idx = boundary_idx + 1 if boundary_idx != -1 else 0
                        missed_prefix = raw_text[start_idx:match_start].strip()
                        if missed_prefix:
                            title = f"{missed_prefix} {title}".strip()

            title = clean_metadata(title)
            refined_authors = refine_author_string(raw_text, authors, title)
            refined_authors = clean_metadata(refined_authors)
            # Make sure the author string does not run into the title.
            if title and len(title) > 8 and title in refined_authors:
                refined_authors = refined_authors.split(title)[0].strip()
            refined_authors = refined_authors.strip(".,;: -()")

            parsed_citations.append({
                'raw_text': raw_text,
                'title': title,
                'authors': refined_authors,
                'year': cit.get('year', ''),
                'venue': cit.get('venue', '')
            })

        # --- 5. Filter noise and near-duplicates ---
        final_citations = []
        final_removed_citations = []
        for cit in parsed_citations:
            title = cit.get('title', '').strip()
            raw_text_clean = cit.get('raw_text', '').strip()
            alpha_chars = sum(c.isalnum() for c in raw_text_clean)
            alpha_density = alpha_chars / len(raw_text_clean) if raw_text_clean else 0

            rejection_reason = None
            if title.lower().startswith(("fig.", "figure")):
                rejection_reason = "Figure caption detected"
            elif not title and not cit.get('authors') and not cit.get('year'):
                rejection_reason = "Missing title, authors, and year"
            elif raw_text_clean.lower() in ["references", "bibliography", "works cited"]:
                rejection_reason = "Section header detected"
            elif len(raw_text_clean) > 5 and alpha_density < 0.3:
                rejection_reason = "Likely noise or artifact (low text density)"

            if rejection_reason is None:
                is_dup = any(
                    jellyfish.jaro_winkler_similarity(raw_text_clean, existing.get('raw_text', '').strip()) >= 0.95
                    for existing in final_citations
                )
                if is_dup:
                    rejection_reason = "Duplicate (95%+ similarity)"

            if rejection_reason:
                cit['rejection_reason'] = rejection_reason
                final_removed_citations.append(cit)
            else:
                final_citations.append(cit)

        status = previous_status + f"\n✓ Hybrid extraction: {len(final_citations)} citations (+{len(final_removed_citations)} filtered)"
        # FINAL YIELD
        yield gen_update(status, done=True, final_cits=final_citations, final_rem=final_removed_citations, final_pages=state_ref_pages, final_text=state_ref_text, final_header=state_appendix_header)
    except Exception as e:
        # Error update: unlock the UI with empty results and show the message.
        yield gen_update(previous_status + f"\n❌ Error: {str(e)}", done=True, final_cits=[], final_rem=[])
def run_citation_check(num_to_check, previous_status, api_key, state_citations):
    """Verify the first ``num_to_check`` citations, yielding progress updates.

    Generator. Every yield is a 4-tuple
    ``(status_text, verification_html_or_update, citations_html_or_update,
    citations_state)``.

    Args:
        num_to_check: How many citations (from the top of the list) to verify.
        previous_status: Status log text shown so far.
        api_key: Optional API key; blank or ``None`` means "no key".
        state_citations: Current list of citation dicts (not mutated).
    """
    import copy
    from collections import Counter

    if not state_citations:
        # Same arity as every other yield. This path used to yield only three
        # values, one fewer than the progress and final yields.
        yield (previous_status + "\n⚠ No citations to verify.",
               gr.update(), gr.update(), state_citations)
        return

    # 1. Identify the author pattern from the top 10 citations
    sample_author_strings = [
        cit.get('authors', '')
        for cit in state_citations[:10]
        if cit.get('authors') and isinstance(cit.get('authors'), str)
    ]
    name_order, separator = identify_author_pattern(sample_author_strings)

    # Work on deep copies so the caller's state is untouched until we yield.
    # A slider may deliver a float; slicing requires an int.
    limit = None if num_to_check is None else int(num_to_check)
    to_check = copy.deepcopy(state_citations[:limit])

    # Use the API key only if one was actually provided
    api_key_clean = api_key.strip() if api_key else None

    updated_citations = list(state_citations)
    total = len(to_check)

    verified_stream = check_citations_semantic_scholar(
        to_check, api_key=api_key_clean, name_order=name_order, separator=separator
    )
    for i, verified_cit in enumerate(verified_stream):
        if i < len(updated_citations):
            updated_citations[i] = verified_cit

        # Yield progress together with the refreshed citations display so
        # "Show Citations" reflects what has been verified so far.
        status_msg = f"{previous_status}\n⏳ Verifying citation {i+1}/{total}... Results will be displayed once finished."
        updated_cit_html = format_citations_display(updated_citations)
        yield (status_msg, gr.update(), updated_cit_html, updated_citations)

    # Final view
    final_ver_html = format_verifications_display(updated_citations)
    final_cit_html = format_citations_display(updated_citations)

    status_counts = Counter(
        c.get('verification', {}).get('status') for c in updated_citations[:total]
    )
    v_count = status_counts['verified']
    a_count = status_counts['ambiguous']
    h_count = status_counts['suspected_hallucination']
    e_count = status_counts['api_error']

    status_msg = f"Verification Complete: ✅ {v_count} | ⚠️ {a_count} | ❌ {h_count} | 🔌 {e_count}"
    yield (status_msg, final_ver_html, final_cit_html, updated_citations)
def format_citations_display(citations, show_reason=False):
    """Render citation dicts as an HTML fragment.

    Each citation shows its numbered raw text, optionally the rejection reason
    (``show_reason``), the extracted title when present, and the
    title/authors discovered during verification when present. All dynamic
    text is HTML-escaped. Returns "" for an empty or missing list.
    """
    if not citations:
        return ""

    import html as html_lib
    esc = html_lib.escape

    chunks = ["<div class='citations-container'>"]
    for number, entry in enumerate(citations, 1):
        chunks.append("<div class='citation-item'>")

        # Raw reference text is the main display line.
        shown_text = esc(entry.get('raw_text', 'No citation text'))
        chunks.append(f"<div><strong>[{number}]</strong> {shown_text}")
        if show_reason and 'rejection_reason' in entry:
            why = esc(entry['rejection_reason'])
            chunks.append(f" <span class='rejection-reason'>[REASON: {why}]</span>")
        chunks.append("</div>")

        # Extracted title, rendered as secondary metadata.
        extracted_title = entry.get('title', '')
        if extracted_title:
            chunks.append("<div class='citation-metadata'>")
            chunks.append(f"<div style='margin-bottom: 2px;'>Title: {esc(extracted_title)}</div>")
            chunks.append("</div>")

        # Fields filled in by verification (discovery mapping), if any.
        verified_title = entry.get('title_after_verification', '')
        verified_authors = entry.get('authors_after_verification', '')
        if verified_title or verified_authors:
            chunks.append("<div class='ver-verified'>")
            if verified_title:
                chunks.append(
                    f"<div style='margin-bottom: 2px;'><strong>Title:</strong> {esc(verified_title)}</div>"
                )
            if verified_authors:
                joined = (
                    ", ".join(verified_authors)
                    if isinstance(verified_authors, list)
                    else str(verified_authors)
                )
                chunks.append(f"<div><strong>Authors:</strong> {esc(joined)}</div>")
            chunks.append("</div>")

        chunks.append("</div>")

    chunks.append("</div>")
    return "".join(chunks)
def refine_author_string(raw_text, grobid_authors, title=None):
    """Cut the leading author portion out of a raw reference string.

    The author portion runs from the start of ``raw_text`` up to the segment
    (delimited by a period or comma) that contains either the first 19xx-21xx
    year or the start of ``title``, whichever comes first. If neither is found
    the whole text is returned so that later cleaning can deal with it.

    ``grobid_authors`` is accepted for interface compatibility and not used.
    """
    if not raw_text:
        return ""

    # Positions at which non-author metadata (year or title) may begin.
    cut_points = []

    year_hit = re.search(r'\b(19|20|21)\d{2}\b', raw_text)
    if year_hit:
        cut_points.append(year_hit.start())

    if title and len(title) > 5:
        # Match the first 20 characters of the title, letters/digits only,
        # tolerating any punctuation or spacing between them.
        compact_prefix = re.sub(r'\W+', '', title.lower()[:20])
        if compact_prefix:
            loose_pattern = ''.join(re.escape(ch) + r'[\W]*' for ch in compact_prefix)
            title_hit = re.search(loose_pattern, raw_text.lower())
            if title_hit:
                cut_points.append(title_hit.start())

    if not cut_points:
        return raw_text.strip()

    cut = min(cut_points)

    # Drop the whole segment that holds the metadata: back up to the nearest
    # period or comma before the cut. Without one, cut exactly at the metadata.
    separator_at = max(raw_text.rfind('.', 0, cut), raw_text.rfind(',', 0, cut))
    if separator_at == -1:
        author_part = raw_text[:cut]
    else:
        author_part = raw_text[:separator_at + 1]

    # Trailing punctuation such as "Author, Author." is not part of the names.
    return author_part.strip().rstrip(".,:; ")
def identify_author_pattern(author_strings):
    """Guess how author lists are written in a bibliography.

    Args:
        author_strings: Sample of author strings (callers pass the top 10).

    Returns:
        ``(name_order, separator)`` where ``name_order`` is ``"first_last"`` or
        ``"last_first"`` and ``separator`` is ``","`` or ``";"``. An empty
        sample yields ``("first_last", ",")``.
    """
    if not author_strings:
        return "first_last", ","

    # Separator: semicolon wins when there are more semicolons than half the
    # number of commas (integer division) across the whole sample.
    semicolon_total = sum(s.count(";") for s in author_strings)
    comma_total = sum(s.count(",") for s in author_strings)
    main_sep = ";" if semicolon_total > (comma_total // 2) else ","

    # Split every string on the chosen separator, treating "and" / "&"
    # (surrounded by whitespace) as one more separator.
    conjunction = re.compile(r'\s+(?:and|&)\s+', re.IGNORECASE)
    chunks = [
        chunk.strip()
        for author_string in author_strings
        for chunk in conjunction.sub(main_sep + ' ', author_string).split(main_sep)
        if chunk.strip()
    ]

    if main_sep == ";":
        # "Last, First; Last, First": at least half the chunks hold a comma.
        hits = sum(1 for chunk in chunks if "," in chunk)
        share = 0.5
    else:
        # "Last, F., Last, F.": most comma-separated chunks are single words.
        hits = sum(1 for chunk in chunks if len(chunk.split(" ")) == 1)
        share = 0.7

    if chunks and hits >= (len(chunks) * share):
        return "last_first", main_sep
    return "first_last", main_sep
def parse_names_by_pattern(author_string, order, separator):
    """
    Split an author string into individual names using a known pattern.

    Steps:
      1. Remove "et al" markers (including a trailing dot).
      2. Replace the conjunctions "and" and "&" with the separator.
      3. Collapse runs of separators and trim separators at both ends.
      4. Split on the separator.
      5. When the pattern is "last_first" with a comma separator, re-join
         consecutive segments in pairs ("Last", "First" -> "Last, First");
         an unpaired trailing segment is kept on its own. In every other
         case each segment is already one full name.
      6. Pass every name through normalize_d_author and drop falsy results.

    Args:
        author_string: Raw author text; falsy input yields an empty list.
        order: "first_last" or "last_first".
        separator: The divider between authors ("," or ";").

    Returns:
        List of the truthy values returned by normalize_d_author, in order.
    """
    if not author_string:
        return []

    # The optional dot sits after the word boundary: "\b" cannot match between
    # "." and a following space/end of string, so a pattern ending in "\.?\b"
    # would strip "et al" but leave the dot behind.
    s = re.sub(r'\bet\s*al\b\.?', '', author_string, flags=re.IGNORECASE)

    # "&" is not a word character, so it cannot be guarded with "\b" (that only
    # matches "A&B", never "A & B"); match it together with surrounding spaces.
    s = re.sub(r'\s*&\s*|\band\b', separator, s, flags=re.IGNORECASE)

    sep_esc = re.escape(separator)
    # Collapse multiple separators and any whitespace/separators between them.
    s = re.sub(sep_esc + r'[\s' + sep_esc + r']*' + sep_esc, separator, s)
    # Remove leading/trailing dividers.
    s = s.strip().strip(separator).strip()

    segments = [p.strip() for p in s.split(separator) if p.strip()]

    if order == "last_first" and separator == ",":
        # Comma divider with Last, First order: every two segments form one name.
        raw_names = [", ".join(segments[i:i + 2]) for i in range(0, len(segments), 2)]
    else:
        # first_last OR semicolon separator: each segment is a full name.
        raw_names = segments

    # Final normalization to the standardized format.
    authors = []
    for name in raw_names:
        norm = normalize_d_author(name)
        if norm:
            authors.append(norm)
    return authors
def format_verifications_display(citations):
    """Format citations with verification status badges.

    Each citation is rendered as a numbered item showing its HTML-escaped
    'raw_text', followed by a badge chosen from verification['status']:

      * 'verified', 'ambiguous', 'suspected_hallucination': label with the
        confidence plus title/author similarity, all formatted as percentages.
      * 'api_error': the error text (HTML-escaped); labelled
        "Verification Note" when the error is exactly
        "No search results found by API", otherwise "API Error".
      * 'not_verified' or no verification data: "Not Verified".

    Any other status renders the citation text without a badge.

    Args:
        citations: List of citation dicts; each may carry 'raw_text' and a
            'verification' dict.

    Returns:
        An HTML string. A falsy ``citations`` yields a placeholder paragraph.
    """
    import html as html_lib

    if not citations:
        return "<p>No citations extracted yet.</p>"

    # status -> (CSS class, human readable label) for the score-bearing badges.
    scored_badges = {
        'verified': ('ver-status-verified', 'Verified'),
        'ambiguous': ('ver-status-ambiguous', 'Ambiguous'),
        'suspected_hallucination': ('ver-status-hallucination', 'Suspected Hallucination'),
    }

    html_parts = ["<div class='ver-badge-container'>"]
    for i, cit in enumerate(citations, 1):
        # A key that is present but None must behave like a missing key.
        verification = cit.get('verification') or {}

        raw_text = cit.get('raw_text', 'No citation text')
        if raw_text is None:
            raw_text = 'No citation text'
        safe_raw = html_lib.escape(str(raw_text))

        html_parts.append("<div class='ver-item'>")
        html_parts.append(f"<div><strong>[{i}]</strong> {safe_raw}</div>")

        status = verification.get('status', 'not_verified')
        # NOTE(review): icon is inserted verbatim; presumably an emoji set by
        # the verifier rather than external text -- confirm.
        icon = verification.get('icon', '')

        if status in scored_badges:
            css_class, label = scored_badges[status]
            # "or 0" keeps the percent formatting from failing on None scores.
            confidence = verification.get('confidence') or 0
            title_score = verification.get('title_score') or 0
            author_score = verification.get('author_score') or 0
            html_parts.append(f"<div class='{css_class}'>")
            html_parts.append(f"<strong>{icon} {label} (Confidence: {confidence:.2%})</strong>")
            html_parts.append(f"<br/><small>Title similarity: {title_score:.2%} | Author similarity: {author_score:.2%}</small>")
            html_parts.append("</div>")
        elif status == 'api_error':
            error_msg = verification.get('error', 'Unknown error')
            is_no_result = error_msg == "No search results found by API"
            label = "Verification Note" if is_no_result else "API Error"
            html_parts.append("<div class='ver-status-error'>")
            html_parts.append(f"<strong>{icon} {label}</strong><br/>")
            # The error text is escaped before it is embedded in markup.
            html_parts.append(f"<small>{html_lib.escape(str(error_msg))}</small>")
            html_parts.append("</div>")
        elif status == 'not_verified' or not verification:
            html_parts.append("<div class='ver-status-unverified'>")
            html_parts.append("<strong>Not Verified</strong>")
            html_parts.append("</div>")

        html_parts.append("</div>")  # closes ver-item

    html_parts.append("</div>")  # closes ver-badge-container
    return ''.join(html_parts)
def export_verifications_csv(state_citations, pdf_name):
    """Export citation verifications to a CSV file.

    The file is written into a fresh temporary directory and is named
    "<pdf stem>_verifications.csv" (the stem falls back to "verifications"
    when ``pdf_name`` is empty). One row is written per citation with the
    columns: ID, Status, Confidence, Title Similarity, Author Similarity,
    Raw Citation, Title, Authors, API Title, API Authors. The three score
    columns are 'N/A' for citations whose status is 'not_verified'.

    Args:
        state_citations: List of citation dicts.
        pdf_name: Name of the source PDF; only its final path component
            is used for the CSV file name.

    Returns:
        Path of the written CSV file, or None when there are no citations
        or when writing fails (the failure is logged and the temporary
        directory is removed).
    """
    if not state_citations:
        return None

    import csv
    import logging
    import shutil

    def _percent(value):
        """Format a score as a percentage; None/non-numeric becomes 'N/A'."""
        try:
            return f"{float(value):.2%}"
        except (TypeError, ValueError):
            return 'N/A'

    # Keep only the final path component: os.path.join() discards temp_dir
    # when the second argument is absolute, which would place the CSV
    # outside the temporary directory.
    stem = os.path.splitext(os.path.basename(pdf_name))[0] if pdf_name else ""
    csv_filename = f"{stem or 'verifications'}_verifications.csv"

    # Create a temp directory to hold the specifically named file.
    temp_dir = tempfile.mkdtemp()
    filepath = os.path.join(temp_dir, csv_filename)

    fieldnames = [
        'ID', 'Status', 'Confidence', 'Title Similarity', 'Author Similarity',
        'Raw Citation', 'Title', 'Authors',
        'API Title', 'API Authors',
    ]

    try:
        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for i, cit in enumerate(state_citations, 1):
                # Present-but-None values are treated like missing ones.
                verification = cit.get('verification') or {}
                status = verification.get('status', 'not_verified')
                scored = status != 'not_verified'

                semantic_data = verification.get('semantic_data') or {}
                api_title = semantic_data.get('title', '')

                # API authors may be dicts with a 'name' key or plain values.
                api_names = []
                for author in semantic_data.get('authors') or []:
                    name = author.get('name') if isinstance(author, dict) else author
                    if name:
                        api_names.append(str(name))
                api_authors = ", ".join(api_names)

                ver_authors = cit.get('authors_after_verification', '')
                if isinstance(ver_authors, (list, tuple)):
                    ver_authors = ", ".join(str(a) for a in ver_authors)
                elif not isinstance(ver_authors, str):
                    ver_authors = str(ver_authors)

                writer.writerow({
                    'ID': i,
                    'Status': status,
                    'Confidence': _percent(verification.get('confidence', 0)) if scored else 'N/A',
                    'Title Similarity': _percent(verification.get('title_score', 0)) if scored else 'N/A',
                    'Author Similarity': _percent(verification.get('author_score', 0)) if scored else 'N/A',
                    'Raw Citation': cit.get('raw_text', ''),
                    'Title': cit.get('title_after_verification', ''),
                    'Authors': ver_authors,
                    'API Title': api_title,
                    'API Authors': api_authors,
                })
    except Exception:
        # Best-effort export: report the failure and signal it with None,
        # without leaving a half-written file behind.
        logging.getLogger(__name__).exception("Failed to export verifications to %s", filepath)
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None

    return filepath
def _build_reference_subset_pdf(pdf_path, ref_pages):
    """Copy pages ref_pages[0]..ref_pages[-1] of pdf_path into a new temp PDF and return its path."""
    doc = fitz.open(pdf_path)
    try:
        new_doc = fitz.open()
        try:
            # NOTE(review): only the first and last entries are used, so this
            # assumes ref_pages is an ascending, contiguous range -- confirm.
            new_doc.insert_pdf(doc, from_page=ref_pages[0], to_page=ref_pages[-1])

            # Reserve a uniquely named file, then let PyMuPDF write to its path.
            temp_preview = tempfile.NamedTemporaryFile(delete=False, suffix="_ref_subset.pdf")
            output_path = temp_preview.name
            temp_preview.close()

            new_doc.save(output_path, garbage=4, deflate=True, clean=True, expand=True)
        finally:
            # Close both documents even when inserting or saving fails.
            new_doc.close()
    finally:
        doc.close()
    return output_path


def update_view(view_mode, state_pdf_path, state_ref_pages, state_citations, state_removed_citations, state_extraction_done, state_ref_pdf_path):
    """Update the view based on selected mode. Controls GROUP visibility.

    This is a generator function: it yields one 9-tuple of updates, in the
    order of the outputs it is wired to:

      1. view_full_pdf (Group)
      2. view_ref_pages (Group)
      3. view_citations (Group)
      4. view_verifications (Group)
      5. pdf_viewer_ref (PDF component; set when showing reference pages)
      6. citations_display (HTML; never changed here)
      7. corrected_display (HTML; set when showing verifications)
      8. loading_indicator (Markdown)
      9. state_ref_pdf_path (str; cached path of the reference-pages PDF)

    While extraction is not done and a mode other than "Show Full PDF" is
    selected, all groups stay hidden and only the loading indicator is shown.
    An unrecognized ``view_mode`` yields nothing.

    Args:
        view_mode: One of "Show Full PDF", "Show Reference Pages",
            "Show Citations", "Show Verifications".
        state_pdf_path: Path of the uploaded PDF.
        state_ref_pages: Page indices of the reference section.
        state_citations: Citation dicts rendered in the verifications view.
        state_removed_citations: Unused by this function.
        state_extraction_done: Whether extraction has finished.
        state_ref_pdf_path: Previously generated reference-pages PDF, if any.
    """
    vis_full = gr.update(visible=False)
    vis_ref = gr.update(visible=False)
    vis_cit = gr.update(visible=False)
    vis_ver = gr.update(visible=False)

    upd_ref_pdf = gr.update()
    upd_cit_disp = gr.update()
    upd_ver_disp = gr.update()
    upd_load = gr.update(visible=False)  # Default hidden

    if not state_extraction_done and view_mode != "Show Full PDF":
        # Extraction in progress -> show the loading indicator, keep views hidden.
        upd_load = gr.update(visible=True)
        # Must be yielded: a `return <value>` inside a generator ends the
        # iteration without emitting anything, so no update would be applied.
        yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
        return

    if view_mode == "Show Full PDF":
        # pdf_viewer_full should already have content from process_pdf_initial.
        vis_full = gr.update(visible=True)

    elif view_mode == "Show Reference Pages":
        vis_ref = gr.update(visible=True)
        if state_ref_pdf_path and os.path.exists(state_ref_pdf_path):
            # Cached subset PDF is still on disk: reuse it.
            upd_ref_pdf = gr.update(value=state_ref_pdf_path)
        elif state_ref_pages and state_pdf_path:
            # Generate the subset PDF and remember its path for next time.
            state_ref_pdf_path = _build_reference_subset_pdf(state_pdf_path, state_ref_pages)
            upd_ref_pdf = gr.update(value=state_ref_pdf_path)

    elif view_mode == "Show Citations":
        # Content is pre-filled by extract_citations_auto.
        vis_cit = gr.update(visible=True)

    elif view_mode == "Show Verifications":
        vis_ver = gr.update(visible=True)
        # Always render the list. Unverified items will show "Not Verified".
        upd_ver_disp = gr.update(value=format_verifications_display(state_citations))

    else:
        # Unknown mode: emit no update.
        return

    yield (vis_full, vis_ref, vis_cit, vis_ver, upd_ref_pdf, upd_cit_disp, upd_ver_disp, upd_load, state_ref_pdf_path)
# Stylesheet for the app: PDF viewer sizing, the citation list, and the
# light/dark variants of the verification status badges.
CITEAUDIT_CSS = """
/* Container Styles */
#pdf-viewer-full, #pdf-viewer-ref {
height: 700px;
width: 100%;
}
#view-citations, #view-verifications {
border: none !important;
box-shadow: none !important;
background-color: transparent !important;
}
#citations-list, #view-verifications .gr-html {
background-color: transparent !important;
}
#main-display-area {
min-height: 700px;
border-radius: 8px;
background-color: var(--background-fill-primary);
}
/* Citation List */
.citations-container {
font-family: sans-serif;
font-size: 14px;
line-height: 1.5;
color: var(--body-text-color);
max-height: 600px;
overflow-y: auto;
padding: 12px;
border: 1px solid var(--border-color-primary);
border-radius: 4px;
background-color: var(--background-fill-secondary);
}
.citation-item {
margin-bottom: 16px;
padding-bottom: 8px;
border-bottom: 1px solid var(--border-color-primary);
}
.rejection-reason {
color: #ef5350; /* Red 400 */
font-weight: bold;
margin-left: 8px;
}
.dark .rejection-reason {
color: #ef9a9a; /* Red 200 */
}
.citation-metadata {
color: var(--body-text-color-subdued);
margin-left: 24px;
font-size: 0.95em;
margin-top: 4px;
}
/* Verification Styles */
.ver-verified {
color: #1b5e20; /* Green 900 */
margin-left: 24px;
font-size: 0.95em;
margin-top: 6px;
padding: 4px;
background-color: #e8f5e9; /* Green 50 */
border-left: 3px solid #4caf50; /* Green 500 */
}
.dark .ver-verified {
color: #a5d6a7; /* Green 200 */
background-color: rgba(27, 94, 32, 0.4); /* Dark Green alpha */
border-left-color: #66bb6a; /* Green 400 */
}
/* Status Badges in format_verifications_display */
.ver-badge-container {
font-family: monospace;
font-size: 14px;
background-color: var(--background-fill-secondary);
padding: 15px;
border-radius: 5px;
color: var(--body-text-color);
}
.ver-item {
margin-bottom: 20px;
padding: 10px;
border: 1px solid var(--border-color-primary);
border-radius: 5px;
}
.ver-status-verified {
margin-top: 8px;
padding: 6px;
background-color: #e8f5e9;
border-left: 3px solid #4caf50;
color: #1b5e20; /* Darker Text */
}
.dark .ver-status-verified {
background-color: rgba(27, 94, 32, 0.4);
border-left-color: #66bb6a;
color: #e8f5e9; /* Light Text */
}
.ver-status-verified strong, .ver-verified strong { color: inherit; }
.ver-status-ambiguous {
margin-top: 8px;
padding: 6px;
background-color: #fff3e0;
border-left: 3px solid #ff9800;
color: #e65100;
}
.dark .ver-status-ambiguous {
background-color: rgba(230, 81, 0, 0.3);
border-left-color: #ffb74d;
color: #ffe0b2;
}
.ver-status-hallucination {
margin-top: 8px;
padding: 6px;
background-color: #ffebee;
border-left: 3px solid #f44336;
color: #c62828;
}
.dark .ver-status-hallucination {
background-color: rgba(183, 28, 28, 0.3);
border-left-color: #e57373;
color: #ffcdd2;
}
.ver-status-error {
margin-top: 8px;
padding: 6px;
background-color: #fafafa;
border-left: 3px solid #9e9e9e;
color: #424242;
}
.dark .ver-status-error {
background-color: rgba(66, 66, 66, 0.4);
border-left-color: #bdbdbd;
color: #e0e0e0;
}
.ver-status-unverified {
margin-top: 8px;
padding: 6px;
background-color: #f5f5f5;
border-left: 3px solid #bdbdbd;
color: #757575;
}
.dark .ver-status-unverified {
background-color: rgba(97, 97, 97, 0.3);
border-left-color: #9e9e9e;
color: #bdbdbd;
}
"""

# Build the UI: session state, a control column, a display column, and the
# event chains that connect them.
with gr.Blocks(title="CiteAudit", css=CITEAUDIT_CSS) as demo:
    # ---- Per-user session state ----
    state_pdf_path = gr.State(None)
    state_ref_pages = gr.State([])
    state_citations = gr.State([])
    state_removed_citations = gr.State([])
    state_appendix_header = gr.State(None)
    state_ref_text = gr.State("")
    state_extraction_done = gr.State(False)
    state_ref_pdf_path = gr.State(None)  # Cache for Reference Pages PDF
    state_pdf_name = gr.State("")  # Original PDF filename

    gr.Markdown("# CiteAudit")

    with gr.Row():
        # ---- Left column: upload, status, view selection, verification controls ----
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
            status_text = gr.Textbox(label="Status", interactive=False, lines=6)

            # Created hidden; process_pdf_initial has it among its outputs.
            view_toggle = gr.Radio(
                choices=[
                    "Show Full PDF",
                    "Show Reference Pages",
                    "Show Citations",
                    "Show Verifications",
                ],
                value="Show Full PDF",
                label="View Mode",
                interactive=True,
                visible=False,
            )

            verification_divider = gr.Markdown("---", visible=False)
            verification_header = gr.Markdown("### Citation Verification", visible=False)

            api_key_input = gr.Textbox(
                label="Semantic Scholar API Key (Optional)",
                placeholder="Leave empty for free tier (with rate limits)",
                type="password",
                interactive=True,
                visible=False,
            )

            verify_btn = gr.Button(
                "✅ Verify Citations",
                variant="secondary",
                interactive=False,
                visible=False,
            )

            check_count_slider = gr.Slider(
                minimum=1,
                maximum=50,
                value=1,
                step=1,
                label="Number of citations to check",
                interactive=False,
                visible=False,
            )

            export_btn = gr.Button("📊 Download Verifications (CSV)", visible=False)
            download_file = gr.File(label="Download CSV", visible=False)

            gr.Markdown("<br/><small style='color: var(--body-text-color-subdued);'>* Automated verification may have mistakes and are restricted to returns from Semantic Scholar API. Please check all your citations.</small>")

        # ---- Right column: one group per view mode; update_view toggles visibility ----
        with gr.Column(scale=2, elem_id="main-display-area"):
            # Loading indicator
            loading_indicator = gr.Markdown("## ⏳ Extracting content...", visible=False)

            # 1. Full PDF View (gradio_pdf component)
            with gr.Group(visible=True) as view_full_pdf:
                pdf_viewer_full = PDF(label="Full PDF", elem_id="pdf-viewer-full", interactive=False)

            # 2. Reference Pages View (gradio_pdf component)
            with gr.Group(visible=False) as view_ref_pages:
                pdf_viewer_ref = PDF(label="Reference Pages", elem_id="pdf-viewer-ref", interactive=False)

            # 3. Citations View
            with gr.Group(visible=False, elem_id="view-citations") as view_citations:
                citations_header = gr.Markdown("### Extracted Citations")
                citations_display = gr.HTML(elem_id="citations-list")

            # 4. Verifications View
            with gr.Group(visible=False, elem_id="view-verifications") as view_verifications:
                corrected_display = gr.HTML(label="Corrected Citations")

    # update_view is wired in two places; both use the same component lists.
    view_inputs = [
        view_toggle,
        state_pdf_path,
        state_ref_pages,
        state_citations,
        state_removed_citations,
        state_extraction_done,
        state_ref_pdf_path,
    ]
    view_outputs = [
        view_full_pdf,
        view_ref_pages,
        view_citations,
        view_verifications,
        pdf_viewer_ref,
        citations_display,
        corrected_display,
        loading_indicator,
        state_ref_pdf_path,
    ]

    # ---- Upload: initial processing -> citation extraction -> view refresh ----
    file_input.upload(
        fn=process_pdf_initial,
        inputs=[
            file_input,
            state_pdf_path, state_ref_pages, state_citations,
            state_removed_citations, state_appendix_header, state_ref_text,
        ],
        outputs=[
            file_input, status_text, pdf_viewer_full, view_toggle,
            citations_display, verify_btn, check_count_slider,
            state_pdf_path, state_ref_pages, state_citations,
            state_removed_citations, state_appendix_header, state_ref_text,
            citations_header, verification_header, verification_divider,
            api_key_input, state_extraction_done, corrected_display,
            state_ref_pdf_path, state_pdf_name, export_btn, download_file,
        ],
    ).then(
        fn=extract_citations_auto,
        inputs=[
            view_toggle, status_text,
            state_pdf_path, state_ref_pages, state_ref_text,
            state_citations, state_removed_citations,
            state_appendix_header, state_extraction_done,
        ],
        outputs=[
            status_text, citations_display, verify_btn, check_count_slider,
            state_citations, state_removed_citations, state_ref_pages,
            state_ref_text, state_appendix_header,
            pdf_viewer_ref, loading_indicator,
            citations_header, verification_header, verification_divider,
            api_key_input, state_extraction_done, corrected_display,
            export_btn, download_file,
        ],
        show_progress="hidden",
    ).then(
        fn=update_view,
        inputs=view_inputs,
        outputs=view_outputs,
    )

    # ---- Verify: switch to the verifications view, run the check, re-enable export ----
    verify_btn.click(
        fn=lambda status: (
            gr.update(value="Show Verifications"),
            status + "\n⏳ Starting verification process... Please wait.",
            gr.update(),  # Do not wipe previous content with a loading message
            gr.update(visible=False, value=None),  # Reset download button
            gr.update(visible=False)  # Hide export trigger button while processing
        ),
        inputs=[status_text],
        outputs=[view_toggle, status_text, corrected_display, download_file, export_btn],
    ).then(
        fn=run_citation_check,
        inputs=[check_count_slider, status_text, api_key_input, state_citations],
        outputs=[status_text, corrected_display, citations_display, state_citations],
        show_progress="hidden",
    ).then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[export_btn],
    )

    # ---- Export: write the CSV, then reveal the download component ----
    export_btn.click(
        fn=export_verifications_csv,
        inputs=[state_citations, state_pdf_name],
        outputs=[download_file],
    ).then(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[download_file],
    )

    # ---- Manual view switching ----
    view_toggle.change(
        fn=update_view,
        inputs=view_inputs,
        outputs=view_outputs,
        concurrency_limit=None,
        show_progress="hidden",
    )
# Script entry point: serve the app on all network interfaces, port 7860,
# with the API page disabled.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, show_api=False)