mapchete Hub & dask
In our previous post we briefly explained how we improved our processing pipeline for the new Sentinel-2 cloudless 2021 (opens new window) release. In this post we will go deeper into the details of what changed and where we are technologically. Parts of these developments are results from the LOOSE project funded by the European Space Agency (opens new window).
Our core processing tool is the in-house developed open source package mapchete (opens new window). It allows applying a user-defined piece of Python code to geospatial data. This data can either be in the form of rasters or feature datasets.
Mapchete uses the approach of tiled slippy maps when processing by splitting up the geographical area into zoom levels and tiles. This allows for processing large, up to global, areas because these tiles or data chunks can be dimensioned in a way that a single process can handle them in memory. Because a process area is nothing else than a (potentially large) set of tiles, each of which can be processed by one worker, this processing job can easily be paralellized over a number of workers. These workers can either run as separate threads or processes on a single machine or on a cluster, but more on that later.
Mapchete is designed to be extensible via plugins. Per default it is able to read and write the most common raster and vector formats such as GeoTIFF, COG, GeoJSON, GeoPackage, etc. but also more complex inputs or outputs can be implemented such as a satellite data input driver for a Sentinel-2 archive. For our EOxCloudless (opens new window) product line we developed such an input driver for Sentinel-1 and Sentinel-2 data.
Mapchete Hub contains the core package for all processing capabilities but enhances it by adding a REST interface, a database to hold job metadata and allows for distributed, asynchronous job execution. Every processing job too large for a single Laptop can be sent to mapchete Hub where it will be processed and tracked.
# Previous architecture
Processing a global satellite mosaic with 10 meters ground resolution would take too long to run on a single machine, but thanks to the cloud compute resources are easily accessible.
Our initial solution for the issue of distributing the work load on multiple workers was to split up the entire world into squares called "process zones" and assign each worker one zone. Mapchete would then take over, split up the process zone into tiles and process the tiles in parallel on the worker.
The distribution part didn't require many additions to the existing software. Mapchete was the core package taking the process configuration and just running as if it would run on a local machine. Around that Celery (opens new window) and RabbitMQ (opens new window) were used to queue and distribute the jobs. For job submission a REST API was built using Flask (opens new window) and as permanent metadata storage a MongoDB (opens new window) instance was deployed.
The processing campaigns for EOxCloudless were conducted yearly since 2017 and used different Sentinel-2 archives in different stages. In all cases data archives from AWS (Amazon Web Services) were used.
In the first years the JPEG2000 archive (opens new window) managed by Sinergise (opens new window) was used. Because the JPEG2000 format is not optimized for being read directly from an HTTP or S3 endpoint, a caching step was introduced downloading all Sentinel-2 JPEG2000 files onto the worker and converting them into GeoTIFFs. In the recent runs the COG archive (opens new window) managed by Element 84 (opens new window) could be used which omitted the necessity of caching and converting the image data.
This setup worked reasonably well since the first EOxCloudless campaign conducted in 2016/2017 but from an operators view it was at times tedious to handle and in the end lacked some key features. First and foremost we didn't have a proper deployment approach. The server, database and broker containers were all run on on-demand instances on AWS but we had to manually initialize the worker instances and terminate them once the jobs were finished.
Apart from that we faced eventually unresolvable issues when e.g. cancelling/deleting a queued job or in case a worker became unresponsive. Furthermore we wanted to move all of the deployment responsibilities over to EOxHub (opens new window).
# New architecture
Two main points of improvement were identified:
- Job tasks should be distributed on workers, not full jobs.
- Workers should automatically start before and terminate after jobs.
# All the power to dask
dask (opens new window) is an amazing project which got very popular in the data science and geospatial fields recently. It is mainly famous for extending Numpy, pandas and scikit-learn to run on a cluster but dask also replicates the python core package
concurrent.futures interface to schedule arbitrary tasks.
Dask provided us with some attractive additional features:
- A dask cluster does not require a messaging component like RabbitMQ. This is being handled by the dask scheduler.
- The dask scheduler tries to optimize which workers execute which task depending on the task graph.
- A task graph can be defined, adding dependencies between tasks.
dask-kubernetes(opens new window) is a package built to integrate dask cluster management within Kubernetes.
- The dask API is simlar to
concurrent.futures, thus could be integrated into the
mapchete(opens new window) core package without much effort.
- Once dask is integrated, the same client (
mapchete) code can handle both local and distributed processing.
Extending the internal calls to
concurrent.futures with a
dask (opens new window) object also allowed for distributing the job tasks over the cluster rather than sending jobs to workers individually like we did when using Celery. For the operator triggering the processing this makes it much more convenient because before we had to manually splilt up large regions (e.g. continents) into smaller jobs ("process zones"). Now, the operator can just submit one large job which then gets split up and distributed over a larger number of workers.
Previously all of the required steps when creating a cloudless mosaic had to be processed one after the other leading to unused workers at the end of each step. Furthermore it required caching task results so that a follow-up task could pick up from there and process the next step. We did this using an object storage which produced costs (requests and storage).
dask (opens new window) enables us to build task graphs where task results can be directly shared with the follow-up task. The tasks of the processing steps can now start any time when the dependent tasks finished successfully; thus the workers can always be busy and less resouces are wasted.
# EOxHub and Kubernetes
The dask ecosystem contains packages called
dask-kubernetes (opens new window) and
dask-gateway (opens new window) which allow integration into a Kubernetes cluster. This means custom dask clusters can be requested individually for each job and Kubernetes handles all the cloud resources required.
Apart from autoscaling it was important to finally make the deployment more solid. Automated testing, tagged releases and building docker images via CI/CD had been already implemented in previous years so handing over the deployment to EOxHub (opens new window) was straightforward. Deploying on EOxHub also means we can fall back to a prior version if needed by just editing the configuration. Everything else is handled by Kubernetes.
Our mapchete Hub system has grown organically and has evolved quite a bit since our first Sentinel-2 processing campaigns in 2016 and 2017. We came a long way from manually handling AWS spot instances via installation scripts and without docker images (!) to a fully kubernetes-managed, autoscaling cluster. There are still challenges ahead, especially towards optimizing resource usage (i.e. reducing costs) and error handling (i.e. reducing detective work).
The way the system is designed allows us to easily plug in other processes and data backends. Apart from creating cloudless mosaics we are also processing Sentinel-1 data as well as time series preparation tasks for creating agricultural signals on a large scale.
We aim to achieve a transparent computing environment resilient to errors and efficient workload distribution for us at EOX and also for you as our customer.