Spaces:
Runtime error
Runtime error
File size: 7,964 Bytes
4ee29ab 77e6e43 46b11f4 753e3c5 4ee29ab 46b11f4 4ee29ab 753e3c5 e68d535 753e3c5 e68d535 4ee29ab 753e3c5 46b11f4 4ee29ab 46b11f4 4ee29ab 46b11f4 4ee29ab 46b11f4 e68d535 4ee29ab 7f7cb09 4ee29ab 7f7cb09 4ee29ab 9f28d12 e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab e68d535 4ee29ab 753e3c5 4ee29ab 753e3c5 4ee29ab 753e3c5 e68d535 4ee29ab 753e3c5 4ee29ab e68d535 4ee29ab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
import json
from pydantic import BaseModel
from dotenv import load_dotenv
from galileo import GalileoLogger
from galileo.log_streams import create_log_stream, get_log_stream
from galileo.projects import create_project, get_project
import galileo_protect as gp
# Load variables from a local .env file into the process environment when this
# module is imported.
# NOTE(review): presumably this is where the Galileo SDK credentials come
# from -- confirm against the deployment setup.
load_dotenv()
class GalileoPlatformConfig(BaseModel):
    """Settings consumed by ``GalileoPlatform``.

    Both fields are required strings; they identify the Galileo Protect
    project and stage that the platform wrapper looks up (or creates).
    """

    # Name passed to the Protect project lookup / creation calls.
    protect_project_name: str
    # Name passed to the Protect stage lookup / creation calls.
    protect_stage_name: str
