Spaces:
Running
Running
| import json | |
| import uuid | |
| import os | |
| import pathlib | |
| from uuid import uuid4 | |
| from datetime import datetime | |
| import dataclasses | |
| import html | |
| import base64 | |
| import requests | |
| import secrets | |
| from fasthtml.common import * | |
| # from fastcore.all import typedispatch | |
| from plum import dispatch | |
| from fastlite import database | |
| from huggingface_hub import CommitScheduler | |
| import eval_code | |
| import storage | |
# Secrets and deployment settings, all read from the environment
# (HF Space secrets, `docker --env-file`, or the IDE run configuration).
OAUTH_CLIENT_ID = os.environ.get('OAUTH_CLIENT_ID')
OAUTH_SCOPES = os.environ.get('OAUTH_SCOPES')
OAUTH_CLIENT_SECRET = os.environ.get('OAUTH_CLIENT_SECRET')
OPENID_PROVIDER_URL = os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co")
SPACE_HOST = os.environ.get('SPACE_HOST')
# "(none)" is the sentinel meaning "not configured" for the two keys below.
HF_DATASET_AUTH_TOKEN = os.environ.get('HF_DATASET_AUTH_TOKEN', "(none)")
OPENAI_KEY = os.environ.get('OPENAI_KEY', "(none)")
try:
    DEFAULT_ANONYMOUS_USER_LEVEL = int(os.environ.get('DEFAULT_ANONYMOUS_USER_LEVEL', 0))
except (TypeError, ValueError):
    # A non-numeric value falls back to "anonymous users have no rights".
    DEFAULT_ANONYMOUS_USER_LEVEL = 0
LOCAL_STORAGE_PATH = ""
# The start-up timestamp plus a random uuid make the storage file names unique per process.
datetime_now = datetime.isoformat(datetime.utcnow())
FILE_EVENTS_NAME = f"events-{datetime_now}-{uuid4()}.jsonl"
FILE_SUBMITTED_NAME = f"submitted-{datetime_now}-{uuid4()}.jsonl"
# A host name containing "localhost" means a developer machine; anything else
# is treated as a deployed instance that keeps its files under /tmp/cache.
IS_LOCALHOST = "localhost" in SPACE_HOST
if not IS_LOCALHOST:
    DATABASE_NAME = "/tmp/cache/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("/tmp/cache/persistent")
else:
    DATABASE_NAME = "data/sessions_meta.db"
    LOCAL_STORAGE_PATH = Path("data/persistent/")
    # pathlib.Path(DATABASE_NAME).unlink(missing_ok=True)
    print(f"Database {DATABASE_NAME=} exists? {pathlib.Path(DATABASE_NAME).exists()}")

# Create the storage folder up front, then derive the two JSONL file paths from it.
LOCAL_STORAGE_PATH.mkdir(exist_ok=True, parents=True)
SUBMITTED_FILE_PATH = LOCAL_STORAGE_PATH / FILE_SUBMITTED_NAME
EVENTS_FILE_PATH = LOCAL_STORAGE_PATH / FILE_EVENTS_NAME
# Background uploader for LOCAL_STORAGE_PATH. It stays None when no dataset
# token is configured or when the scheduler cannot be constructed.
scheduler = None
if HF_DATASET_AUTH_TOKEN != "(none)":
    try:
        scheduler = CommitScheduler(
            repo_id="ml-visoft/c-reviewer",
            repo_type="dataset",
            folder_path=LOCAL_STORAGE_PATH,
            every=10,
            path_in_repo="raw_data",
            token=HF_DATASET_AUTH_TOKEN,
            squash_history=False
        )
    except Exception:
        # Best effort: the app keeps running without remote backup.
        # `Exception` (not a bare except) lets Ctrl-C / SystemExit propagate.
        import traceback
        traceback.print_exc()
# Single database handle used by the request handlers.
global_database = database(DATABASE_NAME)
# One row per evaluated piece of code: the code itself, the evaluation
# (stored as JSON text) and a flag telling whether feedback was submitted.
question_evaluation_table = global_database.create_table(
    "question_answer",
    columns={
        "id": int,
        "code_text": str,
        "answer_eval_text": str,
        "submitted": int
    },
    pk='id',
    if_not_exists=True
)
# Dataclass used to build and read rows of question_answer.
Question_Evaluation_cls = question_evaluation_table.dataclass()
# One row per state change of a browser session. The newest row (highest id)
# for a session_id is the current state; current_qeval refers to question_answer.id.
session_state_table = global_database.create_table(
    "session_state",
    columns={
        "id": int,
        "session_id": str,
        "state": int,
        "submitted_date": str,
        "evaluated_date": str,
        "current_qeval": int
    },
    pk='id',
    if_not_exists=True
    # Can't really nail the fk specs
    # foreign_keys=[("current_qeval", question_evaluation_table, "id")]
)
# Dataclass used to build and read rows of session_state.
Session_State_cls = session_state_table.dataclass()
# Values stored in session_state.state.
EVAL_STATE_NEW=0
EVAL_STATE_QUERY=1
EVAL_STATE_TIMEDOUT=2
EVAL_STATE_ANSWER=3
EVAL_STATE_ERROR=4
# Constants to name the various HTML ids in the code
HTML_SUBMIT_CODE_AREA = "submit_code_area"
HTML_RESULTS_AREA = "prompt_response"
HTML_CLEAR_FORM = "clear_the_form"
HTML_USER_DATA = "login_user_data"
# Keys used inside the per-user session mapping.
OAUTH_SECRET_SESSION_NAME = "oauth_secret"
USER_DATA_SESSION_NAME = "user_data"
def get_openid_configuration():
    """
    Read the OpenID discovery document of the provider and extract the two endpoints the app needs.

    The document is fetched from ``OPENID_PROVIDER_URL + "/.well-known/openid-configuration"``.

    :return: ``(token_endpoint, userinfo_endpoint)``. Both are ``None`` when the document cannot be
             downloaded or is not valid JSON; a single entry is ``None`` when the document lacks that key.
    """
    config_url = OPENID_PROVIDER_URL + "/.well-known/openid-configuration"
    try:
        # This function is called while the module is being imported, so an unresponsive
        # provider must not be able to block start-up forever.
        response = requests.get(config_url, timeout=10)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)
        config = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not JSON.
        print(f"An error occurred: {e}")
        return None, None
    return config.get('token_endpoint'), config.get('userinfo_endpoint')
