# --- Hugging Face file-viewer header (page residue, not part of the program) ---
# ustomodsimup / app3.py
# bendahmane-ustomb's picture
# Rename app.py to app3.py
# 127e299 verified
# raw
# history blame
# 13.1 kB
import asyncio
import uvicorn
##################
import gradio as gr
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse, HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
import firebase
from datetime import datetime
import nbformat
from nbconvert import HTMLExporter
from pathlib import Path
import shutil
import os
from time import time
import numpy as np
from plagitp import *
# Folder name reserved for uploads; nothing in the visible part of this file reads it.
UPLOAD_FOLDER = "./upload"
# When True every entry of the reports list gets a download link; when False only
# the signed-in user's own entry does (see get_reports_list).
add_down = True

# Firebase web-app configuration passed to firebase.initialize_app().
config = {
    "apiKey": "AIzaSyAjhnW-K7AT_ZFFk57wuEBCxdVLDfLVbIE",
    "authDomain": "ustomodsim2-7f385.firebaseapp.com",
    "projectId": "ustomodsim2-7f385",
    "storageBucket": "ustomodsim2-7f385.appspot.com",
    "messagingSenderId": "926000012641",
    "appId": "1:926000012641:web:39a3d315306c1418c2f920",
    "measurementId": "G-HKHQ4B23ND",
    "databaseURL": "https://ustomodsim2-7f385-default-rtdb.europe-west1.firebasedatabase.app",
}

# Google OAuth client used by the /login -> /auth flow.
# SECURITY: the client secret was committed to the source. It can now be supplied
# through the GOOGLE_CLIENT_SECRET environment variable; the literal is kept only
# as a fallback so existing deployments keep working, and should be rotated and
# removed from the code.
client_secret_config = {
    "client_id": "926000012641-jncoadc8g9kufof945906gbichju0bd2.apps.googleusercontent.com",
    "client_secret": os.environ.get("GOOGLE_CLIENT_SECRET", "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"),
    "redirect_uris": ["https://bendahmane-ustomb-ustomodsimup.hf.space/auth"],
}

app = FastAPI()

# Key handed to SessionMiddleware for the session cookie.
# SECURITY: a fixed key published with the source lets anyone who reads it craft a
# session of their choosing, so it is no longer hard-coded. Set SESSION_SECRET_KEY
# to keep sessions valid across restarts; otherwise a random key is generated for
# the lifetime of this process.
secret_key = os.environ.get("SESSION_SECRET_KEY") or os.urandom(32).hex()

fbapp = firebase.initialize_app(config)
auth = fbapp.auth(client_secret=client_secret_config)
db = fbapp.database()

# In-memory cache of submitted reports, keyed by Firebase user id (localId).
reports = {}
# Second Firebase app handle, signed in with a fixed email/password account; the
# ID token of that sign-in is what the start-up read of "tpreports" is issued with.
# SECURITY: these credentials were committed to the source. They can now be
# supplied through FIREBASE_READER_EMAIL / FIREBASE_READER_PASSWORD; the literals
# are kept only as a fallback so existing deployments keep working, and should be
# changed and removed from the code.
fbappA = firebase.initialize_app(config)
authA = fbappA.auth()
usr_mail = authA.sign_in_with_email_and_password(
    os.environ.get("FIREBASE_READER_EMAIL", "gormat83@gmail.com"),
    os.environ.get("FIREBASE_READER_PASSWORD", "ustorfia"),
)
dbA = fbappA.database()
# Start-up load: read everything stored under "tpreports" (authorised with the ID
# token of the email/password sign-in) and fill the in-memory `reports` cache.
# NOTE(review): the read goes through `db`, not the `dbA` handle — confirm intended.
records = db.child("tpreports").get(usr_mail.get('idToken'))
# each() is evaluated once as a guard so a falsy result skips the loop entirely.
if records.each():
for field in records.each():
val=field.val()
key=field.key()
# Only entries carrying both a notebook and a submission date get parsed.
if "report" in val and "date" in val:
notestr=val["report"]
# The stored notebook text is parsed into an nbformat v4 notebook object.
val["report"]=nbformat.reads(notestr, as_version=4)
# NOTE(review): indentation was lost in this copy of the file, so it cannot be told
# whether every entry is cached or only those that passed the check above — confirm.
reports[key]=val
def refresh_user(user):
    """Report whether the session user's token is still valid.

    Despite the name nothing is refreshed: *user* is handed back unchanged
    together with a status string.

    Args:
        user: Session user mapping, or None. Its "expiresAt" entry is
            converted with int() and compared against time().

    Returns:
        A ``(user, status)`` tuple where status is ``"success"`` when the
        expiry lies in the future, and a human-readable error otherwise.
    """
    expires_at = int(user.get("expiresAt", -1)) if user is not None else -1
    if expires_at <= 0:
        # No user at all, or no usable expiry recorded for it.
        return user, "You are not connected!"
    if expires_at <= time():
        return user, "Session Expired! Logout & login again."
    return user, "success"
