import json
import os
import re
import time
from typing import List, Tuple
from urllib.parse import urlparse
import boto3
import requests
import spaces
from tools.config import (
MAX_SPACES_GPU_RUN_TIME,
PRINT_TRANSFORMERS_USER_PROMPT,
REPORT_LLM_OUTPUTS_TO_GUI,
VLM_DEFAULT_DO_SAMPLE,
)
# Import mock patches if in test mode
if os.environ.get("USE_MOCK_LLM") == "1" or os.environ.get("TEST_MODE") == "1":
try:
# Try to import and apply mock patches
import sys
# Add project root to sys.path so we can import test.mock_llm_calls
project_root = os.path.dirname(os.path.dirname(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
# try:
# from test.mock_llm_calls import apply_mock_patches
# apply_mock_patches()
# except ImportError:
# # If mock module not found, continue without mocking
# pass
except Exception:
# If anything fails, continue without mocking
pass
try:
from google import genai as ai
from google.genai import types
except ImportError:
print(
"Warning: Google GenAI not found. Google GenAI functionality will not be available."
)
pass
from gradio import Progress
from huggingface_hub import hf_hub_download
try:
from openai import OpenAI
except ImportError:
print("Warning: OpenAI not found. OpenAI functionality will not be available.")
pass
from tqdm import tqdm
model_type = None # global variable setup
full_text = (
"" # Define dummy source text (full text) just to enable highlight function to load
)
# Global variables for PII detection model and tokenizer
# These are now used for all LLM model loading (both general and PII-specific)
_pii_model = None
_pii_tokenizer = None
_pii_assistant_model = None
# Import config variables with defaults for missing ones
# This allows llm_funcs.py to work even if some config variables don't exist
from tools.config import (
ASSISTANT_MODEL,
COMPILE_MODE,
COMPILE_TRANSFORMERS,
HF_TOKEN,
INFERENCE_SERVER_DISABLE_THINKING,
INT8_WITH_OFFLOAD_TO_CPU,
LLM_CONTEXT_LENGTH,
LLM_MAX_NEW_TOKENS,
LLM_MIN_P,
LLM_MODEL_DTYPE,
LLM_REPETITION_PENALTY,
LLM_RESET,
LLM_RETRY_ATTEMPTS,
LLM_SEED,
LLM_STOP_STRINGS,
LLM_STREAM,
LLM_TEMPERATURE,
LLM_THREADS,
LLM_TIMEOUT_WAIT,
LLM_TOP_K,
LLM_TOP_P,
LOAD_TRANSFORMERS_LLM_PII_MODEL_AT_START,
LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE,
LOCAL_TRANSFORMERS_LLM_PII_REPO_ID,
MULTIMODAL_PROMPT_FORMAT,
QUANTISE_TRANSFORMERS_LLM_MODELS,
REASONING_SUFFIX,
SELECTED_LOCAL_TRANSFORMERS_VLM_MODEL,
SHOW_TRANSFORMERS_LLM_PII_DETECTION_OPTIONS,
SPECULATIVE_DECODING,
USE_LLAMA_SWAP,
USE_TRANSFORMERS_VLM_MODEL_AS_LLM,
VLM_DISABLE_QWEN3_5_THINKING,
VLM_QWEN3_5_NOTHINK_SUFFIX,
)
def _stringify_openai_message_content(content) -> str:
"""Normalize message.content from OpenAI-compatible APIs (str, null, or list of parts)."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict):
t = p.get("text")
if t is None and p.get("type") == "text":
t = p.get("text", "")
if isinstance(t, str):
parts.append(t)
elif isinstance(p, str):
parts.append(p)
return "".join(parts)
return str(content)
def _extract_choice_message_text(choice: dict) -> str:
"""Extract assistant text from a chat-completions choice (handles reasoning-only / multimodal)."""
if not isinstance(choice, dict):
return ""
msg = choice.get("message") or {}
text = _stringify_openai_message_content(msg.get("content"))
if text and str(text).strip():
return text
for alt_key in ("reasoning_content", "reasoning"):
alt = msg.get(alt_key)
if isinstance(alt, str) and alt.strip():
return alt
legacy = choice.get("text")
if isinstance(legacy, str) and legacy.strip():
return legacy
return text or ""
def _report_llm_output_to_gui(text: str) -> None:
"""Report streamed LLM output to Gradio UI via gr.Info when REPORT_LLM_OUTPUTS_TO_GUI is True."""
if not REPORT_LLM_OUTPUTS_TO_GUI or not (text and str(text).strip()):
return
try:
import gradio as gr
gr.Info(text, duration=2)
except Exception:
# gr.Info may not be available (e.g. in worker process or CLI), ignore
pass
if isinstance(LLM_THREADS, str):
LLM_THREADS = int(LLM_THREADS)
max_tokens = LLM_MAX_NEW_TOKENS
temperature = LLM_TEMPERATURE
top_k = LLM_TOP_K
top_p = LLM_TOP_P
min_p = LLM_MIN_P
repetition_penalty = LLM_REPETITION_PENALTY
LLM_MAX_NEW_TOKENS: int = LLM_MAX_NEW_TOKENS
seed: int = LLM_SEED
reset: bool = LLM_RESET
stream: bool = LLM_STREAM
context_length: int = LLM_CONTEXT_LENGTH
speculative_decoding = SPECULATIVE_DECODING
if not LLM_THREADS:
threads = 1
else:
threads = LLM_THREADS
timeout_wait = LLM_TIMEOUT_WAIT
number_of_api_retry_attempts = LLM_RETRY_ATTEMPTS
class LocalLLMContextConfig:
"""Holds context length and GPU layer count for local transformers model loading."""
def __init__(self, n_ctx: int = context_length, n_gpu_layers: int = -1):
self.n_ctx = n_ctx
self.n_gpu_layers = n_gpu_layers
def update_gpu(self, new_value: int) -> None:
self.n_gpu_layers = new_value
def update_context(self, new_value: int) -> None:
self.n_ctx = new_value
# GPU and CPU context configs for load_model (CPU uses 0 GPU layers).
local_gpu_context = LocalLLMContextConfig(n_ctx=context_length, n_gpu_layers=-1)
local_cpu_context = LocalLLMContextConfig(n_ctx=context_length, n_gpu_layers=0)
class LocalLLMGenerationConfig:
def __init__(
self,
temperature=temperature,
top_k=top_k,
min_p=min_p,
top_p=top_p,
repeat_penalty=repetition_penalty,
seed=seed,
stream=stream,
max_tokens=LLM_MAX_NEW_TOKENS,
reset=reset,
):
self.temperature = temperature
self.top_k = top_k
self.top_p = top_p
self.repeat_penalty = repeat_penalty
self.seed = seed
self.max_tokens = max_tokens
self.stream = stream
self.reset = reset
def update_temp(self, new_value):
self.temperature = new_value
# ResponseObject class for AWS Bedrock calls
class ResponseObject:
def __init__(self, text, usage_metadata):
self.text = text
self.usage_metadata = usage_metadata
###
# LOCAL MODEL FUNCTIONS
###
def get_model_path(
repo_id=LOCAL_TRANSFORMERS_LLM_PII_REPO_ID,
model_filename="",
model_dir="",
hf_token=HF_TOKEN,
):
# Construct the expected local path
local_path = os.path.join(model_dir, model_filename)
print("local path for model load:", local_path)
try:
if os.path.exists(local_path):
print(f"Model already exists at: {local_path}")
return local_path
else:
if hf_token:
print("Downloading model from Hugging Face Hub with HF token")
downloaded_model_path = hf_hub_download(
repo_id=repo_id, token=hf_token, filename=model_filename
)
return downloaded_model_path
else:
print(
"No HF token found, downloading model from Hugging Face Hub without token"
)
downloaded_model_path = hf_hub_download(
repo_id=repo_id, filename=model_filename
)
return downloaded_model_path
except Exception as e:
print("Error loading model:", e)
raise Warning("Error loading model:", e)
def _normalize_huggingface_repo_id(repo_id: str) -> str:
"""
If repo_id is an http(s) URL for huggingface.co, return the org/model path segment.
Uses parsed host validation (not substring checks) to satisfy CodeQL py/incomplete-url-substring-sanitization.
"""
s = repo_id.strip()
lower = s.lower()
if not (lower.startswith("https://") or lower.startswith("http://")):
return repo_id
parsed = urlparse(s)
if parsed.scheme.lower() not in ("http", "https"):
return repo_id
host = (parsed.hostname or "").lower()
if host not in ("huggingface.co", "www.huggingface.co"):
return repo_id
path = parsed.path.strip("/")
if not path:
return repo_id
return path
def load_model(
local_model_type: str = None,
gpu_layers: int = -1,
max_context_length: int = context_length,
gpu_context: LocalLLMContextConfig = local_gpu_context,
cpu_context: LocalLLMContextConfig = local_cpu_context,
torch_device: str = "cpu",
repo_id=LOCAL_TRANSFORMERS_LLM_PII_REPO_ID,
model_filename="",
model_dir="",
compile_mode=COMPILE_MODE,
model_dtype=LLM_MODEL_DTYPE,
hf_token=HF_TOKEN,
speculative_decoding=speculative_decoding,
model=None,
tokenizer=None,
assistant_model=None,
):
"""
Load a model from Hugging Face Hub via the transformers package.
Args:
local_model_type (str): The type of local model to load.
gpu_layers (int): The number of GPU layers to offload to the GPU (-1 for default).
max_context_length (int): The maximum context length for the model.
gpu_context (LocalLLMContextConfig): Context config for GPU (n_ctx, n_gpu_layers).
cpu_context (LocalLLMContextConfig): Context config for CPU.
torch_device (str): The device to load the model on ("cuda" or "cpu").
repo_id (str): The Hugging Face repository ID where the model is located.
model_filename (str): The specific filename of the model to download from the repository.
model_dir (str): The local directory where the model will be stored or downloaded.
compile_mode (str): The compilation mode to use for the model.
model_dtype (str): The data type to use for the model.
hf_token (str): The Hugging Face token to use for the model.
speculative_decoding (bool): Whether to use speculative decoding.
model (transformers model): Optional pre-loaded model (skips loading if provided).
tokenizer (transformers tokenizer): Optional pre-loaded tokenizer.
assistant_model (transformers model): Optional assistant model for speculative decoding.
Returns:
tuple: (model, tokenizer, assistant_model).
"""
# If model is provided, validate that tokenizer is also provided and compatible
if model:
if tokenizer is None:
print(
"Warning: Model provided but tokenizer is None. Attempting to load matching tokenizer..."
)
# Try to determine model_id from model config
try:
if hasattr(model, "config") and hasattr(model.config, "_name_or_path"):
model_id = model.config._name_or_path
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
if not tokenizer.pad_token:
tokenizer.pad_token = tokenizer.eos_token
print(f"Loaded matching tokenizer from {model_id}")
else:
print(
"Warning: Could not determine model source to load matching tokenizer"
)
except Exception as e:
print(f"Warning: Failed to load matching tokenizer: {e}")
return model, tokenizer, assistant_model
# Use LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE if local_model_type is not provided
if local_model_type is None:
local_model_type = LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE
if isinstance(repo_id, str):
repo_id = _normalize_huggingface_repo_id(repo_id)
print("Loading model:", local_model_type)
# Verify the device and cuda settings
# Check if CUDA is enabled
import torch
torch.cuda.empty_cache()
print("Is CUDA enabled? ", torch.cuda.is_available())
print("Is a CUDA device available on this computer?", torch.backends.cudnn.enabled)
if torch.cuda.is_available():
torch_device = "cuda"
print("CUDA version:", torch.version.cuda)
# try:
# os.system("nvidia-smi")
# except Exception as e:
# print("Could not print nvidia-smi settings due to:", e)
else:
torch_device = "cpu"
gpu_layers = 0
print("Running on device:", torch_device)
print("GPU layers assigned to cuda:", gpu_layers)
if not LLM_THREADS:
threads = torch.get_num_threads()
else:
threads = LLM_THREADS
print("CPU threads:", threads)
# GPU mode
if torch_device == "cuda":
torch.cuda.empty_cache()
gpu_context.update_gpu(gpu_layers)
gpu_context.update_context(max_context_length)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
BitsAndBytesConfig,
)
print("Loading model from transformers")
# Use the official model ID for Gemma 3 4B
model_id = repo_id
# 1. Set Data Type (dtype)
# For H200/Hopper: 'bfloat16'
# For RTX 3060/Ampere: 'float16'
dtype_str = model_dtype # os.environ.get("LLM_MODEL_DTYPE", "bfloat16").lower()
if dtype_str == "bfloat16":
torch_dtype = torch.bfloat16
elif dtype_str == "float16":
torch_dtype = torch.float16
elif dtype_str == "auto":
torch_dtype = "auto"
else:
torch_dtype = torch.float32 # A safe fallback
# 2. Set Compilation Mode
# 'max-autotune' is great for both but can be slow initially.
# 'reduce-overhead' is a faster alternative for compiling.
print("--- System Configuration ---")
print(f"Using model id: {model_id}")
print(f"Using dtype: {torch_dtype}")
print(f"Using compile mode: {compile_mode}")
print(f"Using quantization: {QUANTISE_TRANSFORMERS_LLM_MODELS}")
print("--------------------------\n")
# --- Load Tokenizer and Model Atomically ---
# Ensure both model and tokenizer are loaded from the same source
# If either fails, both should fail together to prevent mismatched pairs
try:
# Setup quantization config if enabled
quantization_config = None
if QUANTISE_TRANSFORMERS_LLM_MODELS:
if not torch.cuda.is_available():
print(
"Warning: Quantisation requires CUDA, but CUDA is not available."
)
print("Falling back to loading models without quantisation")
quantization_config = None
else:
if INT8_WITH_OFFLOAD_TO_CPU:
# This will be very slow. Requires at least 4GB of VRAM and 32GB of RAM
print(
"Using bitsandbytes for quantisation to 8 bits, with offloading to CPU"
)
max_memory = {0: "4GB", "cpu": "32GB"}
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
max_memory=max_memory,
llm_int8_enable_fp32_cpu_offload=True, # Note: if bitsandbytes has to offload to CPU, inference will be slow
)
else:
# For Gemma 4B, requires at least 6GB of VRAM
print("Using bitsandbytes for quantisation to 4 bits")
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4", # Use the modern NF4 quantisation for better performance
bnb_4bit_compute_dtype=torch_dtype,
# bnb_4bit_use_double_quant=True, # Optional: uses a second quantisation step to save even more memory
)
# Prepare load kwargs
# Match VLM behavior: always use device_map="auto" for better device handling
load_kwargs = {
# "max_seq_length": max_context_length,
"token": hf_token,
"device_map": "auto", # Always use device_map="auto" like VLM
}
if quantization_config is not None:
load_kwargs["quantization_config"] = quantization_config
print("Loading model with bitsandbytes quantisation")
else:
# Use "auto" dtype like VLM for better compatibility
load_kwargs["dtype"] = "auto" if model_dtype == "auto" else torch_dtype
print("Loading model without quantisation")
# Load tokenizer FIRST to validate the model_id is accessible
# This ensures we catch tokenizer errors before loading the (larger) model
print(f"Loading tokenizer from {model_id}...")
tokenizer = AutoTokenizer.from_pretrained(
model_id,
token=hf_token,
trust_remote_code=True,
)
if not tokenizer.pad_token:
tokenizer.pad_token = tokenizer.eos_token
print("Tokenizer loaded successfully")
# Load model from the SAME model_id to ensure compatibility
if "qwen" in local_model_type.lower() and "3.5" in local_model_type.lower():
print(f"Loading Qwen 3.5 model from {model_id}...")
from transformers import (
Qwen3_5ForCausalLM,
)
model = Qwen3_5ForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
**load_kwargs,
)
elif (
"qwen" in local_model_type.lower() and "3 " in local_model_type.lower()
):
print(f"Loading Qwen 3 model from {model_id}...")
from transformers import Qwen3VLForConditionalGeneration
model = Qwen3VLForConditionalGeneration.from_pretrained(
model_id,
trust_remote_code=True,
**load_kwargs,
)
else:
print(f"Loading model from {model_id}...")
model = AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
**load_kwargs,
)
# Set model to evaluation mode (standard transformers approach)
# Note: With device_map="auto", don't manually move model - let it handle device placement
model.eval()
print("Model loaded successfully")
# Validate that model and tokenizer are from the same source
if hasattr(model, "config") and hasattr(model.config, "_name_or_path"):
model_source = model.config._name_or_path
if hasattr(tokenizer, "name_or_path"):
tokenizer_source = tokenizer.name_or_path
if model_source != tokenizer_source and model_id not in [
model_source,
tokenizer_source,
]:
print(
f"Warning: Model source ({model_source}) and tokenizer source ({tokenizer_source}) may differ. Using model_id: {model_id}"
)
except Exception as e:
# If loading fails, ensure both model and tokenizer are None to prevent partial state
print(f"Error loading model and tokenizer: {e}")
model = None
tokenizer = None
raise RuntimeError(
f"Failed to load model and tokenizer from {model_id}: {e}"
) from e
# Compile the Model with the selected mode 🚀
if COMPILE_TRANSFORMERS:
try:
model = torch.compile(model, mode=compile_mode, fullgraph=False)
except Exception as e:
print(f"Could not compile model: {e}. Running in eager mode.")
print(
"Loading with",
gpu_context.n_gpu_layers,
"model layers sent to GPU and a maximum context length of",
gpu_context.n_ctx,
)
# CPU mode
else:
try:
from transformers import AutoTokenizer
model_id = repo_id
tokenizer = AutoTokenizer.from_pretrained(
model_id,
token=hf_token,
trust_remote_code=True,
)
if not tokenizer.pad_token:
tokenizer.pad_token = tokenizer.eos_token
print(f"Loaded tokenizer from {model_id} for compatibility")
except Exception as e:
print(f"Warning: Could not load tokenizer: {e}")
tokenizer = None
print(
"Loading with",
cpu_context.n_gpu_layers,
"model layers sent to GPU and a maximum context length of",
cpu_context.n_ctx,
)
print("Finished loading model:", local_model_type)
print("GPU layers assigned to cuda:", gpu_layers)
# Load assistant model for speculative decoding if enabled
# Note: Assistant model typically shares the same tokenizer as the main model
# for speculative decoding, so we don't load a separate tokenizer for it
if speculative_decoding and torch_device == "cuda":
print("Loading assistant model for speculative decoding:", ASSISTANT_MODEL)
try:
from transformers import (
AutoModelForCausalLM,
BitsAndBytesConfig,
)
# Setup quantization config for assistant model (same as main model)
assistant_quantization_config = None
if QUANTISE_TRANSFORMERS_LLM_MODELS and torch.cuda.is_available():
if INT8_WITH_OFFLOAD_TO_CPU:
max_memory = {0: "4GB", "cpu": "32GB"}
assistant_quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
max_memory=max_memory,
llm_int8_enable_fp32_cpu_offload=True,
)
else:
assistant_quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch_dtype,
bnb_4bit_use_double_quant=True,
)
# Prepare load kwargs for assistant model
assistant_load_kwargs = {
"token": hf_token,
}
if assistant_quantization_config is not None:
assistant_load_kwargs["quantization_config"] = (
assistant_quantization_config
)
assistant_load_kwargs["device_map"] = "auto"
print("Loading assistant model with bitsandbytes quantisation")
else:
assistant_load_kwargs["dtype"] = torch_dtype
print("Loading assistant model without quantisation")
# Load the assistant model from ASSISTANT_MODEL
# Note: Assistant model should be compatible with the main model's tokenizer
# for speculative decoding to work correctly
print(f"Loading assistant model from {ASSISTANT_MODEL}...")
assistant_model = AutoModelForCausalLM.from_pretrained(
ASSISTANT_MODEL, **assistant_load_kwargs
)
# For non-quantized assistant models, explicitly move to device (matching VLM behavior)
if assistant_quantization_config is None:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
assistant_model = assistant_model.to(device)
# Validate that assistant model can work with the main tokenizer
# For speculative decoding, both models should use compatible tokenizers
if hasattr(assistant_model, "config") and hasattr(
assistant_model.config, "_name_or_path"
):
assistant_source = assistant_model.config._name_or_path
if hasattr(tokenizer, "name_or_path"):
tokenizer_source = tokenizer.name_or_path
if assistant_source != tokenizer_source:
print(
f"Warning: Assistant model ({assistant_source}) and tokenizer ({tokenizer_source}) are from different sources."
)
print(
"This may cause issues with speculative decoding. Ensure they are compatible."
)
# Compile the assistant model if compilation is enabled
if COMPILE_TRANSFORMERS:
try:
assistant_model = torch.compile(
assistant_model, mode=compile_mode, fullgraph=False
)
except Exception as e:
print(
f"Could not compile assistant model: {e}. Running in eager mode."
)
print("Successfully loaded assistant model for speculative decoding")
print("Note: Assistant model uses the same tokenizer as the main model")
except Exception as e:
print(f"Error loading assistant model: {e}")
assistant_model = None
else:
assistant_model = None
return model, tokenizer, assistant_model
# Initialize PII model at startup if configured (even if SHOW_TRANSFORMERS_LLM_PII_DETECTION_OPTIONS is False)
# This allows PII model to be loaded independently for PII detection tasks
if (
LOAD_TRANSFORMERS_LLM_PII_MODEL_AT_START
and SHOW_TRANSFORMERS_LLM_PII_DETECTION_OPTIONS
):
try:
print("Loading local PII model:", LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE)
_pii_model, _pii_tokenizer, _pii_assistant_model = load_model(
local_model_type=LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE,
max_context_length=context_length,
gpu_context=local_gpu_context,
cpu_context=local_cpu_context,
repo_id=LOCAL_TRANSFORMERS_LLM_PII_REPO_ID,
model_filename="",
model_dir="",
compile_mode=COMPILE_MODE,
model_dtype=LLM_MODEL_DTYPE,
hf_token=HF_TOKEN,
model=_pii_model,
tokenizer=_pii_tokenizer,
assistant_model=_pii_assistant_model,
)
except Exception as e:
print(f"Warning: Could not load PII model at startup: {e}")
print("PII model will be loaded on-demand when needed.")
@spaces.GPU(duration=MAX_SPACES_GPU_RUN_TIME)
def call_transformers_model(
prompt: str,
system_prompt: str,
gen_config: LocalLLMGenerationConfig,
model=_pii_model,
tokenizer=_pii_tokenizer,
assistant_model=_pii_assistant_model,
speculative_decoding=speculative_decoding,
use_vlm_safe_generation=VLM_DEFAULT_DO_SAMPLE,
):
"""
This function sends a request to a transformers model with the given prompt, system prompt, and generation configuration.
When use_vlm_safe_generation is True (e.g. VLM model used for LLM tasks), uses greedy decoding to avoid
sampling-related CUDA errors (e.g. invalid probability tensor in multinomial).
"""
import torch
from transformers import TextStreamer
# Custom streamer that reports streamed output to gr.Info when REPORT_LLM_OUTPUTS_TO_GUI is True
class _LLMGUIStreamer(TextStreamer):
def __init__(self, tokenizer, skip_prompt=True):
super().__init__(tokenizer, skip_prompt=skip_prompt)
self._line_buffer = ""
def on_finalized_text(self, text, stream_end=False):
super().on_finalized_text(text, stream_end)
if not REPORT_LLM_OUTPUTS_TO_GUI:
return
self._line_buffer += text
if "\n" in text or stream_end:
parts = self._line_buffer.split("\n")
for line in parts[:-1]:
if line.strip():
_report_llm_output_to_gui(line)
self._line_buffer = parts[-1] if parts else ""
if stream_end and self._line_buffer.strip():
_report_llm_output_to_gui(self._line_buffer)
# Load model and tokenizer together to ensure they're from the same source
# This prevents mismatches that could occur if they're loaded separately
if model is None or tokenizer is None:
print("Model not found. Loading model and tokenizer...")
# Use get_model_and_tokenizer() to ensure both are loaded atomically
# This is safer than calling get_pii_model() and get_pii_tokenizer() separately
loaded_model, loaded_tokenizer, assistant_model = load_model()
if model is None:
model = loaded_model
if tokenizer is None:
tokenizer = loaded_tokenizer
# if assistant_model is None and speculative_decoding:
# assistant_model = # get_assistant_model()
if model is None or tokenizer is None:
raise ValueError(
"No model or tokenizer available. Either pass them as parameters or ensure LOAD_TRANSFORMERS_LLM_PII_MODEL_AT_START is True."
)
# Apply reasoning suffix to prompt if configured
if REASONING_SUFFIX and REASONING_SUFFIX.strip():
prompt = f"{prompt} {REASONING_SUFFIX}".strip()
# When using VLM as LLM with Qwen3.5 thinking disabled, we append after the generation
# prompt so the model continues with the answer (avoids continue_final_message which can fail
# when the chat template does not include the final assistant message in the rendered string).
add_nothink_assistant_turn = (
VLM_DISABLE_QWEN3_5_THINKING
and "Qwen 3.5" in LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE
) or (
VLM_DISABLE_QWEN3_5_THINKING
and USE_TRANSFORMERS_VLM_MODEL_AS_LLM
and (
"Qwen 3.5" in SELECTED_LOCAL_TRANSFORMERS_VLM_MODEL
or "Qwen3.5" in SELECTED_LOCAL_TRANSFORMERS_VLM_MODEL
)
)
# 1. Define the conversation as a list of dictionaries
# Note: The multimodal format [{"type": "text", "text": text}] is only needed for actual multimodal models
# with images/videos. For text-only content, even multimodal models expect plain strings.
# Check if system_prompt is meaningful (not empty/None)
has_system_prompt = system_prompt and str(system_prompt).strip()
# Always use string format for text-only content, regardless of MULTIMODAL_PROMPT_FORMAT setting
# MULTIMODAL_PROMPT_FORMAT should only be used when you actually have multimodal inputs (images, etc.)
if MULTIMODAL_PROMPT_FORMAT:
conversation = []
if has_system_prompt:
conversation.append(
{
"role": "system",
"content": [{"type": "text", "text": str(system_prompt)}],
}
)
conversation.append(
{"role": "user", "content": [{"type": "text", "text": str(prompt)}]}
)
else:
conversation = []
if has_system_prompt:
conversation.append({"role": "system", "content": str(system_prompt)})
conversation.append({"role": "user", "content": str(prompt)})
if PRINT_TRANSFORMERS_USER_PROMPT:
print("System prompt:", system_prompt)
print("User prompt:", prompt)
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
if assistant_model is not None:
assistant_model = assistant_model.to(device)
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Model device:", device)
print("Model device type:", type(device))
try:
# Try applying chat template with system prompt (if present)
# Create inputs dict like VLM does - this allows model to handle device placement automatically
# From transformers v5, apply_chat_template returns BatchEncoding; extract input_ids tensor
_encoded = tokenizer.apply_chat_template(
conversation,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
)
input_ids = (
_encoded["input_ids"].to(device)
if hasattr(_encoded, "keys")
else _encoded.to(device)
)
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Input IDs:", input_ids)
print("Rendered prompt:")
rendered = tokenizer.apply_chat_template(
conversation,
add_generation_prompt=True,
tokenize=False,
)
print(rendered)
print("-" * 50)
except (TypeError, KeyError, IndexError, ValueError) as e:
# If chat template fails, try without system prompt (some models don't support it)
if has_system_prompt:
print(
f"Chat template failed with system prompt ({e}), trying without system prompt..."
)
# Try again with only user prompt
user_only_conversation = [{"role": "user", "content": str(prompt)}]
try:
_encoded = tokenizer.apply_chat_template(
user_only_conversation,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
)
input_ids = (
_encoded["input_ids"].to(device)
if hasattr(_encoded, "keys")
else _encoded.to(device)
)
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Input IDs:", input_ids)
print("Rendered prompt (without system):")
rendered = tokenizer.apply_chat_template(
user_only_conversation,
add_generation_prompt=True,
tokenize=False,
)
print(rendered)
print("-" * 50)
except Exception as e2:
print(
f"Chat template failed without system prompt ({e2}), using manual tokenization"
)
# Combine system and user prompts manually as fallback
full_prompt = (
f"{system_prompt}\n\n{prompt}" if has_system_prompt else prompt
)
# Tokenize manually with special tokens (tokenizer() returns BatchEncoding; extract tensor)
encoded = tokenizer(
full_prompt, return_tensors="pt", add_special_tokens=True
)
input_ids = encoded["input_ids"].to(device)
else:
# No system prompt, but chat template still failed - use manual tokenization
print(f"Chat template failed ({e}), using manual tokenization")
full_prompt = str(prompt)
encoded = tokenizer(
full_prompt, return_tensors="pt", add_special_tokens=True
)
input_ids = encoded["input_ids"].to(device)
except Exception as e:
print("Error applying chat template:", e)
import traceback
traceback.print_exc()
raise
attention_mask = torch.ones_like(input_ids).to(device)
# When disabling Qwen3.5 thinking, append suffix to prompt so model continues with the answer (same as run_vlm).
if add_nothink_assistant_turn:
nothink_tokens = tokenizer.encode(
VLM_QWEN3_5_NOTHINK_SUFFIX, add_special_tokens=False, return_tensors="pt"
)
if nothink_tokens.dim() == 1:
nothink_tokens = nothink_tokens.unsqueeze(0)
nothink_tokens = nothink_tokens.to(device)
input_ids = torch.cat([input_ids, nothink_tokens], dim=1)
attention_mask = torch.cat(
[
attention_mask,
torch.ones(
(attention_mask.shape[0], nothink_tokens.shape[1]),
device=device,
dtype=attention_mask.dtype,
),
],
dim=1,
)
# Map generation config to transformers parameters.
# When use_vlm_safe_generation (VLM model used for LLM tasks), use greedy decoding to avoid
# "probability tensor contains inf/nan or element < 0" errors in torch.multinomial on some setups.
if use_vlm_safe_generation:
generation_kwargs = {
"max_new_tokens": gen_config.max_tokens,
"do_sample": False,
"attention_mask": attention_mask,
}
else:
generation_kwargs = {
"max_new_tokens": gen_config.max_tokens,
"temperature": gen_config.temperature,
"top_p": gen_config.top_p,
"top_k": gen_config.top_k,
"do_sample": True,
"attention_mask": attention_mask,
}
if gen_config.stream:
streamer = (
_LLMGUIStreamer(tokenizer, skip_prompt=True)
if REPORT_LLM_OUTPUTS_TO_GUI
else TextStreamer(tokenizer, skip_prompt=True)
)
else:
streamer = None
# Remove parameters that don't exist in transformers (repetition_penalty is valid for both sampling and greedy)
if hasattr(gen_config, "repeat_penalty") and gen_config.repeat_penalty is not None:
generation_kwargs["repetition_penalty"] = gen_config.repeat_penalty
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Generation kwargs:", generation_kwargs)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = tokenizer.pad_token_id
# --- Timed Inference Test ---
print("\nStarting model inference...")
start_time = time.time()
# Use speculative decoding if assistant model is available
try:
if speculative_decoding and assistant_model is not None:
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Using speculative decoding with assistant model")
outputs = model.generate(
input_ids,
assistant_model=assistant_model,
**generation_kwargs,
streamer=streamer,
)
else:
if PRINT_TRANSFORMERS_USER_PROMPT:
print("Generating without speculative decoding")
outputs = model.generate(input_ids, **generation_kwargs, streamer=streamer)
except Exception as e:
error_msg = str(e)
# Check if this is a CUDA compilation error
if (
"sm_120" in error_msg
or "LLVM ERROR" in error_msg
or "Cannot select" in error_msg
):
print("\n" + "=" * 80)
print("CUDA COMPILATION ERROR DETECTED")
print("=" * 80)
print(
"\nThe error is caused by torch.compile() trying to compile CUDA kernels"
)
print(
"with incompatible settings. This is a known issue with certain CUDA/PyTorch"
)
print("combinations.\n")
print(
"SOLUTION: Disable model compilation by setting COMPILE_TRANSFORMERS=False"
)
print("in your config file (config/app_config.env).")
print(
"\nThe model will still work without compilation, just slightly slower."
)
print("=" * 80 + "\n")
raise RuntimeError(
"CUDA compilation error detected. Please set COMPILE_TRANSFORMERS=False "
"in your config file to disable model compilation and avoid this error."
) from e
else:
# Re-raise other errors as-is
raise
end_time = time.time()
# --- Decode and Display Results ---
# Extract only the newly generated tokens (exclude input tokens)
input_length = input_ids.shape[-1]
# Handle different output formats from model.generate()
# model.generate() returns a tensor with shape [batch_size, sequence_length]
# that includes both input and generated tokens
if isinstance(outputs, torch.Tensor):
# If outputs is a tensor, extract the new tokens
if outputs.dim() == 2:
# Shape: [batch_size, sequence_length]
new_tokens = outputs[0, input_length:].clone()
elif outputs.dim() == 1:
# Shape: [sequence_length] (single sequence)
new_tokens = outputs[input_length:].clone()
else:
raise ValueError(f"Unexpected output tensor shape: {outputs.shape}")
else:
# If outputs is a sequence or other format
if hasattr(outputs, "__getitem__"):
new_tokens = (
outputs[0][input_length:]
if len(outputs) > 0
else outputs[input_length:]
)
else:
raise ValueError(f"Unexpected output type: {type(outputs)}")
# Ensure new_tokens is a tensor and on CPU for decoding
if isinstance(new_tokens, torch.Tensor):
new_tokens = new_tokens.cpu().clone()
# Convert to list for decoding (some tokenizers prefer lists)
new_tokens_list = new_tokens.tolist()
else:
new_tokens_list = (
list(new_tokens) if hasattr(new_tokens, "__iter__") else [new_tokens]
)
if PRINT_TRANSFORMERS_USER_PROMPT:
print(f"Input length: {input_length}")
print(f"Output shape: {outputs.shape if hasattr(outputs, 'shape') else 'N/A'}")
print(f"New tokens count: {len(new_tokens_list)}")
print(f"First 20 new token IDs: {new_tokens_list[:20]}")
# Decode the tokens
# Use the token list for decoding (more reliable than tensor)
try:
assistant_reply = tokenizer.decode(
new_tokens_list, skip_special_tokens=True, clean_up_tokenization_spaces=True
)
except Exception as e:
print(f"Warning: Error decoding tokens: {e}")
print(f"New tokens count: {len(new_tokens_list)}")
print(f"New tokens (first 20): {new_tokens_list[:20]}")
# Try alternative decoding methods
try:
# Try with tensor directly
if isinstance(new_tokens, torch.Tensor):
assistant_reply = tokenizer.decode(
new_tokens,
skip_special_tokens=True,
clean_up_tokenization_spaces=True,
)
else:
raise e
except Exception as e2:
print(f"Error with tensor decoding: {e2}")
# Last resort: try to decode each token individually to see which ones fail
try:
decoded_parts = []
failed_tokens = []
for i, token_id in enumerate(
new_tokens_list[:200]
): # Limit to first 200 to avoid issues
try:
decoded = tokenizer.decode([token_id], skip_special_tokens=True)
decoded_parts.append(decoded)
except Exception as token_error:
failed_tokens.append((i, token_id, str(token_error)))
decoded_parts.append(f"")
if failed_tokens:
print(
f"Warning: {len(failed_tokens)} tokens failed to decode individually"
)
print(f"First few failed tokens: {failed_tokens[:5]}")
assistant_reply = "".join(decoded_parts)
except Exception as e3:
print(f"Error with individual token decoding: {e3}")
assistant_reply = f""
num_input_tokens = input_length
num_generated_tokens = (
len(new_tokens_list) if hasattr(new_tokens_list, "__len__") else 0
)
duration = end_time - start_time
tokens_per_second = num_generated_tokens / duration if duration > 0 else 0
if PRINT_TRANSFORMERS_USER_PROMPT:
print(f"\nDecoded output length: {len(assistant_reply)} characters")
print(f"First 200 chars of output: {assistant_reply[:200]}")
print("\n--- Performance ---")
print(f"Time taken: {duration:.2f} seconds")
print(f"Generated tokens: {num_generated_tokens}")
print(f"Tokens per second: {tokens_per_second:.2f}")
return assistant_reply, num_input_tokens, num_generated_tokens
# Function to send a request and update history
def send_request(
prompt: str,
conversation_history: List[dict],
client: ai.Client | OpenAI,
config: types.GenerateContentConfig,
model_choice: str,
system_prompt: str,
temperature: float,
bedrock_runtime: boto3.Session.client,
model_source: str,
local_model=_pii_model,
tokenizer=_pii_tokenizer,
assistant_model=_pii_assistant_model,
assistant_prefill="",
progress=Progress(track_tqdm=True),
api_url: str = None,
) -> Tuple[str, List[dict]]:
"""Sends a request to a language model and manages the conversation history.
This function constructs the full prompt by appending the new user prompt to the conversation history,
generates a response from the model, and updates the conversation history with the new prompt and response.
It handles different model sources (Gemini, AWS, Local, inference-server) and includes retry logic for API calls.
Args:
prompt (str): The user's input prompt to be sent to the model.
conversation_history (List[dict]): A list of dictionaries representing the ongoing conversation.
Each dictionary should have 'role' and 'parts' keys.
client (ai.Client): The API client object for the chosen model (e.g., Gemini `ai.Client`, or Azure/OpenAI `OpenAI`).
config (types.GenerateContentConfig): Configuration settings for content generation (e.g., Gemini `types.GenerateContentConfig`).
model_choice (str): The specific model identifier to use (e.g., "gemini-pro", "claude-v2").
system_prompt (str): An optional system-level instruction or context for the model.
temperature (float): Controls the randomness of the model's output, with higher values leading to more diverse responses.
bedrock_runtime (boto3.Session.client): The boto3 Bedrock runtime client object for AWS models.
model_source (str): Indicates the source/provider of the model (e.g., "Gemini", "AWS", "Local", "inference-server").
local_model (list, optional): A list containing the local model and its tokenizer (if `model_source` is "Local"). Defaults to [].
tokenizer (object, optional): The tokenizer object for local models. Defaults to None.
assistant_model (object, optional): An optional assistant model used for speculative decoding with local models. Defaults to None.
assistant_prefill (str, optional): A string to pre-fill the assistant's response, useful for certain models like Claude. Defaults to "".
progress (Progress, optional): A progress object for tracking the operation, typically from `tqdm`. Defaults to Progress(track_tqdm=True).
api_url (str, optional): The API URL for inference-server calls. Required when model_source is 'inference-server'.
Returns:
Tuple[str, List[dict]]: A tuple containing the model's response text and the updated conversation history.
"""
# Constructing the full prompt from the conversation history
full_prompt = "Conversation history:\n"
num_transformer_input_tokens = 0
num_transformer_generated_tokens = 0
response_text = ""
if not model_choice or model_choice == "":
model_choice = None
for entry in conversation_history:
role = entry[
"role"
].capitalize() # Assuming the history is stored with 'role' and 'parts'
message = " ".join(entry["parts"]) # Combining all parts of the message
full_prompt += f"{role}: {message}\n"
# Adding the new user prompt
full_prompt += f"\nUser: {prompt}"
# Clear any existing progress bars
tqdm._instances.clear()
progress_bar = range(0, number_of_api_retry_attempts)
# Generate the model's response
if "Gemini" in model_source:
for i in progress_bar:
try:
print("Calling Gemini model, attempt", i + 1)
response = client.models.generate_content(
model=model_choice, contents=full_prompt, config=config
)
# print("Successful call to Gemini model.")
break
except Exception as e:
# If fails, try again after X seconds in case there is a throttle limit
print(
"Call to Gemini model failed:",
e,
" Waiting for ",
str(timeout_wait),
"seconds and trying again.",
)
time.sleep(timeout_wait)
if i == number_of_api_retry_attempts:
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
elif "AWS" in model_source:
for i in progress_bar:
try:
# print("Calling AWS Bedrock model, attempt", i + 1)
response = call_aws_bedrock(
prompt,
system_prompt,
temperature,
max_tokens,
model_choice,
bedrock_runtime=bedrock_runtime,
assistant_prefill=assistant_prefill,
)
# print("Successful call to Claude model.")
break
except Exception as e:
# If fails, try again after X seconds in case there is a throttle limit
print(
"Call to Bedrock model failed:",
e,
" Waiting for ",
str(timeout_wait),
"seconds and trying again.",
)
time.sleep(timeout_wait)
if i == number_of_api_retry_attempts:
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
elif "Azure/OpenAI" in model_source:
for i in progress_bar:
try:
print("Calling Azure/OpenAI inference model, attempt", i + 1)
messages = [
{
"role": "system",
"content": system_prompt,
},
{
"role": "user",
"content": prompt,
},
]
response_raw = client.chat.completions.create(
messages=messages,
model=model_choice,
temperature=temperature,
max_completion_tokens=max_tokens,
)
response_text = response_raw.choices[0].message.content
usage = getattr(response_raw, "usage", None)
input_tokens = 0
output_tokens = 0
if usage is not None:
input_tokens = getattr(
usage, "input_tokens", getattr(usage, "prompt_tokens", 0)
)
output_tokens = getattr(
usage, "output_tokens", getattr(usage, "completion_tokens", 0)
)
response = ResponseObject(
text=response_text,
usage_metadata={
"inputTokens": input_tokens,
"outputTokens": output_tokens,
},
)
break
except Exception as e:
print(
"Call to Azure/OpenAI model failed:",
e,
" Waiting for ",
str(timeout_wait),
"seconds and trying again.",
)
time.sleep(timeout_wait)
if i == number_of_api_retry_attempts:
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
elif "Local" in model_source:
# This is the local model. When USE_TRANSFORMERS_VLM_MODEL_AS_LLM and model_choice is the VLM model, use the loaded VLM model/tokenizer.
vlm_model, vlm_tokenizer = None, None
if (
USE_TRANSFORMERS_VLM_MODEL_AS_LLM
and model_choice == SELECTED_LOCAL_TRANSFORMERS_VLM_MODEL
):
try:
from tools.run_vlm import get_loaded_vlm_model_and_tokenizer
vlm_model, vlm_tokenizer = get_loaded_vlm_model_and_tokenizer()
except Exception as e:
print(
f"Could not get VLM model for LLM task (USE_TRANSFORMERS_VLM_MODEL_AS_LLM): {e}"
)
for i in progress_bar:
try:
print("Calling local model, attempt", i + 1)
gen_config = LocalLLMGenerationConfig()
gen_config.update_temp(temperature)
# Call transformers model; use VLM model/tokenizer when USE_TRANSFORMERS_VLM_MODEL_AS_LLM and available
if vlm_model is not None and vlm_tokenizer is not None:
(
response,
num_transformer_input_tokens,
num_transformer_generated_tokens,
) = call_transformers_model(
prompt,
system_prompt,
gen_config,
model=vlm_model,
tokenizer=vlm_tokenizer,
use_vlm_safe_generation=VLM_DEFAULT_DO_SAMPLE,
)
else:
(
response,
num_transformer_input_tokens,
num_transformer_generated_tokens,
) = call_transformers_model(
prompt,
system_prompt,
gen_config,
)
response_text = response
break
except Exception as e:
# If fails, try again after X seconds in case there is a throttle limit
print(
"Call to local model failed:",
e,
" Waiting for ",
str(timeout_wait),
"seconds and trying again.",
)
time.sleep(timeout_wait)
if i == number_of_api_retry_attempts:
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
elif "inference-server" in model_source:
# This is the inference-server API
for i in progress_bar:
try:
print("Calling inference-server API, attempt", i + 1)
if api_url is None:
raise ValueError(
"api_url is required when model_source is 'inference-server'"
)
gen_config = LocalLLMGenerationConfig()
gen_config.update_temp(temperature)
response = call_inference_server_api(
prompt,
system_prompt,
gen_config,
api_url=api_url,
model_name=model_choice,
use_llama_swap=USE_LLAMA_SWAP,
)
break
except Exception as e:
# If fails, try again after X seconds in case there is a throttle limit
print(
"Call to inference-server API failed:",
e,
" Waiting for ",
str(timeout_wait),
"seconds and trying again.",
)
time.sleep(timeout_wait)
if i == number_of_api_retry_attempts:
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
else:
print("Model source not recognised")
return (
ResponseObject(text="", usage_metadata={"RequestId": "FAILED"}),
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
# Update the conversation history with the new prompt and response
conversation_history.append({"role": "user", "parts": [prompt]})
# Check if is a LLama.cpp model response or inference-server response
if isinstance(response, ResponseObject):
response_text = response.text
elif "choices" in response: # LLama.cpp model response or inference-server response
# Check for GPT-OSS thinking models (case-insensitive, handle both hyphen and underscore)
if "gpt-oss" in model_choice.lower() or "gpt_oss" in model_choice.lower():
content = _stringify_openai_message_content(
response["choices"][0]["message"].get("content")
)
# Split on the final channel marker to extract only the final output (not thinking tokens)
parts = content.split("<|start|>assistant<|channel|>final<|message|>")
if len(parts) > 1:
response_text = parts[1]
# Following format may be from llama.cpp inference-server response
elif len(parts) == 1:
parts = content.split("<|end|>")
if len(parts) > 1:
response_text = parts[1]
else:
print(
"Warning: Could not find final channel marker in GPT-OSS response. Using full content."
)
response_text = content
else:
# Fallback: if marker not found, use the full content (may include thinking tokens)
print(
"Warning: Could not find final channel marker in GPT-OSS response. Using full content."
)
response_text = content
else:
response_text = _extract_choice_message_text(response["choices"][0])
elif model_source == "Gemini":
response_text = response.text
else: # Assume transformers model response
# Check for GPT-OSS thinking models (case-insensitive, handle both hyphen and underscore)
if "gpt-oss" in model_choice.lower() or "gpt_oss" in model_choice.lower():
# Split on the final channel marker to extract only the final output (not thinking tokens)
parts = response.split("<|start|>assistant<|channel|>final<|message|>")
if len(parts) > 1:
response_text = parts[1]
else:
# Fallback: if marker not found, use the full content (may include thinking tokens)
print(
"Warning: Could not find final channel marker in GPT-OSS response. Using full content."
)
response_text = response
else:
response_text = response
# Strip <|end|> tags (used by GPT-OSS thinking models to mark end of thinking)
response_text = response_text or ""
response_text = re.sub(r"<\|end\|>", "", response_text)
# Replace multiple spaces with single space
response_text = re.sub(r" {2,}", " ", response_text)
response_text = response_text.strip()
conversation_history.append({"role": "assistant", "parts": [response_text]})
return (
response,
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
)
def process_requests(
prompts: List[str],
system_prompt: str,
conversation_history: List[dict],
whole_conversation: List[str],
whole_conversation_metadata: List[str],
client: ai.Client | OpenAI,
config: types.GenerateContentConfig,
model_choice: str,
temperature: float,
bedrock_runtime: boto3.Session.client,
model_source: str,
batch_no: int = 1,
local_model=_pii_model,
tokenizer=_pii_tokenizer,
assistant_model=_pii_assistant_model,
master: bool = False,
assistant_prefill="",
api_url: str = None,
) -> Tuple[List[ResponseObject], List[dict], List[str], List[str]]:
"""
Processes a list of prompts by sending them to the model, appending the responses to the conversation history, and updating the whole conversation and metadata.
Args:
prompts (List[str]): A list of prompts to be processed.
system_prompt (str): The system prompt.
conversation_history (List[dict]): The history of the conversation.
whole_conversation (List[str]): The complete conversation including prompts and responses.
whole_conversation_metadata (List[str]): Metadata about the whole conversation.
client (object): The client to use for processing the prompts, from either Gemini or OpenAI client.
config (dict): Configuration for the model.
model_choice (str): The choice of model to use.
temperature (float): The temperature parameter for the model.
model_source (str): Source of the model, whether local, AWS, Gemini, or inference-server
batch_no (int): Batch number of the large language model request.
local_model: Local gguf model (if loaded)
master (bool): Is this request for the master table.
assistant_prefill (str, optional): Is there a prefill for the assistant response. Currently only working for AWS model calls
bedrock_runtime: The client object for boto3 Bedrock runtime
api_url (str, optional): The API URL for inference-server calls. Required when model_source is 'inference-server'.
Returns:
Tuple[List[ResponseObject], List[dict], List[str], List[str]]: A tuple containing the list of responses, the updated conversation history, the updated whole conversation, and the updated whole conversation metadata.
"""
responses = list()
# Clear any existing progress bars
tqdm._instances.clear()
for prompt in prompts:
(
response,
conversation_history,
response_text,
num_transformer_input_tokens,
num_transformer_generated_tokens,
) = send_request(
prompt,
conversation_history,
client=client,
config=config,
model_choice=model_choice,
system_prompt=system_prompt,
temperature=temperature,
local_model=local_model,
tokenizer=tokenizer,
assistant_model=assistant_model,
assistant_prefill=assistant_prefill,
bedrock_runtime=bedrock_runtime,
model_source=model_source,
api_url=api_url,
)
responses.append(response)
whole_conversation.append(system_prompt)
whole_conversation.append(prompt)
whole_conversation.append(response_text)
whole_conversation_metadata.append(f"Batch {batch_no}:")
try:
if "AWS" in model_source:
output_tokens = response.usage_metadata.get("outputTokens", 0)
input_tokens = response.usage_metadata.get("inputTokens", 0)
elif "Gemini" in model_source:
output_tokens = response.usage_metadata.candidates_token_count
input_tokens = response.usage_metadata.prompt_token_count
elif "Azure/OpenAI" in model_source:
input_tokens = response.usage_metadata.get("inputTokens", 0)
output_tokens = response.usage_metadata.get("outputTokens", 0)
elif "Local" in model_source:
input_tokens = num_transformer_input_tokens
output_tokens = num_transformer_generated_tokens
elif "inference-server" in model_source:
# inference-server returns the same format as llama-cpp
output_tokens = response["usage"].get("completion_tokens", 0)
input_tokens = response["usage"].get("prompt_tokens", 0)
else:
input_tokens = 0
output_tokens = 0
whole_conversation_metadata.append(
"input_tokens: "
+ str(input_tokens)
+ " output_tokens: "
+ str(output_tokens)
)
except KeyError as e:
print(f"Key error: {e} - Check the structure of response.usage_metadata")
return (
responses,
conversation_history,
whole_conversation,
whole_conversation_metadata,
response_text,
)
def call_inference_server_api(
formatted_string: str,
system_prompt: str,
gen_config: LocalLLMGenerationConfig,
api_url: str = "http://localhost:8080",
model_name: str = None,
use_llama_swap: bool = USE_LLAMA_SWAP,
):
"""
Calls a inference-server API endpoint with a formatted user message and system prompt,
using generation parameters from the LocalLLMGenerationConfig object.
This function provides the same interface as call_transformers_model but calls
a remote inference-server instance instead of a local model.
Args:
formatted_string (str): The formatted input text for the user's message.
system_prompt (str): The system-level instructions for the model.
gen_config (LocalLLMGenerationConfig): An object containing generation parameters.
api_url (str): The base URL of the inference-server API (default: "http://localhost:8080").
model_name (str): Optional model name to use. If None, uses the default model.
use_llama_swap (bool): Whether to use llama-swap for the model.
Returns:
dict: Response in the same format as the inference-server chat completions API
Example:
# Create generation config
gen_config = LocalLLMGenerationConfig(temperature=0.7, max_tokens=100)
# Call the API
response = call_inference_server_api(
formatted_string="Hello, how are you?",
system_prompt="You are a helpful assistant.",
gen_config=gen_config,
api_url="http://localhost:8080"
)
# Extract the response text
response_text = response['choices'][0]['message']['content']
Integration Example:
# To use inference-server instead of local model:
# 1. Set model_source to "inference-server"
# 2. Provide api_url parameter
# 3. Call your existing functions as normal
responses, conversation_history, whole_conversation, whole_conversation_metadata, response_text = call_llm_with_markdown_table_checks(
batch_prompts=["Your prompt here"],
system_prompt="Your system prompt",
conversation_history=[],
whole_conversation=[],
whole_conversation_metadata=[],
client=None, # Not used for inference-server
client_config=None, # Not used for inference-server
model_choice="your-model-name", # Model name on the server
temperature=0.7,
reported_batch_no=1,
local_model=None, # Not used for inference-server
tokenizer=None, # Not used for inference-server
bedrock_runtime=None, # Not used for inference-server
model_source="inference-server",
MAX_OUTPUT_VALIDATION_ATTEMPTS=3,
api_url="http://localhost:8080"
)
"""
# Extract parameters from the gen_config object
temperature = gen_config.temperature
top_k = gen_config.top_k
top_p = gen_config.top_p
repeat_penalty = gen_config.repeat_penalty
seed = gen_config.seed
max_tokens = gen_config.max_tokens
stream = gen_config.stream
# Prepare the request payload
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": formatted_string},
]
payload = {
"messages": messages,
"temperature": temperature,
"top_k": top_k,
"top_p": top_p,
"repeat_penalty": repeat_penalty,
"seed": seed,
"max_tokens": max_tokens,
"stream": stream,
"stop": LLM_STOP_STRINGS if LLM_STOP_STRINGS else [],
}
# Include model in payload when set (vLLM/OpenAI-compatible servers; llama-swap or not).
if model_name or model_name != "":
payload["model"] = model_name
# Match VLM path: Qwen3 / Qwen3.5 on vLLM may stream only "thinking" unless disabled.
if INFERENCE_SERVER_DISABLE_THINKING:
payload["chat_template_kwargs"] = {"enable_thinking": False}
# Determine the endpoint based on streaming preference
if stream:
endpoint = f"{api_url}/v1/chat/completions"
else:
endpoint = f"{api_url}/v1/chat/completions"
try:
if stream:
# Handle streaming response
response = requests.post(
endpoint,
json=payload,
headers={"Content-Type": "application/json"},
stream=True,
timeout=timeout_wait,
)
response.raise_for_status()
final_tokens = []
output_tokens = 0
line_buffer = ""
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
data = line[6:] # Remove 'data: ' prefix
if data.strip() == "[DONE]":
if REPORT_LLM_OUTPUTS_TO_GUI and line_buffer.strip():
_report_llm_output_to_gui(line_buffer)
break
try:
chunk = json.loads(data)
if "choices" in chunk and len(chunk["choices"]) > 0:
delta = chunk["choices"][0].get("delta", {})
token = delta.get("content")
token = _stringify_openai_message_content(token)
if not token:
for alt in (
"reasoning_content",
"reasoning",
):
t = delta.get(alt)
if isinstance(t, str) and t:
token = t
break
if token:
print(token, end="", flush=True)
final_tokens.append(token)
output_tokens += 1
if REPORT_LLM_OUTPUTS_TO_GUI:
line_buffer += token
if "\n" in token:
parts = line_buffer.split("\n")
for complete_line in parts[:-1]:
if complete_line.strip():
_report_llm_output_to_gui(
complete_line
)
line_buffer = parts[-1] if parts else ""
except json.JSONDecodeError:
continue
if REPORT_LLM_OUTPUTS_TO_GUI and line_buffer.strip():
_report_llm_output_to_gui(line_buffer)
print() # newline after stream finishes
text = "".join(final_tokens)
# Estimate input tokens (rough approximation)
input_tokens = len((system_prompt + "\n" + formatted_string).split())
return {
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {"role": "assistant", "content": text},
}
],
"usage": {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
},
}
else:
# Handle non-streaming response
response = requests.post(
endpoint,
json=payload,
headers={"Content-Type": "application/json"},
timeout=timeout_wait,
)
response.raise_for_status()
result = response.json()
# Ensure the response has the expected format
if "choices" not in result:
raise ValueError("Invalid response format from inference-server")
return result
except requests.exceptions.RequestException as e:
raise ConnectionError(
f"Failed to connect to inference-server at {api_url}: {str(e)}"
)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON response from inference-server: {str(e)}")
except Exception as e:
raise RuntimeError(f"Error calling inference-server API: {str(e)}")
###
# LLM FUNCTIONS
###
def construct_gemini_generative_model(
in_api_key: str,
temperature: float,
model_choice: str,
system_prompt: str,
max_tokens: int,
random_seed=seed,
) -> Tuple[object, dict]:
"""
Constructs a GenerativeModel for Gemini API calls.
...
"""
# Construct a GenerativeModel
try:
if in_api_key:
# print("Getting API key from textbox")
api_key = in_api_key
client = ai.Client(api_key=api_key)
elif "GOOGLE_API_KEY" in os.environ:
# print("Searching for API key in environmental variables")
api_key = os.environ["GOOGLE_API_KEY"]
client = ai.Client(api_key=api_key)
else:
print("No Gemini API key found")
raise Warning("No Gemini API key found.")
except Exception as e:
print("Error constructing Gemini generative model:", e)
raise Warning("Error constructing Gemini generative model:", e)
config = types.GenerateContentConfig(
temperature=temperature, max_output_tokens=max_tokens, seed=random_seed
)
return client, config
def construct_azure_client(in_api_key: str, endpoint: str) -> Tuple[object, dict]:
"""
Constructs an OpenAI client for Azure/OpenAI AI Inference.
"""
try:
key = None
if in_api_key:
key = in_api_key
elif os.environ.get("AZURE_OPENAI_API_KEY"):
key = os.environ["AZURE_OPENAI_API_KEY"]
if not key:
raise Warning("No Azure/OpenAI API key found.")
if not endpoint:
endpoint = os.environ.get("AZURE_OPENAI_INFERENCE_ENDPOINT", "")
if not endpoint:
# Assume using OpenAI API
client = OpenAI(
api_key=key,
)
else:
# Use the provided endpoint
client = OpenAI(
api_key=key,
base_url=f"{endpoint}",
)
return client, dict()
except Exception as e:
print("Error constructing Azure/OpenAI client:", e)
raise
def call_aws_bedrock(
prompt: str,
system_prompt: str,
temperature: float,
max_tokens: int,
model_choice: str,
bedrock_runtime: boto3.Session.client,
assistant_prefill: str = "",
max_retries: int = 5,
retry_delay_seconds: float = 2.0,
) -> ResponseObject:
"""
This function sends a request to AWS Bedrock with the following parameters:
- prompt: The user's input prompt to be processed by the model.
- system_prompt: A system-defined prompt that provides context or instructions for the model.
- temperature: A value that controls the randomness of the model's output, with higher values resulting in more diverse responses.
- max_tokens: The maximum number of tokens (words or characters) in the model's response.
- model_choice: The specific model to use for processing the request.
- bedrock_runtime: The client object for boto3 Bedrock runtime
- assistant_prefill: A string indicating the text that the response should start with.
- max_retries: Maximum number of retry attempts on failure (default 5).
- retry_delay_seconds: Delay in seconds between retries (default 2.0).
The function constructs the request configuration, invokes the model, extracts the response text, and returns a ResponseObject containing the text and metadata.
"""
inference_config = {
"maxTokens": max_tokens,
"temperature": temperature,
}
# Using an assistant prefill only works for Anthropic models.
if assistant_prefill and "anthropic" in model_choice:
assistant_prefill_added = True
messages = [
{
"role": "user",
"content": [
{"text": prompt},
],
},
{
"role": "assistant",
# Pre-filling with '|'
"content": [{"text": assistant_prefill}],
},
]
else:
assistant_prefill_added = False
messages = [
{
"role": "user",
"content": [
{"text": prompt},
],
}
]
system_prompt_list = [{"text": system_prompt}]
last_error = None
for attempt in range(1, max_retries + 1):
try:
# The converse API call.
api_response = bedrock_runtime.converse(
modelId=model_choice,
messages=messages,
system=system_prompt_list,
inferenceConfig=inference_config,
)
output_message = api_response["output"]["message"]
if "reasoningContent" in output_message["content"][0]:
# Extract the reasoning text
output_message["content"][0]["reasoningContent"]["reasoningText"][
"text"
]
# Extract the output text
if assistant_prefill_added:
text = assistant_prefill + output_message["content"][1]["text"]
else:
text = output_message["content"][1]["text"]
else:
if assistant_prefill_added:
text = assistant_prefill + output_message["content"][0]["text"]
else:
text = output_message["content"][0]["text"]
# The usage statistics are neatly provided in the 'usage' key.
usage = api_response["usage"]
# The full API response metadata is in 'ResponseMetadata' if you still need it.
api_response["ResponseMetadata"]
# Create ResponseObject with the cleanly extracted data.
response = ResponseObject(text=text, usage_metadata=usage)
return response
except Exception as e:
last_error = e
if attempt < max_retries:
print(
f"Bedrock converse API attempt {attempt}/{max_retries} failed: {e}. "
f"Retrying in {retry_delay_seconds}s..."
)
time.sleep(retry_delay_seconds)
else:
raise RuntimeError(
f"Failed to call Bedrock API after {max_retries} attempts: {str(last_error)}"
) from last_error
def calculate_tokens_from_metadata(
metadata_string: str, model_choice: str, model_name_map: dict
):
"""
Calculate the number of input and output tokens for given queries based on metadata strings.
Args:
metadata_string (str): A string containing all relevant metadata from the string.
model_choice (str): A string describing the model name
model_name_map (dict): A dictionary mapping model name to source
"""
# Regex to find the numbers following the keys in the "Query summary metadata" section
# This ensures we get the final, aggregated totals for the whole query.
input_regex = r"input_tokens: (\d+)"
output_regex = r"output_tokens: (\d+)"
# re.findall returns a list of all matching strings (the captured groups).
input_token_strings = re.findall(input_regex, metadata_string)
output_token_strings = re.findall(output_regex, metadata_string)
# Convert the lists of strings to lists of integers and sum them up
total_input_tokens = sum([int(token) for token in input_token_strings])
total_output_tokens = sum([int(token) for token in output_token_strings])
number_of_calls = len(input_token_strings)
print(f"Found {number_of_calls} LLM call entries in metadata.")
print("-" * 20)
print(f"Total Input Tokens: {total_input_tokens}")
print(f"Total Output Tokens: {total_output_tokens}")
return total_input_tokens, total_output_tokens, number_of_calls