Spaces:
Sleeping
Sleeping
| import asyncio | |
| import uvicorn | |
| ################## | |
| import gradio as gr | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import RedirectResponse, HTMLResponse | |
| from starlette.middleware.sessions import SessionMiddleware | |
| import firebase | |
| from datetime import datetime | |
| import nbformat | |
| from nbconvert import HTMLExporter | |
| from pathlib import Path | |
| import shutil | |
| import os | |
| from time import time | |
| import numpy as np | |
| from plagitp import * | |
# Folder name reserved for uploads; no code in the visible part of this file reads it.
UPLOAD_FOLDER = "./upload"

# When true, every row of the received-reports list carries a download link;
# when false only the signed-in user's own report does (see get_reports_list).
add_down = True

# SECURITY NOTE(review): the API key and OAuth client secret below are
# committed to source control.  They can be supplied through the
# FIREBASE_API_KEY / GOOGLE_CLIENT_SECRET environment variables instead; the
# literals remain only as backward-compatible defaults and should be rotated
# and removed once the deployment provides the variables.

# Firebase project settings handed to firebase.initialize_app().
config = {
    "apiKey": os.environ.get("FIREBASE_API_KEY", "AIzaSyAjhnW-K7AT_ZFFk57wuEBCxdVLDfLVbIE"),
    "authDomain": "ustomodsim2-7f385.firebaseapp.com",
    "projectId": "ustomodsim2-7f385",
    "storageBucket": "ustomodsim2-7f385.appspot.com",
    "messagingSenderId": "926000012641",
    "appId": "1:926000012641:web:39a3d315306c1418c2f920",
    "measurementId": "G-HKHQ4B23ND",
    "databaseURL": "https://ustomodsim2-7f385-default-rtdb.europe-west1.firebasedatabase.app",
}

# Google OAuth client settings handed to fbapp.auth(client_secret=...).
client_secret_config = {
    "client_id": "926000012641-jncoadc8g9kufof945906gbichju0bd2.apps.googleusercontent.com",
    "client_secret": os.environ.get("GOOGLE_CLIENT_SECRET", "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"),
    "redirect_uris": ["https://bendahmane-ustomb-ustomodsimup.hf.space/auth"],
}
app = FastAPI()

# SECURITY NOTE(review): the session-signing key and the service-account
# password used below are committed to source control.  Each can be overridden
# through an environment variable; the literals are kept only as
# backward-compatible defaults and should be rotated and removed.
secret_key = os.environ.get("SESSION_SECRET_KEY", "##@!secret")

# Firebase handle used for Google sign-in (login/autht) and for database
# writes made with the signed-in user's own token (send_rep).
fbapp = firebase.initialize_app(config)
auth = fbapp.auth(client_secret=client_secret_config)
db = fbapp.database()

# In-memory cache of submitted reports, keyed by the submitter's Firebase
# localId; each value is the stored record with "report" parsed to a notebook.
reports = {}

# Second Firebase handle signed in with a fixed e-mail/password account; its
# idToken is what the startup load of "tpreports" authenticates with.
fbappA = firebase.initialize_app(config)
authA = fbappA.auth()
usr_mail = authA.sign_in_with_email_and_password(
    os.environ.get("FIREBASE_ADMIN_EMAIL", "gormat83@gmail.com"),
    os.environ.get("FIREBASE_ADMIN_PASSWORD", "ustorfia"),
)
dbA = fbappA.database()
# Warm the in-memory `reports` cache with everything stored under "tpreports".
records = db.child("tpreports").get(usr_mail.get('idToken'))
# The original guarded against a falsy each() result before iterating; the
# `or []` fallback keeps that guard with a single call.
for field in records.each() or []:
    key = field.key()
    val = field.val()
    # Only records carrying both the notebook JSON and a submission date are cached.
    if not isinstance(val, dict) or "report" not in val or "date" not in val:
        continue
    try:
        # Stored as a JSON string; the rest of the app expects a notebook node.
        val["report"] = nbformat.reads(val["report"], as_version=4)
    except Exception as exc:
        # Startup boundary: one unreadable record must not stop the whole app.
        print(f"Skipping unreadable report {key}: {exc}")
        continue
    reports[key] = val