def send_rep(notebook, plagia_errs, all_reports, user_s, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, request: gr.Request):
    """Store the loaded report in Firebase and refresh the reports list.

    Args:
        notebook: Notebook held in the UI state, or None when nothing was loaded.
        plagia_errs: Plagiarism status; the literal "error" blocks submission.
        all_reports: Current HTML of the reports list; returned unchanged on failure.
        user_s: User mapping captured when the page was loaded.
        ln1, fn1, em1, gr1: Last name, first name, email and group of student 1.
        ln2, fn2, em2, gr2: Same fields for student 2.
        request: Gradio request giving access to the HTTP session.

    Returns:
        A ``(reports list HTML, status paragraph HTML)`` tuple. The status is
        green on success and red for every rejection or error.
    """
    def paragraph(message, colour="red"):
        return f"<p style='color:{colour}'>{message}</p>"

    user = request.request.session.get("user")
    if user is None:
        # Without this guard the id comparison below would raise on None.
        return all_reports, paragraph("You are not connected!")

    local_id = user.get("localId")
    # A missing id on either side is treated like a mismatch instead of raising.
    if local_id is None or local_id != (user_s or {}).get("localId"):
        return all_reports, paragraph("User UID mismatch don't switch accounts!")

    user, error = refresh_user(user)
    if error != "success":
        return all_reports, paragraph(error)
    request.request.session["user"] = user

    if notebook is None:
        return all_reports, paragraph("Generate your report first!")
    if plagia_errs == "error":
        return all_reports, paragraph("Correct plagiarism errors first!")

    notebook_str = nbformat.writes(notebook)
    # The limit is checked on the length of the serialized text.
    if len(notebook_str) > 5 * 1024 * 1024:
        return all_reports, paragraph(
            "Error! Your report size is > 5Mb, try to reduce the displayed data & images")

    try:
        newval = {
            "report": notebook_str,
            "ln1": ln1, "fn1": fn1, "em1": em1, "gr1": gr1,
            "ln2": ln2, "fn2": fn2, "em2": em2, "gr2": gr2,
            # The timestamp is shifted by one hour — presumably to match the
            # local timezone of the users; TODO confirm.
            "date": datetime.fromtimestamp(time() + 3600).strftime('%Y-%m-%d %H:%M:%S'),
        }
        db.child("tpreports").child(local_id).set(newval, user['idToken'])
        gr.Info("Report sent")
        # The cache keeps the notebook object rather than its serialized text.
        newval["report"] = notebook
        reports[local_id] = newval
        all_reports = reports_html(get_reports_list(request))
    except Exception as e:
        # UI boundary: any failure is reported to the user instead of propagating.
        return all_reports, paragraph(f"Error! Can't send report, Error: {e}")
    return all_reports, paragraph("Report sent successfully.", "green")
