Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| from dataflow.utils.registry import OPERATOR_REGISTRY | |
| from dataflow import get_logger | |
| from dataflow.utils.storage import DataFlowStorage | |
| from dataflow.core import OperatorABC | |
| from dataflow.core import LLMServingABC | |
class AddMissingBlankOperator(OperatorABC):
    """Send "Fill-in" rows to an LLM and write the replies back to the dataframe.

    For every row whose ``type`` column equals ``"Fill-in"``, a prompt is built
    with ``prompt_template.build_prompt`` and passed to
    ``llm_serving.generate_from_input``. Each reply is stored in the output
    column of the corresponding row, unless the reply is exactly the string
    ``"ORIGINAL"``, in which case the row is left as it was.
    """

    def __init__(
        self,
        llm_serving: LLMServingABC,
        prompt_template,
    ):
        """Store the LLM backend and the prompt template.

        Args:
            llm_serving: Backend whose ``generate_from_input`` is called with
                the list of prompts.
            prompt_template: Object providing ``build_prompt(fields, **values)``.

        Raises:
            ValueError: If ``prompt_template`` is ``None``.
        """
        # Validate first so a misconfigured operator never holds partial state.
        if prompt_template is None:
            raise ValueError("prompt_template cannot be None")
        self.logger = get_logger()
        self.llm_serving = llm_serving
        self.prompt_template = prompt_template

    def run(
        self,
        storage: DataFlowStorage,
        output_key: str = "question",
        **input_keys
    ):
        """Process the stored dataframe and write the updated version back.

        Args:
            storage: Storage the dataframe is read from (``read('dataframe')``)
                and written to (``write``).
            output_key: Column that receives the generated text.
            **input_keys: Mapping of prompt field name -> dataframe column name.
                Each field's value is read from the mapped column of the row
                and passed to ``build_prompt`` as a keyword argument.

        Returns:
            The ``output_key`` that was used.
        """
        self.storage: DataFlowStorage = storage
        self.output_key = output_key
        self.logger.info("Running AddMissingBlankOperator...")
        self.input_keys = input_keys
        need_fields = set(input_keys.keys())

        # Load the raw dataframe from the input file
        dataframe = storage.read('dataframe')
        self.logger.info(f"Loading, number of rows: {len(dataframe)}")

        # Both default to empty so the write-back loop below is a no-op on
        # every skip path (previously `indices` was undefined when the 'type'
        # column was missing, which raised NameError).
        indices = []
        generated_outputs = []

        # Only process rows where type == "Fill-in"
        if 'type' not in dataframe.columns:
            self.logger.warning("No 'type' column found, skipping LLM generation.")
        else:
            mask = dataframe['type'] == "Fill-in"
            indices = dataframe.index[mask].tolist()
            if not indices:
                self.logger.info("No rows with type=='Fill-in' to process.")
            else:
                llm_inputs = []
                for idx in indices:
                    row = dataframe.loc[idx]
                    key_dict = {key: row[input_keys[key]] for key in need_fields}
                    prompt_text = self.prompt_template.build_prompt(need_fields, **key_dict)
                    llm_inputs.append(prompt_text)
                self.logger.info(f"Prepared {len(llm_inputs)} prompts for LLM generation.")
                generated_outputs = self.llm_serving.generate_from_input(llm_inputs)

        # Write generated outputs back only to the selected rows; other rows
        # are not modified. Note that zip() stops at the shorter sequence, so
        # any selected row without a matching output is left unchanged.
        for idx, gen_output in zip(indices, generated_outputs):
            # The literal reply "ORIGINAL" means: keep the existing value.
            if gen_output != "ORIGINAL":
                dataframe.at[idx, output_key] = gen_output

        self.storage.write(dataframe)
        return output_key