dask_setup
A comprehensive single-node Dask setup helper designed for HPC environments (especially Gadi), with intelligent defaults for CPU/I/O-bound workloads, advanced configuration management, storage format optimization, and user-friendly error handling.
What It Does
dask_setup is a wrapper around dask.distributed.LocalCluster + Client
that replaces trial-and-error tuning with workload-aware defaults. Out of the box, it:
- Detects resources automatically from PBS/SLURM/Kubernetes environments
- Reserves memory for OS/I/O caches and splits the rest across workers
- Chooses topology (process/thread) based on workload type
- Configures safe spilling to prevent OOM crashes
- Pins temp/spill to $PBS_JOBFS (node-local SSD)
- Provides dashboard access with SSH tunnel commands
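The memory bullet above comes down to subtraction and division. As a minimal sketch, assuming an even split across workers (the helper `per_worker_memory_gb` is hypothetical, not part of dask_setup's API, and the package's real rounding rules may differ):

```python
def per_worker_memory_gb(total_gb: float, reserve_gb: float, n_workers: int) -> float:
    """Hypothetical helper: split (total - reserve) evenly across workers."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    if reserve_gb < 0 or reserve_gb >= total_gb:
        raise ValueError("reserve_gb must be non-negative and smaller than total_gb")
    # Memory left after leaving headroom for the OS and filesystem caches
    usable_gb = total_gb - reserve_gb
    # Every worker receives an equal share of the remainder
    return usable_gb / n_workers


# A 300 GB job with 60 GB reserved, spread over 104 single-threaded workers
print(round(per_worker_memory_gb(300, 60, 104), 2))  # 2.31
```

With max_workers=1 the same 240 GB would go to a single worker, which is why fewer, fatter workers help when individual tasks are large.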
Key Features
Smart Detection
Automatic PBS/SLURM/Kubernetes environment and resource detection
Configuration Profiles
Pre-configured profiles for common workloads with validation
Workload Optimization
Specialized topologies for CPU-bound, I/O-bound, and mixed workloads
Storage Intelligence
Zarr/NetCDF-aware chunking and compression recommendations
Memory Safety
Aggressive spilling thresholds help prevent out-of-memory crashes
HPC-Optimized
Routes temp/spill to node-local $PBS_JOBFS instead of the shared filesystem
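Scheduler detection of the kind listed under Smart Detection typically reads environment variables that the batch system sets inside a job. The sketch below is hypothetical, not dask_setup's implementation; it assumes PBS exposes `PBS_NCPUS` and `PBS_VMEM` (in bytes) and SLURM exposes `SLURM_CPUS_ON_NODE` and `SLURM_MEM_PER_NODE` (in MB), and it leaves out Kubernetes cgroup limits entirely.

```python
import os


def detect_resources(env=None):
    """Hypothetical sketch: return (scheduler, cpus, mem_gib or None).

    Assumed variables: PBS_NCPUS / PBS_VMEM (bytes) for PBS,
    SLURM_CPUS_ON_NODE / SLURM_MEM_PER_NODE (MB) for SLURM.
    """
    env = os.environ if env is None else env
    if "PBS_NCPUS" in env:
        cpus = int(env["PBS_NCPUS"])
        mem = int(env["PBS_VMEM"]) / 1024**3 if "PBS_VMEM" in env else None
        return "pbs", cpus, mem
    if "SLURM_CPUS_ON_NODE" in env:
        cpus = int(env["SLURM_CPUS_ON_NODE"])
        mem = int(env["SLURM_MEM_PER_NODE"]) / 1024 if "SLURM_MEM_PER_NODE" in env else None
        return "slurm", cpus, mem
    # Outside a batch job: fall back to what the OS reports, memory unknown
    return "local", os.cpu_count() or 1, None
```

Accepting a plain dict instead of always reading `os.environ` keeps the function easy to test outside a job.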
Installation
pip install dask-setup
Requirements: Python 3.11+
Quick Start
Simple Usage
from dask_setup import setup_dask_client
# Optimal defaults for CPU-bound work
client, cluster, dask_tmp = setup_dask_client("cpu")
Advanced Configuration
from dask_setup import setup_dask_client, DaskSetupConfig
config = DaskSetupConfig(
    workload_type="cpu",
    max_workers=8,
    reserve_mem_gb=32.0,
    spill_compression="lz4",
    suggest_chunks=True,  # Show xarray chunking recommendations
)
client, cluster, dask_tmp = setup_dask_client(config=config)
Workload Types
CPU-Bound (workload_type="cpu")
- Topology: Many processes, 1 thread each
- Best for: Heavy computation, NumPy/Numba operations, xarray math
- Workers: ≈ number of cores
client, cluster, tmp = setup_dask_client("cpu", reserve_mem_gb=60)
ds = ds.chunk({"time": 240, "y": 512, "x": 512})
result = ds.mean(("y", "x")).compute()
I/O-Bound (workload_type="io")
- Topology: 1 process with 8-16 threads
- Best for: High-throughput NetCDF/Zarr I/O
- Workers: Single process, multi-threaded
import xarray as xr
client, cluster, tmp = setup_dask_client("io", reserve_mem_gb=40)
# files: a list of NetCDF paths (or a glob string)
ds = xr.open_mfdataset(files, engine="netcdf4", chunks={}, parallel=True)
Mixed (workload_type="mixed")
- Topology: Few processes, 2 threads each
- Best for: Pipelines with both reading and computation
- Workers: Balanced configuration
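One way to read the three topologies above as code, assuming the core count is already known. `choose_topology` is a hypothetical helper, and the cap of 8 processes for `mixed` is an illustrative guess at what "few" means, not a documented value:

```python
def choose_topology(workload: str, cores: int) -> tuple[int, int]:
    """Hypothetical mapping from workload type to (n_workers, threads_per_worker)."""
    if cores < 1:
        raise ValueError("cores must be >= 1")
    if workload == "cpu":
        # One single-threaded process per core, so processes don't contend for the GIL
        return cores, 1
    if workload == "io":
        # One process with 8-16 threads that overlap while waiting on I/O
        return 1, max(8, min(16, cores))
    if workload == "mixed":
        # A few processes with 2 threads each (cap of 8 is an assumption)
        return max(1, min(cores // 2, 8)), 2
    raise ValueError(f"unknown workload type: {workload!r}")
```

The returned pair corresponds to the `n_workers` and `threads_per_worker` arguments of `dask.distributed.LocalCluster`.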
Configuration Profiles
from dask_setup import ConfigManager, DaskSetupConfig, setup_dask_client
# Use built-in profiles
client, cluster, tmp = setup_dask_client(profile="cpu_intensive")
# Create and save custom profiles
config = DaskSetupConfig(
    workload_type="io",
    reserve_mem_gb=40.0,
    spill_compression="lz4",
    name="my_io_profile",
    description="Optimized for large NetCDF processing",
)
manager = ConfigManager()
manager.save_profile("my_io_profile", config)
# List available profiles
profiles = manager.list_profiles()
Built-in Profiles
- cpu_intensive: Heavy computation, many processes, minimal I/O
- io_heavy: Large file processing, optimized for Zarr/NetCDF
- memory_conservative: Lower memory usage, safer for shared systems
- balanced: Good default for mixed workloads
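Profiles behave like named bundles of defaults. The sketch below assumes, without confirmation from the dask_setup docs, that explicit arguments override profile values; `Profile`, `PROFILES` and `resolve` are hypothetical, only two presets are shown, and the numeric defaults are placeholders:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Profile:
    """Hypothetical stand-in for a saved profile; field names mirror DaskSetupConfig."""
    workload_type: str = "mixed"
    reserve_mem_gb: float = 10.0  # placeholder value
    spill_compression: str = "auto"


# Hypothetical presets; the real built-in profiles' values are not documented here
PROFILES = {
    "cpu_intensive": Profile(workload_type="cpu"),
    "io_heavy": Profile(workload_type="io", spill_compression="lz4"),
}


def resolve(profile_name: str, **overrides) -> Profile:
    """Start from a named profile, then let explicit keyword arguments win."""
    base = PROFILES[profile_name]
    # replace() returns a new frozen instance; unknown field names raise TypeError
    return replace(base, **overrides)
```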
Storage Format Intelligence
dask_setup detects whether a store is Zarr or NetCDF and recommends chunking
and compression settings suited to that format:
from dask_setup import recommend_io_chunks
# Automatic format detection and optimization
chunks = recommend_io_chunks(
    ds,                                   # Your xarray dataset
    path_or_url="s3://bucket/data.zarr",  # Storage path
    access_pattern="sequential",          # Access pattern
    verbose=True,                         # Show recommendations
)
ds_optimized = ds.chunk(chunks)
Zarr Optimization
- Cloud-friendly chunking (128-512MB chunks)
- Compression codecs (zstd for cloud, lz4 for local)
- Automatic metadata consolidation
- Bit rounding for floating-point compression
NetCDF Optimization
- HDF5-aware chunking (64-256MB chunks)
- Conservative chunking of unlimited dimensions
- zlib with shuffle filter
- Optimized caching for HTTP/S3 access
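To make the chunk-size ranges above concrete, this hypothetical sketch infers the format from the path suffix and picks the largest time chunk that stays under the upper end of that format's range. It ignores the lower bound, treats MB as MiB, and is not `recommend_io_chunks`, which may weigh many more factors:

```python
import math

# Upper ends of the target ranges listed above, in MiB
TARGET_MAX_MIB = {"zarr": 512, "netcdf": 256}


def guess_format(path: str) -> str:
    """Hypothetical suffix-based detection (a real tool may open the store)."""
    p = path.rstrip("/").lower()
    if p.endswith(".zarr"):
        return "zarr"
    if p.endswith((".nc", ".nc4", ".netcdf")):
        return "netcdf"
    raise ValueError(f"cannot infer storage format from {path!r}")


def chunk_nbytes(chunks: dict[str, int], itemsize: int) -> int:
    """Size of one chunk: product of the chunk lengths times bytes per element."""
    return math.prod(chunks.values()) * itemsize


def time_chunk_for(path: str, spatial: dict[str, int], itemsize: int, n_time: int) -> int:
    """Largest time chunk whose size stays under the format's upper target."""
    limit = TARGET_MAX_MIB[guess_format(path)] * 1024**2
    per_step = chunk_nbytes(spatial, itemsize)  # bytes in one time slice
    return max(1, min(n_time, limit // per_step))
```

For reference, the {"time": 240, "y": 512, "x": 512} chunks used in the CPU example are exactly 480 MiB for float64 data: inside the Zarr range, but above the NetCDF one.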
Memory Management
Spill Thresholds
Set automatically, as fractions of each worker's memory limit, to help prevent OOM crashes:
- 75%: Start spilling to disk
- 85%: Aggressive spilling
- 92%: Pause scheduling new tasks
- 98%: Kill worker (last resort)
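The four levels above line up with Dask's worker-memory configuration keys. This sketch shows one plausible mapping: the key names are Dask's own, but which level dask_setup assigns to which key is an assumption, and `check_thresholds` is a hypothetical helper:

```python
# Dask worker-memory settings filled with the thresholds listed above.
# Pairing each documented level with a particular key is an assumption.
SPILL_THRESHOLDS = {
    "distributed.worker.memory.target": 0.75,     # start spilling to disk
    "distributed.worker.memory.spill": 0.85,      # spill more aggressively
    "distributed.worker.memory.pause": 0.92,      # stop scheduling new tasks
    "distributed.worker.memory.terminate": 0.98,  # kill the worker (last resort)
}


def check_thresholds(cfg: dict[str, float]) -> None:
    """Raise if the fractions fall outside (0, 1) or are not strictly increasing."""
    names = ["target", "spill", "pause", "terminate"]
    values = [cfg[f"distributed.worker.memory.{n}"] for n in names]
    if not all(0 < v < 1 for v in values):
        raise ValueError("thresholds must be fractions of the worker memory limit")
    if any(lo >= hi for lo, hi in zip(values, values[1:])):
        raise ValueError("thresholds must increase: target < spill < pause < terminate")


check_thresholds(SPILL_THRESHOLDS)
```

With Dask installed, passing the dict to `dask.config.set(...)` before the cluster is created is the usual way to apply settings like these.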
Compression & Parallel I/O
config = DaskSetupConfig(
    workload_type="cpu",
    spill_compression="lz4",  # Options: auto, lz4, zstd, snappy
    comm_compression=True,    # Worker-to-worker compression
    spill_threads=4,          # Parallel spill I/O threads
    reserve_mem_gb=50.0,
)
client, cluster, tmp = setup_dask_client(config=config)
HPC Integration
PBS Job Example
#!/bin/bash
#PBS -q normalsr
#PBS -l ncpus=104
#PBS -l mem=300gb
#PBS -l jobfs=200gb
#PBS -l walltime=12:00:00
#PBS -l storage=gdata/hh5+gdata/gb02
#PBS -l wd
module use /g/data/hh5/public/modules/
module load conda_concept/analysis3-unstable
export TMPDIR="$PBS_JOBFS"
python your_script.py
SSH Tunnel for Dashboard
With dashboard=True, the cluster prints an SSH tunnel command:
# On your laptop:
ssh -N -L 8787:<compute_node>:<port> gadi.nci.org.au
# Then open: http://localhost:8787
Common Usage Patterns
Xarray Reductions
client, cluster, dask_tmp = setup_dask_client("cpu", reserve_mem_gb=60)
ds = ds.chunk({"time": 240, "y": 512, "x": 512})
out = ds.mean(("y", "x")).compute()
Writing to Zarr
client, cluster, dask_tmp = setup_dask_client("cpu", max_workers=1)
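step = 240
n = ds.sizes["time"]
# Create the full-size store up front: writes metadata, leaves dask-backed data uncomputed
ds.to_zarr("out.zarr", mode="w", compute=False)
# Region writes need every variable to carry the "time" dimension
to_write = ds.drop_vars([v for v in ds.variables if "time" not in ds.variables[v].dims])
# Fill the store one time window at a time
for start in range(0, n, step):
    stop = min(start + step, n)
    to_write.isel(time=slice(start, stop)).to_zarr(
        "out.zarr",
        mode="r+",  # overwrite existing data only; the default when region is set
        region={"time": slice(start, stop)},
    )
Rechunking with Rechunker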
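import rechunker

client, cluster, dask_tmp = setup_dask_client("cpu", reserve_mem_gb=60)
tmp_store = f"{dask_tmp}/tmp_rechunk.zarr"
# to_array() stacks the data variables along a new leading "variable" dim,
# giving a 4-D dask array (variable, time, y, x) when every variable is (time, y, x)
source = ds.to_array().data
plan = rechunker.rechunk(
    source,
    target_chunks=(1, 240, 512, 512),  # plain dask arrays take one entry per axis
    max_mem="6GB",
    target_store="out.zarr",
    temp_store=tmp_store,  # Lives on $PBS_JOBFS
)
plan.execute()
Enhanced Error Handling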
dask_setup provides context-aware error messages with actionable suggestions:
# Invalid configuration triggers helpful error
# ConfigurationValidationError is one of the dask_setup error types listed below;
# import it from the package along with DaskSetupConfig.
try:
    config = DaskSetupConfig(
        max_workers=-5,
        reserve_mem_gb=1000.0,
        workload_type="invalid",
    )
except ConfigurationValidationError as e:
    print(e)
    # Shows:
    # - Specific validation errors
    # - Actionable suggestions
    # - Environment-specific advice
    # - Documentation links
Error Types
- ConfigurationValidationError: Field-specific validation with fixes
- ResourceConstraintError: Memory/CPU limits with PBS/SLURM advice
- DependencyError: Missing packages with installation instructions
- StorageConfigurationError: Storage format issues with solutions
- ClusterSetupError: Cluster startup problems with diagnostics
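The error example above reports every invalid field at once rather than stopping at the first one. A minimal sketch of that collect-then-raise pattern follows; `ConfigError` and `validate` are hypothetical stand-ins, not the real ConfigurationValidationError or its validation logic:

```python
class ConfigError(ValueError):
    """Hypothetical error bundling (field, problem, suggestion) triples."""

    def __init__(self, problems: list[tuple[str, str, str]]):
        self.problems = problems
        lines = ["Invalid configuration:"]
        for field, problem, fix in problems:
            lines.append(f"  - {field}: {problem}. Suggestion: {fix}")
        super().__init__("\n".join(lines))


def validate(max_workers: int, reserve_mem_gb: float, workload_type: str, node_mem_gb: float) -> None:
    """Check every field first, then raise once with all problems found."""
    problems = []
    if max_workers < 1:
        problems.append(("max_workers", f"got {max_workers}", "use a positive integer"))
    if not 0 <= reserve_mem_gb < node_mem_gb:
        problems.append(("reserve_mem_gb", f"{reserve_mem_gb} GB exceeds the {node_mem_gb} GB available",
                         "reserve less than the job's memory request"))
    if workload_type not in {"cpu", "io", "mixed"}:
        problems.append(("workload_type", f"got {workload_type!r}", "choose 'cpu', 'io' or 'mixed'"))
    if problems:
        raise ConfigError(problems)
```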
CLI Tools
# List available profiles
dask-setup profiles list
# Show profile details
dask-setup profiles show cpu_intensive
# Create new profile interactively
dask-setup profiles create my_profile
# Validate configuration
dask-setup config validate my_config.yaml
# Export profile
dask-setup profiles export cpu_intensive > cpu_profile.yaml
Testing & Quality
- 500+ tests covering all functionality
- 90%+ code coverage requirement
- Integration tests with real HPC environments
- Performance benchmarks for optimization validation
- Cross-platform testing (Linux, macOS, Windows)
Recipe Examples
The dask_setup repository includes comprehensive recipe examples demonstrating
real-world usage patterns. These recipes are fully executable Python scripts with detailed
comments and explanations.
01. Basic Usage
Core workload types (CPU/I/O/Mixed) with resource detection and safe shutdown patterns.
02. Configuration Management
Built-in profiles, custom YAML profiles, and runtime configuration overrides.
03. Memory & Compression
Advanced memory management, spill compression strategies, and parallel I/O configuration.
04. Storage & I/O
Storage format intelligence for Zarr/NetCDF with chunking and compression optimization.
05. Xarray Chunking
Best practices for xarray operations, chunking strategies, and lazy evaluation patterns.
06. HPC PBS/SLURM
Job script examples for PBS/SLURM with SSH tunneling and dashboard access patterns.
07. Error & Troubleshooting
Common errors and solutions with diagnostic tools and debugging strategies.
08. Real-World Science
Complete climate analysis case study with data loading, QC, temporal/spatial analysis, and visualization.
09. Performance Optimization
Benchmarking, scaling analysis, and performance tuning for different workload types.
10. Tool Integration
Integration with rechunker, Zarr, NetCDF workflows with region-based writing patterns.
Running Recipes: Each recipe can be executed directly from the command line:
python examples/recipes/01_basic/basic_usage.py --help
python examples/recipes/08_real_world_science/climate_analysis_case.py --verbose
Troubleshooting
"Task needs > memory_limit"
Use fewer (fatter) workers for heavy steps:
client, cluster, _ = setup_dask_client("cpu", max_workers=1, reserve_mem_gb=60)
Dashboard Unreachable
Use the printed SSH tunnel command. For compute jobs, tunnel to the compute node hostname.
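If the printed command is lost, it is easy to rebuild. The helper below is a hypothetical sketch that assumes it runs inside the job on the compute node (so `socket.gethostname()` returns that node's name) and uses the login host from the tunnel example earlier:

```python
import socket


def tunnel_command(port: int, node: str = "", login_host: str = "gadi.nci.org.au",
                   local_port: int = 8787) -> str:
    """Build the ssh port-forward command to run on your laptop.

    node defaults to this machine's hostname, i.e. the compute node when
    called from inside the job.
    """
    node = node or socket.gethostname()
    return f"ssh -N -L {local_port}:{node}:{port} {login_host}"
```

Inside a running session, `client.dashboard_link` gives the dashboard URL, which includes the port to pass in.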
Shared Filesystem Thrashing
Confirm spills go to jobfs by checking the printed temp/spill directory path.
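A quick way to confirm where spills land is to compare the spill directory with $PBS_JOBFS. This hypothetical sketch shows a fallback order (PBS_JOBFS, then TMPDIR, then the system default) and a containment check; neither function is part of dask_setup:

```python
import os
import tempfile


def pick_spill_dir(env=None) -> str:
    """Prefer node-local $PBS_JOBFS, then $TMPDIR, then the system default."""
    env = os.environ if env is None else env
    for var in ("PBS_JOBFS", "TMPDIR"):
        if env.get(var):
            return env[var]
    return tempfile.gettempdir()


def is_on_jobfs(path: str, env=None) -> bool:
    """True if path lies inside $PBS_JOBFS once both are resolved to real paths."""
    env = os.environ if env is None else env
    jobfs = env.get("PBS_JOBFS")
    if not jobfs:
        return False
    path_real = os.path.realpath(path)
    jobfs_real = os.path.realpath(jobfs)
    return os.path.commonpath([path_real, jobfs_real]) == jobfs_real
```

To point Dask itself at the chosen directory, its `temporary-directory` configuration key can be set, for example `dask.config.set({"temporary-directory": pick_spill_dir()})`.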
Design Limitations
- Single node only: For multi-node, use dask-jobqueue
- No GPU tuning: For CuPy/RAPIDS workloads, tune settings manually
- POSIX paths: Assumes POSIX filesystem
TL;DR
# Quick results:
setup_dask_client("cpu") # Heavy math/computation
setup_dask_client("io") # Large file I/O
setup_dask_client("mixed") # Mixed workloads
# Or use optimized profiles:
setup_dask_client(profile="cpu_intensive")
Contributing
We welcome contributions! See the GitHub repository for guidelines.
License
Apache-2.0 License