import json import os import re import shutil import tempfile import ast import base64 import asyncio from concurrent.futures import ThreadPoolExecutor from string import Template from datetime import datetime from pathlib import Path from types import SimpleNamespace from typing import Any import fitz # PyMuPDF import gradio as gr import pandas as pd import yaml from openai import AzureOpenAI MAX_DESCRIPTION_FIELDS = 30 MAX_EXTRACT_FIELDS = 30 DEFAULT_STORAGE_ROOT = Path("/data/llm_fulltextscreener") # Path("/tmp/llm_fulltextscreener") APP_STORAGE_ROOT = Path(os.getenv("APP_STORAGE_DIR", str(DEFAULT_STORAGE_ROOT))) USERS_ROOT_DIR = APP_STORAGE_ROOT / "users" MAX_EXPORTED_FILES = 20 MAX_INLINE_DOWNLOAD_BYTES = 8 * 1024 * 1024 VALID_DECISIONS = {"include", "exclude"} PROMPTS_DIR = Path(__file__).resolve().parent / "prompts" SYSTEM_PROMPT_PATH = PROMPTS_DIR / "system_prompt.txt" USER_PROMPT_TEMPLATE_PATH = PROMPTS_DIR / "user_prompt_template.json" SYSTEM_CRITERIA_PROMPT_PATH = PROMPTS_DIR / "system_criteria_prompt.txt" USER_CRITERIA_TEMPLATE_PATH = PROMPTS_DIR / "user_criteria_template.json" SYSTEM_LABELS_PROMPT_PATH = PROMPTS_DIR / "system_labels_prompt.txt" USER_LABELS_TEMPLATE_PATH = PROMPTS_DIR / "user_labels_template.json" def patch_asyncio_invalid_fd_cleanup() -> None: """ Work around Python 3.11 selector-loop teardown race seen on some runtimes (including Spaces), where loop __del__ may raise: ValueError: Invalid file descriptor: -1 """ original_del = getattr(asyncio.BaseEventLoop, "__del__", None) if original_del is None or getattr(original_del, "_invalid_fd_guard", False): return def _guarded_del(self): try: original_del(self) except ValueError as exc: if "Invalid file descriptor" not in str(exc): raise _guarded_del._invalid_fd_guard = True asyncio.BaseEventLoop.__del__ = _guarded_del patch_asyncio_invalid_fd_cleanup() def is_debug_enabled() -> bool: return os.getenv("APP_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"} def debug_log(*parts: Any) -> None: if is_debug_enabled(): print("[DEBUG]", *parts) def normalize_key(text: str) -> str: return re.sub(r"[^a-z0-9]+", "", str(text).strip().lower()) def sanitize_user_id(raw: str) -> str: cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "_", str(raw or "").strip()) return cleaned or "default" def resolve_user_id(explicit_user_id: str | None = None, request: gr.Request | None = None) -> str: if explicit_user_id and str(explicit_user_id).strip(): return sanitize_user_id(explicit_user_id) req_username = getattr(request, "username", None) if request is not None else None if req_username and str(req_username).strip(): return sanitize_user_id(str(req_username)) return "default" def init_user_id(request: gr.Request | None = None) -> str: return resolve_user_id(request=request) def get_user_session_dir(user_id: str) -> Path: return USERS_ROOT_DIR / sanitize_user_id(user_id) def get_user_session_meta_path(user_id: str) -> Path: return get_user_session_dir(user_id) / "session.json" def get_user_session_files_dir(user_id: str) -> Path: return get_user_session_dir(user_id) / "files" def get_user_exports_dir(user_id: str) -> Path: return get_user_session_dir(user_id) / "exports" def _ensure_session_dirs(user_id: str) -> None: get_user_session_files_dir(user_id).mkdir(parents=True, exist_ok=True) get_user_exports_dir(user_id).mkdir(parents=True, exist_ok=True) def _setup_storage_paths() -> None: """Configure writable temp paths in Spaces.""" USERS_ROOT_DIR.mkdir(parents=True, exist_ok=True) tmp_default = get_user_session_files_dir("default") tmp_default.mkdir(parents=True, exist_ok=True) os.environ["TMPDIR"] = str(tmp_default.resolve()) tempfile.tempdir = str(tmp_default.resolve()) def load_session_meta(user_id: str) -> dict[str, Any]: session_meta_path = get_user_session_meta_path(user_id) try: if session_meta_path.exists(): return json.loads(session_meta_path.read_text(encoding="utf-8")) except Exception: return {} return {} def save_session_meta(user_id: str, updates: dict[str, Any]) -> None: _ensure_session_dirs(user_id) session_meta_path = get_user_session_meta_path(user_id) data = load_session_meta(user_id) data.update(updates) session_meta_path.write_text(json.dumps(data, indent=2), encoding="utf-8") def persist_uploaded_file(user_id: str, file_obj, dest_name: str) -> str | None: if file_obj is None: return None src = resolve_uploaded_path(file_obj) if not src.exists() or not src.is_file(): return None _ensure_session_dirs(user_id) dest = get_user_session_files_dir(user_id) / dest_name try: if src.resolve() == dest.resolve(): return str(dest.resolve()) except Exception: pass shutil.copy2(src, dest) return str(dest.resolve()) def resolve_uploaded_path(file_obj) -> Path: if file_obj is None: return Path("") if isinstance(file_obj, (str, Path)): return Path(file_obj) file_name = getattr(file_obj, "name", "") if file_name: return Path(file_name) if isinstance(file_obj, dict): candidate = str(file_obj.get("name", "")).strip() if candidate: return Path(candidate) return Path("") def persist_dataframe(user_id: str, df: pd.DataFrame) -> str: _ensure_session_dirs(user_id) dest = get_user_session_files_dir(user_id) / "working_table.xlsx" df.to_excel(dest, index=False) return str(dest.resolve()) def _cleanup_old_exports(user_id: str, max_files: int = MAX_EXPORTED_FILES) -> None: try: export_files = [p for p in get_user_exports_dir(user_id).glob("screened_*.xlsx") if p.is_file()] export_files.sort(key=lambda p: p.stat().st_mtime, reverse=True) for old_file in export_files[max_files:]: try: old_file.unlink() except Exception: continue except Exception: return def persist_downloadable_dataframe(user_id: str, df: pd.DataFrame) -> str | None: _ensure_session_dirs(user_id) filename = f"screened_{datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f')}.xlsx" export_path = get_user_exports_dir(user_id) / filename try: df.to_excel(export_path, index=False) if not export_path.exists() or export_path.stat().st_size == 0: return None if is_debug_enabled(): print( f"[DEBUG] Export ready: path={export_path.resolve()} size={export_path.stat().st_size} bytes" ) _cleanup_old_exports(user_id) return str(export_path.resolve()) except Exception: return None def build_inline_download_html(path: str | None) -> str: if not path: return "
Download unavailable.
" candidate = Path(path) if not candidate.exists() or not candidate.is_file(): return "Download unavailable: exported file not found.
" try: raw = candidate.read_bytes() except Exception: return "Download unavailable: could not read exported file.
" if len(raw) == 0: return "Download unavailable: exported file is empty.
" if len(raw) > MAX_INLINE_DOWNLOAD_BYTES: size_mb = len(raw) / (1024 * 1024) return ( f"Inline download disabled for large files ({size_mb:.1f} MB). " "Reduce export size and try again.
" ) b64 = base64.b64encode(raw).decode("ascii") filename = candidate.name href = ( "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," f"{b64}" ) return ( "Download:
" f'Download Excel' ) def empty_description_updates() -> list[dict[str, Any]]: return [ gr.update(label=f"Description {idx + 1}", value="", visible=False) for idx in range(MAX_DESCRIPTION_FIELDS) ] def empty_extracted_state(status: str, *, extracted_state: dict[str, Any] | None = None): return ( extracted_state or {}, [], *build_empty_extracted_input_updates(), "", 0.0, "include", "", "", "", "", status, ) def is_missing(value: Any) -> bool: if pd.isna(value): return True if isinstance(value, str) and value.strip() == "": return True return False def parse_csv_columns(raw_text: str, available_columns: list[str]) -> list[str]: if not raw_text or not raw_text.strip(): return [] requested = [item.strip() for item in raw_text.split(",") if item.strip()] return [col for col in requested if col in available_columns] def choose_url_column(df: pd.DataFrame, preferred: str | None = None) -> str: if preferred and preferred in df.columns: return preferred for col in df.columns: col_l = str(col).lower() if "url" in col_l or "link" in col_l: return col return str(df.columns[0]) def parse_criteria_file(file_obj) -> dict[str, Any] | None: if file_obj is None: return None path = resolve_uploaded_path(file_obj) if str(path).strip() == "": return None if not path.exists() or not path.is_file(): raise ValueError("Criteria file not found.") try: raw = path.read_text(encoding="utf-8") except Exception as exc: raise ValueError(f"Failed reading criteria file: {exc}") from exc try: parsed = yaml.safe_load(raw) except Exception as exc: raise ValueError(f"Invalid YAML in criteria file: {exc}") from exc if not isinstance(parsed, dict): raise ValueError("criteria.yml must contain a top-level mapping/object.") topic = str(parsed.get("topic", "")).strip() inclusion = parsed.get("inclusion_criteria", []) exclusion = parsed.get("exclusion_criteria", []) notes = str(parsed.get("notes", "")).strip() if not topic: raise ValueError("criteria.yml requires a non-empty 'topic'.") if not isinstance(inclusion, list): raise ValueError("'inclusion_criteria' must be a list of strings.") if not isinstance(exclusion, list): raise ValueError("'exclusion_criteria' must be a list of strings.") inclusion_clean = [str(item).strip() for item in inclusion if str(item).strip()] exclusion_clean = [str(item).strip() for item in exclusion if str(item).strip()] return { "topic": topic, "inclusion_criteria": inclusion_clean, "exclusion_criteria": exclusion_clean, "notes": notes, } def parse_labels_csv(raw: Any) -> list[str]: if raw is None or pd.isna(raw): return [] text = str(raw).strip() if not text: return [] labels = [item.strip() for item in text.split(",") if item.strip()] return list(dict.fromkeys(labels)) def build_default_descriptions(columns: list[str]) -> dict[str, str]: return {col: f"Extract the value for '{col}' from the article text." for col in columns} def build_description_values_from_inputs( description_columns: list[str], description_values: list[str], target_columns: list[str], ) -> dict[str, str]: defaults = build_default_descriptions(target_columns) if not isinstance(description_columns, list): description_columns = [] for idx, col in enumerate(description_columns): if idx >= len(description_values): break col_name = str(col).strip() desc = str(description_values[idx]).strip() if col_name in defaults and desc: defaults[col_name] = desc return defaults def build_description_input_updates( target_columns: list[str], previous_description_columns: list[str], previous_description_values: list[str], ) -> tuple[list[str], list[dict[str, Any]]]: description_map = build_description_values_from_inputs( previous_description_columns, previous_description_values, target_columns, ) active_columns = target_columns[:MAX_DESCRIPTION_FIELDS] updates: list[dict[str, Any]] = [] for idx in range(MAX_DESCRIPTION_FIELDS): if idx < len(active_columns): col = active_columns[idx] updates.append( gr.update( label=f"Description: {col}", value=description_map.get(col, ""), visible=True, ) ) else: updates.append( gr.update( label=f"Description {idx + 1}", value="", visible=False, ) ) return active_columns, updates def build_empty_extracted_input_updates() -> list[dict[str, Any]]: return [ gr.update(label=f"Extracted field {idx + 1}", value="", visible=False) for idx in range(MAX_EXTRACT_FIELDS) ] def build_extracted_input_updates( target_columns: list[str], field_values: dict[str, Any], ) -> tuple[list[str], list[dict[str, Any]]]: active_columns = target_columns[:MAX_EXTRACT_FIELDS] updates: list[dict[str, Any]] = [] for idx in range(MAX_EXTRACT_FIELDS): if idx < len(active_columns): col = active_columns[idx] updates.append( gr.update( label=f"Extracted: {col}", value=str(field_values.get(col, "")), visible=True, ) ) else: updates.append( gr.update( label=f"Extracted field {idx + 1}", value="", visible=False, ) ) return active_columns, updates def build_extracted_values_from_inputs( extracted_columns: list[str], extracted_values: list[str], ) -> dict[str, str]: fields: dict[str, str] = {} if not isinstance(extracted_columns, list): return fields for idx, col in enumerate(extracted_columns): if idx >= len(extracted_values): break fields[str(col)] = str(extracted_values[idx]) if extracted_values[idx] is not None else "" return fields def coerce_fields_from_llm(parsed: dict[str, Any], column_names: list[str]) -> dict[str, str]: raw_fields = parsed.get("fields", {}) fields_dict: dict[str, Any] = {} if isinstance(raw_fields, dict): fields_dict = raw_fields elif isinstance(raw_fields, str): try: maybe_obj = json.loads(raw_fields) if isinstance(maybe_obj, dict): fields_dict = maybe_obj except json.JSONDecodeError: fields_dict = {} # Fallback: model may place extracted values at top level. if not fields_dict: fields_dict = { col: parsed.get(col, "") for col in column_names if col in parsed } # Fuzzy fallback: tolerate minor key format differences. if len(fields_dict) == 0: normalized_requested = {normalize_key(col): col for col in column_names} for key, value in parsed.items(): if key in {"fields", "evidence", "confidence", "decision"}: continue norm = normalize_key(str(key)) if norm in normalized_requested: fields_dict[normalized_requested[norm]] = value return {col: str(fields_dict.get(col, "")) for col in column_names} def _parse_structured_text(raw: str) -> Any: txt = raw.strip() if not txt: return None try: return json.loads(txt) except Exception: pass try: return ast.literal_eval(txt) except Exception: return None def _coerce_evidence_items(raw: Any) -> list[dict[str, str]]: items: list[dict[str, str]] = [] if raw is None: return items if isinstance(raw, str): parsed = _parse_structured_text(raw) if parsed is None: return items raw = parsed if isinstance(raw, dict): # Supports {"FIELD": "..."} and {"FIELD": ["...", "..."]} for field, snippet_value in raw.items(): if isinstance(snippet_value, list): for s in snippet_value: snippet = str(s).strip() if snippet: items.append({"field": str(field).strip(), "snippet": snippet}) else: snippet = str(snippet_value).strip() if snippet: items.append({"field": str(field).strip(), "snippet": snippet}) return items if isinstance(raw, list): for item in raw: if isinstance(item, dict): field = str(item.get("field", "")).strip() snippet = str(item.get("snippet", "")).strip() if field and snippet: items.append({"field": field, "snippet": snippet}) elif isinstance(item, str): # String entries without explicit field are ignored here. continue return items def normalize_evidence_snippets(parsed: dict[str, Any], column_names: list[str], fields: dict[str, str]) -> list[dict[str, str]]: normalized: list[dict[str, str]] = [] # Primary source for item in _coerce_evidence_items(parsed.get("evidence_snippets", [])): field = item["field"] snippet = item["snippet"] if field in column_names and snippet: normalized.append({"field": field, "snippet": snippet}) # Secondary source: legacy or malformed `evidence` payload if not normalized: for item in _coerce_evidence_items(parsed.get("evidence", "")): field = item["field"] snippet = item["snippet"] if field in column_names and snippet: normalized.append({"field": field, "snippet": snippet}) # Fallback for legacy single-string evidence. if not normalized: legacy_evidence = str(parsed.get("evidence", "")).strip() if legacy_evidence: non_empty_fields = [col for col in column_names if str(fields.get(col, "")).strip()] target_field = non_empty_fields[0] if non_empty_fields else (column_names[0] if column_names else "unknown") normalized.append({"field": target_field, "snippet": legacy_evidence}) # De-duplicate exact repeats while preserving order. deduped: list[dict[str, str]] = [] seen: set[tuple[str, str]] = set() for item in normalized: key = (item["field"], item["snippet"]) if key in seen: continue seen.add(key) deduped.append(item) return deduped def format_evidence_for_ui(snippets: list[dict[str, str]]) -> str: if not snippets: return "" return "\n".join([f"- {item['field']}: {item['snippet']}" for item in snippets]) def detect_incomplete_rows(df: pd.DataFrame, target_columns: list[str]) -> list[int]: return [ int(idx) for idx, row in df.iterrows() if any(is_missing(row.get(col)) for col in target_columns) ] def get_missing_columns(df: pd.DataFrame, row_index: int, target_columns: list[str]) -> list[str]: row = df.loc[row_index] return [col for col in target_columns if is_missing(row.get(col))] def get_next_row(df: pd.DataFrame, incomplete_rows: list[int], position: int, target_columns: list[str]) -> tuple[int, int | None]: while position < len(incomplete_rows): row_idx = incomplete_rows[position] if len(get_missing_columns(df, row_idx, target_columns)) > 0: return position, row_idx position += 1 return position, None def _find_first_column(df: pd.DataFrame, candidates: list[str]) -> str | None: normalized = {str(col).lower().strip(): str(col) for col in df.columns} for candidate in candidates: if candidate in normalized: return normalized[candidate] for col in df.columns: col_l = str(col).lower() for candidate in candidates: if candidate in col_l: return str(col) return None def article_details_markdown(df: pd.DataFrame, row_index: int) -> str: title_col = _find_first_column(df, ["title", "article title", "paper title", "study title"]) author_col = _find_first_column(df, ["author", "authors", "first author"]) title_value = "" author_value = "" if title_col is not None: raw = df.loc[row_index, title_col] title_value = "" if pd.isna(raw) else str(raw).strip() if author_col is not None: raw = df.loc[row_index, author_col] author_value = "" if pd.isna(raw) else str(raw).strip() if not title_value: title_value = "Unknown" if not author_value: author_value = "Unknown" return f"**Title:** {title_value}\n\n**Author(s):** {author_value}" def render_current_row( df: pd.DataFrame | None, incomplete_rows: list[int] | None, position: int, url_column: str, target_columns: list[str], ) -> tuple[int, int | None, str, str, str, str, str]: if df is None or incomplete_rows is None or len(incomplete_rows) == 0: return ( position, None, "", "No rows loaded.", "", "", "", ) next_position, row_idx = get_next_row(df, incomplete_rows, position, target_columns) if row_idx is None: return ( next_position, None, "", "All target rows are complete.", "", "", f"Processed {len(incomplete_rows)} / {len(incomplete_rows)} rows.", ) article_md = article_details_markdown(df, row_idx) url_value = str(df.loc[row_idx, url_column]) if url_column in df.columns else "" url_md = f"[Open article URL]({url_value})" if url_value else "URL not available" missing_md = "" current_md = "" counter = f"Row {next_position + 1} of {len(incomplete_rows)} (index: {row_idx})" return next_position, row_idx, article_md, url_md, current_md, missing_md, counter def _parse_target_columns_for_ui( target_columns_text: str, url_column_text: str, df: pd.DataFrame | None, ) -> list[str]: raw_requested = [item.strip() for item in (target_columns_text or "").split(",") if item.strip()] deduped_requested = list(dict.fromkeys(raw_requested)) if df is None or df.empty: return deduped_requested available_columns = [str(c) for c in df.columns] url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None) selected_target_columns = parse_csv_columns(target_columns_text, available_columns) if not selected_target_columns: selected_target_columns = [str(c) for c in df.columns if str(c) != url_column] return selected_target_columns def refresh_description_inputs( target_columns_text: str, url_column_text: str, df: pd.DataFrame | None, description_columns: list[str], *description_values: str, ): target_columns = _parse_target_columns_for_ui(target_columns_text, url_column_text, df) normalized_description_columns, description_updates = build_description_input_updates( target_columns, description_columns if isinstance(description_columns, list) else [], list(description_values), ) return ( normalized_description_columns, *description_updates, ) def load_excel( file_obj, criteria_file_obj, user_id_input: str, target_columns_text: str, url_column_text: str, description_columns: list[str], *description_values: str, request: gr.Request | None = None, ): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) def _failure(message: str): download_html = build_inline_download_html(None) return ( None, [], 0, None, [], "", {}, [], [], message, "", "", "", "", "", *empty_description_updates(), *build_empty_extracted_input_updates(), "", 0.0, "include", "", "", "", "", download_html, gr.update(value=None), ) if file_obj is None: return _failure("Please upload an Excel file.") try: excel_path = resolve_uploaded_path(file_obj) if str(excel_path).strip() == "": return _failure("Please upload an Excel file.") df = pd.read_excel(str(excel_path)) except Exception as exc: return _failure(f"Failed to read Excel: {exc}") if df.empty: return _failure("Excel file is empty.") url_column = choose_url_column(df, url_column_text.strip() if url_column_text else None) selected_target_columns = _parse_target_columns_for_ui(target_columns_text, url_column_text, df) incomplete_rows = detect_incomplete_rows(df, selected_target_columns) normalized_description_columns, description_updates = build_description_input_updates( selected_target_columns, description_columns if isinstance(description_columns, list) else [], list(description_values), ) extracted_columns_for_ui, extracted_updates = build_extracted_input_updates(selected_target_columns, {}) position, row_idx, article_md, url_md, current_md, missing_md, counter = render_current_row( df, incomplete_rows, 0, url_column, selected_target_columns, ) status = ( f"Loaded {len(df)} rows. Found {len(incomplete_rows)} rows with missing target values." if len(incomplete_rows) > 0 else "Loaded file, but no incomplete rows were found for the selected target columns." ) if len(selected_target_columns) > MAX_DESCRIPTION_FIELDS: status += ( f" Showing description inputs for the first {MAX_DESCRIPTION_FIELDS} target columns." ) if len(selected_target_columns) > MAX_EXTRACT_FIELDS: status += ( f" Showing extracted inputs for the first {MAX_EXTRACT_FIELDS} target columns." ) description_map = build_description_values_from_inputs( description_columns, list(description_values), selected_target_columns, ) saved_description_values = [description_map.get(col, "") for col in normalized_description_columns] saved_excel_path = persist_uploaded_file(user_id, file_obj, "uploaded_excel.xlsx") working_df_path = persist_dataframe(user_id, df) downloadable_path = persist_downloadable_dataframe(user_id, df) download_html = build_inline_download_html(downloadable_path) if not downloadable_path: status += " Download export is currently unavailable; try again after processing a row." save_session_meta( user_id, { "target_columns_text": target_columns_text or "", "url_column_text": url_column_text or "", "description_columns": normalized_description_columns, "description_values": saved_description_values, "extracted_columns": extracted_columns_for_ui, "extracted_values": ["" for _ in extracted_columns_for_ui], "evidence": "", "confidence": 0.0, "decision": "include", "criteria_rationale": "", "labels_current": "", "labels_suggested": "", "labels_rationale": "", "excel_path": saved_excel_path or "", "criteria_path": persist_uploaded_file(user_id, criteria_file_obj, "criteria.yml") or load_session_meta(user_id).get("criteria_path", ""), "df_path": working_df_path, "download_path": downloadable_path or "", } ) return ( df, incomplete_rows, position, row_idx, selected_target_columns, url_column, {}, normalized_description_columns, extracted_columns_for_ui, status, article_md, url_md, current_md, missing_md, counter, *description_updates, *extracted_updates, "", 0.0, "include", "", "", "", "", download_html, gr.update(value=None), ) def parse_pdf(file_obj) -> str: if file_obj is None: raise ValueError("Please upload a PDF file.") path = resolve_uploaded_path(file_obj) if str(path).strip() == "": raise ValueError("Please upload a PDF file.") try: with fitz.open(str(path)) as doc: text_chunks = [page.get_text("text") for page in doc] except Exception as exc: raise ValueError(f"Invalid or unreadable PDF: {exc}") from exc text = "\n".join(text_chunks).strip() if not text: raise ValueError("No text extracted from PDF. OCR fallback is not implemented in this MVP.") return text def load_prompt_file(path: Path) -> str: try: return path.read_text(encoding="utf-8").strip() except FileNotFoundError as exc: raise RuntimeError(f"Prompt file not found: {path}") from exc except Exception as exc: raise RuntimeError(f"Failed to load prompt file {path}: {exc}") from exc def build_user_prompt(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]: description_block = {col: column_descriptions.get(col, "") for col in column_names} template_raw = load_prompt_file(USER_PROMPT_TEMPLATE_PATH) template = Template(template_raw) rendered = template.substitute( fields_schema_json=json.dumps({col: "string" for col in column_names}), fill_only_requested_fields_json=json.dumps(column_names), column_descriptions_json=json.dumps(description_block), article_text=json.dumps(text), ) try: return json.loads(rendered) except json.JSONDecodeError as exc: raise RuntimeError(f"User prompt template rendered invalid JSON: {exc}") from exc def build_criteria_user_prompt(text: str, criteria: dict[str, Any]) -> dict[str, Any]: template_raw = load_prompt_file(USER_CRITERIA_TEMPLATE_PATH) template = Template(template_raw) rendered = template.substitute( topic_json=json.dumps(criteria.get("topic", "")), inclusion_criteria_json=json.dumps(criteria.get("inclusion_criteria", [])), exclusion_criteria_json=json.dumps(criteria.get("exclusion_criteria", [])), notes_json=json.dumps(criteria.get("notes", "")), article_text=json.dumps(text), ) try: return json.loads(rendered) except json.JSONDecodeError as exc: raise RuntimeError(f"Criteria user prompt rendered invalid JSON: {exc}") from exc def build_labels_user_prompt(text: str, current_labels: list[str]) -> dict[str, Any]: template_raw = load_prompt_file(USER_LABELS_TEMPLATE_PATH) template = Template(template_raw) rendered = template.substitute( current_labels_json=json.dumps(current_labels), article_text=json.dumps(text), ) try: return json.loads(rendered) except json.JSONDecodeError as exc: raise RuntimeError(f"Labels user prompt rendered invalid JSON: {exc}") from exc def _azure_client() -> AzureOpenAI: endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") api_key = os.getenv("AZURE_OPENAI_API_KEY") api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview") if not endpoint or not api_key: raise RuntimeError("AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set.") return AzureOpenAI( azure_endpoint=endpoint, api_key=api_key, api_version=api_version, ) def _call_llm_json(system_prompt: str, user_prompt: dict[str, Any]) -> dict[str, Any]: deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4.1-mini") client = _azure_client() request_timeout = float(os.getenv("AZURE_OPENAI_TIMEOUT_SECONDS", "90")) try: response = client.chat.completions.create( model=deployment, temperature=0, response_format={"type": "json_object"}, timeout=request_timeout, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": json.dumps(user_prompt)}, ], ) except Exception as exc: raise RuntimeError(f"Azure OpenAI request failed: {exc}") from exc content = response.choices[0].message.content if response.choices else "" if not content: raise RuntimeError("LLM returned empty content.") try: return json.loads(content) except json.JSONDecodeError as exc: raise RuntimeError(f"LLM output is not valid JSON: {exc}") from exc def extract_with_llm(text: str, column_names: list[str], column_descriptions: dict[str, str]) -> dict[str, Any]: system_prompt = load_prompt_file(SYSTEM_PROMPT_PATH) user_prompt = build_user_prompt(text, column_names, column_descriptions) parsed = _call_llm_json(system_prompt, user_prompt) normalized_fields = coerce_fields_from_llm(parsed, column_names) normalized_evidence_snippets = normalize_evidence_snippets(parsed, column_names, normalized_fields) evidence_text = format_evidence_for_ui(normalized_evidence_snippets) if is_debug_enabled(): print("[DEBUG] LLM parsed response:", parsed) print("[DEBUG] Parsed keys:", list(parsed.keys())) print("[DEBUG] Requested columns:", column_names) print("[DEBUG] Extracted fields:", normalized_fields) confidence_raw = parsed.get("confidence", 0) try: confidence = float(confidence_raw) except Exception: confidence = 0.0 confidence = min(max(confidence, 0.0), 1.0) decision = str(parsed.get("decision", "include")).strip().lower() if decision not in VALID_DECISIONS: decision = "include" return { "fields": normalized_fields, "evidence": evidence_text, "evidence_snippets": normalized_evidence_snippets, "confidence": confidence, "decision": decision, } def evaluate_with_criteria_llm(text: str, criteria: dict[str, Any]) -> dict[str, Any]: system_prompt = load_prompt_file(SYSTEM_CRITERIA_PROMPT_PATH) user_prompt = build_criteria_user_prompt(text, criteria) parsed = _call_llm_json(system_prompt, user_prompt) confidence_raw = parsed.get("confidence", 0) try: confidence = float(confidence_raw) except Exception: confidence = 0.0 confidence = min(max(confidence, 0.0), 1.0) decision = str(parsed.get("decision", "include")).strip().lower() if decision not in VALID_DECISIONS: decision = "include" rationale = str(parsed.get("rationale", "")).strip() return { "decision": decision, "confidence": confidence, "rationale": rationale, } def validate_rayyan_labels_llm(text: str, current_labels: list[str]) -> dict[str, Any]: system_prompt = load_prompt_file(SYSTEM_LABELS_PROMPT_PATH) user_prompt = build_labels_user_prompt(text, current_labels) parsed = _call_llm_json(system_prompt, user_prompt) suggested = parsed.get("suggested_labels", []) rationale = str(parsed.get("rationale", "")).strip() if not isinstance(suggested, list): suggested = [] suggested_labels = list(dict.fromkeys([str(item).strip() for item in suggested if str(item).strip()])) # Keep switch-only behavior: same label count as original when labels exist. if current_labels: if len(suggested_labels) != len(current_labels): suggested_labels = current_labels[:] rationale = "" if not suggested_labels: suggested_labels = current_labels[:] if suggested_labels == current_labels: rationale = "" return { "current_labels": current_labels, "suggested_labels": suggested_labels, "rationale": rationale, } def labels_to_text(labels: list[str]) -> str: if not labels: return "" return ", ".join(labels) def update_row(df: pd.DataFrame, row_index: int, values: dict[str, Any]) -> pd.DataFrame: for col, val in values.items(): if col in df.columns: try: df.at[row_index, col] = val except (TypeError, ValueError): # Some Excel columns are inferred as float64 when mostly empty. # Upcast that column so text values from extraction can be stored. df[col] = df[col].astype("object") df.at[row_index, col] = val return df def process_pdf_and_extract( pdf_file, criteria_file, user_id_input: str, df: pd.DataFrame, current_row_index: int | None, target_columns: list[str], description_columns: list[str], *description_values: str, progress=gr.Progress(), request: gr.Request | None = None, ): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) if df is None or current_row_index is None: return empty_extracted_state("Load Excel and start screening first.") try: debug_log("Process PDF started", {"row_index": current_row_index}) progress(0.15, desc="Extracting text from PDF") text = parse_pdf(pdf_file) criteria = parse_criteria_file(criteria_file) if criteria_file is not None else None missing_columns = get_missing_columns(df, current_row_index, target_columns) if len(missing_columns) == 0: return empty_extracted_state("Current row has no missing target fields.") descriptions = build_description_values_from_inputs( description_columns, list(description_values), missing_columns, ) labels_column = "RAYYAN_Labels" if "RAYYAN_Labels" in df.columns else None current_labels = parse_labels_csv(df.loc[current_row_index, labels_column]) if labels_column else [] progress(0.50, desc="Running parallel LLM workflows") workflow_timeout = float(os.getenv("WORKFLOW_TIMEOUT_SECONDS", "120")) warnings: list[str] = [] with ThreadPoolExecutor(max_workers=3) as executor: extraction_future = executor.submit(extract_with_llm, text, missing_columns, descriptions) criteria_future = ( executor.submit(evaluate_with_criteria_llm, text, criteria) if criteria is not None else None ) labels_future = executor.submit(validate_rayyan_labels_llm, text, current_labels) try: result = extraction_future.result(timeout=workflow_timeout) except Exception as exc: raise RuntimeError(f"Extraction workflow failed: {exc}") from exc criteria_result = None if criteria_future is not None: try: criteria_result = criteria_future.result(timeout=workflow_timeout) except Exception as exc: warnings.append(f"Criteria workflow failed: {exc}") debug_log("Criteria workflow failed", repr(exc)) labels_result = { "current_labels": current_labels, "suggested_labels": current_labels, "rationale": "", } try: labels_result = labels_future.result(timeout=workflow_timeout) except Exception as exc: warnings.append(f"RAYYAN labels workflow failed: {exc}") debug_log("Labels workflow failed", repr(exc)) if criteria_result is not None: result["decision"] = criteria_result["decision"] result["confidence"] = criteria_result["confidence"] criteria_rationale_ui = "" if criteria_result is not None: criteria_rationale_ui = criteria_result.get("rationale", "") or "" labels_current_ui = labels_to_text(labels_result.get("current_labels", [])) labels_suggested_ui = labels_to_text(labels_result.get("suggested_labels", [])) labels_rationale_ui = str(labels_result.get("rationale", "")).strip() extracted_columns, extracted_updates = build_extracted_input_updates( missing_columns, result["fields"], ) extraction_status = "Extraction completed. Review and Accept/Edit/Reject." if criteria is None: extraction_status = ( "Extraction completed without criteria.yml; confidence/decision are based on extraction output. " "Upload criteria.yml to override them with criteria screening." ) if extracted_columns and all(str(result["fields"].get(col, "")).strip() == "" for col in extracted_columns): extraction_status = ( "Extraction completed, but all extracted fields are empty. " "Check column descriptions/PDF content. Enable APP_DEBUG=1 to inspect raw model output." ) if warnings: extraction_status = f"{extraction_status} Warnings: {' | '.join(warnings)}" final_evidence_text = result["evidence"] result["evidence"] = final_evidence_text result["labels_current"] = labels_current_ui result["labels_suggested"] = labels_suggested_ui result["labels_rationale"] = labels_rationale_ui result["criteria_rationale"] = criteria_rationale_ui description_values_list = list(description_values) saved_description_columns = description_columns if isinstance(description_columns, list) else [] save_session_meta( user_id, { "description_columns": saved_description_columns, "description_values": description_values_list[: len(saved_description_columns)], "extracted_columns": extracted_columns, "extracted_values": [str(result["fields"].get(col, "")) for col in extracted_columns], "evidence": final_evidence_text, "confidence": float(result["confidence"]), "decision": result["decision"], "criteria_rationale": criteria_rationale_ui, "labels_current": labels_current_ui, "labels_suggested": labels_suggested_ui, "labels_rationale": labels_rationale_ui, "criteria_path": persist_uploaded_file(user_id, criteria_file, "criteria.yml") or load_session_meta(user_id).get("criteria_path", ""), "pdf_path": persist_uploaded_file(user_id, pdf_file, "uploaded_pdf.pdf") or load_session_meta(user_id).get("pdf_path", ""), } ) progress(1.0, desc="Done") debug_log("Process PDF completed", {"warnings": warnings, "decision": result["decision"]}) return ( result, extracted_columns, *extracted_updates, final_evidence_text, result["confidence"], result["decision"], criteria_rationale_ui, labels_current_ui, labels_suggested_ui, labels_rationale_ui, extraction_status, ) except Exception as exc: debug_log("Process PDF failed", repr(exc)) return empty_extracted_state(f"Processing failed: {exc}") def accept_extraction( extracted_columns: list[str], user_id_input: str, df: pd.DataFrame, current_row_index: int | None, incomplete_rows: list[int], position: int, url_column: str, target_columns: list[str], *extracted_values: str, request: gr.Request | None = None, ): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) empty_extracted_updates = build_empty_extracted_input_updates() if df is None or current_row_index is None: download_html = build_inline_download_html(None) return ( df, position, current_row_index, {}, "", "", "", "", "", "Nothing to accept.", [], *empty_extracted_updates, "", 0.0, "include", "", "", "", "", download_html, gr.update(value=None), ) fields = build_extracted_values_from_inputs(extracted_columns, list(extracted_values)) df = update_row(df, current_row_index, fields) new_position = position + 1 next_position, next_row, article_md, url_md, current_md, missing_md, counter = render_current_row( df, incomplete_rows, new_position, url_column, target_columns, ) downloadable_path = persist_downloadable_dataframe(user_id, df) download_html = build_inline_download_html(downloadable_path) status = "Row updated and accepted." if not downloadable_path: status = f"{status} Download export could not be refreshed." save_session_meta( user_id, { "df_path": persist_dataframe(user_id, df), "extracted_columns": [], "extracted_values": [], "evidence": "", "confidence": 0.0, "decision": "include", "criteria_rationale": "", "labels_current": "", "labels_suggested": "", "labels_rationale": "", "position": next_position, "current_row_index": next_row, "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""), } ) return ( df, next_position, next_row, {}, article_md, url_md, current_md, missing_md, counter, status, [], *empty_extracted_updates, "", 0.0, "include", "", "", "", "", download_html, gr.update(value=None), ) def skip_row( user_id_input: str, df: pd.DataFrame, incomplete_rows: list[int], position: int, url_column: str, target_columns: list[str], request: gr.Request | None = None, ): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) empty_extracted_updates = build_empty_extracted_input_updates() if df is None: download_html = build_inline_download_html(None) return ( df, position, None, "", "", "", "", "", "No dataset loaded.", [], *empty_extracted_updates, "", 0.0, "include", "", "", "", "", {}, download_html, gr.update(value=None), ) new_position = position + 1 next_position, next_row, article_md, url_md, current_md, missing_md, counter = render_current_row( df, incomplete_rows, new_position, url_column, target_columns, ) downloadable_path = persist_downloadable_dataframe(user_id, df) download_html = build_inline_download_html(downloadable_path) status = "Row skipped." if not downloadable_path: status = f"{status} Existing download may be stale." save_session_meta( user_id, { "df_path": persist_dataframe(user_id, df), "extracted_columns": [], "extracted_values": [], "evidence": "", "confidence": 0.0, "decision": "include", "criteria_rationale": "", "labels_current": "", "labels_suggested": "", "labels_rationale": "", "position": next_position, "current_row_index": next_row, "download_path": downloadable_path or load_session_meta(user_id).get("download_path", ""), } ) return ( df, next_position, next_row, article_md, url_md, current_md, missing_md, counter, status, [], *empty_extracted_updates, "", 0.0, "include", "", "", "", "", {}, download_html, gr.update(value=None), ) def reject_extraction(user_id_input: str, request: gr.Request | None = None): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) save_session_meta( user_id, { "extracted_columns": [], "extracted_values": [], "evidence": "", "confidence": 0.0, "decision": "include", "criteria_rationale": "", "labels_current": "", "labels_suggested": "", "labels_rationale": "", } ) return empty_extracted_state("Extraction rejected. Upload another PDF or try again.") def restore_saved_session(user_id_input: str, request: gr.Request | None = None): user_id = resolve_user_id(explicit_user_id=user_id_input, request=request) meta = load_session_meta(user_id) target_columns_text = str(meta.get("target_columns_text", "")) url_column_text = str(meta.get("url_column_text", "")) description_columns = meta.get("description_columns", []) description_values = meta.get("description_values", []) if not isinstance(description_columns, list): description_columns = [] if not isinstance(description_values, list): description_values = [] excel_path = str(meta.get("df_path", "") or meta.get("excel_path", "")) pdf_path = str(meta.get("pdf_path", "")) criteria_path = str(meta.get("criteria_path", "")) download_path = str(meta.get("download_path", "")) excel_exists = bool(excel_path) and Path(excel_path).exists() pdf_exists = bool(pdf_path) and Path(pdf_path).exists() criteria_exists = bool(criteria_path) and Path(criteria_path).exists() download_exists = bool(download_path) and Path(download_path).exists() if excel_exists: loaded = load_excel( SimpleNamespace(name=excel_path), SimpleNamespace(name=criteria_path) if criteria_exists else None, user_id, target_columns_text, url_column_text, description_columns, *description_values, request=request, ) loaded = list(loaded[:-1]) # drop pdf clear update; demo.load sets pdf explicitly above loaded[9] = f"{loaded[9]} Restored saved session." else: loaded = list( load_excel( None, SimpleNamespace(name=criteria_path) if criteria_exists else None, user_id, target_columns_text, url_column_text, description_columns, *description_values, request=request, ) )[:-1] # drop pdf clear update; demo.load sets pdf explicitly above loaded[9] = "No saved session found." base_extracted_start = 15 + MAX_DESCRIPTION_FIELDS default_evidence_idx = base_extracted_start + MAX_EXTRACT_FIELDS default_confidence_idx = default_evidence_idx + 1 default_decision_idx = default_evidence_idx + 2 default_criteria_rationale_idx = default_evidence_idx + 3 default_labels_current_idx = default_evidence_idx + 4 default_labels_suggested_idx = default_evidence_idx + 5 default_labels_rationale_idx = default_evidence_idx + 6 saved_extracted_columns = meta.get("extracted_columns", []) saved_extracted_values = meta.get("extracted_values", []) if not isinstance(saved_extracted_columns, list): saved_extracted_columns = [] if not isinstance(saved_extracted_values, list): saved_extracted_values = [] saved_fields = build_extracted_values_from_inputs(saved_extracted_columns, saved_extracted_values) restored_extracted_columns, restored_extracted_updates = build_extracted_input_updates( saved_extracted_columns, saved_fields, ) if not restored_extracted_columns: restored_extracted_updates = build_empty_extracted_input_updates() extracted_state = { "fields": {col: saved_fields.get(col, "") for col in restored_extracted_columns}, "evidence": str(meta.get("evidence", "")), "confidence": float(meta.get("confidence", 0.0)), "decision": str(meta.get("decision", "include")), "criteria_rationale": str(meta.get("criteria_rationale", "")), "labels_current": str(meta.get("labels_current", "")), "labels_suggested": str(meta.get("labels_suggested", meta.get("label_suggestions", ""))), "labels_rationale": str(meta.get("labels_rationale", "")), } loaded[6] = extracted_state loaded[8] = restored_extracted_columns loaded[base_extracted_start : base_extracted_start + MAX_EXTRACT_FIELDS] = restored_extracted_updates loaded[default_evidence_idx] = extracted_state["evidence"] loaded[default_confidence_idx] = extracted_state["confidence"] loaded[default_decision_idx] = extracted_state["decision"] if extracted_state["decision"] in VALID_DECISIONS else "include" loaded[default_criteria_rationale_idx] = extracted_state["criteria_rationale"] loaded[default_labels_current_idx] = extracted_state["labels_current"] loaded[default_labels_suggested_idx] = extracted_state["labels_suggested"] loaded[default_labels_rationale_idx] = extracted_state["labels_rationale"] loaded[-1] = build_inline_download_html(download_path if download_exists else None) return ( user_id, gr.update(value=excel_path if excel_exists else None), gr.update(value=pdf_path if pdf_exists else None), gr.update(value=criteria_path if criteria_exists else None), target_columns_text, url_column_text, *loaded, gr.update(value=pdf_path if pdf_exists else None), ) def get_auth_config() -> list[tuple[str, str]] | tuple[str, str] | None: """Build Gradio basic auth config from environment variables. Expected Space Secrets: - USER1, USER2, ... with value "(username,password)" or "username,password" - Legacy fallback: - SPACE_APP_PASSWORD (required to enable legacy auth) - SPACE_APP_USERNAME (optional, defaults to 'admin') """ users: list[tuple[str, str]] = [] for key in sorted(os.environ.keys()): if not re.fullmatch(r"USER\d+", key): continue raw = os.getenv(key, "").strip() if not raw: continue username = "" password = "" try: parsed = ast.literal_eval(raw) if isinstance(parsed, tuple) and len(parsed) == 2: username = str(parsed[0]).strip() password = str(parsed[1]).strip() except Exception: parts = [part.strip() for part in raw.split(",", 1)] if len(parts) == 2: username, password = parts[0], parts[1] if username and password: users.append((username, password)) if users: return users password = os.getenv("SPACE_APP_PASSWORD", "").strip() if not password: return None username = os.getenv("SPACE_APP_USERNAME", "admin").strip() or "admin" return username, password _setup_storage_paths() with gr.Blocks(title="Scientific Article Screener") as demo: gr.Markdown("# Scientific Article Screener") gr.Markdown( "Upload an Excel file and process one incomplete row at a time with a PDF." ) # Session state df_state = gr.State(None) incomplete_rows_state = gr.State([]) position_state = gr.State(0) current_row_state = gr.State(None) target_columns_state = gr.State([]) url_column_state = gr.State("") extracted_state = gr.State({}) description_columns_state = gr.State([]) extracted_columns_state = gr.State([]) user_id_state = gr.State("default") with gr.Row(): # LEFT PANEL with gr.Column(scale=1): excel_file = gr.File(label="Upload Excel (.xlsx)", file_types=[".xlsx"]) criteria_file = gr.File(label="Upload criteria.yml (optional)", file_types=[".yml", ".yaml"]) target_columns_input = gr.Textbox( label="Target columns (comma-separated)", placeholder="Leave empty to use all columns except URL column", ) url_column_input = gr.Textbox( label="URL column name (optional)", placeholder="Leave empty to auto-detect", ) gr.Markdown("### Field descriptions") description_inputs: list[gr.Textbox] = [] for idx in range(MAX_DESCRIPTION_FIELDS): description_inputs.append( gr.Textbox( label=f"Description {idx + 1}", lines=4, visible=False, ) ) start_btn = gr.Button("Start screening", variant="primary") download_links_md = gr.HTML("") # WORKSPACE (previous middle + right, wider) with gr.Column(scale=2): row_counter = gr.Markdown("No row selected.") article_url_md = gr.Markdown("") article_details_md = gr.Markdown("") current_values_md = gr.Markdown("", visible=False) missing_columns_md = gr.Markdown("", visible=False) pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"]) process_pdf_btn = gr.Button("Process PDF", variant="primary") gr.Markdown("### Extracted fields") extracted_inputs: list[gr.Textbox] = [] for idx in range(MAX_EXTRACT_FIELDS): extracted_inputs.append( gr.Textbox( label=f"Extracted field {idx + 1}", lines=1, visible=False, ) ) evidence_box = gr.Textbox(label="Evidence snippet", lines=4) confidence_box = gr.Slider(label="Confidence", minimum=0.0, maximum=1.0, step=0.01, value=0.0) decision_box = gr.Radio(label="Include/Exclude decision", choices=["include", "exclude"], value="include") criteria_rationale_box = gr.Textbox(label="Criteria rationale", lines=4) labels_current_box = gr.Textbox(label="RAYYAN current labels", lines=2) labels_suggested_box = gr.Textbox(label="RAYYAN suggested labels", lines=2) labels_rationale_box = gr.Textbox(label="RAYYAN label-switch rationale", lines=4) with gr.Row(): accept_btn = gr.Button("Accept", variant="primary") reject_btn = gr.Button("Reject") skip_btn = gr.Button("Skip") status_box = gr.Markdown("Ready.") base_row_outputs = [ df_state, incomplete_rows_state, position_state, current_row_state, target_columns_state, url_column_state, extracted_state, description_columns_state, extracted_columns_state, status_box, article_details_md, article_url_md, current_values_md, missing_columns_md, row_counter, ] extraction_outputs = [ *extracted_inputs, evidence_box, confidence_box, decision_box, criteria_rationale_box, labels_current_box, labels_suggested_box, labels_rationale_box, ] download_outputs = [download_links_md, pdf_file] demo_load_outputs = [ user_id_state, excel_file, pdf_file, criteria_file, target_columns_input, url_column_input, *base_row_outputs, *description_inputs, *extracted_inputs, evidence_box, confidence_box, decision_box, criteria_rationale_box, labels_current_box, labels_suggested_box, labels_rationale_box, download_links_md, pdf_file, ] start_outputs = [*base_row_outputs, *description_inputs, *extraction_outputs, *download_outputs] process_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box] accept_outputs = [ df_state, position_state, current_row_state, extracted_state, article_details_md, article_url_md, current_values_md, missing_columns_md, row_counter, status_box, extracted_columns_state, *extraction_outputs, *download_outputs, ] skip_outputs = [ df_state, position_state, current_row_state, article_details_md, article_url_md, current_values_md, missing_columns_md, row_counter, status_box, extracted_columns_state, *extraction_outputs, extracted_state, *download_outputs, ] reject_outputs = [extracted_state, extracted_columns_state, *extraction_outputs, status_box] demo.load(fn=init_user_id, inputs=[], outputs=[user_id_state]).then( fn=restore_saved_session, inputs=[user_id_state], outputs=demo_load_outputs, ) target_columns_input.change( fn=refresh_description_inputs, inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs], outputs=[description_columns_state, *description_inputs], ) url_column_input.change( fn=refresh_description_inputs, inputs=[target_columns_input, url_column_input, df_state, description_columns_state, *description_inputs], outputs=[description_columns_state, *description_inputs], ) start_btn.click( fn=load_excel, inputs=[excel_file, criteria_file, user_id_state, target_columns_input, url_column_input, description_columns_state, *description_inputs], outputs=start_outputs, ) process_pdf_btn.click( fn=process_pdf_and_extract, inputs=[pdf_file, criteria_file, user_id_state, df_state, current_row_state, target_columns_state, description_columns_state, *description_inputs], outputs=process_outputs, ) accept_btn.click( fn=accept_extraction, inputs=[ extracted_columns_state, user_id_state, df_state, current_row_state, incomplete_rows_state, position_state, url_column_state, target_columns_state, *extracted_inputs, ], outputs=accept_outputs, ) skip_btn.click( fn=skip_row, inputs=[user_id_state, df_state, incomplete_rows_state, position_state, url_column_state, target_columns_state], outputs=skip_outputs, ) reject_btn.click( fn=reject_extraction, inputs=[user_id_state], outputs=reject_outputs, ) if __name__ == "__main__": auth_config = get_auth_config() demo.launch( auth=auth_config, allowed_paths=[ str(APP_STORAGE_ROOT.resolve()), ], )