File size: 3,646 Bytes
62c1ca1 0cafc24 62c1ca1 8b4279a 62c1ca1 0cafc24 62c1ca1 475c9e0 27a0d11 62c1ca1 8b4279a 62c1ca1 8b4279a 27a0d11 62c1ca1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import json
import time
import uuid
import hashlib
import subprocess
import huggingface_hub as hh
from flask import Flask, request, send_file
MAX_SECONDS = 100
server_me = 'https://zaisheng-testapi.hf.space'
def get_key_sha256(key):
sha256 = hashlib.sha256()
sha256.update(key.encode('utf-8'))
return sha256.hexdigest()
match_sha256 = '686467fc40941a4af9b605365077c58ac06413870ec1679f41660047f716b7a6'
prompt_folder = '../prompts'
video_folder = '../videos'
for f in [prompt_folder, video_folder]:
os.makedirs(f, exist_ok=True)
app = Flask(__name__, static_folder=video_folder, static_url_path='/generaged')
#https://api.klingai.com/v1/videos/image2video
@app.route("/v1/videos/image2video", methods = ['POST'])
def image2video():
prompt_file_name = str(time.time())
h = request.headers
key = h['Key']
if not get_key_sha256(key) == match_sha256:
return 'err'
d = request.json
prompt = d['prompt']
open(os.path.join(prompt_folder, prompt_file_name), 'w').write(prompt)
request_id = str(uuid.uuid4())
video_file = os.path.join(video_folder, prompt_file_name+'.mp4')
for _ in range(MAX_SECONDS):
time.sleep(1)
if os.path.exists(video_file):
updated_at = str(time.time())
ret = {
"code": 0,
"message": "The video is generated",
"request_id": request_id,
"data":{
"video_url":server_me+'/generaged/'+prompt_file_name+'.mp4',
"task_id": request_id,
"task_info":{
"external_task_id": request_id
},
"task_status": "success",
"created_at": prompt_file_name,
"updated_at": updated_at
}
}
return json.dumps(ret)
updated_at = str(time.time())
ret = {
"code": 1234,
"message": "time out",
"request_id": request_id,
"data":{
"task_id": request_id,
"task_info":{
"external_task_id": request_id
},
"task_status": "fail",
"created_at": prompt_file_name,
"updated_at": updated_at
}
}
return json.dumps(ret)
@app.route("/getprompt", methods = ['GET'])
def getprompt():
h = request.headers
key = h['Key']
if not get_key_sha256(key) == match_sha256:
return 'err'
fs = sorted(os.listdir(prompt_folder))
fs = [f for f in fs if not f.startswith('.')]
if len(fs) > 0:
f = fs[0]
ff = os.path.join(prompt_folder, f)
txt = open(ff).read()
d = {'file': f, 'prompt': txt}
s = f'{f}||||{txt}'
return s
return ""
@app.route("/putvideo", methods = ['POST'])
def putvideo():
h = request.headers
key = h['Key']
file = h['File']
repo_id_video = h['RV']
repo_id_prompt = h['RP']
tt = h['tt']
if not get_key_sha256(key) == match_sha256:
return 'err'
f = request.files['file']
local_file_video = os.path.join(video_folder, file+'.mp4')
local_file_prompt = os.path.join(prompt_folder, file)
f.save(local_file_video)
hh.upload_file(path_or_fileobj=local_file_video, path_in_repo=file+'.mp4', repo_id=repo_id_video, repo_type='dataset', token=tt)
hh.upload_file(path_or_fileobj=local_file_prompt, path_in_repo=file, repo_id=repo_id_prompt, repo_type='dataset', token=tt)
os.remove(local_file_prompt)
return "ok"
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=False)
|