| """Parquet service — converts, uploads, downloads, and deletes Parquet files for CSV/XLSX. |
| |
| Parquet files are stored in Azure Blob alongside the original document using |
| a deterministic naming convention based on document_id: |
| |
| CSV: {user_id}/{document_id}.parquet |
| XLSX sheet: {user_id}/{document_id}__{safe_sheet_name}.parquet |
| |
| This allows tabular.py to construct the correct blob name at retrieval time |
| without needing to store it separately, and allows document_pipeline.py to |
| delete all Parquet files for a document using a prefix delete. |
| """ |

import io

import pandas as pd

from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage

logger = get_logger("parquet_service")


def _safe_sheet_name(sheet_name: str) -> str:
    """Replace slashes, spaces, and backslashes with underscores so the name is blob-safe."""
    return sheet_name.replace("/", "_").replace(" ", "_").replace("\\", "_")


def parquet_blob_name(user_id: str, document_id: str, sheet_name: str | None = None) -> str:
    """Construct the deterministic Parquet blob name."""
    if sheet_name:
        return f"{user_id}/{document_id}__{_safe_sheet_name(sheet_name)}.parquet"
    return f"{user_id}/{document_id}.parquet"


def _to_parquet_bytes(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to Parquet bytes in memory (the index is not written)."""
    buf = io.BytesIO()
    df.to_parquet(buf, index=False)
    return buf.getvalue()
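
# Round-trip sketch: the bytes can be read back with pandas, e.g.
#   pd.read_parquet(io.BytesIO(_to_parquet_bytes(df)))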


async def upload_parquet(
    df: pd.DataFrame,
    user_id: str,
    document_id: str,
    sheet_name: str | None = None,
) -> str:
    """Convert DataFrame to Parquet and upload to Azure Blob. Returns blob_name."""
    blob_name = parquet_blob_name(user_id, document_id, sheet_name)
    parquet_bytes = _to_parquet_bytes(df)
    await blob_storage.upload_bytes(parquet_bytes, blob_name)
    logger.info(f"Uploaded Parquet {blob_name} ({len(parquet_bytes)} bytes)")
    return blob_name
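
# Usage sketch (inside an async caller; identifiers are illustrative):
#   blob_name = await upload_parquet(df, user_id="u1", document_id="doc-1", sheet_name="Q1 2024")
#   # -> "u1/doc-1__Q1_2024.parquet"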


async def download_parquet(
    user_id: str,
    document_id: str,
    sheet_name: str | None = None,
) -> pd.DataFrame:
    """Download Parquet from Azure Blob and return it as a DataFrame."""
    blob_name = parquet_blob_name(user_id, document_id, sheet_name)
    content = await blob_storage.download_file(blob_name)
    df = pd.read_parquet(io.BytesIO(content))
    logger.info(f"Downloaded Parquet {blob_name}: {len(df)} rows, {len(df.columns)} columns")
    return df
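
# Usage sketch (must use the same identifiers that were passed at upload time):
#   df = await download_parquet(user_id="u1", document_id="doc-1", sheet_name="Q1 2024")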


async def delete_document_parquets(user_id: str, document_id: str) -> int:
    """Delete all Parquet files for a document (CSV = 1 file, XLSX = one per sheet).

    Uses a prefix delete: {user_id}/{document_id} matches every Parquet variant
    for this document without touching the original blob (which uses a random
    UUID name). Returns the number of blobs deleted.
    """
    prefix = f"{user_id}/{document_id}"
    deleted = await blob_storage.delete_blobs_with_prefix(prefix)
    logger.info(f"Deleted {deleted} Parquet file(s) for document {document_id}")
    return deleted
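
# Usage sketch: document_pipeline.py can call this when a document is removed, e.g.
#   deleted = await delete_document_parquets(user_id="u1", document_id="doc-1")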