sofhiaazzhr commited on
Commit
770f26b
·
1 Parent(s): 29efec6

[KM-513][document] add convert to parquet if type file is XLSX and CSV

Browse files
src/knowledge/parquet_service.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Parquet service — converts, uploads, downloads, and deletes Parquet files for CSV/XLSX.
2
+
3
+ Parquet files are stored in Azure Blob alongside the original document using
4
+ a deterministic naming convention based on document_id:
5
+
6
+ CSV: {user_id}/{document_id}.parquet
7
+ XLSX sheet: {user_id}/{document_id}__{safe_sheet_name}.parquet
8
+
9
+ This allows tabular.py to construct the correct blob name at retrieval time
10
+ without needing to store it separately, and allows document_pipeline.py to
11
+ delete all Parquet files for a document using a prefix delete.
12
+ """
13
+
14
+ import io
15
+
16
+ import pandas as pd
17
+
18
+ from src.middlewares.logging import get_logger
19
+ from src.storage.az_blob.az_blob import blob_storage
20
+
21
+ logger = get_logger("parquet_service")
22
+
23
+
24
+ def _safe_sheet_name(sheet_name: str) -> str:
25
+ return sheet_name.replace("/", "_").replace(" ", "_").replace("\\", "_")
26
+
27
+
28
+ def parquet_blob_name(user_id: str, document_id: str, sheet_name: str | None = None) -> str:
29
+ """Construct deterministic Parquet blob name."""
30
+ if sheet_name:
31
+ return f"{user_id}/{document_id}__{_safe_sheet_name(sheet_name)}.parquet"
32
+ return f"{user_id}/{document_id}.parquet"
33
+
34
+
35
+ def _to_parquet_bytes(df: pd.DataFrame) -> bytes:
36
+ buf = io.BytesIO()
37
+ df.to_parquet(buf, index=False)
38
+ return buf.getvalue()
39
+
40
+
41
+ async def upload_parquet(
42
+ df: pd.DataFrame,
43
+ user_id: str,
44
+ document_id: str,
45
+ sheet_name: str | None = None,
46
+ ) -> str:
47
+ """Convert DataFrame to Parquet and upload to Azure Blob. Returns blob_name."""
48
+ blob_name = parquet_blob_name(user_id, document_id, sheet_name)
49
+ parquet_bytes = _to_parquet_bytes(df)
50
+ await blob_storage.upload_bytes(parquet_bytes, blob_name)
51
+ logger.info(f"Uploaded Parquet {blob_name} ({len(parquet_bytes)} bytes)")
52
+ return blob_name
53
+
54
+
55
+ async def download_parquet(
56
+ user_id: str,
57
+ document_id: str,
58
+ sheet_name: str | None = None,
59
+ ) -> pd.DataFrame:
60
+ """Download Parquet from Azure Blob and return as DataFrame."""
61
+ blob_name = parquet_blob_name(user_id, document_id, sheet_name)
62
+ content = await blob_storage.download_file(blob_name)
63
+ df = pd.read_parquet(io.BytesIO(content))
64
+ logger.info(f"Downloaded Parquet {blob_name}: {len(df)} rows, {len(df.columns)} columns")
65
+ return df
66
+
67
+
68
+ async def delete_document_parquets(user_id: str, document_id: str) -> int:
69
+ """Delete all Parquet files for a document (CSV = 1 file, XLSX = one per sheet).
70
+
71
+ Uses prefix delete: {user_id}/{document_id} matches all Parquet variants
72
+ for this document without touching the original blob (which uses a random UUID name).
73
+ """
74
+ prefix = f"{user_id}/{document_id}"
75
+ deleted = await blob_storage.delete_blobs_with_prefix(prefix)
76
+ logger.info(f"Deleted {deleted} Parquet file(s) for document {document_id}")
77
+ return deleted
src/knowledge/processing_service.py CHANGED
@@ -7,6 +7,7 @@ from src.storage.az_blob.az_blob import blob_storage
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
 
10
  from typing import List
11
  from datetime import datetime, timezone, timedelta
12
  import sys
@@ -44,9 +45,9 @@ class KnowledgeProcessingService:
44
  if db_doc.file_type == "pdf":
45
  documents = await self._build_pdf_documents(content, db_doc)
46
  elif db_doc.file_type == "csv":
47
- documents = self._build_csv_documents(content, db_doc)
48
  elif db_doc.file_type == "xlsx":
49
- documents = self._build_excel_documents(content, db_doc)
50
  else:
51
  text = self._extract_text(content, db_doc.file_type)
52
  if not text.strip():
@@ -168,18 +169,25 @@ class KnowledgeProcessingService:
168
  ))
169
  return documents
170
 
171
- def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
172
- """Profile each column of a CSV file."""
173
  df = pd.read_csv(BytesIO(content))
 
 
174
  return self._profile_dataframe(df, db_doc.filename, db_doc)
175
 
176
- def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
177
- """Profile each column of every sheet in an Excel file."""
178
  sheets = pd.read_excel(BytesIO(content), sheet_name=None)
179
  documents = []
180
  for sheet_name, df in sheets.items():
181
  source_name = f"{db_doc.filename} / sheet: {sheet_name}"
182
- documents.extend(self._profile_dataframe(df, source_name, db_doc))
 
 
 
 
 
183
  return documents
184
 
185
  def _extract_text(self, content: bytes, file_type: str) -> str:
 
7
  from src.db.postgres.models import Document as DBDocument
8
  from sqlalchemy.ext.asyncio import AsyncSession
9
  from src.middlewares.logging import get_logger
10
+ from src.knowledge.parquet_service import upload_parquet
11
  from typing import List
12
  from datetime import datetime, timezone, timedelta
13
  import sys
 
45
  if db_doc.file_type == "pdf":
46
  documents = await self._build_pdf_documents(content, db_doc)
47
  elif db_doc.file_type == "csv":
48
+ documents = await self._build_csv_documents(content, db_doc)
49
  elif db_doc.file_type == "xlsx":
50
+ documents = await self._build_excel_documents(content, db_doc)
51
  else:
52
  text = self._extract_text(content, db_doc.file_type)
53
  if not text.strip():
 
169
  ))
170
  return documents
171
 
172
+ async def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
173
+ """Profile each column of a CSV file and upload Parquet to Azure Blob."""
174
  df = pd.read_csv(BytesIO(content))
175
+ await upload_parquet(df, db_doc.user_id, db_doc.id)
176
+ logger.info(f"Uploaded Parquet for CSV {db_doc.id}")
177
  return self._profile_dataframe(df, db_doc.filename, db_doc)
178
 
179
+ async def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
180
+ """Profile each column of every sheet in an Excel file and upload one Parquet per sheet."""
181
  sheets = pd.read_excel(BytesIO(content), sheet_name=None)
182
  documents = []
183
  for sheet_name, df in sheets.items():
184
  source_name = f"{db_doc.filename} / sheet: {sheet_name}"
185
+ docs = self._profile_dataframe(df, source_name, db_doc)
186
+ for doc in docs:
187
+ doc.metadata["data"]["sheet_name"] = sheet_name
188
+ documents.extend(docs)
189
+ await upload_parquet(df, db_doc.user_id, db_doc.id, sheet_name)
190
+ logger.info(f"Uploaded Parquet for sheet '{sheet_name}' of {db_doc.id}")
191
  return documents
192
 
193
  def _extract_text(self, content: bytes, file_type: str) -> str:
src/pipeline/document_pipeline/document_pipeline.py CHANGED
@@ -5,6 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
5
 
6
  from src.document.document_service import document_service
7
  from src.knowledge.processing_service import knowledge_processor
 
8
  from src.middlewares.logging import get_logger
9
  from src.storage.az_blob.az_blob import blob_storage
10
 
@@ -81,6 +82,9 @@ class DocumentPipeline:
81
 
82
  await document_service.delete_document(db, document_id)
83
 
 
 
 
84
  logger.info(f"Deleted document {document_id} for user {user_id}")
85
  return {"document_id": document_id}
86
 
 
5
 
6
  from src.document.document_service import document_service
7
  from src.knowledge.processing_service import knowledge_processor
8
+ from src.knowledge.parquet_service import delete_document_parquets
9
  from src.middlewares.logging import get_logger
10
  from src.storage.az_blob.az_blob import blob_storage
11
 
 
82
 
83
  await document_service.delete_document(db, document_id)
84
 
85
+ if document.file_type in ("csv", "xlsx"):
86
+ await delete_document_parquets(user_id, document_id)
87
+
88
  logger.info(f"Deleted document {document_id} for user {user_id}")
89
  return {"document_id": document_id}
90
 
src/storage/az_blob/az_blob.py CHANGED
@@ -57,6 +57,22 @@ class AzureBlobStorage:
57
  logger.error(f"Failed to download blob {blob_name}", error=str(e))
58
  raise
59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  async def delete_file(self, blob_name: str) -> bool:
61
  """Delete file from Azure Blob Storage."""
62
  try:
@@ -71,6 +87,24 @@ class AzureBlobStorage:
71
  logger.error(f"Failed to delete blob {blob_name}", error=str(e))
72
  return False
73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
  # Singleton instance
76
  blob_storage = AzureBlobStorage()
 
57
  logger.error(f"Failed to download blob {blob_name}", error=str(e))
58
  raise
59
 
60
+ async def upload_bytes(self, content: bytes, blob_name: str) -> str:
61
+ """Upload bytes to Azure Blob Storage using a specific blob name.
62
+
63
+ Unlike upload_file(), this does not generate a UUID name — caller controls the blob_name.
64
+ Used for Parquet files where the name must be deterministic (derived from document_id).
65
+ """
66
+ try:
67
+ async with self._get_blob_client(blob_name) as blob_client:
68
+ logger.info(f"Uploading bytes to blob {blob_name}")
69
+ await blob_client.upload_blob(content, overwrite=True)
70
+ logger.info(f"Successfully uploaded {blob_name}")
71
+ return blob_name
72
+ except Exception as e:
73
+ logger.error(f"Failed to upload bytes to {blob_name}", error=str(e))
74
+ raise
75
+
76
  async def delete_file(self, blob_name: str) -> bool:
77
  """Delete file from Azure Blob Storage."""
78
  try:
 
87
  logger.error(f"Failed to delete blob {blob_name}", error=str(e))
88
  return False
89
 
90
+ async def delete_blobs_with_prefix(self, prefix: str) -> int:
91
+ """Delete all blobs whose name starts with prefix. Returns count deleted.
92
+
93
+ Used to delete all Parquet files for a document in one call.
94
+ """
95
+ from azure.storage.blob.aio import ContainerClient
96
+ container_url = f"{self.account_url}/{self.container_name}?{self.sas_token}"
97
+ deleted = 0
98
+ try:
99
+ async with ContainerClient.from_container_url(container_url) as container:
100
+ async for blob in container.list_blobs(name_starts_with=prefix):
101
+ await container.delete_blob(blob.name)
102
+ deleted += 1
103
+ logger.info(f"Deleted {deleted} blobs with prefix {prefix}")
104
+ except Exception as e:
105
+ logger.error(f"Failed to delete blobs with prefix {prefix}", error=str(e))
106
+ return deleted
107
+
108
 
109
  # Singleton instance
110
  blob_storage = AzureBlobStorage()