Buckets:
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "datasets", | |
| # "huggingface-hub[hf_transfer]", | |
| # "hf-xet>=1.1.7", | |
| # ] | |
| # /// | |
| # vllm, torch, transformers come from the Docker image: vllm/vllm-openai:gemma4 | |
| """ | |
| Classify sentiment of coding-agent user messages using vLLM + Gemma-4. | |
| Reads a dataset of user messages, classifies each as POSITIVE/NEUTRAL/NEGATIVE | |
| with a domain-aware prompt, and saves results back to the Hub. | |
| Uses vllm/vllm-openai:gemma4 Docker image which bundles transformers>=5.5.0. | |
| Structured output via StructuredOutputsParams ensures valid JSON. | |
| Example (HF Jobs): | |
| hf jobs run \ | |
| --flavor a100-large \ | |
| --secrets HF_TOKEN \ | |
| --volume hf://buckets/davanstrien/agent-trace-jobs:/mnt \ | |
| --timeout 30m \ | |
| --detach \ | |
| vllm/vllm-openai:gemma4 \ | |
| bash -c 'pip install datasets "huggingface-hub[hf_transfer]" hf-xet && \ | |
| python3 /mnt/sentiment-label.py \ | |
| davanstrien/agent-trace-user-messages \ | |
| davanstrien/agent-trace-sentiment' | |
| """ | |
| import argparse | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| from typing import Optional | |
| from datasets import load_dataset | |
| from huggingface_hub import login | |
| from torch import cuda | |
| from transformers import AutoTokenizer | |
| from vllm import LLM, SamplingParams | |
| from vllm.sampling_params import StructuredOutputsParams | |
# NOTE(review): huggingface_hub appears to read HF_HUB_ENABLE_HF_TRANSFER into
# its constants module when it is first imported, and it is already imported
# above (via `datasets` / `huggingface_hub`). This assignment may therefore only
# affect processes that import huggingface_hub afresh later. Verify for the
# installed version, and consider moving it above the imports.
os.environ.update(HF_HUB_ENABLE_HF_TRANSFER="1")

# Timestamped, INFO-level log lines for the whole script.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# System turn sent ahead of every user message. The domain rules keep casual
# profanity, terse commands and status reports NEUTRAL. Frustration with the
# agent's output is NEGATIVE, and satisfaction or excitement is POSITIVE.
SYSTEM_PROMPT = "\n".join(
    [
        "Classify user messages from coding-agent sessions (developer talking to AI coding assistant).",
        "Return JSON with two fields:",
        '- "label": one of "POSITIVE", "NEUTRAL", or "NEGATIVE"',
        '- "reason": one sentence explaining why',
        "Domain rules:",
        '- Dev profanity is casual ("kill that shit" = "remove code") = NEUTRAL',
        '- Short commands ("do it", "commit and push") are approvals = NEUTRAL',
        '- Status reports ("ci failed") = NEUTRAL',
        "- Frustration with agent output quality = NEGATIVE",
        "- Satisfaction, excitement about progress = POSITIVE",
    ]
)
# JSON Schema handed to vLLM's structured-output decoding. "label" is limited
# to the three classes, and both fields are required. The object is closed
# with additionalProperties=False: extra keys are never read downstream and
# would only eat into the generation budget, risking truncated JSON.
SENTIMENT_SCHEMA = {
    "type": "object",
    "properties": {
        "label": {"type": "string", "enum": ["POSITIVE", "NEUTRAL", "NEGATIVE"]},
        "reason": {"type": "string"},
    },
    "required": ["label", "reason"],
    "additionalProperties": False,
}
def _format_prompt(tokenizer, message, max_chars: int) -> str:
    """Render one user message as a chat-template prompt string.

    Args:
        tokenizer: Object whose ``apply_chat_template`` renders the turns.
        message: Raw value from the text column. ``None`` becomes ``""`` and
            any other non-string is passed through ``str()``, so a single
            malformed row cannot abort the whole run.
        max_chars: Only this many leading characters of the message are kept.

    Returns:
        The rendered prompt text, with the generation prompt appended.
    """
    if message is None:
        message = ""
    elif not isinstance(message, str):
        message = str(message)
    turns = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": message[:max_chars]},
    ]
    # enable_thinking=False is forwarded to the chat template to switch off the
    # model's thinking mode (assumes the model's template honours this flag).
    return tokenizer.apply_chat_template(
        turns,
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=False,
    )


