Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, Request, Depends, HTTPException, status, Response | |
| from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| from base64 import b64decode | |
| import time, json | |
| from hashlib import sha256 | |
| from modules import model, oauth, security, log_module, llm, chat_functions, settings | |
| from modules.model import User, Session | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from datetime import datetime | |
| from typing import Annotated | |
| ####################### APP SETUP ######################## | |
| # app = FastAPI(docs_url=None, redoc_url=None) | |
| app = FastAPI() | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| templates = Jinja2Templates(directory="templates") | |
| fecha_unix = str(int(time.time())) | |
| log_module.logger("system").info("iniciado") | |
| ########################################################## | |
| ######################## ROUTE: / ######################## | |
| async def main_page(request: Request): | |
| tools = [{"name": k, "desc": v} for k, v in chat_functions.function_user_text.items()] | |
| asistants = model.get_all_personality_cores() | |
| response = templates.TemplateResponse( | |
| "main.html", { | |
| "request": request, | |
| "version": fecha_unix, | |
| "tools": tools, | |
| "asistants": asistants | |
| }) | |
| return response | |
| ########################################################## | |
| #################### SECURITY (OAUTH) #################### | |
| async def login(request: Request): | |
| # Shows the Start "session with google" button, and deletes token cookie | |
| ret = templates.TemplateResponse("login.html", {"request": request, "redirecturi": settings.OAUTH_REDIRECT}) | |
| ret.delete_cookie("token") | |
| return ret | |
| async def validate_oauth(request: Request, response: Response): | |
| # Get the oauth get params, | |
| # look for the google validation and info, | |
| # set the session cookie and save user in DB | |
| # Extract the Get params | |
| params = dict(request.query_params) | |
| # Client browser fingerprint bypassed with google oauth as "state" | |
| data = params["state"].split("=",1)[1] | |
| data = b64decode(data) | |
| data = json.loads(data) | |
| # Get the google user info | |
| google_userinfo = oauth.validate_redirect(params) | |
| # Create the user model with the google info | |
| user:User = model.User.find_or_create(google_userinfo, data) | |
| # Prepare redirect response and set the session cookie | |
| token = user.create_cookie() | |
| if not user.can_use("chat"): | |
| response = RedirectResponse(url='/hold') | |
| security.set_cookie(response, key="token", value=token) | |
| return response | |
| # Saves the user | |
| user.update_user() | |
| security.set_cookie(response, key="token", value=token) | |
| return {"success": True} | |
| ########################################################## | |
| ###################### APIs Configs ###################### | |
| # Get data and structures from its models | |
| User_find_from_data = Annotated[User, Depends(User.find_from_data)] | |
| Session_find_from_data = Annotated[Session, Depends(Session.find_from_data)] | |
| async def get_configs(response: Response, user: User_find_from_data): | |
| if not user.can_use("chat"): | |
| response = RedirectResponse(url='/hold') | |
| return response | |
| # Get llm tokens used | |
| year_, month_ = datetime.now().strftime("%y-%m").split("-") | |
| month = sum(user.tokens.get(year_, {}).get(month_, {"_":0}).values()) | |
| total = sum([z for x in user.tokens.values() for y in x.values() for z in y.values()]) | |
| tokens = {"month": month, "total": total} | |
| # Create cookies and answer | |
| user._session.create_cookie(response) | |
| return user.configs.model_dump() | {"tokens": tokens, "challenge": user._session.challenge} | |
| async def set_configs(response: Response, user: User_find_from_data): | |
| if not user.can_use("chat"): | |
| response = RedirectResponse(url='/hold') | |
| return response | |
| # Create config model | |
| user.configs = model.Configs(**user._data) | |
| user._session.configs = user.configs | |
| assisntat_prompt = user.update_user() | |
| # Create cookies and answer | |
| user._session.create_cookie(response) | |
| return {"success":True, "challenge": user._session.challenge, "assistantPrompt": assisntat_prompt} | |
| async def get_token(response: Response, user: User_find_from_data): | |
| # Generate api token | |
| user.create_cookie() | |
| if not user.can_use("chat"): | |
| return {"success":False, "redirect": "/hold"} | |
| # Create cookies and answer | |
| return {"success":True, "challenge": user._session.challenge} | |
| ########################################################## | |
| async def chat_async(session: Session_find_from_data): | |
| chat = model.Chat(messages = session.data["messages"], personality=session.configs.assistant) | |
| if(len(chat.messages) < 1 or chat.messages[-1].content==""): | |
| log_module.logger(session.gid).warning("Empty message") | |
| raise HTTPException( | |
| status_code=status.HTTP_418_IM_A_TEAPOT, | |
| detail= "Nope" | |
| ) | |
| return StreamingResponse(llm.streamer(chat, session), media_type="application/json") | |
| ########################## Static Pages ########################## | |
| async def privacy_policy(request: Request): | |
| return templates.TemplateResponse("PrivacyPolicy.html", {"request": request}) | |
| async def on_hold(request: Request, user: User = Depends(User.find_from_cookie)): | |
| if user.can_use("chat"): | |
| return RedirectResponse(url='/') | |
| return templates.TemplateResponse( | |
| "no_access.html", { | |
| "request": request, | |
| "description": bool(user.description) | |
| }) | |
| async def on_hold(request: Request, user: User = Depends(User.find_from_cookie)): | |
| if not user.description: | |
| form = await request.form() | |
| if message := form["message"].strip(): | |
| user.update_description(message) | |
| return templates.TemplateResponse( | |
| "no_access.html", { | |
| "request": request, | |
| "description": bool(user.description) | |
| }) | |
| ########################## Other ########################## | |
| async def read_log(request: Request, user: User = Depends(User.find_from_cookie)): | |
| if user.role == "admin": | |
| log_module.log_write(credentials.username, "Log Accesado", "") | |
| with open("logs/eventos.log", "r") as f: | |
| return HTMLResponse(f.read()) | |
| log_module.log_write(credentials.username, "Intento acceder logs", f"{request.client.host}:{request.client.port} - {str(dict(request.headers))}") | |
| raise HTTPException(status_code=404) | |