File size: 8,925 Bytes
6c33ee2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 | #!/usr/bin/env python3
"""
Stage 1: Extract estimated dates for documents.
Sources (in priority order):
1. Filename parsing (congress session, year folders, JFK doc IDs)
2. DATE entities already in the entities table (most frequent date per doc)
3. Regex patterns in OCR text (fallback)
Populates: document_dates table
"""
import re
import logging
import sys
from datetime import date, datetime
from collections import Counter
import psycopg2
import psycopg2.extras
from config import CONGRESS_DATES, BATCH_SIZE
from db import get_conn, fetch_all, fetch_one
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger(__name__)
def parse_congress_from_path(file_path: str) -> int | None:
"""Extract congress session number from file path or filename."""
# Match patterns like congress_118, BILLS-118hr, congress_103
m = re.search(r'congress_(\d{2,3})', file_path)
if m:
return int(m.group(1))
m = re.search(r'BILLS-(\d{2,3})', file_path)
if m:
return int(m.group(1))
# Congressional Record with ordinal congress
m = re.search(r'(\d{2,3})(st|nd|rd|th)\s+Congress', file_path, re.IGNORECASE)
if m:
return int(m.group(1))
return None
def parse_year_from_path(file_path: str) -> int | None:
"""Extract a year from folder structure like /2021/ or /2017-2018/."""
# Folder-based year
m = re.search(r'/(\d{4})(?:[_/-](\d{4}))?/', file_path)
if m:
return int(m.group(1))
# Year in filename
m = re.search(r'[_-](\d{4})[_.-]', file_path)
if m:
yr = int(m.group(1))
if 1800 <= yr <= 2030:
return yr
return None
def congress_to_date_range(session: int) -> tuple[date | None, date | None]:
"""Convert congress session to a date range."""
if session in CONGRESS_DATES:
s, e = CONGRESS_DATES[session]
return date.fromisoformat(s), date.fromisoformat(e)
# Approximate: each congress starts Jan 3 of odd year
# Congress 1 started 1789, session N starts 1789 + (N-1)*2
start_year = 1789 + (session - 1) * 2
if 1789 <= start_year <= 2030:
return date(start_year, 1, 3), date(start_year + 2, 1, 3)
return None, None
def parse_date_entities(doc_id: int, conn) -> tuple[date | None, float]:
"""
Find the most common parseable date from DATE entities for a document.
Returns (estimated_date, confidence).
"""
with conn.cursor() as cur:
cur.execute(
"SELECT entity_text FROM entities "
"WHERE document_id = %s AND entity_type = 'DATE'",
(doc_id,)
)
rows = cur.fetchall()
if not rows:
return None, 0.0
year_counts = Counter()
full_dates = []
for (text,) in rows:
text = text.strip()
# Try full date patterns
for fmt in ("%B %d, %Y", "%b %d, %Y", "%m/%d/%Y", "%Y-%m-%d", "%d %B %Y"):
try:
dt = datetime.strptime(text, fmt).date()
if 1800 <= dt.year <= 2030:
full_dates.append(dt)
year_counts[dt.year] += 1
break
except ValueError:
continue
else:
# Try just year
m = re.search(r'\b(1[89]\d{2}|20[0-2]\d)\b', text)
if m:
year_counts[int(m.group(1))] += 1
if full_dates:
# Return most common full date
date_counts = Counter(full_dates)
best_date, count = date_counts.most_common(1)[0]
confidence = min(count / len(rows), 1.0)
return best_date, confidence
if year_counts:
best_year, count = year_counts.most_common(1)[0]
confidence = min(count / len(rows) * 0.5, 0.8) # lower confidence for year-only
return date(best_year, 7, 1), confidence # midpoint of year
return None, 0.0
def process_documents():
"""Main processing loop."""
conn = get_conn()
conn.autocommit = False
# Get documents that don't have dates yet
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""
SELECT d.id, d.file_path, d.source_section
FROM documents d
LEFT JOIN document_dates dd ON dd.document_id = d.id
WHERE dd.document_id IS NULL
ORDER BY d.id
""")
docs = cur.fetchall()
total = len(docs)
log.info(f"Processing {total} documents for date extraction")
batch = []
processed = 0
for doc in docs:
doc_id = doc["id"]
path = doc["file_path"]
section = doc["source_section"]
estimated_date = None
date_source = None
date_confidence = 0.0
date_range_start = None
date_range_end = None
congress_session = None
# Priority 1: Congress session from filename
congress = parse_congress_from_path(path)
if congress:
congress_session = congress
start, end = congress_to_date_range(congress)
if start and end:
date_range_start = start
date_range_end = end
# Midpoint as estimate
mid = start.toordinal() + (end.toordinal() - start.toordinal()) // 2
estimated_date = date.fromordinal(mid)
date_source = "filename_congress"
date_confidence = 0.7
# Priority 2: Year from folder/filename
if not estimated_date:
year = parse_year_from_path(path)
if year:
estimated_date = date(year, 7, 1)
date_range_start = date(year, 1, 1)
date_range_end = date(year, 12, 31)
date_source = "filename_year"
date_confidence = 0.6
# Priority 3: DATE entities from NER
if not estimated_date:
ner_date, ner_conf = parse_date_entities(doc_id, conn)
if ner_date:
estimated_date = ner_date
date_source = "ner_entities"
date_confidence = ner_conf
# Priority 4: Collection-level defaults
if not estimated_date:
defaults = {
"cia_mkultra": (date(1963, 1, 1), "collection_default", 0.3,
date(1953, 1, 1), date(1973, 12, 31)),
"cia_stargate": (date(1986, 1, 1), "collection_default", 0.3,
date(1978, 1, 1), date(1995, 12, 31)),
"lincoln_archives": (date(1865, 1, 1), "collection_default", 0.3,
date(1860, 1, 1), date(1877, 12, 31)),
}
if section in defaults:
d = defaults[section]
estimated_date = d[0]
date_source = d[1]
date_confidence = d[2]
date_range_start = d[3]
date_range_end = d[4]
batch.append((
doc_id, estimated_date, date_source, date_confidence,
date_range_start, date_range_end, congress_session,
))
if len(batch) >= BATCH_SIZE:
_flush_batch(conn, batch)
processed += len(batch)
log.info(f"Progress: {processed}/{total} ({processed*100//total}%)")
batch = []
if batch:
_flush_batch(conn, batch)
processed += len(batch)
conn.close()
log.info(f"Done. Processed {processed} documents.")
# Stats
stats = fetch_all("""
SELECT date_source, COUNT(*) as cnt,
ROUND(AVG(date_confidence)::numeric, 2) as avg_conf
FROM document_dates
GROUP BY date_source
ORDER BY cnt DESC
""")
log.info("Date extraction stats:")
for row in stats:
log.info(f" {row['date_source'] or 'no_date'}: {row['cnt']} docs (avg conf: {row['avg_conf']})")
def _flush_batch(conn, batch):
with conn.cursor() as cur:
psycopg2.extras.execute_batch(
cur,
"""INSERT INTO document_dates
(document_id, estimated_date, date_source, date_confidence,
date_range_start, date_range_end, congress_session)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (document_id) DO UPDATE SET
estimated_date = EXCLUDED.estimated_date,
date_source = EXCLUDED.date_source,
date_confidence = EXCLUDED.date_confidence,
date_range_start = EXCLUDED.date_range_start,
date_range_end = EXCLUDED.date_range_end,
congress_session = EXCLUDED.congress_session,
created_at = NOW()
""",
batch,
page_size=500,
)
conn.commit()
if __name__ == "__main__":
process_documents()
|