# Resolve the OAuth endpoints once, at start-up, and report the outcome.
HF_OAUTH_TOKEN_URL, HF_OAUTH_USERINFO = get_openid_configuration()
if not (HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO):
    print("Failed to retrieve the endpoints.")
else:
    print(f"Token Endpoint: {HF_OAUTH_TOKEN_URL}")
    print(f"UserInfo Endpoint: {HF_OAUTH_USERINFO}")

# Extra <head> entries: syntax highlighting and the flexbox grid stylesheet.
hdrs = (
    HighlightJS(langs=['python', 'c', 'c++']),
    Link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/flexboxgrid/6.3.1/flexboxgrid.min.css", type="text/css"),
    # Meta(name="htmx-config", content='{"defaultSwapStyle":"outerHTML"}')
)

if not IS_LOCALHOST:
    app, rt = fast_app(debug=False, live=False, hdrs=hdrs)
    REFRESH_TIME = 1
    EVAL_TIMEOUT_SECONDS = 15
else:
    # Local development: debug + live reload, faster polling, shorter timeout.
    print("Localhost detected in SPACE_HOST. App started in debug+live mode!")
    app, rt = fast_app(debug=True, live=True, hdrs=hdrs)
    REFRESH_TIME = 0.1
    EVAL_TIMEOUT_SECONDS = 5

openai_client = eval_code.get_the_openai_client(OPENAI_KEY)
| ################# STORAGE | |
def untyped_save_to_storage(dc, filename):
    """
    Append one dataclass instance to a JSON-lines file.

    When a commit scheduler exists, the write happens while holding ``scheduler.lock``.
    When there is no scheduler (no token configured, or its construction failed) the line is
    written directly. Previously that case raised ``AttributeError`` on any non-localhost host.

    :param dc: dataclass instance; it is serialized with ``dataclasses.asdict`` + ``json.dumps``
    :param filename: path of the file to append to
    :return: None
    """
    import contextlib

    js_str = json.dumps(dataclasses.asdict(dc)) + "\n"
    # Nothing to synchronize with when no scheduler is running.
    write_guard = contextlib.nullcontext() if scheduler is None else scheduler.lock
    with write_guard:
        with open(filename, "a", encoding="utf-8") as f:
            f.write(js_str)
# The two definitions below are overloads selected by argument type. Without the
# `dispatch` decorator (imported from plum at the top of the file) the second
# definition would replace the first and navigation events would be appended
# to the submitted-code file.
@dispatch
def save_to_storage(nav_event:storage.NavigationEvent):
    """
    Append a navigation event to the events file.

    :param nav_event: the event to persist
    :return: None
    """
    untyped_save_to_storage(nav_event, EVENTS_FILE_PATH)


@dispatch
def save_to_storage(eval_event:storage.CodeSubmittedEvent):
    """
    Append a code submission / evaluation event to the submitted-code file.

    :param eval_event: the event to persist
    :return: None
    """
    untyped_save_to_storage(eval_event, SUBMITTED_FILE_PATH)
| ########## EVALUATION FEEDBACK | |
def validate_and_get_question_evaluation_objectid(session, qe_id:int):
    """
    Check that `qe_id` is the evaluation currently attached to this session and load it.

    The newest ``session_state`` row of the session is looked up, then the ``question_answer`` row
    it points to through ``current_qeval``.

    :param session: the session mapping; must contain ``session_id``
    :param qe_id: id of the question_answer row the caller wants to touch
    :return: ``(True, question_evaluation_row, session_state_row)`` on success,
             ``(False, None, None)`` on any mismatch or missing data
    """
    failure = (False, None, None)
    if 'session_id' not in session:
        print("validate_and_get_question_evaluation_objectid bad session data")
        return failure
    # Bound parameters instead of string-built SQL.
    state_rows = session_state_table(limit=1, where="session_id == ?", where_args=[session["session_id"]],
                                     order_by="id DESC")
    if not state_rows:
        print("validate_and_get_question_evaluation_objectid there is no state")
        return failure
    state_row = state_rows[0]
    answer_id = state_row.current_qeval
    # current_qeval is unset while a request is pending or after the form was cleared.
    # Interpolating None into the WHERE clause used to produce invalid SQL.
    if answer_id is None:
        print("validate_and_get_question_evaluation_objectid There is no answer recorded")
        return failure
    qa_obj_row = question_evaluation_table(limit=1, where="id == ?", where_args=[answer_id])
    if not qa_obj_row:
        print("validate_and_get_question_evaluation_objectid There is no answer recorded")
        return failure
    qe_obj = qa_obj_row[0]
    if qe_id != qe_obj.id:
        print(f"validate_and_get_question_evaluation_objectid QE {qe_id} does not belong to {qe_obj.id}")
        return failure
    return True, qe_obj, state_row
def html_create_feedback_updown_button(qe_id, ans_id, selected=0, disabled=False):
    """
    Render the thumbs up / thumbs down pair for one evaluation line.

    :param qe_id: id of the question_answer row, used in the toggle URL
    :param ans_id: index of the evaluation line, used in the toggle URL and in the HTML id
    :param selected: 1 highlights "up", -1 highlights "down", anything else highlights none
    :param disabled: when true both buttons are rendered disabled
    :return: a Div holding the two buttons; it is also the swap target of both buttons
    """
    idle_color, active_color = "grey", "blue"
    container_id = f"buttons_{ans_id}"
    base_url = f"/toggle_up_down/{qe_id}/{ans_id}"

    def _vote_button(label, vote):
        # The button matching the stored selection gets the active color.
        color = active_color if selected == vote else idle_color
        return Button(label, hx_get=f"{base_url}/{vote}", disabled=disabled, target_id=container_id,
                      hx_swap="outerHTML", style=f"background-color:{color}")

    return Div(_vote_button("👍", 1), _vote_button("👎", -1), id=container_id, _class="box col-xs-1",
               style="flex: 0 0 auto; display: flex; gap: 5px; margin-right: 10px;")
def html_augment_signle_evaluation_text_with_feedback(eval_html, qe_obj, ans_id, show_feedback_buttons=False):
    """
    Put one evaluation text and (optionally) its feedback buttons on the same row.

    :param eval_html: already rendered evaluation text
    :param qe_obj: question_answer row; its ``answer_eval_text`` holds the stored selection per line
    :param ans_id: index of the evaluation line inside the stored evaluation
    :param show_feedback_buttons: when false an empty Div replaces the buttons
    :return: the row Div
    """
    stored_evaluation = json.loads(qe_obj.answer_eval_text)
    if not show_feedback_buttons:
        feedback_part = Div("")
    else:
        already_submitted = qe_obj.submitted == 1
        feedback_part = html_create_feedback_updown_button(qe_obj.id, ans_id, stored_evaluation[ans_id]["EVAL"],
                                                           disabled=already_submitted)
    row_style = (" background-color: #f0f0f0; display: flex; "
                 "width: 98%; margin: 16px; padding: 3px; align-items: center;")
    return Div(eval_html, feedback_part, style=row_style, cls="row")
