Spaces:
Build error
Build error
| from typing import Optional | |
| import logging | |
| from fastapi import APIRouter, Depends, HTTPException, status, Request | |
| from fastapi.concurrency import run_in_threadpool | |
| from pydantic import BaseModel | |
| from open_webui.models.users import Users, UserModel | |
| from open_webui.models.feedbacks import ( | |
| FeedbackIdResponse, | |
| FeedbackModel, | |
| FeedbackResponse, | |
| FeedbackForm, | |
| FeedbackUserResponse, | |
| FeedbackListResponse, | |
| LeaderboardFeedbackData, | |
| ModelHistoryEntry, | |
| ModelHistoryResponse, | |
| Feedbacks, | |
| ) | |
| from open_webui.constants import ERROR_MESSAGES | |
| from open_webui.utils.auth import get_admin_user, get_verified_user | |
| from open_webui.internal.db import get_async_session | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| log = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # Leaderboard Elo Rating Computation | |
| # Ratings are not stored: they are recomputed from the saved feedback | |
| # each time the leaderboard is requested. | |
| # | |
| # How it works: | |
| # 1. Each model starts with a rating of 1000 | |
| # 2. When a user picks a winner between two models, ratings are adjusted: | |
| # - Winner gains points, loser loses points | |
| # - The amount depends on expected outcome (upset = bigger change) | |
| # 3. The Elo formula: new_rating = old_rating + K * (actual - expected) | |
| # - K=32 controls how much ratings can change per match | |
| # - expected = probability of winning based on current ratings | |
| # | |
| # Query-based re-ranking (optional): | |
| # When a user searches for a topic (e.g., "coding"), we want to show | |
| # which models perform best FOR THAT TOPIC. We do this by: | |
| # 1. Computing semantic similarity between the query and each feedback's tags | |
| # 2. Using that similarity as a weight in the Elo calculation | |
| # 3. Feedbacks about "coding" contribute more to the final ranking | |
| # 4. Feedbacks about unrelated topics (e.g., "cooking") contribute less | |
| # This gives topic-specific leaderboards without needing separate data. | |
import os

# Name of the embedding model used for query-based re-ranking; can be
# overridden through the AUXILIARY_EMBEDDING_MODEL environment variable.
EMBEDDING_MODEL_NAME = os.getenv('AUXILIARY_EMBEDDING_MODEL', 'TaylorAI/bge-micro-v2')

# Module-level cache for the loaded model; remains None until a load succeeds.
_embedding_model = None


def _get_embedding_model():
    """Return the cached embedding model, loading it on first use.

    The model is stored in the module-level ``_embedding_model``. Any exception
    raised while importing ``sentence_transformers`` or constructing the model
    is logged and swallowed; in that case nothing is cached, ``None`` is
    returned, and the next call tries to load again.

    Returns:
        The loaded ``SentenceTransformer`` instance, or ``None`` if loading failed.
    """
    global _embedding_model
    if _embedding_model is not None:
        return _embedding_model

    try:
        # Imported here so the dependency is only needed when a load is attempted.
        from sentence_transformers import SentenceTransformer

        _embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
    except Exception as e:
        log.error(f'Embedding model load failed: {e}')
    return _embedding_model