def refresh_user(user):
    """Report whether a session user record is still usable.

    Args:
        user: The session's user mapping, or None.  Its "expiresAt" entry is
            converted with int() and compared against the current time().

    Returns:
        A ``(user, status)`` pair.  ``user`` is passed through untouched;
        ``status`` is "success" for a live session, otherwise a
        human-readable explanation.
    """
    not_connected = "You are not connected!"
    if user is None:
        return user, not_connected

    expires_at = int(user.get("expiresAt", -1))
    if expires_at <= 0:
        # A missing or non-positive expiry is treated like having no session.
        return user, not_connected

    if expires_at > time():
        return user, "success"
    return user, "Session Expired! Logout & login again."
def send_rep(notebook, plagia_errs, all_reports, user_s, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, request: gr.Request):
    """Store the loaded notebook as the signed-in user's report.

    Args:
        notebook: Notebook node previously loaded by upload_file, or None.
        plagia_errs: Plagiarism status flag; the value "error" blocks sending.
        all_reports: Current HTML of the received-reports list; returned
            unchanged unless the report is stored successfully.
        user_s: User record captured when the page was loaded; its "localId"
            must match the session's user.
        ln1, fn1, em1, gr1: Last name, first name, e-mail and group of student 1.
        ln2, fn2, em2, gr2: The same fields for student 2.
        request: Gradio request giving access to the HTTP session.

    Returns:
        ``(all_reports_html, status_html)`` where ``status_html`` is a red
        (failure) or green (success) ``<p>`` element.
    """
    def _status(text, colour="red"):
        return f"<p style='color:{colour}'>{text}</p>"

    user = request.request.session.get("user")
    # .get() on a possibly empty record: a missing session user or an
    # unpopulated page state must yield a message, not a TypeError/KeyError.
    if (user or {}).get("localId") != (user_s or {}).get("localId"):
        return all_reports, _status("User UID mismatch don't switch accounts!")

    user, state = refresh_user(user)
    if state != "success":
        return all_reports, _status(state)
    request.request.session["user"] = user

    if notebook is None:
        return all_reports, _status("Generate your report first!")
    if plagia_errs == "error":
        return all_reports, _status("Correct plagiarism errors first!")

    notebook_str = nbformat.writes(notebook)
    if len(notebook_str) > 5 * 1024 * 1024:
        return all_reports, _status(
            "Error! Your report size is > 5Mb, try to reduce the displayed data & images")

    try:
        # NOTE(review): the +3600 s shift presumably moves the server clock to
        # the local timezone -- confirm before changing.
        stamp = datetime.fromtimestamp(time() + 3600).strftime('%Y-%m-%d %H:%M:%S')
        newval = {"report": notebook_str, "ln1": ln1, "fn1": fn1, "em1": em1, "gr1": gr1,
                  "ln2": ln2, "fn2": fn2, "em2": em2, "gr2": gr2,
                  "date": f"{stamp}"}
        db.child("tpreports").child(user["localId"]).set(newval, user['idToken'])
        gr.Info("Report sent")
        # The cache keeps the parsed notebook, not its JSON string.
        newval["report"] = notebook
        reports[user["localId"]] = newval
        all_reports = reports_html(get_reports_list(request))
    except Exception as e:
        return all_reports, _status(f"Error! Can't send report, Error: {e}")
    return all_reports, _status("Report sent successfully.", "green")
