Spaces:
Runtime error
Runtime error
# ----------------------------------------------------------------------------
#
#  Welcome to Baml! To use this generated code, please run the following:
#
#  $ pip install baml
#
# ----------------------------------------------------------------------------
# This file was generated by BAML: please do not edit it. Instead, edit the
# BAML files and re-generate this code using: baml-cli generate
# baml-cli is available with the baml package.
| import os | |
| import typing | |
| import typing_extensions | |
| import baml_py | |
| from . import types, stream_types, type_builder | |
| from .globals import DO_NOT_USE_DIRECTLY_UNLESS_YOU_KNOW_WHAT_YOURE_DOING_RUNTIME as __runtime__, DO_NOT_USE_DIRECTLY_UNLESS_YOU_KNOW_WHAT_YOURE_DOING_CTX as __ctx__manager__ | |
class BamlCallOptions(typing.TypedDict, total=False):
    """Per-call overrides accepted by the generated BAML client.

    Every key is optional. ``DoNotUseDirectlyCallManager`` reads this mapping
    each time a call is made and turns it into the positional arguments the
    runtime expects.
    """

    # Type builder wrapper; its private ``_tb`` handle is what reaches the runtime.
    tb: typing_extensions.NotRequired[type_builder.TypeBuilder]
    # Client registry forwarded to the runtime.
    client_registry: typing_extensions.NotRequired[baml_py.baml_py.ClientRegistry]
    # Client name; when set it is made the registry's primary client
    # (a registry is created if none was supplied).
    client: typing_extensions.NotRequired[str]
    # Overrides layered on a copy of ``os.environ``; a ``None`` value removes
    # that variable from the copy.
    env: typing_extensions.NotRequired[typing.Dict[str, typing.Optional[str]]]
    # Tags forwarded to the runtime with the call.
    tags: typing_extensions.NotRequired[typing.Dict[str, str]]
    # Either a single collector or a list of collectors.
    collector: typing_extensions.NotRequired[
        typing.Union[baml_py.baml_py.Collector, typing.List[baml_py.baml_py.Collector]]
    ]
    # Checked before non-streaming calls (an already-aborted controller raises)
    # and forwarded to the runtime.
    abort_controller: typing_extensions.NotRequired[baml_py.baml_py.AbortController]
    # Callback invoked as ``on_tick("Unknown", log)``; rejected by sync streams.
    on_tick: typing_extensions.NotRequired[
        typing.Callable[[str, baml_py.baml_py.FunctionLog], None]
    ]
    # EventCollector type, will be overridden in generated clients
    watchers: typing_extensions.NotRequired[typing.Any]
class _ResolvedBamlOptions:
    """Plain holder for the values produced by resolving a ``BamlCallOptions``.

    The constructor stores each argument on the attribute of the same name and
    performs no validation or copying.
    """

    # Runtime-level type builder handle, or None when the caller supplied none.
    tb: typing.Optional[baml_py.baml_py.TypeBuilder]
    # Client registry to use for the call, if any.
    client_registry: typing.Optional[baml_py.baml_py.ClientRegistry]
    # Collectors attached to the call (always a list, possibly empty).
    collectors: typing.List[baml_py.baml_py.Collector]
    # Environment variables handed to the runtime.
    env_vars: typing.Dict[str, str]
    # Tags handed to the runtime.
    tags: typing.Dict[str, str]
    # Abort controller for the call, if any.
    abort_controller: typing.Optional[baml_py.baml_py.AbortController]
    # Zero-argument tick callback (already wrapped), or None.
    on_tick: typing.Optional[typing.Callable[[], None]]
    # Opaque watchers object forwarded to the runtime, or None.
    watchers: typing.Optional[typing.Any]

    def __init__(
        self,
        tb: typing.Optional[baml_py.baml_py.TypeBuilder],
        client_registry: typing.Optional[baml_py.baml_py.ClientRegistry],
        collectors: typing.List[baml_py.baml_py.Collector],
        env_vars: typing.Dict[str, str],
        tags: typing.Dict[str, str],
        abort_controller: typing.Optional[baml_py.baml_py.AbortController],
        on_tick: typing.Optional[typing.Callable[[], None]],
        watchers: typing.Optional[typing.Any],
    ):
        # Type and client selection.
        self.tb = tb
        self.client_registry = client_registry

        # Observability and per-call metadata.
        self.collectors = collectors
        self.env_vars = env_vars
        self.tags = tags

        # Control hooks.
        self.abort_controller = abort_controller
        self.on_tick = on_tick
        self.watchers = watchers