def _calculate_elo(feedbacks: list[LeaderboardFeedbackData], similarities: Optional[dict] = None) -> dict:
    """
    Calculate Elo ratings for models based on user feedback.

    Each feedback is a comparison in which the user rated one model
    (``data['model_id']``) against its opponents (``data['sibling_model_ids']``).
    A rating of 1 means the rated model won, -1 means it lost; feedback with
    any other rating, or without a model id, is ignored.

    Args:
        feedbacks: Objects exposing ``id`` and ``data`` (a dict or None).
        similarities: Optional ``{feedback_id: weight}`` map. A feedback missing
            from the map, or an empty/None map, uses weight 1.0. Negative
            weights are clamped to 0 so that a dissimilar feedback can never
            reverse the direction of a rating update.

    Returns:
        {model_id: {"rating": float, "won": int, "lost": int}}. A model only
        appears once it has taken part in at least one valid pairing.
    """
    K_FACTOR = 32  # Standard Elo K-factor for rating volatility
    INITIAL_RATING = 1000.0
    model_stats: dict = {}

    def get_or_create_stats(model_id):
        if model_id not in model_stats:
            model_stats[model_id] = {'rating': INITIAL_RATING, 'won': 0, 'lost': 0}
        return model_stats[model_id]

    for feedback in feedbacks:
        data = feedback.data or {}
        rated_id = data.get('model_id')
        # Ratings may be stored as int or str; normalise before comparing.
        rating_value = str(data.get('rating', ''))
        if not rated_id or rating_value not in ('1', '-1'):
            continue

        won = rating_value == '1'
        weight = similarities.get(feedback.id, 1.0) if similarities else 1.0
        # A negative weight would turn a win into a rating loss.
        weight = max(0.0, weight)

        for opponent_id in data.get('sibling_model_ids') or []:
            # Skip missing ids and self-pairings: the former would create a
            # bogus None entry, the latter would record a win and a loss for
            # the same model.
            if not opponent_id or opponent_id == rated_id:
                continue

            rated = get_or_create_stats(rated_id)
            opponent = get_or_create_stats(opponent_id)

            # Probability that the rated model wins, from the current ratings.
            expected = 1 / (1 + 10 ** ((opponent['rating'] - rated['rating']) / 400))
            rated['rating'] += K_FACTOR * ((1 if won else 0) - expected) * weight
            opponent['rating'] += K_FACTOR * ((0 if won else 1) - (1 - expected)) * weight

            if won:
                rated['won'] += 1
                opponent['lost'] += 1
            else:
                rated['lost'] += 1
                opponent['won'] += 1

    return model_stats
def _get_top_tags(feedbacks: list[LeaderboardFeedbackData], limit: int = 5) -> dict:
    """
    Count tag occurrences per model and return the most frequent ones.

    Tags are read from ``data['tags']`` of each feedback and attributed to
    ``data['model_id']``. Feedback without data, without a model id, or
    without tags (missing, empty, or explicitly None) contributes nothing,
    so such models do not appear in the result.

    Args:
        feedbacks: Objects exposing ``data`` (a dict or None).
        limit: Maximum number of tags returned per model.

    Returns:
        {model_id: [{"tag": str, "count": int}, ...]}, most frequent first;
        tags with equal counts keep the order in which they were first seen.
    """
    from collections import Counter, defaultdict

    tag_counts = defaultdict(Counter)
    for feedback in feedbacks:
        data = feedback.data or {}
        model_id = data.get('model_id')
        # `or []` also covers a stored `"tags": null`, which `.get('tags', [])`
        # would hand back as None and fail to iterate.
        tags = data.get('tags') or []
        if model_id and tags:
            tag_counts[model_id].update(tags)

    return {
        model_id: [{'tag': tag, 'count': count} for tag, count in counts.most_common()[:limit]]
        for model_id, counts in tag_counts.items()
    }
def _compute_similarities(feedbacks: list[LeaderboardFeedbackData], query: str) -> dict:
    """
    Compute how relevant each feedback is to a search query.

    Every distinct tag found in the feedback is embedded once, compared with
    the embedded query using cosine similarity, and each feedback receives the
    highest similarity among its own tags. Cosine similarity can be negative,
    so scores are clamped to the 0-1 range; the result is used as a weight in
    the Elo calculation and a negative weight would reverse a rating update.

    Args:
        feedbacks: Objects exposing ``id`` and ``data`` (a dict or None).
        query: Search text to compare the tags against.

    Returns:
        {feedback_id: similarity_score (0-1)}; feedback without tags scores 0.
        An empty dict is returned when the embedding model is unavailable, no
        feedback has tags, or encoding raises.
    """
    import numpy as np

    embedding_model = _get_embedding_model()
    if embedding_model is None:
        return {}

    def tags_of(feedback):
        # `or []` also covers a stored `"tags": null`.
        return (feedback.data or {}).get('tags') or []

    all_tags = list({tag for feedback in feedbacks for tag in tags_of(feedback)})
    if not all_tags:
        return {}

    try:
        tag_embeddings = embedding_model.encode(all_tags)
        query_embedding = embedding_model.encode([query])[0]
    except Exception as e:
        log.error(f'Embedding error: {e}')
        return {}

    # Vectorized cosine similarity.
    # NOTE(review): axis=1 assumes encode() returns a 2-D (tags x dim) array - confirm.
    tag_norms = np.linalg.norm(tag_embeddings, axis=1)
    query_norm = np.linalg.norm(query_embedding)
    # The small epsilon avoids division by zero for all-zero embeddings.
    similarities = np.dot(tag_embeddings, query_embedding) / (tag_norms * query_norm + 1e-9)
    similarities = np.clip(similarities, 0.0, 1.0)
    tag_similarity_map = dict(zip(all_tags, similarities.tolist()))

    return {
        feedback.id: max(
            (tag_similarity_map.get(tag, 0) for tag in tags_of(feedback)),
            default=0,
        )
        for feedback in feedbacks
    }
