| import logging
|
| from keycloak import KeycloakOpenID
|
| import json
|
|
|
|
|
| class KeycloakAuthenticator:
|
|
|
| def __init__(self, configuration):
|
|
|
| with open(configuration, 'r') as f:
|
| kc_params = json.load(f)
|
|
|
| self.status_ok = True
|
| self.log = logging.getLogger()
|
|
|
|
|
| self.keycloak_server_url = kc_params['server_url']
|
| self.keycloak_realm_name = kc_params['realm_name']
|
| self.keycloak_client_id = kc_params['client_id']
|
|
|
|
|
|
|
| self.keycloak_username = kc_params['username']
|
| self.keycloak_password = kc_params['password']
|
|
|
|
|
| self.keycloak_openid = KeycloakOpenID(
|
| server_url=self.keycloak_server_url,
|
| realm_name=self.keycloak_realm_name,
|
| client_id=self.keycloak_client_id
|
| )
|
|
|
|
|
| try:
|
| self.log.debug("Getting wellKnown configuration...")
|
| self.keycloak_config_well_known = self.keycloak_openid.well_known()
|
| self.log.debug(self.keycloak_config_well_known)
|
| except Exception as ex:
|
| self.log.error("Error getting wellKnown configuration ")
|
| self.log.error(ex)
|
| self.status_ok = False
|
|
|
| def get_token(self):
|
| if not self.status_ok:
|
| self.log.error("The connection has not been established. Check the logs.")
|
| return None
|
| self.log.debug("Getting token for user {}".format(self.keycloak_username))
|
| try:
|
| keycloak_token = self.keycloak_openid.token(self.keycloak_username, self.keycloak_password)
|
| self.log.debug("Token: {}".format(keycloak_token))
|
| for k in keycloak_token.keys():
|
| self.log.debug("{} = {}".format(k, keycloak_token[k]))
|
| token_value = keycloak_token['access_token']
|
|
|
| except Exception as ex:
|
| self.log.error("Error getting token")
|
| self.log.error(ex)
|
| token_value = None
|
|
|
| return token_value
|
|
|
| if __name__ == '__main__':
|
| auth = KeycloakAuthenticator("../config/keycloak_config.json")
|
| token = auth.get_token()
|
| print(token) |