Spaces:
Running
Running
| import pandas as pd | |
| from dataflow.utils.registry import OPERATOR_REGISTRY | |
| from dataflow import get_logger | |
| from dataflow.utils.storage import DataFlowStorage | |
| from dataflow.core import OperatorABC | |
| from dataflow.core import LLMServingABC | |
| import re | |
class LLMTextCleanerOperator(OperatorABC):
    """Clean the question and answer text columns of a dataframe with an LLM.

    For every row one prompt is built from the question text and one from the
    answer text.  Each LLM reply is interpreted as a ``||``-separated list of
    fragments to delete (or the literal ``NONE`` when nothing should be
    removed), and those fragments are stripped from the original text.
    """

    def __init__(
        self,
        llm_serving: LLMServingABC,
        prompt_template,
        max_batch_size: int = 32
    ):
        """Store the collaborators used by :meth:`run`.

        Args:
            llm_serving: Backend whose ``generate_from_input`` is called with
                a list of prompts.
            prompt_template: Object providing ``build_question_prompt`` and
                ``build_answer_prompt``.
            max_batch_size: Stored on the instance.
                NOTE(review): nothing in this class reads it -- all prompts
                are submitted in a single ``generate_from_input`` call.
                Confirm whether batching was intended.

        Raises:
            ValueError: If ``prompt_template`` is ``None``.
        """
        # Validate first so a rejected construction does no other work.
        if prompt_template is None:
            raise ValueError("prompt_template cannot be None")
        self.logger = get_logger()
        self.llm_serving = llm_serving
        self.prompt_template = prompt_template
        self.max_batch_size = max_batch_size

    def apply_deletions(self, original_text, deletion_output):
        """Remove the fragments listed in ``deletion_output`` from the text.

        Args:
            original_text: Text to clean.
            deletion_output: ``||``-separated fragments to delete.  An empty
                value or the literal ``NONE`` (surrounding whitespace
                ignored) means nothing is deleted.

        Returns:
            ``original_text`` with the first occurrence of each listed
            fragment removed; unchanged when there is nothing to delete.
        """
        if not deletion_output or deletion_output.strip() == "NONE":
            return original_text

        # Split on "||" and drop fragments that are empty after trimming.
        trimmed = (frag.strip() for frag in deletion_output.split("||"))
        fragments = [frag for frag in trimmed if frag]

        # Longest first, so a short fragment cannot destroy part of a longer
        # fragment before the longer one has been matched.
        cleaned = original_text
        for frag in sorted(fragments, key=len, reverse=True):
            cleaned = cleaned.replace(frag, "", 1)  # first occurrence only
        return cleaned

    def _generate_checked(self, prompts, label):
        """Run the LLM on ``prompts`` and require one output per prompt."""
        outputs = self.llm_serving.generate_from_input(prompts)
        if len(outputs) != len(prompts):
            raise ValueError(
                f"LLM returned {len(outputs)} {label} outputs "
                f"for {len(prompts)} prompts"
            )
        return outputs

    def run(
        self,
        storage: DataFlowStorage,
        output_key: str = "cleaned_dataframe",
        question_column: str = "question",
        answer_column: str = "answer",
        **input_keys
    ):
        """Clean the question/answer columns and write the result to storage.

        The dataframe is obtained with ``storage.read('dataframe')``; a copy
        with both columns replaced by their cleaned, whitespace-stripped text
        is passed to ``storage.write``.  An empty dataframe is written back
        unchanged without calling the LLM.

        Args:
            storage: Storage object used to read the input and write the
                result.
            output_key: Value returned to the caller; it is not passed to
                ``storage``.
            question_column: Name of the column holding question text.
            answer_column: Name of the column holding answer text.
            **input_keys: Accepted for interface compatibility; unused.

        Returns:
            ``output_key``.

        Raises:
            KeyError: If a requested column is missing from a non-empty
                dataframe (raised before any LLM call is made).
            ValueError: If the LLM does not return exactly one output per
                prompt.
        """
        self.storage: DataFlowStorage = storage
        self.output_key = output_key
        self.question_column = question_column
        self.answer_column = answer_column

        self.logger.info("Running LLMTextCleanerOperator...")
        dataframe = storage.read('dataframe')
        self.logger.info(f"Loading dataframe, number of rows: {len(dataframe)}")

        if len(dataframe) == 0:
            self.logger.warning("No data to process")
            storage.write(dataframe)
            return output_key

        # Fail fast: a missing column would otherwise only surface after
        # both LLM passes have already been paid for.
        missing = [
            column
            for column in (question_column, answer_column)
            if column not in dataframe.columns
        ]
        if missing:
            raise KeyError(f"Columns not found in dataframe: {missing}")

        # Read each column once.  Column-wise access keeps the column's own
        # dtype (row-wise Series may upcast values) and the same strings are
        # used both to build the prompts and as the text being cleaned.
        questions = [str(value) for value in dataframe[question_column]]
        answers = [str(value) for value in dataframe[answer_column]]

        question_prompts = []
        answer_prompts = []
        for question, answer in zip(questions, answers):
            question_prompts.append(self.prompt_template.build_question_prompt(question))
            answer_prompts.append(self.prompt_template.build_answer_prompt(answer))
        self.logger.info(f"Prepared {len(question_prompts)} question prompts and {len(answer_prompts)} answer prompts")

        question_deletion_outputs = self._generate_checked(question_prompts, "question")
        self.logger.info("Completed question cleaning prompts processing")
        answer_deletion_outputs = self._generate_checked(answer_prompts, "answer")
        self.logger.info("Completed answer cleaning prompts processing")

        cleaned_questions = [
            self.apply_deletions(text, deletions).strip()
            for text, deletions in zip(questions, question_deletion_outputs)
        ]
        cleaned_answers = [
            self.apply_deletions(text, deletions).strip()
            for text, deletions in zip(answers, answer_deletion_outputs)
        ]

        # Work on a copy so the dataframe returned by storage.read is not
        # mutated.
        result_dataframe = dataframe.copy()
        result_dataframe[question_column] = cleaned_questions
        result_dataframe[answer_column] = cleaned_answers

        storage.write(result_dataframe)
        self.logger.info(f"Cleaning completed, processed {len(result_dataframe)} rows")
        return output_key