Spaces:
Running
Running
| import gc | |
| import json | |
| import os | |
| from typing import Callable | |
| import pandas as pd | |
| import yaml | |
| from openai import OpenAI | |
| from tqdm import tqdm | |
# Environment-secret names expected in HF Space settings.
FOUNDRY_ENDPOINT_ENV = "AZURE_FOUNDRY_ENDPOINT"
FOUNDRY_DEPLOYMENT_ENV = "AZURE_FOUNDRY_DEPLOYMENT"
FOUNDRY_API_KEY_ENV = "AZURE_API_KEY2"


def resolve_foundry_config_from_env() -> tuple[str, str, str]:
    """Read the Azure Foundry endpoint, deployment, and API key from env vars.

    Returns:
        ``(endpoint, deployment, api_key)``, each stripped of surrounding
        whitespace.

    Raises:
        RuntimeError: if any required environment variable is unset or blank;
            the message names every missing variable.
    """
    def _read(var_name: str) -> str:
        # Treat unset and whitespace-only values the same way: missing.
        return (os.getenv(var_name) or "").strip()

    values = {
        name: _read(name)
        for name in (FOUNDRY_ENDPOINT_ENV, FOUNDRY_DEPLOYMENT_ENV, FOUNDRY_API_KEY_ENV)
    }
    absent = [name for name, value in values.items() if not value]
    if absent:
        raise RuntimeError(
            "Missing required Space secrets: " + ", ".join(absent)
        )
    return (
        values[FOUNDRY_ENDPOINT_ENV],
        values[FOUNDRY_DEPLOYMENT_ENV],
        values[FOUNDRY_API_KEY_ENV],
    )
def load_client(endpoint: str, api_key: str) -> OpenAI:
    """Build an OpenAI-compatible client pointed at the Azure Foundry endpoint.

    Args:
        endpoint: Base URL of the OpenAI-compatible API.
        api_key: Secret used to authenticate requests.

    Returns:
        A configured ``OpenAI`` client (90 s timeout, up to 2 retries).

    Raises:
        ValueError: if ``endpoint`` or ``api_key`` is empty.
    """
    error_message = None
    if not endpoint:
        error_message = "Azure Foundry endpoint is required."
    elif not api_key:
        error_message = "Azure Foundry API key is required."
    if error_message:
        raise ValueError(error_message)
    return OpenAI(
        base_url=endpoint,
        api_key=api_key,
        timeout=90.0,
        max_retries=2,
    )
def load_criteria(criteria_path: str) -> dict:
    """Load and validate screening criteria from a YAML or JSON file.

    Args:
        criteria_path: Path to a ``.yaml``/``.yml`` or ``.json`` file that
            must contain the keys ``topic``, ``inclusion_criteria`` and
            ``exclusion_criteria``.

    Returns:
        The parsed criteria mapping.

    Raises:
        FileNotFoundError: if ``criteria_path`` does not exist.
        ValueError: for an unsupported extension, or when the file does not
            parse to a mapping (e.g. an empty YAML file parses to ``None``).
        KeyError: if a required criteria key is missing.
    """
    if not os.path.exists(criteria_path):
        raise FileNotFoundError(f"Criteria file not found: {criteria_path}")
    ext = os.path.splitext(criteria_path)[1].lower()
    with open(criteria_path, "r", encoding="utf-8") as f:
        if ext in (".yaml", ".yml"):
            criteria = yaml.safe_load(f)
        elif ext == ".json":
            criteria = json.load(f)
        else:
            raise ValueError("Unsupported criteria file format. Use .yaml/.yml or .json")
    # An empty YAML file parses to None and a top-level list is also valid
    # YAML/JSON; without this check the key test below would raise a
    # confusing TypeError instead of a clear error.
    if not isinstance(criteria, dict):
        raise ValueError("Criteria file must contain a mapping of criteria keys.")
    required_keys = ("topic", "inclusion_criteria", "exclusion_criteria")
    for key in required_keys:
        if key not in criteria:
            raise KeyError(f"Missing required key '{key}' in criteria file")
    return criteria