class GalileoPlatform:
    """Wrapper around the Galileo logging and Galileo Protect SDK calls.

    On construction the Protect project and stage named in ``config`` are
    looked up (and created when the lookup fails); the resulting stage id is
    kept on ``self.protect_stage_id`` and reused by every ``gp.invoke`` call.
    """

    def __init__(self, config: GalileoPlatformConfig):
        """Store ``config`` and resolve the Protect stage id.

        Args:
            config: Names of the Protect project and stage to use.
        """
        self.config = config
        # Resolved eagerly, so constructing the object already talks to the
        # Protect SDK.
        self.protect_stage_id = self.get_protect_stage_id()

    def get_logger(self, project_name: str, logstream_name: str) -> GalileoLogger:
        """Return a ``GalileoLogger`` bound to a project and log stream.

        The project and the log stream are each created first when the
        corresponding lookup returns a falsy value.

        Args:
            project_name: Galileo project to log into.
            logstream_name: Log stream inside that project.

        Returns:
            A ``GalileoLogger`` constructed for the given project/log stream.
        """
        if not get_project(name=project_name):
            create_project(name=project_name)
            print(f"Project {project_name} created")
        if not get_log_stream(name=logstream_name, project_name=project_name):
            create_log_stream(name=logstream_name, project_name=project_name)
            print(f"Logstream {logstream_name} created in project {project_name}")
        return GalileoLogger(
            project=project_name,
            log_stream=logstream_name,
        )

    def get_protect_stage_id(self) -> str:
        """Return the id of the configured Protect stage, creating it if needed.

        The Protect project is fetched by name and created when the fetch
        raises; the stage is then fetched inside that project and likewise
        created when the fetch raises.

        Returns:
            The ``id`` attribute of the stage that was found or created.
        """
        project_name = self.config.protect_project_name
        stage_name = self.config.protect_stage_name

        # NOTE(review): every exception from the lookup is treated as
        # "does not exist". The SDK's specific not-found exception type is not
        # visible from this file; narrow the handler once it is confirmed.
        try:
            protect_project = gp.get_project(project_name=project_name)
        except Exception:
            protect_project = gp.create_project(name=project_name)
            print(f"Protect project {project_name} created")
        else:
            print(f"Protect project {project_name} found")

        protect_project_id = protect_project.id

        # Same get-or-create handling (and the same caveat) for the stage.
        try:
            protect_stage = gp.get_stage(
                project_id=protect_project_id,
                stage_name=stage_name,
            )
        except Exception:
            protect_stage = gp.create_stage(
                project_id=protect_project_id,
                name=stage_name,
            )
            print(f"Protect stage {stage_name} created")
        else:
            print(f"Protect stage {stage_name} found")

        return protect_stage.id

    @staticmethod
    def _hallucination_ruleset():
        """Build the ruleset that triggers when context adherence is <= 0.8."""
        return gp.Ruleset(
            rules=[
                gp.Rule(
                    metric=gp.RuleMetrics.context_adherence_luna,
                    operator=gp.RuleOperator.lte,
                    target_value=0.8,
                ),
            ],
            action=gp.OverrideAction(
                choices=["I cannot provide a reliable answer to this question based on the available information! Please try again."]
            ),
        )

    @staticmethod
    def _pii_rulesets() -> list:
        """Build the two PII rulesets (built-in ``pii`` metric and custom scorer)."""
        return [
            gp.Ruleset(
                rules=[
                    gp.Rule(
                        metric=gp.RuleMetrics.pii,
                        operator=gp.RuleOperator.any,
                        target_value=["email", "phone_number", "name"],
                    ),
                ],
                action=gp.OverrideAction(
                    choices=["Sorry, the output contains PII."]
                ),
            ),
            gp.Ruleset(
                rules=[
                    gp.Rule(
                        # Referenced by its string name rather than a
                        # ``gp.RuleMetrics`` member.
                        metric="fairfield_company_pii_scorer_0",
                        operator=gp.RuleOperator.gte,
                        target_value=0.1,
                    ),
                ],
                action=gp.OverrideAction(
                    choices=["Sorry, the output contains PII."]
                ),
            ),
        ]

    def run_protect_v1(
        self,
        input: str,
        output: str,
        hallucination_detection: bool = False,
        pii_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Invoke Galileo Protect with only the enabled rulesets.

        Args:
            input: Text passed as the payload ``input``.
            output: Text passed as the payload ``output``.
            hallucination_detection: Include the context-adherence ruleset.
            pii_detection: Include the two PII rulesets.
            logger: When truthy, the payload and response are also recorded
                through ``logger.add_protect_span``.

        Returns:
            ``dict(response)`` for the Protect response, or ``None`` when
            neither detection flag is set (Protect is not invoked at all).
        """
        if not (hallucination_detection or pii_detection):
            return None

        rulesets = []
        if hallucination_detection:
            rulesets.append(self._hallucination_ruleset())
        if pii_detection:
            rulesets.extend(self._pii_rulesets())

        payload = gp.Payload(input=input, output=output)
        response = gp.invoke(
            payload=payload,
            prioritized_rulesets=rulesets,
            stage_id=self.protect_stage_id,
        )
        if logger:
            logger.add_protect_span(
                payload=payload,
                response=response,
            )
        return dict(response)

    def run_protect_v2(
        self,
        input: str,
        output: str,
        pii_detection: bool = False,
        hallucination_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Invoke Galileo Protect with all rulesets, then hide disabled ones.

        Unlike :meth:`run_protect_v1`, every ruleset is always sent to
        ``gp.invoke``; the results belonging to detections that were not
        requested are removed from the dumped response afterwards.

        Args:
            input: Text passed as the payload ``input``.
            output: Text passed as the payload ``output``.
            pii_detection: Keep the PII metric and ruleset results.
            hallucination_detection: Keep the context-adherence results.
            logger: When truthy, the payload and the filtered response are
                also recorded through ``logger.add_protect_span``.

        Returns:
            The filtered ``response.model_dump()`` dictionary, or ``None``
            when neither detection flag is set (Protect is not invoked).
        """
        if not (pii_detection or hallucination_detection):
            return None

        payload = gp.Payload(input=input, output=output)
        response = gp.invoke(
            payload=payload,
            prioritized_rulesets=[
                self._hallucination_ruleset(),
                *self._pii_rulesets(),
            ],
            stage_id=self.protect_stage_id,
        )
        response_dict = response.model_dump()

        metrics_to_exclude = set()
        if not hallucination_detection:
            metrics_to_exclude.add('context_adherence_luna')
        if not pii_detection:
            metrics_to_exclude.add('pii')
            metrics_to_exclude.add('fairfield_company_pii_scorer_0')

        # Tolerate a missing or None "metric_results" entry instead of raising.
        metric_results = response_dict.get("metric_results") or {}
        for metric in metrics_to_exclude:
            metric_results.pop(metric, None)

        filtered_ruleset_results = []
        for ruleset in response_dict.get("ruleset_results") or []:
            rules = ruleset.get("rules") or []
            # Every ruleset sent above holds exactly one rule, so the first
            # rule's metric identifies the ruleset. A result without rules
            # cannot match an excluded metric and is therefore kept.
            if rules and rules[0]["metric"] in metrics_to_exclude:
                continue
            filtered_ruleset_results.append(ruleset)
        response_dict["ruleset_results"] = filtered_ruleset_results

        if logger:
            logger.add_protect_span(
                payload=payload,
                # Re-validate so the logged span reflects the filtered view.
                response=gp.Response.model_validate(response_dict),
            )
        return response_dict