mapster_space / src /auth /auth.py
rodolphethinks1's picture
Rename src/auth to src/auth/auth.py
03489c8 verified
import os
import boto3
import requests
from urllib.parse import parse_qs, urlparse
from lxml import html
def get_direct_access_token(username, password):
"""
Get DESTINE access token directly using provided username and password.
"""
SERVICE_URL = "http://localhost:5000"
IAM_URL = "https://auth.destine.eu"
IAM_REALM = "desp"
IAM_CLIENT = "dcms_client"
with requests.Session() as s:
# Get the auth url
response = s.get(
url=f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/auth",
params={
"client_id": IAM_CLIENT,
"redirect_uri": SERVICE_URL,
"scope": "openid",
"response_type": "code",
},
)
response.raise_for_status()
auth_url = html.fromstring(response.content.decode()).forms[0].action
# Login and get auth code
login = s.post(
auth_url,
data={
"username": username,
"password": password,
},
allow_redirects=False,
)
if login.status_code == 200:
tree = html.fromstring(login.content)
error_message_element = tree.xpath('//span[@id="input-error"]/text()')
error_message = (
error_message_element[0].strip()
if error_message_element
else "Authentication failed"
)
print(f"Error: {error_message}")
return None
if login.status_code != 302:
print(f"Login failed with status code: {login.status_code}")
return None
auth_code = parse_qs(urlparse(login.headers["Location"]).query)["code"][0]
# Use the auth code to get the token
response = requests.post(
f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/token",
data={
"client_id": IAM_CLIENT,
"redirect_uri": SERVICE_URL,
"code": auth_code,
"grant_type": "authorization_code",
"scope": "",
},
)
if response.status_code != 200:
print(f"Failed to get token. Status code: {response.status_code}")
return None
token_data = response.json()
return {
"access_token": token_data.get("access_token"),
"refresh_token": token_data.get("refresh_token")
}
class S3Connector:
"""
A client for connecting to S3-compatible storage services.
This connector provides an interface to connect to an S3-compatible
storage service and retrieve the S3 resource object.
"""
def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
"""
Initialize S3 connector with credentials and endpoint information.
Parameters
----------
endpoint_url : str
The URL of the S3 endpoint
access_key_id : str
The access key for authentication
secret_access_key : str
The secret key for authentication
region_name : str, optional
The AWS region name, by default 'default'
"""
self.endpoint_url = endpoint_url
self.access_key_id = access_key_id
self.secret_access_key = secret_access_key
self.region_name = region_name
self.s3_client = None
def connect(self):
"""
Establish connection to S3 service.
Returns
-------
bool
True if connection was successful, False otherwise
"""
try:
# Also create a client object
self.s3_client = boto3.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
region_name=self.region_name
)
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def get_s3(self):
"""
Return the S3 resource object.
If not already connected, this method will first establish a connection.
Returns
-------
boto3.resources.factory.s3.ServiceResource
The boto3 S3 resource object for interacting with S3 storage
"""
if not self.s3:
self.connect()
return self.s3
def get_s3_client(self):
"""
Return the S3 client object.
If not already connected, this method will first establish a connection.
Returns
-------
boto3.client.S3
The boto3 S3 client object for interacting with S3 storage
"""
if not self.s3_client:
self.connect()
return self.s3_client
# if __name__ == "__main__":
# from dotenv import load_dotenv
# load_dotenv()
# # Get credentials from environment variables
# ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
# SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
# ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
# # Initialize the connector
# s3_connector = S3Connector(
# endpoint_url=ENDPOINT_URL,
# access_key_id=ACCESS_KEY_ID,
# secret_access_key=SECRET_ACCESS_KEY
# )
# # Connect to S3
# s3_connector.connect()
# s3_client = s3_connector.get_s3_client()