| | import os |
| | import glob |
| | import random |
| | import json |
| | from typing import List, Dict, Any |
| | from google.auth.transport.requests import Request as AuthRequest |
| | from google.oauth2 import service_account |
| | import config as app_config |
| |
|
| | |
| | def parse_multiple_json_credentials(json_str: str) -> List[Dict[str, Any]]: |
| | """ |
| | Parse multiple JSON objects from a string separated by commas. |
| | Format expected: {json_object1},{json_object2},... |
| | Returns a list of parsed JSON objects. |
| | """ |
| | credentials_list = [] |
| | nesting_level = 0 |
| | current_object_start = -1 |
| | str_length = len(json_str) |
| |
|
| | for i, char in enumerate(json_str): |
| | if char == '{': |
| | if nesting_level == 0: |
| | current_object_start = i |
| | nesting_level += 1 |
| | elif char == '}': |
| | if nesting_level > 0: |
| | nesting_level -= 1 |
| | if nesting_level == 0 and current_object_start != -1: |
| | |
| | json_object_str = json_str[current_object_start : i + 1] |
| | try: |
| | credentials_info = json.loads(json_object_str) |
| | |
| | required_fields = ["type", "project_id", "private_key_id", "private_key", "client_email"] |
| | if all(field in credentials_info for field in required_fields): |
| | credentials_list.append(credentials_info) |
| | print(f"DEBUG: Successfully parsed a JSON credential object.") |
| | else: |
| | print(f"WARNING: Parsed JSON object missing required fields: {json_object_str[:100]}...") |
| | except json.JSONDecodeError as e: |
| | print(f"ERROR: Failed to parse JSON object segment: {json_object_str[:100]}... Error: {e}") |
| | current_object_start = -1 |
| | else: |
| | |
| | print(f"WARNING: Encountered unexpected '}}' at index {i}. Input might be malformed.") |
| |
|
| |
|
| | if nesting_level != 0: |
| | print(f"WARNING: JSON string parsing ended with non-zero nesting level ({nesting_level}). Check for unbalanced braces.") |
| |
|
| | print(f"DEBUG: Parsed {len(credentials_list)} credential objects from the input string.") |
| | return credentials_list |
| | def _refresh_auth(credentials): |
| | """Helper function to refresh GCP token.""" |
| | if not credentials: |
| | print("ERROR: _refresh_auth called with no credentials.") |
| | return None |
| | try: |
| | |
| | project_id_for_log = getattr(credentials, 'project_id', 'Unknown') |
| | print(f"INFO: Attempting to refresh token for project: {project_id_for_log}...") |
| | credentials.refresh(AuthRequest()) |
| | print(f"INFO: Token refreshed successfully for project: {project_id_for_log}") |
| | return credentials.token |
| | except Exception as e: |
| | project_id_for_log = getattr(credentials, 'project_id', 'Unknown') |
| | print(f"ERROR: Error refreshing GCP token for project {project_id_for_log}: {e}") |
| | return None |
| |
|
| |
|
| | |
| | class CredentialManager: |
| | def __init__(self): |
| | |
| | self.credentials_dir = app_config.CREDENTIALS_DIR |
| | self.credentials_files = [] |
| | self.current_index = 0 |
| | self.credentials = None |
| | self.project_id = None |
| | |
| | self.in_memory_credentials: List[Dict[str, Any]] = [] |
| | self.load_credentials_list() |
| |
|
| | def add_credential_from_json(self, credentials_info: Dict[str, Any]) -> bool: |
| | """ |
| | Add a credential from a JSON object to the manager's in-memory list. |
| | |
| | Args: |
| | credentials_info: Dict containing service account credentials |
| | |
| | Returns: |
| | bool: True if credential was added successfully, False otherwise |
| | """ |
| | try: |
| | |
| | required_fields = ["type", "project_id", "private_key_id", "private_key", "client_email"] |
| | if not all(field in credentials_info for field in required_fields): |
| | print(f"WARNING: Skipping JSON credential due to missing required fields.") |
| | return False |
| |
|
| | credentials = service_account.Credentials.from_service_account_info( |
| | credentials_info, |
| | scopes=['https://www.googleapis.com/auth/cloud-platform'] |
| | ) |
| | project_id = credentials.project_id |
| | print(f"DEBUG: Successfully created credentials object from JSON for project: {project_id}") |
| |
|
| | |
| | self.in_memory_credentials.append({ |
| | 'credentials': credentials, |
| | 'project_id': project_id, |
| | 'source': 'json_string' |
| | }) |
| | print(f"INFO: Added credential for project {project_id} from JSON string to Credential Manager.") |
| | return True |
| | except Exception as e: |
| | print(f"ERROR: Failed to create credentials from parsed JSON object: {e}") |
| | return False |
| |
|
| | def load_credentials_from_json_list(self, json_list: List[Dict[str, Any]]) -> int: |
| | """ |
| | Load multiple credentials from a list of JSON objects into memory. |
| | |
| | Args: |
| | json_list: List of dicts containing service account credentials |
| | |
| | Returns: |
| | int: Number of credentials successfully loaded |
| | """ |
| | |
| | existing_projects = {cred['project_id'] for cred in self.in_memory_credentials} |
| | success_count = 0 |
| | newly_added_projects = set() |
| |
|
| | for credentials_info in json_list: |
| | project_id = credentials_info.get('project_id') |
| | |
| | is_duplicate_file = any(os.path.basename(f) == f"{project_id}.json" for f in self.credentials_files) |
| | is_duplicate_mem = project_id in existing_projects or project_id in newly_added_projects |
| |
|
| | if project_id and not is_duplicate_file and not is_duplicate_mem: |
| | if self.add_credential_from_json(credentials_info): |
| | success_count += 1 |
| | newly_added_projects.add(project_id) |
| | elif project_id: |
| | print(f"DEBUG: Skipping duplicate credential for project {project_id} from JSON list.") |
| |
|
| |
|
| | if success_count > 0: |
| | print(f"INFO: Loaded {success_count} new credentials from JSON list into memory.") |
| | return success_count |
| |
|
| | def load_credentials_list(self): |
| | """Load the list of available credential files""" |
| | |
| | pattern = os.path.join(self.credentials_dir, "*.json") |
| | self.credentials_files = glob.glob(pattern) |
| |
|
| | if not self.credentials_files: |
| | |
| | pass |
| | else: |
| | print(f"Found {len(self.credentials_files)} credential files: {[os.path.basename(f) for f in self.credentials_files]}") |
| |
|
| | |
| | return self.get_total_credentials() > 0 |
| |
|
| | def refresh_credentials_list(self): |
| | """Refresh the list of credential files and return if any credentials exist""" |
| | old_file_count = len(self.credentials_files) |
| | self.load_credentials_list() |
| | new_file_count = len(self.credentials_files) |
| |
|
| | if old_file_count != new_file_count: |
| | print(f"Credential files updated: {old_file_count} -> {new_file_count}") |
| |
|
| | |
| | total_credentials = self.get_total_credentials() |
| | print(f"DEBUG: Refresh check - Total credentials available: {total_credentials}") |
| | return total_credentials > 0 |
| |
|
| | def get_total_credentials(self): |
| | """Returns the total number of credentials (file + in-memory).""" |
| | return len(self.credentials_files) + len(self.in_memory_credentials) |
| |
|
| |
|
| | def get_random_credentials(self): |
| | """ |
| | Get a random credential (file or in-memory) and load it. |
| | Tries each available credential source at most once in a random order. |
| | """ |
| | all_sources = [] |
| | |
| | for file_path in self.credentials_files: |
| | all_sources.append({'type': 'file', 'value': file_path}) |
| | |
| | |
| | |
| | for idx, mem_cred_info in enumerate(self.in_memory_credentials): |
| | all_sources.append({'type': 'memory_object', 'value': mem_cred_info, 'original_index': idx}) |
| |
|
| | if not all_sources: |
| | print("WARNING: No credentials available for random selection (no files or in-memory).") |
| | return None, None |
| |
|
| | random.shuffle(all_sources) |
| |
|
| | for source_info in all_sources: |
| | source_type = source_info['type'] |
| | |
| | if source_type == 'file': |
| | file_path = source_info['value'] |
| | print(f"DEBUG: Attempting to load credential from file: {os.path.basename(file_path)}") |
| | try: |
| | credentials = service_account.Credentials.from_service_account_file( |
| | file_path, |
| | scopes=['https://www.googleapis.com/auth/cloud-platform'] |
| | ) |
| | project_id = credentials.project_id |
| | print(f"INFO: Successfully loaded credential from file {os.path.basename(file_path)} for project: {project_id}") |
| | self.credentials = credentials |
| | self.project_id = project_id |
| | return credentials, project_id |
| | except Exception as e: |
| | print(f"ERROR: Failed loading credentials file {os.path.basename(file_path)}: {e}. Trying next available source.") |
| | continue |
| | |
| | elif source_type == 'memory_object': |
| | mem_cred_detail = source_info['value'] |
| | |
| | credentials = mem_cred_detail.get('credentials') |
| | project_id = mem_cred_detail.get('project_id') |
| | |
| | if credentials and project_id: |
| | print(f"INFO: Using in-memory credential for project: {project_id} (Source: {mem_cred_detail.get('source', 'unknown')})") |
| | |
| | |
| | |
| | |
| | |
| | self.credentials = credentials |
| | self.project_id = project_id |
| | return credentials, project_id |
| | else: |
| | print(f"WARNING: In-memory credential entry missing 'credentials' or 'project_id' at original index {source_info.get('original_index', 'N/A')}. Skipping.") |
| | continue |
| | |
| | print("WARNING: All available credential sources failed to load.") |
| | return None, None |