# One leaderboard row describing a single model.
class LeaderboardEntry(BaseModel):
    # Identifier of the model this row describes.
    model_id: str
    # Elo rating, rounded to an integer by the leaderboard endpoint.
    rating: int
    # Number of pairwise comparisons the model won.
    won: int
    # Number of pairwise comparisons the model lost.
    lost: int
    # Total comparisons; the leaderboard endpoint fills this with won + lost.
    count: int
    # Most frequent feedback tags for the model, as {"tag": ..., "count": ...} dicts.
    top_tags: list[dict]
# Response payload of the leaderboard endpoint.
class LeaderboardResponse(BaseModel):
    # Leaderboard rows; the endpoint returns them sorted by rating, highest first.
    entries: list[LeaderboardEntry]
async def get_leaderboard(
    query: Optional[str] = None,
    user=Depends(get_admin_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Build the model leaderboard with Elo ratings from stored feedback.

    Args:
        query: Optional search text. When it contains non-whitespace
            characters, each feedback is weighted by how similar its tags are
            to the query; otherwise all feedback counts equally.
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.
        db: Async database session used to load the feedback.

    Returns:
        A ``LeaderboardResponse`` whose entries are ordered by rounded rating,
        highest first.
    """
    feedbacks = await Feedbacks.get_feedbacks_for_leaderboard(db=db)

    search_text = query.strip() if query else ''
    similarities = None
    if search_text:
        # The similarity computation is synchronous, so it runs in a worker thread.
        similarities = await run_in_threadpool(_compute_similarities, feedbacks, search_text)

    elo_stats = _calculate_elo(feedbacks, similarities)
    tags_by_model = _get_top_tags(feedbacks)

    entries = []
    for model_id, stats in elo_stats.items():
        wins = stats['won']
        losses = stats['lost']
        entries.append(
            LeaderboardEntry(
                model_id=model_id,
                rating=round(stats['rating']),
                won=wins,
                lost=losses,
                count=wins + losses,
                top_tags=tags_by_model.get(model_id, []),
            )
        )
    entries.sort(key=lambda entry: entry.rating, reverse=True)

    return LeaderboardResponse(entries=entries)
async def get_model_history(
    model_id: str,
    days: int = 30,
    user=Depends(get_admin_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Return the evaluation history of one model.

    Args:
        model_id: Identifier of the model to look up.
        days: Forwarded to ``Feedbacks.get_model_evaluation_history``
            (presumably the look-back window in days - confirm against the model layer).
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.
        db: Async database session.

    Returns:
        A ``ModelHistoryResponse`` carrying ``model_id`` and the fetched history.
    """
    model_history = await Feedbacks.get_model_evaluation_history(
        model_id=model_id,
        days=days,
        db=db,
    )
    response = ModelHistoryResponse(model_id=model_id, history=model_history)
    return response
| ############################ | |
| # GetConfig | |
| ############################ | |
async def get_config(request: Request, user=Depends(get_admin_user)):
    """Return the current evaluation-arena settings from the application config.

    Args:
        request: Incoming request; the config is read from ``request.app.state.config``.
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.

    Returns:
        A dict with ``ENABLE_EVALUATION_ARENA_MODELS`` and ``EVALUATION_ARENA_MODELS``.
    """
    app_config = request.app.state.config
    enabled = app_config.ENABLE_EVALUATION_ARENA_MODELS
    arena_models = app_config.EVALUATION_ARENA_MODELS
    return {
        'ENABLE_EVALUATION_ARENA_MODELS': enabled,
        'EVALUATION_ARENA_MODELS': arena_models,
    }
| ############################ | |
| # UpdateConfig | |
| ############################ | |
# Request body for updating the evaluation-arena settings. Both fields are
# optional; update_config leaves a setting untouched when its field is None.
class UpdateConfigForm(BaseModel):
    # New value for the arena on/off flag, or None to keep the current one.
    ENABLE_EVALUATION_ARENA_MODELS: Optional[bool] = None
    # New list of arena model definitions, or None to keep the current one.
    EVALUATION_ARENA_MODELS: Optional[list[dict]] = None
async def update_config(
    request: Request,
    form_data: UpdateConfigForm,
    user=Depends(get_admin_user),
):
    """Apply the provided evaluation-arena settings and return the resulting values.

    Only fields of ``form_data`` that are not None are written to
    ``request.app.state.config``; the others keep their current value.

    Args:
        request: Incoming request giving access to the application config.
        form_data: Settings to apply.
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.

    Returns:
        A dict with the ``ENABLE_EVALUATION_ARENA_MODELS`` and
        ``EVALUATION_ARENA_MODELS`` values read back from the config after the update.
    """
    config = request.app.state.config

    new_enabled = form_data.ENABLE_EVALUATION_ARENA_MODELS
    if new_enabled is not None:
        config.ENABLE_EVALUATION_ARENA_MODELS = new_enabled

    new_models = form_data.EVALUATION_ARENA_MODELS
    if new_models is not None:
        config.EVALUATION_ARENA_MODELS = new_models

    # Read the values back from the config rather than echoing the form.
    return {
        'ENABLE_EVALUATION_ARENA_MODELS': config.ENABLE_EVALUATION_ARENA_MODELS,
        'EVALUATION_ARENA_MODELS': config.EVALUATION_ARENA_MODELS,
    }
async def get_feedback_model_ids(user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
    """Return the result of ``Feedbacks.get_distinct_model_ids`` for the given session.

    ``user`` is resolved through the ``get_admin_user`` dependency and is not
    used in the body.
    """
    model_ids = await Feedbacks.get_distinct_model_ids(db=db)
    return model_ids
async def get_all_feedbacks(user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
    """Return every feedback record provided by ``Feedbacks.get_all_feedbacks``.

    ``user`` is resolved through the ``get_admin_user`` dependency and is not
    used in the body.
    """
    return await Feedbacks.get_all_feedbacks(db=db)
async def get_all_feedback_ids(user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
    """Return the result of ``Feedbacks.get_all_feedback_ids`` for the given session.

    ``user`` is resolved through the ``get_admin_user`` dependency and is not
    used in the body.
    """
    feedback_ids = await Feedbacks.get_all_feedback_ids(db=db)
    return feedback_ids
async def delete_all_feedbacks(user=Depends(get_admin_user), db: AsyncSession = Depends(get_async_session)):
    """Delete all feedback via ``Feedbacks.delete_all_feedbacks`` and return its result.

    ``user`` is resolved through the ``get_admin_user`` dependency and is not
    used in the body.
    """
    return await Feedbacks.delete_all_feedbacks(db=db)
async def export_all_feedbacks(
    model_id: Optional[str] = None,
    user=Depends(get_admin_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Return all feedback, optionally restricted to a single model.

    Args:
        model_id: When given (and non-empty), only feedback whose
            ``data['model_id']`` equals this value is returned. The filtering
            happens in Python after all records have been loaded.
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.
        db: Async database session.

    Returns:
        The full result of ``Feedbacks.get_all_feedbacks`` or, when filtering, a
        list of the matching records.
    """
    all_feedbacks = await Feedbacks.get_all_feedbacks(db=db)
    if not model_id:
        return all_feedbacks

    # Records without data can never match a model id.
    return [
        feedback
        for feedback in all_feedbacks
        if feedback.data and feedback.data.get('model_id') == model_id
    ]
# NOTE(review): a second function named `get_feedbacks` is defined further
# down in this file and rebinds this module-level name - confirm that is intended.
async def get_feedbacks(user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
    """Return the feedback belonging to the requesting user.

    The lookup is keyed on ``user.id``, where ``user`` is resolved through the
    ``get_verified_user`` dependency.
    """
    return await Feedbacks.get_feedbacks_by_user_id(user.id, db=db)
async def delete_feedbacks(user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
    """Delete the requesting user's feedback and return the model layer's result.

    The deletion is keyed on ``user.id``, where ``user`` is resolved through the
    ``get_verified_user`` dependency.
    """
    return await Feedbacks.delete_feedbacks_by_user_id(user.id, db=db)
# Number of feedback items returned per page by the paginated listing.
PAGE_ITEM_COUNT = 30


# NOTE(review): an earlier function in this file is also named `get_feedbacks`;
# this definition rebinds the module-level name - confirm that is intended.
async def get_feedbacks(
    order_by: Optional[str] = None,
    direction: Optional[str] = None,
    page: Optional[int] = 1,
    model_id: Optional[str] = None,
    user=Depends(get_admin_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Return one page of feedback items, optionally sorted and filtered.

    Args:
        order_by: Sort key forwarded to the model layer when set.
        direction: Sort direction forwarded to the model layer when set.
        page: 1-based page number. None, zero, and negative values are
            treated as the first page.
        model_id: When set, forwarded as a filter on the model id.
        user: Resolved through the ``get_admin_user`` dependency; not used
            in the body.
        db: Async database session.

    Returns:
        Whatever ``Feedbacks.get_feedback_items`` returns for the computed
        filter, offset, and page size.
    """
    limit = PAGE_ITEM_COUNT
    # `page` is annotated Optional, and max(1, None) would raise TypeError.
    page = max(1, page or 1)
    skip = (page - 1) * limit

    # Only options that were actually provided are forwarded.
    candidates = (
        ('order_by', order_by),
        ('direction', direction),
        ('model_id', model_id),
    )
    filters = {key: value for key, value in candidates if value}

    return await Feedbacks.get_feedback_items(filter=filters, skip=skip, limit=limit, db=db)
async def create_feedback(
    request: Request,
    form_data: FeedbackForm,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Store a new feedback record for the requesting user.

    Args:
        request: Incoming request; not used in the body.
        form_data: Feedback content to store.
        user: Resolved through the ``get_verified_user`` dependency; its ``id``
            becomes the owner of the record.
        db: Async database session.

    Returns:
        The created feedback as returned by ``Feedbacks.insert_new_feedback``.

    Raises:
        HTTPException: 400 with ``ERROR_MESSAGES.DEFAULT()`` when the insert
            returns a falsy value.
    """
    created = await Feedbacks.insert_new_feedback(user_id=user.id, form_data=form_data, db=db)
    if created:
        return created

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=ERROR_MESSAGES.DEFAULT(),
    )
async def get_feedback_by_id(id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)):
    """Fetch a single feedback record by id.

    A user whose ``role`` is ``'admin'`` looks the record up by id alone; any
    other user's lookup is additionally scoped to their own ``user.id``.

    Raises:
        HTTPException: 404 with ``ERROR_MESSAGES.NOT_FOUND`` when the lookup
            returns a falsy value.
    """
    is_admin = user.role == 'admin'
    lookup = (
        Feedbacks.get_feedback_by_id(id=id, db=db)
        if is_admin
        else Feedbacks.get_feedback_by_id_and_user_id(id=id, user_id=user.id, db=db)
    )
    feedback = await lookup

    if feedback:
        return feedback
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND)
async def update_feedback_by_id(
    id: str,
    form_data: FeedbackForm,
    user=Depends(get_verified_user),
    db: AsyncSession = Depends(get_async_session),
):
    """Update a single feedback record by id.

    A user whose ``role`` is ``'admin'`` updates the record by id alone; any
    other user's update is additionally scoped to their own ``user.id``.

    Args:
        id: Identifier of the feedback to update.
        form_data: New feedback content.
        user: Resolved through the ``get_verified_user`` dependency.
        db: Async database session.

    Returns:
        The updated feedback as returned by the model layer.

    Raises:
        HTTPException: 404 with ``ERROR_MESSAGES.NOT_FOUND`` when the update
            returns a falsy value.
    """
    is_admin = user.role == 'admin'
    pending_update = (
        Feedbacks.update_feedback_by_id(id=id, form_data=form_data, db=db)
        if is_admin
        else Feedbacks.update_feedback_by_id_and_user_id(id=id, user_id=user.id, form_data=form_data, db=db)
    )
    updated = await pending_update

    if updated:
        return updated
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND)
async def delete_feedback_by_id(
    id: str, user=Depends(get_verified_user), db: AsyncSession = Depends(get_async_session)
):
    """Delete a single feedback record by id.

    A user whose ``role`` is ``'admin'`` deletes the record by id alone; any
    other user's deletion is additionally scoped to their own ``user.id``.

    Returns:
        The truthy result reported by the model layer.

    Raises:
        HTTPException: 404 with ``ERROR_MESSAGES.NOT_FOUND`` when the deletion
            returns a falsy value.
    """
    is_admin = user.role == 'admin'
    pending_delete = (
        Feedbacks.delete_feedback_by_id(id=id, db=db)
        if is_admin
        else Feedbacks.delete_feedback_by_id_and_user_id(id=id, user_id=user.id, db=db)
    )
    deleted = await pending_delete

    if deleted:
        return deleted
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND)