Spaces:
Sleeping
Sleeping
File size: 7,807 Bytes
01d5a5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
from pathlib import Path
import os
import uuid
from datetime import datetime
from lpm_kernel.common.logging import logger
from lpm_kernel.models.memory import Memory
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.file_data.process_factory import ProcessorFactory
from lpm_kernel.file_data.document_service import DocumentService
from lpm_kernel.file_data.document_dto import CreateDocumentRequest
from .process_status import ProcessStatus
from sqlalchemy import select
class StorageService:
    """Persists uploaded files to a local raw-content directory and tracks
    each one as a ``Memory`` database row, optionally linked to a processed
    ``Document`` record created via :class:`DocumentService`.
    """

    def __init__(self, config):
        """Resolve the raw-content storage directory from *config* and ensure it exists.

        Args:
            config: mapping-like object supporting ``.get``; reads
                ``USER_RAW_CONTENT_DIR`` and ``LOCAL_BASE_DIR``.
        """
        self.config = config
        # get raw content directory configuration
        raw_content_dir = config.get("USER_RAW_CONTENT_DIR", "resources/raw_content")
        base_dir = config.get("LOCAL_BASE_DIR", ".")
        logger.info(f"Initializing storage service, base_dir: {base_dir}")
        logger.info(f"Raw content directory configuration: {raw_content_dir}")
        # if path is not absolute, build full path based on base_dir
        if not os.path.isabs(raw_content_dir):
            # expand placeholder variables the config value may contain
            # before anchoring the relative path under base_dir
            raw_content_dir = raw_content_dir.replace("${RESOURCE_DIR}", "resources")
            raw_content_dir = raw_content_dir.replace(
                "${RAW_CONTENT_DIR}", "resources/raw_content"
            )
            raw_content_dir = os.path.join(base_dir, raw_content_dir)
            logger.info(f"Building complete path: {raw_content_dir}")
        # convert to Path object and ensure directory exists
        self.base_path = Path(raw_content_dir).resolve()
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Storage path created: {self.base_path}")
        self.document_service = DocumentService()

    def check_file_exists(self, filename: str, filesize: int) -> "Memory | None":
        """Check whether a file with the same name AND size is already registered.

        A database hit is only trusted when the recorded path still exists on
        disk; otherwise it is treated as stale and ``None`` is returned.

        Args:
            filename: file name
            filesize: file size in bytes

        Returns:
            Memory: the matching record if the file exists on disk; otherwise None.
        """
        db = DatabaseSession()
        # NOTE(review): reaches into the private _session_factory — confirm
        # DatabaseSession exposes no public session accessor.
        with db._session_factory() as session:
            # find record with same file name and size
            query = select(Memory).where(
                Memory.name == filename, Memory.size == filesize
            )
            result = session.execute(query)
            memory = result.scalar_one_or_none()
            if memory:
                # fix: log line previously contained a garbled "(unknown)"
                # placeholder instead of the filename interpolation
                logger.info(f"Found duplicate file: {filename}, size: {filesize}")
                # check if file really exists on disk before trusting the row
                if os.path.exists(memory.path):
                    return memory
                logger.warning(f"File in database does not exist on disk: {memory.path}")
            return None

    def save_file(self, file, metadata=None):
        """Save an uploaded file to disk and register it in the database.

        Args:
            file: uploaded file object — assumed werkzeug-style (has
                ``filename``, ``seek``/``tell`` and ``save``); confirm against callers.
            metadata: optional dict; ``name`` and ``description`` keys feed
                the Document record's title and user description.

        Returns:
            tuple: ``(Memory, Document | None)`` — the Document is None when
            content extraction fails (the upload itself is still kept).

        Raises:
            ValueError: if a file with the same name and size already exists.
        """
        logger.info(f"Starting to save file: {file.filename}")
        logger.debug(f"File metadata: {metadata}")
        try:
            # measure size by seeking to the end, then rewind for saving
            file.seek(0, os.SEEK_END)
            filesize = file.tell()
            file.seek(0)  # reset file pointer to start position

            # check if file already exists (same name AND size)
            existing_memory = self.check_file_exists(file.filename, filesize)
            if existing_memory:
                raise ValueError(f"File '{file.filename}' already exists")

            # save file to disk
            filepath, filename, filesize = self._save_file_to_disk(file)
            logger.info(f"File saved to disk: {filepath}, size: {filesize} bytes")

            memory = None
            document = None
            db = DatabaseSession()
            # the session context manager guarantees close(); rollback is
            # handled explicitly so a partial Memory row is never left behind
            with db._session_factory() as session:
                try:
                    # create and save Memory record
                    memory = Memory(
                        name=filename,
                        size=filesize,
                        path=str(filepath),
                        metadata=metadata or {},
                    )
                    session.add(memory)
                    session.commit()
                    logger.info(f"Memory record created successfully: {memory.id}")

                    # process document; extraction failure yields None and the
                    # Memory row is deliberately kept (best-effort linkage)
                    document = self._process_document(filepath, metadata)
                    if document:
                        memory.document_id = document.id
                        session.add(memory)
                        session.commit()
                        logger.info(f"Memory record updated, associated document ID: {document.id}")
                        # refresh memory object to ensure all fields are up to date
                        session.refresh(memory)
                except Exception as e:
                    session.rollback()
                    logger.error(f"Database operation failed: {str(e)}", exc_info=True)
                    raise
            return memory, document
        except Exception as e:
            logger.error(f"Error occurred during file saving: {str(e)}", exc_info=True)
            raise

    def _save_file_to_disk(self, file):
        """Save the uploaded file under ``self.base_path``.

        NOTE(review): the target path is derived solely from ``file.filename``,
        so a same-named file of a *different* size silently overwrites the
        previous one on disk — confirm this is intended (the ``uuid`` import
        at the top of the file suggests unique names may have been planned).

        Args:
            file: uploaded file object

        Returns:
            tuple: (file path, file name, file size in bytes)
        """
        try:
            # ensure directory exists (may have been removed since __init__)
            self.base_path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Ensuring storage directory exists: {self.base_path}")
            # generate file name and path
            filename = file.filename
            filepath = self.base_path / filename
            logger.info(f"Preparing to save file to: {filepath}")
            # save file
            file.save(str(filepath))
            filesize = os.path.getsize(filepath)
            logger.info(f"File saved successfully: {filepath}, size: {filesize} bytes")
            return filepath, filename, filesize
        except Exception as e:
            logger.error(f"Failed to save file to disk: {str(e)}", exc_info=True)
            raise

    def _process_document(self, filepath, metadata=None):
        """Extract content from the saved file and create a Document record.

        Failures are logged and swallowed on purpose: document processing is
        best-effort and must not abort the file upload itself.

        Args:
            filepath: path of the file already saved on disk
            metadata: optional dict; ``name``/``description`` override the
                Document's title and user description.

        Returns:
            Document: the created record, or None if processing fails.
        """
        try:
            logger.info(f"Starting to process document: {filepath}")
            doc = ProcessorFactory.auto_detect_and_process(str(filepath))
            logger.info(
                f"Document processing completed, type: {doc.mime_type}, size: {doc.document_size}"
            )
            request = CreateDocumentRequest(
                name=doc.name,
                title=metadata.get("name", doc.name) if metadata else doc.name,
                mime_type=doc.mime_type,
                user_description=metadata.get("description", "Uploaded document")
                if metadata
                else "Uploaded document",
                document_size=doc.document_size,
                url=str(filepath),
                raw_content=doc.raw_content,
                extract_status=doc.extract_status,
                embedding_status=ProcessStatus.INITIALIZED,
            )
            saved_doc = self.document_service.create_document(request)
            logger.info(f"Document record created: {saved_doc.id}")
            return saved_doc
        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}", exc_info=True)
            return None
|