"""Anthropic-compatible Claude proxy.

Exposes a small subset of the Anthropic Messages API (/v1/messages,
/v1/models) backed by a swappable upstream provider, plus an info page
(/v1) with usage/cost counters and service status.
"""

from flask import (
    Flask,
    Response,
    request,
    jsonify,
    redirect,
    url_for,
    render_template,
    stream_with_context,
)
from werkzeug import exceptions
from werkzeug.middleware.proxy_fix import ProxyFix

import json
import logging
import os
import random
import re
import string
import time
import traceback

import humanize
import tiktoken
from marshmallow import schema, fields, validate, validates, ValidationError, EXCLUDE

# Include the offending messages in 400 responses instead of the generic text.
exceptions.BadRequestKeyError.show_exception = True

app = Flask(__name__, static_folder=None)
app.json.sort_keys = False  # keep response keys in insertion order
app.wsgi_app = ProxyFix(app.wsgi_app)  # honor X-Forwarded-* from the reverse proxy

# Active upstream provider; swap the import block below to change providers.
from providers.agicto import (
    chat_completion,
    chat_completion_stream,
    increase_api_keys,
    API_KEYS,
    REVOKE_KEYS,
)

# from providers.devcto import (
#     chat_completion_stream,
#     increase_api_keys,
#     API_KEYS,
#     REVOKE_KEYS,
# )


@app.route("/")
def root():
    """Redirect the bare root to the /v1 info page."""
    return redirect(url_for("base_url"))


# --- service-wide state ------------------------------------------------------
UPTIME = time.time()  # process start time
WAITING = UPTIME  # requests are rejected until this timestamp (back-off)
LAST_UNAVAILABLE = None  # when the upstream last returned 503
PROMPTERS = 0  # /v1/messages requests currently in flight
TOKENS = {"input": 0, "output": 0}  # lifetime token tally

# Riddle/song trivia shown on the info page, all configured via environment.
RIDDLE_ID = os.environ.get("RIDDLE_ID", "-")
RIDDLE_EN = os.environ.get("RIDDLE_EN", "-")
RIDDLE_EXPIRE = os.environ.get("RIDDLE_EXPIRE", "?")
SONG = {
    "title": os.environ.get("SONG_TITLE", "-"),
    "url": os.environ.get("SONG_URL", None),
}


@app.route("/v1")
def base_url():
    """Service info page: endpoints, usage/cost tally and status details.

    Responds with JSON when the client only accepts JSON, otherwise renders
    the HTML template with the same payload.
    """
    wait = max(0, WAITING - time.time())
    info = {
        "base_url": url_for("base_url", _external=True),
        "api_key": "(riddle_word)",
        "endpoints": [str(_) for _ in app.url_map.iter_rules()],
        "usages": {
            "tokens": {
                "input": f"{humanize.intword(TOKENS['input'])} ($0.015 / 1k)",
                "output": f"{humanize.intword(TOKENS['output'])} ($0.075 / 1k)",
                "cost": f"${((TOKENS['input'] * 0.015) + (TOKENS['output'] * 0.075)) / 1000:,.2f}",
            },
            "keys": f"{len(API_KEYS)} stored | {len(REVOKE_KEYS)} revoked",
        },
        "details": {
            "uptime": humanize.naturaldelta(time.time() - UPTIME),
            "prompters": max(0, PROMPTERS),
            "waiting": humanize.naturaldelta(wait) if wait else None,
            "last_pantun": (
                humanize.naturaltime(time.time() - LAST_UNAVAILABLE)
                if LAST_UNAVAILABLE
                else None
            ),
        },
    }
    if (
        request.accept_mimetypes.accept_json
        and not request.accept_mimetypes.accept_html
    ):
        return Response(json.dumps(info, indent=2), mimetype="application/json")
    return render_template(
        "index.html",
        prompt=json.dumps(info, indent=2),
        riddle=[RIDDLE_ID, RIDDLE_EN],
        riddle_expire=RIDDLE_EXPIRE,
        song=SONG,
    )


@app.teardown_request
def teardown_request(_):
    """Decrement the in-flight counter when a /v1/messages request ends."""
    # only from messages
    if request.endpoint == "messages":
        global PROMPTERS
        if PROMPTERS > 0:
            PROMPTERS -= 1


@app.route("/v1/models")
def models():
    """Static model listing (only Claude 3 Opus is exposed)."""
    return jsonify(
        {
            "objects": "list",
            "data": [
                {
                    "id": "claude-3-opus-20240229",
                    "object": "chat.completion",
                    "created": None,
                    "owned_by": "anthropic",
                }
            ],
        }
    )


# Accepted client keys, comma-separated in the API_KEY environment variable.
# NOTE(review): with API_KEY unset this list is [""], so a client sending an
# empty x-api-key header would be accepted — confirm the env var is always set.
API_KEY = [api_key.strip() for api_key in os.environ.get("API_KEY", "").split(",")]


def count_tokens(input_text, output_text):
    """Tokenize both sides with cl100k_base, add them to the lifetime TOKENS
    tally, and return an Anthropic-style usage dict."""
    cl100k = tiktoken.get_encoding("cl100k_base")
    input_tokens = len(cl100k.encode(input_text))
    output_tokens = len(cl100k.encode(output_text))
    global TOKENS
    TOKENS["input"] += input_tokens
    TOKENS["output"] += output_tokens
    return {"input_tokens": input_tokens, "output_tokens": output_tokens}


class MessageSchema(schema.Schema):
    """Request schema for /v1/messages (subset of the Anthropic Messages API)."""

    model = fields.Str()
    messages = fields.List(
        fields.Nested(
            {
                "role": fields.Str(
                    required=True,
                    validate=validate.OneOf(["user", "assistant"]),
                ),
                # Raw: the Anthropic spec allows string or content-block list.
                "content": fields.Raw(required=True),
            }
        ),
        required=True,
    )
    max_tokens = fields.Int(validate=validate.Range(min=1, max=4096))
    system = fields.Str(allow_none=True)
    stream = fields.Bool(allow_none=True)
    temperature = fields.Float(
        validate=validate.Range(min=0.0, max=1.0), allow_none=True
    )
    # top_k = fields.Float(allow_nan=True)
    top_p = fields.Float(allow_nan=True)

    @validates("model")
    def validate_model(self, value):
        # NOTE(review): raising an HTTP error (not ValidationError) inside a
        # validator deliberately bypasses the 400/err.messages wrapping in
        # messages(); Flask still turns it into a 400 response.
        if value not in ["claude-3-opus-20240229"]:
            raise exceptions.BadRequest("Model must be claude-3-opus-20240229.")

    class Meta:
        unknown = EXCLUDE  # silently drop unknown request fields


@app.route("/v1/messages", methods=["POST"])
def messages():
    """Anthropic-compatible Messages endpoint, streaming and non-streaming.

    Raises:
        Unauthorized: unknown x-api-key.
        TooManyRequests: while the global back-off window is active.
        BadRequestKeyError: payload fails schema validation.
    """
    if request.headers.get("x-api-key") not in API_KEY:
        raise exceptions.Unauthorized(
            "Invalid api key, open and check base_url (%s) for more info."
            % url_for("base_url", _external=True)
        )
    if WAITING > time.time():
        raise exceptions.TooManyRequests(
            "Waiting for %s, and try again."
            % humanize.naturaldelta(WAITING - time.time())
        )
    try:
        data: dict = MessageSchema().load(request.json)
    except ValidationError as err:
        raise exceptions.BadRequestKeyError(err.messages)
    # Fold the optional system prompt into the message list for the provider.
    if data.get("system"):
        data["messages"].insert(0, {"role": "system", "content": data.pop("system")})
    head = {
        "id": "msg_" + "".join(random.choices(string.hexdigits, k=15)),
        "model": data["model"],
        "type": "message",
        "role": "assistant",
        "stop_sequence": None,
    }
    # increase prompter
    global PROMPTERS
    PROMPTERS += 1
    # NOTE(review): "content" may legally be a list (fields.Raw above); this
    # join assumes plain strings — confirm clients only send string content.
    prompt_text = "".join([m["content"] for m in data["messages"]])

    if data.get("stream"):
        # Prime the generator so provider errors surface here (as a normal
        # HTTP error) instead of mid-stream.
        next(completion := chat_completion_stream(**data))

        def chunk(payload):
            # One SSE "data:" line carrying compact JSON.
            return "data: " + json.dumps(payload, separators=(",", ":")) + "\n\n"

        def generate():
            # Event sequence mirrors Anthropic's streaming protocol:
            # message_start -> content_block_start -> content_block_delta*
            # -> content_block_stop -> message_delta -> message_stop.
            yield bytes("event: message_start\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "message_start",
                        "message": {
                            "content": [],
                            **head,
                            "stop_reason": None,
                            # Token counts are only known once the stream ends;
                            # they are reported in the final message_delta.
                            "usage": {
                                "input_tokens": None,
                                "output_tokens": None,
                            },
                        },
                    }
                ),
                "utf-8",
            )
            yield bytes("event: content_block_start\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "content_block_start",
                        "index": 0,
                        "content_block": {"type": "text", "text": ""},
                    }
                ),
                "utf-8",
            )
            output = ""
            for _ in completion:
                yield bytes("event: content_block_delta\n", "utf-8")
                yield bytes(
                    chunk(
                        {
                            "type": "content_block_delta",
                            "index": 0,
                            "delta": {"type": "text_delta", "text": _},
                        }
                    ),
                    "utf-8",
                )
                output += _
            yield bytes("event: content_block_stop\n", "utf-8")
            yield bytes(
                chunk({"type": "content_block_stop", "index": 0}),
                "utf-8",
            )
            yield bytes("event: message_delta\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "message_delta",
                        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                        "usage": count_tokens(prompt_text, output),
                    }
                ),
                "utf-8",
            )
            yield bytes("event: message_stop\n", "utf-8")
            yield bytes(chunk({"type": "message_stop"}), "utf-8")

        return Response(stream_with_context(generate()), mimetype="text/event-stream")

    output = chat_completion(**data)
    return jsonify(
        {
            "content": [
                {
                    "text": output,
                    "type": "text",
                }
            ],
            **head,
            "stop_reason": "end_turn",
            "usage": count_tokens(prompt_text, output),
        }
    )


@app.errorhandler(Exception)
def handle_unexpected(_):
    """Last-resort handler: log the traceback, answer with a generic 500.

    Renamed from `handle_exception` — the original file defined two functions
    with that name, relying on late binding for the self-call below.
    """
    traceback.print_exc()
    return handle_exception(exceptions.InternalServerError())


# 503 pantuns (Indonesian rhyming quatrains) served verbatim to clients.
UNAVAILABLE_MESSAGES = [
    "Burung kenari terbang tinggi, Di taman bunga menari-nari. Kamu refresh gak ada henti, Tapi server-nya malah pergi!",
    "Ke laut mancing ikan kerapu, Dapat satu langsung dilempar. Mau akses disuruh nunggu, Muncul service unavailable bikin kesal!",
    "Beli donat rasa durian, Dimakan ramai-ramai di taman. Service unavailable datangnya seharian, Bikin hati jadi gak karuan!",
    "Ke pasar beli terasi, Naik motor sambil bernyanyi. Server sibuk, coba lagi, Service unavailable nih hari!",
    "Pagi-pagi minum teh manis, Disruput sambil lihat layar. Server down, jadi nangis, Error lagi, kapan lancar?",
    "Pergi ke taman sambil joget, Liat kupu-kupu terbang santai. Mau refresh sampe capek, Servernya malah bilang bye-bye!",
    "Ke pasar beli jeruk Bali, Eh ketemu sama teman lama. Klik refresh terus berkali-kali, Tapi servernya malah drama!",
]


@app.errorhandler(exceptions.ServiceUnavailable)
def handle_unavailable(e: exceptions.ServiceUnavailable):
    """Upstream 503: start a 3-10 s back-off and reply with a random pantun."""
    global LAST_UNAVAILABLE, WAITING
    LAST_UNAVAILABLE = time.time()
    WAITING = time.time() + random.randint(3, 10)
    return handle_exception(
        exceptions.ServiceUnavailable(random.choice(UNAVAILABLE_MESSAGES))
    )


@app.errorhandler(exceptions.HTTPException)
def handle_exception(e: exceptions.HTTPException):
    """Render any HTTPException as an Anthropic-style JSON error envelope."""
    error = {
        "error": {
            # CamelCase exception class name -> snake_case error type.
            "type": re.sub(r"([a-z])([A-Z])", r"\1_\2", e.__class__.__name__).lower(),
            "message": e.description,
            "code": e.code,
        }
    }
    return jsonify(error), e.code


if not app.debug:
    # Production: periodically top up the provider key pool in the background.
    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    scheduler.add_job(increase_api_keys, "interval", minutes=1)
    scheduler.start()

    import atexit

    atexit.register(lambda: scheduler.shutdown())
else:
    logging.basicConfig(level=logging.DEBUG)
    # testing
    increase_api_keys(per_thread=1)