def _parse_response(text: str) -> tuple[str, str, bool]:
    """Parse one generated completion into ``(label, reason, ok)``.

    ``ok`` is False when the text is not a JSON object or its ``label`` is not
    one of the values allowed by ``SENTIMENT_SCHEMA``. In that case the label
    is ``"ERROR"`` and the reason holds the first 100 characters of the raw
    text for debugging. Structured decoding can still yield invalid JSON when
    generation stops at ``max_tokens``, so this never raises.
    """
    allowed = SENTIMENT_SCHEMA["properties"]["label"]["enum"]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict) or parsed.get("label") not in allowed:
        return "ERROR", text[:100], False
    reason = parsed.get("reason", "")
    if not isinstance(reason, str):
        reason = str(reason)
    return parsed["label"], reason, True


def main(
    src_dataset: str,
    output_dataset: str,
    model_id: str = "google/gemma-4-26B-A4B-it",
    text_column: str = "content_text",
    gpu_memory_utilization: float = 0.90,
    max_model_len: Optional[int] = 4096,
    tensor_parallel_size: Optional[int] = None,
    hf_token: Optional[str] = None,
    max_message_chars: int = 512,
):
    """Label every message in ``src_dataset`` and push the result to the Hub.

    Two columns are added to the ``train`` split:
    - ``sentiment_label``: a schema label, or ``"ERROR"`` when a completion
      cannot be parsed.
    - ``sentiment_reason``: the model's one-sentence reason.

    When ``/mnt`` exists, the results and a JSON summary are also written
    under ``/mnt/results``. The labelled dataset is then pushed to
    ``output_dataset``.

    Args:
        src_dataset: Hub id of the input dataset (its ``train`` split is used).
        output_dataset: Hub id the labelled dataset is pushed to.
        model_id: Model served by vLLM; also used to load the chat tokenizer.
        text_column: Column holding the message text.
        gpu_memory_utilization: Fraction of GPU memory vLLM may reserve.
        max_model_len: Context length passed to vLLM.
        tensor_parallel_size: GPUs to shard across; defaults to all visible.
        hf_token: Hub token; falls back to the ``HF_TOKEN`` environment variable.
        max_message_chars: Messages are truncated to this many characters
            before prompting.

    Exits with status 1 if CUDA is unavailable or ``text_column`` is missing.
    """
    from collections import Counter

    # --- GPU check ---------------------------------------------------------
    if not cuda.is_available():
        logger.error("CUDA not available.")
        sys.exit(1)
    num_gpus = cuda.device_count()
    for i in range(num_gpus):
        total_gb = cuda.get_device_properties(i).total_memory / 1024**3
        logger.info(f"GPU {i}: {cuda.get_device_name(i)} ({total_gb:.0f} GB)")
    if tensor_parallel_size is None:
        tensor_parallel_size = num_gpus

    # --- Auth --------------------------------------------------------------
    token = hf_token or os.environ.get("HF_TOKEN")
    if token:
        login(token=token)

    # --- Dataset -----------------------------------------------------------
    # Loaded and validated BEFORE the model, so a wrong dataset id or column
    # name fails fast instead of after the (slow) model load.
    logger.info(f"Loading dataset: {src_dataset}")
    dataset = load_dataset(src_dataset, split="train")
    logger.info(f"Loaded {len(dataset):,} messages")
    if text_column not in dataset.column_names:
        logger.error(f"Column '{text_column}' not found. Available: {dataset.column_names}")
        sys.exit(1)

    # --- Model -------------------------------------------------------------
    # Text-only run: allow zero image/audio items per prompt.
    logger.info(f"Loading model: {model_id}")
    llm = LLM(
        model=model_id,
        tensor_parallel_size=tensor_parallel_size,
        gpu_memory_utilization=gpu_memory_utilization,
        max_model_len=max_model_len,
        trust_remote_code=True,
        limit_mm_per_prompt={"image": 0, "audio": 0},
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    # Structured output constrains decoding to SENTIMENT_SCHEMA; a completion
    # can still be cut off at max_tokens, so parsing below stays defensive.
    sampling_params = SamplingParams(
        temperature=0.1,
        max_tokens=200,
        structured_outputs=StructuredOutputsParams(json=SENTIMENT_SCHEMA),
    )

    # --- Prompts and generation -------------------------------------------
    logger.info("Building prompts...")
    # Read only the text column rather than materialising every row dict.
    prompts = [
        _format_prompt(tokenizer, message, max_message_chars)
        for message in dataset[text_column]
    ]
    logger.info(f"Classifying {len(prompts):,} messages...")
    # Skip the engine call entirely for an empty split.
    outputs = llm.generate(prompts, sampling_params) if prompts else []

    # --- Parsing -----------------------------------------------------------
    logger.info("Parsing responses...")
    labels = []
    reasons = []
    parse_errors = 0
    for output in outputs:
        label, reason, ok = _parse_response(output.outputs[0].text.strip())
        labels.append(label)
        reasons.append(reason)
        if not ok:
            parse_errors += 1
    logger.info(f"Parse errors: {parse_errors}/{len(outputs)}")

    dataset = dataset.add_column("sentiment_label", labels)
    dataset = dataset.add_column("sentiment_reason", reasons)

    # Label distribution; the loop body never runs for an empty split, so the
    # percentage division cannot hit zero.
    dist = Counter(labels)
    for label, count in dist.most_common():
        pct = 100 * count / len(labels)
        logger.info(f"  {label}: {count:,} ({pct:.1f}%)")

    # --- Optional local copy for live inspection --------------------------
    # NOTE(review): /mnt exists on most Linux images even with no volume
    # mounted, so this check does not by itself prove a bucket is attached.
    volume_path = "/mnt/results"
    if os.path.isdir("/mnt"):
        os.makedirs(volume_path, exist_ok=True)
        dataset.to_parquet(f"{volume_path}/sentiment_results.parquet")
        summary = {"total": len(labels), "distribution": dict(dist), "parse_errors": parse_errors}
        with open(f"{volume_path}/summary.json", "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)
        logger.info(f"Wrote results to {volume_path}/")

    # --- Push to Hub -------------------------------------------------------
    logger.info(f"Pushing to {output_dataset}")
    dataset.push_to_hub(output_dataset, token=token)
    logger.info(f"Done! https://huggingface.co/datasets/{output_dataset}")
if __name__ == "__main__":
    # Command-line entry point. Each option's argparse dest (dashes become
    # underscores) is spelled exactly like a main() keyword, so the parsed
    # namespace is forwarded to main() unchanged.
    cli = argparse.ArgumentParser(description="Sentiment-label coding agent user messages with vLLM")
    cli.add_argument("src_dataset", help="Input dataset on HF Hub")
    cli.add_argument("output_dataset", help="Output dataset on HF Hub")
    optional_flags = (
        ("--model-id", dict(default="google/gemma-4-26B-A4B-it")),
        ("--text-column", dict(default="content_text")),
        ("--gpu-memory-utilization", dict(type=float, default=0.90)),
        ("--max-model-len", dict(type=int, default=4096)),
        ("--tensor-parallel-size", dict(type=int)),
        ("--hf-token", dict(type=str)),
    )
    for flag, options in optional_flags:
        cli.add_argument(flag, **options)
    main(**vars(cli.parse_args()))
Xet Storage Details
- Size:
- 7.73 kB
- Xet hash:
- 0ca1ced8d24709d9b209d8a8856fa0fa1e3917bb0e3e24bbd64b25a45a814123
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.