"""Screen study titles/abstracts against review criteria with an LLM.

Reads an Excel workbook, asks an OpenAI-compatible chat deployment for an
include/exclude/unclear verdict per row, and writes the verdicts back out.
"""

import gc
import json
import os
from typing import Callable

import pandas as pd
import yaml
from openai import OpenAI
from tqdm import tqdm

# Environment-secret names expected in HF Space settings.
FOUNDRY_ENDPOINT_ENV = "AZURE_FOUNDRY_ENDPOINT"
FOUNDRY_DEPLOYMENT_ENV = "AZURE_FOUNDRY_DEPLOYMENT"
FOUNDRY_API_KEY_ENV = "AZURE_API_KEY2"

# Verdict labels the screening prompt asks the model to choose from.
_VALID_VERDICTS = frozenset({"include", "exclude", "unclear"})


def resolve_foundry_config_from_env() -> tuple[str, str, str]:
    """Read the endpoint, deployment name and API key from the environment.

    Returns:
        ``(endpoint, deployment, api_key)``, each stripped of whitespace.

    Raises:
        RuntimeError: If any of the three variables is unset or blank; the
            message lists every missing variable name.
    """
    names = (FOUNDRY_ENDPOINT_ENV, FOUNDRY_DEPLOYMENT_ENV, FOUNDRY_API_KEY_ENV)
    values = [(os.getenv(name) or "").strip() for name in names]

    missing = [name for name, value in zip(names, values) if not value]
    if missing:
        raise RuntimeError(
            "Missing required Space secrets: " + ", ".join(missing)
        )

    endpoint, deployment, api_key = values
    return endpoint, deployment, api_key


def load_client(endpoint: str, api_key: str) -> OpenAI:
    """Build an ``OpenAI`` client pointed at ``endpoint``.

    The client is created with a 90 second timeout and up to 2 retries.

    Raises:
        ValueError: If ``endpoint`` or ``api_key`` is empty.
    """
    if not endpoint:
        raise ValueError("Azure Foundry endpoint is required.")
    if not api_key:
        raise ValueError("Azure Foundry API key is required.")
    return OpenAI(base_url=endpoint, api_key=api_key, timeout=90.0, max_retries=2)


def load_criteria(criteria_path: str) -> dict:
    """Load screening criteria from a ``.yaml``/``.yml`` or ``.json`` file.

    Args:
        criteria_path: Path to the criteria file; the format is chosen from
            the (case-insensitive) file extension.

    Returns:
        The parsed mapping, guaranteed to contain the keys ``topic``,
        ``inclusion_criteria`` and ``exclusion_criteria``.

    Raises:
        FileNotFoundError: If ``criteria_path`` does not exist.
        ValueError: If the extension is not one of the supported formats.
        TypeError: If the file parses to something other than a mapping
            (for example an empty YAML file, which parses to ``None``).
        KeyError: If a required key is missing.
    """
    if not os.path.exists(criteria_path):
        raise FileNotFoundError(f"Criteria file not found: {criteria_path}")

    ext = os.path.splitext(criteria_path)[1].lower()
    if ext not in (".yaml", ".yml", ".json"):
        raise ValueError("Unsupported criteria file format. Use .yaml/.yml or .json")

    with open(criteria_path, "r", encoding="utf-8") as f:
        criteria = json.load(f) if ext == ".json" else yaml.safe_load(f)

    # An empty YAML document parses to None and a top-level list parses to a
    # list; report that clearly instead of failing on the key checks below.
    if not isinstance(criteria, dict):
        raise TypeError(
            "Criteria file must contain a mapping at the top level, "
            f"got {type(criteria).__name__}."
        )

    for key in ("topic", "inclusion_criteria", "exclusion_criteria"):
        if key not in criteria:
            raise KeyError(f"Missing required key '{key}' in criteria file")
    return criteria


def _truncate_text(text: str, max_chars: int) -> str:
    """Return ``text`` stripped and cut to at most ``max_chars`` characters.

    Non-string input yields ``""``. A negative ``max_chars`` is treated as 0
    so that it cannot turn into a slice counted from the end of the string.
    """
    if not isinstance(text, str):
        return ""
    stripped = text.strip()
    limit = max(max_chars, 0)
    if len(stripped) <= limit:
        return stripped
    return stripped[:limit]


def _format_criteria_items(items) -> str:
    """Render criteria as one ``- item`` bullet per line.

    A single string is treated as one criterion (iterating it would emit one
    bullet per character) and ``None`` as no criteria.
    """
    if items is None:
        return ""
    if isinstance(items, str):
        items = [items]
    return "\n".join(f"- {item}" for item in items)


