Spaces:
Runtime error
Runtime error
| import json | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from galileo import GalileoLogger | |
| from galileo.log_streams import create_log_stream, get_log_stream | |
| from galileo.projects import create_project, get_project | |
| import galileo_protect as gp | |
| load_dotenv() | |
class GalileoPlatformConfig(BaseModel):
    """Base configuration for Galileo platform.

    Pydantic model holding the two names that ``GalileoPlatform`` uses to
    look up (or create) its Galileo Protect project and stage.
    """

    # Name of the Galileo Protect project to look up or create.
    protect_project_name: str
    # Name of the Protect stage, inside that project, to look up or create.
    protect_stage_name: str
class GalileoPlatform:
    """Implementation of Galileo features (logging and Protect).

    On construction the Protect project and stage named in ``config`` are
    looked up, and created when the lookup fails. The resulting stage id is
    stored on ``self.protect_stage_id`` and reused by the ``run_protect_*``
    methods.
    """

    # Override messages and the custom scorer name shared by both
    # ``run_protect_*`` variants; kept in one place so they cannot drift apart.
    _HALLUCINATION_OVERRIDE = (
        "I cannot provide a reliable answer to this question based on the "
        "available information! Please try again."
    )
    _PII_OVERRIDE = "Sorry, the output contains PII."
    _CUSTOM_PII_METRIC = "fairfield_company_pii_scorer_0"

    def __init__(self, config: GalileoPlatformConfig):
        """Store ``config`` and resolve the Protect stage id.

        Note that this calls into ``galileo_protect`` at construction time
        via :meth:`get_protect_stage_id`.
        """
        self.config = config
        self.protect_stage_id = self.get_protect_stage_id()

    def get_logger(self, project_name: str, logstream_name: str):
        """Get or create a Galileo Logger.

        The project and the log stream are created first when the respective
        lookup returns a falsy value.

        Args:
            project_name: Galileo project the logger writes to.
            logstream_name: Log stream inside that project.

        Returns:
            A ``GalileoLogger`` bound to the project and log stream.
        """
        if not get_project(name=project_name):
            create_project(name=project_name)
            print(f"Project {project_name} created")

        if not get_log_stream(name=logstream_name, project_name=project_name):
            create_log_stream(name=logstream_name, project_name=project_name)
            print(f"Logstream {logstream_name} created in project {project_name}")

        return GalileoLogger(
            project=project_name,
            log_stream=logstream_name,
        )

    def get_protect_stage_id(self) -> str:
        """Get or create a Galileo Protect stage and return its id.

        Both lookups follow the same pattern: try to fetch, and on any
        exception fall back to creating the resource.

        Returns:
            The ``id`` attribute of the found or newly created stage.
        """
        project_name = self.config.protect_project_name
        stage_name = self.config.protect_stage_name

        # NOTE(review): any failure (not only "not found") triggers creation;
        # narrow the except clause once the concrete exception type raised by
        # galileo_protect for a missing project/stage is confirmed.
        try:
            protect_project = gp.get_project(project_name=project_name)
            print(f"Protect project {project_name} found")
        except Exception:
            protect_project = gp.create_project(name=project_name)
            print(f"Protect project {project_name} created")

        protect_project_id = protect_project.id

        try:
            protect_stage = gp.get_stage(
                project_id=protect_project_id, stage_name=stage_name
            )
            print(f"Protect stage {stage_name} found")
        except Exception:
            protect_stage = gp.create_stage(
                project_id=protect_project_id,
                name=stage_name,
            )
            # Mirrors the message printed for a newly created project.
            print(f"Protect stage {stage_name} created")

        return protect_stage.id

    @classmethod
    def _hallucination_ruleset(cls):
        """Build the ruleset that fires when context adherence is <= 0.8."""
        return gp.Ruleset(
            rules=[
                gp.Rule(
                    metric=gp.RuleMetrics.context_adherence_luna,
                    operator=gp.RuleOperator.lte,
                    target_value=0.8,
                ),
            ],
            action=gp.OverrideAction(choices=[cls._HALLUCINATION_OVERRIDE]),
        )

    @classmethod
    def _pii_rulesets(cls) -> list:
        """Build the two PII rulesets: built-in ``pii`` metric, then the custom scorer."""
        return [
            gp.Ruleset(
                rules=[
                    gp.Rule(
                        metric=gp.RuleMetrics.pii,
                        operator=gp.RuleOperator.any,
                        target_value=["email", "phone_number", "name"],
                    ),
                ],
                action=gp.OverrideAction(choices=[cls._PII_OVERRIDE]),
            ),
            gp.Ruleset(
                rules=[
                    gp.Rule(
                        metric=cls._CUSTOM_PII_METRIC,
                        operator=gp.RuleOperator.gte,
                        target_value=0.1,
                    ),
                ],
                action=gp.OverrideAction(choices=[cls._PII_OVERRIDE]),
            ),
        ]

    @staticmethod
    def _drop_excluded_metrics(response_dict: dict, excluded: set) -> dict:
        """Remove results belonging to ``excluded`` metrics from ``response_dict`` in place.

        A ruleset result is attributed to the metric of its first rule; a
        ruleset with no rules cannot be attributed and is therefore kept.
        """
        for metric in excluded:
            response_dict["metric_results"].pop(metric, None)

        kept = []
        for ruleset in response_dict["ruleset_results"]:
            rules = ruleset["rules"]
            if not rules or rules[0]["metric"] not in excluded:
                kept.append(ruleset)
        response_dict["ruleset_results"] = kept
        return response_dict

    def run_protect_v1(
        self,
        input: str,
        output: str,
        hallucination_detection: bool = False,
        pii_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Run Galileo Protect on input and output.

        Only the rulesets for the enabled detections are sent to Protect.

        Args:
            input: Text passed as the payload's ``input``.
            output: Text passed as the payload's ``output``.
            hallucination_detection: Include the context-adherence ruleset.
            pii_detection: Include both PII rulesets.
            logger: When given, the payload and response are recorded with
                ``logger.add_protect_span``.

        Returns:
            ``dict(response)`` for the Protect response, or ``None`` when
            neither detection is enabled (Protect is not invoked then).
        """
        if not (hallucination_detection or pii_detection):
            return None

        rulesets = []
        if hallucination_detection:
            rulesets.append(self._hallucination_ruleset())
        if pii_detection:
            rulesets.extend(self._pii_rulesets())

        payload = gp.Payload(input=input, output=output)
        response = gp.invoke(
            payload=payload,
            prioritized_rulesets=rulesets,
            stage_id=self.protect_stage_id,
        )

        if logger:
            logger.add_protect_span(payload=payload, response=response)

        return dict(response)

    def run_protect_v2(
        self,
        input: str,
        output: str,
        pii_detection: bool = False,
        hallucination_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Run Galileo Protect on input and output.

        Unlike ``run_protect_v1``, all three rulesets are always sent to
        Protect; results for detections that are switched off are removed
        from the dumped response afterwards.

        Args:
            input: Text passed as the payload's ``input``.
            output: Text passed as the payload's ``output``.
            pii_detection: Keep the results of both PII rulesets.
            hallucination_detection: Keep the context-adherence results.
            logger: When given, the payload and the filtered response are
                recorded with ``logger.add_protect_span``.

        Returns:
            The filtered ``response.model_dump()`` dict, or ``None`` when
            neither detection is enabled (Protect is not invoked then).
        """
        if not (pii_detection or hallucination_detection):
            return None

        payload = gp.Payload(input=input, output=output)
        response = gp.invoke(
            payload=payload,
            prioritized_rulesets=[
                self._hallucination_ruleset(),
                *self._pii_rulesets(),
            ],
            stage_id=self.protect_stage_id,
        )

        excluded = set()
        if not hallucination_detection:
            excluded.add("context_adherence_luna")
        if not pii_detection:
            excluded.update(("pii", self._CUSTOM_PII_METRIC))

        # NOTE(review): only "metric_results" and "ruleset_results" are
        # filtered; every other field of the response is passed through as
        # returned by Protect, even if it was produced by an excluded
        # ruleset -- confirm that this is intended.
        response_dict = self._drop_excluded_metrics(response.model_dump(), excluded)

        if logger:
            logger.add_protect_span(
                payload=payload,
                response=gp.Response.model_validate(response_dict),
            )

        return response_dict