Dask is an open-source parallel computing library for Python that enables the processing of large datasets and complex computations across multiple cores and distributed systems. It provides a flexible and dynamic task scheduling system that allows users to build complex workflows and handle large-scale data processing.
Dask is a compelling alternative to PySpark for distributed computing, particularly for those who work primarily within the Python ecosystem. Dask doesn’t require a Java runtime, and can be imported like any other third-party package. It is much lighter weight because it doesn’t require the heavy infrastructure needed to support Spark computing environments. PySpark typically needs a full Spark cluster setup with distributed resource management and coordination (e.g., YARN or Kubernetes), which involves significant overhead. In contrast, Dask operates within a Python environment and can scale from a single machine to a cluster as needed, without requiring the setup of a full distributed system.
In particular, I was interested in setting up a Dask cluster to leverage my own computing resources rather than rely on cloud resources or deployment software such as Coiled, which I would almost certainly opt for if creating a cloud-based cluster in a real-world setting.
There were a number of laptops at my disposal in various states of underutilization and disrepair, and I figured setting up a compute cluster would be a great way to gain familiarity with Dask, as well as give new life to these otherwise unused machines. In particular, my cluster consists of:
- Lenovo Slim 7 Pro X (Ubuntu 22.04, 20GB RAM, 1TB SSD, 16 CPU): client/scheduler
- Sony VAIO (Ubuntu 22.04, 16GB RAM, 512GB SSD, 4 CPU): worker
- Raspberry Pi 5 (Debian 12.7, 4GB RAM, 128GB SSD, 4 CPU): worker
It is straightforward to extend the cluster with additional workers. The only requirements are 1) that the machine be accessible from the client via SSH, and 2) that it have a Python environment consistent with the other worker nodes in terms of install location and package versions.
I attempted to include an additional worker using a Windows machine running the same dask conda environment via WSL, but couldn't get it to work. I'm sure it's possible; I just didn't spend the time to configure it properly. I'll pick this up again down the road and will publish an update.
To ensure a consistent environment across workers, I installed Miniforge to the same location on the client and worker nodes, and used an environment file saved to a GitHub Gist, which was referenced at the time of environment creation:
(base) $ conda env create --file=https://gist.githubusercontent.com/jtrive84/f0d23fcf22bb590caefca1e10243aba2/raw/53328d955a4f543581448c6b6033da94058eb245/dask.yaml
The dask environment contains:
name: dask
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.12
  - dask=2024.12.0
  - dask-labextension=7.0.0
  - numpy=2.2.1
  - asyncssh=2.19.0
  - pyarrow=18.1.0
  - matplotlib=3.9.4
  - ipykernel
  - watermark
  - jupyterlab
The dask-labextension is a JupyterLab extension that provides a user-friendly interface for monitoring Dask clusters directly from JupyterLab. It renders real-time visualizations of cluster performance, displays worker status, memory usage, and task progress, and offers a high degree of configurability. I highly recommend it if you manage Dask clusters from JupyterLab.
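As a small aside, the dashboard address that the extension (or a plain browser tab) points at can be pulled straight from a client object. A minimal sketch, assuming a local cluster like the ones created later in this post:

from dask.distributed import LocalCluster

# Sketch: retrieve the dashboard URL that the JupyterLab extension or a browser can use.
cluster = LocalCluster(n_workers=2, threads_per_worker=4)
client = cluster.get_client()
print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status
client.shutdown()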
All nodes run within the dask conda environment located at:
~/miniforge3/envs/dask/bin/python
To demonstrate my cluster, I’ll use the Flight Prices dataset available on Kaggle. This is a 31GB dataset containing one-way flight itineraries found on Expedia between 2022-04-16 and 2022-10-05 for airport codes ATL, DFW, DEN, ORD, LAX, CLT, MIA, JFK, EWR, SFO, DTW, BOS, PHL, LGA, IAD and OAK. The file is larger than the client’s available memory, so it couldn’t be analyzed using Pandas, which requires the entire dataset to be loaded into RAM. Dask can spill intermediate results to disk for datasets too large to fit into memory, giving us the ability to work with much larger datasets than would otherwise be possible.
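Spilling behavior can also be tuned through Dask's configuration. The sketch below is illustrative only; the thresholds and spill directory are assumptions, not settings used in this post:

import dask

# Sketch: control when workers start writing intermediate results to disk
# and where the spilled data lives. Values here are purely illustrative.
dask.config.set({
    "distributed.worker.memory.target": 0.60,  # begin moving data to disk at 60% of the memory limit
    "distributed.worker.memory.spill": 0.70,   # spill more aggressively at 70%
    "temporary-directory": "/tmp/dask-spill",  # where spilled partitions are written
})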
To get an idea of what the data looks like, we can read the first 100 records using Pandas:
import numpy as np
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
np.set_printoptions(suppress=True, precision=5)
pd.options.mode.chained_assignment = None

# Obtained from https://www.kaggle.com/datasets/dilwong/flightprices
data_path = "~/shared/data/airlines/itineraries.csv"

df = pd.read_csv(data_path, nrows=100)

df.head(7)
| legId | searchDate | flightDate | startingAirport | destinationAirport | fareBasisCode | travelDuration | elapsedDays | isBasicEconomy | isRefundable | isNonStop | baseFare | totalFare | seatsRemaining | totalTravelDistance | segmentsDepartureTimeEpochSeconds | segmentsDepartureTimeRaw | segmentsArrivalTimeEpochSeconds | segmentsArrivalTimeRaw | segmentsArrivalAirportCode | segmentsDepartureAirportCode | segmentsAirlineName | segmentsAirlineCode | segmentsEquipmentDescription | segmentsDurationInSeconds | segmentsDistance | segmentsCabinCode |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 9ca0e81111c683bec1012473feefd28f | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H29M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650214620 | 2022-04-17T12:57:00.000-04:00 | 1650223560 | 2022-04-17T15:26:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 8940 | 947 | coach |
1 | 98685953630e772a098941b71906592b | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H30M | 0 | False | False | True | 217.67 | 248.6 | 4 | 947.0 | 1650191400 | 2022-04-17T06:30:00.000-04:00 | 1650200400 | 2022-04-17T09:00:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9000 | 947 | coach |
2 | 98d90cbc32bfbb05c2fc32897c7c1087 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H30M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650209700 | 2022-04-17T11:35:00.000-04:00 | 1650218700 | 2022-04-17T14:05:00.000-04:00 | BOS | ATL | Delta | DL | Boeing 757-200 | 9000 | 947 | coach |
3 | 969a269d38eae583f455486fa90877b4 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H32M | 0 | False | False | True | 217.67 | 248.6 | 8 | 947.0 | 1650218340 | 2022-04-17T13:59:00.000-04:00 | 1650227460 | 2022-04-17T16:31:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9120 | 947 | coach |
4 | 980370cf27c89b40d2833a1d5afc9751 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H34M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650203940 | 2022-04-17T09:59:00.000-04:00 | 1650213180 | 2022-04-17T12:33:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9240 | 947 | coach |
5 | 79eda9f841e226a1e2121d74211e595c | 2022-04-16 | 2022-04-17 | ATL | BOS | VH0AUEL1 | PT2H38M | 0 | False | False | True | 217.67 | 248.6 | 7 | 947.0 | 1650206700 | 2022-04-17T10:45:00.000-04:00 | 1650216180 | 2022-04-17T13:23:00.000-04:00 | BOS | ATL | JetBlue Airways | B6 | NaN | 9480 | 947 | coach |
6 | 9335fae376c38bb61263281779f469ec | 2022-04-16 | 2022-04-17 | ATL | BOS | V0AJZNN1 | PT4H12M | 0 | False | False | False | 213.02 | 251.1 | 3 | 956.0 | 1650198000||1650205620 | 2022-04-17T08:20:00.000-04:00||2022-04-17T10:2… | 1650203400||1650213120 | 2022-04-17T09:50:00.000-04:00||2022-04-17T12:3… | CLT||BOS | ATL||CLT | American Airlines||American Airlines | AA||AA | Airbus A320||Airbus A320 | 5400||7500 | 228||728 | coach||coach |
Using LocalCluster
Before demonstrating the use of SSHCluster, we’ll look at LocalCluster, which allows us to create a local multi-core cluster for parallel computing on a single machine. It distributes tasks across multiple CPUs or threads on your local machine, giving us the ability to scale computations beyond a single core:
# Creating LocalCluster instance.
from dask.distributed import LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=4)
client = cluster.get_client()

client
Client: Client-a5ec0bd6-c9fe-11ef-8e58-3003c82ce6ed
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster (e965c2ed): status running, 2 workers, 8 total threads, 27.09 GiB total memory, using processes

Scheduler: Scheduler-65021379-9cf0-472f-af9f-c744d27ff689 at tcp://127.0.0.1:36097, started just now

Worker 0: tcp://127.0.0.1:33431, 4 threads, 13.55 GiB memory, nanny tcp://127.0.0.1:46835, local directory /tmp/dask-scratch-space/worker-s4dyz9pk
Worker 1: tcp://127.0.0.1:36301, 4 threads, 13.55 GiB memory, nanny tcp://127.0.0.1:36265, local directory /tmp/dask-scratch-space/worker-9t1r0nti
2025-01-03 12:16:44,335 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 295) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:18:07,698 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928287.6946151'
2025-01-03 12:19:33,219 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 initialized by task ('shuffle-transfer-b7f5ce97ecf82f18ee01140b59d55007', 99) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:20:26,159 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 deactivated due to stimulus 'task-finished-1735928426.1586232'
2025-01-03 12:20:35,652 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 405) executed on worker tcp://127.0.0.1:36301
2025-01-03 12:21:36,894 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928496.8918214'
Then we can read in the itineraries dataset and perform an aggregation. First we get the number of records by starting airport:
import dask
import dask.dataframe as ddf

data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path,
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64
Next we compute the average total fare by starting airport:
dffare = df.groupby("startingAirport").totalFare.mean()

dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64
client.shutdown()
Using SSHCluster
SSHCluster is used to create our distributed cluster on the local network. It accepts a list of hostnames, the first of which will be used for the scheduler and the rest as workers. We can repeat the name of the first host to have it also serve as a worker.
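For example, a minimal sketch (host addresses and options here are placeholders, not the cluster configured below) that reuses the scheduler machine as a worker:

from dask.distributed import SSHCluster

# Sketch: "localhost" appears twice, so the scheduler machine also runs a worker.
cluster = SSHCluster(
    hosts=["localhost", "localhost", "192.168.86.138"],
    worker_options={"n_workers": 1, "nthreads": 4},
    remote_python="~/miniforge3/envs/dask/bin/python",
)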
The full list of worker options can be found here. Note that when using LocalCluster, n_workers specifies the total number of workers. When using SSHCluster, n_workers is the number of workers per host. In the next cell, we create a cluster with two worker nodes, 1 worker each, with 4 threads per worker:
from dask.distributed import Client, SSHCluster

# Path to dask environment executable for all nodes.
remote_python = "~/miniforge3/envs/dask/bin/python"

hosts = ["localhost", "192.168.86.138", "192.168.86.147"]

worker_opts = {"n_workers": 1, "nthreads": 4}  # "memory_limit": "8GB"

connect_opts = {
    "known_hosts": None,
    "username": "jtriz",
    "password": "xxxxx",
}

cluster = SSHCluster(
    hosts=hosts,
    worker_options=worker_opts,
    connect_options=connect_opts,
    remote_python=remote_python
)

client = Client(cluster)

client
2025-01-03 12:24:23,300 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,299 - distributed.scheduler - INFO - State start
2025-01-03 12:24:23,304 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,303 - distributed.scheduler - INFO - Scheduler at: tcp://192.168.86.154:46219
2025-01-03 12:24:24,150 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,149 - distributed.nanny - INFO - Start Nanny at: 'tcp://192.168.86.154:35955'
2025-01-03 12:24:24,161 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,160 - distributed.nanny - INFO - Start Nanny at: 'tcp://192.168.86.154:45391'
2025-01-03 12:24:24,538 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,536 - distributed.worker - INFO - Start worker at: tcp://192.168.86.154:40165
2025-01-03 12:24:24,553 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,551 - distributed.worker - INFO - Start worker at: tcp://192.168.86.154:39183
Client: Client-f59132cc-c9ff-11ef-8e58-3003c82ce6ed
Connection method: Cluster object
Cluster type: distributed.SpecCluster
Dashboard: http://192.168.86.154:8787/status

SSHCluster (SpecCluster): 2 workers, 8 total threads, 13.55 GiB total memory

Scheduler: Scheduler-04823e11-b8e4-4127-9552-fea45137927a at tcp://192.168.86.154:46219, started just now

Worker tcp://192.168.86.154:39183: 4 threads, 6.77 GiB memory, nanny tcp://192.168.86.154:35955, local directory /tmp/dask-scratch-space/worker-b7o381g5
Worker tcp://192.168.86.154:40165: 4 threads, 6.77 GiB memory, nanny tcp://192.168.86.154:45391, local directory /tmp/dask-scratch-space/worker-lh5cekag
Getting a count of the number of records by starting airport:
data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path,
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64
As before, computing the average total fare by starting airport:
dffare = df.groupby("startingAirport").totalFare.mean()

dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64
client.shutdown()
The Dask lab extension didn’t seem to work when using SSHCluster. This probably has more to do with my setup than any issue with the extension itself, but even after tinkering with it for a while, I couldn’t get it to display usage metrics.
While I was able to get SSHCluster set up and running on my local network, I’ll probably opt for LocalCluster for my distributed computing needs: first, because my data processing needs aren’t that significant and this was intended as a proof of concept, and second, because not having access to the Dask lab extension when using SSHCluster is limiting. In addition, at times I noticed a mismatch between the actual number of workers and the number of workers displayed by the SSHCluster client, so it was difficult to determine whether the cluster was properly configured without diagnostic tooling.
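A lightweight workaround, sketched below using only the client object from the earlier cells, is to ask the scheduler directly which workers it actually sees:

# Sketch: query the scheduler for registered workers, their threads, and memory limits.
info = client.scheduler_info()
for address, worker in info["workers"].items():
    print(address, worker["nthreads"], worker["memory_limit"])

# Or simply map worker address to thread count.
print(client.nthreads())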
I plan on researching how to set up an additional worker on WSL within an SSHCluster, but that will be a future post.