# NOTE(review): the route decorator is restored from the URL built in
# html_create_feedback_updown_button ("/toggle_up_down/{qe_id}/{ans_id}/<1|-1>").
# Without it this handler is never registered and is shadowed by the later `get` definitions.
@rt("/toggle_up_down/{qe_id}/{ans_id}/{which}")
def get(session, qe_id:int, ans_id:int, which:int):
    """
    Answer to the +/- button presses: toggle the stored selection of one evaluation line.

    Pressing the already selected button clears the selection; pressing the other one switches to it.

    :param session: the session mapping
    :param qe_id: id of the question_answer row
    :param ans_id: index of the evaluation line inside the stored evaluation
    :param which: 1 for thumbs up, -1 for thumbs down
    :return: the re-rendered button pair, ``"Error"`` / an explanatory paragraph on failure,
             ``None`` when `which` is not 1 or -1
    """
    if which not in {-1, 1}:
        print(f"The {which=} is bad")
        return None
    is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("toggle_up_down made session/object error")
        return "Error"
    # Same rule as the free-form feedback endpoint: only users of level 2+ may leave feedback.
    if get_user_level(session) < 2:
        return P("Unauthorized. Log in to submit feedback.")
    save_to_storage(
        storage.NavigationEvent(event_type="/toggle_up_down", event_session_id=session_obj.session_id,
                                event_params={"question_evaluation_id":qe_id, "answer_id":ans_id, "which":which}),
    )
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # ans_id comes straight from the URL; refuse anything that does not address an existing line
    # (negative values would otherwise silently index from the end).
    try:
        if ans_id < 0:
            raise IndexError(ans_id)
        crt_selection = answer_eval_js[ans_id]["EVAL"]
    except (IndexError, KeyError, TypeError):
        print(f"toggle_up_down bad {ans_id=}")
        return "Error"
    out_selection = 0 if crt_selection == which else which
    print(f"out selection: {out_selection}")
    # store it back in DB, as JSON text like the submit_feedback handler does
    answer_eval_js[ans_id]["EVAL"] = out_selection
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    qe_obj.submitted = False  # mark object as dirty
    question_evaluation_table.upsert(qe_obj)
    return html_create_feedback_updown_button(qe_id, ans_id, selected=out_selection)
def html_get_textual_feedback_form(qe_obj,):
    """
    Render the optional free-form feedback form for one evaluation.

    :param qe_obj: question_answer row; when ``submitted == 1`` the button is disabled and the
                   placeholder thanks the user
    :return: a Div with an explanatory paragraph and the form
    """
    was_submitted = qe_obj.submitted == 1
    placeholder = "Thank you!" if was_submitted else "Write your general feedback here"
    feedback_input = Input(name="freeform_feedback", placeholder=placeholder)
    submit_button = Button("Submit", disabled=was_submitted)
    feedback_form = Form(feedback_input, submit_button, hx_post=f"/submit_feedback/{qe_obj.id}",
                         target_id=HTML_RESULTS_AREA, hx_swap="outerHTML",)
    return Div(P("Give us a general feedback for the evaluation (optional)"), feedback_form)
# NOTE(review): the route decorator is restored from the hx_post URL used by
# html_get_textual_feedback_form ("/submit_feedback/{qe_obj.id}"). Without it the
# handler is never registered and is shadowed by the later `post` definition.
@rt("/submit_feedback/{qe_id}")
def post(session, qe_id:int, freeform_feedback:str):
    """
    Store the free-form feedback of a logged-in user and persist the whole evaluation.

    :param session: the session mapping
    :param qe_id: id of the question_answer row the feedback is about
    :param freeform_feedback: text typed by the user; only the first 10000 characters are kept
    :return: the re-rendered results area, ``"Error"`` when the session/evaluation pair is invalid,
             or an "Unauthorized" paragraph for users below level 2
    """
    is_ok, qe_obj, session_obj = validate_and_get_question_evaluation_objectid(session, qe_id)
    if not is_ok:
        print("submit_feedback made session/object error")
        return "Error"
    if get_user_level(session) < 2:
        return P("Unauthorized. Log in to submit feedback.")
    session_id = session.get("session_id", "Not set")
    save_to_storage(
        storage.NavigationEvent(event_type="/submit_feedback", event_session_id=session_id,
                                event_params={"question_evaluation_id":qe_id})
    )
    # Update the object
    freeform_feedback = freeform_feedback[:10000]
    answer_eval_js = json.loads(qe_obj.answer_eval_text)
    # NOTE(review): the feedback overwrites the "explanation" of entry 0 — presumably that
    # entry is reserved for the general feedback; confirm against eval_code.
    answer_eval_js[0]["explanation"] = freeform_feedback
    qe_obj.submitted = True
    qe_obj.answer_eval_text = json.dumps(answer_eval_js)
    question_evaluation_table.upsert(qe_obj)
    user_data = session.get(USER_DATA_SESSION_NAME, {})
    save_to_storage(
        storage.CodeSubmittedEvent(event_session_id=session["session_id"], db_question_evaluation_id=qe_obj.id,
                                   submitted_date=session_obj.submitted_date,
                                   received_date=session_obj.evaluated_date,
                                   code_to_eval=qe_obj.code_text, evaluation_response=qe_obj.answer_eval_text,
                                   has_feedback=True, feedback_date=datetime.isoformat(datetime.utcnow()),
                                   feedback_userdata=user_data)
    )
    return tl_html_results_and_feedback_area(session)