async def check_authentication(request: Request, call_next):
    """Let a request through only when the session holds an unexpired user.

    Paths starting with /login or /auth are always forwarded.  Any other
    request without a session user, or with an expired one, is redirected to
    /login (an expired user is also removed from the session).

    NOTE(review): nothing in the visible source registers this function with
    ``app``; presumably an ``@app.middleware("http")`` decorator was lost --
    confirm.

    Args:
        request: Incoming request; ``request.session`` must be available.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a RedirectResponse to /login.
    """
    if request.url.path.startswith(('/login', '/auth')):
        return await call_next(request)

    user = request.session.get("user")
    if not user:
        return RedirectResponse(url="/login")

    # autht() only stores keys that were present in the sign-in result, so
    # "expiresAt" may be absent; a missing or malformed value counts as expired
    # instead of raising.
    try:
        expires_at = int(user.get("expiresAt", 0))
    except (TypeError, ValueError):
        expires_at = 0

    if expires_at < time():
        request.session.pop('user', None)
        return RedirectResponse(url="/login")
    return await call_next(request)
def login(request: Request):
    """Redirect the browser to the URL returned by auth.authenticate_login_with_google().

    NOTE(review): no route decorator is visible for this handler -- confirm
    how it is registered with ``app``.
    """
    return RedirectResponse(auth.authenticate_login_with_google())
async def autht(request: Request):
    """Complete the OAuth sign-in and remember the user in the session.

    The full request URL is passed to auth.sign_in_with_oauth_credential();
    only a fixed set of fields from its result (those actually present) is
    copied into ``request.session['user']`` before redirecting to "/".
    """
    kept_fields = ("email", "firstName", "fullName", "lastName", "photoUrl",
                   "displayName", "idToken", "refreshToken", "expiresAt", "localId")
    user = auth.sign_in_with_oauth_credential(str(request.url))

    profile = {}
    for name in kept_fields:
        if name in user:
            profile[name] = user[name]
    request.session['user'] = profile
    return RedirectResponse(url='/')
async def logout(request: Request):
    """Drop the user from the session (if any) and redirect to "/"."""
    session = request.session
    session.pop('user', None)
    return RedirectResponse(url='/')
def get_reports_list(request: gr.Request):
    """Build the newest-first list of cached reports for display.

    Each entry is a dict with "students" and "date"; a "down" entry (download
    HTML from down_html) is added when ``add_down`` is true or the report
    belongs to the session's user.

    Args:
        request: Gradio request giving access to the HTTP session.

    Returns:
        List of entry dicts sorted by date, most recent first.
    """
    session_user = request.request.session.get("user")
    if session_user is None:
        own_id = -1
    else:
        own_id = session_user.get("localId", -1)

    entries = []
    for uid, record in reports.items():
        # Records lacking a notebook or a date are not listed.
        if not ("report" in record and "date" in record):
            continue
        serialized = nbformat.writes(record["report"])
        entry = {
            "students": f'{record["ln1"]} {record["fn1"]} & {record["ln2"]} {record["fn2"]}',
            "date": record["date"],
        }
        if add_down or uid == own_id:
            entry["down"] = down_html(serialized, f"report_{uid}.ipynb")
        entries.append(entry)

    entries.sort(key=get_date, reverse=True)
    return entries
def get_date(item):
    """Parse ``item['date']`` ('YYYY-MM-DD HH:MM:SS') into a datetime; used as a sort key."""
    stamp = item['date']
    return datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
def get_user(request: gr.Request):
    """Collect everything the page shows when it loads.

    Args:
        request: Gradio request giving access to the HTTP session.

    Returns:
        A 13-tuple in the order expected by ``demo.load``: user banner HTML,
        the eight student fields (ln1, fn1, em1, gr1, ln2, fn2, em2, gr2), the
        session user dict (``{}`` when nobody is signed in), the reports-list
        HTML, the user's cached notebook (or None) and its HTML preview
        (or "").
    """
    user = request.request.session.get("user")
    if user is None:
        user = {}

    # Coerced with int() exactly like refresh_user/check_authentication do, so
    # a string-typed "expiresAt" cannot break the addition.
    # NOTE(review): the +3600 s shift presumably adjusts for the display
    # timezone -- confirm.
    expires_at = int(user.get("expiresAt", 0)) + 3600
    str_show = user_html(user.get("email", ""), user.get("photoUrl", ""), expires_at)
    rep_html = reports_html(get_reports_list(request))

    notebook = None
    preview = ""
    local_id = user.get("localId")
    val = reports.get(local_id) if local_id is not None else None

    if val is not None:
        # A report was already submitted: restore it together with its form fields.
        notebook = val.get("report")
        if notebook is not None:
            preview, _ = HTMLExporter().from_notebook_node(notebook)
        ln1 = val.get("ln1", "")
        fn1 = val.get("fn1", "")
        em1 = val.get("em1", "")
        gr1 = val.get("gr1", "")
        ln2 = val.get("ln2", "")
        fn2 = val.get("fn2", "")
        em2 = val.get("em2", "")
        gr2 = val.get("gr2", "")
    else:
        # No report yet: pre-fill student 1 from the account profile.
        ln1 = user.get("lastName", "")
        fn1 = user.get("firstName", "")
        em1 = user.get("email", "")
        gr1 = "Group"
        ln2 = ""
        fn2 = ""
        em2 = ""
        gr2 = "Group"
    return str_show, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, user, rep_html, notebook, preview
