Spaces:
Build error
Build error
| import re | |
| from typing import ClassVar, Dict, List, Pattern, Tuple, Union | |
| import numpy as np | |
| from pydantic import BaseModel, ConfigDict, Field | |
class BaseScorer(BaseModel):
    """Common parent of all scoring strategies.

    A scorer collapses the list of distances returned from ChromaDB into a
    single number.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def id(self) -> str:
        """Return the identifier of this scoring method.

        The identifier is the lower-cased class name with every occurrence
        of ``"scorer"`` removed (``MinScorer`` -> ``"min"``).
        """
        lowered_name = type(self).__name__.lower()
        return lowered_name.replace("scorer", "")

    def compute_score(self, distances: List[float]) -> float:
        """Aggregate ``distances`` into one score: the smallest distance."""
        smallest = np.min(distances)
        return float(smallest)
class MinScorer(BaseScorer):
    """Scorer that reports the smallest distance."""

    def compute_score(self, distances: List[float]) -> float:
        """Return the minimum of ``distances`` as a plain ``float``."""
        closest = np.min(distances)
        return float(closest)
class WeightedScorer(BaseScorer):
    """Weighted-mean scorer.

    The distance at 0-based position ``i`` out of ``n`` gets weight
    ``n - i``, so earlier entries contribute more to the result.

    NOTE(review): lower distances only dominate when the input is sorted
    in ascending order -- confirm that callers pass sorted distances.
    """

    def compute_score(self, distances: List[float]) -> float:
        """Return ``sum(d_i * w_i) / sum(w_i)`` with ``w_i = n - i``."""
        values = np.array(distances)
        # Positional weights in descending order: n, n - 1, ..., 1.
        position_weights = np.arange(len(values), 0, -1)
        weighted_total = (values * position_weights).sum()
        return weighted_total / position_weights.sum()
class HyperbolicScorer(BaseScorer):
    """Minimum-distance scorer scaled by a hyperbolic factor of the match count.

    The factor is ``-k * n / (1 - p * n)`` (equivalently
    ``k * n / (p * n - 1)``), where ``n`` is the number of distances.  With
    the defaults ``k=2`` and ``p=3`` it equals 1 for a single distance and
    approaches ``k / p`` as ``n`` grows.
    """

    k: float = 2.0
    p: float = 3.0

    def id(self) -> str:
        """Return the class-derived name followed by the ``k`` and ``p`` values.

        Example: ``"hyperbolic_k2.0_p3.0"`` for the default parameters.
        """
        # ``id`` is a regular method on the parent, so it has to be called;
        # formatting the bound method itself would embed its repr in the id.
        class_name = super().id()
        return f"{class_name}_k{self.k}_p{self.p}"

    def adjustment_factor(self, n: float) -> float:
        """Return the hyperbolic factor for ``n`` distances.

        Raises:
            ZeroDivisionError: If ``p * n == 1``.
        """
        factor = -self.k * n / (1 - self.p * n)
        return factor

    def compute_score(self, distances: List[float]) -> float:
        """Return the smallest distance multiplied by the adjustment factor.

        Raises:
            ValueError: If ``distances`` is empty (raised by ``min``).
        """
        n = len(distances)
        score = min(distances)
        factor = self.adjustment_factor(n)
        return factor * score
class HyperbolicWeightedScorer(HyperbolicScorer):
    """Hyperbolic factor applied to the weighted mean instead of the minimum."""

    def compute_score(self, distances: List[float]) -> float:
        """Return ``adjustment_factor(len(distances))`` times the weighted mean."""
        weighted_mean = WeightedScorer().compute_score(distances)
        count_factor = self.adjustment_factor(len(distances))
        return count_factor * weighted_mean
class ExponentialScorer(BaseScorer):
    """Minimum-distance scorer scaled by a factor that decays with match count.

    Core function:
        y = x^2 * exp(-x)

    Shifted and scaled function:
        y = a + (1 - a) * (x + s)^2 * exp(-x / w) / ((1 + s)^2 * exp(-1 / w))

    Properties of the factor:
        - It passes through (1, 1), so a single match leaves the score as is.
        - It tends to the asymptote ``a`` as the number of matches grows, so
          many matches with large distances cannot push the score arbitrarily.

    Params:
        a: Asymptote - lim(y) as x -> +inf
        w: Width parameter
        s: Shift in x
    """

    a: float = 0.7  # Asymptote
    w: float = 1.7  # Width
    s: float = 2.8  # x-shift

    def id(self) -> str:
        """Return the class-derived name followed by the ``a``, ``w``, ``s`` values.

        Example: ``"exponential_a0.7_w1.7_s2.8"`` for the default parameters.
        """
        # ``id`` is a regular method on the parent, so it has to be called;
        # formatting the bound method itself would embed its repr in the id.
        class_name = super().id()
        return f"{class_name}_a{self.a}_w{self.w}_s{self.s}"

    def adjustment_factor(self, n: float) -> float:
        """Return the multiplicative factor for ``n`` matches.

        Raises:
            ZeroDivisionError: If ``w`` is zero.
        """
        a, w, s = self.a, self.w, self.s
        # Both exponentials must use the width ``w``: the denominator is the
        # numerator evaluated at n = 1, which is what makes factor(1) == 1.
        bump = (n + s) ** 2 * np.exp(-n / w)
        bump_at_one = (1 + s) ** 2 * np.exp(-1 / w)
        return float(a + (1 - a) * (bump / bump_at_one))

    def compute_score(self, distances: List[float]) -> float:
        """Return the smallest distance multiplied by the adjustment factor.

        Raises:
            ValueError: If ``distances`` is empty (raised by ``min``).
        """
        n = len(distances)
        score = min(distances)
        return self.adjustment_factor(n) * score
class ExponentialWeightedScorer(ExponentialScorer):
    """Exponential factor applied to the weighted mean instead of the minimum."""

    def compute_score(self, distances: List[float]) -> float:
        """Return ``adjustment_factor(len(distances))`` times the weighted mean."""
        weighted_mean = WeightedScorer().compute_score(distances)
        count_factor = self.adjustment_factor(len(distances))
        return count_factor * weighted_mean
class StepScorer(BaseScorer):
    """Step-wise scoring based on predefined ranges.

    Each ``(threshold, value)`` pair assigns ``value`` as the multiplier once
    the number of distances reaches ``threshold``; the largest reached
    threshold wins.

    Default ranges: [(1, 1), (3, 0.9), (8, 0.7)]
    """

    ranges: List[Tuple[int, float]] = Field(
        default=[(1, 1.0), (3, 0.9), (8, 0.7)],
        description="List of tuples (threshold, value)",
    )

    def compute_score(self, distances: List[float]) -> float:
        """Return the weighted mean of ``distances`` times the step value.

        If the number of distances is below every threshold, the value of
        the smallest threshold is used.  With no ranges configured the
        weighted mean is returned unchanged.
        """
        n = len(distances)
        weighted_score = WeightedScorer().compute_score(distances)
        if not self.ranges:
            return weighted_score

        # Largest threshold first, so the first hit is the highest one reached.
        ordered = sorted(self.ranges, reverse=True)
        for threshold, value in ordered:
            if n >= threshold:
                return value * weighted_score

        # Below every threshold: clamp to the smallest threshold's value.
        # Indexing ``self.ranges[-1]`` would pick the largest threshold for
        # the default (ascending) ordering, which is the opposite end.
        return ordered[-1][1] * weighted_score

    def id(self) -> str:
        """Create unique id based on ranges, e.g. ``"step_1-1.0_3-0.9_8-0.7"``."""
        return f"step_{'_'.join(f'{t}-{v}' for t, v in self.ranges)}"
class LinearScorer(BaseScorer):
    """Linear interpolation scoring based on predefined points.

    The multiplier is obtained by piecewise linear interpolation between
    ``(count, value)`` points and clamped to the first/last value outside
    their range.

    Default points: [(1, 1), (3, 0.9), (8, 0.7)]
    """

    points: List[Tuple[int, float]] = Field(
        default=[(1, 1), (3, 0.9), (8, 0.7)],
        description="List of points for linear interpolation",
    )

    def compute_score(self, distances: List[float]) -> float:
        """Return the weighted mean of ``distances`` times the interpolated value.

        Points are sorted by count before use, so their configured order does
        not matter.  With no points configured the weighted mean is returned
        unchanged.
        """
        n = len(distances)
        weighted_score = WeightedScorer().compute_score(distances)
        if not self.points:
            return weighted_score

        # Boundary checks and the segment search below both rely on ascending
        # x values, so sort instead of trusting the configured order.
        ordered = sorted(self.points)
        first_x, first_y = ordered[0]
        last_x, last_y = ordered[-1]

        # Clamp outside the covered range.
        if n <= first_x:
            return first_y * weighted_score
        if n >= last_x:
            return last_y * weighted_score

        # first_x < n < last_x here, so the first matching segment always has
        # x1 < x2 and the slope below cannot divide by zero.
        for (x1, y1), (x2, y2) in zip(ordered, ordered[1:]):
            if x1 <= n <= x2:
                m = (y2 - y1) / (x2 - x1)
                b = y1 - m * x1
                factor = m * n + b
                return factor * weighted_score

        return weighted_score  # defensive fallback; not reachable for sorted points

    def id(self) -> str:
        """Create unique id based on interpolation points."""
        return f"linear_{'_'.join(f'{x}-{y}' for x, y in self.points)}"
# Type alias covering the scorer classes defined in this module.
ScorerTypes = Union[
    BaseScorer, MinScorer, WeightedScorer,
    HyperbolicScorer, HyperbolicWeightedScorer,
    ExponentialScorer, ExponentialWeightedScorer,
    StepScorer, LinearScorer,
]
class ScorerFactory(BaseModel):
    """Factory for creating scorer instances from specifications using regex patterns.

    All entry points are class-level: call them as
    ``ScorerFactory.create_from_id("min")`` without instantiating the factory.
    """

    # One compiled pattern per scorer family, keyed by the family name that
    # ``create_from_id`` dispatches on.
    patterns: ClassVar[Dict[str, Pattern]] = {
        "min": re.compile(r"^min$"),
        "weighted": re.compile(r"^weighted$"),
        "hyperbolic": re.compile(
            r"""
            ^hyperbolic                 # Base name
            (?P<weighted>_?weighted)?   # Optional weighted variant
            _k(?P<k>\d+\.?\d*)          # k parameter (float)
            _p(?P<p>\d+\.?\d*)$         # p parameter (float)
            """,
            re.VERBOSE,
        ),
        "exponential": re.compile(
            r"""
            ^exponential                # Base name
            (?P<weighted>_?weighted)?   # Optional weighted variant
            _a(?P<a>\d+\.?\d*)          # a parameter (float)
            _w(?P<w>\d+\.?\d*)          # w parameter (float)
            _s(?P<s>\d+\.?\d*)$         # s parameter (float)
            """,
            re.VERBOSE,
        ),
        "step": re.compile(
            r"""
            ^step_                      # Base name
            (?P<points>                 # Start points capture
                (?:                     # Start group for each point
                    \d+                 # x value (int)
                    -                   # separator
                    \d+\.?\d*           # y value (float)
                    _?                  # Optional trailing underscore
                )+                      # One or more points
            )$                          # End points capture
            """,
            re.VERBOSE,
        ),
        "linear": re.compile(
            r"""
            ^linear_                    # Base name
            (?P<points>                 # Start points capture
                (?:                     # Start group for each point
                    \d+                 # x value (int)
                    -                   # separator
                    \d+\.?\d*           # y value (float)
                    _?                  # Optional trailing underscore
                )+                      # One or more points
            )$                          # End points capture
            """,
            re.VERBOSE,
        ),
    }

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @staticmethod
    def _parse_points(points_str: str) -> List[Tuple[int, float]]:
        """Parse points string into list of tuples.

        Format: "1-1.0_3-0.9_8-0.7" -> [(1, 1.0), (3, 0.9), (8, 0.7)]

        Raises:
            ValueError: If a chunk between underscores is not exactly
                ``<int>-<float>``.
        """
        points = []
        for point in points_str.rstrip("_").split("_"):
            x, y = point.split("-")
            points.append((int(x), float(y)))
        return points

    @staticmethod
    def create_default() -> BaseScorer:
        """Create default scorer (a ``HyperbolicScorer`` with default parameters)."""
        return HyperbolicScorer()

    @classmethod
    def create_from_id(cls, scorer_id: str) -> BaseScorer:
        """Create scorer from identifier string using regex patterns.

        Matching is case-insensitive.  The weighted variants are accepted both
        as ``hyperbolic_weighted_...`` and in the class-name-derived spelling
        ``hyperbolicweighted_...`` (likewise for ``exponential``).

        Args:
            scorer_id: Scorer identifier string
                Examples:
                - "min"
                - "hyperbolic_k2.0_p3.0"
                - "hyperbolic_weighted_k2.0_p3.0"
                - "exponential_a0.7_w1.7_s2.8"
                - "step_1-1.0_3-0.9_8-0.7"
                - "linear_1-1.0_3-0.9_8-0.7"

        Returns:
            Configured scorer instance

        Raises:
            ValueError: If scorer_id format is invalid
        """
        scorer_id = scorer_id.lower()
        for scorer_type, pattern in cls.patterns.items():
            match = pattern.match(scorer_id)
            if match is None:
                continue

            if scorer_type == "min":
                return MinScorer()
            if scorer_type == "weighted":
                return WeightedScorer()
            if scorer_type == "hyperbolic":
                # The named group is only set when the weighted marker was
                # present in the id.
                hyperbolic_cls = (
                    HyperbolicWeightedScorer
                    if match.group("weighted")
                    else HyperbolicScorer
                )
                return hyperbolic_cls(
                    k=float(match.group("k")),
                    p=float(match.group("p")),
                )
            if scorer_type == "exponential":
                exponential_cls = (
                    ExponentialWeightedScorer
                    if match.group("weighted")
                    else ExponentialScorer
                )
                return exponential_cls(
                    a=float(match.group("a")),
                    w=float(match.group("w")),
                    s=float(match.group("s")),
                )
            if scorer_type in ("step", "linear"):
                points = cls._parse_points(match.group("points"))
                if scorer_type == "step":
                    return StepScorer(ranges=points)
                return LinearScorer(points=points)

        raise ValueError(f"Invalid scorer id format: {scorer_id}")

    @classmethod
    def parse_scorer_specs(cls, specs: List[str]) -> List[BaseScorer]:
        """Parse scorer specifications into scorer instances.

        Args:
            specs: List of scorer specifications in format:
                - Simple: "min", "weighted"
                - Parameterized: "hyperbolic_k2.0_p3.0"
                - With suffix: "hyperbolic_weighted_k2.0_p3.0"
                - Step/Linear: "step_1-1.0_3-0.9_8-0.7"

        Returns:
            List of configured scorer instances

        Raises:
            ValueError: If any spec cannot be turned into a scorer; the
                original exception is attached as ``__cause__``.
        """
        scorers = []
        for spec in specs:
            try:
                # ``create_from_id`` lower-cases the id itself.
                scorers.append(cls.create_from_id(spec))
            except Exception as e:
                raise ValueError(
                    f"Failed to parse scorer spec '{spec}': {e}"
                ) from e
        return scorers
class ScorerPresets:
    """Named lists of scorer identifier strings."""

    DEFAULT = ["min", "exponential_a0.7_w1.7_s2.8"]
    WEIGHTEDALL = [
        "weighted",
        "hyperbolic_weighted_k2.0_p3.0",
        "exponential_weighted_a0.7_w1.7_s2.8",
    ]
    HYPERBOLIC = ["hyperbolic_k2.0_p3.0"]
    EXPONENTIAL = ["exponential_a0.7_w1.7_s2.8"]
    STEP = ["step_1-1.0_3-0.9_8-0.7"]
    LINEAR = ["linear_1-1.0_3-0.9_8-0.7"]

    # NOTE(review): HYPERBOLIC is not part of ALL, and the exponential id
    # appears twice (via DEFAULT and EXPONENTIAL) -- confirm this is intended.
    ALL = [*DEFAULT, *WEIGHTEDALL, *EXPONENTIAL, *STEP, *LINEAR]