File size: 4,060 Bytes
e783436
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import pandas as pd
from dataflow.utils.registry import OPERATOR_REGISTRY
from dataflow import get_logger
from dataflow.utils.storage import DataFlowStorage
from dataflow.core import OperatorABC
from dataflow.core import LLMServingABC
import re


@OPERATOR_REGISTRY.register()
class LLMTextCleanerOperator(OperatorABC):
    def __init__(
            self,
            llm_serving: LLMServingABC,
            prompt_template,
            max_batch_size: int = 32
        ):
        self.logger = get_logger()
        self.llm_serving = llm_serving
        self.prompt_template = prompt_template
        self.max_batch_size = max_batch_size
        
        if prompt_template is None:
            raise ValueError("prompt_template cannot be None")

    def apply_deletions(self, original_text, deletion_output):
        """从原始文本中删除指定片段"""
        if not deletion_output or deletion_output.strip() == "NONE":
            return original_text
        # 按 || 分割片段
        fragments = [frag.strip() for frag in deletion_output.split("||") if frag.strip()]
        # 按长度降序排序,避免短串误删长串的一部分
        fragments = sorted(fragments, key=len, reverse=True)
        cleaned = original_text
        for frag in fragments:
            cleaned = cleaned.replace(frag, "", 1)  # 只删除一次
        return cleaned

    def run(
            self, 
            storage: DataFlowStorage,
            output_key: str = "cleaned_dataframe",
            question_column: str = "question",
            answer_column: str = "answer",
            **input_keys
        ):
        self.storage: DataFlowStorage = storage
        self.output_key = output_key
        self.question_column = question_column
        self.answer_column = answer_column
        self.logger.info("Running LLMTextCleanerOperator...")

        dataframe = storage.read('dataframe')
        self.logger.info(f"Loading dataframe, number of rows: {len(dataframe)}")

        if len(dataframe) == 0:
            self.logger.warning("No data to process")
            output_file = storage.write(dataframe)
            return output_key

        question_prompts = []
        answer_prompts = []
        valid_indices = []

        for idx, row in dataframe.iterrows():
            question = str(row.get(question_column, ""))
            answer = str(row.get(answer_column, ""))
            
            q_prompt = self.prompt_template.build_question_prompt(question)
            a_prompt = self.prompt_template.build_answer_prompt(answer)
            
            question_prompts.append(q_prompt)
            answer_prompts.append(a_prompt)
            valid_indices.append(idx)

        self.logger.info(f"Prepared {len(question_prompts)} question prompts and {len(answer_prompts)} answer prompts")

        question_deletion_outputs = self.llm_serving.generate_from_input(question_prompts)
        self.logger.info("Completed question cleaning prompts processing")

        answer_deletion_outputs = self.llm_serving.generate_from_input(answer_prompts)
        self.logger.info("Completed answer cleaning prompts processing")

        cleaned_questions = []
        cleaned_answers = []

        for i in range(len(question_deletion_outputs)):
            original_question = str(dataframe.iloc[i][question_column])
            original_answer = str(dataframe.iloc[i][answer_column])
            
            cleaned_q = self.apply_deletions(original_question, question_deletion_outputs[i])
            cleaned_a = self.apply_deletions(original_answer, answer_deletion_outputs[i])
            
            cleaned_questions.append(cleaned_q.strip())
            cleaned_answers.append(cleaned_a.strip())

        result_dataframe = dataframe.copy()
        result_dataframe[question_column] = cleaned_questions
        result_dataframe[answer_column] = cleaned_answers

        output_file = storage.write(result_dataframe)
        self.logger.info(f"Cleaning completed, processed {len(result_dataframe)} rows")

        return output_key