# izuemon's picture
# Update turbowarp-server/gpt.py
# c2bd07c verified
# raw
# history blame
# 3.54 kB
import scratchcommunication
import time
import requests
# ID of the Scratch/TurboWarp project whose cloud variables act as the transport.
PROJECT_ID = "1290918780"
# System message prepended to every chat request sent to the GPT API.
SYSTEM_PROMPT = {
    "role":"system",
    "content":"You are a conversational AI that runs on Scratch. Do not use Markdown in your responses; speak in natural conversation. The creator of this assistant is Izuemon."
}
# Persistent cloud-variable connection to the TurboWarp project.
# NOTE(review): this opens a network connection at import time.
tw = scratchcommunication.TwCloudConnection(
    project_id=PROJECT_ID,
    username="server",
    contact_info="contact"
)
# Cloud variable slots n1..n9 used as request/response channels
# (n0 is a separate register tracking which slots are in use).
slots = [f"n{i}" for i in range(1,10)]
# ---------------------
# Cloud variable access
# ---------------------
def get_vars():
    """Fetch a snapshot of all cloud variables for the connected project."""
    snapshot = tw.get_cloud_variables()
    return snapshot
def get_var(name):
    """Look up one cloud variable by name; None when it does not exist."""
    snapshot = get_vars()
    return snapshot.get(name)
# ---------------------
# Character table
# ---------------------
# Lookup table mapping 2-digit wire codes to characters:
# line k of the file is the character for code k.
chars = []
with open("turbowarp-server/n-chars.txt",encoding="utf8") as f:
    for line in f:
        chars.append(line.strip())
def encode(text):
    """Encode *text* into the 2-digit-per-character wire format.

    Each character is replaced by its zero-padded index in the ``chars``
    table.  Code 99 is reserved for newline, mirroring ``decode`` — the
    original implementation silently dropped newlines, breaking the
    round-trip.  Characters not in the table, or whose index would not fit
    the 2-digit framing (>= 99), are skipped, preserving the original
    best-effort behavior while never corrupting the stream.
    """
    # Build the reverse lookup once per call: O(1) per character instead of
    # an O(n) list.index scan.  First occurrence wins, like list.index.
    codes = {}
    for i, c in enumerate(chars):
        codes.setdefault(c, i)
    out = []
    for c in text:
        if c == "\n":
            out.append("99")  # reserved newline code (see decode)
            continue
        i = codes.get(c)
        if i is not None and i < 99:  # keep the fixed 2-digit framing intact
            out.append(f"{i:02d}")
    return "".join(out)
def decode(data):
    """Decode a 2-digit-per-character payload back into text.

    Digit pairs are read left to right; code 99 maps to a newline, every
    other code indexes into the ``chars`` table.
    """
    pieces = []
    for pos in range(0, len(data), 2):
        code = int(data[pos:pos + 2])
        pieces.append("\n" if code == 99 else chars[code])
    return "".join(pieces)
# ---------------------
# n0 (slot-usage) management
# ---------------------
def get_used():
    """Return the slot digits stored in n0 as a list of 1-char strings."""
    value = get_var("n0")
    return list(value) if value else []
def add_used(i):
    """Mark slot *i* as in use by appending its digit to the n0 register."""
    used = get_used()
    digit = str(i)
    if digit not in used:
        used.append(digit)
        tw.set_variable("n0", "".join(used))
def remove_used(i):
    """Release slot *i* by deleting its digit from the n0 register."""
    used = get_used()
    digit = str(i)
    if digit in used:
        used.remove(digit)
        tw.set_variable("n0", "".join(used))
# ---------------------
# API
# ---------------------
def ask_gpt(history):
    """Send the chat *history* (list of message dicts) to the GPT endpoint
    and return the assistant's reply text.

    The system prompt is always prepended.  Raises ``requests.HTTPError``
    on a non-2xx response and ``requests.Timeout`` if the API does not
    answer in time — the original call had no timeout (it could block the
    single-threaded server forever) and no status check (failures surfaced
    as a cryptic KeyError on the JSON body).
    """
    messages = [SYSTEM_PROMPT] + history
    r = requests.post(
        "https://izuemon-gpt-free-api.hf.space/v1/chat/completions",
        json={
            "model": "gpt-3.5-turbo",
            "messages": messages,
        },
        timeout=60,  # fail loudly instead of hanging the polling loop
    )
    r.raise_for_status()
    data = r.json()
    return data["choices"][0]["message"]["content"]
# ---------------------
# Sending
# ---------------------
def send(slot,text):
    """Encode *text* and push it to cloud variable *slot* in framed packets.

    Wire format of each packet: ``"1" + total + "0" + payload`` — character
    index 1 carries the packet count and index 2 is a read flag the client
    flips to "1" to acknowledge.  Each packet waits up to 10 s for that
    ack, polling every 0.1 s, and the whole transfer is abandoned on
    timeout.
    NOTE(review): the packet count occupies a single character, so a reply
    needing 10+ packets would corrupt the framing — confirm payloads stay
    below ~900 kB, or that the client parses multi-digit counts.
    """
    encoded=encode(text)
    # Chunk size keeps header + payload within a cloud variable's limits.
    size=99996
    packets=[encoded[i:i+size] for i in range(0,len(encoded),size)]
    total=len(packets)
    for p in packets:
        # Header: "1" (server marker) + packet count + "0" (unread flag).
        packet=f"1{total}0{p}"
        start=time.time()
        tw.set_variable(slot,packet)
        # Busy-wait until the client acknowledges by flipping the flag.
        while True:
            v=get_var(slot)
            if v and len(v) > 2 and v[2]=="1":
                break
            if time.time()-start>10:
                return  # give up after 10 s without an ack
            time.sleep(0.1)
# ---------------------
# Main loop
# ---------------------
# Per-slot reassembly buffers: slot name -> list of payload chunks received.
buffers={}
# Main service loop: poll every slot, reassemble multi-packet requests,
# query the GPT API, and stream the reply back to the same slot.
while True:
    vars=get_vars()  # NOTE(review): shadows the builtin `vars`
    for i,slot in enumerate(slots,1):
        v=vars.get(slot)
        if not v:
            continue
        if len(v)<3:
            continue  # too short to contain the 3-char header
        # Header layout (mirrors send): v[1] = packet count, v[2] = read
        # flag — "0" means fresh data from the client, "1" already consumed.
        unread=v[2]=="0"
        if not unread:
            continue
        add_used(i)  # claim the slot in the n0 register
        total=int(v[1])
        data=v[3:]
        # Flip the read flag to "1" so this packet is not processed twice.
        tw.set_variable(slot,v[:2]+"1"+v[3:])
        if slot not in buffers:
            buffers[slot]=[]
        buffers[slot].append(data)
        if len(buffers[slot])<total:
            continue  # still waiting for the remaining packets
        joined="".join(buffers[slot])
        decoded=decode(joined)
        # Rebuild the chat history from newline-separated parts, taking
        # every second part as a user turn.
        # NOTE(review): odd-indexed parts are dropped — presumably they are
        # assistant turns echoed back by the client; confirm the client's
        # message layout.
        history=[]
        parts=decoded.split("\n")
        for j in range(0,len(parts),2):
            history.append({
                "role":"user",
                "content":parts[j]
            })
        reply=ask_gpt(history)
        send(slot,reply)
        buffers[slot]=[]  # reset for the next request on this slot
        remove_used(i)  # release the slot
    time.sleep(0.2)