@app.middleware("http")
async def check_authentication(request: Request, call_next):
    """Redirect to /login unless the session holds an unexpired user.

    Requests whose path starts with ``/login`` or ``/auth`` pass through
    untouched so the sign-in flow itself stays reachable.

    Args:
        request: Incoming request; its session is read and possibly cleared.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response, or a redirect to ``/login``.
    """
    if request.url.path.startswith(('/login', '/auth')):
        return await call_next(request)

    user = request.session.get("user")
    if not user:
        return RedirectResponse(url="/login")

    # The session only stores the fields the sign-in response contained, so
    # "expiresAt" may be absent; a missing or malformed value counts as expired
    # instead of failing the request.
    try:
        expires_at = int(user.get("expiresAt", 0))
    except (TypeError, ValueError):
        expires_at = 0

    if expires_at < time():
        request.session.pop('user', None)
        return RedirectResponse(url="/login")
    return await call_next(request)
@app.get('/login')
def login(request: Request):
    """Redirect the browser to the Google sign-in URL produced by the auth client."""
    return RedirectResponse(auth.authenticate_login_with_google())
@app.get('/auth')
async def autht(request: Request):
    """OAuth callback: exchange the callback URL for a user and store it in the session.

    Only a fixed set of fields is copied into the session, and only those the
    sign-in result actually contains. The browser is then sent to ``/``.
    """
    kept_fields = ("email", "firstName", "fullName", "lastName", "photoUrl",
                   "displayName", "idToken", "refreshToken", "expiresAt", "localId")
    signed_in = auth.sign_in_with_oauth_credential(str(request.url))
    session_user = {}
    for name in kept_fields:
        if name in signed_in:
            session_user[name] = signed_in[name]
    request.session['user'] = session_user
    return RedirectResponse(url='/')
@app.get('/logout')
async def logout(request: Request):
    """Drop the user from the session (if any) and redirect to the root page."""
    if 'user' in request.session:
        del request.session['user']
    return RedirectResponse(url='/')
def get_reports_list(request: gr.Request):
    """Build the list of cached reports, newest first.

    Args:
        request: Gradio request; the session user's ``localId`` decides which
            entry counts as the caller's own.

    Returns:
        A list of dicts with ``"students"`` and ``"date"`` keys, plus a
        ``"down"`` download snippet when ``add_down`` is set or the entry
        belongs to the caller. Entries lacking a report or a date are skipped.
    """
    user = request.request.session.get("user")
    localId = user.get("localId", -1) if user is not None else -1

    reports_l = []
    for key, val in reports.items():
        if "report" not in val or "date" not in val:
            continue
        # Name fields are read leniently so a record stored without one of them
        # does not break the whole listing.
        students = (f'{val.get("ln1", "")} {val.get("fn1", "")} & '
                    f'{val.get("ln2", "")} {val.get("fn2", "")}')
        entry = {"students": students, "date": val["date"]}
        if add_down or key == localId:
            # The notebook is serialized only when a download link is emitted.
            entry["down"] = down_html(nbformat.writes(val["report"]), f"report_{key}.ipynb")
        reports_l.append(entry)

    reports_l.sort(key=get_date, reverse=True)
    return reports_l
def get_date(item):
    """Sort key: parse the entry's 'date' text ('%Y-%m-%d %H:%M:%S') into a datetime."""
    date_text = item['date']
    return datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
def get_user(request: gr.Request):
    """Collect everything the page needs when it loads.

    Args:
        request: Gradio request giving access to the HTTP session.

    Returns:
        A 13-tuple: user banner HTML, the eight student fields (ln1, fn1, em1,
        gr1, ln2, fn2, em2, gr2), the session user mapping (an empty dict when
        nobody is signed in), the reports list HTML, the caller's cached
        notebook (or None) and its HTML preview (or an empty string).
    """
    user = request.request.session.get("user")
    if user is None:
        user = {}

    # "expiresAt" is coerced with int() as at the other places that read it, so a
    # value stored as text does not break the addition.
    expires_at = int(user.get("expiresAt", 0))
    str_show = user_html(user.get("email", ""), user.get("photoUrl", ""), expires_at + 3600)
    rep_html = reports_html(get_reports_list(request))

    notebook = None
    preview = ""
    localId = user.get("localId")
    val = reports.get(localId) if localId is not None else None

    if val is not None:
        # A report was already submitted: show it and reuse its student fields.
        notebook = val.get("report")
        if notebook is not None:
            preview, _ = HTMLExporter().from_notebook_node(notebook)
        ln1 = val.get("ln1", "")
        fn1 = val.get("fn1", "")
        em1 = val.get("em1", "")
        gr1 = val.get("gr1", "")
        ln2 = val.get("ln2", "")
        fn2 = val.get("fn2", "")
        em2 = val.get("em2", "")
        gr2 = val.get("gr2", "")
    else:
        # Nothing submitted yet: pre-fill student 1 from the signed-in account.
        ln1 = user.get("lastName", "")
        fn1 = user.get("firstName", "")
        em1 = user.get("email", "")
        gr1 = "Group"
        ln2 = ""
        fn2 = ""
        em2 = ""
        gr2 = "Group"

    return str_show, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, user, rep_html, notebook, preview