def disp_reports(request: gr.Request):
    """Return the received-reports list rendered as HTML for the current session."""
    return reports_html(get_reports_list(request))
def upload_file(file, user_s, request: gr.Request):
    """Load an uploaded notebook, run the plagiarism analysis and preview it.

    Args:
        file: Value delivered by the upload button; it is opened as a UTF-8
            text file.
        user_s: User record captured when the page was loaded; its "localId"
            must match the session's user.
        request: Gradio request giving access to the HTTP session.

    Returns:
        A 3-item list matching the event outputs: the send button (visible
        only after a successful analysis), the HTML preview or an error
        message, and the notebook node to keep in state (None when the
        request is rejected before loading).
    """
    def _rejected(text):
        # Same three-output shape as the normal return, with the send button hidden.
        return [gr.Button(visible=False), f"<p style='color:red'>{text}</p>", None]

    user = request.request.session.get("user")
    # .get() on a possibly empty record so a missing session user or an
    # unpopulated page state produces a message instead of an exception.
    if (user or {}).get("localId") != (user_s or {}).get("localId"):
        return _rejected("User UID mismatch don't switch accounts!")

    # Consistent with send_rep: a missing or expired session is reported
    # rather than silently ignored.
    user, state = refresh_user(user)
    if state != "success":
        return _rejected(state)
    request.request.session["user"] = user

    preview = "<h1 style='color:red'>Invalid notebook (*.ipynb) file!</h1>"
    success = False
    nb = nbformat.v4.new_notebook(cells=[])
    try:
        with open(file, encoding="utf8") as f:
            nb = nbformat.read(f, as_version=4)
        plagiarism, copiedfrom, build_errs = analyse_notebook(nb, reports, [user["localId"]])

        # Banners are inserted at index 0 one after another, so the last one
        # inserted ends up first in the notebook.
        cells = nb.cells
        rate = np.round(plagiarism.get(0, 0) * 100, 2)
        cells.insert(0, nbformat.v4.new_markdown_cell(plagia_error(rate, copiedfrom.get(0, ""), "Code")))
        rate = np.round(plagiarism.get(1, 0) * 100, 2)
        cells.insert(0, nbformat.v4.new_markdown_cell(plagia_error(rate, copiedfrom.get(1, ""), "Text")))
        if build_errs:
            cells.insert(0, nbformat.v4.new_markdown_cell("""<div style='color:orange; font-size: 12px;'>
Not executed cells have been detected in your report (check these errors below).<br/>
</div>"""))

        preview, _ = HTMLExporter().from_notebook_node(nb)
        success = True
    except Exception as e:
        # Any failure while reading, analysing or rendering is reported to the
        # user as an invalid notebook; the detail goes to the server log.
        gr.Warning("Invalid notebook (*.ipynb) file format")
        print(e)
    return [gr.Button(visible=success), preview, nb]
