| import sys |
| from pathlib import Path |
| from colorama import Fore |
|
|
| sys.path.append(str(Path(__file__).parent.parent)) |
|
|
| from g4f import BaseProvider, models, Provider |
| from timeit import default_timer as timer |
|
|
| logging = False |
|
|
| top_provider_candidate = None |
|
|
| def main(): |
| providers = get_providers() |
|
|
| failed_providers = [] |
| succeeded_providers = [] |
|
|
| for _provider in providers: |
| if _provider.needs_auth: |
| continue |
| print("Provider:", _provider.__name__) |
| result = test(_provider) |
| print("Result:", result) |
|
|
| |
| if _provider.working and result: |
| succeeded_providers.append(_provider) |
|
|
| |
| if _provider.working and not result: |
| failed_providers.append(_provider) |
|
|
| print() |
|
|
| |
| if failed_providers: |
| print(f"{Fore.RED}Failed providers:\n") |
| for _provider in failed_providers: |
| print(f"{Fore.RED}{_provider.__name__}") |
|
|
| print() |
|
|
|
|
| |
| if succeeded_providers: |
| print(f"{Fore.BLUE}Failed providers:\n") |
| for _provider in failed_providers: |
| print(f"{Fore.BLUE}{_provider.__name__}") |
|
|
| else: |
| print(f"{Fore.GREEN}All providers are working") |
|
|
| return succeeded_providers |
|
|
| |
| def get_providers() -> list[type[BaseProvider]]: |
| provider_names = dir(Provider) |
| ignore_names = [ |
| "base_provider", |
| "BaseProvider", |
| "ChatgptLogin" |
| ] |
| provider_names = [ |
| provider_name |
| for provider_name in provider_names |
| if not provider_name.startswith("__") and provider_name not in ignore_names |
| ] |
| return [getattr(Provider, provider_name) for provider_name in sorted(provider_names)] |
|
|
| |
| def test_provider_response_times(providers): |
| times = [] |
| for provider in providers: |
| start = timer() |
| response = create_response(provider) |
| end = timer() |
| times.append((end - start, provider)) |
| |
| times.sort() |
| return [time for time, provider in times] |
|
|
| |
| def sort_providers_by_response_time(providers): |
| times = test_provider_response_times(providers) |
| sorted_providers = [] |
| for time in times: |
| for provider in providers: |
| if time == test_provider_response_times(provider): |
| sorted_providers.append(provider) |
| break |
| return sorted_providers |
|
|
| |
| def create_response(_provider: type[BaseProvider]) -> str: |
| if _provider.supports_gpt_35_turbo: |
| model = models.gpt_35_turbo.name |
| elif _provider.supports_gpt_4: |
| model = models.gpt_4 |
| elif hasattr(_provider, "model"): |
| model = _provider.model |
| else: |
| model = None |
| response = _provider.create_completion( |
| model=model, |
| messages=[{"role": "user", "content": "Hello"}], |
| stream=False, |
| ) |
| return "".join(response) |
|
|
| |
| def test(_provider: type[BaseProvider]) -> bool: |
| try: |
| response = create_response(_provider) |
| assert type(response) is str |
| assert len(response) > 0 |
| return response |
| except Exception as e: |
| if logging: |
| print(e) |
| return False |
|
|
|
|
| if __name__ == "__main__": |
| succes = main() |
| |
| print(f"{Fore.GREEN}Success: {succes[0]}") |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |