# app.py — "Dumb" Hugging Face Space by RendiXD ("Upload 3 files", commit 5467a14).
# Web-scrape page header converted to a comment so the file parses as Python.
from flask import (
    Flask,
    Response,
    request,
    jsonify,
    redirect,
    url_for,
    render_template,
    stream_with_context,
)
from werkzeug import exceptions
import os
import logging

# Include the exception details in BadRequestKeyError responses so the
# marshmallow validation messages raised in messages() reach the client.
exceptions.BadRequestKeyError.show_exception = True
from werkzeug.middleware.proxy_fix import ProxyFix

# static_folder=None: no /static route; every endpoint is declared explicitly.
app = Flask(__name__, static_folder=None)
# Keep JSON response keys in insertion order (matters for the /v1 info page).
app.json.sort_keys = False
# Trust X-Forwarded-* headers from the fronting reverse proxy, so
# url_for(..., _external=True) builds correct public URLs.
app.wsgi_app = ProxyFix(app.wsgi_app)
from providers.agicto import (
    chat_completion,
    chat_completion_stream,
    increase_api_keys,
    API_KEYS,
    REVOKE_KEYS,
)

# Alternative upstream provider with the same interface, kept for reference:
# from providers.devcto import (
#     chat_completion_stream,
#     increase_api_keys,
#     API_KEYS,
#     REVOKE_KEYS,
# )
@app.route("/")
def root():
    """Send visitors at the site root to the API info page at /v1."""
    target = url_for("base_url")
    return redirect(target)
import humanize
import time

# Process start timestamp; used for the human-readable uptime in /v1.
UPTIME = time.time()
# Epoch time before which /v1/messages answers 429 (backoff armed by
# handle_unavailable). Initialized to start time, i.e. no wait.
WAITING = UPTIME
# Epoch time of the most recent ServiceUnavailable error; None until one occurs.
LAST_UNAVAILABLE = None
# Count of in-flight /v1/messages requests (decremented in teardown_request).
PROMPTERS = 0
# Running token-usage tally shown on the /v1 info page.
TOKENS = {"input": 0, "output": 0}
# Riddle texts (Indonesian / English) rendered on the landing page; the answer
# presumably serves as the API key — see "api_key": "(riddle_word)" in /v1.
RIDDLE_ID = os.environ.get("RIDDLE_ID", "-")
RIDDLE_EN = os.environ.get("RIDDLE_EN", "-")
RIDDLE_EXPIRE = os.environ.get("RIDDLE_EXPIRE", "?")
# Song metadata displayed on the landing page.
SONG = {
    "title": os.environ.get("SONG_TITLE", "-"),
    "url": os.environ.get("SONG_URL", None),
}
@app.route("/v1")
def base_url():
    """Serve the API info page: HTML for browsers, raw JSON for API clients."""
    # Remaining backoff (seconds); 0 when no 503 backoff is active.
    wait = max(0, WAITING - time.time())
    token_usage = {
        "input": f"{humanize.intword(TOKENS['input'])} ($0.015 / 1k)",
        "output": f"{humanize.intword(TOKENS['output'])} ($0.075 / 1k)",
        "cost": f"${((TOKENS['input'] * 0.015) + (TOKENS['output'] * 0.075)) / 1000:,.2f}",
    }
    details = {
        "uptime": humanize.naturaldelta(time.time() - UPTIME),
        "prompters": max(0, PROMPTERS),
        "waiting": humanize.naturaldelta(wait) if wait else None,
        "last_pantun": (
            humanize.naturaltime(time.time() - LAST_UNAVAILABLE)
            if LAST_UNAVAILABLE
            else None
        ),
    }
    info = {
        "base_url": url_for("base_url", _external=True),
        "api_key": "(riddle_word)",
        "endpoints": [str(rule) for rule in app.url_map.iter_rules()],
        "usages": {
            "tokens": token_usage,
            "keys": f"{len(API_KEYS)} stored | {len(REVOKE_KEYS)} revoked",
        },
        "details": details,
    }
    # Content negotiation: JSON only when the client accepts JSON but not HTML.
    wants_json = (
        request.accept_mimetypes.accept_json
        and not request.accept_mimetypes.accept_html
    )
    if wants_json:
        return Response(json.dumps(info, indent=2), mimetype="application/json")
    return render_template(
        "index.html",
        prompt=json.dumps(info, indent=2),
        riddle=[RIDDLE_ID, RIDDLE_EN],
        riddle_expire=RIDDLE_EXPIRE,
        song=SONG,
    )
@app.teardown_request
def teardown_request(_):
    """After each request, release one in-flight slot for /v1/messages."""
    global PROMPTERS, WAITING
    # Only the messages endpoint increments PROMPTERS, so only it decrements.
    if request.endpoint != "messages":
        return
    if PROMPTERS > 0:
        PROMPTERS -= 1
@app.route("/v1/models")
def models():
    """List the single model this proxy exposes.

    Fix: the list envelope key was "objects"; the OpenAI/Anthropic model-list
    format uses the singular "object": "list", which client libraries read.
    """
    return jsonify(
        {
            "object": "list",
            "data": [
                {
                    "id": "claude-3-opus-20240229",
                    "object": "chat.completion",
                    "created": None,
                    "owned_by": "anthropic",
                }
            ],
        }
    )
# Accepted x-api-key values, comma-separated in the API_KEY env var.
# NOTE(review): with API_KEY unset this becomes [""], so a present-but-empty
# x-api-key header would pass the check in messages() — confirm intended.
API_KEY = [api_key.strip() for api_key in os.environ.get("API_KEY", "").split(",")]
from marshmallow import schema, fields, validate, validates, ValidationError, EXCLUDE
import random
import string
import json
import tiktoken
def count_tokens(input, output):
    """Tokenize a prompt/completion pair with cl100k_base.

    Adds both counts to the global TOKENS tally and returns them in
    Anthropic's usage shape: {"input_tokens": ..., "output_tokens": ...}.
    """
    encoder = tiktoken.get_encoding("cl100k_base")
    n_in = len(encoder.encode(input))
    n_out = len(encoder.encode(output))
    global TOKENS
    TOKENS["input"] += n_in
    TOKENS["output"] += n_out
    return {"input_tokens": n_in, "output_tokens": n_out}
class MessageSchema(schema.Schema):
    """Validates the Anthropic-style /v1/messages request body."""

    model = fields.Str()
    messages = fields.List(
        fields.Nested(
            {
                # Only user/assistant roles are accepted from the client; a
                # system turn is injected later from the `system` field.
                "role": fields.Str(
                    required=True,
                    validate=validate.OneOf(["user", "assistant"]),
                ),
                # Raw: no type check — Anthropic allows either a string or a
                # list of content blocks here.
                "content": fields.Raw(required=True),
            }
        ),
        required=True,
    )
    max_tokens = fields.Int(validate=validate.Range(min=1, max=4096))
    system = fields.Str(allow_none=True)
    stream = fields.Bool(allow_none=True)
    temperature = fields.Float(
        validate=validate.Range(min=0.0, max=1.0), allow_none=True
    )
    # top_k = fields.Float(allow_nan=True)
    top_p = fields.Float(allow_nan=True)

    @validates("model")
    def validate_model(self, value):
        # NOTE(review): raises werkzeug BadRequest rather than marshmallow
        # ValidationError, so this bypasses the `except ValidationError` in
        # messages() and surfaces directly as an HTTP 400 via the handlers.
        if value not in ["claude-3-opus-20240229"]:
            raise exceptions.BadRequest("Model must be claude-3-opus-20240229.")

    class Meta:
        # Silently drop unrecognized request fields instead of erroring.
        unknown = EXCLUDE
@app.route("/v1/messages", methods=["POST"])
def messages():
    """Anthropic-compatible messages endpoint, proxied to the provider module.

    Flow: static x-api-key check -> global WAITING backoff check (429) ->
    MessageSchema validation (400) -> either an SSE stream in Anthropic's
    event format or a single JSON message response.
    """
    if request.headers.get("x-api-key") not in API_KEY:
        raise exceptions.Unauthorized(
            "Invalid api key, open and check base_url (%s) for more info."
            % url_for("base_url", _external=True)
        )
    # Backoff armed by handle_unavailable after an upstream 503.
    if WAITING > time.time():
        raise exceptions.TooManyRequests(
            "Waiting for %s, and try again."
            % humanize.naturaldelta(WAITING - time.time())
        )
    try:
        data: dict = MessageSchema().load(request.json)
    except ValidationError as err:
        # show_exception is enabled at the top of the file, so err.messages
        # reaches the client in the 400 body.
        raise exceptions.BadRequestKeyError(err.messages)
    # Fold the optional Anthropic `system` prompt into the message list as a
    # leading OpenAI-style system turn for the upstream provider.
    if data.get("system"):
        data["messages"].insert(0, {"role": "system", "content": data.pop("system")})
    # Response envelope shared by the streaming and non-streaming paths.
    head = {
        "id": "msg_" + "".join(random.choices(string.hexdigits, k=15)),
        "model": data["model"],
        "type": "message",
        "role": "assistant",
        "stop_sequence": None,
    }
    # increase prompter (released in teardown_request)
    global PROMPTERS
    PROMPTERS += 1
    # Concatenated prompt text, used only for token counting.
    # NOTE(review): assumes every content is a string; a content-block list
    # (which fields.Raw permits) would make this join raise — confirm callers.
    input = "".join([m["content"] for m in data["messages"]])
    if data.get("stream"):
        # Prime the provider generator so upstream errors (e.g. a 503)
        # surface here, before any SSE bytes are sent.
        # NOTE(review): this discards the generator's first yielded item —
        # presumably the provider yields a sentinel first; verify.
        next(completion := chat_completion_stream(**data))

        def chunk(data):
            # One SSE data line: compact JSON plus the blank-line delimiter.
            return "data: " + json.dumps(data, separators=(",", ":")) + "\n\n"

        def generate():
            # Anthropic SSE sequence: message_start, content_block_start,
            # N x content_block_delta, content_block_stop, message_delta
            # (usage), message_stop.
            yield bytes("event: message_start\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "message_start",
                        "message": {
                            "content": [],
                            **head,
                            "stop_reason": None,
                            "usage": {
                                # Real usage is only known at stream end; it
                                # is reported in the message_delta event.
                                "input_tokens": None,
                                "output_tokens": None,
                            },
                        },
                    }
                ),
                "utf-8",
            )
            yield bytes("event: content_block_start\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "content_block_start",
                        "index": 0,
                        "content_block": {"type": "text", "text": ""},
                    }
                ),
                "utf-8",
            )
            output = ""
            for _ in completion:
                yield bytes("event: content_block_delta\n", "utf-8")
                yield bytes(
                    chunk(
                        {
                            "type": "content_block_delta",
                            "index": 0,
                            "delta": {"type": "text_delta", "text": _},
                        }
                    ),
                    "utf-8",
                )
                # Accumulate the full completion for token counting below.
                output += _
            yield bytes("event: content_block_stop\n", "utf-8")
            yield bytes(
                chunk({"type": "content_block_stop", "index": 0}),
                "utf-8",
            )
            yield bytes("event: message_delta\n", "utf-8")
            yield bytes(
                chunk(
                    {
                        "type": "message_delta",
                        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                        # Also updates the global TOKENS tally.
                        "usage": count_tokens(input, output),
                    }
                ),
                "utf-8",
            )
            yield bytes("event: message_stop\n", "utf-8")
            yield bytes(chunk({"type": "message_stop"}), "utf-8")

        # stream_with_context keeps the request context alive while streaming.
        return Response(stream_with_context(generate()), mimetype="text/event-stream")
    # Non-streaming path: single blocking completion.
    output = chat_completion(**data)
    return jsonify(
        {
            "content": [
                {
                    "text": output,
                    "type": "text",
                }
            ],
            **head,
            "stop_reason": "end_turn",
            "usage": count_tokens(input, output),
        }
    )
import re
import traceback


@app.errorhandler(Exception)
def handle_uncaught(_):
    """Catch-all for non-HTTP exceptions: log the traceback, answer 500.

    Fix: renamed from `handle_exception`, which shadowed (and was then
    shadowed by) the HTTPException handler defined later in the file; the
    original only worked because the call below resolved to that later
    definition at runtime. Flask keeps the registered function object, so
    the rename does not affect error routing.
    """
    traceback.print_exc()
    # Delegate to the HTTPException handler for the JSON error body.
    return handle_exception(exceptions.InternalServerError())
# Indonesian pantun (rhyming quatrains) about the server being down; one is
# chosen at random by handle_unavailable as the 503 error message. These are
# user-facing runtime strings — do not translate or edit.
UNAVAILABLE_MESSAGES = [
    "Burung kenari terbang tinggi, Di taman bunga menari-nari. Kamu refresh gak ada henti, Tapi server-nya malah pergi!",
    "Ke laut mancing ikan kerapu, Dapat satu langsung dilempar. Mau akses disuruh nunggu, Muncul service unavailable bikin kesal!",
    "Beli donat rasa durian, Dimakan ramai-ramai di taman. Service unavailable datangnya seharian, Bikin hati jadi gak karuan!",
    "Ke pasar beli terasi, Naik motor sambil bernyanyi. Server sibuk, coba lagi, Service unavailable nih hari!",
    "Pagi-pagi minum teh manis, Disruput sambil lihat layar. Server down, jadi nangis, Error lagi, kapan lancar?",
    "Pergi ke taman sambil joget, Liat kupu-kupu terbang santai. Mau refresh sampe capek, Servernya malah bilang bye-bye!",
    "Ke pasar beli jeruk Bali, Eh ketemu sama teman lama. Klik refresh terus berkali-kali, Tapi servernya malah drama!",
]
@app.errorhandler(exceptions.ServiceUnavailable)
def handle_unavailable(e: exceptions.ServiceUnavailable):
    """On an upstream 503: record the failure time, arm a short random
    backoff window, and answer with a randomly chosen pantun."""
    global LAST_UNAVAILABLE, WAITING
    now = time.time()
    LAST_UNAVAILABLE = now
    # /v1/messages answers 429 until this timestamp passes.
    WAITING = now + random.randint(3, 10)
    pantun = random.choice(UNAVAILABLE_MESSAGES)
    return handle_exception(exceptions.ServiceUnavailable(pantun))
@app.errorhandler(exceptions.HTTPException)
def handle_exception(e: exceptions.HTTPException):
    """Serialize any HTTPException into an Anthropic-style JSON error body."""
    # CamelCase class name -> snake_case error type,
    # e.g. TooManyRequests -> too_many_requests.
    error_type = re.sub(r"([a-z])([A-Z])", r"\1_\2", e.__class__.__name__).lower()
    payload = {
        "error": {
            "type": error_type,
            "message": e.description,
            "code": e.code,
        }
    }
    return jsonify(payload), e.code
if not app.debug:
    # Production: top up the provider key pool once a minute in a background
    # scheduler, and shut it down cleanly on interpreter exit.
    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    scheduler.add_job(increase_api_keys, "interval", minutes=1)
    scheduler.start()
    import atexit

    atexit.register(lambda: scheduler.shutdown())
else:
    # Debug runs: verbose logging and a single synchronous key top-up.
    logging.basicConfig(level=logging.DEBUG)
    # testing
    increase_api_keys(per_thread=1)