| ####### EVALUATE CODE | |
def html_format_code_review_form(qe_obj, show_feedback_buttons):
    """
    Render the reviewed code followed by one bordered box per criterion.

    Each box shows the criterion title and description from ``eval_code.CODE_AUGMENTATIONS`` and
    every stored evaluation line whose ``"criteria"`` equals that criterion.

    :param qe_obj: question_answer row (``code_text`` and JSON ``answer_eval_text``)
    :param show_feedback_buttons: when true, per-line thumbs and the free-form form are shown;
                                  otherwise a "log in" hint replaces the form
    :return: tuple of components: code listing, criterion boxes, feedback form or hint
    """
    evaluation_lines = json.loads(qe_obj.answer_eval_text)
    criteria_boxes = []
    for criterion_name, criterion_text in eval_code.CODE_AUGMENTATIONS:
        # Keep the position of each line in the stored list: it is the id used by the feedback buttons.
        line_rows = [
            html_augment_signle_evaluation_text_with_feedback(
                Div(P(line["explanation"]), style="flex: 1; margin-right: 10px;", cls="box col-xs-11"),
                qe_obj, position, show_feedback_buttons)
            for position, line in enumerate(evaluation_lines)
            if criterion_name == line["criteria"]
        ]
        criteria_boxes.append(
            Div(H3(criterion_name), P(criterion_text), *line_rows,
                style="border: 1px solid black; margin: 10px; padding: 10px"))
    if not show_feedback_buttons:
        feedback_part = P("Log in to leave some feedback about this evaluation")
    else:
        feedback_part = html_get_textual_feedback_form(qe_obj)
    return html_render_code_output(qe_obj.code_text), *criteria_boxes, feedback_part
def check_if_query_should_timeout(session_obj):
    """
    Mark a pending evaluation request as timed out when it is older than EVAL_TIMEOUT_SECONDS.

    The age is measured from ``session_obj.submitted_date`` (ISO string) to the current UTC time.
    When the limit is exceeded the row's state becomes EVAL_STATE_TIMEDOUT and is written back.

    :param session_obj: session_state row of the pending request
    :return: None
    """
    submitted_at = datetime.fromisoformat(session_obj.submitted_date)
    elapsed_seconds = (datetime.utcnow() - submitted_at).total_seconds()
    if not elapsed_seconds > EVAL_TIMEOUT_SECONDS:
        return
    session_obj.state = EVAL_STATE_TIMEDOUT
    session_state_table.upsert(session_obj)
def html_default_results():
    """Return the placeholder paragraph shown before any code was submitted."""
    invitation = P("Submit a piece of C code to get a Clean-Code evaluation.")
    return invitation
def html_waiting_for_results():
    """Return a "Working" notice that re-requests /render_answer every REFRESH_TIME seconds."""
    polling_attrs = dict(hx_get="/render_answer", hx_trigger=f"every {REFRESH_TIME}s",
                         hx_swap="outerHTML", target_id=HTML_RESULTS_AREA)
    return Div(P("Working . . ."), **polling_attrs)
def html_eval_request_timed_out():
    """Return the paragraph shown when the evaluation request timed out."""
    notice = P("Timed out. Retry in few minutes pls!")
    return notice
# NOTE(review): the route decorator is restored from the hx_get URL used by
# html_waiting_for_results ("/render_answer"). Without it the handler is never
# registered and is shadowed by the later `get` definitions.
@rt("/render_answer")
def get(session):
    """
    Endpoint to render the evaluation answer. Polled by the frontend while a request is pending.

    :param session: the session mapping
    :return: the results area, or an explanatory string when the session has no ``session_id``
    """
    if 'session_id' not in session:
        return "render_answer No session ID"
    return tl_html_results_and_feedback_area(session)
def get_latest_eval_request_status(session_id):
    """
    Return the current evaluation state of a session.

    :param session_id: value of the ``session_id`` column to look up
    :return: ``(state, row)`` where `row` is the newest session_state row of the session.
             ``(EVAL_STATE_NEW, None)`` when the session has no rows; an unrecognized stored state
             is reported as EVAL_STATE_ERROR.
    """
    # Bound parameter instead of string-built SQL.
    state_rows = session_state_table(limit=1, where="session_id == ?", where_args=[session_id],
                                     order_by="id DESC")
    if not state_rows:
        return EVAL_STATE_NEW, None
    state_obj = state_rows[0]
    if state_obj.state in {EVAL_STATE_NEW, EVAL_STATE_QUERY, EVAL_STATE_TIMEDOUT, EVAL_STATE_ANSWER}:
        return state_obj.state, state_obj
    return EVAL_STATE_ERROR, state_obj
def html_error_results(message):
    """
    Render an error notice.

    :param message: text shown in a nested paragraph after the fixed prefix
    :return: the paragraph component
    """
    return P("There was an error:", P(message))
def html_render_code_output(code):
    """
    Wrap source code in a ``Pre``/``Code`` pair tagged with the ``language-c`` class.

    :param code: the code text to display
    :return: the Pre component
    """
    code_tag = Code(code, _class="language-c")
    return Pre(code_tag)
def _find_question_evaluation(qe_id):
    """Return the question_answer row with id `qe_id`, or None (after logging) when there is none."""
    # A missing id is treated like a missing row instead of being interpolated into SQL.
    rows = [] if qe_id is None else question_evaluation_table(limit=1, where="id == ?", where_args=[qe_id])
    if len(rows) < 1:
        print(f"Object id {qe_id} can't be found in question_evaluation_table")
        return None
    return rows[0]


def html_render_answer_from_db(session, show_submit_form=True):
    """
    Build the content of the results area from the latest state stored for the session.

    :param session: the session mapping; must contain ``session_id``
    :param show_submit_form: when true and an answer is available, the code input box is appended
    :return: a tuple of components (possibly ``(None,)`` when the referenced evaluation is missing)
    """
    session_id = session["session_id"]
    eval_request_status, state_obj = get_latest_eval_request_status(session_id)
    # Feedback widgets are only offered to users of level 2 and above.
    show_feedback_buttons = get_user_level(session) >= 2

    if eval_request_status == EVAL_STATE_NEW:
        return (html_default_results(),)

    if eval_request_status == EVAL_STATE_ANSWER:
        qe_obj = _find_question_evaluation(state_obj.current_qeval)
        if qe_obj is None:
            return (None,)
        input_box = None
        if show_submit_form:
            input_box = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA,
                                                region_html_id=HTML_SUBMIT_CODE_AREA)
        return (html_format_code_review_form(qe_obj, show_feedback_buttons), input_box)

    if eval_request_status == EVAL_STATE_TIMEDOUT:
        return (html_eval_request_timed_out(),)

    if eval_request_status == EVAL_STATE_QUERY:
        # The state flips to TIMEDOUT here; the next poll renders the timeout message.
        check_if_query_should_timeout(state_obj)
        return (html_waiting_for_results(),)

    if eval_request_status == EVAL_STATE_ERROR:
        qe_obj = _find_question_evaluation(state_obj.current_qeval)
        if qe_obj is None:
            return (None,)
        return (html_error_results(qe_obj.answer_eval_text),)

    print(f"Unknown state of the code evalation request {state_obj.state}:")
    return (html_error_results("Some error occured."),)
