Spaces:
Build error
Build error
File size: 4,759 Bytes
0d5f83d e37ffe7 0d5f83d e37ffe7 0d5f83d e37ffe7 0d5f83d e37ffe7 0d5f83d 2241538 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | import os
import boto3
import requests
from urllib.parse import parse_qs, urlparse
from lxml import html
def get_direct_access_token(username, password, timeout=30):
    """
    Get DESTINE access token directly using provided username and password.

    Walks the OpenID Connect authorization-code flow against the IAM server:
    fetch the login page, post the credentials to the login form, read the
    authorization code from the redirect, then exchange the code for tokens.

    Args:
        username: Account name submitted to the login form.
        password: Password submitted to the login form.
        timeout: Seconds to wait for each HTTP request before giving up
            (handed to ``requests``); ``None`` waits indefinitely.

    Returns:
        A dict with ``"access_token"`` and ``"refresh_token"`` keys (a value
        is ``None`` if the token response lacks it), or ``None`` when the
        login or the token exchange fails. Failures are reported via
        ``print``.

    Raises:
        requests.RequestException: On connection errors or timeouts, or when
            the initial authorization request returns an HTTP error status.
    """
    SERVICE_URL = "http://localhost:5000"
    IAM_URL = "https://auth.destine.eu"
    IAM_REALM = "desp"
    IAM_CLIENT = "dcms_client"

    with requests.Session() as s:
        # Get the auth url
        response = s.get(
            url=f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/auth",
            params={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "scope": "openid",
                "response_type": "code",
            },
            timeout=timeout,
        )
        response.raise_for_status()

        # The login form's action URL is where the credentials are posted.
        forms = html.fromstring(response.content.decode()).forms
        if not forms:
            print("Error: login form not found on the authorization page")
            return None
        auth_url = forms[0].action

        # Login and get auth code
        login = s.post(
            auth_url,
            data={
                "username": username,
                "password": password,
            },
            # The code is read from the redirect target, so do not follow it.
            allow_redirects=False,
            timeout=timeout,
        )

        # A 200 here is handled as a failed login: the error text is looked
        # up in the returned page instead of a redirect being followed.
        if login.status_code == 200:
            tree = html.fromstring(login.content)
            error_message_element = tree.xpath('//span[@id="input-error"]/text()')
            error_message = (
                error_message_element[0].strip()
                if error_message_element
                else "Authentication failed"
            )
            print(f"Error: {error_message}")
            return None

        if login.status_code != 302:
            print(f"Login failed with status code: {login.status_code}")
            return None

        # A redirect without a Location header or without a "code" query
        # parameter is reported like the other failures instead of raising
        # KeyError.
        location = login.headers.get("Location", "")
        codes = parse_qs(urlparse(location).query).get("code")
        if not codes:
            print("Error: no authorization code in login redirect")
            return None
        auth_code = codes[0]

        # Use the auth code to get the token
        response = requests.post(
            f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/token",
            data={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "code": auth_code,
                "grant_type": "authorization_code",
                "scope": "",
            },
            timeout=timeout,
        )

        if response.status_code != 200:
            print(f"Failed to get token. Status code: {response.status_code}")
            return None

        token_data = response.json()
        return {
            "access_token": token_data.get("access_token"),
            "refresh_token": token_data.get("refresh_token"),
        }
class S3Connector:
    """Connector for S3-compatible storage built on a boto3 session.

    Construction creates a boto3 session and, from it, an S3 resource
    (``self.s3``) and an S3 client (``self.s3_client``) that share the same
    endpoint, credentials and region.
    """

    def __init__(self, endpoint_url, access_key_id,
                 secret_access_key, region_name='default'):
        """Store the connection parameters and create the boto3 handles.

        Args:
            endpoint_url: URL of the S3-compatible endpoint.
            access_key_id: Access key passed as ``aws_access_key_id``.
            secret_access_key: Secret passed as ``aws_secret_access_key``.
            region_name: Region name passed to boto3.
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name

        # The resource and the client take exactly the same settings.
        connection_kwargs = {
            "endpoint_url": self.endpoint_url,
            "aws_access_key_id": self.access_key_id,
            "aws_secret_access_key": self.secret_access_key,
            "region_name": self.region_name,
        }

        self.session = boto3.session.Session()
        self.s3 = self.session.resource("s3", **connection_kwargs)
        self.s3_client = self.session.client("s3", **connection_kwargs)

    def get_s3_client(self):
        """Return the S3 client created in ``__init__``."""
        return self.s3_client

    def get_s3_resource(self):
        """Return the S3 resource created in ``__init__``."""
        return self.s3

    def get_bucket(self, bucket_name):
        """Return the resource's ``Bucket`` object for ``bucket_name``."""
        return self.s3.Bucket(bucket_name)

    def list_buckets(self):
        """Return the names of the buckets reported by the client.

        Returns:
            A list of the ``Name`` of every entry under ``Buckets`` in the
            client's ``list_buckets`` response, or an empty list when the
            response has no ``Buckets`` key.
        """
        listing = self.s3_client.list_buckets()
        if "Buckets" not in listing:
            return []
        return [entry["Name"] for entry in listing["Buckets"]]
# if __name__ == "__main__":
# from dotenv import load_dotenv
# load_dotenv()
# # Get credentials from environment variables
# ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
# SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
# ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
# # Initialize the connector
# s3_connector = S3Connector(
# endpoint_url=ENDPOINT_URL,
# access_key_id=ACCESS_KEY_ID,
# secret_access_key=SECRET_ACCESS_KEY
# )
# # The connection is set up in S3Connector.__init__; the class defines no
# # connect() method, so no separate connect call is needed here
# s3_client = s3_connector.get_s3_client()
|