File size: 2,794 Bytes
767625e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Parquet service — converts, uploads, downloads, and deletes Parquet files for CSV/XLSX.

Parquet files are stored in Azure Blob alongside the original document using
a deterministic naming convention based on document_id:

  CSV:        {user_id}/{document_id}.parquet
  XLSX sheet: {user_id}/{document_id}__{safe_sheet_name}.parquet

This allows tabular.py to construct the correct blob name at retrieval time
without needing to store it separately, and allows document_pipeline.py to
delete all Parquet files for a document using a prefix delete.
"""

import io

import pandas as pd

from src.middlewares.logging import get_logger
from src.storage.az_blob.az_blob import blob_storage

# Module-level logger used by the upload/download/delete helpers in this file.
logger = get_logger("parquet_service")


def _safe_sheet_name(sheet_name: str) -> str:
    return sheet_name.replace("/", "_").replace(" ", "_").replace("\\", "_")


def parquet_blob_name(user_id: str, document_id: str, sheet_name: str | None = None) -> str:
    """Construct deterministic Parquet blob name."""
    if sheet_name:
        return f"{user_id}/{document_id}__{_safe_sheet_name(sheet_name)}.parquet"
    return f"{user_id}/{document_id}.parquet"


def _to_parquet_bytes(df: pd.DataFrame) -> bytes:
    buf = io.BytesIO()
    df.to_parquet(buf, index=False)
    return buf.getvalue()


async def upload_parquet(
    df: pd.DataFrame,
    user_id: str,
    document_id: str,
    sheet_name: str | None = None,
) -> str:
    """Serialize *df* to Parquet and store it under its deterministic blob name.

    The target name comes from ``parquet_blob_name`` so readers can recompute
    it later without a lookup.

    Returns:
        The blob name the Parquet payload was uploaded to.
    """
    target = parquet_blob_name(user_id, document_id, sheet_name)
    payload = _to_parquet_bytes(df)
    await blob_storage.upload_bytes(payload, target)
    logger.info(f"Uploaded Parquet {target} ({len(payload)} bytes)")
    return target


async def download_parquet(
    user_id: str,
    document_id: str,
    sheet_name: str | None = None,
) -> pd.DataFrame:
    """Fetch a document's Parquet blob and load it into a DataFrame.

    The blob name is recomputed with ``parquet_blob_name``; pass the same
    ``sheet_name`` that was used at upload time (``None`` for CSV).
    """
    target = parquet_blob_name(user_id, document_id, sheet_name)
    raw = await blob_storage.download_file(target)
    frame = pd.read_parquet(io.BytesIO(raw))
    n_rows, n_cols = len(frame), len(frame.columns)
    logger.info(f"Downloaded Parquet {target}: {n_rows} rows, {n_cols} columns")
    return frame


async def delete_document_parquets(user_id: str, document_id: str) -> int:
    """Delete all Parquet files for a document (CSV = 1 file, XLSX = one per sheet).

    Deletes by the two exact prefixes that ``parquet_blob_name`` produces:

      ``{user_id}/{document_id}.parquet``  - the CSV Parquet file
      ``{user_id}/{document_id}__``        - every per-sheet XLSX Parquet file

    A bare ``{user_id}/{document_id}`` prefix would also match blobs belonging
    to any other document whose id merely starts with this one (``12`` vs
    ``123.parquet``). With an empty ``document_id`` it would match every blob
    in the user's folder.

    Args:
        user_id: Owner of the document; must be non-empty.
        document_id: Document whose Parquet files are removed; must be non-empty.

    Returns:
        Total number of blobs reported deleted across both prefixes.

    Raises:
        ValueError: If ``user_id`` or ``document_id`` is empty.
    """
    if not user_id or not document_id:
        # Guard against a prefix that degenerates to "{user_id}/" (or "/..."),
        # which would delete unrelated blobs.
        raise ValueError("user_id and document_id must be non-empty to delete Parquet files")

    base = f"{user_id}/{document_id}"
    deleted = 0
    for prefix in (f"{base}.parquet", f"{base}__"):
        deleted += await blob_storage.delete_blobs_with_prefix(prefix)
    logger.info(f"Deleted {deleted} Parquet file(s) for document {document_id}")
    return deleted