| # How can I timeout? Well ... TBD. | |
# NOTE(review): `@threaded` is restored because both this docstring and the caller's comment
# ("will be executed in another thread with magic @threaded") rely on it. Without it the
# submitting request blocks until the LLM answers. `threaded` is presumably provided by the
# `from fasthtml.common import *` star import (fastcore) — confirm.
@threaded
def call_gpt_and_store_result(session_obj_id, code_to_check):
    """
    Threaded function that will submit code to the LLM and wait for the answer.

    Communication with the "main" thread is through the db: the session_state row is updated to
    EVAL_STATE_ANSWER or EVAL_STATE_ERROR once the evaluation is stored.
    All parameters must be picklable.

    :param session_obj_id: primary key of the session_state row created for this request
    :param code_to_check: the C code to evaluate
    :return: None; failures are printed and otherwise swallowed (the row then stays in its
             pending state)
    """
    # TODO refactor considering new join!
    try:
        # A database handle of its own for the session_state table, because this runs in another thread.
        local_database = database(DATABASE_NAME)
        local_sess_state = local_database.t.session_state
        local_sess_state.dataclass()  # make the table return dataclass rows
        local_sess_obj_lst = local_sess_state(limit=1, where="id == ?", where_args=[session_obj_id])
        local_sess_obj = local_sess_obj_lst[0]
        # Trigger the lengthy operation
        enhanced_answer = eval_code.eval_the_piece_of_c_code(openai_client=openai_client, ccode=code_to_check)
        # we create a new QA entry.
        # NOTE(review): this insert goes through the module-level table, not `local_database`.
        qe_obj = Question_Evaluation_cls(code_text=code_to_check, answer_eval_text=enhanced_answer, submitted=0)
        qe_obj = question_evaluation_table.insert(qe_obj)
        local_sess_obj.current_qeval = qe_obj.id
        evaluation_date = datetime.isoformat(datetime.utcnow())
        # save to persistent storage
        save_to_storage(
            storage.CodeSubmittedEvent(event_session_id=local_sess_obj.session_id, db_question_evaluation_id=qe_obj.id,
                                       submitted_date=local_sess_obj.submitted_date,
                                       received_date=evaluation_date,
                                       code_to_eval=code_to_check, evaluation_response=json.dumps(enhanced_answer))
        )
        # Update the session object.
        local_sess_obj.evaluated_date = evaluation_date
        if "error" in enhanced_answer:
            local_sess_obj.state = EVAL_STATE_ERROR
            # NOTE(review): session_state declares no "answer" column; kept as in the original.
            local_sess_obj.answer = enhanced_answer["error"]
        else:
            local_sess_obj.state = EVAL_STATE_ANSWER
        local_sess_state.update(local_sess_obj)
    except Exception:
        # Best effort inside a worker thread: log and give up.
        import traceback
        traceback.print_exc()
def tl_html_results_and_feedback_area(session, show_submit_form=True):
    """
    Top level component: the evaluation results together with the feedback widgets.

    :param session: the session mapping
    :param show_submit_form: forwarded to html_render_answer_from_db
    :return: a Div with id HTML_RESULTS_AREA wrapping the rendered components
    """
    area_children = html_render_answer_from_db(session, show_submit_form)
    results_area = Div(*area_children, id=HTML_RESULTS_AREA)
    return results_area
| ######## CODE INPUT FORM | |
def tl_html_render_inputbox(session, target_html_id, region_html_id):
    """
    Render the code submission form, or a "log in" hint for users below level 1.

    :param session: the session mapping, used to compute the user level
    :param target_html_id: id of the element replaced by the response of the form post
    :param region_html_id: id given to the returned Div (sent as an out-of-band swap)
    :return: the Div wrapping the form or the hint
    """
    if get_user_level(session) < 1:
        content = P("Log in to submit code to review.", style="background-color: #fff0f0;")
    else:
        code_box = Textarea(
            id="ccodetoeval", name="ccodetoeval",
            placeholder="Paste a standalone C program here. Larger than a 'Hello world', smaller than 1000 LoC.",
            rows=3)
        content = Form(Group(code_box, Button("Evaluate")),
                       hx_post="/submit_to_eval",
                       hx_swap="outerHTML",
                       target_id=f"{target_html_id}")
    return Div(content, id=region_html_id, hx_swap_oob='true')
# NOTE(review): the route decorator is restored from the hx_post URL used by
# tl_html_render_inputbox ("/submit_to_eval"). Without it the handler is never registered.
@rt("/submit_to_eval")
def post(session, ccodetoeval:str):
    """
    Accept a piece of C code and start its evaluation.

    Code whose length is not strictly between 100 and 40000 characters is logged but not evaluated.

    :param session: the session mapping
    :param ccodetoeval: the submitted code
    :return: results area plus the clear-form area, or an explanatory paragraph when the user is
             not allowed / the session has no id
    """
    if get_user_level(session) < 1:
        return P("Unauthorized. Log in to submit code to review.", style="background-color: #fff0f0;")
    if 'session_id' not in session:
        return P("submit_to_eval. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/submit_to_eval", event_session_id=session_id, event_params={"ccodetoeval":ccodetoeval})
    )
    if 100 < len(ccodetoeval) < 40000:
        session_obj = Session_State_cls(
            session_id=session_id,
            state=EVAL_STATE_QUERY,
            submitted_date=datetime.isoformat(datetime.utcnow()),
        )
        # we insert and we get the new primary key
        session_obj = session_state_table.insert(session_obj)
        # will be executed in another thread with magic @threaded
        call_gpt_and_store_result(session_obj.id, ccodetoeval)
    return tl_html_results_and_feedback_area(session), render_clear_area(session_id, HTML_CLEAR_FORM)
def tl_html_get_samples_section():
    """
    Render one button per bundled example; each button loads that example into the results area.

    :return: a bordered Div with a heading and the row of buttons
    """
    head = H4("Don't have any ideas? Try one of the samples below:")
    button_row = [
        Button(eval_code.get_enhanced_sample_example(k)["name"], hx_get=f"/load_example/{k}",
               target_id=HTML_RESULTS_AREA)
        for k in range(len(eval_code.CODE_EVAL_EXAMPLES))
    ]
    div_buttons = Div(*button_row, _class="row")
    # "margin 1px;" lacked the colon, which made the declaration invalid CSS.
    return Div(head, div_buttons, style="border: 1px solid black;padding: 10px; margin: 1px;")
