import json
from pydantic import BaseModel
from dotenv import load_dotenv
from galileo import GalileoLogger
from galileo.log_streams import create_log_stream, get_log_stream
from galileo.projects import create_project, get_project
import galileo_protect as gp

load_dotenv()

# Text substituted for the model output when the corresponding ruleset triggers.
_HALLUCINATION_OVERRIDE_MESSAGE = (
    "I cannot provide a reliable answer to this question based on the "
    "available information! Please try again."
)
_PII_OVERRIDE_MESSAGE = "Sorry, the output contains PII."

# Metric names as they appear in the dumped Protect response; used by
# run_protect_v2 to drop results for detectors the caller did not request.
_HALLUCINATION_METRIC = "context_adherence_luna"
_PII_METRIC = "pii"
_CUSTOM_PII_METRIC = "fairfield_company_pii_scorer_0"


def _hallucination_ruleset() -> gp.Ruleset:
    """Build the ruleset that overrides output when context adherence is <= 0.8."""
    return gp.Ruleset(
        rules=[
            gp.Rule(
                metric=gp.RuleMetrics.context_adherence_luna,
                operator=gp.RuleOperator.lte,
                target_value=0.8,
            ),
        ],
        action=gp.OverrideAction(choices=[_HALLUCINATION_OVERRIDE_MESSAGE]),
    )


def _pii_rulesets() -> list[gp.Ruleset]:
    """Build the two PII rulesets: the built-in PII metric and the custom scorer."""
    return [
        gp.Ruleset(
            rules=[
                gp.Rule(
                    metric=gp.RuleMetrics.pii,
                    operator=gp.RuleOperator.any,
                    target_value=["email", "phone_number", "name"],
                ),
            ],
            action=gp.OverrideAction(choices=[_PII_OVERRIDE_MESSAGE]),
        ),
        gp.Ruleset(
            rules=[
                gp.Rule(
                    metric=_CUSTOM_PII_METRIC,
                    operator=gp.RuleOperator.gte,
                    target_value=0.1,
                ),
            ],
            action=gp.OverrideAction(choices=[_PII_OVERRIDE_MESSAGE]),
        ),
    ]


class GalileoPlatformConfig(BaseModel):
    """Base configuration for Galileo platform."""

    # Name of the Galileo Protect project to look up (or create).
    protect_project_name: str
    # Name of the stage inside that project to look up (or create).
    protect_stage_name: str


class GalileoPlatform:
    """Implementation of Galileo Features"""

    def __init__(self, config: GalileoPlatformConfig):
        """Store the config and resolve the Protect stage id.

        Note that construction calls ``get_protect_stage_id``, which talks to
        the Galileo Protect API.
        """
        self.config = config
        self.protect_stage_id = self.get_protect_stage_id()

    def get_logger(self, project_name: str, logstream_name: str) -> GalileoLogger:
        """Get or create a Galileo Logger.

        Creates the project and/or log stream first when the corresponding
        lookup returns a falsy value, then returns a ``GalileoLogger`` bound
        to that project and log stream.
        """
        if not get_project(name=project_name):
            create_project(name=project_name)
            print(f"Project {project_name} created")

        if not get_log_stream(name=logstream_name, project_name=project_name):
            create_log_stream(name=logstream_name, project_name=project_name)
            print(f"Logstream {logstream_name} created in project {project_name}")

        return GalileoLogger(
            project=project_name,
            log_stream=logstream_name,
        )

    def get_protect_stage_id(self) -> str:
        """Get or create a Galileo Protect stage.

        Looks up the configured Protect project and stage, creating either
        one when the lookup raises, and returns the stage id.
        """
        project_name = self.config.protect_project_name
        stage_name = self.config.protect_stage_name

        # NOTE(review): any failure of the lookup (not only "not found") falls
        # through to creation; the specific exception type raised by
        # galileo_protect is not visible here, so the broad catch is kept.
        try:
            protect_project = gp.get_project(project_name=project_name)
            print(f"Protect project {project_name} found")
        except Exception:
            protect_project = gp.create_project(name=project_name)
            print(f"Protect project {project_name} created")

        protect_project_id = protect_project.id

        try:
            protect_stage = gp.get_stage(
                project_id=protect_project_id, stage_name=stage_name
            )
            print(f"Protect stage {stage_name} found")
        except Exception:
            protect_stage = gp.create_stage(
                project_id=protect_project_id,
                name=stage_name,
            )
            print(f"Protect stage {stage_name} created")

        return protect_stage.id

    def run_protect_v1(
        self,
        input: str,
        output: str,
        hallucination_detection: bool = False,
        pii_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Run Galileo Protect on input and output.

        Only the rulesets for the enabled detectors are sent to Protect.

        Args:
            input: The prompt / user input to check.
            output: The model output to check.
            hallucination_detection: Include the context-adherence ruleset.
            pii_detection: Include both PII rulesets.
            logger: When truthy, the Protect call is recorded on it as a span.

        Returns:
            ``dict(response)`` of the Protect response, or ``None`` when
            neither detector is enabled (Protect is not invoked).
        """
        if not (hallucination_detection or pii_detection):
            return None

        rulesets = []
        if hallucination_detection:
            rulesets.append(_hallucination_ruleset())
        if pii_detection:
            rulesets.extend(_pii_rulesets())

        response = gp.invoke(
            payload=gp.Payload(input=input, output=output),
            prioritized_rulesets=rulesets,
            stage_id=self.protect_stage_id,
        )

        if logger:
            logger.add_protect_span(
                payload=gp.Payload(input=input, output=output),
                response=response,
            )

        return dict(response)

    def run_protect_v2(
        self,
        input: str,
        output: str,
        pii_detection: bool = False,
        hallucination_detection: bool = False,
        logger: GalileoLogger | None = None,
    ) -> dict | None:
        """Run Galileo Protect on input and output.

        Unlike ``run_protect_v1``, all three rulesets are always sent to
        Protect; results belonging to detectors that were not requested are
        then removed from the dumped response before it is logged/returned.

        Args:
            input: The prompt / user input to check.
            output: The model output to check.
            pii_detection: Keep the PII results in the response.
            hallucination_detection: Keep the context-adherence results.
            logger: When truthy, the filtered response is recorded as a span.

        Returns:
            The filtered ``response.model_dump()`` dict, or ``None`` when
            neither detector is enabled (Protect is not invoked).
        """
        if not (pii_detection or hallucination_detection):
            return None

        response = gp.invoke(
            payload=gp.Payload(input=input, output=output),
            prioritized_rulesets=[_hallucination_ruleset(), *_pii_rulesets()],
            stage_id=self.protect_stage_id,
        )

        response_dict = response.model_dump()

        metrics_to_exclude = set()
        if not hallucination_detection:
            metrics_to_exclude.add(_HALLUCINATION_METRIC)
        if not pii_detection:
            metrics_to_exclude.add(_PII_METRIC)
            metrics_to_exclude.add(_CUSTOM_PII_METRIC)

        for metric in metrics_to_exclude:
            response_dict["metric_results"].pop(metric, None)

        # Every ruleset submitted above has exactly one rule, so the first
        # rule's metric identifies which detector a ruleset result belongs to.
        response_dict["ruleset_results"] = [
            ruleset
            for ruleset in response_dict["ruleset_results"]
            if ruleset["rules"][0]["metric"] not in metrics_to_exclude
        ]

        if logger:
            logger.add_protect_span(
                payload=gp.Payload(input=input, output=output),
                response=gp.Response.model_validate(response_dict),
            )

        return response_dict