import os
from urllib.parse import parse_qs, urlparse

import boto3
import requests
from lxml import html


def get_direct_access_token(username, password):
    """
    Get DESTINE access token directly using provided username and password.

    Performs an OpenID Connect authorization-code flow against the DESTINE
    IAM service: fetch the login page, submit the credentials to the login
    form, read the authorization code from the redirect, and exchange that
    code for tokens.

    Parameters
    ----------
    username : str
        The user name submitted to the login form
    password : str
        The password submitted to the login form

    Returns
    -------
    dict or None
        A dictionary with the keys ``"access_token"`` and ``"refresh_token"``
        (either value is None if absent from the token response), or None
        if the login or the token exchange failed. Failures are reported
        with ``print``.

    Raises
    ------
    requests.HTTPError
        If the initial request for the login page returns an error status.
    """
    SERVICE_URL = "http://localhost:5000"
    IAM_URL = "https://auth.destine.eu"
    IAM_REALM = "desp"
    IAM_CLIENT = "dcms_client"

    with requests.Session() as s:
        # Get the auth url
        response = s.get(
            url=f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/auth",
            params={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "scope": "openid",
                "response_type": "code",
            },
        )
        response.raise_for_status()

        # The login form's action attribute is the URL the credentials are
        # posted to.
        forms = html.fromstring(response.content.decode()).forms
        if not forms:
            print("Error: login form not found in the authentication page")
            return None
        auth_url = forms[0].action

        # Login and get auth code. Redirects are not followed because the
        # authorization code is read from the redirect's Location header.
        login = s.post(
            auth_url,
            data={
                "username": username,
                "password": password,
            },
            allow_redirects=False,
        )

        # A 200 response means the login page was served again instead of
        # a redirect; report the error message it contains, if any.
        if login.status_code == 200:
            tree = html.fromstring(login.content)
            error_message_element = tree.xpath('//span[@id="input-error"]/text()')
            error_message = (
                error_message_element[0].strip()
                if error_message_element
                else "Authentication failed"
            )
            print(f"Error: {error_message}")
            return None

        if login.status_code != 302:
            print(f"Login failed with status code: {login.status_code}")
            return None

        # Extract the authorization code from the redirect target without
        # raising if the header or the query parameter is missing.
        location = login.headers.get("Location", "")
        codes = parse_qs(urlparse(location).query).get("code")
        if not codes:
            print("Error: no authorization code in the login redirect")
            return None
        auth_code = codes[0]

        # Use the auth code to get the token
        response = requests.post(
            f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/token",
            data={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "code": auth_code,
                "grant_type": "authorization_code",
                "scope": "",
            },
        )

        if response.status_code != 200:
            print(f"Failed to get token. Status code: {response.status_code}")
            return None

        token_data = response.json()
        return {
            "access_token": token_data.get("access_token"),
            "refresh_token": token_data.get("refresh_token"),
        }


class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    This connector provides an interface to connect to an S3-compatible
    storage service and retrieve the S3 resource object and the S3 client
    object.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        No connection is made here; the boto3 objects are created by
        ``connect`` (called explicitly or lazily by the getters).

        Parameters
        ----------
        endpoint_url : str
            The URL of the S3 endpoint
        access_key_id : str
            The access key for authentication
        secret_access_key : str
            The secret key for authentication
        region_name : str, optional
            The AWS region name, by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles start unset so the getters can detect "not connected".
        self.s3 = None
        self.s3_client = None

    def connect(self):
        """
        Establish connection to S3 service.

        Creates both the boto3 S3 resource and the boto3 S3 client from the
        stored endpoint and credentials.

        Returns
        -------
        bool
            True if connection was successful, False otherwise
        """
        connection_args = {
            "endpoint_url": self.endpoint_url,
            "aws_access_key_id": self.access_key_id,
            "aws_secret_access_key": self.secret_access_key,
            "region_name": self.region_name,
        }
        try:
            # Build both objects before assigning so a failure does not
            # leave the connector half-initialised.
            s3_resource = boto3.resource('s3', **connection_args)
            s3_client = boto3.client('s3', **connection_args)
        except Exception as e:
            print(f"Connection failed: {e}")
            return False
        self.s3 = s3_resource
        self.s3_client = s3_client
        return True

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first establish a
        connection.

        Returns
        -------
        boto3.resources.factory.s3.ServiceResource
            The boto3 S3 resource object for interacting with S3 storage,
            or None if the connection attempt failed
        """
        if self.s3 is None:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first establish a
        connection.

        Returns
        -------
        boto3.client.S3
            The boto3 S3 client object for interacting with S3 storage,
            or None if the connection attempt failed
        """
        if self.s3_client is None:
            self.connect()
        return self.s3_client


# if __name__ == "__main__":
#     from dotenv import load_dotenv
#     load_dotenv()

#     # Get credentials from environment variables
#     ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
#     SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
#     ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'

#     # Initialize the connector
#     s3_connector = S3Connector(
#         endpoint_url=ENDPOINT_URL,
#         access_key_id=ACCESS_KEY_ID,
#         secret_access_key=SECRET_ACCESS_KEY
#     )

#     # Connect to S3
#     s3_connector.connect()
#     s3_client = s3_connector.get_s3_client()