# NOTE(review): the route decorator is restored from the hx_get URL used by
# tl_html_get_samples_section ("/load_example/{k}"). Without it the handler is never
# registered and is shadowed by the later `get` definitions.
@rt("/load_example/{example_id}")
def get(session, example_id:int):
    """
    Load a bundled example (code + precomputed evaluation) as the current answer of the session.

    :param session: the session mapping
    :param example_id: index of the example, passed to ``eval_code.get_enhanced_sample_example``
    :return: results area plus the clear-form area, or an explanatory paragraph when the session
             has no id
    """
    if 'session_id' not in session:
        return P("load_example. Bad call. No session ID")
    session_id = session["session_id"]
    # The event used to be recorded under "/submit_to_eval", which made example loads
    # indistinguishable from real submissions in the events log.
    save_to_storage(
        storage.NavigationEvent(event_type="/load_example", event_session_id=session_id, event_params={"example_id":example_id})
    )
    example = eval_code.get_enhanced_sample_example(example_id)
    qe_obj = Question_Evaluation_cls(code_text=example["code"], answer_eval_text=example["eval"], submitted=0)
    qe_obj = question_evaluation_table.insert(qe_obj)
    now_iso = datetime.isoformat(datetime.utcnow())
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_ANSWER,
        submitted_date=now_iso,
        evaluated_date=now_iso,
        current_qeval=qe_obj.id,
    )
    session_state_table.insert(session_obj)
    return tl_html_results_and_feedback_area(session), render_clear_area(session_id, HTML_CLEAR_FORM)
| ########## CLEAR FORM | |
def html_render_clear_area_button(html_id):
    """
    Render the "Clear form" button inside an out-of-band Div.

    :param html_id: id given to the wrapping Div
    :return: the Div; pressing the button requests /clear_area and swaps the results area
    """
    clear_button = Button("Clear form", hx_get="/clear_area", hx_swap="outerHTML", target_id=HTML_RESULTS_AREA)
    return Div(clear_button, id=html_id, hx_swap_oob='true')
def render_clear_area(session_id, html_id):
    """
    Render the clear-form area: the button when the session has something to clear, else an empty Div.

    :param session_id: session whose latest state decides what is rendered
    :param html_id: id given to the returned Div
    :return: an out-of-band Div
    """
    current_state, _ = get_latest_eval_request_status(session_id)
    if current_state == EVAL_STATE_NEW:
        # Nothing submitted yet (or just cleared): keep the slot but leave it empty.
        return Div(P(""), id=html_id, hx_swap_oob='true')
    return html_render_clear_area_button(html_id)
# NOTE(review): the route decorator is restored from the hx_get URL used by
# html_render_clear_area_button ("/clear_area"). Without it the handler is never
# registered and is shadowed by the later `get` definition.
@rt("/clear_area")
def get(session):
    """
    Reset the session to the "nothing submitted" state and re-render the three page regions.

    :param session: the session mapping
    :return: ``(results_area, input_area, clear_area)``, or an explanatory paragraph when the
             session has no id
    """
    if 'session_id' not in session:
        return P("clear_area. Bad call. No session ID")
    session_id = session["session_id"]
    save_to_storage(
        storage.NavigationEvent(event_type="/clear_area", event_session_id=session_id)
    )
    # insert a row to "cancel"/reset the current request
    session_obj = Session_State_cls(
        session_id=session_id,
        state=EVAL_STATE_NEW,
        submitted_date=datetime.isoformat(datetime.utcnow()),
    )
    session_state_table.insert(session_obj)
    # re-issue the page, basically.
    input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    results_area = tl_html_results_and_feedback_area(session)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    return results_area, input_area, clear_area
| ########## AUTHENTICATION | |
def html_render_login_to_get_access_part(session):
    """
    Render the "Log in to get access" part and set the session oauth secret.

    The sign-in link opens a popup window; a script polls ``localStorage`` for the
    ``login_success`` flag and reloads the main page once it is set.

    :param session: the session mapping; a fresh state secret is stored under OAUTH_SECRET_SESSION_NAME
    :return: a Div with id HTML_USER_DATA
    """
    from urllib.parse import quote, urlencode

    content = []
    content.append(H4("Log in to give feedback!"))
    content.append(P("We will record your name, hugginface name, hf profile page, email address. "
                     "We will try HARD not to make them public but . . ."))
    content.append(P("Your feedback will be made public but in an anonymized form."))
    popup_script = Script("""
    function openPopup(url) {
        var width = 500;
        var height = 730;
        var left = (screen.width - width) / 2;
        var top = (screen.height - height) / 2;
        localStorage.removeItem('login_success');
        loginPopup = window.open(url, 'LoginWindow', 'width=' + width + ',height=' + height + ',left=' + left + ',top=' + top);
        loginCheckInterval = setInterval(checkPopupClosed, 500);
        return false;
    }
    function checkPopupClosed() {
        console.log("Checking popup");
        if (localStorage.getItem('login_success') === 'true') {
            console.log("Checking popup - local storage");
            clearInterval(loginCheckInterval);
            localStorage.removeItem('login_success');
            // Reload the page or update UI as needed
            location.reload();
        }
    }
    """
    )
    content.append(popup_script)
    # build the redirect URL
    scheme = "http" if "localhost" in SPACE_HOST else "https"
    auth_callback_url = f"{scheme}://{SPACE_HOST}/auth_callback/"
    secret = datetime.isoformat(datetime.utcnow()) + "-" + secrets.token_urlsafe(32)
    session[OAUTH_SECRET_SESSION_NAME] = secret
    # Query values must be URL-encoded (the callback URL contains ':' and '/', scopes are
    # space separated). html.escape only protects HTML markup and left those untouched.
    query = urlencode(
        {
            "redirect_uri": auth_callback_url,
            "scope": OAUTH_SCOPES or "",
            "client_id": OAUTH_CLIENT_ID,
            "state": secret,
            "response_type": "code",
            "prompt": "consent",
        },
        quote_via=quote,
    )
    redirect_link = f"https://huggingface.co/oauth/authorize?{query}"
    login_button = A(Img(src="https://huggingface.co/datasets/huggingface/badges/resolve/main/sign-in-with-huggingface-md.svg",
                         alt="Sign in with Hugging Face",
                         # style="cursor: pointer; display: none;",
                         id="signin", name=None),
                     href=redirect_link, onclick="return openPopup(this.href); return false;")
    content.append(login_button)
    content.append(P(" "))
    return Div(*content, id=HTML_USER_DATA, style="background-color: #f0f0f0; padding: 10px;")
