Spaces:
Sleeping
Sleeping
| import random | |
| import requests | |
| from base64 import b64decode | |
| from flask import Flask, request, jsonify, Response, stream_with_context, render_template_string | |
| from transformers import AutoTokenizer | |
| def calc_tokens(text): | |
| tokenizer = AutoTokenizer.from_pretrained("PJMixers/CohereForAI_c4ai-command-r-plus-tokenizer") | |
| tokens = tokenizer.tokenize(text) | |
| return len(tokens) | |
| def calc_messages_tokens(json_data): | |
| messages = json_data["messages"] | |
| m_messages = [] | |
| user_count = 0 | |
| prompt = "<BOS_TOKEN>" | |
| for message in messages: | |
| if message["role"] == "system": | |
| prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message['content']}<|END_OF_TURN_TOKEN|>" | |
| elif message["role"] == "user": | |
| user_count += 1 | |
| prompt += f"<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message['content']}<|END_OF_TURN_TOKEN|>" | |
| elif message["role"] == "assistant": | |
| prompt += f"<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>{message['content']}<|END_OF_TURN_TOKEN|>" | |
| else: | |
| continue | |
| prompt += "<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>" | |
| total_tokens = calc_tokens(prompt) + user_count + 1 | |
| return total_tokens + 10 # for robustness | |
| app = Flask(__name__) | |
| def index(): | |
| template = ''' | |
| <html> | |
| <head> | |
| <title>Command-R-Plus Chat API</title> | |
| </head> | |
| <body> | |
| <h1>Command-R-Plus OpenAI Compatible API</h1> | |
| <h1>You need to be a HF PRO user to use it.</h1> | |
| <li>1. Create your token(as api key) <a target="_blank" href="https://huggingface.co/settings/tokens/new">[here]</a> by selecting "serverless Inference API".</li> | |
| <li>2. Set `https://tastypear-command-r-plus-chat.hf.space/api" as the domain in the client configuration.</li> | |
| If you have multiple keys, you can concatenate them with a semicolon (`;`) to use them randomly, e.g., `hf_aaaa;hf_bbbb;hf_...` | |
| </body> | |
| </html> | |
| ''' | |
| return render_template_string(template) | |
| def get_new_bearer(key): | |
| data = "C1RvUWoZAjd+ZBUyIV1CXjB3ay1VCA98Im4rWH5gVlZbKS1aBjhYU2YjHyVFeDwvI3x9cy92Vw1bKS5VHFM5VU9QVmpiDxJ6EmNSP1EHOgV6dCEOKEdncCJ7YBZmKQlkF1AYSkBOc0hiNhFBBHRWUmNrDQBycjUIOF5/WD1LRyZ/BidjFmEuelxBU3B9IhVDAnV5TXQRMGxFUDkDDVRnWzNVYg9DAQJiIVEqfFtRcXd3Lgd5CFx/U3AMDA1jPA4APUtifgh7fid7BhxJE28bSnVtYmdVAQt/CkdJYl4NDCRZQiNsCktrcwh3RwBlHQJUFngmdU9/Xl5eKC54KWdZXlYbAClJZAAlESdcUjt2eA5GASpUAmgKUkJ2cGdyBDZQCkpxUVQXLB51fy83GFh4PgxXcilgHCBdHVsZWnJjb0JEMAhaGWpeen8GDDR9fCMtLGBbeDA7RQdtLxJJDksCYGJ4VWVvNiRZNX9Ab0MtDRJ6RTM2NEVaeyJ+XGtaAzphAFcHd09Vd2FEBStDBnZGXkgMBjdPRQARG2phfCJzfS9Kbw1LO0w4cE5VektlARFbGX1EZUwLISplZh8JGGRxVTZCTDdrLgE=" | |
| data = b64decode(data) | |
| key = (key * (len(data) // len(key) + 1))[:len(data)] | |
| data = (bytes([a ^ b for a, b in zip(data, key.encode())])).decode() | |
| return random.choice(data.split('\n')) | |
| def proxy(): | |
| headers = dict(request.headers) | |
| headers.pop('Host', None) | |
| headers.pop('Content-Length', None) | |
| bearer = request.headers['Authorization'].split(' ')[1] | |
| if(bearer.startswith('hf_')): | |
| # for public usage | |
| headers['Authorization'] = f"Bearer {random.choice(bearer.split(';'))}" | |
| else: | |
| # my private keys | |
| headers['Authorization'] = f'Bearer {get_new_bearer(bearer)}' | |
| headers['X-Use-Cache'] = 'false' | |
| json_data = request.get_json() | |
| # Use the largest ctx | |
| json_data['max_tokens'] = 32768 - calc_messages_tokens(json_data) | |
| json_data['json_mode'] = False | |
| model = 'CohereForAI/c4ai-command-r-plus' | |
| def generate(): | |
| with requests.post(f"https://api-inference.huggingface.co/models/{model}/v1/chat/completions", json=request.json, headers=headers, stream=True) as resp: | |
| for chunk in resp.iter_content(chunk_size=1024): | |
| if chunk: | |
| yield chunk | |
| return Response(stream_with_context(generate()), content_type='text/event-stream') | |
| #import gevent.pywsgi | |
| #from gevent import monkey;monkey.patch_all() | |
| if __name__ == "__main__": | |
| app.run(debug=True) | |
| # gevent.pywsgi.WSGIServer((args.host, args.port), app).serve_forever() |