Spaces:
Running
Running
| import scratchcommunication | |
| import time | |
| import requests | |
# Project ID handed to the cloud connection created below.
PROJECT_ID = "1290918780"

# System message that ask_gpt() places in front of every conversation.
SYSTEM_PROMPT = dict(
    role="system",
    content="You are a conversational AI that runs on Scratch. Do not use Markdown in your responses; speak in natural conversation. The creator of this assistant is Izuemon.",
)

# Cloud connection shared by every read and write in this script.
tw = scratchcommunication.TwCloudConnection(
    contact_info="contact",
    username="server",
    project_id=PROJECT_ID,
)

# Names of the cloud variables polled by the main loop: "n1" .. "n9".
slots = ["n" + str(number) for number in range(1, 10)]
| # --------------------- | |
| # クラウド変数取得 | |
| # --------------------- | |
def get_vars():
    """Return the result of ``tw.get_cloud_variables()``.

    Callers in this file treat the result as a mapping and look up
    variable names with ``.get``.
    """
    snapshot = tw.get_cloud_variables()
    return snapshot
def get_var(name):
    """Look up a single cloud variable by *name*.

    A fresh snapshot is fetched with ``get_vars()`` on every call and the
    value is read with the snapshot's ``.get`` method.
    """
    current = get_vars()
    return current.get(name)
| # --------------------- | |
| # 文字テーブル | |
| # --------------------- | |
# Character table: the entry at position i is the character that encode()
# and decode() associate with the numeric code i. One entry per file line,
# with surrounding whitespace stripped.
# NOTE(review): strip() turns a line that holds only a space into an empty
# entry -- confirm the table does not need a literal space character.
with open("turbowarp-server/n-chars.txt",encoding="utf8") as f:
    chars = [row.strip() for row in f]
def encode(text: str, table=None) -> str:
    """Encode *text* as a string of two-digit character codes.

    Every character is replaced by the zero-padded index of its first
    occurrence in the character table. Characters that are not in the
    table are dropped. Characters whose index needs more than two digits
    are dropped as well, because ``decode`` reads the stream in fixed
    two-digit steps and a three-digit code would shift everything after it.

    Args:
        text: The string to encode.
        table: Optional sequence of characters to use instead of the
            module-level ``chars`` table.

    Returns:
        The concatenated codes, or ``""`` when nothing could be encoded.
    """
    if table is None:
        table = chars

    # One pass over the table instead of a linear search per character.
    # setdefault keeps the first index of a duplicated entry, which is what
    # list.index() would return.
    codes = {}
    for index, entry in enumerate(table):
        if index > 99:
            break  # not representable in two digits
        codes.setdefault(entry, f"{index:02d}")

    return "".join(codes[c] for c in text if c in codes)
def decode(data: str, table=None) -> str:
    """Decode a string of two-digit codes back into text.

    The input is read two characters at a time. Code 99 becomes a newline;
    any other code is used as an index into the character table.

    Malformed input is skipped instead of raising, because the data comes
    straight from a cloud variable: a trailing single character, a pair
    that is not two decimal digits, and a code beyond the end of the table
    are all ignored.

    Args:
        data: The digit string to decode.
        table: Optional sequence of characters to use instead of the
            module-level ``chars`` table.

    Returns:
        The decoded text, or ``""`` when nothing could be decoded.
    """
    if table is None:
        table = chars

    digits = "0123456789"
    pieces = []
    # Stop before a dangling last character: half a pair is not a code.
    for start in range(0, len(data) - 1, 2):
        pair = data[start:start + 2]
        if pair[0] not in digits or pair[1] not in digits:
            continue
        code = int(pair)
        if code == 99:
            pieces.append("\n")
        elif code < len(table):
            pieces.append(table[code])
    return "".join(pieces)
| # --------------------- | |
| # n0管理 | |
| # --------------------- | |
def get_used():
    """Return the contents of cloud variable ``n0`` as a list of characters.

    An unset or empty ``n0`` yields an empty list. ``add_used`` and
    ``remove_used`` store one slot number per character in this variable.
    """
    raw = get_var("n0")
    return list(raw) if raw else []
def add_used(i):
    """Record slot number *i* in cloud variable ``n0``.

    ``n0`` is read back one character at a time by ``get_used``, so this
    works for single-digit slot numbers. The variable is written only when
    the number is not already present, which avoids a redundant cloud write
    for every further packet of the same request.

    Args:
        i: Slot number to record; it is stored as ``str(i)``.
    """
    used = get_used()
    digit = str(i)
    if digit in used:
        return  # already recorded, nothing to write
    used.append(digit)
    tw.set_variable("n0", "".join(used))
