Spaces:
Paused
Paused
| # --- browser_utils/operations_modules/parsers.py --- | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from typing import Any | |
| from config import ( | |
| DEBUG_LOGS_ENABLED, | |
| MODELS_ENDPOINT_URL_CONTAINS, | |
| ) | |
| logger = logging.getLogger("AIStudioProxyServer") | |
| async def _handle_model_list_response(response: Any): | |
| """Handle model list response""" | |
| # Need to access global variables | |
| from api_utils.server_state import state | |
| getattr(state, "global_model_list_raw_json", None) | |
| getattr(state, "parsed_model_list", []) | |
| model_list_fetch_event = getattr(state, "model_list_fetch_event", None) | |
| excluded_model_ids = getattr(state, "excluded_model_ids", set()) | |
| if MODELS_ENDPOINT_URL_CONTAINS in response.url and response.ok: | |
| # Check if in login flow | |
| launch_mode = os.environ.get("LAUNCH_MODE", "debug") | |
| is_in_login_flow = launch_mode in ["debug"] and not getattr( | |
| state, "is_page_ready", False | |
| ) | |
| if is_in_login_flow: | |
| # During login flow, handle silently to avoid interfering with user input | |
| pass # Silent handling to avoid interfering with user input | |
| else: | |
| logger.debug( | |
| f"[Network] Captured model list response ({response.status} OK)" | |
| ) | |
| try: | |
| data = await response.json() | |
| models_array_container = None | |
| if isinstance(data, list) and data: | |
| if ( | |
| isinstance(data[0], list) | |
| and data[0] | |
| and isinstance(data[0][0], list) | |
| ): | |
| # [Parse] log moved to count change check | |
| models_array_container = data[0] | |
| elif ( | |
| isinstance(data[0], list) | |
| and data[0] | |
| and isinstance(data[0][0], str) | |
| ): | |
| # [Parse] log moved to count change check | |
| models_array_container = data | |
| elif isinstance(data[0], dict): | |
| # [Parse] log moved to count change check | |
| models_array_container = data | |
| else: | |
| logger.warning( | |
| f"Unknown list nesting structure. data[0] type: {type(data[0]) if data else 'N/A'}. data[0] preview: {str(data[0])[:200] if data else 'N/A'}" | |
| ) | |
| elif isinstance(data, dict): | |
| if "data" in data and isinstance(data["data"], list): | |
| models_array_container = data["data"] | |
| elif "models" in data and isinstance(data["models"], list): | |
| models_array_container = data["models"] | |
| else: | |
| for key, value in data.items(): | |
| if ( | |
| isinstance(value, list) | |
| and len(value) > 0 | |
| and isinstance(value[0], (dict, list)) | |
| ): | |
| models_array_container = value | |
| logger.info( | |
| f"Model list data found under '{key}' key via heuristic search." | |
| ) | |
| break | |
| if models_array_container is None: | |
| logger.warning( | |
| "Could not auto-locate model list array in dict response." | |
| ) | |
| if ( | |
| model_list_fetch_event | |
| and not model_list_fetch_event.is_set() | |
| ): | |
| model_list_fetch_event.set() | |
| return | |
| else: | |
| logger.warning( | |
| f"Received model list data is neither list nor dict: {type(data)}" | |
| ) | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| return | |
| if models_array_container is not None: | |
| new_parsed_list = [] | |
| excluded_during_parse: list[str] = [] # Collect excluded model IDs | |
| for entry_in_container in models_array_container: | |
| model_fields_list = None | |
| if isinstance(entry_in_container, dict): | |
| potential_id = entry_in_container.get( | |
| "id", | |
| entry_in_container.get( | |
| "model_id", entry_in_container.get("modelId") | |
| ), | |
| ) | |
| if potential_id: | |
| model_fields_list = entry_in_container | |
| else: | |
| model_fields_list = list(entry_in_container.values()) | |
| elif isinstance(entry_in_container, list): | |
| model_fields_list = entry_in_container | |
| else: | |
| logger.debug( | |
| f"Skipping entry of unknown type: {type(entry_in_container)}" | |
| ) | |
| continue | |
| if not model_fields_list: | |
| logger.debug( | |
| "Skipping entry because model_fields_list is empty or None." | |
| ) | |
| continue | |
| model_id_path_str = None | |
| display_name_candidate = "" | |
| description_candidate = "N/A" | |
| default_max_output_tokens_val = None | |
| default_top_p_val = None | |
| default_temperature_val = 1.0 | |
| supported_max_output_tokens_val = None | |
| current_model_id_for_log = "UnknownModelYet" | |
| try: | |
| if isinstance(model_fields_list, list): | |
| if not ( | |
| len(model_fields_list) > 0 | |
| and isinstance(model_fields_list[0], (str, int, float)) | |
| ): | |
| logger.debug( | |
| f"Skipping list-based model_fields due to invalid first element: {str(model_fields_list)[:100]}" | |
| ) | |
| continue | |
| model_id_path_str = str(model_fields_list[0]) | |
| current_model_id_for_log = ( | |
| model_id_path_str.split("/")[-1] | |
| if model_id_path_str and "/" in model_id_path_str | |
| else model_id_path_str | |
| ) | |
| display_name_candidate = ( | |
| str(model_fields_list[3]) | |
| if len(model_fields_list) > 3 | |
| else "" | |
| ) | |
| description_candidate = ( | |
| str(model_fields_list[4]) | |
| if len(model_fields_list) > 4 | |
| else "N/A" | |
| ) | |
| if ( | |
| len(model_fields_list) > 6 | |
| and model_fields_list[6] is not None | |
| ): | |
| try: | |
| val_int = int(model_fields_list[6]) | |
| default_max_output_tokens_val = val_int | |
| supported_max_output_tokens_val = val_int | |
| except (ValueError, TypeError): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Cannot parse list index 6 value '{model_fields_list[6]}' as max_output_tokens." | |
| ) | |
| if ( | |
| len(model_fields_list) > 9 | |
| and model_fields_list[9] is not None | |
| ): | |
| try: | |
| raw_top_p = float(model_fields_list[9]) | |
| if not (0.0 <= raw_top_p <= 1.0): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Raw top_p value {raw_top_p} (from list index 9) exceeds [0,1] range, will be clipped." | |
| ) | |
| default_top_p_val = max( | |
| 0.0, min(1.0, raw_top_p) | |
| ) | |
| else: | |
| default_top_p_val = raw_top_p | |
| except (ValueError, TypeError): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Cannot parse list index 9 value '{model_fields_list[9]}' as top_p." | |
| ) | |
| elif isinstance(model_fields_list, dict): | |
| model_id_path_str = str( | |
| model_fields_list.get( | |
| "id", | |
| model_fields_list.get( | |
| "model_id", model_fields_list.get("modelId") | |
| ), | |
| ) | |
| ) | |
| current_model_id_for_log = ( | |
| model_id_path_str.split("/")[-1] | |
| if model_id_path_str and "/" in model_id_path_str | |
| else model_id_path_str | |
| ) | |
| display_name_candidate = str( | |
| model_fields_list.get( | |
| "displayName", | |
| model_fields_list.get( | |
| "display_name", | |
| model_fields_list.get("name", ""), | |
| ), | |
| ) | |
| ) | |
| description_candidate = str( | |
| model_fields_list.get("description", "N/A") | |
| ) | |
| mot_parsed = model_fields_list.get( | |
| "maxOutputTokens", | |
| model_fields_list.get( | |
| "defaultMaxOutputTokens", | |
| model_fields_list.get("outputTokenLimit"), | |
| ), | |
| ) | |
| if mot_parsed is not None: | |
| try: | |
| val_int = int(mot_parsed) | |
| default_max_output_tokens_val = val_int | |
| supported_max_output_tokens_val = val_int | |
| except (ValueError, TypeError): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Cannot parse dict value '{mot_parsed}' as max_output_tokens." | |
| ) | |
| top_p_parsed = model_fields_list.get( | |
| "topP", model_fields_list.get("defaultTopP") | |
| ) | |
| if top_p_parsed is not None: | |
| try: | |
| raw_top_p = float(top_p_parsed) | |
| if not (0.0 <= raw_top_p <= 1.0): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Raw top_p value {raw_top_p} (from dict) exceeds [0,1] range, will be clipped." | |
| ) | |
| default_top_p_val = max( | |
| 0.0, min(1.0, raw_top_p) | |
| ) | |
| else: | |
| default_top_p_val = raw_top_p | |
| except (ValueError, TypeError): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Cannot parse dict value '{top_p_parsed}' as top_p." | |
| ) | |
| temp_parsed = model_fields_list.get( | |
| "temperature", | |
| model_fields_list.get("defaultTemperature"), | |
| ) | |
| if temp_parsed is not None: | |
| try: | |
| default_temperature_val = float(temp_parsed) | |
| except (ValueError, TypeError): | |
| logger.warning( | |
| f"Model {current_model_id_for_log}: Cannot parse dict value '{temp_parsed}' as temperature." | |
| ) | |
| else: | |
| logger.debug( | |
| f"Skipping entry because model_fields_list is not list or dict: {type(model_fields_list)}" | |
| ) | |
| continue | |
| except Exception as e_parse_fields: | |
| logger.error( | |
| f"Error parsing model fields for entry {str(entry_in_container)[:100]}: {e_parse_fields}" | |
| ) | |
| continue | |
| if model_id_path_str and model_id_path_str.lower() != "none": | |
| simple_model_id_str = ( | |
| model_id_path_str.split("/")[-1] | |
| if "/" in model_id_path_str | |
| else model_id_path_str | |
| ) | |
| if simple_model_id_str in excluded_model_ids: | |
| excluded_during_parse.append(simple_model_id_str) | |
| continue | |
| final_display_name_str = ( | |
| display_name_candidate | |
| if display_name_candidate | |
| else simple_model_id_str.replace("-", " ").title() | |
| ) | |
| model_entry_dict = { | |
| "id": simple_model_id_str, | |
| "object": "model", | |
| "created": int(time.time()), | |
| "owned_by": "ai_studio", | |
| "display_name": final_display_name_str, | |
| "description": description_candidate, | |
| "raw_model_path": model_id_path_str, | |
| "default_temperature": default_temperature_val, | |
| "default_max_output_tokens": default_max_output_tokens_val, | |
| "supported_max_output_tokens": supported_max_output_tokens_val, | |
| "default_top_p": default_top_p_val, | |
| } | |
| new_parsed_list.append(model_entry_dict) | |
| else: | |
| logger.debug( | |
| f"Skipping entry due to invalid model_id_path: {model_id_path_str} from entry {str(entry_in_container)[:100]}" | |
| ) | |
| # Excluded model log moved to count change check | |
| excluded_count = ( | |
| len(excluded_during_parse) if excluded_during_parse else 0 | |
| ) | |
| if new_parsed_list: | |
| # Check if network interception already injected models | |
| has_network_injected_models = False | |
| if models_array_container: | |
| for entry_in_container in models_array_container: | |
| if ( | |
| isinstance(entry_in_container, list) | |
| and len(entry_in_container) > 10 | |
| ): | |
| # Check for network injection marker | |
| if "__NETWORK_INJECTED__" in entry_in_container: | |
| has_network_injected_models = True | |
| break | |
| if has_network_injected_models and not is_in_login_flow: | |
| logger.info( | |
| "Detected network interception already injected models" | |
| ) | |
| # Note: No longer adding injected models on backend | |
| # If frontend didn't inject via network interception, these models won't be usable anyway | |
| # So we only rely on network interception for injection | |
| state.parsed_model_list = sorted( | |
| new_parsed_list, key=lambda m: m.get("display_name", "").lower() | |
| ) | |
| state.global_model_list_raw_json = json.dumps( | |
| {"data": state.parsed_model_list, "object": "list"} | |
| ) | |
| if DEBUG_LOGS_ENABLED: | |
| # Only print full model list on first load or count change | |
| previous_count = getattr(state, "_last_model_count", 0) or 0 | |
| current_count = len(state.parsed_model_list) | |
| if previous_count != current_count or previous_count == 0: | |
| # Only show detailed parsing info when list changes | |
| if excluded_count > 0 and not is_in_login_flow: | |
| logger.debug( | |
| f"[Model] Excluded {excluded_count} models" | |
| ) | |
| log_output = ( | |
| f"[Model] List updated: {current_count} models\n" | |
| ) | |
| for i, item in enumerate( | |
| state.parsed_model_list[ | |
| : min(3, len(state.parsed_model_list)) | |
| ] | |
| ): | |
| log_output += f" {i + 1}. {item.get('id')} (MaxTok={item.get('default_max_output_tokens')})\n" | |
| logger.debug(log_output.rstrip()) | |
| state._last_model_count = current_count # type: ignore | |
| else: | |
| logger.debug(f"[Model] List unchanged ({current_count})") | |
| else: | |
| logger.info( | |
| f"[Model] List updated (total {len(state.parsed_model_list)} models)" | |
| ) | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| elif not state.parsed_model_list: | |
| logger.warning("Model list still empty after parsing.") | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| else: | |
| logger.warning( | |
| "models_array_container is None, cannot parse model list." | |
| ) | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| model_list_fetch_event.set() | |
| except json.JSONDecodeError as json_err: | |
| logger.error( | |
| f"Failed to parse model list JSON: {json_err}. Response (first 500 chars): {await response.text()[:500]}" | |
| ) | |
| except asyncio.CancelledError: | |
| raise | |
| except Exception as e_handle_list_resp: | |
| logger.exception( | |
| f"Unknown error processing model list response: {e_handle_list_resp}" | |
| ) | |
| finally: | |
| if model_list_fetch_event and not model_list_fetch_event.is_set(): | |
| logger.info( | |
| "Finished processing model list response, forcing model_list_fetch_event set." | |
| ) | |
| model_list_fetch_event.set() | |