def html_render_welcome_user(user_info):
    """
    Paint the welcome box of a logged-in user.

    :param user_info: mapping with at least a ``name`` entry
    :return: a Div with id HTML_USER_DATA holding the greeting, the privacy note and a logout link
    """
    # content.append(Img(src=OPENID_PROVIDER_URL + user_info["picture"], alt="Picture"))
    greeting = H4(f"Welcome {user_info['name']}!")
    privacy_note = P("Your feedback will be made public but in an anonymized form.")
    logout_link = P(A("Logout", href="/logout"))
    return Div(greeting, privacy_note, logout_link, id=HTML_USER_DATA,
               style="background-color: #f0f0f0; padding: 10px;")
# NOTE(review): the route decorator is restored from the redirect URI built in
# html_render_login_to_get_access_part ("…/auth_callback/"). Without it the handler is
# never registered.
@rt("/auth_callback/")
def get(session, code:str=None, state:str=None, error:str=None, error_description:str=None):
    """
    Endpoint that will be called by HF once the user gives (or not) consent.

    On success the user info returned by the provider is stored in the session under
    USER_DATA_SESSION_NAME and the one-time state secret is cleared.

    :param session: the session mapping
    :param code: authorization code to exchange for a token
    :param state: state value echoed back by the provider; must equal the session secret
    :param error: error code reported by the provider, if any
    :param error_description: human readable description of `error`
    :return: a Div containing the self-closing popup script and a status message
    """
    close_script = Script("""
    localStorage.setItem('login_success', 'true');
    setTimeout(function() {
        window.close();
    }, 1500); // 3000 milliseconds = 3 seconds
    """)
    return_answer = [close_script]

    def _finish(message_component):
        # Every exit path returns the popup script plus one message.
        return_answer.append(message_component)
        return Div(*return_answer)

    if error is not None:
        print(f"HF OAuth returned an error: {error} {error_description}")
        return _finish(Div(P(f"We can't log you in. Huggingface says: {error_description}"),
                           P("Please close this page")))
    # validating the secret
    sess_secret = session.get(OAUTH_SECRET_SESSION_NAME, None)
    if sess_secret is None:
        print("No session secret")
        return _finish(P("access denied"))
    if sess_secret != state:
        # The secret values themselves are deliberately kept out of the log.
        print("Mismatch session secret and HF secret")
        return _finish(P("Mismatch session secret and HF secret"))
    if not (HF_OAUTH_TOKEN_URL and HF_OAUTH_USERINFO):
        # Endpoint discovery failed at start-up; there is nothing to call.
        print("OAuth endpoints are not configured")
        return _finish(P("Error logging in"))
    # Moving on and get the token
    scheme = "http" if "localhost" in SPACE_HOST else "https"
    space_host = f"{scheme}://{SPACE_HOST}/auth_callback/"
    auth_header = base64.b64encode(f"{OAUTH_CLIENT_ID}:{OAUTH_CLIENT_SECRET}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    data = {
        "client_id": OAUTH_CLIENT_ID,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": space_host,
    }
    try:
        token_response = requests.post(HF_OAUTH_TOKEN_URL, data=data, headers=headers, timeout=10)
        if token_response.status_code != 200:
            print(f"We did not get the tokens: {token_response.text}")
            return _finish(P("Error logging in"))
        print("Succsss in getting the token")
        access_token = token_response.json()["access_token"]
        user_headers = {
            "Authorization": f"Bearer {access_token}"
        }
        response_userinfo = requests.get(HF_OAUTH_USERINFO, headers=user_headers, timeout=10)
        if response_userinfo.status_code != 200:
            print(f"Error while taking the user data: {response_userinfo.text}")
            return _finish(P("Error logging in"))
        user_data = response_userinfo.json()
    except (requests.RequestException, ValueError, KeyError) as e:
        # Network failure, non-JSON body, or a token response without "access_token".
        print(f"Error while talking to the OAuth provider: {e}")
        return _finish(P("Error logging in"))
    # Set the user data in the session
    # TODO Is it mildly safe to store user data in session?
    session[USER_DATA_SESSION_NAME] = user_data
    session[OAUTH_SECRET_SESSION_NAME] = None
    save_to_storage(
        storage.NavigationEvent(event_type="/auth_callback", event_session_id=session.get("session_id", "Not set"),
                                event_params={"user_data":user_data})
    )
    return _finish(P("Succes! Close this page."))
def logout_get(session):
    """
    Log the visitor out and send them back to the main page.

    Clears the pending OAuth secret, records a "/logout" navigation event that carries
    the user data held in the session (None when nobody was logged in), then clears
    that user data.

    :param session: the per-visitor session mapping
    :return: a redirect to "/"
    """
    session[OAUTH_SECRET_SESSION_NAME] = None
    # Read through the same key the login flow and get_user_level() use, and via .get():
    # the key is absent for a visitor who never logged in, so [] access would raise KeyError.
    user_data = session.get(USER_DATA_SESSION_NAME, None)
    save_to_storage(
        storage.NavigationEvent(event_type="/logout", event_session_id=session["session_id"],
                                event_params={"user_data": user_data})
    )
    session[USER_DATA_SESSION_NAME] = None
    return RedirectResponse(url="/")
def get_user_level(session):
    """
    Compute the rights level of the current visitor.

    A visitor with user data in the session counts as logged in (level 2), anyone else
    as anonymous (level 0). The result is never lower than DEFAULT_ANONYMOUS_USER_LEVEL.

    :param session: the per-visitor session mapping
    :return: the effective user level
    """
    is_logged_in = session.get(USER_DATA_SESSION_NAME, None) is not None
    base_level = 2 if is_logged_in else 0
    return max(base_level, DEFAULT_ANONYMOUS_USER_LEVEL)
| ########## MAIN PAGE | |
def render_blog_links():
    """
    Build the two informational paragraphs (blog post and source repository links).

    :return: a Div wrapping both paragraphs, rendered at a slightly larger font size.
    """
    blog_anchor = A(
        "on my blog ",
        href="https://visoft.ro/machine-learning/llm-success-keep-the-data-flywheel-spinning/2024/08/18/",
        target="_blank",
    )
    repo_anchor = A(
        "Space repo.",
        href="https://huggingface.co/spaces/ml-visoft/c-reviewer/tree/main",
        target="_blank",
    )
    return Div(
        P("The story behind this demo is ", blog_anchor),
        P("The code can be found in the ", repo_anchor),
        style="font-size: 1.1em;",
    )