def build_prompt(title: str, abstract: str, criteria: dict) -> str:
    """Build the screening prompt for one study.

    Args:
        title: Study title inserted verbatim into the prompt.
        abstract: Study abstract inserted verbatim into the prompt.
        criteria: Mapping with ``topic``, ``inclusion_criteria`` and
            ``exclusion_criteria`` entries.

    Returns:
        The prompt text with surrounding whitespace stripped.
    """
    topic = criteria["topic"]
    inclusion_formatted = _format_criteria_items(criteria["inclusion_criteria"])
    exclusion_formatted = _format_criteria_items(criteria["exclusion_criteria"])

    # NOTE(review): the "verdict" value in the JSON template below is an empty
    # string; confirm that is intended rather than a lost placeholder such as
    # the list of allowed labels.
    return f"""
You are assisting with a scoping review.

Main review topic:
{topic}

Inclusion criteria:
{inclusion_formatted}

Exclusion criteria:
{exclusion_formatted}

You will receive the title and abstract of a study.

Your tasks:
1. Decide whether this study should be:
   - "include" -> meets the topic and inclusion criteria and does not match any exclusion criteria
   - "exclude" -> clearly does not match the topic or clearly meets at least one exclusion criterion
   - "unclear" -> the abstract does not provide enough information to be confident about include or exclude

2. Provide a brief rationale that:
   - is strictly grounded in the information provided in the abstract,
   - explicitly references key phrases or information from the abstract,
   - explains how the abstract matches or fails to match the inclusion/exclusion criteria.

STRICT RULES:
- Base your decision ONLY on the title and abstract text provided.
- Do NOT assume or invent information that is not clearly stated in the abstract.
- If you are not reasonably certain based on the abstract alone, set verdict to "unclear".
- Your output MUST be a SINGLE valid JSON object and NOTHING ELSE.
- Do NOT include explanations, headings, examples, multiple solutions, or additional text outside the JSON.

The JSON format you MUST follow is exactly:

{{
  "verdict": "",
  "rationale": "Your explanation here, grounded in the abstract"
}}

Now analyze the following study and return ONLY one JSON object in the format above:

Title: {title}

Abstract:
{abstract}
""".strip()


def _message_to_text(message_content) -> str:
    """Flatten a chat message ``content`` value into plain text.

    Handles ``None``, a plain string, a list of parts (dicts with a ``text``
    key or objects with a ``text`` attribute), or any object exposing a
    string ``text`` attribute. Anything else falls back to ``str()``.
    """
    if message_content is None:
        return ""
    if isinstance(message_content, str):
        return message_content

    if isinstance(message_content, list):
        pieces = []
        for part in message_content:
            if isinstance(part, dict):
                candidate = part.get("text")
            else:
                candidate = getattr(part, "text", None)
            # Parts without a string payload are skipped.
            if isinstance(candidate, str):
                pieces.append(candidate)
        return "\n".join(pieces).strip()

    text_attr = getattr(message_content, "text", None)
    if isinstance(text_attr, str):
        return text_attr
    return str(message_content).strip()


