kennethzychew's picture
fix(lint): clear 5 ruff F401/F841 errors found by verifier
d4cf766
Raw
History Blame Contribute Delete
3.66 kB
"""CSV export from the SQLite document store.
Exports all persisted records from the SQLite database to a flat CSV file,
one row per document. Line items (stored as a JSON column) are serialised as
a count rather than expanded into multiple rows, keeping the CSV flat and
easy to open in a spreadsheet.
See ``docs/02_architecture.md`` section 4 and build-plan task 4.1.
"""
from __future__ import annotations
import csv
import json
import logging
from pathlib import Path
from doc_agent.store.db import _connect
logger = logging.getLogger(__name__)
# Columns written to the CSV, in order. Matches the DB columns except
# line_items is replaced with line_item_count for readability.
_CSV_FIELDS = [
"id",
"content_hash",
"source_path",
"processed_at",
"doc_type",
"vendor_name",
"vendor_address",
"invoice_number",
"document_date",
"due_date",
"currency",
"subtotal",
"tax",
"total",
"line_item_count",
"confidence",
"backend",
"modality",
"decision",
]
_SELECT = """
SELECT
id, content_hash, source_path, processed_at,
doc_type, vendor_name, vendor_address, invoice_number,
document_date, due_date, currency,
subtotal, tax, total, line_items,
confidence, backend, modality, decision
FROM documents
ORDER BY id
"""
def export_csv(db_path: Path, output_path: Path) -> int:
"""Export all records from the database to a CSV file.
Args:
db_path: Path to the SQLite database file.
output_path: Path to write the CSV; parent directories are created
if they do not exist.
Returns:
Number of rows written (0 if the database is empty or absent).
"""
if not db_path.exists():
logger.warning("export: database not found at %s, writing empty CSV", db_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(",".join(_CSV_FIELDS) + "\n", encoding="utf-8")
return 0
output_path.parent.mkdir(parents=True, exist_ok=True)
written = 0
with _connect(db_path) as conn:
rows = conn.execute(_SELECT).fetchall()
with output_path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=_CSV_FIELDS)
writer.writeheader()
for row in rows:
(
id_, hash_, src, ts,
doc_type, vendor, addr, inv_num,
doc_date, due_date, currency,
subtotal, tax, total, line_items_json,
confidence, backend, modality, decision,
) = row
try:
line_item_count = len(json.loads(line_items_json or "[]"))
except (json.JSONDecodeError, TypeError):
line_item_count = 0
writer.writerow({
"id": id_,
"content_hash": hash_,
"source_path": src,
"processed_at": ts,
"doc_type": doc_type,
"vendor_name": vendor,
"vendor_address": addr,
"invoice_number": inv_num,
"document_date": doc_date,
"due_date": due_date,
"currency": currency,
"subtotal": subtotal,
"tax": tax,
"total": total,
"line_item_count": line_item_count,
"confidence": confidence,
"backend": backend,
"modality": modality,
"decision": decision,
})
written += 1
logger.info("export: wrote %d rows to %s", written, output_path)
return written