def render_kta():
    """
    Build the "let's connect" call to action: a prompt plus a clickable image map of contact links.

    :return: a Div holding the prompt paragraph, the image map and the image that uses it,
             centered as a flex column.
    """
    kta = P("Facing a similar problem? Let's connect!")
    # (coords, href, alt) for each clickable rectangle of kta_orange.png, left to right.
    hotspots = [
        ("0,0,56,68", "mailto:cristian@visoft.ro", "Mail"),
        ("60,0,116,68", "https://twitter.com/ml_visoft", "X"),
        ("120,0,176,68", "https://www.linkedin.com/in/cristianvicas/", "LinkedIn"),
        ("180,0,236,68", "https://github.com/cristi-zz", "GitHub"),
    ]
    areas = [
        # rel matches the links in html_get_iframe_overlay(): pages opened in a new tab
        # should not get a handle back to this one.
        Area(shape="rect", coords=coords, href=href, alt=alt, target="_blank",
             rel="noopener noreferrer")
        for coords, href, alt in hotspots
    ]
    area_map = Map(*areas, name="kta-social-map")
    img_kta = Img(src="kta_orange.png", width="243", height="68", usemap="#kta-social-map",
                  alt="Contact links: Mail, X, LinkedIn, GitHub")
    # The top margin is folded into the `margin` shorthand: a separate `margin-top`
    # placed before `margin: 0 auto` is reset by the shorthand and never takes effect.
    out_div = Div(kta, area_map, img_kta, style="font-size: 1.1em; "
                                                 "width: 50%; margin: 30px auto 0;display: flex;"
                                                 "flex-direction: column;"
                                                 "justify-content: center; align-items: center;")
    return out_div
# CSS for the "open this space directly" overlay.
# #iframe-overlay is a fixed, full-viewport, dark translucent layer that is hidden by
# default (display: none); iframe_js_detect switches it to display: block.
# #iframe-overlay-content is the centered, rounded, light panel inside it, followed by
# the styling of its heading, paragraph and link.
iframe_overlay_style= """
#iframe-overlay {
display: none;
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(0, 0, 0, 0.8);
z-index: 9999;
font-family: Arial, sans-serif;
}
#iframe-overlay-content {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
background-color: rgba(255, 255, 255, 0.9);
padding: 20px;
border-radius: 10px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.3);
}
#iframe-overlay-content h2 {
color: #333;
margin-bottom: 10px;
}
#iframe-overlay-content p {
color: #555;
margin-bottom: 15px;
}
#iframe-overlay-content a {
color: #10A010;
text-decoration: none;
font-weight: bold;
font-size: 32px;
}
#iframe-overlay-content a:hover {
text-decoration: underline;
}"""
# Inline JavaScript that reveals the #iframe-overlay element when the page is embedded.
# isInIframe() compares window.self with window.top; if that comparison throws, the
# page is also treated as embedded (the catch branch returns true).
# The script looks the element up immediately, so it must be placed after the overlay
# markup in the page.
iframe_js_detect = """
function isInIframe() {
try { return window.self !== window.top; } catch (e) { return true; }
}
if (isInIframe()) {
document.getElementById('iframe-overlay').style.display = 'block';
}
"""
def html_get_iframe_overlay():
    """
    Build the overlay markup that asks visitors to open the space directly.

    The outer Div has id "iframe-overlay" and the inner one id "iframe-overlay-content".
    The inner Div holds two headings, a direct link, a line break and a button that
    opens the same address in a new tab.

    :return: the outer overlay Div
    """
    direct_link = A(
        "ml-visoft-c-reviewer.hf.space",
        href="https://ml-visoft-c-reviewer.hf.space/",
        target="_blank",
        rel="noopener noreferrer",  # important for security
    )
    new_tab_button = Button(
        "Open in a new tab",
        onclick="window.open('https://ml-visoft-c-reviewer.hf.space/', '_blank', 'noopener');",
    )
    content = Div(
        H2("Access the space directly to get full functionality:"),
        H3("Click the link below:"),
        direct_link,
        Br(),
        new_tab_button,
        id="iframe-overlay-content",
    )
    return Div(content, id="iframe-overlay")
def index_get(session):
    """
    Render the main page.

    Makes sure the session carries a session ID (replacing it when no stored state exists
    for it), records a "/" navigation event, and assembles the page: iframe overlay,
    login/welcome area, intro text, code input box, samples, results/feedback area,
    clear form and the contact call to action.

    :param session: the per-visitor session mapping; may be updated with a new session ID
                    and, through the login area helper, with login-related values
    :return: a (Title, Main) pair for the page
    """
    if 'session_id' not in session:
        session['session_id'] = str(uuid.uuid4())
    session_id = session["session_id"]
    if len(session_state_table(where=f"session_id=='{session_id}'")) <= 0:
        # No stored state for this ID (old or brand-new session): start over with a fresh one.
        # Rebind the local too, so the event and the clear form below use the ID that is
        # actually kept in the session rather than the discarded one.
        session_id = session['session_id'] = str(uuid.uuid4())
    save_to_storage(
        storage.NavigationEvent(event_type="/", event_session_id=session_id)
    )
    user_data = session.get(USER_DATA_SESSION_NAME, None)
    if user_data is not None:
        auth_area = html_render_welcome_user(user_data)
    else:
        auth_area = html_render_login_to_get_access_part(session)  # might set some secrets here
    title = Title('C code review for students')
    kta_social = render_kta()
    iframe_script = Script(iframe_js_detect)
    iframe_css = Style(iframe_overlay_style)
    # Order matters: the detection script looks up the overlay element as soon as it runs,
    # so it has to come after the overlay markup.
    preamble = [
        iframe_css,
        html_get_iframe_overlay(),
        iframe_script,
        auth_area,
        H1("Evaluate your C code!"),
        P("Enter your code in the textbox below and wait for answers."),
        P("!! The data will be saved and maybe made public !!", style="background-color: #f0fff0;"),
        P("Hint: Don't evaluate more than ~ 1000 LoC."),
        render_blog_links()
    ]
    input_area = tl_html_render_inputbox(session, target_html_id=HTML_RESULTS_AREA, region_html_id=HTML_SUBMIT_CODE_AREA)
    samples_area = tl_html_get_samples_section()
    results_feedback_area = tl_html_results_and_feedback_area(session, show_submit_form=False)
    clear_area = render_clear_area(session_id, HTML_CLEAR_FORM)
    return title, Main(*preamble, input_area, samples_area, results_feedback_area, clear_area, kta_social)
# Start the web server through serve() (presumably provided by the `fasthtml.common`
# star import at the top of the file — confirm).
serve()