3.1 Data Processing with Pangeo#

In this exercise we will build a complete EO workflow on a cloud platform using the Pangeo ecosystem, from data access to the final result. As an example, we will analyse snow cover in the Alps.

We are going to follow these steps in our analysis:

  • Load satellite collections

  • Specify the spatial, temporal extents and the features we are interested in

  • Process the satellite data to retrieve snow cover information

  • Aggregate information to get catchment statistics over time

  • Visualize and analyse the results

More information on the Pangeo ecosystem: https://pangeo.io

Libraries#

We start by copying the shared data and results folders needed to complete the exercise, using the following shell commands, and then import the required libraries.

!cp -r $DATA_PATH/31_results/ .
!cp -r $DATA_PATH/31_data/ .
# platform libraries
# utility libraries
from datetime import date
import numpy as np
import xarray as xr
import rioxarray
import json
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import mapping
import pyproj

# STAC Catalogue Libraries
import pystac_client
import stackstac

# Data Visualization Libraries
import holoviews as hv
import hvplot.xarray
import hvplot.pandas
import folium

# Dask library
from dask.distributed import Client, progress, LocalCluster

Get a client from the Dask Cluster#

Creating a Dask Client is required in order to perform the following Dask computations on your local Dask cluster.

cluster = LocalCluster(n_workers=2)
client = Client(cluster)  # create a local dask cluster on the machine.
client

Client

Client-2ceeb9f5-7c5f-11ef-8c49-0e9954f8d647
Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: /user/6ecd4b8f-1e28-4f75-8d0a-6a67d88aa718/proxy/8787/status

Region of Interest#

We will use the catchment as our area of interest (AOI) for the analysis. Our region of interest is the Val Passiria catchment in the South Tyrolean Alps (Italy). Let’s load the catchment area. The catchment is defined by a polygon, which we will load from a GeoJSON file. The GeoJSON file contains the geometry of the catchment in the WGS84 coordinate reference system (EPSG:4326).

catchment_outline = gpd.read_file('31_data/catchment_outline.geojson', crs="EPSG:4326")
aoi_geojson = mapping(catchment_outline.iloc[0].geometry)
center_loc = catchment_outline.to_crs('+proj=cea').centroid.to_crs(epsg="4326")
# OpenStreetMap
map = folium.Map(location=[float(center_loc.y.iloc[0]), float(center_loc.x.iloc[0])], tiles="OpenStreetMap", zoom_start=9)
for _, r in catchment_outline.iterrows():
    sim_geo = gpd.GeoSeries(r["geometry"]).simplify(tolerance=0.001)
    geo_j = sim_geo.to_json()
    geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "orange"})
    folium.Popup(r["HYBAS_ID"]).add_to(geo_j)
    geo_j.add_to(map)
map

Quiz hint: Look closely at the end of the displayed catchment area to identify the outlet

Satellite collections#

Search for satellite data using STAC#

We will use pystac_client to search for satellite data in this exercise, specifically leveraging data provided by AWS/Element84. When querying the satellite data we can add various filters, such as the spatial extent, the time period and other metadata. The API is built on the STAC specification, a collaborative, community-driven standard aimed at enhancing the discoverability and usability of satellite data. Numerous data providers, including AWS, Google Earth Engine and Planet, have implemented the STAC API (support in the Copernicus Data Space Ecosystem (CDSE) is coming soon), exemplifying its widespread adoption and utility in accessing diverse satellite datasets.

Set query filters#

We define all extents before querying the satellite data. For the purposes of this exercise, we limit the search to the Sentinel-2 L2A collection, which contains Sentinel-2 data processed to surface reflectance (bottom-of-atmosphere).

We are only interested in the green and shortwave infrared bands (bands 3 and 11), and we directly discard time slices with a cloud cover above 90 %. We also limit the search to the extent of the catchment and, to keep the exercise fast, to the period 1–10 February 2019 (the full 1 February – 10 June 2019 period is left commented out in the code below).

bbox = catchment_outline.bounds.iloc[0]
bbox
minx    11.020833
miny    46.653599
maxx    11.366667
maxy    46.954167
Name: 0, dtype: float64
#                  West,     South,     East,      North
spatial_extent = [bbox["minx"], bbox["miny"], bbox["maxx"], bbox["maxy"]]
#temporal_extent = ['2019-02-01T00:00:00Z','2019-06-10T00:00:00Z']
temporal_extent = ['2019-02-01T00:00:00Z','2019-02-10T00:00:00Z']

bands = ['green', 'swir16', 'scl']
cloud_coverage = ["eo:cloud_cover<=90"]
URL = "https://earth-search.aws.element84.com/v1"
catalog = pystac_client.Client.open(URL)
items = catalog.search(
    bbox=spatial_extent,
    datetime=temporal_extent,
    query=cloud_coverage,
    collections=["sentinel-2-l2a"]
).item_collection()
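
Before going further, it can be useful to check what the search returned. This quick inspection is not part of the original notebook, just an optional sanity check on the `items` collection created above.

# Optional check: how many items were found, and which dates and cloud covers they have
print(f"Found {len(items)} items")
for item in items:
    print(item.id, item.datetime, item.properties.get("eo:cloud_cover"))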

Inspect Metadata#

We need to set the following configurations to define the content of the data cube we want to access:

  • dataset name

  • band names

  • time range

  • the area of interest specified via bounding box coordinates

  • spatial resolution

To select the correct dataset we can first list all the datasets (collections) available in the catalog.
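
One way to do this is with the pystac_client `catalog` object opened above (a quick sketch; the printed list depends on the provider):

# List the collections available in the Earth Search STAC catalog
for collection in catalog.get_collections():
    print(collection.id)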

Get bands information#

As this data provider exposes the bands under different names than the original Sentinel-2 band names, we need to inspect the available assets and how they map to the Sentinel-2 bands.

# Get bands information
selected_item = items[1]
for key, asset in selected_item.assets.items():
    print(f"{key}: {asset.title}")
aot: Aerosol optical thickness (AOT)
blue: Blue (band 2) - 10m
coastal: Coastal aerosol (band 1) - 60m
granule_metadata: None
green: Green (band 3) - 10m
nir: NIR 1 (band 8) - 10m
nir08: NIR 2 (band 8A) - 20m
nir09: NIR 3 (band 9) - 60m
red: Red (band 4) - 10m
rededge1: Red edge 1 (band 5) - 20m
rededge2: Red edge 2 (band 6) - 20m
rededge3: Red edge 3 (band 7) - 20m
scl: Scene classification map (SCL)
swir16: SWIR 1 (band 11) - 20m
swir22: SWIR 2 (band 12) - 20m
thumbnail: Thumbnail image
tileinfo_metadata: None
visual: True color image
wvp: Water vapour (WVP)
aot-jp2: Aerosol optical thickness (AOT)
blue-jp2: Blue (band 2) - 10m
coastal-jp2: Coastal aerosol (band 1) - 60m
green-jp2: Green (band 3) - 10m
nir-jp2: NIR 1 (band 8) - 10m
nir08-jp2: NIR 2 (band 8A) - 20m
nir09-jp2: NIR 3 (band 9) - 60m
red-jp2: Red (band 4) - 10m
rededge1-jp2: Red edge 1 (band 5) - 20m
rededge2-jp2: Red edge 2 (band 6) - 20m
rededge3-jp2: Red edge 3 (band 7) - 20m
scl-jp2: Scene classification map (SCL)
swir16-jp2: SWIR 1 (band 11) - 20m
swir22-jp2: SWIR 2 (band 12) - 20m
visual-jp2: True color image
wvp-jp2: Water vapour (WVP)

Load data#

We will use the stackstac library to load the data from the STAC API into an xarray DataArray backed by Dask. We load the green and swir16 bands (named B03 and B11 in the original Sentinel-2 product), which we will use to calculate the snow cover, together with the scl band, the scene classification layer, which we will use to mask out clouds. Note that, called with no further arguments, stackstac loads the data lazily at the finest native resolution (10 m here) and with automatic chunking; in the apply_ufunc example later on we will explicitly request a 20 m resolution and a custom chunk size.

stackstac is not the only way to create an xarray object from a STAC API. Other libraries, such as xpystac or odc.stac, can be used; the choice depends on the use case and specific needs.
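
For comparison, here is a minimal sketch of the same load using odc.stac (assuming the odc-stac package is installed; this alternative is not used further in the exercise and the variable name is illustrative):

import odc.stac

s2_odc = odc.stac.load(
    items,
    bands=bands,                    # ['green', 'swir16', 'scl']
    bbox=spatial_extent,            # same WGS84 bounding box as above
    chunks={"x": 2048, "y": 2048},  # lazy, Dask-backed loading
)

We now proceed with stackstac: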

s2_cube = stackstac.stack(items,
                     bounds_latlon=spatial_extent,
                     assets=bands
)
s2_cube
<xarray.DataArray 'stackstac-c19c965736d46e489b3788ba41e597d2' (time: 5,
                                                                band: 3,
                                                                y: 3414, x: 2733)> Size: 1GB
dask.array<fetch_raster_window, shape=(5, 3, 3414, 2733), dtype=float64, chunksize=(1, 1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/52)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
  * band                                     (band) <U6 72B 'green' ... 'scl'
  * x                                        (x) float64 22kB 6.538e+05 ... 6...
  * y                                        (y) float64 27kB 5.203e+06 ... 5...
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    ...                                       ...
    title                                    (band) <U30 360B 'Green (band 3)...
    gsd                                      (band) object 24B 10 20 None
    common_name                              (band) object 24B 'green' ... None
    center_wavelength                        (band) object 24B 0.56 1.61 None
    full_width_half_max                      (band) object 24B 0.045 0.143 None
    epsg                                     int64 8B 32632
Attributes:
    spec:        RasterSpec(epsg=32632, bounds=(653760.0, 5168650.0, 681090.0...
    crs:         epsg:32632
    transform:   | 10.00, 0.00, 653760.00|\n| 0.00,-10.00, 5202790.00|\n| 0.0...
    resolution:  10.0

We are working with the Sentinel-2 L2A product; in the Earth Search STAC catalog its collection id is 'sentinel-2-l2a'.

Inspecting the collection and item metadata, as we did above, is an important step to familiarize yourself with the data collection (e.g. to learn the band names).

Calculate snow cover#

We will calculate the Normalized Difference Snow Index (NDSI) to calculate the snow cover. The NDSI is calculated as the difference between the green and the swir16 bands divided by the sum of the green and the swir16 bands:

\[ NDSI = \frac{GREEN - SWIR}{GREEN + SWIR} \]

For clarity we will define the green, swir16 and scl bands as separate variables. Other approaches can be used to manage the data (one alternative is sketched after the next cell), but this is the one we will use in this exercise.

green = s2_cube.sel(band='green')
swir = s2_cube.sel(band='swir16')
scl = s2_cube.sel(band='scl')
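
As mentioned, other approaches exist. For instance, the band dimension can be converted into separate dataset variables; this is only a sketch with illustrative variable names, not used further in the exercise.

# Alternative: split the 'band' dimension into one variable per band
s2_ds = s2_cube.to_dataset(dim='band')
green_alt = s2_ds['green']
swir_alt = s2_ds['swir16']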

Let’s compute the NDSI and mask out the clouds.

ndsi = (green - swir) / (green + swir)
ndsi
<xarray.DataArray 'stackstac-c19c965736d46e489b3788ba41e597d2' (time: 5,
                                                                y: 3414, x: 2733)> Size: 373MB
dask.array<truediv, shape=(5, 3414, 2733), dtype=float64, chunksize=(1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/45)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
  * x                                        (x) float64 22kB 6.538e+05 ... 6...
  * y                                        (y) float64 27kB 5.203e+06 ... 5...
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    s2:datastrip_id                          (time) <U64 1kB 'S2A_OPER_MSI_L2...
    ...                                       ...
    constellation                            <U10 40B 'sentinel-2'
    s2:reflectance_conversion_factor         (time) float64 40B 1.031 ... 1.029
    s2:datatake_type                         <U8 32B 'INS-NOBS'
    grid:code                                (time) <U10 200B 'MGRS-32TPS' .....
    s2:snow_ice_percentage                   (time) float64 40B 12.19 ... 43.83
    epsg                                     int64 8B 32632
Dask Method Differences: `.compute()` vs `.persist()`

Dask provides two primary methods for executing computations: .compute() and .persist(). Below is an overview of each method and their typical use cases.

.compute()#

  • Functionality: Executes the Dask computation and blocks until the result is available. It then collects and returns the final result to the local process.

  • Use Case: Invoke .compute() when you need to bring the computed result into your local memory. It is typically used as the final step in a Dask workflow after all transformations and computations have been defined.

  • Evaluation: Eager - runs immediately and provides results.

.persist()#

  • Functionality: Begins computing the result in the background while immediately returning a new Dask object that represents the ongoing computation.

  • Use Case: Utilize .persist() in a distributed environment when working with large datasets or complex computations that have expensive intermediate steps. This will keep the intermediate results in the cluster’s distributed memory, improving performance for subsequent computations.

  • Evaluation: Asynchronous - computation starts immediately in the background, and the method returns a reference to the (future) result without waiting for completion.

Each method plays a crucial role in optimizing and managing the execution of large-scale computations using Dask, particularly when balancing memory usage and computational efficiency in a distributed setting.
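
A purely illustrative comparison of the two methods on the ndsi object defined above (you do not need to run this for the exercise; the variable names are illustrative):

# .persist(): computation starts in the background on the cluster; the returned
# object still looks lazy, but its chunks are held in the workers' memory.
ndsi_persisted = ndsi.persist()

# .compute(): blocks until done and returns an in-memory (numpy-backed) result
# in the local process.
ndsi_mean_local = ndsi.mean(dim='time').compute()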

Creating the Snow Map#

So far we have a time series of NDSI maps. We are interested in the presence of snow, though, ideally as a binary classification: snow or no snow. To achieve this we apply a threshold of 0.42 to the NDSI, which gives us a binary snow map.

snow = xr.where((ndsi > 0.42) & ~np.isnan(ndsi), 1, ndsi)
snowmap = xr.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)

Creating a cloud mask#

We are going to use the scene classification of Sentinel-2, the “SCL” band, to create a cloud mask and then apply it to the snow map. We will mask out clouds, which are identified by the values 8 (cloud medium probability), 9 (cloud high probability) and 3 (cloud shadow) in the scl layer.

More detailed info can be found here: https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-2-msi/level-2a/algorithm-overview
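
For reference, the scene classification values from the ESA Level-2A algorithm overview linked above can be kept as a small lookup; this is purely informational and not used further in the exercise.

# Sentinel-2 L2A scene classification (SCL) values, for reference only
scl_classes = {
    0: "NO_DATA", 1: "SATURATED_OR_DEFECTIVE", 2: "DARK_AREA_PIXELS",
    3: "CLOUD_SHADOWS", 4: "VEGETATION", 5: "NOT_VEGETATED", 6: "WATER",
    7: "UNCLASSIFIED", 8: "CLOUD_MEDIUM_PROBABILITY",
    9: "CLOUD_HIGH_PROBABILITY", 10: "THIN_CIRRUS", 11: "SNOW_ICE",
}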

cloud_mask = np.logical_not(scl.isin([8, 9, 3])) 

Applying the cloud mask to the snowmap#

We will mask out all pixels that are covered by clouds. This will result in: 0 = no_snow, 1 = snow, 2 = cloud

snowmap_cloudfree = xr.where(cloud_mask, snowmap, 2)
snowmap_cloudfree
<xarray.DataArray 'stackstac-c19c965736d46e489b3788ba41e597d2' (time: 5,
                                                                y: 3414, x: 2733)> Size: 373MB
dask.array<where, shape=(5, 3414, 2733), dtype=float64, chunksize=(1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/52)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
    band                                     <U6 24B 'scl'
  * x                                        (x) float64 22kB 6.538e+05 ... 6...
  * y                                        (y) float64 27kB 5.203e+06 ... 5...
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    ...                                       ...
    title                                    <U30 120B 'Scene classification ...
    gsd                                      object 8B None
    common_name                              object 8B None
    center_wavelength                        object 8B None
    full_width_half_max                      object 8B None
    epsg                                     int64 8B 32632

Process snow cover data#

Mask data#

As we are only interested in the snow cover within the catchment, we will mask out the data outside of it. To achieve this we need to convert the catchment geometry to the same coordinate reference system as the data, which is UTM zone 32N (EPSG:32632).

aoi_utm32 = catchment_outline.to_crs(epsg=32632)
geom_utm32 = aoi_utm32.iloc[0]['geometry']

We are going to use the rioxarray library to mask the data, so we first need to add some more information to it. rioxarray extends xarray with geospatial operations; under the hood it uses rasterio, which in turn is built on top of GDAL.

We first need to specify the coordinate reference system and the nodata value. Both can be found in the metadata of the data, but we set them explicitly so that rioxarray can use them.

snowmap_cloudfree.rio.write_crs("EPSG:32632", inplace=True)
snowmap_cloudfree.rio.set_nodata(np.nan, inplace=True)
<xarray.DataArray 'stackstac-c19c965736d46e489b3788ba41e597d2' (time: 5,
                                                                y: 3414, x: 2733)> Size: 373MB
dask.array<where, shape=(5, 3414, 2733), dtype=float64, chunksize=(1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/53)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
    band                                     <U6 24B 'scl'
  * x                                        (x) float64 22kB 6.538e+05 ... 6...
  * y                                        (y) float64 27kB 5.203e+06 ... 5...
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    ...                                       ...
    gsd                                      object 8B None
    common_name                              object 8B None
    center_wavelength                        object 8B None
    full_width_half_max                      object 8B None
    epsg                                     int64 8B 32632
    spatial_ref                              int64 8B 0

Let’s clip the snowmap_cloudfree object using the catchment geometry in the UTM 32N coordinate reference system.

snowmap_clipped = snowmap_cloudfree.rio.clip([geom_utm32])

We could persist the clipped data in memory at this point to keep it on the cluster for the rest of the analysis; here we keep the computation lazy for now and only persist after the daily aggregation below.

clipped_date = snowmap_clipped  # optionally: .persist()

Aggregate data#

Data aggregation is a very important step in the analysis: it reduces the amount of data and makes the analysis more efficient. Moreover, aggregating to daily values will allow us to compute statistics at the basin scale later on.

The groupby method groups the data along a specific dimension. We group along the time dimension, flooring each timestamp to its date (dropping the time of day), and then aggregate each group by taking the maximum value.

clipped_date.time
<xarray.DataArray 'time' (time: 5)> Size: 40B
array(['2019-02-03T10:17:56.196000000', '2019-02-06T10:27:38.251000000',
       '2019-02-06T10:27:52.378000000', '2019-02-08T10:17:45.015000000',
       '2019-02-08T10:17:59.486000000'], dtype='datetime64[ns]')
Coordinates: (12/51)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
    band                                     <U6 24B 'scl'
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    s2:datastrip_id                          (time) <U64 1kB 'S2A_OPER_MSI_L2...
    s2:dark_features_percentage              (time) float64 40B 0.1013 ... 15.59
    ...                                       ...
    gsd                                      object 8B None
    common_name                              object 8B None
    center_wavelength                        object 8B None
    full_width_half_max                      object 8B None
    epsg                                     int64 8B 32632
    spatial_ref                              int64 8B 0
clipped_date = snowmap_clipped.groupby(snowmap_clipped.time.dt.floor('D')).max(skipna=True)

As the data has been aggregated to daily values, we rename the resulting floor dimension to something more meaningful, such as date.

clipped_date = clipped_date.rename({'floor': 'date'})
clipped_date = clipped_date.persist()

Visualize data#

We will use the hvplot library to visualize the data. hvplot provides interactive plotting for xarray (and pandas) objects and is built on holoviews, a library for visualizing multidimensional data. To display the data on a map we need to specify its coordinate reference system, UTM zone 32N (EPSG:32632), so that the library can project it onto the map tiles. More info on hvplot can be found here: https://hvplot.holoviz.org/

clipped_date.hvplot.image(
    x='x',
    y='y',
    groupby='date',
    crs=pyproj.CRS.from_epsg(32632),
    cmap='Pastel2',
    clim=(-1, 2),
    frame_width=500,
    frame_height=500,
    title='Snowmap',
    geo=True, tiles='OSM')

Calculate snow cover with apply_ufunc#

Calculate snow cover using Xarray's apply_ufunc
  • The procedure for computing the snow cover can also be summed up in a single Python function.
  • We first verify that green, swir16 and scl are the 0th, 1st and 2nd entries of the band dimension (see the quick check below); then we simply move the code from the previous cells into the function.
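A quick way to confirm the band order (not part of the original notebook; it relies on the same bands list being passed to stackstac.stack below, so the order carries over):

print(list(s2_cube.band.values))   # expected: ['green', 'swir16', 'scl']
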
def calculate_ndsi_snow_cloud(data):
    # unpack the bands: 0 = green, 1 = swir16, 2 = scl
    green = data[0]
    swir = data[1]
    scl = data[2]
    # NDSI = (green - swir) / (green + swir)
    ndsi = (green - swir) / (green + swir)
    # binary snow map: 1 = snow (NDSI above the 0.42 threshold used earlier), 0 = no snow
    ndsi_mask = (ndsi > 0.42) & ~np.isnan(ndsi)
    snow = np.where(ndsi_mask, 1, ndsi)
    snowmap = np.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)
    # cloud mask from SCL: 3 = cloud shadow, 8/9 = cloud medium/high probability
    mask = ~((scl == 8) | (scl == 9) | (scl == 3))
    # apply the cloud mask: clouds are labelled 2
    snow_cloud = np.where(mask, snowmap, 2)
    return snow_cloud
%%time
da = stackstac.stack(items,
                     bounds_latlon=catchment_outline.iloc[0].geometry.bounds,
                     resolution=20,              # resample all bands to 20 m
                     chunksize=(1, -1, -1, -1),  # one chunk per time slice, full bands and space
                     assets=bands)

# Mask the data with the catchment geometry (reprojected to UTM 32N)
geom_utm32 = catchment_outline.to_crs(epsg=32632).iloc[0]['geometry']
da.rio.write_crs("EPSG:32632", inplace=True)
da.rio.set_nodata(np.nan, inplace=True)
da = da.rio.clip([geom_utm32])

snow_cloud_clipped = xr.apply_ufunc(
    calculate_ndsi_snow_cloud,
    da,
    input_core_dims=[["band", "y", "x"]],
    output_core_dims=[["y", "x"]],
    exclude_dims=set(["band"]),
    vectorize=True,
    dask="parallelized",
    output_dtypes=[da.dtype],
).assign_attrs({'long_name': 'snow_cloud'}).to_dataset(name='snow_cloud')

snow_cloud_clipped
CPU times: user 90.1 ms, sys: 59.7 ms, total: 150 ms
Wall time: 385 ms
<xarray.Dataset> Size: 88MB
Dimensions:                                  (time: 5, x: 1315, y: 1670)
Coordinates: (12/46)
  * time                                     (time) datetime64[ns] 40B 2019-0...
    id                                       (time) <U24 480B 'S2A_32TPS_2019...
  * x                                        (x) float64 11kB 6.543e+05 ... 6...
  * y                                        (y) float64 13kB 5.202e+06 ... 5...
    mgrs:grid_square                         (time) <U2 40B 'PS' 'PT' ... 'PS'
    s2:datastrip_id                          (time) <U64 1kB 'S2A_OPER_MSI_L2...
    ...                                       ...
    s2:reflectance_conversion_factor         (time) float64 40B 1.031 ... 1.029
    s2:datatake_type                         <U8 32B 'INS-NOBS'
    grid:code                                (time) <U10 200B 'MGRS-32TPS' .....
    s2:snow_ice_percentage                   (time) float64 40B 12.19 ... 43.83
    epsg                                     int64 8B 32632
    spatial_ref                              int64 8B 0
Data variables:
    snow_cloud                               (time, y, x) float64 88MB dask.array<chunksize=(1, 1670, 1315), meta=np.ndarray>
Inspect the data dimensions!
  • How did the dimensions change from the input (da) to the output (snow_cloud_clipped)? (A quick check is sketched below.)
  • What is set as input_core_dims?
  • What is set as output_core_dims?
  • What is set as exclude_dims?
  • Do you see the 'time' dimension?
  • We will come back to apply_ufunc in the next OpenEO example.
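
A quick, optional way to answer the first question (a sketch, not part of the original notebook):

print("input dims: ", da.dims)                             # ('time', 'band', 'y', 'x')
print("output dims:", snow_cloud_clipped.snow_cloud.dims)  # ('time', 'y', 'x')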

Compute statistics#

Our objective is to monitor a specific area over a period of time while ensuring the data quality meets our standards. To achieve this, we determine the proportion of cloud in the catchment at each time step. This cloud coverage information is then used to refine our time series: we exclude any time step where the cloud cover exceeds the chosen threshold (30 % in the code below).

Our primary focus is quantifying the Snow Covered Area (SCA) of the catchment. We count the number of snow pixels at each time step; the SCA is obtained by multiplying the number of snow pixels by the pixel area (a sketch of this step is given further below). Comparing the snow pixel count to the total pixel count of the catchment yields the percentage of the area that is snow covered, which is the quantity we analyse here.

We therefore need the total pixel count for the catchment, as well as the cloud and snow pixel counts.

# number of cloud pixels
cloud = xr.where(clipped_date == 2, 1, np.nan).count(dim=['x', 'y']).persist()
# number of all pixels per each single date
aot_total = clipped_date.count(dim=['x', 'y']).persist()
# Cloud fraction per each single date expressed in % 
cloud_fraction = (cloud / aot_total * 100).persist()
# Visualize cloud fraction
cloud_fraction.hvplot.line(title='Cloud cover %', ylabel="%") * hv.HLine(30).opts(
    color='red',
    line_dash='dashed',
    line_width=2.0,
)

We are going to get the same information for the snow cover.

snow = xr.where(clipped_date == 1, 1, np.nan).count(dim=['x', 'y']).persist()
snow_fraction = (snow / aot_total * 100).persist()
# visualize snow fraction
snow_fraction.hvplot.line(title='Snow cover area (%)', ylabel="%")
# mask out cloud fraction > 30% 
masked_cloud_fraction = cloud_fraction < 30
snow_selected = snow_fraction.sel(date=masked_cloud_fraction)
snow_selected.name = 'SCA'
snow_selected.hvplot.line(title="Snow fraction")
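
As mentioned above, the snow pixel count can also be turned into a Snow Covered Area by multiplying by the pixel area. A minimal sketch of that step (not part of the original notebook; variable names are illustrative and the pixel size is derived from the grid spacing):

# Derive the pixel size (in metres) from the UTM grid spacing
dx = float(abs(clipped_date.x[1] - clipped_date.x[0]))
dy = float(abs(clipped_date.y[1] - clipped_date.y[0]))
# Snow Covered Area per date in km²: snow pixel count × pixel area
sca_km2 = snow * dx * dy / 1e6
sca_km2.hvplot.line(title='Snow covered area (km²)', ylabel='km²')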

Save the cloud-filtered snow fraction to a CSV file.

snow_selected.to_dataframe().to_csv("31_results/filtered_snow_fraction_pangeo.csv")
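
As an optional sanity check (a sketch, not part of the original exercise), the exported CSV can be read back with pandas:

pd.read_csv("31_results/filtered_snow_fraction_pangeo.csv", index_col="date").head()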

Shutdown and Close local Dask cluster#

client.shutdown()
client.close()