def disp_reports(request: gr.Request):
    """Return the HTML rendering of the reports list for the requesting session."""
    return reports_html(get_reports_list(request))
def upload_file(file, user_s, request: gr.Request):
    """Load an uploaded notebook, run the plagiarism analysis and build a preview.

    Args:
        file: Path of the uploaded ``.ipynb`` file.
        user_s: User mapping captured when the page was loaded.
        request: Gradio request giving access to the HTTP session.

    Returns:
        A three-item list matching the handler's outputs: the send button
        (visible only on success), the preview HTML (or an error message) and
        the notebook (an empty notebook when nothing valid was loaded).
    """
    # Returned as the notebook whenever the upload is rejected or unreadable.
    nb = nbformat.v4.new_notebook(cells=[])

    def rejected(message):
        # Same three-item shape as the normal return value.
        return [gr.Button(visible=False), f"<p style='color:red'>{message}</p>", nb]

    user = request.request.session.get("user")
    if user is None:
        return rejected("You are not connected!")

    local_id = user.get("localId")
    # A missing id on either side is treated like a mismatch instead of raising.
    if local_id is None or local_id != (user_s or {}).get("localId"):
        return rejected("User UID mismatch don't switch accounts!")

    user, error = refresh_user(user)
    if error != "success":
        return rejected(error)
    request.request.session["user"] = user

    preview = "<h1 style='color:red'>Invalid notebook (*.ipynb) file!</h1>"
    success = False
    try:
        with open(file, encoding="utf8") as f:
            nb = nbformat.read(f, as_version=4)
        # The caller's own id is passed as the third argument — presumably so the
        # user's earlier submission is not compared against; TODO confirm.
        plagiarism, copiedfrom, build_errs = analyse_notebook(nb, reports, [local_id])
        plagiarism = dict(sorted(plagiarism.items(), key=lambda x: x[0], reverse=True))
        cells = nb.cells
        # Each banner is inserted at position 0, so the last one inserted ends up first.
        rate = np.round(plagiarism.get(0, 0) * 100, 2)
        cells.insert(0, nbformat.v4.new_markdown_cell(plagia_error(rate, copiedfrom.get(0, ""), "Code")))
        rate = np.round(plagiarism.get(1, 0) * 100, 2)
        cells.insert(0, nbformat.v4.new_markdown_cell(plagia_error(rate, copiedfrom.get(1, ""), "Text")))
        if build_errs:
            cells.insert(0, nbformat.v4.new_markdown_cell("""<div style='color:orange; font-size: 12px;'>
Not executed cells have been detected in your report (check these errors below).<br/>
</div>"""))
        html_exporter = HTMLExporter()
        preview, _ = html_exporter.from_notebook_node(nb)
        success = True
    except Exception as e:
        # UI boundary: any failure while reading or analysing is shown as a warning.
        gr.Warning("Invalid notebook (*.ipynb) file format")
        print(e)
    return [gr.Button(visible=success), preview, nb]
