File size: 19,026 Bytes
64ab846
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
"""

This file contains a Copernicus Data Space Ecosystem data extraction class

                    for downloading satellite data

"""

########################################## Import dependencies #######################################################
import os
import json
import yaml
import inspect
import shutil
import re
import requests
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
from sentinelhub import bbox_to_dimensions, BBox
from io import BytesIO
import rasterio
from PIL import Image
import numpy as np

class CopernicusDataExtractor:
    """

    A class for extracting satellite data from Copernicus Data Space Ecosystem.



    This class uses the Copernicus SentinelHub Process API with OAuth2 authentication

    to download processed satellite data with custom evalscripts.



    Attributes:

        parameters (dict): User input configurations

        oauth_session (OAuth2Session): Authenticated OAuth2 session

        consortium (str): the consortium for which we are downloading the data

        timespan (list): Start and end date for data request

        bbox (list): Region of Interest coordinates [min_lon, min_lat, max_lon, max_lat]

        image_dimensions (tuple): Width and height in pixels, computed with the sentinelhub bbox_to_dimensions helper so that new downloads keep the pixel placement of imagery previously requested through Sentinel Hub

        output_folder (str): dir location where to save retrieved data

        datetimes (list): List of timestamps when satellite scanned ROI

        evalscript (str): JavaScript evaluation script for processing

        response_type (str): Type of output (rgb_nir, vi_values, s1_vv)

        obtained_data (list): List of bands/indices in output



    Methods:

        __authenticate_copernicus: Authenticate with Copernicus OAuth2

        _calculate_dimensions: Calculate image dimensions from bbox and resolution

        _get_timestamps: Get available acquisition timestamps using Catalog API

        _build_process_request: Build Process API request payload

        _download_single_acquisition: Download data for one timestamp

        data_request: Main method to download all data

        set_evalscript: Change evalscript type

    """

    def __init__(self, consortium='consortium0', evalscript='default_evalscript.js', crs='EPSG:4326'):
        """
        Load the configuration, authenticate and prepare the request settings.

        Args:
            consortium (str): Consortium name; it is lower-cased and used as the
                key into the ``bbox`` and ``consortia_data_folders`` config entries.
            evalscript (str): Evalscript filename handed to ``set_evalscript``.
            crs (str): CRS given to the sentinelhub ``BBox`` from which the image
                dimensions are derived.
        """
        # user configuration (path is relative to the working directory)
        with open('config/pre_anonym_params.yml', 'r') as params_file:
            self.parameters = yaml.safe_load(params_file)
        params = self.parameters

        # authenticated OAuth2 session, reused by the later API calls
        self.oauth_session = self.__authenticate_copernicus()

        self.consortium = consortium.lower()
        self.timespan = [params['start_date'], params['end_date']]
        self.bbox = params['bbox'][self.consortium]  # [min_lon, min_lat, max_lon, max_lat]

        # Dimensions come from the sentinelhub helper (not _calculate_dimensions)
        # to maintain pixel placement and alignment with previously downloaded data.
        roi = BBox(self.bbox, crs=crs)
        self.image_dimensions = bbox_to_dimensions(roi, resolution=params['resolution'])

        # per-consortium output folder below the current working directory
        folder_parts = (
            os.getcwd(),
            "data",
            "01_raw",
            params['consortia_data_folders'][self.consortium],
            "satellite_data",
        )
        self.output_folder = os.path.join(*folder_parts)

        # filled by _get_timestamps
        self.datetimes = []

        # loads the script text and sets response_type / obtained_data
        self.set_evalscript(evalscript)

    def __authenticate_copernicus(self) -> OAuth2Session:
        """
        Authenticate with Copernicus Data Space Ecosystem using OAuth2.

        Reads ``client_id`` and ``client_secret`` from
        ``config/copernicus_oauth_config.json`` and fetches a token with the
        backend-application (client credentials) client.

        Returns:
            OAuth2Session: The session on which the token was fetched.

        Raises:
            RuntimeError: If creating the session or fetching the token fails.
                The original exception is attached as ``__cause__``.
            OSError, KeyError, json.JSONDecodeError: Problems reading the
                credentials file are not wrapped and propagate unchanged.
        """
        # load OAuth credentials
        with open('config/copernicus_oauth_config.json', 'r') as f:
            oauth_credentials = json.load(f)

        client_id = oauth_credentials['client_id']
        client_secret = oauth_credentials['client_secret']

        token_url = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token'

        try:
            # init OAuth2 session
            client = BackendApplicationClient(client_id=client_id)
            oauth = OAuth2Session(client=client)

            # the fetched token is kept on the session, so the return value is not needed
            oauth.fetch_token(
                token_url=token_url,
                client_secret=client_secret,
                include_client_id=True
            )
        except Exception as e:
            # chain the cause so the underlying failure stays visible in tracebacks
            raise RuntimeError(f"Copernicus authentication failed: {e}") from e

        print('✓ Successfully authenticated with Copernicus Data Space Ecosystem.')
        print('✓ Process API endpoint: https://sh.dataspace.copernicus.eu')

        return oauth

    # Kept for reference: to maintain the previous pixel placement, __init__ currently uses the sentinelhub bbox_to_dimensions helper instead of this method.
    def _calculate_dimensions(self) -> Tuple[int, int]:
        """

        Calculate image dimensions from bbox and resolution.



        Returns:

            tuple: (width, height) in pixels

        """
        from math import cos, radians

        resolution = self.parameters.get('resolution', 10)  # meters

        # calculate width and height in meters (approximate)
        min_lon, min_lat, max_lon, max_lat = self.bbox

        # width at the center latitude
        center_lat = (min_lat + max_lat) / 2
        width_m = (max_lon - min_lon) * 111320 * cos(radians(center_lat))
        height_m = (max_lat - min_lat) * 110540

        # to pixels
        width_px = int(width_m / resolution)
        height_px = int(height_m / resolution)

        return (width_px, height_px)

    def _get_timestamps(self, timespan: Optional[List[str]] = None) -> List[str]:
        """
        Query the Sentinel Hub Catalog API for acquisition timestamps over the ROI.

        Args:
            timespan (list, optional): [start_date, end_date] in 'YYYY-MM-DD'
                format. A two-item list/tuple replaces ``self.timespan``; any
                other value is reported and the stored timespan is used.

        Returns:
            list: The ``properties.datetime`` value of every returned feature
            (also stored in ``self.datetimes``), or an empty list when the
            search fails. On failure ``self.datetimes`` is left unchanged.
        """
        if timespan:
            if isinstance(timespan, (list, tuple)) and len(timespan) == 2:
                self.timespan = list(timespan)
                print(f'New timespan: {timespan[0]} to {timespan[1]}')
            else:
                print(f'Ignoring invalid timespan {timespan!r}; using {self.timespan[0]} to {self.timespan[1]}')

        # Sentinel Hub catalog endpoint (not catalogue.dataspace.copernicus.eu/stac):
        # it needs the bearer token in the header, takes the datetime as a single
        # interval string and the cloud cover as a filter.
        catalog_url = "https://sh.dataspace.copernicus.eu/api/v1/catalog/1.0.0/search"

        # Parse the cloud cover limit. The first run of digits is used, so both a
        # plain number and a string containing one are accepted; str() keeps a
        # numeric YAML value from being silently discarded.
        cloud_cover_max = 100
        cloud_setting = self.parameters.get('cloud_cover')
        if cloud_setting is not None:
            digits = None
            if isinstance(cloud_setting, (int, str)) and not isinstance(cloud_setting, bool):
                digits = re.search(r'(\d+)', str(cloud_setting))
            if digits:
                cloud_cover_max = int(digits.group(1))
            else:
                print(f'Could not parse cloud_cover={cloud_setting!r}; using {cloud_cover_max}')

        stac_collection = self.parameters['collection']
        # Strip '-' and '_' so that 'sentinel-1-grd' as well as 'SENTINEL1_IW'
        # are recognised as Sentinel-1.
        collection_key = stac_collection.lower().replace('-', '').replace('_', '')
        is_sentinel1 = 'sentinel1' in collection_key

        # build search request
        stac_request = {
            "collections": [stac_collection],
            "bbox": self.bbox,
            "datetime": f"{self.timespan[0]}T00:00:00Z/{self.timespan[1]}T23:59:59Z",
            "limit": 100
        }

        # cloud cover filter for everything that is not Sentinel-1
        if not is_sentinel1:
            stac_request["filter"] = {
                "op": "<",
                "args": [
                    {"property": "eo:cloud_cover"},
                    cloud_cover_max
                ]
            }
            stac_request["filter-lang"] = "cql2-json"

        try:
            headers = {
                "Authorization": f"Bearer {self.oauth_session.token['access_token']}"
            }

            all_features = []
            next_token = None

            while True:
                req_payload = stac_request.copy()
                if next_token is not None:
                    req_payload["next"] = next_token

                response = requests.post(catalog_url, json=req_payload, timeout=30, headers=headers)
                response.raise_for_status()
                results = response.json()

                # accumulate features
                all_features.extend(results.get("features", []))

                # the response's context.next value is sent back as "next" to get the following page
                context = results.get("context", {})
                next_token = context.get("next")

                if not next_token:
                    break

            # timestamps
            self.datetimes = [f['properties']['datetime'] for f in all_features]

            print(f"✓ Found {len(self.datetimes)} acquisitions in timespan")
            return self.datetimes

        except requests.exceptions.HTTPError as e:
            print(f'✗ Catalog search failed: {e}')
            if e.response is not None:
                print(f'  Response: {e.response.text[:300]}')
            return []
        except Exception as e:
            # best effort: any other failure is reported and yields no timestamps
            print(f'✗ Catalog search failed: {e}')
            return []

    def _build_process_request(self, time_range: Tuple[str, str]) -> dict:
        """

        Build Process API request payload.



        Args:

            time_range (tuple): (start_time, end_time) for the request



        Returns:

            dict: Request payload for Process API

        """
        # map collection names to Process API types
        #collection_map = {
        #    'SENTINEL2_L2A': 'sentinel-2-l2a',
        #    'SENTINEL2_L1C': 'sentinel-2-l1c',
        #    'SENTINEL1_IW': 'sentinel-1-grd',
        #    'SENTINEL1': 'sentinel-1-grd',
        #}

        process_collection = self.parameters['collection']

        request = {
            "input": {
                "bounds": {
                    "properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
                    "bbox": self.bbox,
                },
                "data": [
                    {
                        "type": process_collection,
                        "dataFilter": {
                            "timeRange": {
                                "from": time_range[0],
                                "to": time_range[1],
                            }
                        },
                    }
                ],
            },
            "output": {
                "width": self.image_dimensions[0],
                "height": self.image_dimensions[1],
                "responses": [
                    {
                        "identifier": self.response_type,
                        "format": {"type": "image/tiff"}
                    }
                ]
            },
            "evalscript": self.evalscript,
        }

        # + mosaicking order for optical data
        if 'sentinel1' not in self.parameters['collection'].lower():
            request["input"]["data"][0]["processing"] = {
                "mosaickingOrder": "leastCC"  # Least cloud cover
            }

        return request

    # Note: the STAC catalogue was tried first but caused problems, so the Sentinel Hub catalog is used instead.
    def _download_single_acquisition(self, datetime_str: str, index: int, total: int) -> Optional[str]:
        """

        Download data for a single acquisition.



        Args:

            datetime_str (str): Acquisition datetime

            index (int): Current acquisition number

            total (int): Total number of acquisitions



        Returns:

            str: Filename if successful, None otherwise

        """
        print(f'\n[{index}/{total}] Processing: {datetime_str}')

        # from datetime to date range (full day)
        dt = datetime.fromisoformat(datetime_str.replace('Z', '+00:00'))
        date_start = dt.date().strftime('%Y-%m-%d')
        date_end = (dt.date() + timedelta(days=1)).strftime('%Y-%m-%d')
        time_range = (f"{date_start}T00:00:00Z", f"{date_end}T00:00:00Z")

        # build request
        request_payload = self._build_process_request(time_range)

        # call Process API
        process_url = "https://sh.dataspace.copernicus.eu/api/v1/process"

        try:
            response = self.oauth_session.post(process_url, json=request_payload, timeout=120)
            response.raise_for_status()

            # save the TIFF!
            output_dir = os.path.join(
                os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))),
                self.output_folder
            )
            os.makedirs(output_dir, exist_ok=True)

            # map collection names to Process API types
            # save with this to keep previous nomenclature
            collection_map = {
                'sentinel-2-l2a': 'SENTINEL2_L2A',
                'sentinel-2-l1c': 'SENTINEL2_L1C',
                #'sentinel-1-grd': 'SENTINEL1_IW',
                'sentinel-1-grd': 'SENTINEL1',
             }

            filename = f"{self.consortium}_{self.response_type}_{collection_map[self.parameters['collection']]}_{(datetime_str.split('.')[0] + 'Z').replace(':', '_')}.tiff"
            output_path = os.path.join(output_dir, filename)

            self._save_tiff_with_metadata(response.content, output_path)

            print(f'  βœ“ Saved: {filename}')
            return filename

        except requests.exceptions.HTTPError as e:
            print(f'  βœ— HTTP Error: {e}')
            if hasattr(e.response, 'text'):
                print(f'  Response: {e.response.text}')
            # for debug: print the request payload
            if e.response.status_code == 400:
                print(f'  Request payload was:')
                print(f'    Collection type: {request_payload["input"]["data"][0]["type"]}')
            return None
        except Exception as e:
            print(f'  βœ— Download failed: {e}')
            return None

    def _save_tiff_with_metadata(self, content: bytes, output_path: str):
        """
        Re-write an in-memory TIFF to disk with an adjusted rasterio profile.

        The pixel data is kept as read; the profile is changed to MINISBLACK
        photometric interpretation, deflate compression and band interleaving,
        and an ``extra_samples`` entry is removed when present.

        Args:
            content (bytes): Raw TIFF data from API
            output_path (str): Path of the TIFF file to write
        """
        profile_overrides = {
            'photometric': 'MINISBLACK',
            'compress': 'deflate',
            'interleave': 'band',
        }

        # decode the response bytes without touching the disk
        with rasterio.open(BytesIO(content)) as source:
            pixels = source.read()
            out_profile = source.profile.copy()

        out_profile.update(profile_overrides)
        if 'extra_samples' in out_profile:
            del out_profile['extra_samples']

        # write the same pixels with the adjusted profile
        with rasterio.open(output_path, 'w', **out_profile) as sink:
            sink.write(pixels)

    def data_request(self, timespan: Optional[List[str]] = None):
        """

        Request and download satellite data for all available timestamps.



        Args:

            timespan (list, optional): [start_date, end_date] to override configured timespan

        """
        print("\n" + "="*70)
        print("COPERNICUS DATA EXTRACTION - Process API")
        print("="*70)

        # get available timestamps
        if timespan:
            timestamps = self._get_timestamps(timespan)
        else:
            timestamps = getattr(self, 'datetimes', [])

        if not timestamps:
            print('\nβœ— No data available for the specified timespan and parameters.')
            return

        # download each acquisition
        successful = 0
        failed = 0

        for i, dt in enumerate(timestamps, 1):
            result = self._download_single_acquisition(dt, i, len(timestamps))
            if result:
                successful += 1
            else:
                failed += 1

        # summary
        print("\n" + "="*70)
        print(f"SUMMARY: {successful} successful, {failed} failed out of {len(timestamps)} total")
        print("="*70 + "\n")

    def set_evalscript(self, new_evalscript: str):
        """

        Load and set a new evaluation script for data processing.



        Args:

            new_evalscript (str): Filename of the evalscript in 'request_scripts' directory

        """
        evalscript_path = os.path.join('config/request_scripts', new_evalscript)

        with open(evalscript_path, 'r') as evalscript_file:
            self.evalscript = evalscript_file.read()

        # set response type based on evalscript
        if new_evalscript == 'default_evalscript.js':
            self.response_type = "rgb_nir"
            self.obtained_data = ['red', 'green', 'blue', 'nir']
        elif new_evalscript == 'sentinel1_evalscript.js':
            self.response_type = "s1_vv"
            self.obtained_data = ['vv']
        else:
            self.response_type = "vi_values"
            # extract band/index names from evalscript
            pattern = re.compile(r"(?:rgb_nir|vi_values|s1_vv):\s*\[([^\]]+)\]", re.DOTALL)
            match = pattern.search(self.evalscript)

            if match:
                vi_values_content = match.group(1)
                self.obtained_data = [value.strip() for value in vi_values_content.split(",")]
            else:
                self.obtained_data = []

        print(f'βœ“ Evalscript set to: {new_evalscript}')
        print(f'  Response type: {self.response_type}')
        print(f'  Output layers: {len(self.obtained_data)}')


# Example usage
if __name__ == '__main__':
    # Manual smoke test; it reads the files under config/ and calls the
    # Copernicus services.

    # init extractor with vegetation indices evalscript
    extractor = CopernicusDataExtractor(evalscript='vis_evalscript.js')

    # available timestamps test
    timestamps = extractor._get_timestamps(timespan=['2023-12-29', '2023-12-30'])
    print(f'\nAvailable acquisitions: {len(timestamps)}')

    # download data test and check in comparison with the one available before
    extractor.data_request(['2025-01-29', '2025-01-30'])

    # printed once (the original repeated this line)
    print(f'\nObtained data layers: {extractor.obtained_data}')