Standing Up Ad-hoc Compute Clusters with Dask

This article was first published on The Pleasure of Finding Things Out: A blog by James Triveri, and kindly contributed to python-bloggers.

Dask is an open-source parallel computing library for Python that enables the processing of large datasets and complex computations across multiple cores and distributed systems. It provides a flexible and dynamic task scheduling system that allows users to build complex workflows and handle large-scale data processing.

Dask is a compelling alternative to PySpark for distributed computing, particularly for those who work primarily within the Python ecosystem. Dask doesn’t require a Java runtime, and can be imported like any other third-party package. It is much lighter weight because it doesn’t require the heavy infrastructure needed to support Spark computing environments. PySpark typically needs a full Spark cluster setup with distributed resource management and coordination (e.g., YARN or Kubernetes), which involves significant overhead. In contrast, Dask operates within a Python environment and can scale from a single machine to a cluster as needed, without requiring the setup of a full distributed system.

In particular, I was interested in setting up a Dask cluster that leverages my own computing resources rather than relying on cloud resources or deployment software such as Coiled, which I would almost certainly opt for if creating a cloud-based cluster in a real-world setting.

There were a number of laptops at my disposal in various states of underutilization and disrepair, and I figured that setting up a compute cluster would be a great way to gain familiarity with Dask, as well as to give new life to these otherwise unused machines. My cluster consists of:

  1. Lenovo Slim 7 Pro X (Ubuntu 22.04, 20GB RAM, 1TB SSD, 16 CPU): client/scheduler
  2. Sony VAIO (Ubuntu 22.04, 16GB RAM, 512GB SSD, 4 CPU): worker
  3. Raspberry Pi 5 (Debian 12.7, 4GB RAM, 128GB SSD, 4 CPU): worker

It is straightforward to extend the cluster to additional workers. The only requirements are that 1) the machine is accessible from the client via SSH, and 2) it has a Python environment consistent with the other worker nodes in terms of install location and package versions.
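One way to sanity-check requirement 2 before starting a cluster is to have each node report its interpreter path and package versions, then compare the reports. The sketch below is a minimal, standard-library-only take on that idea; `environment_report` and `find_mismatches` are hypothetical helpers (not part of Dask), and how you collect the per-node JSON (for example, by running the script over ssh on each machine) is left open. Once a cluster is running, Dask's `Client.get_versions(check=True)` can also flag version mismatches between the client, scheduler and workers.

```python
import json
import os
import platform
import sys
from importlib import metadata


def environment_report(packages):
    """Describe this node's interpreter and the installed versions of `packages`."""
    executable = sys.executable
    home = os.path.expanduser("~").rstrip(os.sep)
    if home and executable.startswith(home + os.sep):
        # Report the path relative to $HOME so nodes with different usernames
        # but the same ~/miniforge3 layout still compare equal.
        executable = "~" + executable[len(home):]
    report = {"executable": executable, "python": platform.python_version()}
    for name in packages:
        try:
            report[f"pkg:{name}"] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[f"pkg:{name}"] = None  # not installed on this node
    return report


def find_mismatches(reports):
    """Given {host: report}, return {field: {host: value}} for fields that differ."""
    fields = sorted({field for report in reports.values() for field in report})
    mismatches = {}
    for field in fields:
        values = {host: report.get(field) for host, report in reports.items()}
        if len(set(values.values())) > 1:
            mismatches[field] = values
    return mismatches


if __name__ == "__main__":
    # Run on each node and collect the printed JSON, keyed by host.
    print(json.dumps(environment_report(["dask", "distributed", "numpy", "pyarrow"])))
```

Feeding the collected reports into `find_mismatches` returns an empty dict when every node agrees; otherwise it lists each differing field with its value on each host.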

I attempted to add a Windows machine as an additional worker, running the same dask conda environment under WSL, but couldn’t get it to work. I’m sure it’s possible; I just didn’t spend the time to configure it properly. I’ll pick this up again down the road and publish an update.

To ensure a consistent environment across workers, I installed Miniforge to the same location on the client and worker nodes, and created the environment from an environment file saved to a GitHub Gist:

(base) $ conda env create --file=https://gist.githubusercontent.com/jtrive84/f0d23fcf22bb590caefca1e10243aba2/raw/53328d955a4f543581448c6b6033da94058eb245/dask.yaml

The dask environment contains:

name: dask
channels:
- conda-forge
- defaults
dependencies:
- python=3.12
- dask=2024.12.0
- dask-labextension=7.0.0
- numpy=2.2.1
- asyncssh=2.19.0
- pyarrow=18.1.0
- matplotlib=3.9.4
- ipykernel
- watermark
- jupyterlab

The dask-labextension is a JupyterLab extension for monitoring Dask clusters from within JupyterLab. It provides real-time visualizations of cluster performance, displays worker status, memory usage and task progress, and is highly configurable. I highly recommend it if you manage Dask clusters from JupyterLab.

DaskLab extension

All nodes run within the dask conda environment located at:

~/miniforge3/envs/dask/bin/python

To demonstrate my cluster, I’ll use the Flight Prices dataset available on Kaggle. This 31GB dataset contains one-way flight itineraries found on Expedia between 2022-04-16 and 2022-10-05 for the airport codes ATL, DFW, DEN, ORD, LAX, CLT, MIA, JFK, EWR, SFO, DTW, BOS, PHL, LGA, IAD and OAK. The file is larger than the client’s available memory, so it can’t be loaded into a single Pandas DataFrame, which holds the entire dataset in RAM. Dask instead processes the file in partitions and can spill intermediate results to disk when they don’t fit in memory, giving us the ability to work with much larger datasets than would otherwise be possible.
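To make the memory point concrete: an aggregation like the per-airport record count computed later doesn’t need the whole file in RAM if the rows are streamed. The single-threaded, standard-library sketch below (the function names are hypothetical) reads one row at a time and keeps only a running tally. It is a baseline for comparison, not how Dask works internally; Dask splits the file into partitions and processes them in parallel across workers.

```python
import csv
from collections import Counter
from pathlib import Path


def count_by_column(lines, column):
    """Count rows per distinct value of `column` in CSV text.

    `lines` can be any iterable of CSV lines, such as an open file object.
    Rows are consumed one at a time, so memory use grows with the number of
    distinct values in `column`, not with the size of the input.
    """
    counts = Counter()
    for row in csv.DictReader(lines):
        counts[row[column]] += 1
    return counts


def count_file(path, column):
    """Stream a CSV file from disk and count rows per value of `column`."""
    with open(Path(path).expanduser(), newline="") as f:
        return count_by_column(f, column)


# Usage (a single pass over the full 31GB file will take a while):
# count_file("~/shared/data/airlines/itineraries.csv", "startingAirport")
```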

To get an idea of what the data looks like, we can read the first 100 records using Pandas:

import numpy as np
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
np.set_printoptions(suppress=True, precision=5)
pd.options.mode.chained_assignment = None

# Obtained from https://www.kaggle.com/datasets/dilwong/flightprices
data_path = "~/shared/data/airlines/itineraries.csv"

df = pd.read_csv(data_path, nrows=100)

df.head(7)
   legId                             searchDate  flightDate  startingAirport  destinationAirport  fareBasisCode  travelDuration  elapsedDays
0  9ca0e81111c683bec1012473feefd28f  2022-04-16  2022-04-17  ATL              BOS                 LA0NX0MC       PT2H29M         0
1  98685953630e772a098941b71906592b  2022-04-16  2022-04-17  ATL              BOS                 LA0NX0MC       PT2H30M         0
2  98d90cbc32bfbb05c2fc32897c7c1087  2022-04-16  2022-04-17  ATL              BOS                 LA0NX0MC       PT2H30M         0
3  969a269d38eae583f455486fa90877b4  2022-04-16  2022-04-17  ATL              BOS                 LA0NX0MC       PT2H32M         0
4  980370cf27c89b40d2833a1d5afc9751  2022-04-16  2022-04-17  ATL              BOS                 LA0NX0MC       PT2H34M         0
5  79eda9f841e226a1e2121d74211e595c  2022-04-16  2022-04-17  ATL              BOS                 VH0AUEL1       PT2H38M         0
6  9335fae376c38bb61263281779f469ec  2022-04-16  2022-04-17  ATL              BOS                 V0AJZNN1       PT4H12M         0

   isBasicEconomy  isRefundable  isNonStop  baseFare  totalFare  seatsRemaining  totalTravelDistance
0  False           False         True       217.67    248.6      9               947.0
1  False           False         True       217.67    248.6      4               947.0
2  False           False         True       217.67    248.6      9               947.0
3  False           False         True       217.67    248.6      8               947.0
4  False           False         True       217.67    248.6      9               947.0
5  False           False         True       217.67    248.6      7               947.0
6  False           False         False      213.02    251.1      3               956.0

   segmentsDepartureTimeEpochSeconds  segmentsDepartureTimeRaw
0  1650214620                         2022-04-17T12:57:00.000-04:00
1  1650191400                         2022-04-17T06:30:00.000-04:00
2  1650209700                         2022-04-17T11:35:00.000-04:00
3  1650218340                         2022-04-17T13:59:00.000-04:00
4  1650203940                         2022-04-17T09:59:00.000-04:00
5  1650206700                         2022-04-17T10:45:00.000-04:00
6  1650198000||1650205620             2022-04-17T08:20:00.000-04:00||2022-04-17T10:2…

   segmentsArrivalTimeEpochSeconds  segmentsArrivalTimeRaw
0  1650223560                       2022-04-17T15:26:00.000-04:00
1  1650200400                       2022-04-17T09:00:00.000-04:00
2  1650218700                       2022-04-17T14:05:00.000-04:00
3  1650227460                       2022-04-17T16:31:00.000-04:00
4  1650213180                       2022-04-17T12:33:00.000-04:00
5  1650216180                       2022-04-17T13:23:00.000-04:00
6  1650203400||1650213120           2022-04-17T09:50:00.000-04:00||2022-04-17T12:3…

   segmentsArrivalAirportCode  segmentsDepartureAirportCode  segmentsAirlineName                   segmentsAirlineCode
0  BOS                         ATL                           Delta                                 DL
1  BOS                         ATL                           Delta                                 DL
2  BOS                         ATL                           Delta                                 DL
3  BOS                         ATL                           Delta                                 DL
4  BOS                         ATL                           Delta                                 DL
5  BOS                         ATL                           JetBlue Airways                       B6
6  CLT||BOS                    ATL||CLT                      American Airlines||American Airlines  AA||AA

   segmentsEquipmentDescription  segmentsDurationInSeconds  segmentsDistance  segmentsCabinCode
0  Airbus A321                   8940                       947               coach
1  Airbus A321                   9000                       947               coach
2  Boeing 757-200                9000                       947               coach
3  Airbus A321                   9120                       947               coach
4  Airbus A321                   9240                       947               coach
5  NaN                           9480                       947               coach
6  Airbus A320||Airbus A320      5400||7500                 228||728          coach||coach

Using LocalCluster

Before demonstrating SSHCluster, we’ll look at LocalCluster, which creates a multi-core cluster for parallel computing on a single machine. It distributes tasks across multiple worker processes and threads on the local machine, letting us scale computations beyond a single core:

# Create a LocalCluster with 2 worker processes, each running 4 threads (8 threads in total).
from dask.distributed import LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=4) 
client = cluster.get_client()

client

Client

Client-a5ec0bd6-c9fe-11ef-8e58-3003c82ce6ed

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

2025-01-03 12:16:44,335 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 295) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:18:07,698 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928287.6946151'
2025-01-03 12:19:33,219 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 initialized by task ('shuffle-transfer-b7f5ce97ecf82f18ee01140b59d55007', 99) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:20:26,159 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 deactivated due to stimulus 'task-finished-1735928426.1586232'
2025-01-03 12:20:35,652 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 405) executed on worker tcp://127.0.0.1:36301
2025-01-03 12:21:36,894 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928496.8918214'

Then we can read in the itineraries dataset and perform an aggregation. First we get the number of records by starting airport:

import dask
import dask.dataframe as ddf

data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path, 
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64

Next we compute the average total fare by starting airport:

dffare = df.groupby("startingAirport").totalFare.mean()
dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64
client.shutdown()
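Computing a mean across partitions takes a little care: averaging the per-partition means is wrong when partitions contain different numbers of rows. The usual approach, and roughly what Dask does for a groupby mean, is to compute a sum and a count per key in each partition, add those up, and divide once at the end. The sketch below illustrates that map/combine pattern in plain Python on made-up data; the function names are hypothetical and this is not Dask’s actual implementation.

```python
from collections import defaultdict


def partial_sum_count(partition):
    """Map step: per-key (sum, count) for one partition of (key, value) rows."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        if value is None:  # skip missing values, as pandas/Dask mean does by default
            continue
        acc[key][0] += value
        acc[key][1] += 1
    return {key: (s, c) for key, (s, c) in acc.items()}


def combine_means(partials):
    """Reduce step: add up the (sum, count) pairs, then divide once per key."""
    totals = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, c) in partial.items():
            totals[key][0] += s
            totals[key][1] += c
    return {key: s / c for key, (s, c) in totals.items() if c}


# Two toy partitions of (startingAirport, totalFare) rows; the values are made up.
p1 = [("ATL", 100.0), ("ATL", 200.0), ("BOS", 300.0)]
p2 = [("ATL", 600.0), ("BOS", None)]

means = combine_means([partial_sum_count(p1), partial_sum_count(p2)])
print(means)  # {'ATL': 300.0, 'BOS': 300.0}
```

Simply averaging the two partition means for ATL (150.0 and 600.0) would give 375.0 instead of the correct 300.0.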

Using SSHCluster

SSHCluster is used to create our distributed cluster on the local network. It accepts a list of hostnames, the first of which will be used for the scheduler and the rest as workers. We can repeat the name of the first host to have it also serve as a worker.
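The host-list rule can be written down as a small tally, which is handy for working out how many workers and threads a given configuration should produce. This is a hypothetical helper that only does arithmetic; it doesn’t call Dask, and its `workers_per_host` and `threads_per_worker` parameters stand in for the `n_workers` and `nthreads` worker options used below.

```python
def sshcluster_layout(hosts, workers_per_host=1, threads_per_worker=1):
    """Tally the layout an SSHCluster-style host list implies.

    The first host runs the scheduler; every remaining entry runs
    `workers_per_host` workers. Listing the scheduler host again makes it
    a worker as well.
    """
    if not hosts:
        raise ValueError("hosts must include at least the scheduler host")
    scheduler, worker_hosts = hosts[0], list(hosts[1:])
    total_workers = len(worker_hosts) * workers_per_host
    return {
        "scheduler": scheduler,
        "worker_hosts": worker_hosts,
        "scheduler_also_worker": scheduler in worker_hosts,
        "total_workers": total_workers,
        "total_threads": total_workers * threads_per_worker,
    }


# The host list used in this post: the scheduler on localhost plus two workers.
layout = sshcluster_layout(
    ["localhost", "192.168.86.138", "192.168.86.147"],
    workers_per_host=1,
    threads_per_worker=4,
)
print(layout["total_workers"], layout["total_threads"])  # 2 8
```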

The full list of worker options can be found in the Dask documentation. Note that with LocalCluster, n_workers specifies the total number of workers, whereas with SSHCluster, n_workers is the number of workers per host. In the next cell, we create a cluster whose scheduler runs on localhost and whose two worker nodes each run one worker with 4 threads:

from dask.distributed import Client, SSHCluster

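# Path to the dask environment's Python executable; must be identical on all nodes.
remote_python = "~/miniforge3/envs/dask/bin/python"

# The first host runs the scheduler; the remaining hosts run workers.
hosts = ["localhost", "192.168.86.138", "192.168.86.147"]

# One worker per host with 4 threads each. A per-worker cap such as
# "memory_limit": "8GB" can be added here.
worker_opts = {"n_workers": 1, "nthreads": 4}

# Keyword arguments passed through to asyncssh.connect(). known_hosts=None
# disables host-key verification, which is only reasonable on a trusted home
# network; key-based authentication is preferable to a plaintext password.
connect_opts = {
    "known_hosts": None,
    "username": "jtriz",
    "password": "xxxxx",
}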

cluster = SSHCluster(
    hosts=hosts,
    worker_options=worker_opts,
    connect_options=connect_opts,
    remote_python=remote_python
)

client = Client(cluster)

client
2025-01-03 12:24:23,300 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,299 - distributed.scheduler - INFO - State start
2025-01-03 12:24:23,304 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,303 - distributed.scheduler - INFO -   Scheduler at: tcp://192.168.86.154:46219
2025-01-03 12:24:24,150 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,149 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.86.154:35955'
2025-01-03 12:24:24,161 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,160 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.86.154:45391'
2025-01-03 12:24:24,538 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,536 - distributed.worker - INFO -       Start worker at: tcp://192.168.86.154:40165
2025-01-03 12:24:24,553 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,551 - distributed.worker - INFO -       Start worker at: tcp://192.168.86.154:39183

Client

Client-f59132cc-c9ff-11ef-8e58-3003c82ce6ed

Connection method: Cluster object
Cluster type: distributed.SpecCluster
Dashboard: http://192.168.86.154:8787/status

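Getting a count of the number of records by starting airport. Since the workers read partitions of the CSV themselves, the file needs to be available at the same path on every node: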

data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path, 
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64

As before, computing the average total fare by starting airport:

dffare = df.groupby("startingAirport").totalFare.mean()
dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64
client.shutdown()

The Dask lab extension didn’t seem to work with SSHCluster. This probably has more to do with my setup than with any issue in the extension itself, but even after tinkering with it for a while, I couldn’t get it to display usage metrics.

While I was able to get SSHCluster set up and running on my local network, I’ll probably opt for LocalCluster for my distributed computing needs. First, my data processing needs aren’t that significant, and this was intended as a proof of concept. Second, not having access to the Dask lab extension when using SSHCluster is limiting. In addition, I occasionally noticed a mismatch between the actual number of workers and the number displayed by the SSHCluster client, which made it difficult to determine whether the cluster was properly configured without diagnostic tooling.
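Without the lab extension, one lightweight way to check the worker count is to ask the client directly. `Client.nthreads()` returns a mapping from each connected worker’s address to its thread count, and `Client.wait_for_workers()` blocks until a given number of workers have connected or a timeout expires. The sketch below (hypothetical helper names) groups that mapping by host and compares it with the number of workers you expect on each machine. Workers are keyed by the IP address they report, so expected hosts should be given as IPs rather than names like `localhost`.

```python
from urllib.parse import urlsplit


def workers_by_host(nthreads):
    """Group a {worker_address: nthreads} mapping by host.

    Returns {host: {"workers": count, "threads": total_threads}}.
    """
    summary = {}
    for address, threads in nthreads.items():
        host = urlsplit(address).hostname  # "tcp://192.168.86.138:40165" -> "192.168.86.138"
        entry = summary.setdefault(host, {"workers": 0, "threads": 0})
        entry["workers"] += 1
        entry["threads"] += threads
    return summary


def discrepancies(summary, expected_workers):
    """Return {host: (expected, observed)} wherever worker counts disagree."""
    out = {}
    for host in sorted(set(summary) | set(expected_workers)):
        observed = summary.get(host, {}).get("workers", 0)
        expected = expected_workers.get(host, 0)
        if observed != expected:
            out[host] = (expected, observed)
    return out


# Usage against a live cluster (assumes `client` is connected):
#   client.wait_for_workers(n_workers=2, timeout=60)
#   summary = workers_by_host(client.nthreads())
#   print(discrepancies(summary, {"192.168.86.138": 1, "192.168.86.147": 1}))
```

An empty result means every expected host has the expected number of workers; any entry shows the expected and observed counts for that host.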

I plan on researching how to go about setting up an additional worker on WSL within an SSHCluster, but that will be a future post.