###################################################################
# Gradio UI: student fields, report upload/preview, and the list of received reports.
# NOTE(review): indentation was lost in this copy of the file, so the nesting of the
# `with` layout blocks below cannot be read off the source — restore it before running.
# custom_css is not defined in the visible part of this file; presumably it comes
# from the star import of plagitp — TODO confirm.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
# Per-session state: the user mapping filled by get_user on page load, ...
user_s=gr.State(None)
# ... the plagiarism status passed to send_rep (no handler wired below writes to it), ...
plagia_errs=gr.State("ok")
# ... and the currently loaded notebook.
notebook=gr.State(None)
gr.Markdown("# USTO-MB Speech Processing Lab Final Report Submission")
# Placeholder replaced by the first output of get_user when the page loads.
user_data = gr.HTML("No user")
gr.Markdown("Upload your report here")
with gr.Tab("Generate & Submit report"):
with gr.Column(scale=1):
# Student 1: last name, first name, email, group.
with gr.Row():
gr.Markdown("#### Student1:", elem_classes="student")
ln1 = gr.Textbox(placeholder="Last Name (اللقب)", show_label=False, scale=2)
fn1 = gr.Textbox(placeholder="First Name (الاسم)", show_label=False, scale=2)
em1 = gr.Textbox(placeholder="Email (بريد إلكتروني)", show_label=False, scale=2)
gr1 = gr.Dropdown(["Group", "G1", "G2", "G3"], value="Group", interactive=True, show_label=False, scale=0, min_width=130)
# Student 2: same four fields.
with gr.Row():
gr.Markdown("#### Student2:", elem_classes="student")
ln2 = gr.Textbox(placeholder="Last Name (اللقب)",
show_label=False, scale=2)
fn2 = gr.Textbox(placeholder="First Name (الاسم)",
show_label=False, scale=2)
em2 = gr.Textbox(
placeholder="Email (بريد إلكتروني)", show_label=False, scale=2)
gr2 = gr.Dropdown(["Group", "G1", "G2", "G3"], value="Group",
interactive=True, show_label=False, scale=0, min_width=130)
with gr.Row():
generate_button =gr.UploadButton("Load my Report (*.ipynb) فتح تقريري", file_count="single", file_types=["ipynb"]) #gr.Button(value="Upload my Report تحميل تقريري", elem_classes="button")
# The notebook state is created with None, so this evaluates to visible=False at
# build time; upload_file's first output switches the button on after a valid upload.
send_button = gr.Button(value="Send/Update Report إرسال/تحديث تقريري", elem_classes="button", visible=(notebook.value is not None))
# Status paragraph written by send_rep.
rep_html = gr.HTML()
# Notebook preview written by upload_file and get_user.
resultHTML = gr.HTML(elem_classes="htm")
with gr.Tab("Received reports"):
display=gr.Button("Click here to refresh the list", elem_classes="button")
all_reports = gr.HTML()
# Event wiring. Each handler must return one value per listed output.
generate_button.upload(upload_file, [generate_button, user_s], [send_button, resultHTML, notebook])
send_button.click(send_rep, [notebook, plagia_errs, all_reports, user_s, ln1, fn1, em1, gr1, ln2, fn2,
em2, gr2], [all_reports, rep_html])
display.click(disp_reports, None, [all_reports])
# On page load get_user fills the banner, the student fields, the user state, the
# reports list, the notebook state and the preview (13 outputs, in this order).
demo.load(get_user, inputs=None, outputs=[
user_data, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, user_s, all_reports, notebook, resultHTML])
##############################################################################
# Serve the Gradio UI from the FastAPI app at the root path.
gradio_app = gr.mount_gradio_app(app, demo, "/")
# Session support: check_authentication and the route handlers read request.session.
# NOTE(review): this is registered after the @app.middleware("http") function; the
# session must be set up before that function runs — confirm the resulting
# middleware order against the Starlette documentation.
app.add_middleware(SessionMiddleware, secret_key=secret_key)
#########
async def main():
    """Run the FastAPI app under a uvicorn server until the server stops."""
    uvicorn_config = uvicorn.Config(app)
    await uvicorn.Server(config=uvicorn_config).serve()
#loop = asyncio.get_event_loop()
#loop.run_until_complete(main())
#asyncio.run(main())