import json
import os
import re
import urllib.parse
from pathlib import Path

import pandas as pd
from loguru import logger

from .abstracts import AbstractProcessor
from .s3 import S3Client


class Processor(AbstractProcessor):
    def get_audio_paths(self, folder: str) -> list[str]:
        """Return the .mp3 paths in `folder` sorted by segment number, skipping the first three."""

        def extract_number(file_path: str) -> int | float:
            # Sort key: the number in "segment_<n>"; files without one sort last.
            match = re.search(r"segment_(\d+)", file_path)
            return int(match.group(1)) if match else float("inf")

        audio_paths = [path.as_posix() for path in Path(folder).glob("*.mp3")]
        return sorted(audio_paths, key=extract_number)[3:]

    def process_text(self, text: str) -> str:
        """Strip annotation markers (+, *) and quotation marks from a verse text."""
        text = re.sub(r"\+\s*\.", ".", text)
        text = re.sub(r"\*\s*\+\s*;", ";", text)
        text = re.sub(r"\*\s*\+", "", text)
        text = text.replace(" + ", " ").replace(" * ", " ").replace("+", " ")
        text = re.sub(r'["“”]', "", text)
        return text.strip()

    def splitter(self, text: str) -> list[str]:
        """Split the cleaned text into candidate segments on punctuation."""
        return re.split(r"[,:;.]", self.process_text(text))

    def flatten_nested_values(self, nested_values: pd.Series) -> list[str]:
        """Flatten a Series of string lists, dropping leading digits and empty items."""
        flattened = []
        for group in nested_values:
            for item in group:
                cleaned_item = re.sub(r"^\d+\s*", "", item).strip()
                if cleaned_item:
                    flattened.append(cleaned_item)
        return flattened

    def load_persistent_data(self, file: str) -> list:
        """Load a JSON list from `file`, or return an empty list if it does not exist."""
        if os.path.exists(file):
            with open(file, "r", encoding="utf-8") as f:
                return json.load(f)
        return []

    def save_persistent_data(self, data: list, file: str) -> None:
        """Write `data` to `file` as indented, UTF-8 JSON."""
        with open(file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def extract_audio_identifier(self, url: str) -> tuple[str, int]:
        """Return the URL-decoded second-to-last path segment and the last segment as an int."""
        parts = url.strip("/").split("/")
        return urllib.parse.unquote(parts[-2]), int(parts[-1])

    def find_and_return_after_last(self, long_list: list, short_list: list) -> list:
        """Return the items of `long_list` after its last element found in `short_list`,
        or the whole list if none match."""
        last_index = -1
        for i, item in enumerate(long_list):
            if item in short_list:
                last_index = i
        return long_list[last_index + 1:] if last_index != -1 else long_list

    def load_page_verses_and_audios(
        self, s3_client: S3Client, page: str, df_verses: pd.DataFrame
    ) -> tuple[list[str], list[str]]:
        """Return the verse candidates and audio paths still to label for `page`,
        resuming after the last transcription recorded in the page's results.json on S3."""
        audio_paths = self.get_audio_paths(page)
        # `page` looks like "<root>/<chapter>/page_<n>"; tolerate either path separator.
        _, chapter, page_str = re.split(r"[\\/]", page)
        s3_key = f"labelling/{chapter}/{page_str}/results.json"
        page_int = int(page_str.replace("page_", ""))
        tmp = df_verses[(df_verses.chapter == chapter) & (df_verses.page == page_int)]
        possible_values = self.flatten_nested_values(tmp["moore_verse_text"].apply(self.splitter))
        try:
            s3_client.download_file("results.json", s3_key)
            transcriptions = self.load_persistent_data("results.json")
            latest_transcription = transcriptions[-1].get("transcriptions")
            latest_audio = [transcriptions[-1].get("segment_path")]
            # Drop everything up to and including the last labelled segment/transcription.
            audio_paths = self.find_and_return_after_last(audio_paths, latest_audio)
            possible_values = self.find_and_return_after_last(possible_values, latest_transcription)
            logger.info(f"Latest transcription: {latest_audio} / {latest_transcription}")
        except Exception as e:
            # No results.json yet (or the download failed): start from the beginning.
            logger.error(f"Could not resume from {s3_key}: {e}")
        return possible_values, audio_paths

    def get_contribution_data(self, s3_client: S3Client) -> pd.DataFrame:
        """Aggregate all labelling JSON files on S3 into one DataFrame,
        deduplicated on (segment_path, user_id) and sorted by chapter and page."""
        files = [file for file in s3_client.list_files("labelling") if file.endswith("json")]
        try:
            df = s3_client.load_json_files(files=files, unique_columns=["segment_path", "user_id"])
            # segment_path looks like "labelling/<chapter>/<page>/<segment>".
            df[["tmp1", "chapter", "page", "segment"]] = df.segment_path.str.split("/", expand=True)
            return (
                df.sort_values(["chapter", "page"]).drop(columns=["tmp1", "segment_path"])
                if not df.empty
                else pd.DataFrame()
            )
        except Exception as e:
            logger.error(f"Error in get_contribution_data: {e}")
            return pd.DataFrame()
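

# --- Usage sketch (illustrative, not part of the module) ---------------------
# A minimal sketch of how the class above is meant to be driven. The S3Client
# construction and the df_verses contents below are assumptions: this file only
# implies that df_verses carries `chapter`, `page`, and `moore_verse_text`
# columns. Because of the relative imports, run it as a module, e.g.
# `python -m <package>.<module>`.
if __name__ == "__main__":
    s3_client = S3Client()  # assumed default construction; adapt to the real signature
    processor = Processor()

    # Hypothetical verse table matching the columns read by load_page_verses_and_audios.
    df_verses = pd.DataFrame(
        {
            "chapter": ["chapter_1"],
            "page": [4],
            "moore_verse_text": ["first verse, second verse; third verse."],
        }
    )

    # Resume labelling for a local folder laid out as <root>/<chapter>/page_<n>/.
    verses, audios = processor.load_page_verses_and_audios(
        s3_client, "data/chapter_1/page_4", df_verses
    )
    logger.info(f"{len(verses)} verse candidates left for {len(audios)} audio segments")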