| ################################################################### | |
# Gradio UI definition and event wiring.
# NOTE(review): the source lost its indentation; the nesting of the
# Tab/Column/Row containers below (in particular where rep_html and resultHTML
# sit) was reconstructed from statement order -- confirm against the intended layout.
# `custom_css` is not defined in the visible source; presumably it comes from
# `from plagitp import *` -- confirm.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
    # Per-session state: the user record filled by get_user on page load.
    user_s=gr.State(None)
    # Plagiarism status passed to send_rep; no handler in this block writes to it.
    plagia_errs=gr.State("ok")
    # Notebook node set by upload_file, or restored by get_user on page load.
    notebook=gr.State(None)
    gr.Markdown("# USTO-MB Speech Processing Lab Final Report Submission")
    user_data = gr.HTML("No user")
    gr.Markdown("Upload your report here")
    with gr.Tab("Generate & Submit report"):
        with gr.Column(scale=1):
            with gr.Row():
                gr.Markdown("#### Student1:", elem_classes="student")
                ln1 = gr.Textbox(placeholder="Last Name (اللقب)", show_label=False, scale=2)
                fn1 = gr.Textbox(placeholder="First Name (الاسم)", show_label=False, scale=2)
                em1 = gr.Textbox(placeholder="Email (بريد إلكتروني)", show_label=False, scale=2)
                gr1 = gr.Dropdown(["Group", "G1", "G2", "G3"], value="Group", interactive=True, show_label=False, scale=0, min_width=130)
            with gr.Row():
                gr.Markdown("#### Student2:", elem_classes="student")
                ln2 = gr.Textbox(placeholder="Last Name (اللقب)",
                                 show_label=False, scale=2)
                fn2 = gr.Textbox(placeholder="First Name (الاسم)",
                                 show_label=False, scale=2)
                em2 = gr.Textbox(
                    placeholder="Email (بريد إلكتروني)", show_label=False, scale=2)
                gr2 = gr.Dropdown(["Group", "G1", "G2", "G3"], value="Group",
                                  interactive=True, show_label=False, scale=0, min_width=130)
            with gr.Row():
                generate_button =gr.UploadButton("Load my Report (*.ipynb) فتح تقريري", file_count="single", file_types=["ipynb"]) #gr.Button(value="Upload my Report تحميل تقريري", elem_classes="button")
                # `notebook.value` is None at build time, so the button starts
                # hidden; upload_file returns a gr.Button update that shows it.
                send_button = gr.Button(value="Send/Update Report إرسال/تحديث تقريري", elem_classes="button", visible=(notebook.value is not None))
            # Status message returned by send_rep.
            rep_html = gr.HTML()
            # Notebook preview returned by upload_file / get_user.
            resultHTML = gr.HTML(elem_classes="htm")
    with gr.Tab("Received reports"):
        display=gr.Button("Click here to refresh the list", elem_classes="button")
        all_reports = gr.HTML()
    # Uploading a file: analyse it, toggle the send button, show the preview, keep the notebook.
    generate_button.upload(upload_file, [generate_button, user_s], [send_button, resultHTML, notebook])
    # Sending: store the report, then refresh the list and the status message.
    send_button.click(send_rep, [notebook, plagia_errs, all_reports, user_s, ln1, fn1, em1, gr1, ln2, fn2,
                      em2, gr2], [all_reports, rep_html])
    display.click(disp_reports, None, [all_reports])
    # Page load: the output order here must match get_user's return tuple.
    demo.load(get_user, inputs=None, outputs=[
        user_data, ln1, fn1, em1, gr1, ln2, fn2, em2, gr2, user_s, all_reports, notebook, resultHTML])
| ############################################################################## | |
# Serve the Gradio UI from the root path of the FastAPI application.
gradio_app = gr.mount_gradio_app(app=app, blocks=demo, path="/")
# Session support for request.session, signed with `secret_key`.
app.add_middleware(SessionMiddleware, secret_key=secret_key)
| ######### | |
async def main():
    """Run `app` under a uvicorn server built from the default uvicorn.Config."""
    server_config = uvicorn.Config(app)
    await uvicorn.Server(config=server_config).serve()
| #loop = asyncio.get_event_loop() | |
| #loop.run_until_complete(main()) | |
| #asyncio.run(main()) |