File size: 7,102 Bytes
5b14aa2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""Excel file processor."""

import os
import logging
from typing import Dict, Any

from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError

# Configure logging
logger = logging.getLogger(__name__)


class ExcelProcessor(BaseProcessor):
    """Processor for Excel files (XLSX, XLS) and CSV files."""
    
    def can_process(self, file_path: str) -> bool:
        """Report whether the given file is one this processor supports.

        A file is supported when it exists on disk and its extension,
        compared case-insensitively, is ``.xlsx``, ``.xls`` or ``.csv``.

        Args:
            file_path: Path to the file to check

        Returns:
            True if the path exists and has a supported extension,
            False otherwise
        """
        if os.path.exists(file_path):
            # Coerce to str before lower-casing so the extension comparison
            # is case-insensitive.
            suffix = os.path.splitext(str(file_path).lower())[1]
            return suffix in ('.xlsx', '.xls', '.csv')

        # Nothing on disk at this path, so there is nothing to process.
        return False
    
    def process(self, file_path: str) -> ConversionResult:
        """Process the Excel file and return a conversion result.
        
        Args:
            file_path: Path to the Excel file to process
            
        Returns:
            ConversionResult containing the processed content
            
        Raises:
            FileNotFoundError: If the file doesn't exist
            ConversionError: If processing fails
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Check file extension - ensure file_path is a string
        file_path_str = str(file_path)
        _, ext = os.path.splitext(file_path_str.lower())
        
        if ext == '.csv':
            return self._process_csv(file_path)
        else:
            return self._process_excel(file_path)
    
    def _process_csv(self, file_path: str) -> ConversionResult:
        """Process a CSV file and return a conversion result.

        The file is loaded with ``pandas.read_csv`` and rendered as a
        ``# CSV Data: <file name>`` heading, a blank line, and the table
        produced by ``_dataframe_to_markdown``.

        Args:
            file_path: Path to the CSV file to process

        Returns:
            ConversionResult containing the markdown content and a metadata
            dict with ``row_count``, ``column_count``, ``columns`` and
            ``extractor``

        Raises:
            ConversionError: If pandas cannot be imported, or if reading or
                converting the file fails. The triggering exception is
                attached as ``__cause__``.
        """
        try:
            # Imported here so that a missing pandas is reported through the
            # ImportError handler below as a ConversionError.
            import pandas as pd

            df = pd.read_csv(file_path)

            content = '\n'.join([
                f"# CSV Data: {os.path.basename(file_path)}",
                "",
                self._dataframe_to_markdown(df, pd),
            ])

            metadata = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "columns": df.columns.tolist(),
                "extractor": "pandas"
            }

            return ConversionResult(content, metadata)

        except ImportError as e:
            raise ConversionError("pandas is required for CSV processing. Install it with: pip install pandas") from e
        except (FileNotFoundError, ConversionError):
            # Already one of the package's own error types; do not re-wrap.
            raise
        except Exception as e:
            # Chain the cause so the underlying pandas error stays visible.
            raise ConversionError(f"Failed to process CSV file {file_path}: {str(e)}") from e
    
    def _process_excel(self, file_path: str) -> ConversionResult:
        """Process an Excel file and return a conversion result.

        Every non-empty sheet is rendered as a ``## Sheet: <name>`` heading
        followed by a markdown table. Empty sheets are skipped in the content
        but still appear in the ``sheet_names`` metadata.

        Args:
            file_path: Path to the Excel file to process

        Returns:
            ConversionResult containing the markdown content and a metadata
            dict with ``sheet_count``, ``sheet_names``, ``extractor`` and,
            for each non-empty sheet, ``sheet_<name>_rows``,
            ``sheet_<name>_columns`` and ``sheet_<name>_columns_list``

        Raises:
            ConversionError: If pandas or an Excel engine cannot be imported,
                or if reading or converting the workbook fails. The
                triggering exception is attached as ``__cause__``.
        """
        try:
            # Imported here so that a missing pandas is reported through the
            # ImportError handler below as a ConversionError.
            import pandas as pd

            content_parts = []

            # Open the workbook once and read every sheet from the same
            # handle; the context manager closes it even if a sheet fails.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names

                metadata = {
                    "sheet_count": len(sheet_names),
                    "sheet_names": sheet_names,
                    "extractor": "pandas"
                }

                for sheet_name in sheet_names:
                    df = pd.read_excel(excel_file, sheet_name=sheet_name)
                    if df.empty:
                        continue

                    content_parts.append(f"\n## Sheet: {sheet_name}")
                    content_parts.append("")

                    # Convert DataFrame to markdown table
                    content_parts.append(self._dataframe_to_markdown(df, pd))
                    content_parts.append("")

                    # Add metadata for this sheet
                    metadata.update({
                        f"sheet_{sheet_name}_rows": len(df),
                        f"sheet_{sheet_name}_columns": len(df.columns),
                        f"sheet_{sheet_name}_columns_list": df.columns.tolist()
                    })

            content = '\n'.join(content_parts)

            return ConversionResult(content, metadata)

        except ImportError as e:
            raise ConversionError("pandas and openpyxl are required for Excel processing. Install them with: pip install pandas openpyxl") from e
        except (FileNotFoundError, ConversionError):
            # Already one of the package's own error types; do not re-wrap.
            raise
        except Exception as e:
            # Chain the cause so the underlying pandas error stays visible.
            raise ConversionError(f"Failed to process Excel file {file_path}: {str(e)}") from e
    
    def _dataframe_to_markdown(self, df, pd) -> str:
        """Convert pandas DataFrame to markdown table.
        
        Args:
            df: pandas DataFrame
            pd: pandas module reference
            
        Returns:
            Markdown table string
        """
        if df.empty:
            return "*No data available*"
        
        # Convert DataFrame to markdown table
        markdown_parts = []
        
        # Header
        markdown_parts.append("| " + " | ".join(str(col) for col in df.columns) + " |")
        markdown_parts.append("| " + " | ".join(["---"] * len(df.columns)) + " |")
        
        # Data rows
        for _, row in df.iterrows():
            row_data = []
            for cell in row:
                if pd.isna(cell):
                    row_data.append("")
                else:
                    row_data.append(str(cell))
            markdown_parts.append("| " + " | ".join(row_data) + " |")
        
        return "\n".join(markdown_parts)
    
    def _clean_content(self, content: str) -> str:
        """Clean up the extracted Excel content.
        
        Args:
            content: Raw Excel text content
            
        Returns:
            Cleaned text content
        """
        # Remove excessive whitespace and normalize
        lines = content.split('\n')
        cleaned_lines = []
        
        for line in lines:
            # Remove excessive whitespace
            line = ' '.join(line.split())
            if line.strip():
                cleaned_lines.append(line)
        
        # Join lines and add proper spacing
        content = '\n'.join(cleaned_lines)
        
        # Add spacing around headers
        content = content.replace('# ', '\n# ')
        content = content.replace('## ', '\n## ')
        
        return content.strip()