Spaces:
Sleeping
Sleeping
| import time | |
| import gradio as gr | |
| import pandas as pd | |
| from blossomtune_gradio import config as cfg | |
| from blossomtune_gradio.logs import log | |
| from blossomtune_gradio import federation as fed | |
| from blossomtune_gradio import processing | |
| from blossomtune_gradio.settings import settings | |
| from blossomtune_gradio import util | |
| from blossomtune_gradio.database import SessionLocal, Request | |
| from . import components | |
| from . import auth | |
| def log_updater_generator(): | |
| """Continuously yields log updates every 1 second.""" | |
| while True: | |
| yield log.output | |
| time.sleep(1) | |
| def get_full_status_update( | |
| profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None | |
| ): | |
| owner = auth.is_space_owner(profile, oauth_token) | |
| auth_status = "Authenticating..." | |
| is_on_space = cfg.SPACE_OWNER is not None | |
| hf_handle_val = "" | |
| hf_handle_interactive = not is_on_space | |
| if is_on_space: | |
| if profile: | |
| if owner: | |
| auth_status = settings.get_text( | |
| "auth_status_logged_in_owner_md", profile=profile | |
| ) | |
| else: | |
| auth_status = settings.get_text( | |
| "auth_status_logged_in_user_md", profile=profile | |
| ) | |
| hf_handle_val = profile.username | |
| else: | |
| auth_status = settings.get_text("auth_status_not_logged_in_md") | |
| else: | |
| auth_status = settings.get_text("auth_status_local_mode_md") | |
| with SessionLocal() as db: | |
| pending_results = ( | |
| db.query(Request.participant_id, Request.hf_handle, Request.email) | |
| .filter(Request.status == "pending", Request.is_activated == 1) | |
| .order_by(Request.timestamp.asc()) | |
| .all() | |
| ) | |
| approved_results = ( | |
| db.query( | |
| Request.participant_id, | |
| Request.hf_handle, | |
| Request.email, | |
| Request.partition_id, | |
| ) | |
| .filter(Request.status == "approved") | |
| .order_by(Request.timestamp.desc()) | |
| .all() | |
| ) | |
| # Convert SQLAlchemy rows to simple lists | |
| pending_rows = [list(row) for row in pending_results] | |
| approved_rows = [list(row) for row in approved_results] | |
| # Superlink Status Logic | |
| superlink_btn_update = gr.update() | |
| if cfg.SUPERLINK_MODE == "internal": | |
| superlink_is_running = ( | |
| processing.process_store["superlink"] | |
| and processing.process_store["superlink"].poll() is None | |
| ) | |
| superlink_status = "π’ Running" if superlink_is_running else "π΄ Not Running" | |
| if superlink_is_running: | |
| superlink_btn_update = gr.update( | |
| value="π Stop Superlink", variant="stop", interactive=True | |
| ) | |
| else: | |
| superlink_btn_update = gr.update( | |
| value="π Start Superlink", variant="secondary", interactive=True | |
| ) | |
| elif cfg.SUPERLINK_MODE == "external": | |
| if not cfg.SUPERLINK_HOST: | |
| superlink_status = "π΄ Not Configured" | |
| else: | |
| is_open = util.is_port_open(cfg.SUPERLINK_HOST, cfg.SUPERLINK_PORT) | |
| superlink_status = "π’ Running" if is_open else "π΄ Not Running" | |
| superlink_btn_update = gr.update(value="Managed Externally", interactive=False) | |
| else: | |
| superlink_status = "β οΈ Invalid Mode" | |
| superlink_btn_update = gr.update(interactive=False) | |
| runner_is_running = ( | |
| processing.process_store["runner"] | |
| and processing.process_store["runner"].poll() is None | |
| ) | |
| runner_status = "π’ Running" if runner_is_running else "π΄ Not Running" | |
| if runner_is_running: | |
| runner_btn_update = gr.update(value="π Stop Runner", variant="stop") | |
| else: | |
| runner_btn_update = gr.update(value="βΆοΈ Start Federated Run", variant="primary") | |
| return { | |
| components.admin_panel: gr.update(visible=owner), | |
| components.auth_status_md: gr.update(value=auth_status), | |
| components.superlink_status_public_txt: gr.update(value=superlink_status), | |
| components.superlink_status_admin_txt: gr.update(value=superlink_status), | |
| components.runner_status_txt: gr.update(value=runner_status), | |
| components.pending_requests_df: gr.update( | |
| value=pending_rows if pending_rows else [[]] | |
| ), | |
| components.approved_participants_df: gr.update( | |
| value=approved_rows if approved_rows else [[]] | |
| ), | |
| components.superlink_toggle_btn: superlink_btn_update, | |
| components.runner_toggle_btn: runner_btn_update, | |
| components.hf_handle_tb: gr.update( | |
| value=hf_handle_val, interactive=hf_handle_interactive | |
| ), | |
| } | |
| def get_log_update(): | |
| return { | |
| components.log_output: gr.update(value=log.output), | |
| } | |
| def toggle_superlink( | |
| profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None | |
| ): | |
| """Toggles the Superlink process on or off.""" | |
| if not auth.is_space_owner(profile, oauth_token): | |
| gr.Warning("You are not authorized to perform this operation.") | |
| return | |
| if ( | |
| processing.process_store["superlink"] | |
| and processing.process_store["superlink"].poll() is None | |
| ): | |
| processing.stop_process("superlink") | |
| else: | |
| processing.start_superlink() | |
| def toggle_runner( | |
| runner_app: str, | |
| run_id: str, | |
| num_partitions: str, | |
| profile: gr.OAuthProfile | None, | |
| oauth_token: gr.OAuthToken | None, | |
| ): | |
| """Toggles the Runner process on or off.""" | |
| if not auth.is_space_owner(profile, oauth_token): | |
| gr.Warning("You are not authorized to perform this operation.") | |
| return | |
| if ( | |
| processing.process_store["runner"] | |
| and processing.process_store["runner"].poll() is None | |
| ): | |
| processing.stop_process("runner") | |
| else: | |
| result, message = processing.start_runner(runner_app, run_id, num_partitions) | |
| if not result: | |
| gr.Warning(message) | |
| else: | |
| gr.Info(message) | |
| def on_select_pending(pending_data: list, evt: gr.SelectData): | |
| """Handles selection from the pending requests table to pre-fill the form.""" | |
| if not evt.index: | |
| return "", "" | |
| pending_df = pd.DataFrame( | |
| pending_data, columns=["Participant ID", "HF Handle", "Email"] | |
| ) | |
| row_index = evt.index[0] | |
| if pending_df.empty or row_index >= len(pending_df): | |
| return "", "" | |
| participant_id = pending_df.iloc[row_index, 0] | |
| return participant_id, str(fed.get_next_partion_id()) | |
| def on_check_participant_status( | |
| hf_handle: str, email: str, activation_code: str, profile: gr.OAuthProfile | None | |
| ): | |
| is_on_space = cfg.SPACE_ID is not None | |
| if is_on_space and not profile: | |
| return { | |
| components.request_status_md: gr.update( | |
| value=settings.get_text("auth_required_md") | |
| ), | |
| components.ca_cert_download: gr.update(value=None, visible=False), | |
| } | |
| user_hf_handle = profile.username if is_on_space else hf_handle | |
| if not user_hf_handle or not user_hf_handle.strip(): | |
| return { | |
| components.request_status_md: gr.update( | |
| value=settings.get_text("hf_handle_empty_md") | |
| ), | |
| components.ca_cert_download: gr.update(value=None, visible=False), | |
| } | |
| pid_to_check = user_hf_handle.strip() | |
| email_to_add = email.strip() | |
| activation_code_to_check = activation_code.strip() | |
| approved, message, download = fed.check_participant_status( | |
| pid_to_check, email_to_add, activation_code_to_check | |
| ) | |
| return { | |
| components.request_status_md: gr.update(value=message), | |
| components.ca_cert_download: gr.update( | |
| value=download, visible=True if download else False | |
| ), | |
| } | |
| def on_manage_fed_request(participant_id: str, partition_id: str, action: str): | |
| result, message = fed.manage_request(participant_id, partition_id, action) | |
| if result: | |
| gr.Info(message) | |
| else: | |
| gr.Warning(message) | |