import json
import uuid

import g4f
from g4f import ChatCompletion
|
|
# Prompt sent to every provider/model pair when probing whether it works.
TEST_PROMPT = "Generate a sentence with 'freegpt'"
# Marker searched for (against the lower-cased reply) to decide that a
# provider answered the prompt sensibly.
EXPECTED_RESPONSE_CONTAINS = "freegpt"
|
|
|
|
class Provider:
    """A provider name paired with the list of models it offers."""

    def __init__(self, name, models):
        """
        Initialize the provider with its name and models.

        Args:
            name: Identifier of the provider; this is what ``str(provider)``
                returns.
            models: Either a single model or a list/tuple of models. The
                value is always stored as a list so that ``self.models`` can
                be iterated uniformly.
        """
        self.name = name
        if isinstance(models, (list, tuple)):
            # Copy so later mutation of the caller's sequence does not leak
            # into this object; a tuple is expanded rather than being wrapped
            # as one bogus "model".
            self.models = list(models)
        else:
            self.models = [models]

    def __str__(self):
        """Return the provider name."""
        return self.name

    def __repr__(self):
        """Return an unambiguous representation for debugging."""
        return (
            f"{type(self).__name__}"
            f"(name={self.name!r}, models={self.models!r})"
        )
|
|
|
|
class ModelProviderManager:
    """Registry of the providers found to be working, grouped by model."""

    def __init__(self):
        """
        Initialize the manager that tracks the working (active) providers
        for each model. It starts out empty.
        """
        # Maps model -> list of provider names, in the order they were added.
        self._working_model_providers = {}

    def add_provider(self, model, provider_name):
        """
        Add a provider to the working provider list of the specified model.

        Args:
            model: Key under which the provider is recorded; a new list is
                created the first time a model is seen.
            provider_name: Name appended to that model's list. No
                de-duplication is performed.
        """
        self._working_model_providers.setdefault(model, []).append(
            provider_name
        )

    def get_working_providers(self):
        """
        Return the currently active providers for each model.

        Returns:
            dict: The manager's own mapping of model -> list of provider
            names (the live object, not a copy).
        """
        return self._working_model_providers
|
|
|
|
def _fetch_providers_having_models():
    """
    Collect the usable providers exposed by ``g4f.Provider``.

    Every attribute name reported by ``dir(g4f.Provider)`` is looked up, and
    each attribute accepted by ``_is_provider_applicable`` is wrapped in a
    ``Provider`` built from the attribute name and its ``model`` value.

    Returns:
        list: ``Provider`` objects, in the order ``dir()`` lists the names.
    """
    provider_module = g4f.Provider
    # Lazy pairs of (attribute name, attribute) so each attribute is fetched
    # exactly once and only right before it is examined.
    candidates = (
        (provider_name, getattr(provider_module, provider_name))
        for provider_name in dir(provider_module)
    )
    return [
        Provider(provider_name, provider.model)
        for provider_name, provider in candidates
        if _is_provider_applicable(provider)
    ]
|
|
|
|
def _is_provider_applicable(provider):
    """
    Check if the provider has a model and doesn't require authentication.

    Args:
        provider: Any object; typically an attribute of ``g4f.Provider``.

    Returns:
        bool: True only when ``provider`` has the attributes ``model``,
        ``_create_completion`` and ``needs_auth`` and its ``needs_auth``
        value is falsy.
    """
    required_attributes = ('model', '_create_completion', 'needs_auth')
    has_all_attributes = all(
        hasattr(provider, attribute) for attribute in required_attributes
    )
    if not has_all_attributes:
        # Not a provider-like object; needs_auth must not be read here
        # because it may not exist.
        return False
    return not provider.needs_auth
|
|
|
|
def _generate_test_messages():
    """
    Generate messages for testing.

    Returns:
        list: Two chat messages (dicts with ``role`` and ``content`` keys):
        a fixed system message followed by a user message carrying
        ``TEST_PROMPT``.
    """
    system_message = {
        "role": "system",
        "content": "You are a trained AI assistant.",
    }
    user_message = {"role": "user", "content": TEST_PROMPT}
    return [system_message, user_message]
