| from __future__ import annotations |
|
|
| import json |
| import time |
| import base64 |
| from curl_cffi.requests import AsyncSession |
|
|
| from .base_provider import AsyncProvider, format_prompt |
|
|
|
|
| class PerplexityAi(AsyncProvider): |
| url = "https://www.perplexity.ai" |
| working = True |
| supports_gpt_35_turbo = True |
| _sources = [] |
|
|
| @classmethod |
| async def create_async( |
| cls, |
| model: str, |
| messages: list[dict[str, str]], |
| proxy: str = None, |
| **kwargs |
| ) -> str: |
| url = cls.url + "/socket.io/?EIO=4&transport=polling" |
| async with AsyncSession(proxies={"https": proxy}, impersonate="chrome107") as session: |
| url_session = "https://www.perplexity.ai/api/auth/session" |
| response = await session.get(url_session) |
|
|
| response = await session.get(url, params={"t": timestamp()}) |
| response.raise_for_status() |
| sid = json.loads(response.text[1:])["sid"] |
|
|
| data = '40{"jwt":"anonymous-ask-user"}' |
| response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data) |
| response.raise_for_status() |
|
|
| data = "424" + json.dumps([ |
| "perplexity_ask", |
| format_prompt(messages), |
| { |
| "version":"2.1", |
| "source":"default", |
| "language":"en", |
| "timezone": time.tzname[0], |
| "search_focus":"internet", |
| "mode":"concise" |
| } |
| ]) |
| response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data) |
| response.raise_for_status() |
|
|
| while True: |
| response = await session.get(url, params={"t": timestamp(), "sid": sid}) |
| response.raise_for_status() |
| for line in response.text.splitlines(): |
| if line.startswith("434"): |
| result = json.loads(json.loads(line[3:])[0]["text"]) |
|
|
| cls._sources = [{ |
| "name": source["name"], |
| "url": source["url"], |
| "snippet": source["snippet"] |
| } for source in result["web_results"]] |
|
|
| return result["answer"] |
|
|
| @classmethod |
| def get_sources(cls): |
| return cls._sources |
|
|
|
|
| @classmethod |
| @property |
| def params(cls): |
| params = [ |
| ("model", "str"), |
| ("messages", "list[dict[str, str]]"), |
| ("stream", "bool"), |
| ("proxy", "str"), |
| ] |
| param = ", ".join([": ".join(p) for p in params]) |
| return f"g4f.provider.{cls.__name__} supports: ({param})" |
|
|
|
|
| def timestamp() -> str: |
| return base64.urlsafe_b64encode(int(time.time()-1407782612).to_bytes(4, 'big')).decode() |