File size: 4,759 Bytes
0d5f83d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e37ffe7
 
 
 
 
0d5f83d
 
 
 
e37ffe7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0d5f83d
e37ffe7
0d5f83d
e37ffe7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0d5f83d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2241538
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import boto3
import requests
from urllib.parse import parse_qs, urlparse
from lxml import html

def get_direct_access_token(username, password):
    """
    Get DESTINE access token directly using provided username and password.
    """
    SERVICE_URL = "http://localhost:5000"
    IAM_URL = "https://auth.destine.eu"
    IAM_REALM = "desp"
    IAM_CLIENT = "dcms_client"

    with requests.Session() as s:
        # Get the auth url
        response = s.get(
            url=f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/auth",
            params={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "scope": "openid",
                "response_type": "code",
            },
        )
        response.raise_for_status()
        auth_url = html.fromstring(response.content.decode()).forms[0].action

        # Login and get auth code
        login = s.post(
            auth_url,
            data={
                "username": username,
                "password": password,
            },
            allow_redirects=False,
        )

        if login.status_code == 200:
            tree = html.fromstring(login.content)
            error_message_element = tree.xpath('//span[@id="input-error"]/text()')
            error_message = (
                error_message_element[0].strip()
                if error_message_element
                else "Authentication failed"
            )
            print(f"Error: {error_message}")
            return None

        if login.status_code != 302:
            print(f"Login failed with status code: {login.status_code}")
            return None

        auth_code = parse_qs(urlparse(login.headers["Location"]).query)["code"][0]

        # Use the auth code to get the token
        response = requests.post(
            f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/token",
            data={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "code": auth_code,
                "grant_type": "authorization_code",
                "scope": "",
            },
        )

        if response.status_code != 200:
            print(f"Failed to get token. Status code: {response.status_code}")
            return None

        token_data = response.json()
        return {
            "access_token": token_data.get("access_token"),
            "refresh_token": token_data.get("refresh_token")
        }


class S3Connector:
    """A clean connector for S3-compatible storage services"""

    def __init__(self, endpoint_url, access_key_id,
                 secret_access_key, region_name='default'):
        """Initialize the S3Connector with connection parameters"""
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name

        # Create session
        self.session = boto3.session.Session()

        # Initialize S3 resource
        self.s3 = self.session.resource(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name
        )

        # Initialize S3 client
        self.s3_client = self.session.client(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name
        )

    def get_s3_client(self):
        """Get the boto3 S3 client"""
        return self.s3_client

    def get_s3_resource(self):
        """Get the boto3 S3 resource"""
        return self.s3

    def get_bucket(self, bucket_name):
        """Get a specific bucket by name"""
        return self.s3.Bucket(bucket_name)

    def list_buckets(self):
        """List all available buckets"""
        response = self.s3_client.list_buckets()
        if 'Buckets' in response:
            return [bucket['Name'] for bucket in response['Buckets']]
        return []

# if __name__ == "__main__":


#     from dotenv import load_dotenv
#     load_dotenv()
#     # Get credentials from environment variables
#     ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
#     SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
#     ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
#     # Initialize the connector
#     s3_connector = S3Connector(
#         endpoint_url=ENDPOINT_URL,
#         access_key_id=ACCESS_KEY_ID,
#         secret_access_key=SECRET_ACCESS_KEY
#     )
#     # Connect to S3
#     s3_connector.connect()
#     s3_client = s3_connector.get_s3_client()