def call_model(
    prompt: str,
    client: OpenAI,
    deployment_name: str,
    max_new_tokens: int = 400,
    temperature: float = 0.0,
) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: Full prompt text.
        client: Client used for the chat completion request.
        deployment_name: Passed as the ``model`` of the request.
        max_new_tokens: Passed as ``max_tokens``.
        temperature: Sampling temperature.

    Returns:
        The stripped text of the first choice, or ``""`` when the response
        contains no choices.
    """
    completion = client.chat.completions.create(
        model=deployment_name,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_new_tokens,
        temperature=temperature,
    )
    if not completion.choices:
        return ""
    content = getattr(completion.choices[0].message, "content", "")
    return _message_to_text(content).strip()


def extract_json_from_text(text: str) -> dict:
    """Extract the model's JSON verdict object from free-form text.

    The whole text is tried as JSON first. Otherwise every ``{`` is tried as
    the start of a JSON object using a real JSON decoder, so braces inside
    string values do not confuse the extraction. Among the objects found,
    the last one holding both ``verdict`` and ``rationale`` wins, then the
    last object of any shape.

    Returns:
        The extracted dict, or an ``"unclear"`` fallback dict when no JSON
        object can be found.
    """
    fallback = {
        "verdict": "unclear",
        "rationale": "Could not parse model output as JSON.",
    }
    if not isinstance(text, str):
        return fallback

    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(whole, dict):
            return whole

    decoder = json.JSONDecoder()
    parsed = []
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            # Not a valid object at this brace; try the next one.
            pos = text.find("{", pos + 1)
            continue
        if isinstance(obj, dict):
            parsed.append(obj)
        # Resume after the decoded object so nested objects are not
        # reported a second time as separate candidates.
        pos = text.find("{", end)

    candidates = [obj for obj in parsed if "verdict" in obj and "rationale" in obj]
    if candidates:
        return candidates[-1]
    if parsed:
        return parsed[-1]
    return fallback


def evaluate_study(
    title: str,
    abstract: str,
    criteria: dict,
    client: OpenAI,
    deployment_name: str,
    max_new_tokens: int,
    temperature: float,
    max_title_chars: int,
    max_abstract_chars: int,
) -> dict:
    """Screen one study and return a normalized verdict.

    A missing, non-string or blank abstract short-circuits to ``"unclear"``
    without calling the model. Otherwise the title and abstract are
    truncated, the model is queried, and its output is normalized.

    Returns:
        ``{"verdict": ..., "rationale": ...}`` where the verdict is one of
        ``"include"``, ``"exclude"`` or ``"unclear"`` and the rationale is a
        non-empty string.
    """
    if not isinstance(abstract, str) or abstract.strip() == "":
        return {
            "verdict": "unclear",
            "rationale": "No abstract provided; unable to assess against inclusion/exclusion criteria.",
        }

    title_safe = _truncate_text(title if isinstance(title, str) else "", max_title_chars)
    abstract_safe = _truncate_text(abstract, max_abstract_chars)
    prompt = build_prompt(title=title_safe, abstract=abstract_safe, criteria=criteria)

    llm_output = call_model(
        prompt=prompt,
        client=client,
        deployment_name=deployment_name,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
    )
    result = extract_json_from_text(llm_output)

    verdict = str(result.get("verdict", "")).strip().lower()
    if verdict not in _VALID_VERDICTS:
        verdict = "unclear"

    # A JSON null rationale must not surface as the literal string "None".
    raw_rationale = result.get("rationale")
    rationale = "" if raw_rationale is None else str(raw_rationale).strip()
    if not rationale:
        rationale = "No rationale provided by model."

    return {"verdict": verdict, "rationale": rationale}


def _int_from_env(name: str, default: int) -> int:
    """Read an integer setting from the environment, defaulting when unset or blank."""
    raw = (os.getenv(name) or "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}."
        ) from err


def process_excel_file(
    input_excel_path: str,
    output_excel_path: str,
    criteria_path: str,
    title_column: str = "Title",
    abstract_column: str = "Abstract",
    sheet_name: str | int | None = 0,
    max_new_tokens: int = 400,
    temperature: float = 0.0,
    progress_callback: Callable[[int, int], None] | None = None,
    progress_text_callback: Callable[[str, int, int], None] | None = None,
):
    """Screen every row of an Excel sheet and write the results to a new file.

    The output workbook contains the input columns plus ``LLM_verdict`` and
    ``LLM_rationale``. Row and text-length limits are read from the
    ``MAX_SCREENING_ROWS``, ``MAX_TITLE_CHARS`` and ``MAX_ABSTRACT_CHARS``
    environment variables (defaults 5000, 1200 and 8000).

    Args:
        input_excel_path: Workbook to read.
        output_excel_path: Workbook to write.
        criteria_path: Criteria file understood by ``load_criteria``.
        title_column: Name of the column holding titles.
        abstract_column: Name of the column holding abstracts.
        sheet_name: Sheet to read; ``None`` is treated as the first sheet.
        max_new_tokens: Token limit forwarded to the model call.
        temperature: Sampling temperature forwarded to the model call.
        progress_callback: Called after each row with ``(done, total)``.
        progress_text_callback: Called after each row with the progress-bar
            text plus ``(done, total)``.

    Raises:
        RuntimeError: If required environment secrets are missing.
        KeyError: If the title or abstract column is absent.
        ValueError: If the sheet has more rows than ``MAX_SCREENING_ROWS`` or
            a limit variable is not an integer.
    """
    endpoint, deployment_name, api_key = resolve_foundry_config_from_env()
    client = load_client(endpoint=endpoint, api_key=api_key)

    max_rows = _int_from_env("MAX_SCREENING_ROWS", 5000)
    max_title_chars = _int_from_env("MAX_TITLE_CHARS", 1200)
    max_abstract_chars = _int_from_env("MAX_ABSTRACT_CHARS", 8000)

    # pandas returns a dict of frames for sheet_name=None; this function
    # works on a single sheet, so fall back to the first one.
    effective_sheet = 0 if sheet_name is None else sheet_name
    df = pd.read_excel(input_excel_path, sheet_name=effective_sheet, engine="openpyxl")

    if title_column not in df.columns:
        raise KeyError(f"Title column '{title_column}' not found in Excel.")
    if abstract_column not in df.columns:
        raise KeyError(f"Abstract column '{abstract_column}' not found in Excel.")
    if len(df) > max_rows:
        raise ValueError(
            f"Workbook has {len(df)} rows, exceeding MAX_SCREENING_ROWS={max_rows}."
        )

    criteria = load_criteria(criteria_path)

    verdicts = []
    rationales = []
    studies = zip(df[title_column], df[abstract_column])
    with tqdm(total=len(df), desc="Screening studies") as pbar:
        # Count rows positionally instead of relying on integer index labels.
        for row_number, (title, abstract) in enumerate(studies, start=1):
            result = evaluate_study(
                title=title,
                abstract=abstract,
                criteria=criteria,
                client=client,
                deployment_name=deployment_name,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                max_title_chars=max_title_chars,
                max_abstract_chars=max_abstract_chars,
            )
            verdicts.append(result["verdict"])
            rationales.append(result["rationale"])

            pbar.update(1)
            if progress_callback is not None:
                progress_callback(int(pbar.n), int(pbar.total))
            if progress_text_callback is not None:
                progress_text_callback(str(pbar), int(pbar.n), int(pbar.total))

            if row_number % 20 == 0:
                gc.collect()

    df["LLM_verdict"] = verdicts
    df["LLM_rationale"] = rationales
    df.to_excel(output_excel_path, index=False, engine="openpyxl")