| def _truncate_text(text: str, max_chars: int) -> str: | |
| if not isinstance(text, str): | |
| return "" | |
| stripped = text.strip() | |
| if len(stripped) <= max_chars: | |
| return stripped | |
| return stripped[:max_chars] | |
def build_prompt(title: str, abstract: str, criteria: dict) -> str:
    """Render the single-study screening prompt sent to the LLM.

    Args:
        title: Study title (caller is expected to truncate beforehand).
        abstract: Study abstract (caller is expected to truncate beforehand).
        criteria: Mapping with 'topic', 'inclusion_criteria' and
            'exclusion_criteria' keys (see load_criteria).

    Returns:
        A prompt instructing the model to reply with exactly one JSON object
        holding 'verdict' and 'rationale' (parsed by extract_json_from_text).
    """
    topic = criteria["topic"]
    # Render each criterion as a "- item" bullet line.
    inclusion_formatted = "\n".join(f"- {item}" for item in criteria["inclusion_criteria"])
    exclusion_formatted = "\n".join(f"- {item}" for item in criteria["exclusion_criteria"])
    # Template lines are flush-left so no indentation leaks into the prompt;
    # {{ and }} are literal braces in the JSON example. .strip() trims only
    # the leading/trailing newlines of the triple-quoted literal.
    return f"""
You are assisting with a scoping review.
Main review topic:
{topic}
Inclusion criteria:
{inclusion_formatted}
Exclusion criteria:
{exclusion_formatted}
You will receive the title and abstract of a study.
Your tasks:
1. Decide whether this study should be:
- "include" -> meets the topic and inclusion criteria and does not match any exclusion criteria
- "exclude" -> clearly does not match the topic or clearly meets at least one exclusion criterion
- "unclear" -> the abstract does not provide enough information to be confident about include or exclude
2. Provide a brief rationale that:
- is strictly grounded in the information provided in the abstract,
- explicitly references key phrases or information from the abstract,
- explains how the abstract matches or fails to match the inclusion/exclusion criteria.
STRICT RULES:
- Base your decision ONLY on the title and abstract text provided.
- Do NOT assume or invent information that is not clearly stated in the abstract.
- If you are not reasonably certain based on the abstract alone, set verdict to "unclear".
- Your output MUST be a SINGLE valid JSON object and NOTHING ELSE.
- Do NOT include explanations, headings, examples, multiple solutions, or additional text outside the JSON.
The JSON format you MUST follow is exactly:
{{
"verdict": "<include|exclude|unclear>",
"rationale": "Your explanation here, grounded in the abstract"
}}
Now analyze the following study and return ONLY one JSON object in the format above:
Title: {title}
Abstract: {abstract}
""".strip()
| def _message_to_text(message_content) -> str: | |
| if message_content is None: | |
| return "" | |
| if isinstance(message_content, str): | |
| return message_content | |
| if isinstance(message_content, list): | |
| parts = [] | |
| for part in message_content: | |
| if isinstance(part, dict): | |
| text = part.get("text") | |
| if isinstance(text, str): | |
| parts.append(text) | |
| else: | |
| text = getattr(part, "text", None) | |
| if isinstance(text, str): | |
| parts.append(text) | |
| return "\n".join(parts).strip() | |
| text_attr = getattr(message_content, "text", None) | |
| if isinstance(text_attr, str): | |
| return text_attr | |
| return str(message_content).strip() | |
def call_model(
    prompt: str,
    client: OpenAI,
    deployment_name: str,
    max_new_tokens: int = 400,
    temperature: float = 0.0,
) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: Full prompt text for the model.
        client: Configured OpenAI-compatible client.
        deployment_name: Model/deployment identifier to invoke.
        max_new_tokens: Completion token budget per call.
        temperature: Sampling temperature.

    Returns:
        The stripped reply text, or "" when the API returns no choices.
    """
    response = client.chat.completions.create(
        model=deployment_name,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_new_tokens,
        temperature=temperature,
    )
    if not response.choices:
        return ""
    reply_content = getattr(response.choices[0].message, "content", "")
    return _message_to_text(reply_content).strip()
def extract_json_from_text(text: str) -> dict:
    """Pull the most relevant JSON object out of raw model output.

    Tries to parse the whole string first; otherwise scans for
    brace-balanced spans, parses each, and prefers the last object that
    carries both 'verdict' and 'rationale', then any parsed object. Falls
    back to an 'unclear' verdict when nothing parses.
    """
    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        whole = None
    if isinstance(whole, dict):
        return whole
    # Collect every top-level {...} span by tracking brace depth.
    # (Braces inside JSON string values are not special-cased.)
    spans = []
    depth = 0
    span_start = None
    for pos, ch in enumerate(text):
        if ch == "{":
            if depth == 0:
                span_start = pos
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0 and span_start is not None:
                spans.append(text[span_start : pos + 1])
                span_start = None
    objects = []
    for span in spans:
        try:
            candidate = json.loads(span)
        except json.JSONDecodeError:
            continue
        if isinstance(candidate, dict):
            objects.append(candidate)
    # Prefer the last well-formed verdict/rationale object.
    for obj in reversed(objects):
        if "verdict" in obj and "rationale" in obj:
            return obj
    if objects:
        return objects[-1]
    return {
        "verdict": "unclear",
        "rationale": "Could not parse model output as JSON.",
    }
def evaluate_study(
    title: str,
    abstract: str,
    criteria: dict,
    client: OpenAI,
    deployment_name: str,
    max_new_tokens: int,
    temperature: float,
    max_title_chars: int,
    max_abstract_chars: int,
) -> dict:
    """Screen one study and return ``{"verdict": ..., "rationale": ...}``.

    Studies with a missing or blank abstract are marked 'unclear' without
    calling the model. Any model verdict outside
    {'include', 'exclude', 'unclear'} is normalized to 'unclear', and an
    empty rationale gets a placeholder message.
    """
    has_abstract = isinstance(abstract, str) and abstract.strip() != ""
    if not has_abstract:
        # Short-circuit: nothing to assess, so don't spend an API call.
        return {
            "verdict": "unclear",
            "rationale": "No abstract provided; unable to assess against inclusion/exclusion criteria.",
        }
    clean_title = _truncate_text(title if isinstance(title, str) else "", max_title_chars)
    clean_abstract = _truncate_text(abstract, max_abstract_chars)
    raw_reply = call_model(
        prompt=build_prompt(title=clean_title, abstract=clean_abstract, criteria=criteria),
        client=client,
        deployment_name=deployment_name,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
    )
    parsed = extract_json_from_text(raw_reply)
    verdict = str(parsed.get("verdict", "")).strip().lower()
    if verdict not in {"include", "exclude", "unclear"}:
        verdict = "unclear"
    rationale = str(parsed.get("rationale", "")).strip()
    return {
        "verdict": verdict,
        "rationale": rationale or "No rationale provided by model.",
    }
def process_excel_file(
    input_excel_path: str,
    output_excel_path: str,
    criteria_path: str,
    title_column: str = "Title",
    abstract_column: str = "Abstract",
    sheet_name: str | int | None = 0,
    max_new_tokens: int = 400,
    temperature: float = 0.0,
    progress_callback: Callable[[int, int], None] | None = None,
    progress_text_callback: Callable[[str, int, int], None] | None = None,
):
    """Screen every row of an Excel sheet against the criteria file.

    Reads title/abstract pairs, asks the model for a verdict per row, and
    writes the workbook back out with two added columns, ``LLM_verdict``
    and ``LLM_rationale``.

    Args:
        input_excel_path: Workbook to read (openpyxl engine).
        output_excel_path: Destination path for the annotated workbook.
        criteria_path: YAML/JSON criteria file (see ``load_criteria``).
        title_column: Name of the column holding study titles.
        abstract_column: Name of the column holding study abstracts.
        sheet_name: Sheet selector passed through to ``pandas.read_excel``.
        max_new_tokens: Per-call completion token budget.
        temperature: Sampling temperature for the model.
        progress_callback: Optional ``(done, total)`` hook called per row.
        progress_text_callback: Optional ``(bar_text, done, total)`` hook
            called per row with the tqdm bar rendered as a string.

    Raises:
        RuntimeError: if required Space secrets are missing.
        KeyError: if the title or abstract column is absent.
        ValueError: if the sheet exceeds MAX_SCREENING_ROWS.
    """
    endpoint, deployment_name, api_key = resolve_foundry_config_from_env()
    client = load_client(endpoint=endpoint, api_key=api_key)
    # Operational limits are tunable via Space environment variables.
    max_rows = int(os.getenv("MAX_SCREENING_ROWS", "5000"))
    max_title_chars = int(os.getenv("MAX_TITLE_CHARS", "1200"))
    max_abstract_chars = int(os.getenv("MAX_ABSTRACT_CHARS", "8000"))
    df = pd.read_excel(input_excel_path, sheet_name=sheet_name, engine="openpyxl")
    # Validate inputs before spending any API calls.
    if title_column not in df.columns:
        raise KeyError(f"Title column '{title_column}' not found in Excel.")
    if abstract_column not in df.columns:
        raise KeyError(f"Abstract column '{abstract_column}' not found in Excel.")
    if len(df) > max_rows:
        raise ValueError(
            f"Workbook has {len(df)} rows, exceeding MAX_SCREENING_ROWS={max_rows}."
        )
    criteria = load_criteria(criteria_path)
    verdicts = []
    rationales = []
    with tqdm(total=len(df), desc="Screening studies") as pbar:
        for idx, row in df.iterrows():
            # One model call per row; evaluate_study handles truncation and
            # blank abstracts itself.
            result = evaluate_study(
                title=row.get(title_column, ""),
                abstract=row.get(abstract_column, ""),
                criteria=criteria,
                client=client,
                deployment_name=deployment_name,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                max_title_chars=max_title_chars,
                max_abstract_chars=max_abstract_chars,
            )
            verdicts.append(result["verdict"])
            rationales.append(result["rationale"])
            pbar.update(1)
            if progress_callback is not None:
                progress_callback(int(pbar.n), int(pbar.total))
            if progress_text_callback is not None:
                progress_text_callback(str(pbar), int(pbar.n), int(pbar.total))
            # NOTE(review): assumes a default integer RangeIndex — a
            # non-integer index would make this modulo raise; confirm inputs
            # always carry one.
            if (idx + 1) % 20 == 0:
                gc.collect()
    df["LLM_verdict"] = verdicts
    df["LLM_rationale"] = rationales
    df.to_excel(output_excel_path, index=False, engine="openpyxl")