|
|
|
|
def _manage_chat_completion(manager, model_providers, test_messages):
    """
    Generate chat completion for each provider's models and handle positive
    and negative results.

    Each (provider, model) pair is probed once. A reply whose lower-cased
    text contains ``EXPECTED_RESPONSE_CONTAINS`` is reported as a success and
    registered on ``manager``; anything else is reported as an error and the
    sweep moves on to the next pair.

    Args:
        manager: Object with ``add_provider(model, provider_name)``.
        model_providers: Iterable of objects exposing ``name`` and ``models``.
        test_messages: Messages forwarded to ``_generate_chat_response``.
    """
    for provider in model_providers:
        for model in provider.models:
            try:
                response = _generate_chat_response(
                    provider.name, model, test_messages)
                if EXPECTED_RESPONSE_CONTAINS not in response.lower():
                    raise ValueError(f"Unexpected response: {response}")
            except Exception as error:
                # Deliberately broad: a provider may fail in any way, and one
                # broken pair must not abort the remaining probes.
                _print_error_response(provider, model, error)
            else:
                _print_success_response(provider, model)
                manager.add_provider(model, provider.name)
|
|
|
|
def _generate_chat_response(provider_name, model, test_messages):
    """
    Generate a chat response given a provider name, a model, and test
    messages.

    Args:
        provider_name: Name of an attribute on ``g4f.Provider``.
        model: Model passed through to ``ChatCompletion.create``.
        test_messages: Messages passed through as ``messages``.

    Returns:
        Whatever ``ChatCompletion.create`` returns. The caller in this file
        treats it as text (it calls ``.lower()`` on it).

    Raises:
        AttributeError: If ``g4f.Provider`` has no attribute named
            ``provider_name``.
    """
    # A fresh random id is generated for every request.
    chat_id = str(uuid.uuid4())
    provider = getattr(g4f.Provider, provider_name)
    return ChatCompletion.create(
        model=model,
        messages=test_messages,
        chatId=chat_id,
        provider=provider,
    )
|
|
|
|
def _print_success_response(provider, model):
    """
    Print a one-line success report for a provider/model pair.

    Args:
        provider: Rendered with ``str()`` inside the first bracket pair.
        model: Rendered with ``str()`` inside the second bracket pair.
    """
    message = f"\u2705 [{provider}] - [{model}]: Success"
    print(message)
|
|
|
|
def _print_error_response(provider, model, error):
    """
    Print a one-line failure report for a provider/model pair.

    Args:
        provider: Rendered with ``str()`` inside the first bracket pair.
        model: Rendered with ``str()`` inside the second bracket pair.
        error: Exception (or any object) whose ``str()`` is appended as the
            failure detail.
    """
    detail = str(error)
    print(f"⛔ [{provider}] - [{model}]: Error - {detail}")
|
|
|
|
def get_active_model_providers():
    """
    Get providers that are currently working (active).

    Discovers the applicable providers, sends the test messages to each
    provider/model pair, and collects the pairs that answered as expected.

    Returns:
        dict: Mapping of model -> list of names of the providers that passed
        the probe, as held by the ``ModelProviderManager`` used for the run.
    """
    manager = ModelProviderManager()
    _manage_chat_completion(
        manager,
        _fetch_providers_having_models(),
        _generate_test_messages(),
    )
    return manager.get_working_providers()
|
|
|
|
def store_providers_details(providers, path='providers_details.json'):
    """
    Store the providers details including model info into a JSON file.

    Args:
        providers: Iterable of objects that have a ``models`` attribute and
            whose ``str()`` is the provider name (e.g. ``Provider``).
            NOTE(review): the dict returned by ``get_active_model_providers``
            does not satisfy this; iterating it yields plain keys without a
            ``models`` attribute.
        path: Destination file; defaults to ``providers_details.json`` in the
            current working directory. An existing file is overwritten.

    The file contains a JSON array of ``{"name": ..., "models": ...}``
    objects, one per provider, in iteration order.
    """
    records = [
        {'name': str(provider), 'models': provider.models}
        for provider in providers
    ]
    # Explicit encoding keeps the output independent of the platform locale.
    with open(path, 'w', encoding='utf-8') as file:
        json.dump(records, file)
|
|
|
|
| |
| |