Spaces:
Sleeping
Sleeping
| import os | |
| import boto3 | |
| import requests | |
| from urllib.parse import parse_qs, urlparse | |
| from lxml import html | |
def get_direct_access_token(username, password, timeout=30):
    """
    Get DESTINE access token directly using provided username and password.

    Drives the OpenID Connect authorization-code flow against the DESTINE
    IAM server: fetch the login page, submit the credentials to the form's
    action URL, read the authorization code from the redirect and exchange
    it for tokens.

    Parameters
    ----------
    username : str
        The account user name submitted to the login form
    password : str
        The account password submitted to the login form
    timeout : float or tuple, optional
        Timeout passed to every HTTP request, by default 30 (seconds)

    Returns
    -------
    dict or None
        A dict with the keys "access_token" and "refresh_token" on success.
        None when the login is rejected, the login form or authorization
        code cannot be found, or the token endpoint does not answer 200
        (a message is printed in each of these cases).

    Raises
    ------
    requests.HTTPError
        If the initial request for the login page returns an error status.
    requests.RequestException
        Connection errors and timeouts from any request are not caught here.
    """
    SERVICE_URL = "http://localhost:5000"
    IAM_URL = "https://auth.destine.eu"
    IAM_REALM = "desp"
    IAM_CLIENT = "dcms_client"
    oidc_url = f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect"

    # The session keeps the cookies set by the login page so that the
    # credential POST below belongs to the same login attempt.
    with requests.Session() as s:
        # Get the auth url
        response = s.get(
            url=f"{oidc_url}/auth",
            params={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "scope": "openid",
                "response_type": "code",
            },
            timeout=timeout,
        )
        response.raise_for_status()

        forms = html.fromstring(response.content.decode()).forms
        if not forms:
            print("Error: login form not found on the authentication page")
            return None
        auth_url = forms[0].action

        # Login and get auth code. Redirects are not followed because the
        # code is carried in the Location header of the 302 response.
        login = s.post(
            auth_url,
            data={
                "username": username,
                "password": password,
            },
            allow_redirects=False,
            timeout=timeout,
        )

        # A 200 here means the login page was rendered again, i.e. the
        # credentials were not accepted; report the page's error text.
        if login.status_code == 200:
            tree = html.fromstring(login.content)
            error_message_element = tree.xpath('//span[@id="input-error"]/text()')
            error_message = (
                error_message_element[0].strip()
                if error_message_element
                else "Authentication failed"
            )
            print(f"Error: {error_message}")
            return None

        if login.status_code != 302:
            print(f"Login failed with status code: {login.status_code}")
            return None

        # The redirect may carry an error instead of a code; treat a missing
        # header or parameter as a failed login rather than raising KeyError.
        location = login.headers.get("Location", "")
        codes = parse_qs(urlparse(location).query).get("code")
        if not codes:
            print("Error: no authorization code in the login redirect")
            return None
        auth_code = codes[0]

    # Use the auth code to get the token
    response = requests.post(
        f"{oidc_url}/token",
        data={
            "client_id": IAM_CLIENT,
            "redirect_uri": SERVICE_URL,
            "code": auth_code,
            "grant_type": "authorization_code",
            "scope": "",
        },
        timeout=timeout,
    )

    if response.status_code != 200:
        print(f"Failed to get token. Status code: {response.status_code}")
        return None

    token_data = response.json()
    return {
        "access_token": token_data.get("access_token"),
        "refresh_token": token_data.get("refresh_token"),
    }
class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    This connector provides an interface to connect to an S3-compatible
    storage service and retrieve the boto3 S3 resource and client objects.
    Both objects are created together by `connect()` and cached on the
    instance.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        No boto3 objects are created here; that happens in `connect()`.

        Parameters
        ----------
        endpoint_url : str
            The URL of the S3 endpoint
        access_key_id : str
            The access key for authentication
        secret_access_key : str
            The secret key for authentication
        region_name : str, optional
            The AWS region name, by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles start as None so the getters can tell "not connected
        # yet" apart from a live object without raising AttributeError.
        self.s3 = None
        self.s3_client = None

    def connect(self):
        """
        Create the boto3 S3 resource and client objects.

        Returns
        -------
        bool
            True if both objects were created, False otherwise (the error
            is printed and the previously stored objects are left as-is)
        """
        settings = {
            "endpoint_url": self.endpoint_url,
            "aws_access_key_id": self.access_key_id,
            "aws_secret_access_key": self.secret_access_key,
            "region_name": self.region_name,
        }
        try:
            resource = boto3.resource('s3', **settings)
            client = boto3.client('s3', **settings)
        except Exception as e:
            # Deliberately broad: this method reports failure through its
            # return value instead of raising.
            print(f"Connection failed: {e}")
            return False

        # Assign only after both succeeded so the pair stays consistent.
        self.s3 = resource
        self.s3_client = client
        return True

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.resources.factory.s3.ServiceResource or None
            The boto3 S3 resource object for interacting with S3 storage,
            or None if the connection attempt failed
        """
        if self.s3 is None:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.client.S3 or None
            The boto3 S3 client object for interacting with S3 storage,
            or None if the connection attempt failed
        """
        if self.s3_client is None:
            self.connect()
        return self.s3_client
| # if __name__ == "__main__": | |
| # from dotenv import load_dotenv | |
| # load_dotenv() | |
| # # Get credentials from environment variables | |
| # ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID") | |
| # SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY") | |
| # ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu' | |
| # # Initialize the connector | |
| # s3_connector = S3Connector( | |
| # endpoint_url=ENDPOINT_URL, | |
| # access_key_id=ACCESS_KEY_ID, | |
| # secret_access_key=SECRET_ACCESS_KEY | |
| # ) | |
| # # Connect to S3 | |
| # s3_connector.connect() | |
| # s3_client = s3_connector.get_s3_client() | |