class DoNotUseDirectlyCallManager:
    """Resolve per-call BAML options and forward calls to the BAML runtime.

    An instance wraps one ``BamlCallOptions`` mapping. Each public method
    resolves that mapping into a ``_ResolvedBamlOptions`` and passes the
    resolved values *positionally* to the matching ``__runtime__`` method, so
    the argument order in every call below must not be changed; each argument
    is labelled with a comment naming its role.
    """

    def __init__(self, baml_options: BamlCallOptions):
        """Store ``baml_options``; nothing is resolved until a call is made."""
        self.__baml_options = baml_options

    def __getstate__(self):
        # Return state needed for pickling
        return {"baml_options": self.__baml_options}

    def __setstate__(self, state):
        # Restore state from pickling
        self.__baml_options = state["baml_options"]

    @staticmethod
    def _collectors_as_list(
        collector: typing.Union[
            None,
            baml_py.baml_py.Collector,
            typing.List[baml_py.baml_py.Collector],
        ],
    ) -> typing.List[baml_py.baml_py.Collector]:
        """Normalise the ``collector`` option to a new list.

        Returns an empty list for ``None``, a one-element list for a single
        collector, and a shallow copy for a list. A copy is always returned so
        that appending to the result never modifies a list owned by the caller.
        """
        if collector is None:
            return []
        if isinstance(collector, list):
            return list(collector)
        return [collector]

    def __resolve(self) -> _ResolvedBamlOptions:
        """Turn the stored options into the concrete values the runtime needs.

        Returns:
            A ``_ResolvedBamlOptions`` holding the runtime type builder handle,
            the client registry, a list of collectors, the effective
            environment, tags, abort controller, a zero-argument tick callback
            (or None) and the watchers object.
        """
        options = self.__baml_options

        tb = options.get("tb")
        if tb is not None:
            baml_tb = tb._tb  # type: ignore (we know how to use this private attribute)
        else:
            baml_tb = None

        client_registry = options.get("client_registry")
        client = options.get("client")
        # If client is provided, it takes precedence (creates/overrides client_registry primary)
        if client is not None:
            if client_registry is None:
                client_registry = baml_py.baml_py.ClientRegistry()
            # NOTE(review): when the caller supplied the registry, this changes
            # the primary client on the caller's own object.
            client_registry.set_primary(client)

        # Always a fresh list: the on_tick branch below appends to it, and the
        # caller's list must not grow on every call.
        collectors_as_list = self._collectors_as_list(options.get("collector"))

        # Start from the process environment, then apply overrides; a None
        # value means "remove this variable". ``or {}`` tolerates env=None,
        # matching how tags is handled.
        env_vars = os.environ.copy()
        for k, v in (options.get("env") or {}).items():
            if v is not None:
                env_vars[k] = v
            else:
                env_vars.pop(k, None)

        tags = options.get("tags", {}) or {}
        abort_controller = options.get("abort_controller")

        on_tick = options.get("on_tick")
        if on_tick is not None:
            # A dedicated collector is attached so the wrapper can read the
            # most recent log and hand it to the user's callback.
            tick_collector = baml_py.baml_py.Collector("on-tick-collector")
            collectors_as_list.append(tick_collector)

            def on_tick_wrapper():
                log = tick_collector.last
                if log is not None:
                    on_tick("Unknown", log)

        else:
            on_tick_wrapper = None

        watchers = options.get("watchers")

        return _ResolvedBamlOptions(
            baml_tb,
            client_registry,
            collectors_as_list,
            env_vars,
            tags,
            abort_controller,
            on_tick_wrapper,
            watchers,
        )

    def merge_options(self, options: BamlCallOptions) -> "DoNotUseDirectlyCallManager":
        """Return a new manager whose options are the stored ones overlaid with ``options``.

        The merge is shallow and keys in ``options`` win. Neither this manager
        nor the mappings involved are modified.
        """
        return DoNotUseDirectlyCallManager({**self.__baml_options, **options})

    async def call_function_async(
        self, *, function_name: str, args: typing.Dict[str, typing.Any]
    ) -> baml_py.baml_py.FunctionResult:
        """Await ``__runtime__.call_function`` for ``function_name`` with ``args``.

        Uses a cloned context from ``__ctx__manager__``.

        Raises:
            baml_py.baml_py.BamlAbortError: if the resolved abort controller is
                already aborted; the runtime is not called in that case.
        """
        resolved_options = self.__resolve()

        # Check if already aborted
        if resolved_options.abort_controller is not None and resolved_options.abort_controller.aborted:
            raise baml_py.baml_py.BamlAbortError("Operation was aborted")

        return await __runtime__.call_function(
            function_name,
            args,
            # ctx
            __ctx__manager__.clone_context(),
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # collectors
            resolved_options.collectors,
            # env_vars
            resolved_options.env_vars,
            # tags
            resolved_options.tags,
            # abort_controller
            resolved_options.abort_controller,
            # watchers
            resolved_options.watchers,
        )

    def call_function_sync(
        self, *, function_name: str, args: typing.Dict[str, typing.Any]
    ) -> baml_py.baml_py.FunctionResult:
        """Call ``__runtime__.call_function_sync`` for ``function_name`` with ``args``.

        Uses the current context from ``__ctx__manager__.get()`` (not a clone).

        Raises:
            baml_py.baml_py.BamlAbortError: if the resolved abort controller is
                already aborted; the runtime is not called in that case.
        """
        resolved_options = self.__resolve()

        # Check if already aborted
        if resolved_options.abort_controller is not None and resolved_options.abort_controller.aborted:
            raise baml_py.baml_py.BamlAbortError("Operation was aborted")

        ctx = __ctx__manager__.get()
        return __runtime__.call_function_sync(
            function_name,
            args,
            # ctx
            ctx,
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # collectors
            resolved_options.collectors,
            # env_vars
            resolved_options.env_vars,
            # tags
            resolved_options.tags,
            # abort_controller
            resolved_options.abort_controller,
            # watchers
            resolved_options.watchers,
        )

    def create_async_stream(
        self,
        *,
        function_name: str,
        args: typing.Dict[str, typing.Any],
    ) -> typing.Tuple[baml_py.baml_py.RuntimeContextManager, baml_py.baml_py.FunctionResultStream]:
        """Create an async stream via ``__runtime__.stream_function``.

        Returns:
            ``(ctx, stream)`` where ``ctx`` is the cloned context that was
            passed to the runtime and ``stream`` is the runtime's result.
        """
        resolved_options = self.__resolve()
        ctx = __ctx__manager__.clone_context()
        result = __runtime__.stream_function(
            function_name,
            args,
            # this is always None, we set this later!
            # on_event
            None,
            # ctx
            ctx,
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # collectors
            resolved_options.collectors,
            # env_vars
            resolved_options.env_vars,
            # tags
            resolved_options.tags,
            # on_tick
            resolved_options.on_tick,
            # abort_controller
            resolved_options.abort_controller,
        )
        return ctx, result

    def create_sync_stream(
        self,
        *,
        function_name: str,
        args: typing.Dict[str, typing.Any],
    ) -> typing.Tuple[baml_py.baml_py.RuntimeContextManager, baml_py.baml_py.SyncFunctionResultStream]:
        """Create a sync stream via ``__runtime__.stream_function_sync``.

        Returns:
            ``(ctx, stream)`` where ``ctx`` is the current context from
            ``__ctx__manager__.get()`` and ``stream`` is the runtime's result.

        Raises:
            ValueError: if an ``on_tick`` callback was configured.
        """
        resolved_options = self.__resolve()
        if resolved_options.on_tick is not None:
            raise ValueError("on_tick is not supported for sync streams. Please use async streams instead.")

        ctx = __ctx__manager__.get()
        result = __runtime__.stream_function_sync(
            function_name,
            args,
            # this is always None, we set this later!
            # on_event
            None,
            # ctx
            ctx,
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # collectors
            resolved_options.collectors,
            # env_vars
            resolved_options.env_vars,
            # tags
            resolved_options.tags,
            # on_tick
            # always None! sync streams don't support on_tick
            None,
            # abort_controller
            resolved_options.abort_controller,
        )
        return ctx, result

    async def create_http_request_async(
        self,
        *,
        function_name: str,
        args: typing.Dict[str, typing.Any],
        mode: typing_extensions.Literal["stream", "request"],
    ) -> baml_py.baml_py.HTTPRequest:
        """Await ``__runtime__.build_request`` and return its result.

        ``mode == "stream"`` is passed as the runtime's ``is_stream`` flag.
        Collectors, tags, abort controller and watchers are not forwarded.
        """
        resolved_options = self.__resolve()

        return await __runtime__.build_request(
            function_name,
            args,
            # ctx
            __ctx__manager__.clone_context(),
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # env_vars
            resolved_options.env_vars,
            # is_stream
            mode == "stream",
        )

    def create_http_request_sync(
        self,
        *,
        function_name: str,
        args: typing.Dict[str, typing.Any],
        mode: typing_extensions.Literal["stream", "request"],
    ) -> baml_py.baml_py.HTTPRequest:
        """Call ``__runtime__.build_request_sync`` and return its result.

        ``mode == "stream"`` is passed as the runtime's ``is_stream`` flag.
        Collectors, tags, abort controller and watchers are not forwarded.
        """
        resolved_options = self.__resolve()

        return __runtime__.build_request_sync(
            function_name,
            args,
            # ctx
            __ctx__manager__.get(),
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # env_vars
            resolved_options.env_vars,
            # is_stream
            mode == "stream",
        )

    def parse_response(
        self,
        *,
        function_name: str,
        llm_response: str,
        mode: typing_extensions.Literal["stream", "request"],
    ) -> typing.Any:
        """Parse raw LLM text via ``__runtime__.parse_llm_response``.

        The generated ``types`` module is supplied for both enums and classes,
        ``stream_types`` for partial classes, and partial results are allowed
        only when ``mode == "stream"``.
        """
        resolved_options = self.__resolve()

        return __runtime__.parse_llm_response(
            function_name,
            llm_response,
            # enum_module
            types,
            # cls_module
            types,
            # partial_cls_module
            stream_types,
            # allow_partials
            mode == "stream",
            # ctx
            __ctx__manager__.get(),
            # tb
            resolved_options.tb,
            # cr
            resolved_options.client_registry,
            # env_vars
            resolved_options.env_vars,
        )
def disassemble(function: typing.Callable) -> None:
    """Print a banner for a BAML client function and ask the runtime to disassemble it.

    ``function`` is accepted only if it is callable and its ``__name__``
    matches the name of a bound method on the generated client ``b``.
    Otherwise a diagnostic line is printed and nothing else happens.
    """
    import inspect

    from . import b

    # Guard: anything that is not callable cannot be a client function.
    if not callable(function):
        print(f"disassemble: object {function} is not a Baml function")
        return

    # Collect the client's bound methods, then look the function up by name.
    client_methods = inspect.getmembers(b, predicate=inspect.ismethod)
    target = function.__name__
    if not any(method_name == target for method_name, _ in client_methods):
        print(f"disassemble: function {target} is not a Baml function")
        return

    print(f"----- function {target} -----")
    __runtime__.disassemble(target)