# argilla-curator / app.py
import os
import re
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import InferenceClient, HfApi, login
from datasets import load_dataset, Dataset as HFDataset
from collections import Counter
from models import (
CurateRequest, CurateResponse,
ScoreRequest, ScoreResponse,
HealthResponse, HFInferenceModel, HFInferenceProvider
)
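# The HF token is read from the environment (on a Space, typically a repo secret);
# when unset, Hub access is disabled and /health reports hf_token_configured=False.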
HF_TOKEN = os.environ.get("HF_TOKEN", "")
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
# LLM Judge scoring prompt
JUDGE_PROMPT = """Rate this Q&A pair quality (1-10 scale):
Question: {instruction}
Answer: {output}
Criteria:
- Relevance to question
- Accuracy of information
- Completeness of answer
- Clarity of explanation
- Helpfulness for learning
Output ONLY a single number 1-10:"""
app = FastAPI(
title="Argilla Curator API",
description="Phase 2: Curate synthetic datasets with GLM-5 LLM Judge",
version="1.0.0"
)
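# Permissive CORS so a browser-based frontend (e.g., a Space UI) can call the API directly.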
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def get_judge_client(model: HFInferenceModel, provider: HFInferenceProvider):
"""Create inference client for judge model."""
return InferenceClient(model=model.value, token=HF_TOKEN, provider=provider.value)
def llm_judge_score(instruction: str, output: str, model: HFInferenceModel, provider: HFInferenceProvider) -> int:
"""Use LLM Judge to score quality (1-10 scale)."""
try:
client = get_judge_client(model, provider)
prompt = JUDGE_PROMPT.format(
instruction=instruction[:500],
output=output[:500]
)
messages = [{"role": "user", "content": prompt}]
result = client.chat_completion(
messages,
max_tokens=10,
temperature=0.3,
)
        content = result.choices[0].message.content
        # Extract the first number in the reply; matching multi-digit numbers
        # keeps a score of "10" from being misread as 1.
        match = re.search(r"\d+", content)
        if match:
            return min(max(int(match.group()), 1), 10)  # Clamp to 1-10
        return 5
except Exception as e:
print(f"Judge Error: {e}")
return 5
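# Illustrative behavior of the parser above: "Score: 8" -> 8, "10" -> 10 (not 1),
# and a reply with no digits falls back to the neutral score of 5.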
@app.get("/", response_model=HealthResponse)
async def root():
return HealthResponse(
status="healthy",
model="GLM-5 Judge",
hf_token_configured=bool(HF_TOKEN)
)
@app.get("/health", response_model=HealthResponse)
async def health():
return HealthResponse(
status="healthy",
model="GLM-5 Judge",
hf_token_configured=bool(HF_TOKEN)
)
@app.post("/score", response_model=ScoreResponse)
async def score(request: ScoreRequest):
"""Score a single instruction-response pair."""
try:
if not HF_TOKEN:
return ScoreResponse(
success=False,
error="HF_TOKEN not configured"
)
score = llm_judge_score(
request.instruction,
request.output,
request.model,
request.provider
)
return ScoreResponse(
success=True,
score=score,
explanation=f"Quality score based on relevance, accuracy, completeness, clarity, and helpfulness."
)
except Exception as e:
return ScoreResponse(success=False, error=str(e))
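# Example request (illustrative): field names follow models.ScoreRequest; the
# model id comes from the /models listing and the provider value is a placeholder.
#   curl -X POST http://localhost:7860/score -H "Content-Type: application/json" \
#     -d '{"instruction": "What is CORS?", "output": "A browser security mechanism...", "model": "z-ai/glm-5", "provider": "together"}'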
@app.post("/curate", response_model=CurateResponse)
async def curate(request: CurateRequest):
"""Curate a dataset by scoring and filtering by quality."""
try:
if not HF_TOKEN:
return CurateResponse(
success=False,
error="HF_TOKEN not configured"
)
# Login to HuggingFace
login(token=HF_TOKEN)
# Load raw dataset
try:
dataset = load_dataset(request.raw_dataset, token=HF_TOKEN)
data = dataset["train"]
except Exception as e:
return CurateResponse(
success=False,
error=f"Failed to load dataset {request.raw_dataset}: {str(e)}"
)
# Score each record
scored_data = []
        for item in data:
instruction = item.get("instruction", "")
output = item.get("output", "")
if not instruction or not output:
continue
score = llm_judge_score(
instruction,
output,
request.model,
request.provider
)
scored_data.append({
"instruction": instruction,
"input": item.get("input", ""),
"output": output,
"topic": item.get("topic", "unknown"),
"difficulty": item.get("difficulty", "unknown"),
"quality_score": score,
})
# Calculate distribution
scores = [r["quality_score"] for r in scored_data]
score_dist = dict(Counter(scores))
# Filter by minimum score
curated_data = [r for r in scored_data if r["quality_score"] >= request.min_score]
        # Push to the HuggingFace Hub if a target repo is specified
        if request.target_dataset:
            try:
                # Ensure the dataset repo exists, then push the curated split
                hf_api.create_repo(
                    repo_id=request.target_dataset,
                    repo_type="dataset",
                    exist_ok=True
                )
                curated_ds = HFDataset.from_list(curated_data)
                curated_ds.push_to_hub(request.target_dataset, token=HF_TOKEN)
            except Exception as push_err:
                return CurateResponse(
                    success=True,
                    data=curated_data,
                    curated_count=len(curated_data),
                    total_count=len(scored_data),
                    filtered_count=len(scored_data) - len(curated_data),
                    score_distribution=score_dist,
                    error=f"Data curated but failed to push to Hub: {push_err}"
                )
return CurateResponse(
success=True,
data=curated_data,
curated_count=len(curated_data),
total_count=len(scored_data),
filtered_count=len(scored_data) - len(curated_data),
score_distribution=score_dist
)
except Exception as e:
return CurateResponse(success=False, error=str(e))
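# Example request (illustrative): dataset ids and the provider value are placeholders.
#   curl -X POST http://localhost:7860/curate -H "Content-Type: application/json" \
#     -d '{"raw_dataset": "user/raw-qa", "target_dataset": "user/curated-qa", "min_score": 7, "model": "z-ai/glm-5", "provider": "together"}'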
@app.get("/models")
async def list_models():
"""List available judge models."""
return {
"models": [
{
"id": "z-ai/glm-5",
"name": "GLM-5",
"description": "z.ai flagship model, excellent for quality assessment"
},
{
"id": "openai/gpt-4o-mini",
"name": "GPT-4o Mini",
"description": "Fast and efficient scoring"
},
{
"id": "Qwen/Qwen2.5-7B-Instruct",
"name": "Qwen2.5-7B",
"description": "Reliable scoring via Together"
}
]
}
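# Local/dev entry point. Assumption: HF Spaces conventionally serve on port 7860;
# in a deployed Space the runner (Dockerfile/SDK) would start the server instead.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)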