|
|
from fastapi import FastAPI, Request, Form, HTTPException, Depends, Response, BackgroundTasks |
|
|
from fastapi.responses import HTMLResponse, RedirectResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.templating import Jinja2Templates |
|
|
import uuid |
|
|
import json |
|
|
from pathlib import Path |
|
|
import os |
|
|
from typing import List |
|
|
|
|
|
|
|
|
BASE_DIR = Path(__file__).parent |
|
|
DATA_DIR = BASE_DIR / "data" |
|
|
FORMS_DIR = DATA_DIR / "forms" |
|
|
SUBS_DIR = DATA_DIR / "submissions" |
|
|
for d in (FORMS_DIR, SUBS_DIR): d.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
ADMIN_KEY = os.getenv("ADMIN_KEY", "admin123") |
|
|
|
|
|
app = FastAPI() |
|
|
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") |
|
|
templates = Jinja2Templates(directory=BASE_DIR / "templates") |
|
|
|
|
|
|
|
|
|
|
|
def save_json(path: Path, data): |
|
|
with path.open('w') as f: json.dump(data, f) |
|
|
|
|
|
def load_json(path: Path): |
|
|
if not path.exists(): return None |
|
|
with path.open() as f: return json.load(f) |
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def home(request: Request): |
|
|
|
|
|
history = request.cookies.get("form_history", "") |
|
|
ids = history.split(',') if history else [] |
|
|
return templates.TemplateResponse("index.html", {"request": request, "form_ids": ids}) |
|
|
|
|
|
|
|
|
@app.get("/new", response_class=HTMLResponse) |
|
|
async def new_form(request: Request): |
|
|
return templates.TemplateResponse("new_form.html", {"request": request}) |
|
|
|
|
|
|
|
|
@app.post("/create_form") |
|
|
async def create_form(request: Request, response: Response, |
|
|
title: str = Form(...), |
|
|
fields: List[str] = Form(...)): |
|
|
form_id = str(uuid.uuid4()) |
|
|
schema = {"id": form_id, "title": title, "fields": fields} |
|
|
save_json(FORMS_DIR / f"{form_id}.json", schema) |
|
|
|
|
|
history = request.cookies.get("form_history", "") |
|
|
ids = history.split(',') if history else [] |
|
|
ids.append(form_id) |
|
|
response = RedirectResponse(url=f"/forms/{form_id}", status_code=302) |
|
|
response.set_cookie("form_history", ",".join(ids), httponly=True) |
|
|
return response |
|
|
|
|
|
|
|
|
@app.get("/forms/{form_id}", response_class=HTMLResponse) |
|
|
async def view_form(request: Request, form_id: str): |
|
|
schema = load_json(FORMS_DIR / f"{form_id}.json") |
|
|
if not schema: |
|
|
raise HTTPException(404, "Form not found") |
|
|
return templates.TemplateResponse("view_form.html", {"request": request, "schema": schema}) |
|
|
|
|
|
|
|
|
@app.post("/forms/{form_id}/submit") |
|
|
async def submit_form(request: Request, form_id: str): |
|
|
schema = load_json(FORMS_DIR / f"{form_id}.json") |
|
|
if not schema: |
|
|
raise HTTPException(404, "Form not found") |
|
|
data = await request.form() |
|
|
entry = {field: data.get(field) for field in schema['fields']} |
|
|
|
|
|
sub_dir = SUBS_DIR / form_id |
|
|
sub_dir.mkdir(exist_ok=True) |
|
|
idx = len(list(sub_dir.iterdir())) + 1 |
|
|
save_json(sub_dir / f"{idx}.json", entry) |
|
|
return RedirectResponse(url=f"/forms/{form_id}", status_code=302) |
|
|
|
|
|
|
|
|
def check_admin(key: str = Depends(lambda request: request.query_params.get('key'))): |
|
|
if key != ADMIN_KEY: |
|
|
raise HTTPException(401, "Unauthorized") |
|
|
return True |
|
|
|
|
|
@app.get("/admin", response_class=HTMLResponse) |
|
|
async def admin_dashboard(request: Request, authorized: bool = Depends(check_admin)): |
|
|
forms = [] |
|
|
for p in FORMS_DIR.glob("*.json"): |
|
|
schema = load_json(p) |
|
|
subs = len(list((SUBS_DIR / schema['id']).glob("*.json"))) if (SUBS_DIR / schema['id']).exists() else 0 |
|
|
forms.append({"id": schema['id'], "title": schema['title'], "submissions": subs}) |
|
|
return templates.TemplateResponse("admin.html", {"request": request, "forms": forms}) |
|
|
|
|
|
|
|
|
@app.post("/admin/delete/{form_id}") |
|
|
async def admin_delete(form_id: str, authorized: bool = Depends(check_admin)): |
|
|
fpath = FORMS_DIR / f"{form_id}.json" |
|
|
if fpath.exists(): fpath.unlink() |
|
|
subdir = SUBS_DIR / form_id |
|
|
if subdir.exists(): |
|
|
for f in subdir.iterdir(): f.unlink() |
|
|
subdir.rmdir() |
|
|
return RedirectResponse(url="/admin?key=" + ADMIN_KEY, status_code=302) |