Spaces:
Sleeping
Sleeping
| # MODULE_CONTRACT: | |
| # PURPOSE: Предоставляет универсальный инструмент для парсинга оборотно-сальдовых ведомостей (ОСВ) | |
| # из Excel-файлов с разнообразной структурой. Модуль интеллектуально находит | |
| # таблицу с данными и извлекает их в структурированный JSON-формат. | |
| # SCOPE: Парсинг Excel, эвристический анализ данных, очистка данных, преобразование форматов. | |
| # INPUT: Путь к файлу Excel (.xls, .xlsx). | |
| # OUTPUT: Класс OcvParser, предоставляющий метод parse() для выполнения парсинга. | |
| # Исключение OcvParsingError в случае ошибок. | |
| # KEYWORDS_MODULE: [ocv, accounting, parser, excel, pandas, heuristic_analysis, data_extraction] | |
| # LINKS_TO_MODULE: [pandas, openpyxl, xlrd] | |
| # MODULE_MAP: | |
| # CLASS [Исключение, возникающее при ошибках парсинга ОСВ] => OcvParsingError | |
| # CLASS [Основной класс-парсер для обработки ОСВ] => OcvParser | |
| # METHOD [Главный публичный метод для парсинга файла] => parse | |
| # METHOD [Загружает лист Excel в DataFrame] => _load_workbook_sheet | |
| # METHOD [Извлекает метаданные из шапки документа] => _extract_metadata | |
| # METHOD [Находит блок данных с помощью скользящего окна] => _find_data_block | |
| # METHOD [Идентифицирует типы колонок в окне] => _identify_columns_by_type | |
| # METHOD [Обрабатывает строки данных из найденного блока] => _process_data_rows | |
| # METHOD [Проверяет, является ли строка валидной строкой данных] => _is_valid_data_row | |
| # METHOD [Парсит одну строку DataFrame в целевой словарь] => _parse_row | |
| # METHOD [Проверяет, является ли колонка денежной] => _is_money_column | |
| # METHOD [Проверяет, является ли колонка наименованием счета] => _is_account_name_column | |
| # METHOD [Проверяет, является ли колонка номером счета] => _is_account_number_column | |
| # FUNC [Утилита для парсинга денежного значения] => _parse_money_value | |
| # FUNC [Утилита для парсинга строки с периодом] => _parse_period_string | |
| # KEY_USE_CASES: | |
| ## Освещает наиболее важные сценарии, в которых участвует этот модуль или его ключевые компоненты | |
| ## Для Use Case использовать AAG-нотацию: Actor (Context) -> Action (Module's/Component's Role) -> Goal | |
| # - [OcvParser.parse]: [Analyst (Data Processing)] -> [Provide Excel file path to parser] -> [Get structured OCV data as list of dicts] | |
| # - [_find_data_block]: [Parser (Internal Logic)] -> [Scan DataFrame using a sliding window for a specific data pattern] -> [Locate data table's start row and column map] | |
| # - [_parse_row]: [Parser (Internal Logic)] -> [Transform a raw DataFrame row based on the found column map] -> [Get a clean, structured dictionary for a single account entry] | |
| import logging | |
| import re | |
| from datetime import date | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| from openpyxl import load_workbook | |
| # Get logger for this module | |
| logger = logging.getLogger(__name__) | |
| # START_CLASS_OcvParsingError | |
| # CONTRACT: | |
| # PURPOSE: Определяет пользовательское исключение для явного указания ошибок, возникших в процессе парсинга ОСВ. | |
| # Предоставляет расширенную информацию об ошибке, включая оригинальное исключение и его трассировку. | |
| # ATTRIBUTES: | |
| # - message: str - Основное сообщение об ошибке | |
| # - error: Exception - Оригинальное исключение, вызвавшее ошибку (опционально) | |
| # - traceback: str - Полная трассировка стека оригинального исключения (опционально) | |
| # KEY_METHODS: | |
| # - __init__(message: str, error: Exception = None) - Инициализирует исключение с сообщением и опциональным оригинальным исключением | |
| # - __str__() -> str - Возвращает форматированное сообщение об ошибке с полной информацией | |
| # KEYWORDS: [exception, error_handling, custom_exception, traceback, debugging] | |
| # LINKS: [OcvParser, traceback] | |
| class OcvParsingError(Exception): | |
| """Кастомное исключение для ошибок в процессе парсинга ОСВ. | |
| Предоставляет расширенную информацию об ошибке, включая: | |
| - Основное сообщение об ошибке | |
| - Оригинальное исключение (если есть) | |
| - Полную трассировку стека (если есть оригинальное исключение) | |
| """ | |
| def __init__(self, message: str, error: Exception = None): | |
| import traceback | |
| self.message = message | |
| self.error = error | |
| self.traceback = traceback.format_exc() | |
| super().__init__(self.message) | |
| def __str__(self): | |
| result = f"Ошибка парсинга ОСВ: {self.message}" | |
| if self.error: | |
| result += f"\nПричина: {str(self.error)}" | |
| result += f"\n\nTraceback:\n{self.traceback}" | |
| return result | |
| # END_CLASS_OcvParsingError | |
| # START_FUNCTION__parse_money_value | |
| # CONTRACT: | |
| # PURPOSE: Преобразует значение из ячейки (строку или число) в числовой формат float. | |
| # INPUTS: | |
| # - value: Any - Значение из ячейки, которое может быть строкой, числом или None. | |
| # OUTPUTS: | |
| # - float - Очищенное числовое значение. Возвращает 0.0, если значение некорректно или пусто. | |
| # SIDE_EFFECTS: Нет. | |
| # TEST_CONDITIONS_SUCCESS_CRITERIA: | |
| # - Корректно парсит строку "1 234 567,89" в 1234567.89. | |
| # - Возвращает 0.0 для пустых (None) или нечисловых строковых значений. | |
| # KEYWORDS: [utility, data_cleaning, type_conversion] | |
| # LINKS: [_parse_row] | |
| def _parse_money_value(value: Any) -> float: | |
| """Преобразует строковое/числовое представление денег в float.""" | |
| #START_HANDLE_NONE_OR_ZERO: Обработка пустых или нулевых значений. | |
| if pd.isna(value) or value == 0: | |
| return 0.0 | |
| #END_HANDLE_NONE_OR_ZERO | |
| #START_HANDLE_NUMERIC: Обработка уже числовых значений. | |
| if isinstance(value, (int, float)): | |
| return float(value) | |
| #END_HANDLE_NUMERIC | |
| #START_HANDLE_STRING: Обработка строковых значений. | |
| if isinstance(value, str): | |
| #START_CLEAN_STRING: Очистка строки от пробелов и замена запятой. | |
| cleaned_value = value.replace(' ', '').replace(',', '.') | |
| #END_CLEAN_STRING | |
| try: | |
| #START_CONVERT_TO_FLOAT: Попытка конвертации очищенной строки в float. | |
| return float(cleaned_value) | |
| #END_CONVERT_TO_FLOAT | |
| except (ValueError, TypeError): | |
| logger.warning(f"[WARNING][_parse_money_value][HANDLE_STRING][ExceptionCaught] Не удалось конвертировать '{value}' в число. Возвращено 0.0. [FAIL]") | |
| return 0.0 | |
| #END_HANDLE_STRING | |
| logger.warning(f"[WARNING][_parse_money_value][UNKNOWN_TYPE][Fallthrough] Неизвестный тип данных '{type(value)}' для значения '{value}'. Возвращено 0.0. [FAIL]") | |
| return 0.0 | |
| # END_FUNCTION__parse_money_value | |
| # START_FUNCTION__parse_period_string | |
| # CONTRACT: | |
| # PURPOSE: Извлекает даты начала и конца отчетного периода из текстовой строки. | |
| # INPUTS: | |
| # - period_str: str - Строка, содержащая описание периода (например, "за 1 полугодие 2023 г."). | |
| # OUTPUTS: | |
| # - Optional[Dict[str, str]] - Словарь с ключами 'start_date' и 'end_date' в формате 'YYYY-MM-DD' или None. | |
| # SIDE_EFFECTS: Нет. | |
| # TEST_CONDITIONS_SUCCESS_CRITERIA: | |
| # - Корректно парсит "1 полугодие 2023" в '2023-01-01' и '2023-06-30'. | |
| # - Корректно парсит "январь - июнь 2023" в '2023-01-01' и '2023-06-30'. | |
| # KEYWORDS: [utility, date_conversion, regex, text_processing] | |
| # LINKS: [_extract_metadata] | |
| def _parse_period_string(period_str: str) -> Optional[Dict[str, str]]: | |
| """Извлекает из текстовой строки даты начала и конца периода.""" | |
| #START_EXTRACT_YEAR: Извлечение года из строки. | |
| year_match = re.search(r'(20\d{2})', period_str) | |
| if not year_match: | |
| return None | |
| year = int(year_match.group(1)) | |
| #END_EXTRACT_YEAR | |
| #START_PARSE_SEMESTER_QUARTER: Парсинг полугодий и кварталов. | |
| period_str_lower = period_str.lower() | |
| if '1 полугодие' in period_str_lower or '6 месяцев' in period_str_lower: | |
| return {'start_date': f'{year}-01-01', 'end_date': f'{year}-06-30'} | |
| if '2 полугодие' in period_str_lower: | |
| return {'start_date': f'{year}-07-01', 'end_date': f'{year}-12-31'} | |
| if '1 квартал' in period_str_lower: | |
| return {'start_date': f'{year}-01-01', 'end_date': f'{year}-03-31'} | |
| if '2 квартал' in period_str_lower: | |
| return {'start_date': f'{year}-04-01', 'end_date': f'{year}-06-30'} | |
| if '3 квартал' in period_str_lower: | |
| return {'start_date': f'{year}-07-01', 'end_date': f'{year}-09-30'} | |
| if '4 квартал' in period_str_lower: | |
| return {'start_date': f'{year}-10-01', 'end_date': f'{year}-12-31'} | |
| if '9 месяцев' in period_str_lower: | |
| return {'start_date': f'{year}-01-01', 'end_date': f'{year}-09-30'} | |
| if 'год' in period_str_lower and 'полугодие' not in period_str_lower: | |
| return {'start_date': f'{year}-01-01', 'end_date': f'{year}-12-31'} | |
| #END_PARSE_SEMESTER_QUARTER | |
| #START_PARSE_MONTH_RANGE: Парсинг диапазона месяцев. | |
| months = { | |
| 'январ': 1, 'феврал': 2, 'март': 3, 'апрел': 4, 'ма': 5, 'июн': 6, | |
| 'июл': 7, 'август': 8, 'сентябр': 9, 'октябр': 10, 'ноябр': 11, 'декабр': 12 | |
| } | |
| month_keys = '|'.join(months.keys()) | |
| month_range_match = re.search(f'({month_keys}).*?-.*?({month_keys})', period_str_lower) | |
| if month_range_match: | |
| start_month_str, end_month_str = month_range_match.groups() | |
| start_month = months.get(start_month_str[:5]) | |
| end_month = months.get(end_month_str[:5]) | |
| if start_month and end_month: | |
| start_day = date(year, start_month, 1) | |
| # Вычисляем последний день месяца | |
| next_month = date(year, end_month, 1).replace(day=28) + pd.DateOffset(days=4) | |
| end_day = next_month - pd.DateOffset(days=next_month.day) | |
| return {'start_date': start_day.strftime('%Y-%m-%d'), 'end_date': end_day.strftime('%Y-%m-%d')} | |
| #END_PARSE_MONTH_RANGE | |
| return None | |
| # END_FUNCTION__parse_period_string | |
| # START_CLASS_OcvParser | |
| # CONTRACT: | |
| # PURPOSE: Реализует логику универсального парсинга ОСВ из файлов Excel. | |
| # ATTRIBUTES: | |
| # - _df: pd.DataFrame - DataFrame с сырыми данными из листа Excel. | |
| # - _metadata: Dict - Словарь с извлеченными метаданными (название компании, период). | |
| # - HEADERS_SCAN_ROWS: int - Количество строк для сканирования метаданных в шапке. | |
| # - WINDOW_SIZE: int - Размер скользящего окна для поиска блока с данными. | |
| # - COLUMN_MATCH_THRESHOLD: float - Минимальная доля ячеек в колонке, которые должны соответствовать типу для его определения. | |
| # KEY_METHODS: | |
| # - __init__(): Инициализирует парсер с настраиваемыми параметрами. | |
| # - parse(): Главный публичный метод, запускающий весь процесс парсинга. | |
| # KEYWORDS: [parser, facade, orchestrator, pandas] | |
| # LINKS: [OcvParsingError] | |
| class OcvParser: | |
| """Универсальный парсер для Оборотно-Сальдовых Ведомостей из Excel.""" | |
| # START_FUNCTION___init__ | |
| # CONTRACT: | |
| # PURPOSE: Инициализирует парсер и устанавливает настраиваемые параметры для анализа. | |
| # INPUTS: | |
| # - headers_scan_rows: int - Количество верхних строк для сканирования на предмет метаданных. | |
| # - window_size: int - Размер (в строках) "скользящего окна" для поиска основного блока данных. | |
| # - column_match_threshold: float - Доля (от 0.0 до 1.0) ячеек в столбце, которые должны соответствовать определенному типу (например, 'денежный'), чтобы столбец был классифицирован как таковой. | |
| # OUTPUTS: Нет. | |
| # SIDE_EFFECTS: Устанавливает атрибуты экземпляра. | |
| # TEST_CONDITIONS_SUCCESS_CRITERIA: | |
| # - Экземпляр успешно создается со значениями по умолчанию. | |
| # - Экземпляр успешно создается с пользовательскими значениями. | |
| # KEYWORDS: [constructor, initialization, configuration, settings] | |
| # LINKS: [] | |
| def __init__(self, headers_scan_rows: int = 15, window_size: int = 100, column_match_threshold: float = 0.05): | |
| """Инициализирует парсер с опциональными настройками.""" | |
| self._df: Optional[pd.DataFrame] = None | |
| self._metadata: Dict[str, Any] = {} | |
| # Константы для парсинга | |
| self.HEADERS_SCAN_ROWS = headers_scan_rows | |
| self.WINDOW_SIZE = window_size | |
| self.COLUMN_MATCH_THRESHOLD = column_match_threshold | |
| # Константы для поиска | |
| self.COMPANY_KEYWORDS = [ | |
| 'организация', 'филиал ао', 'компания', 'предприятие', 'ооо', 'зао', 'пао', 'ип', | |
| 'наименование', 'полное наименование', 'юридическое лицо', 'юридическое наименование', | |
| 'обособленное подразделение', 'обособленное структурное подразделение', 'филиал', | |
| 'представительство', 'ао', 'нао', 'пao', 'нко', 'ано', 'фонд', 'союз', 'ассоциация', | |
| 'корпорация', 'холдинг', 'группа компаний', 'группа', 'концерн', 'трест', 'синдикат', | |
| 'общество', 'товарищество', 'кооператив', 'унитарное предприятие', 'казенное предприятие', | |
| 'муниципальное предприятие', 'государственное предприятие', 'бюджетное учреждение', | |
| 'автономное учреждение', 'казенное учреждение', 'муниципальное учреждение', | |
| 'государственное учреждение', 'некоммерческая организация', 'коммерческая организация' | |
| ] | |
| self.PERIOD_KEYWORDS = [ | |
| 'период', ' за ', 'отчетный период', 'отчетный', 'отчет', 'отчетность', | |
| 'за период', 'за отчетный период', 'за отчетный', 'за отчет', | |
| 'отчетный год', 'за год', 'за квартал', 'за полугодие', | |
| 'отчетная дата', 'на дату', 'по состоянию на', | |
| 'с', 'по', 'с начала', 'с начала года', 'с начала периода', | |
| 'январь', 'февраль', 'март', 'апрель', 'май', 'июнь', | |
| 'июль', 'август', 'сентябрь', 'октябрь', 'ноябрь', 'декабрь', | |
| '1 квартал', '2 квартал', '3 квартал', '4 квартал', | |
| 'первый квартал', 'второй квартал', 'третий квартал', 'четвертый квартал', | |
| '1 полугодие', '2 полугодие', 'первое полугодие', 'второе полугодие', | |
| '9 месяцев', 'шесть месяцев', 'три месяца', | |
| 'годовой отчет', 'годовая отчетность', 'годовая ведомость', | |
| 'квартальная отчетность', 'квартальная ведомость', | |
| 'месячная отчетность', 'месячная ведомость', | |
| 'отчетный интервал', 'отчетный срок', 'отчетная дата', | |
| 'нарастающим итогом', 'нарастающим', 'итогом', | |
| 'с начала года по', 'с начала периода по', | |
| 'за период с', 'за период по', | |
| 'отчетный период с', 'отчетный период по' | |
| ] | |
| # START_FUNCTION_parse | |
| # CONTRACT: | |
| # PURPOSE: Оркестрирует полный цикл парсинга файла ОСВ. | |
| # INPUTS: | |
| # - file_path: str - Путь к файлу Excel для парсинга. | |
| # OUTPUTS: | |
| # - List[Dict] - Список словарей, где каждый словарь представляет одну строку ОСВ. | |
| # SIDE_EFFECTS: | |
| # - Читает файл с диска. Может выбрасывать OcvParsingError. | |
| # TEST_CONDITIONS_SUCCESS_CRITERIA: | |
| # - Успешный парсинг файла со структурой из примера 1. | |
| # - Успешный парсинг файла со структурой из примера 2. | |
| # KEYWORDS: [facade, orchestrator, entrypoint] | |
| # LINKS: [_load_workbook_sheet, _extract_metadata, _find_data_block, _process_data_rows] | |
| def parse(self, file_path: str) -> List[Dict]: | |
| """Запускает полный процесс парсинга файла ОСВ.""" | |
| logger.info(f"[INFO][parse][START][StepComplete] Начало парсинга файла: {file_path} [SUCCESS]") | |
| #START_LOAD_DATA: Загрузка данных из Excel. | |
| self._df = self._load_workbook_sheet(file_path) | |
| logger.info(f"[INFO][parse][LOAD_DATA][StepComplete] Файл успешно загружен в DataFrame. [SUCCESS]") | |
| #END_LOAD_DATA | |
| #START_EXTRACT_METADATA: Извлечение метаданных из шапки. | |
| self._metadata = self._extract_metadata(self._df) | |
| logger.info(f"[INFO][parse][EXTRACT_METADATA][StepComplete] Метаданные извлечены: {self._metadata} [SUCCESS]") | |
| #END_EXTRACT_METADATA | |
| #START_FIND_DATA_BLOCK: Поиск основного блока данных. | |
| start_row, column_map = self._find_data_block(self._df) | |
| logger.info(f"[INFO][parse][FIND_DATA_BLOCK][StepComplete] Блок данных найден. Начало: строка {start_row}, карта колонок: {column_map} [SUCCESS]") | |
| #END_FIND_DATA_BLOCK | |
| #START_PROCESS_ROWS: Обработка строк данных. | |
| result = self._process_data_rows(self._df, start_row, column_map, self._metadata) | |
| logger.info(f"[INFO][parse][PROCESS_ROWS][StepComplete] Обработано {len(result)} строк. Парсинг завершен. [SUCCESS]") | |
| #END_PROCESS_ROWS | |
| return result | |
| # END_FUNCTION_parse | |
| # START_FUNCTION__load_workbook_sheet | |
| def _load_workbook_sheet(self, file_path: str) -> pd.DataFrame: | |
| """Считывает лист Excel, сохраняя форматирование числовых ячеек как строки, и возвращает pandas DataFrame.""" | |
| #START_LOAD_WORKBOOK: Загрузка книги Excel с использованием openpyxl. | |
| try: | |
| wb = load_workbook(file_path, data_only=True) | |
| ws = wb.active | |
| except Exception as e: | |
| logger.error(f"[ERROR][_load_workbook_sheet][LOAD_WORKBOOK][ExceptionCaught] Не удалось загрузить книгу Excel: {file_path}. Ошибка: {e} [FAIL]") | |
| raise OcvParsingError(f"Ошибка при загрузке файла Excel: {e}", e) | |
| #END_LOAD_WORKBOOK | |
| data = [] | |
| #START_ITERATE_ROWS: Итерация по строкам и ячейкам листа для сбора данных. | |
| for row in ws.iter_rows(): | |
| row_data = [] | |
| for cell in row: | |
| value = cell.value | |
| #START_PROCESS_CELL: Обработка значения одной ячейки с сохранением формата. | |
| if value is None: | |
| #START_HANDLE_EMPTY_CELL: Обработка пустых ячеек (None) для корректной обработки NaN в DataFrame. | |
| row_data.append(None) | |
| #END_HANDLE_EMPTY_CELL | |
| elif cell.data_type == 'n': | |
| #START_HANDLE_NUMERIC: Обработка числовых ячеек, преобразуя их в строки. | |
| fmt = cell.number_format | |
| if isinstance(value, (int, float)): | |
| if '0.00' in fmt: | |
| row_data.append(f"{value:.2f}".replace('.', ',')) | |
| else: | |
| row_data.append(str(value).replace('.', ',')) | |
| else: | |
| row_data.append(str(value)) | |
| #END_HANDLE_NUMERIC | |
| else: | |
| #START_HANDLE_OTHER: Обработка нечисловых ячеек. | |
| row_data.append(str(value)) | |
| #END_HANDLE_OTHER | |
| #END_PROCESS_CELL | |
| data.append(row_data) | |
| #END_ITERATE_ROWS | |
| #START_CREATE_DATAFRAME: Создание DataFrame из собранных данных. | |
| return pd.DataFrame(data) | |
| #END_CREATE_DATAFRAME | |
| # END_FUNCTION__load_workbook_sheet | |
| # START_FUNCTION__extract_metadata | |
| def _extract_metadata(self, df: pd.DataFrame) -> Dict[str, Any]: | |
| """Сканирует верхние строки DataFrame в поиске метаданных.""" | |
| metadata = {'company_name': None, 'period': None} | |
| header_df = df.head(self.HEADERS_SCAN_ROWS) | |
| #START_SCAN_ROWS_FOR_METADATA: Итерация по ячейкам шапки для поиска ключевых слов. | |
| for _, row in header_df.iterrows(): | |
| for cell_value in row.dropna(): | |
| if not isinstance(cell_value, str): | |
| continue | |
| cell_lower = cell_value.lower() | |
| #START_FIND_COMPANY: Поиск названия компании. | |
| if not metadata['company_name']: | |
| if any(keyword in cell_lower for keyword in self.COMPANY_KEYWORDS): | |
| metadata['company_name'] = cell_value.strip() | |
| logger.debug(f"[DEBUG][_extract_metadata][FIND_COMPANY][VALUE] Найдено название компании: {metadata['company_name']}") | |
| continue # Переход к следующей ячейке | |
| #END_FIND_COMPANY | |
| #START_FIND_PERIOD: Поиск периода. | |
| if not metadata['period']: | |
| if any(keyword in cell_lower for keyword in self.PERIOD_KEYWORDS): | |
| period_data = _parse_period_string(cell_lower) | |
| if period_data: | |
| metadata['period'] = period_data | |
| logger.debug(f"[DEBUG][_extract_metadata][FIND_PERIOD][VALUE] Найден и распознан период: {metadata['period']}") | |
| #END_FIND_PERIOD | |
| #END_SCAN_ROWS_FOR_METADATA | |
| #START_VALIDATE_METADATA: Проверка, что обязательные метаданные были найдены. | |
| if not metadata['company_name']: | |
| raise OcvParsingError("Не удалось найти название организации в шапке документа.") | |
| if not metadata['period']: | |
| raise OcvParsingError("Не удалось найти период в шапке документа.") | |
| #END_VALIDATE_METADATA | |
| return metadata | |
| # END_FUNCTION__extract_metadata | |
| # START_FUNCTION__find_data_block | |
| def _find_data_block(self, df: pd.DataFrame) -> Tuple[int, Dict[str, Any]]: | |
| """Реализует алгоритм 'скользящего окна' для поиска таблицы данных.""" | |
| #START_SLIDING_WINDOW_ITERATION: Итерация по DataFrame с помощью скользящего окна. | |
| for i in range(0, len(df) - self.WINDOW_SIZE + 1, self.WINDOW_SIZE): | |
| window_df = df.iloc[i : i + self.WINDOW_SIZE] | |
| #START_IDENTIFY_COLUMNS: Попытка идентификации колонок в текущем окне. | |
| column_map = self._identify_columns_by_type(window_df) | |
| #END_IDENTIFY_COLUMNS | |
| #START_CHECK_IDENTIFICATION_RESULT: Проверка результата идентификации. | |
| if column_map: | |
| #START_FIND_ACTUAL_START: Поиск реального начала таблицы путем сканирования вверх от конца окна. | |
| logger.debug(f"[DEBUG][_find_data_block][FIND_ACTUAL_START][ATTEMPT] Окно с данными найдено, начиная с {i}. Поиск фактической верхней границы.") | |
| # По умолчанию, если все строки до верха похожи на данные, считаем, что таблица начинается с 0. | |
| start_row = 0 | |
| # Сканируем вверх от конца найденного окна (i + self.WINDOW_SIZE - 1) до самого начала файла. | |
| for scan_row_idx in range(i + self.WINDOW_SIZE - 1, -1, -1): | |
| row = df.iloc[scan_row_idx] | |
| # Проверяем, содержит ли строка хотя бы одно значение, похожее на денежное, в ожидаемых колонках. | |
| # Это эвристика, чтобы отличить строки данных от заголовков/пустых строк. | |
| has_money_value = any( | |
| (self._get_money_like_count(pd.Series([row[col]])) > 0) | |
| for col in column_map['money'] | |
| ) | |
| if not has_money_value: | |
| # Найдена строка, не похожая на строку с данными (вероятно, часть шапки). | |
| # Реальные данные начинаются со следующей строки. | |
| start_row = scan_row_idx + 1 | |
| logger.debug(f"[DEBUG][_find_data_block][FIND_ACTUAL_START][VALUE] Найдена неденежная строка на {scan_row_idx}. Начало таблицы определено как {start_row}.") | |
| break # Граница найдена, прерываем цикл. | |
| logger.info(f"[INFO][_find_data_block][FIND_ACTUAL_START][StepComplete] Фактическое начало таблицы найдено: строка {start_row}. [SUCCESS]") | |
| #END_FIND_ACTUAL_START | |
| return start_row, column_map # Возвращаем найденное начало и карту колонок | |
| #END_CHECK_IDENTIFICATION_RESULT | |
| #END_SLIDING_WINDOW_ITERATION | |
| raise OcvParsingError("Не удалось найти блок данных с ожидаемой структурой в файле.") | |
| # END_FUNCTION__find_data_block | |
| # START_FUNCTION__identify_columns_by_type | |
| def _identify_columns_by_type(self, window_df: pd.DataFrame) -> Optional[Dict[str, Any]]: | |
| """Анализирует окно DataFrame и пытается идентифицировать столбцы по их содержимому.""" | |
| col_types = {} | |
| #START_DETERMINE_COLUMN_TYPES: Определение типа для каждой колонки в окне. | |
| for col_idx in window_df.columns: | |
| column = window_df[col_idx] | |
| money_like_count = self._get_money_like_count(column) | |
| account_name_like_count = self._get_account_name_like_count(column) | |
| account_number_like_count = self._get_account_number_like_count(column) | |
| if max(money_like_count, account_name_like_count, account_number_like_count) / len(column) > self.COLUMN_MATCH_THRESHOLD: | |
| if money_like_count > account_name_like_count and money_like_count > account_number_like_count: | |
| col_types[col_idx] = 'money' | |
| elif account_name_like_count > money_like_count and account_name_like_count > account_number_like_count: | |
| col_types[col_idx] = 'name' | |
| elif account_number_like_count > money_like_count and account_number_like_count > account_name_like_count: | |
| col_types[col_idx] = 'number' | |
| #END_DETERMINE_COLUMN_TYPES | |
| #START_FIND_REQUIRED_PATTERN: Поиск обязательного паттерна (имя + 6 денежных колонок). | |
| money_cols = sorted([k for k, v in col_types.items() if v == 'money']) | |
| name_cols = [k for k, v in col_types.items() if v == 'name'] | |
| number_cols = [k for k, v in col_types.items() if v == 'number'] | |
| if not name_cols or not money_cols: return None # Столбцы с наименованием и денежными значениями обязательны | |
| #START_FIND_CONSECUTIVE_MONEY_BLOCK: Поиск блока из 6 последовательных денежных колонок. | |
| for i in range(len(money_cols) - 5): | |
| money_block = money_cols[i:i+6] | |
| is_consecutive = (money_block[-1] - money_block[0] == 5) | |
| if is_consecutive: | |
| # Паттерн найден | |
| result_map = { | |
| 'name': name_cols[0], | |
| 'money': money_block | |
| } | |
| if number_cols: | |
| result_map['number'] = number_cols[0] | |
| return result_map | |
| #END_FIND_CONSECUTIVE_MONEY_BLOCK | |
| #END_FIND_REQUIRED_PATTERN | |
| return None | |
| # END_FUNCTION__identify_columns_by_type | |
| # START_FUNCTION__process_data_rows | |
| def _process_data_rows(self, df: pd.DataFrame, start_row: int, column_map: Dict, metadata: Dict) -> List[Dict]: | |
| """Итерирует по строкам DataFrame, парсит их и собирает результат.""" | |
| results = [] | |
| #START_ROW_ITERATION_LOOP: Цикл по строкам, начиная с найденной. | |
| for i in range(start_row, len(df)): | |
| row = df.iloc[i] | |
| #START_VALIDATE_ROW: Проверка валидности строки для определения конца таблицы. | |
| if not self._is_valid_data_row(row, column_map): | |
| logger.info(f"[INFO][_process_data_rows][VALIDATE_ROW][StepComplete] Найдена невалидная строка на индексе {i}. Завершение обработки. [SUCCESS]") | |
| break | |
| #END_VALIDATE_ROW | |
| #START_PARSE_SINGLE_ROW: Парсинг одной валидной строки. | |
| parsed_row = self._parse_row(row, column_map, metadata) | |
| if parsed_row: | |
| results.append(parsed_row) | |
| #END_PARSE_SINGLE_ROW | |
| #END_ROW_ITERATION_LOOP | |
| return results | |
| # END_FUNCTION__process_data_rows | |
| # START_FUNCTION__is_valid_data_row | |
| def _is_valid_data_row(self, row: pd.Series, column_map: Dict) -> bool: | |
| """Проверяет, является ли строка валидной строкой данных (а не итогом или подписью).""" | |
| #START_CHECK_NAME_COLUMN: Проверка, что ячейка с наименованием не пуста. | |
| name_val = row[column_map['name']] | |
| if pd.isna(name_val) or not str(name_val).strip(): | |
| return False | |
| #END_CHECK_NAME_COLUMN | |
| #START_CHECK_MONEY_COLUMNS: Проверка, что хотя бы одна денежная ячейка не пуста. | |
| money_values = [row[i] for i in column_map['money']] | |
| if all(pd.isna(val) for val in money_values): | |
| return False | |
| #END_CHECK_MONEY_COLUMNS | |
| return True | |
| # END_FUNCTION__is_valid_data_row | |
| # START_FUNCTION__parse_row | |
| def _parse_row(self, row: pd.Series, column_map: Dict, metadata: Dict) -> Optional[Dict]: | |
| """Преобразует одну строку DataFrame в целевой формат словаря.""" | |
| logger.debug(f"[DEBUG][_parse_row][START][Params] Processing row with column_map: {column_map}") | |
| #START_INITIALIZE_RESULT_DICT: Инициализация словаря с метаданными. | |
| res = { | |
| "company_name": metadata.get("company_name"), | |
| "period": metadata.get("period"), | |
| "account_number": "", | |
| "account_name": "" | |
| } | |
| #END_INITIALIZE_RESULT_DICT | |
| #START_EXTRACT_NAME_AND_NUMBER: Извлечение номера и наименования счета. | |
| account_name = str(row.get(column_map['name'], '')).strip() | |
| # Сначала пытаемся получить номер из отдельной колонки | |
| if 'number' in column_map: | |
| res['account_number'] = str(row.get(column_map['number'], '')).strip() | |
| res['account_name'] = account_name | |
| else: | |
| # Если нет отдельной колонки для номера, пытаемся извлечь его из имени | |
| # Ищем номер счета в начале строки, который может быть отделен запятой или пробелами | |
| match = re.match(r'^(\d[\d\.]*)[,\s]+(.*)', account_name) | |
| if match: | |
| res['account_number'] = match.group(1) | |
| res['account_name'] = match.group(2).strip() | |
| else: | |
| res['account_name'] = account_name | |
| #END_EXTRACT_NAME_AND_NUMBER | |
| #START_PARSE_MONEY_VALUES: Парсинг 6 денежных значений. | |
| money_vals = [_parse_money_value(row.get(i)) for i in column_map['money']] | |
| res["opening_balance"] = {"debet": money_vals[0], "credit": money_vals[1]} | |
| res["turnover"] = {"debet": money_vals[2], "credit": money_vals[3]} | |
| res["closing_balance"] = {"debet": money_vals[4], "credit": money_vals[5]} | |
| #END_PARSE_MONEY_VALUES | |
| logger.debug(f"[DEBUG][_parse_row][END][ReturnData] Final result: {res}") | |
| return res | |
| # END_FUNCTION__parse_row | |
| # START_FUNCTION__is_money_column | |
| def _get_money_like_count(self, column: pd.Series) -> int: | |
| """Проверяет, является ли столбец денежным.""" | |
| non_na = column.dropna() | |
| if non_na.empty: return False | |
| match_count = non_na.apply(lambda x: 1 if re.match(r'^-?\d+[.,]\d{2}$', x) else 0).sum() | |
| return match_count | |
| # END_FUNCTION__is_money_column | |
| # START_FUNCTION__is_account_name_column | |
| def _get_account_name_like_count(self, column: pd.Series) -> int: | |
| """Проверяет, является ли столбец столбцом наименований.""" | |
| non_na = column.dropna() | |
| if non_na.empty: return False | |
| # Наименования - это длинные строки | |
| match_count = non_na.apply(lambda x: isinstance(x, str) and sum(c.isalpha() for c in x) / len(x) > 0.6).sum() | |
| return match_count | |
| # END_FUNCTION__is_account_name_column | |
| # START_FUNCTION__is_account_number_column | |
| def _get_account_number_like_count(self, column: pd.Series) -> int: | |
| """Проверяет, является ли столбец столбцом с номерами счетов.""" | |
| non_na = column.dropna() | |
| if non_na.empty: return False | |
| match_count = non_na.apply( | |
| lambda x: bool(re.fullmatch(r'\d[\d\.]*', str(x).strip()))).sum() | |
| return match_count | |
| # END_FUNCTION__is_account_number_column | |
| # END_CLASS_OcvParser |