| | """ |
| | Task to ingest and transform documents to markdown using yourbench |
| | """ |
| | import os |
| | import time |
| | import pathlib |
| | import subprocess |
| | import threading |
| | from typing import Optional, List, Tuple, Dict, Any |
| | import yaml |
| |
|
| | from loguru import logger |
| |
|
| |
|
class CreateBenchTask:
    """
    Task to ingest and transform documents to markdown using yourbench.

    The task shells out to the ``yourbench`` CLI, streams its merged
    stdout/stderr on a daemon thread, and turns raw output lines into
    level-prefixed log messages that a frontend can poll via get_logs().
    """

    def __init__(self, session_uid: str, config_path: Optional[str] = None):
        """
        Initialize the ingestion task.

        Args:
            session_uid: Session ID for this task.
            config_path: Path to the configuration file; derived from the
                session's upload directory if None.
        """
        self.session_uid = session_uid
        self.logs: List[str] = []
        self.is_completed = False
        # Child process handle; set by run(), None until then.
        self.process: Optional[subprocess.Popen] = None
        # Event used as a thread-safe "process is running" flag shared with
        # the output-capture thread.
        self.is_running_flag = threading.Event()

        # Default to the per-session upload directory when no explicit
        # configuration path is provided.
        if config_path is None:
            config_path = f"uploaded_files/{session_uid}/config.yml"
        self.config_path = config_path

        # Command executed by run(); built once so the logged command and the
        # spawned process always agree.
        self.command = ["yourbench", "run", "--config", str(self.config_path)]

        self._add_log("[INFO] Initializing ingestion task")
        self._add_log(f"[INFO] Using configuration file: {self.config_path}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list (deduplicated) and mirror it to loguru.

        Args:
            message: Log message to add.
        """
        if message not in self.logs:
            self.logs.append(message)
            # Rebind to a fresh list object — presumably so pollers holding a
            # previous snapshot observe a new reference; TODO confirm this is
            # required by the consumer before removing.
            self.logs = self.logs.copy()
        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task.

        Returns:
            A copy of the list of log messages (safe for the caller to mutate).
        """
        return self.logs.copy()

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise.
        """
        return self.is_completed

    def is_running(self) -> bool:
        """
        Check if the process is running.

        Returns:
            True if running, False otherwise.
        """
        return self.is_running_flag.is_set()

    def stop(self) -> None:
        """
        Stop the process if it's running: terminate gracefully, then kill
        after a 5-second grace period.
        """
        if self.process and self.is_running():
            self._add_log("[INFO] Stopping ingestion process")
            try:
                self.process.terminate()
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._add_log("[WARN] Process not responding, forcing termination")
                self.process.kill()
            finally:
                # Always mark the task finished, even if kill() itself raised.
                self.is_running_flag.clear()
                self.is_completed = True
                self._add_log("[INFO] Ingestion process stopped")

    def _capture_output(self) -> None:
        """
        Capture and process the output from the yourbench process.

        Runs on a daemon thread until the child exits; classifies each line
        and emits a final success/failure summary based on the exit code.
        """
        self._add_log("[INFO] Starting output capture")

        # Remembered across lines so the final summary can distinguish
        # rate-limit failures and tolerated JSON glitches.
        rate_limit_detected = False
        json_errors_detected = False

        try:
            while self.is_running() and self.process:
                line = self.process.stdout.readline()
                if not line:
                    # Empty read: either the child exited (stop) or the pipe
                    # is momentarily idle (poll again shortly).
                    if self.process.poll() is not None:
                        self.is_running_flag.clear()
                        break
                    time.sleep(0.1)
                    continue

                line = line.strip()
                if line:
                    # Detect provider rate limiting (HTTP 429 and variants).
                    if ("too many requests" in line.lower() or
                        "rate limit" in line.lower() or
                        "429" in line or
                        "too many concurrent requests" in line.lower()):
                        rate_limit_detected = True
                        self._add_log("[ERROR] RATE_LIMIT_EXCEEDED: The demo is under heavy load at the moment.")

                    # JSON decoding problems are treated as non-critical:
                    # record them and skip normal classification of the line.
                    if ("JSONDecodeError" in line or
                        "Error processing QA pair" in line or
                        "'str' object has no attribute 'get'" in line):
                        json_errors_detected = True
                        self._add_log(f"[WARN] Non-critical JSON error: {line}")
                        continue

                    # Keep the raw line for debugging before classification.
                    self._add_log(f"[DEBUG] Raw output: {line}")

                    if "ERROR" in line:
                        self._add_log(f"[ERROR] {line}")
                    elif "WARNING" in line:
                        self._add_log(f"[WARN] {line}")
                        # Specific warning meaning the document is too thin to
                        # produce a benchmark — surface it as a user-facing error.
                        if "No valid questions produced in single_shot_question_generation" in line:
                            self._add_log("[ERROR] Failed to generate benchmark: The document does not contain enough information to generate a meaningful benchmark. Please try with a more detailed document.")
                    else:
                        # Detect completed pipeline stages.
                        if "Completed stage:" in line:
                            # Stage name is usually quoted; fall back to the
                            # text after the marker otherwise.
                            stage = line.split("'")[1] if "'" in line else line.split("Completed stage:")[1].strip()
                            stage = self._standardize_stage_name(stage)
                            self._add_log(f"[SUCCESS] Stage completed: {stage}")
                        elif "Successfully completed 'upload_ingest_to_hub' stage" in line:
                            self._add_log("[SUCCESS] Stage completed: upload_ingest_to_hub")
                        else:
                            self._add_log(f"[INFO] {line}")

            # Final status once the stream is drained.
            if self.process:
                exit_code = self.process.poll()
                if exit_code == 0 or json_errors_detected:
                    # JSON errors alone are not considered a failure.
                    if json_errors_detected:
                        self._add_log("[INFO] Benchmark completed with non-critical JSON errors, considered successful")
                    else:
                        self._add_log("[SUCCESS] Benchmark process completed successfully")
                else:
                    if rate_limit_detected:
                        self._add_log("[ERROR] Benchmark process failed due to API rate limiting. The demo is under heavy load at the moment.")
                    self._add_log("[INFO] Benchmark process completed with errors")
        except Exception as e:
            self._add_log(f"[ERROR] Error during output capture: {str(e)}")
        finally:
            # Whatever happened, the task is over and pollers must see it.
            self.is_completed = True
            self.is_running_flag.clear()
            self._add_log("[INFO] Output capture completed")

    def _standardize_stage_name(self, stage_name: str) -> str:
        """
        Standardize the stage name to match the frontend expectations.

        Args:
            stage_name: Original stage name.

        Returns:
            Standardized stage name (unchanged if no mapping applies).
        """
        # Substring -> canonical stage name expected by the frontend.
        stage_mapping = {
            "ingest": "ingestion",
            "upload": "upload_ingest_to_hub",
            "summarize": "summarization",
            "chunk": "chunking",
            "generate_questions": "single_shot_question_generation",
        }

        # First matching substring wins (dict preserves insertion order).
        for key, value in stage_mapping.items():
            if key in stage_name.lower():
                return value

        return stage_name

    def run(self, token: Optional[str] = None) -> None:
        """
        Run the ingestion task: spawn the yourbench CLI and start capturing
        its output on a background thread. Returns immediately after spawning.

        Args:
            token: Hugging Face token (currently unused; the token is read
                from the HF_TOKEN environment variable instead).
        """
        try:
            self._add_log("[INFO] Starting ingestion process")

            if not os.path.exists(self.config_path):
                raise FileNotFoundError(f"Configuration file does not exist: {self.config_path}")

            # Best-effort peek at the config to surface source/output dirs in
            # the logs; never blocks the run on failure.
            try:
                # Explicit encoding: config parsing must not depend on locale.
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    config_yaml = yaml.safe_load(f)

                source_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("source_documents_dir", "")
                output_dir = config_yaml.get("pipeline", {}).get("ingestion", {}).get("output_dir", "")

                if source_dir:
                    self._add_log(f"[INFO] Source directory: {source_dir}")
                if output_dir:
                    self._add_log(f"[INFO] Output directory: {output_dir}")

                # List the documents about to be processed.
                if source_dir and os.path.exists(source_dir):
                    files = os.listdir(source_dir)
                    if files:
                        self._add_log(f"[INFO] Files to process: {', '.join(files)}")
                    else:
                        self._add_log("[WARN] No files found in source directory")

            except Exception as e:
                self._add_log(f"[WARN] Unable to read configuration: {str(e)}")

            # Child environment: inherit ours, then export HF credentials.
            env = os.environ.copy()

            hf_token = os.getenv("HF_TOKEN")
            if hf_token:
                env["HF_TOKEN"] = hf_token
                env["HUGGING_FACE_HUB_TOKEN"] = hf_token
                env["HF_ORGANIZATION"] = os.getenv("HF_ORGANIZATION", "yourbench")
                self._add_log("[INFO] Environment variables exported")

            self._add_log(f"[INFO] Executing command: {' '.join(self.command)}")

            # Line-buffered text pipe; stderr merged into stdout so a single
            # reader thread sees everything. (text=True already implies
            # universal newlines — the legacy alias is redundant.)
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                env=env
            )

            # Mark running before the capture thread starts polling the flag.
            self.is_running_flag.set()

            # Daemon thread: must not keep the interpreter alive on exit.
            output_thread = threading.Thread(target=self._capture_output)
            output_thread.daemon = True
            output_thread.start()

            self._add_log(f"[INFO] Process started with PID: {self.process.pid}")

        except Exception as e:
            self._add_log(f"[ERROR] Error starting ingestion process: {str(e)}")
            self.is_completed = True