File size: 3,263 Bytes
37c6d5c
 
 
 
 
 
2dad3d9
 
37c6d5c
 
2dad3d9
 
 
 
 
 
 
 
37c6d5c
 
 
 
2dad3d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37c6d5c
2dad3d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37c6d5c
 
 
 
 
 
 
2dad3d9
37c6d5c
 
 
 
 
 
 
2dad3d9
 
 
 
 
 
37c6d5c
2dad3d9
37c6d5c
2dad3d9
 
 
 
 
37c6d5c
2dad3d9
37c6d5c
2dad3d9
 
 
37c6d5c
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
Utilities for processing uploaded files.
"""
import os
import tempfile
import shutil
from typing import List, Optional
from pathlib import Path

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader, 
    TextLoader, 
    CSVLoader, 
    UnstructuredExcelLoader,
    Docx2txtLoader
)
from langchain_core.documents import Document
from chainlit.types import AskFileResponse

import config

def get_document_loader(file_path: str):
    """
    Pick the document loader that matches a file's extension.

    The extension is compared case-insensitively. Extensions without a
    dedicated entry in the lookup table fall back to ``TextLoader``.

    Args:
        file_path: Path to the file

    Returns:
        Loader instance constructed with ``file_path``
    """
    # Extension -> loader class; anything missing here is read as plain text.
    loaders_by_extension = {
        '.pdf': PyPDFLoader,
        '.txt': TextLoader,
        '.md': TextLoader,
        '.py': TextLoader,
        '.csv': CSVLoader,
        '.xlsx': UnstructuredExcelLoader,
        '.xls': UnstructuredExcelLoader,
        '.docx': Docx2txtLoader,
        '.doc': Docx2txtLoader,
    }

    extension = Path(file_path).suffix.lower()
    loader_class = loaders_by_extension.get(extension, TextLoader)
    return loader_class(file_path)

def create_text_splitter():
    """
    Build the recursive character splitter used to chunk loaded documents.

    Chunk size, chunk overlap and the separator list are read from the
    ``config`` module. Chunk length is measured with ``len`` and the
    separators are passed with ``is_separator_regex=False``.

    Returns:
        Initialized text splitter
    """
    splitter_settings = dict(
        chunk_size=config.CHUNK_SIZE,
        chunk_overlap=config.CHUNK_OVERLAP,
        length_function=len,
        is_separator_regex=False,
        separators=config.SEPARATORS,
    )
    return RecursiveCharacterTextSplitter(**splitter_settings)

def process_file(file: AskFileResponse) -> Optional[List[Document]]:
    """
    Process an uploaded file and split it into text chunks.

    The upload is copied to a temporary file that keeps the original
    extension, loaded with the loader returned by ``get_document_loader``,
    and split with the splitter returned by ``create_text_splitter``. The
    temporary file is removed before returning, whether or not processing
    succeeded.

    Args:
        file: The uploaded file response from Chainlit; its ``name`` and
            ``path`` attributes are read.

    Returns:
        List of document chunks, or None if copying, loading or splitting
        raises an exception (the error is printed, not re-raised).
    """
    print(f"Processing file: {file.name}")

    # Path.suffix yields "" for names without an extension, instead of
    # turning the whole file name into a bogus suffix.
    suffix = Path(file.name).suffix

    # Reserve a unique path, then close the handle straight away: the file is
    # only ever accessed by name below, and platforms such as Windows refuse
    # to delete a file that still has an open handle.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
        temp_path = temp_file.name

    try:
        # Copy the uploaded file content to the temporary file
        shutil.copyfile(file.path, temp_path)
        print(f"Created temporary file at: {temp_path}")

        # Load the documents with the loader matching the extension
        documents = get_document_loader(temp_path).load()

        # Split documents into chunks
        return create_text_splitter().split_documents(documents)
    except Exception as e:
        # Best-effort boundary: callers receive None rather than an exception.
        print(f"Error processing file: {e}")
        return None
    finally:
        # Clean up the temporary file; a failure here must not mask the result.
        try:
            os.unlink(temp_path)
        except OSError as e:
            print(f"Error cleaning up temporary file: {e}")