def remove_used(i):
    """Remove slot number *i* from cloud variable ``n0``.

    Only the first matching character is removed. The variable is written
    only when something was actually removed, which avoids a redundant
    cloud write.

    Args:
        i: Slot number to remove; it is matched as ``str(i)``.
    """
    used = get_used()
    digit = str(i)
    if digit not in used:
        return  # not recorded, nothing to write
    used.remove(digit)
    tw.set_variable("n0", "".join(used))
| # --------------------- | |
| # API | |
| # --------------------- | |
def ask_gpt(history, timeout=120):
    """Send the conversation to the chat-completions endpoint and return the reply.

    ``SYSTEM_PROMPT`` is placed in front of *history* before the request is
    made.

    Args:
        history: List of message dicts (``{"role": ..., "content": ...}``).
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        The ``content`` of the first choice in the JSON response.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
        KeyError, IndexError: If the response JSON lacks the expected
            ``choices[0].message.content`` path.
    """
    messages = [SYSTEM_PROMPT] + history
    response = requests.post(
        "https://izuemon-gpt-free-api.hf.space/v1/chat/completions",
        json={
            "model": "gpt-3.5-turbo",
            "messages": messages,
        },
        timeout=timeout,
    )
    # Surface HTTP errors directly instead of failing later on a missing key.
    response.raise_for_status()
    payload = response.json()
    return payload["choices"][0]["message"]["content"]
| # --------------------- | |
| # 送信 | |
| # --------------------- | |
def send(slot, text, chunk_size=99996, timeout=10, poll_interval=0.1):
    """Encode *text* and write it to cloud variable *slot*, packet by packet.

    Each packet is the string ``"1" + <packet count> + "0" + <payload>``.
    After writing a packet the function polls the variable until the
    character at index 2 has become ``"1"`` (presumably set by the reader
    of the packet -- confirm against the client), then writes the next one.
    If that does not happen within *timeout* seconds the function gives up
    and the remaining packets are not sent.

    Args:
        slot: Name of the cloud variable to write to.
        text: Text to send; it is passed through ``encode`` first.
        chunk_size: Maximum number of encoded characters per packet.
        timeout: Seconds to wait for each packet to be acknowledged.
        poll_interval: Seconds to sleep between two polls.
    """
    # NOTE(review): the header assumes the packet count is a single digit;
    # with ten or more packets the flag would no longer sit at index 2.
    # NOTE(review): after a timeout the unacknowledged packet stays in the
    # slot with flag "0", which the polling loop in this file would read as
    # unread input -- confirm whether the slot should be cleared here.
    encoded = encode(text)
    packets = [
        encoded[offset:offset + chunk_size]
        for offset in range(0, len(encoded), chunk_size)
    ]
    total = len(packets)

    for payload in packets:
        # Monotonic clock: the timeout must not be affected by system clock
        # adjustments.
        started = time.monotonic()
        tw.set_variable(slot, f"1{total}0{payload}")
        while True:
            current = get_var(slot)
            if current and len(current) > 2 and current[2] == "1":
                break
            if time.monotonic() - started > timeout:
                return
            time.sleep(poll_interval)
| # --------------------- | |
| # メインループ | |
| # --------------------- | |
def _history_from_text(decoded):
    """Turn decoded request text into a list of chat messages.

    The text is split on newlines and every second line, starting with the
    first, becomes a ``user`` message; the lines in between are discarded.
    """
    # NOTE(review): the skipped lines may be earlier assistant replies --
    # confirm whether they should be forwarded with role "assistant".
    return [
        {"role": "user", "content": line}
        for line in decoded.split("\n")[::2]
    ]


def _handle_packet(index, slot, value):
    """Consume one unread packet and answer once the request is complete.

    Packet layout: ``value[1]`` is the packet count of the request,
    ``value[2]`` is the read flag and ``value[3:]`` is the payload.
    """
    add_used(index)
    total = int(value[1])
    # Flip the flag to "1" so the packet is not picked up again.
    tw.set_variable(slot, value[:2] + "1" + value[3:])

    chunks = buffers.setdefault(slot, [])
    chunks.append(value[3:])
    if len(chunks) < total:
        return  # wait for the remaining packets of this request

    history = _history_from_text(decode("".join(chunks)))
    reply = ask_gpt(history)
    send(slot, reply)
    buffers[slot] = []
    remove_used(index)


# Payload chunks received so far, keyed by slot name.
buffers = {}

while True:
    cloud_vars = get_vars()
    for index, slot in enumerate(slots, 1):
        value = cloud_vars.get(slot)
        # Only packets with a complete header and flag "0" are unread.
        if not value or len(value) < 3 or value[2] != "0":
            continue
        try:
            _handle_packet(index, slot, value)
        except (ValueError, IndexError, KeyError, requests.RequestException) as exc:
            # One malformed packet or failed API call must not stop the
            # server for every other slot: log it, drop the partial
            # request and release the slot.
            print(f"slot {slot}: request dropped after error: {exc!r}", flush=True)
            buffers[slot] = []
            remove_used(index)
    time.sleep(0.2)