Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, Form, File, HTTPException, Response, Request, Depends | |
| import uuid | |
| from app.backend.models.users import User | |
| from fastapi.staticfiles import StaticFiles | |
| import os | |
| from app.rag_generator import RagSystem | |
| from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse | |
| from app.settings import base_path, url_user_not_required | |
| from typing import Optional | |
| from app.response_parser import add_links | |
| from app.document_validator import path_is_valid | |
| from app.backend.controllers.users import create_user, authenticate_user, check_cookie, clear_cookie, get_current_user | |
| from app.backend.controllers.schemas import SUser | |
| from app.backend.controllers.chats import create_new_chat | |
| from fastapi.templating import Jinja2Templates | |
| from app.backend.models.db_service import check_tables | |
| # TODO: implement a better TextHandler | |
| # TODO: optionally implement DocHandler | |
| print(3333333333333333333333333333333333333333333333333333) | |
| print(os.environ['DATABASE_URL']) | |
| print(os.environ['GEMINI_API_KEY']) | |
| api = FastAPI() | |
| rag = None | |
| api.mount("/pdfs", StaticFiles(directory=os.path.join(base_path, "temp_storage", "pdfs")), name="pdfs") | |
| api.mount("/static", StaticFiles(directory=os.path.join(base_path, "frontend", "static")), name="static") | |
| templates = Jinja2Templates(directory=os.path.join(base_path, "frontend", "templates")) | |
| print(check_tables()) | |
| def initialize_rag() -> RagSystem: | |
| global rag | |
| if rag is None: | |
| rag = RagSystem() | |
| return rag | |
| ''' | |
| Updates response context and adds context of navbar (role, instance(or none)) and footer (none) | |
| ''' | |
| def extend_context(context: dict): | |
| user = get_current_user(context.get("request")) | |
| navbar = { | |
| "navbar": True, | |
| "navbar_path": "components/navbar.html", | |
| "navbar_context": { | |
| "user": { | |
| "role": "user" if user else "guest", | |
| "instance": user | |
| } | |
| } | |
| } | |
| footer = { | |
| "footer": False, | |
| "footer_context": None | |
| } | |
| context.update(**navbar) | |
| context.update(**footer) | |
| return context | |
| def PDFHandler(request: Request, path: str, page: int) -> HTMLResponse: | |
| filename = os.path.basename(path) | |
| url_path = f"/pdfs/{filename}" | |
| current_template = "pages/show_pdf.html" | |
| return templates.TemplateResponse( | |
| current_template, | |
| extend_context({ | |
| "request": request, | |
| "page": str(page or 1), | |
| "url_path": url_path, | |
| "user": get_current_user(request) | |
| }) | |
| ) | |
| def TextHandler(request: Request, path: str, lines: str) -> HTMLResponse: | |
| file_content = "" | |
| with open(path, "r") as f: | |
| file_content = f.read() | |
| start_line, end_line = map(int, lines.split('-')) | |
| text_before_citation = [] | |
| text_after_citation = [] | |
| citation = [] | |
| anchor_added = False | |
| for index, line in enumerate(file_content.split('\n')): | |
| if line == "" or line == "\n": | |
| continue | |
| if index + 1 < start_line: | |
| text_before_citation.append(line) | |
| elif end_line < index + 1: | |
| text_after_citation.append(line) | |
| else: | |
| anchor_added = True | |
| citation.append(line) | |
| current_template = "pages/show_text.html" | |
| return templates.TemplateResponse( | |
| current_template, | |
| extend_context({ | |
| "request": request, | |
| "text_before_citation": text_before_citation, | |
| "text_after_citation": text_after_citation, | |
| "citation": citation, | |
| "anchor_added": anchor_added, | |
| "user": get_current_user(request) | |
| }) | |
| ) | |
| ''' | |
| Optional handler | |
| ''' | |
| def DocHandler(): | |
| pass | |
| # # <--------------------------------- Middleware ---------------------------------> | |
| # # NOTE: carefully read documentation to require_user | |
| # | |
| # ''' | |
| # Special class to have an opportunity to redirect user to login page in middleware | |
| # ''' | |
| # class AwaitableResponse: | |
| # def __init__(self, response: Response): | |
| # self.response = response | |
| # | |
| # def __await__(self): | |
| # yield | |
| # return self.response | |
| # | |
| # | |
| # ''' | |
| # TODO: remove KOSTYLY -> find better way to skip requesting to login while showing pdf | |
| # | |
| # Middleware that requires user to log in into the system before accessing any utl | |
| # | |
| # NOTE: For now it is applied to all routes, but if you want to skip any, add it to the | |
| # url_user_not_required list in settings.py (/ should be removed) | |
| # ''' | |
| # @api.middleware("http") | |
| # async def require_user(request: Request, call_next): | |
| # print(request.url.path, request.method) | |
| # | |
| # awaitable_response = AwaitableResponse(RedirectResponse("/login", status_code=303)) | |
| # stripped_path = request.url.path.strip('/') | |
| # | |
| # if stripped_path in url_user_not_required \ | |
| # or stripped_path.startswith("pdfs") \ | |
| # or "static/styles.css" in stripped_path \ | |
| # or "favicon.ico" in stripped_path: | |
| # return await call_next(request) | |
| # | |
| # user = get_current_user(request) | |
| # if user is None: | |
| # return await awaitable_response | |
| # | |
| # response = await call_next(request) | |
| # return response | |
| def print_rout(request: Request, call_next): | |
| print(request.url, request.method) | |
| return call_next(request) | |
| # <--------------------------------- Common routes ---------------------------------> | |
| # @api.get("/") | |
| # def root(request: Request): | |
| # current_template = "pages/main.html" | |
| # return templates.TemplateResponse(current_template, extend_context({"request": request})) | |
| def root(request: Request): | |
| current_template = "pages/chat.html" | |
| return templates.TemplateResponse(current_template, | |
| extend_context({ | |
| "request": request, | |
| "user": get_current_user(request) | |
| }) | |
| ) | |
| async def create_prompt(request: Request, files: list[UploadFile] = File(None), prompt: str = Form(...)): | |
| docs = [] | |
| rag = initialize_rag() | |
| print(444444444444444444444444444444444444444444444) | |
| try: | |
| for file in files: | |
| content = await file.read() | |
| temp_storage = os.path.join(base_path, "temp_storage") | |
| os.makedirs(temp_storage, exist_ok=True) | |
| if file.filename.endswith('.pdf'): | |
| saved_file = os.path.join(temp_storage, "pdfs", str(uuid.uuid4()) + ".pdf") | |
| else: | |
| saved_file = os.path.join(temp_storage, str(uuid.uuid4()) + "." + file.filename.split('.')[-1]) | |
| with open(saved_file, "wb") as f: | |
| f.write(content) | |
| docs.append(saved_file) | |
| if len(files) > 0: | |
| rag.upload_documents(docs) | |
| response_raw = rag.generate_response(user_prompt=prompt) | |
| response = add_links(response_raw) | |
| print(response) | |
| return {"response": response, "status": 200} | |
| except Exception as e: | |
| print("!!!ERROR!!!") | |
| print(e) | |
| # finally: | |
| # for file in files: | |
| # temp_storage = os.path.join(base_path, "temp_storage") | |
| # saved_file = os.path.join(temp_storage, file.filename) | |
| # os.remove(saved_file) | |
| def show_document(request: Request, path: str, page: Optional[int] = 1, lines: Optional[str] = "1-1", start: Optional[int] = 0): | |
| if not path_is_valid(path): | |
| return HTTPException(status_code=404, detail="Document not found") | |
| ext = path.split(".")[-1] | |
| if ext == 'pdf': | |
| return PDFHandler(request, path=path, page=page) | |
| elif ext in ('txt', 'csv', 'md'): | |
| return TextHandler(request, path=path, lines=lines) | |
| elif ext in ('docx', 'doc'): | |
| return TextHandler(request, path=path, lines=lines) # should be a bit different handler | |
| else: | |
| return FileResponse(path=path) | |
| # <--------------------------------- Get ---------------------------------> | |
| def new_user(request: Request): | |
| current_template = "pages/registration.html" | |
| return templates.TemplateResponse(current_template, extend_context({"request": request})) | |
| def login(request: Request): | |
| current_template = "pages/login.html" | |
| return templates.TemplateResponse(current_template, extend_context({"request": request})) | |
| def test_cookie(request: Request): | |
| return check_cookie(request) | |
| ''' | |
| Use only for testing. For now, provides user info for logged ones, and redirects to | |
| login in other case | |
| ''' | |
| def test(request: Request, user: User = Depends(get_current_user)): | |
| return { | |
| "user": { | |
| "email": user.email, | |
| "password_hash": user.password_hash, | |
| # "chats": user.chats, # Note: it will rise error since due to the optimization associated fields are not loaded | |
| # it is just a reference, but the session is closed, however you are trying to get access to the data through this session | |
| } | |
| } | |
| def show_chat(chat_id: int): | |
| return {"chat_id": chat_id} | |
| def logout(response: Response): | |
| return clear_cookie(response) | |
| # <--------------------------------- Post ---------------------------------> | |
| def new_user(response: Response, user: SUser): | |
| return create_user(response, user.email, user.password) | |
| def login(response: Response, user: SUser): | |
| return authenticate_user(response, user.email, user.password) | |
| def create_chat(request: Request, title: Optional[str] = "new chat", user: User = Depends(get_current_user)): | |
| url = create_new_chat(title, user) | |
| return RedirectResponse(url, status_code=303) |