import hashlib
import json
import os
import random
import string
import time

import requests


class ConanClient:
    """Client that posts signed JSON requests to a server.

    Each request carries the access key, a Unix timestamp, a random string
    and an MD5 digest computed over ``"<ak>:<timestamp>:<random>:<sk>"``.
    """

    def __init__(self, ak, sk, url, timeout=30):
        """
        Initialize the Client

        Args:
            ak: Access Key
            sk: Secret Key
            url: Server URL
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size=40, chars=None):
        """
        Build a random string.

        Args:
            size: Number of characters to generate (default: 40)
            chars: Characters to draw from; defaults to ASCII letters and digits
        Returns:
            A string of ``size`` characters picked with ``random.SystemRandom``
        """
        if chars is None:
            chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
        pick = random.SystemRandom().choice
        return "".join(pick(chars) for _ in range(size))

    def __signature(self, random_str, time_stamp):
        """
        Compute the request signature.

        Args:
            random_str: Random string sent along with the request
            time_stamp: Unix timestamp (seconds) sent along with the request
        Returns:
            Hex MD5 digest of ``"<ak>:<time_stamp>:<random_str>:<sk>"``
        """
        # NOTE(review): the field order and the use of MD5 are presumably
        # dictated by the server -- confirm before changing either.
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        return hashlib.md5(params_str.encode("utf-8")).hexdigest()

    def get_signature(self):
        """
        Build the authentication parameters for one request.

        Returns:
            dict with the keys ``timestamp``, ``random``, ``app_id`` and ``sign``
        """
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        return {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": self.__signature(random_str, timestamp),
        }

    def embed(self, text, content_id=None):
        """
        Embed text using the server

        Args:
            text: The input text to embed
            content_id: Identifier sent with the request; when omitted,
                ``"test_<unix-seconds>"`` is generated
        Returns:
            The response body decoded from JSON (not the ``requests.Response``
            object itself)
        Raises:
            json.JSONDecodeError: If the response body is not valid JSON
        """
        if content_id is None:
            content_id = f"test_{int(time.time())}"

        params = self.get_signature()
        params["body"] = text
        params["content_id"] = content_id

        headers = {"Content-Type": "application/json"}
        rsp = requests.post(
            self.url,
            data=json.dumps(params),
            timeout=self.timeout,
            headers=headers,
        )
        # The HTTP status code is not inspected: the body is decoded whatever
        # the status, so JSON error payloads reach the caller unchanged.
        return json.loads(rsp.text)