davanstrien/agent-trace-jobs / sentiment-label.py
davanstrien's picture
download
raw
7.73 kB
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "datasets",
# "huggingface-hub[hf_transfer]",
# "hf-xet>=1.1.7",
# ]
# ///
# vllm, torch, transformers come from the Docker image: vllm/vllm-openai:gemma4
"""
Classify sentiment of coding-agent user messages using vLLM + Gemma-4.
Reads a dataset of user messages, classifies each as POSITIVE/NEUTRAL/NEGATIVE
with a domain-aware prompt, and saves results back to the Hub.
Uses vllm/vllm-openai:gemma4 Docker image which bundles transformers>=5.5.0.
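Structured output via StructuredOutputsParams constrains generation to the JSON schema; a reply cut off at max_tokens can still be invalid JSON, so such replies are counted as parse errors and labeled ERROR.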
Example (HF Jobs):
hf jobs run \
--flavor a100-large \
--secrets HF_TOKEN \
--volume hf://buckets/davanstrien/agent-trace-jobs:/mnt \
--timeout 30m \
--detach \
vllm/vllm-openai:gemma4 \
bash -c 'pip install datasets "huggingface-hub[hf_transfer]" hf-xet && \
python3 /mnt/sentiment-label.py \
davanstrien/agent-trace-user-messages \
davanstrien/agent-trace-sentiment'
"""
import argparse
import json
import logging
import os
import sys
from typing import Optional
from datasets import load_dataset
from huggingface_hub import login
from torch import cuda
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams
from vllm.sampling_params import StructuredOutputsParams
# Ask huggingface_hub to use the hf_transfer backend for Hub transfers.
# NOTE(review): huggingface_hub reads this variable when it is first imported,
# and it has already been imported above (directly and via datasets/vllm), so
# in this process the setting may have no effect; subprocesses started later
# do inherit it. Set it before the imports if fast transfers matter here --
# verify against the installed huggingface_hub version.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

# Timestamped INFO-level logging for the whole script.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# System-turn instructions sent with every message to be classified.
SYSTEM_PROMPT = """Classify user messages from coding-agent sessions (developer talking to AI coding assistant).
Return JSON with two fields:
- "label": one of "POSITIVE", "NEUTRAL", or "NEGATIVE"
- "reason": one sentence explaining why
Domain rules:
- Dev profanity is casual ("kill that shit" = "remove code") = NEUTRAL
- Short commands ("do it", "commit and push") are approvals = NEUTRAL
- Status reports ("ci failed") = NEUTRAL
- Frustration with agent output quality = NEGATIVE
- Satisfaction, excitement about progress = POSITIVE"""

# The closed set of labels the model may emit, in schema order.
_SENTIMENT_LABELS = ("POSITIVE", "NEUTRAL", "NEGATIVE")

# JSON Schema used for structured output: an object with a "label" drawn
# from _SENTIMENT_LABELS and a free-text "reason"; both fields are required.
SENTIMENT_SCHEMA = dict(
    type="object",
    properties=dict(
        label=dict(type="string", enum=list(_SENTIMENT_LABELS)),
        reason=dict(type="string"),
    ),
    required=["label", "reason"],
)
def _build_prompts(tokenizer, texts, max_chars=512):
    """Render one chat-template prompt per message, in input order.

    Args:
        tokenizer: Tokenizer whose ``apply_chat_template`` renders the system
            and user turns into a single prompt string.
        texts: Iterable of message texts. ``None`` (a null dataset cell) is
            treated as an empty message instead of crashing the slice.
        max_chars: Each message is truncated to this many characters.

    Returns:
        List of prompt strings, one per entry in ``texts``.
    """
    prompts = []
    for text in texts:
        msg = (text or "")[:max_chars]  # truncate long messages
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": msg},
        ]
        # Thinking disabled for deterministic short output.
        prompts.append(
            tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False,
            )
        )
    return prompts


def _parse_response(text):
    """Decode one completion into ``(label, reason)``, or return ``None``.

    Structured output constrains the shape, but a completion cut off at
    ``max_tokens`` can still be invalid JSON, so decoding stays defensive.
    A JSON value that is not an object is also treated as unusable, rather
    than raising AttributeError on ``.get``. A JSON object missing a field
    falls back to ``"ERROR"`` / ``""`` for that field.
    """
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed.get("label", "ERROR"), parsed.get("reason", "")


def _parse_outputs(outputs):
    """Turn vLLM generation results into label/reason columns.

    Args:
        outputs: Results of ``llm.generate``; the first completion's text of
            each result is parsed.

    Returns:
        ``(labels, reasons, parse_errors)``. Unparseable completions get the
        label ``"ERROR"`` and their first 100 characters as the reason.
    """
    labels = []
    reasons = []
    parse_errors = 0
    for output in outputs:
        text = output.outputs[0].text.strip()
        parsed = _parse_response(text)
        if parsed is None:
            parse_errors += 1
            labels.append("ERROR")
            reasons.append(text[:100])
        else:
            label, reason = parsed
            labels.append(label)
            reasons.append(reason)
    return labels, reasons, parse_errors


def _write_volume_results(dataset, dist, total, parse_errors, volume_path="/mnt/results"):
    """Save a parquet copy of ``dataset`` and a JSON summary under ``volume_path``."""
    os.makedirs(volume_path, exist_ok=True)
    dataset.to_parquet(os.path.join(volume_path, "sentiment_results.parquet"))
    summary = {"total": total, "distribution": dict(dist), "parse_errors": parse_errors}
    with open(os.path.join(volume_path, "summary.json"), "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    logger.info(f"Wrote results to {volume_path}/")


def main(
    src_dataset: str,
    output_dataset: str,
    model_id: str = "google/gemma-4-26B-A4B-it",
    text_column: str = "content_text",
    gpu_memory_utilization: float = 0.90,
    max_model_len: Optional[int] = 4096,
    tensor_parallel_size: Optional[int] = None,
    hf_token: Optional[str] = None,
    max_chars: int = 512,
):
    """Sentiment-label every message in ``src_dataset`` and push the result.

    Adds ``sentiment_label`` and ``sentiment_reason`` columns, optionally
    writes a copy to ``/mnt/results`` when ``/mnt`` exists, and pushes the
    labelled dataset to ``output_dataset`` on the Hub.

    Args:
        src_dataset: Hub dataset id; its ``train`` split is labelled.
        output_dataset: Hub dataset id the labelled dataset is pushed to.
        model_id: Model loaded by vLLM and used for the chat template.
        text_column: Column holding the message text.
        gpu_memory_utilization: Passed through to ``vllm.LLM``.
        max_model_len: Passed through to ``vllm.LLM``.
        tensor_parallel_size: Defaults to the number of visible CUDA devices.
        hf_token: Hub token; falls back to the ``HF_TOKEN`` env var.
        max_chars: Messages are truncated to this many characters.

    Exits with status 1 when CUDA is unavailable or ``text_column`` is missing.
    """
    # GPU check
    if not cuda.is_available():
        logger.error("CUDA not available.")
        sys.exit(1)
    num_gpus = cuda.device_count()
    for i in range(num_gpus):
        logger.info(f"GPU {i}: {cuda.get_device_name(i)} ({cuda.get_device_properties(i).total_memory / 1024**3:.0f} GB)")
    if tensor_parallel_size is None:
        tensor_parallel_size = num_gpus

    # Auth (before loading the dataset, so private sources are readable)
    token = hf_token or os.environ.get("HF_TOKEN")
    if token:
        login(token=token)

    # Load and validate the dataset BEFORE the slow multi-GPU model load, so a
    # bad dataset or column name fails fast instead of after model startup.
    logger.info(f"Loading dataset: {src_dataset}")
    dataset = load_dataset(src_dataset, split="train")
    logger.info(f"Loaded {len(dataset):,} messages")
    if text_column not in dataset.column_names:
        logger.error(f"Column '{text_column}' not found. Available: {dataset.column_names}")
        sys.exit(1)

    # Load model — text-only, skip audio/vision encoder allocation
    logger.info(f"Loading model: {model_id}")
    llm = LLM(
        model=model_id,
        tensor_parallel_size=tensor_parallel_size,
        gpu_memory_utilization=gpu_memory_utilization,
        max_model_len=max_model_len,
        trust_remote_code=True,
        limit_mm_per_prompt={"image": 0, "audio": 0},
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    # Structured output — constrains generation to our JSON schema
    sampling_params = SamplingParams(
        temperature=0.1,
        max_tokens=200,
        structured_outputs=StructuredOutputsParams(json=SENTIMENT_SCHEMA),
    )

    # Selecting only the text column avoids decoding every column per row.
    logger.info("Building prompts...")
    prompts = _build_prompts(tokenizer, dataset[text_column], max_chars=max_chars)

    # Generate — vLLM handles batching
    logger.info(f"Classifying {len(prompts):,} messages...")
    outputs = llm.generate(prompts, sampling_params)

    logger.info("Parsing responses...")
    labels, reasons, parse_errors = _parse_outputs(outputs)
    logger.info(f"Parse errors: {parse_errors}/{len(outputs)}")

    # Add columns
    dataset = dataset.add_column("sentiment_label", labels)
    dataset = dataset.add_column("sentiment_reason", reasons)

    # Log distribution
    from collections import Counter

    dist = Counter(labels)
    for label, count in dist.most_common():
        pct = 100 * count / len(labels)
        logger.info(f" {label}: {count:,} ({pct:.1f}%)")

    # Optional copy on the mounted volume for live inspection. Best-effort:
    # a write failure is logged and must not prevent the Hub push below.
    if os.path.isdir("/mnt"):
        try:
            _write_volume_results(dataset, dist, len(labels), parse_errors)
        except OSError:
            logger.exception("Could not write results to the mounted volume; continuing")

    # Push to Hub
    logger.info(f"Pushing to {output_dataset}")
    dataset.push_to_hub(output_dataset, token=token)
    logger.info(f"Done! https://huggingface.co/datasets/{output_dataset}")
if __name__ == "__main__":
    # Command-line entry point. argparse derives each dest from its flag
    # ("--model-id" -> "model_id"), and every dest is exactly one of main()'s
    # parameter names, so the parsed namespace is forwarded as keyword args.
    cli = argparse.ArgumentParser(
        description="Sentiment-label coding agent user messages with vLLM"
    )
    cli.add_argument("src_dataset", help="Input dataset on HF Hub")
    cli.add_argument("output_dataset", help="Output dataset on HF Hub")
    cli.add_argument("--model-id", default="google/gemma-4-26B-A4B-it")
    cli.add_argument("--text-column", default="content_text")
    cli.add_argument("--gpu-memory-utilization", type=float, default=0.90)
    cli.add_argument("--max-model-len", type=int, default=4096)
    cli.add_argument("--tensor-parallel-size", type=int)
    cli.add_argument("--hf-token", type=str)
    main(**vars(cli.parse_args()))

Xet Storage Details

Size:
7.73 kB
·
Xet hash:
0ca1ced8d24709d9b209d8a8856fa0fa1e3917bb0e3e24bbd64b25a45a814123

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.