dask_setup

A comprehensive single-node Dask setup helper designed for HPC environments (especially Gadi), with intelligent defaults for CPU/I/O-bound workloads, advanced configuration management, storage format optimization, and user-friendly error handling.

What It Does

dask_setup wraps dask.distributed.LocalCluster and Client and chooses worker counts, memory limits, and spill settings for you, replacing manual trial-and-error:

  • Detects resources automatically from PBS/SLURM/Kubernetes environments
  • Reserves memory for OS/I/O caches and splits the rest across workers
  • Chooses topology (process/thread) based on workload type
  • Configures safe spilling to prevent OOM crashes
  • Pins temp/spill to $PBS_JOBFS (node-local SSD)
  • Provides dashboard with SSH tunnel commands
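
The resource detection step can be pictured roughly as follows. This is a minimal sketch, not dask_setup's actual implementation: the function name `detect_cpus` is hypothetical, and it assumes the scheduler exports `PBS_NCPUS` (PBS) or `SLURM_CPUS_ON_NODE` (SLURM), falling back to `os.cpu_count()` when neither is set.

```python
import os


def detect_cpus(env=None):
    """Return (cpu_count, source) from scheduler variables, else the local machine."""
    env = os.environ if env is None else env
    # Assumed variable names: PBS_NCPUS (PBS) and SLURM_CPUS_ON_NODE (SLURM).
    for var in ("PBS_NCPUS", "SLURM_CPUS_ON_NODE"):
        value = env.get(var, "")
        if value.isdigit() and int(value) > 0:
            return int(value), var
    # No scheduler variable found: fall back to what the OS reports.
    return os.cpu_count() or 1, "os.cpu_count"


print(detect_cpus({"PBS_NCPUS": "104"}))  # (104, 'PBS_NCPUS')
```

Passing the environment as a plain dict keeps the function easy to test outside a batch job.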

Key Features

Smart Detection

Automatic PBS/SLURM/Kubernetes environment and resource detection

Configuration Profiles

Pre-configured profiles for common workloads with validation

Workload Optimization

Specialized topologies for CPU-bound, I/O-bound, and mixed workloads

Storage Intelligence

Zarr/NetCDF-aware chunking and compression recommendations

Memory Safety

Aggressive spilling configuration prevents out-of-memory crashes

HPC-Optimized

Routes temp/spill to $PBS_JOBFS so spill traffic stays on node-local storage instead of the shared filesystem

Installation

pip install dask-setup

Requirements: Python 3.11+

Quick Start

Simple Usage

from dask_setup import setup_dask_client

# Optimal defaults for CPU-bound work
client, cluster, dask_tmp = setup_dask_client("cpu")

Advanced Configuration

from dask_setup import setup_dask_client, DaskSetupConfig

config = DaskSetupConfig(
    workload_type="cpu",
    max_workers=8,
    reserve_mem_gb=32.0,
    spill_compression="lz4",
    suggest_chunks=True  # Show xarray chunking recommendations
)
client, cluster, dask_tmp = setup_dask_client(config=config)

Workload Types

CPU-Bound (workload_type="cpu")

  • Topology: Many processes, 1 thread each
  • Best for: Heavy computation, NumPy/Numba operations, xarray math
  • Workers: ≈ number of cores

client, cluster, tmp = setup_dask_client("cpu", reserve_mem_gb=60)
ds = ds.chunk({"time": 240, "y": 512, "x": 512})
result = ds.mean(("y", "x")).compute()

I/O-Bound (workload_type="io")

  • Topology: 1 process with 8-16 threads
  • Best for: High-throughput NetCDF/Zarr I/O
  • Workers: Single process, multi-threaded

import xarray as xr

client, cluster, tmp = setup_dask_client("io", reserve_mem_gb=40)
# files: a list of NetCDF paths or a glob pattern
ds = xr.open_mfdataset(files, engine="netcdf4", chunks={}, parallel=True)

Mixed (workload_type="mixed")

  • Topology: Few processes, 2 threads each
  • Best for: Pipelines with both reading and computation
  • Workers: Balanced configuration
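
The three topologies above can be summarised in a few lines. This is a sketch under stated assumptions rather than the library's own logic: `choose_topology` is a hypothetical name, the I/O thread count is clamped to the 8-16 range quoted above, and "few processes" for mixed workloads is assumed to mean about half the core count.

```python
def choose_topology(workload_type, cores):
    """Return (n_workers, threads_per_worker) for a single node."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    if workload_type == "cpu":
        # Many single-threaded processes avoid GIL contention in compute-heavy tasks.
        return cores, 1
    if workload_type == "io":
        # One process; thread count clamped to the 8-16 range.
        return 1, max(8, min(16, cores))
    if workload_type == "mixed":
        # Assumption: "few processes, 2 threads each" means about cores // 2 workers.
        return max(1, cores // 2), 2
    raise ValueError(f"unknown workload_type: {workload_type!r}")


for kind in ("cpu", "io", "mixed"):
    print(kind, choose_topology(kind, 104))
```

The returned pair corresponds to the `n_workers` and `threads_per_worker` arguments of `dask.distributed.LocalCluster`.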

Configuration Profiles

from dask_setup import ConfigManager, DaskSetupConfig, setup_dask_client

# Use built-in profiles
client, cluster, tmp = setup_dask_client(profile="cpu_intensive")

# Create and save custom profiles
config = DaskSetupConfig(
    workload_type="io",
    reserve_mem_gb=40.0,
    spill_compression="lz4",
    name="my_io_profile",
    description="Optimized for large NetCDF processing"
)

manager = ConfigManager()
manager.save_profile("my_io_profile", config)

# List available profiles
profiles = manager.list_profiles()

Built-in Profiles

  • cpu_intensive: Heavy computation, many processes, minimal I/O
  • io_heavy: Large file processing, optimized for Zarr/NetCDF
  • memory_conservative: Lower memory usage, safer for shared systems
  • balanced: Good default for mixed workloads

Storage Format Intelligence

dask_setup detects the storage format (Zarr or NetCDF) from the path and recommends chunking to match:

from dask_setup import recommend_io_chunks

# Automatic format detection and optimization
chunks = recommend_io_chunks(
    ds,                                      # Your xarray dataset
    path_or_url="s3://bucket/data.zarr",   # Storage path
    access_pattern="sequential",            # Access pattern
    verbose=True                            # Show recommendations
)

ds_optimized = ds.chunk(chunks)

Zarr Optimization

  • Cloud-friendly chunking (128-512MB chunks)
  • Compression codecs (zstd for cloud, lz4 for local)
  • Automatic metadata consolidation
  • Bit rounding for floating-point compression

NetCDF Optimization

  • HDF5-aware chunking (64-256MB chunks)
  • Conservative chunking of unlimited dimensions
  • zlib with shuffle filter
  • Optimized caching for HTTP/S3 access
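
To see how a chunk shape relates to the size ranges above, the arithmetic can be done by hand. The helpers below are illustrative only (the names are hypothetical, and sizes are counted in MiB, i.e. 1024² bytes); they do not reproduce what `recommend_io_chunks` does internally.

```python
import math


def chunk_size_mib(shape, itemsize):
    """Size of one chunk in MiB for the given shape and bytes per element."""
    return math.prod(shape) * itemsize / 1024**2


def time_chunk_length(ny, nx, itemsize, target_mib):
    """Number of time steps per chunk so that one chunk is at most target_mib."""
    bytes_per_step = ny * nx * itemsize
    steps = (target_mib * 1024**2) // bytes_per_step
    return max(1, steps)


# The chunking used elsewhere in this document, for float32 data:
print(chunk_size_mib((240, 512, 512), 4))   # 240.0
# Time steps needed to reach a 256 MiB chunk on a 512 x 512 float32 grid:
print(time_chunk_length(512, 512, 4, 256))  # 256
```

With float64 data the same (240, 512, 512) chunk doubles to 480 MiB, which is still inside the Zarr range but above the NetCDF one.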

Memory Management

Spill Thresholds

Configured automatically to prevent OOM crashes:

  • 75%: Start spilling to disk
  • 85%: Aggressive spilling
  • 92%: Pause scheduling new tasks
  • 98%: Kill worker (last resort)
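
For readers who want to reproduce these thresholds by hand, they plausibly map onto Dask's four `distributed.worker.memory.*` settings. The mapping below is an assumption based on the descriptions above, not taken from dask_setup's source, and `thresholds_in_gib` is a hypothetical helper.

```python
THRESHOLDS = {
    "distributed.worker.memory.target": 0.75,     # start spilling to disk
    "distributed.worker.memory.spill": 0.85,      # aggressive spilling
    "distributed.worker.memory.pause": 0.92,      # pause scheduling new tasks
    "distributed.worker.memory.terminate": 0.98,  # kill worker (last resort)
}


def thresholds_in_gib(memory_limit_gib):
    """Translate the fractional thresholds into GiB for one worker."""
    return {
        key.rsplit(".", 1)[-1]: round(fraction * memory_limit_gib, 2)
        for key, fraction in THRESHOLDS.items()
    }


# For a worker with an 8 GiB memory limit:
print(thresholds_in_gib(8))
```

A dict of this form can be passed to `dask.config.set(...)` before the cluster is created if you are configuring a `LocalCluster` manually.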

Compression & Parallel I/O

config = DaskSetupConfig(
    workload_type="cpu",
    spill_compression="lz4",      # Options: auto, lz4, zstd, snappy
    comm_compression=True,        # Worker-to-worker compression
    spill_threads=4,              # Parallel spill I/O threads
    reserve_mem_gb=50.0
)
client, cluster, tmp = setup_dask_client(config=config)

HPC Integration

PBS Job Example

#!/bin/bash
#PBS -q normalsr
#PBS -l ncpus=104
#PBS -l mem=300gb
#PBS -l jobfs=200gb
#PBS -l walltime=12:00:00
#PBS -l storage=gdata/hh5+gdata/gb02
#PBS -l wd

module use /g/data/hh5/public/modules/
module load conda_concept/analysis3-unstable

export TMPDIR="$PBS_JOBFS"
python your_script.py
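
Inside the Python script, the job-local directory can be resolved along these lines. This is a sketch, assuming `PBS_JOBFS` is set whenever the job requests jobfs; the helper name `resolve_spill_dir` and the `dask-<pid>` subdirectory are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def resolve_spill_dir(env=None):
    """Pick a directory for Dask temp/spill files, preferring node-local job storage."""
    env = os.environ if env is None else env
    # Prefer PBS_JOBFS, then TMPDIR, then the platform default temp directory.
    base = env.get("PBS_JOBFS") or env.get("TMPDIR") or tempfile.gettempdir()
    spill_dir = Path(base) / f"dask-{os.getpid()}"  # hypothetical naming scheme
    spill_dir.mkdir(parents=True, exist_ok=True)
    return spill_dir


print(resolve_spill_dir())
```

When building a cluster by hand, a path like this is what you would pass as `local_directory` to `dask.distributed.LocalCluster`.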

SSH Tunnel for Dashboard

With dashboard=True, the cluster prints an SSH tunnel command:

# On your laptop:
ssh -N -L 8787:<compute_node>:<port> gadi.nci.org.au

# Then open: http://localhost:8787
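
If you need to rebuild the tunnel command yourself, for example after losing the job log, a small formatter is enough. The function below is a hypothetical helper, and the node name in the example is made up for illustration.

```python
import socket


def tunnel_command(dashboard_port, compute_node=None,
                   login_host="gadi.nci.org.au", local_port=8787):
    """Build the ssh port-forward command for the Dask dashboard."""
    # Default to the current host, which is the compute node when run inside a job.
    node = compute_node or socket.gethostname()
    return f"ssh -N -L {local_port}:{node}:{dashboard_port} {login_host}"


# "gadi-cpu-clx-0001" is an illustrative node name, not a real allocation.
print(tunnel_command(8787, compute_node="gadi-cpu-clx-0001"))
```

With a live client, the dashboard port appears in `client.dashboard_link`.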

Common Usage Patterns

Xarray Reductions

client, cluster, dask_tmp = setup_dask_client("cpu", reserve_mem_gb=60)
ds = ds.chunk({"time": 240, "y": 512, "x": 512})
out = ds.mean(("y", "x")).compute()

Writing to Zarr

client, cluster, dask_tmp = setup_dask_client("cpu", max_workers=1)
step = 240
n = ds.sizes["time"]  # total number of time steps

# First window creates the store
ds.isel(time=slice(0, step)).to_zarr("out.zarr", mode="w")

# Append the remaining windows along time. A region write would fail here,
# because the store is only `step` long until it is extended.
for start in range(step, n, step):
    stop = min(start + step, n)
    ds.isel(time=slice(start, stop)).to_zarr(
        "out.zarr", mode="a",
        append_dim="time"
    )

Rechunking with Rechunker

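import rechunker

client, cluster, dask_tmp = setup_dask_client("cpu", reserve_mem_gb=60)

tmp_store = f"{dask_tmp}/tmp_rechunk.zarr"
# to_array() stacks the data variables along a leading "variable" axis, so the
# source has dims (variable, time, y, x). A bare Dask array carries no dimension
# names, so target_chunks is given as a tuple in that order.
plan = rechunker.rechunk(
    ds.to_array().data,
    target_chunks=(1, 240, 512, 512),
    max_mem="6GB",
    target_store="out.zarr",
    temp_store=tmp_store,  # Lives on $PBS_JOBFS
)
plan.execute()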

Enhanced Error Handling

dask_setup provides context-aware error messages with actionable suggestions:

# Invalid configuration triggers helpful error
try:
    config = DaskSetupConfig(
        max_workers=-5,
        reserve_mem_gb=1000.0,
        workload_type="invalid"
    )
except ConfigurationValidationError as e:
    print(e)
    # Shows:
    # - Specific validation errors
    # - Actionable suggestions
    # - Environment-specific advice
    # - Documentation links

Error Types

  • ConfigurationValidationError: Field-specific validation with fixes
  • ResourceConstraintError: Memory/CPU limits with PBS/SLURM advice
  • DependencyError: Missing packages with installation instructions
  • StorageConfigurationError: Storage format issues with solutions
  • ClusterSetupError: Cluster startup problems with diagnostics
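
The idea of reporting every failed field together with a suggested fix can be sketched as below. None of these names come from dask_setup: `Issue`, `SketchValidationError`, and `validate` are hypothetical stand-ins, and the checks are assumptions about what a reasonable validator might test.

```python
from dataclasses import dataclass

VALID_WORKLOADS = ("cpu", "io", "mixed")


@dataclass
class Issue:
    name: str        # offending field
    problem: str     # what is wrong
    suggestion: str  # how to fix it


class SketchValidationError(ValueError):
    """Carries every failed check so the user can fix them in one pass."""

    def __init__(self, issues):
        self.issues = issues
        lines = [f"- {i.name}: {i.problem}. {i.suggestion}" for i in issues]
        super().__init__("Invalid configuration:\n" + "\n".join(lines))


def validate(max_workers, reserve_mem_gb, workload_type, total_mem_gb):
    """Collect all problems first, then raise once with the full list."""
    issues = []
    if max_workers is not None and max_workers < 1:
        issues.append(Issue("max_workers", f"got {max_workers}",
                            "Use a positive integer or None"))
    if reserve_mem_gb >= total_mem_gb:
        issues.append(Issue("reserve_mem_gb",
                            f"{reserve_mem_gb} GB leaves nothing of {total_mem_gb} GB",
                            "Lower the reservation or request more memory"))
    if workload_type not in VALID_WORKLOADS:
        issues.append(Issue("workload_type", f"got {workload_type!r}",
                            f"Choose one of {VALID_WORKLOADS}"))
    if issues:
        raise SketchValidationError(issues)


try:
    validate(max_workers=-5, reserve_mem_gb=1000.0,
             workload_type="invalid", total_mem_gb=300.0)
except SketchValidationError as err:
    print(err)
```

Collecting all issues before raising means a user with three bad fields sees three messages instead of fixing them one run at a time.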

CLI Tools

# List available profiles
dask-setup profiles list

# Show profile details
dask-setup profiles show cpu_intensive

# Create new profile interactively
dask-setup profiles create my_profile

# Validate configuration
dask-setup config validate my_config.yaml

# Export profile
dask-setup profiles export cpu_intensive > cpu_profile.yaml

Testing & Quality

  • 500+ tests covering all functionality
  • 90%+ code coverage requirement
  • Integration tests with real HPC environments
  • Performance benchmarks for optimization validation
  • Cross-platform testing (Linux, macOS, Windows)

Recipe Examples

The dask_setup repository includes comprehensive recipe examples demonstrating real-world usage patterns. These recipes are fully executable Python scripts with detailed comments and explanations.

01. Basic Usage

Core workload types (CPU/I/O/Mixed) with resource detection and safe shutdown patterns.

02. Configuration Management

Built-in profiles, custom YAML profiles, and runtime configuration overrides.

03. Memory & Compression

Advanced memory management, spill compression strategies, and parallel I/O configuration.

04. Storage & I/O

Storage format intelligence for Zarr/NetCDF with chunking and compression optimization.

05. Xarray Chunking

Best practices for xarray operations, chunking strategies, and lazy evaluation patterns.

06. HPC PBS/SLURM

Job script examples for PBS/SLURM with SSH tunneling and dashboard access patterns.

07. Error & Troubleshooting

Common errors and solutions with diagnostic tools and debugging strategies.

08. Real-World Science

Complete climate analysis case study with data loading, QC, temporal/spatial analysis, and visualization.

09. Performance Optimization

Benchmarking, scaling analysis, and performance tuning for different workload types.

10. Tool Integration

Integration with rechunker, Zarr, NetCDF workflows with region-based writing patterns.

Running Recipes: Each recipe can be executed directly from the command line:

python examples/recipes/01_basic/basic_usage.py --help
python examples/recipes/08_real_world_science/climate_analysis_case.py --verbose

Troubleshooting

"Task needs > memory_limit"

Use fewer (fatter) workers for heavy steps:

client, cluster, _ = setup_dask_client("cpu", max_workers=1, reserve_mem_gb=60)
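
The effect of fewer, fatter workers is easy to quantify. The sketch below assumes the memory left after the reservation is split evenly across workers; `memory_per_worker_gb` is a hypothetical helper, not a dask_setup function.

```python
def memory_per_worker_gb(total_mem_gb, reserve_mem_gb, n_workers):
    """Split the memory left after the reservation evenly across workers."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    usable = total_mem_gb - reserve_mem_gb
    if usable <= 0:
        raise ValueError("reservation leaves no memory for workers")
    return usable / n_workers


# Numbers from the PBS job example: 300 GB of node memory and 104 cores.
print(round(memory_per_worker_gb(300, 60, 104), 2))  # 2.31
print(memory_per_worker_gb(300, 60, 1))              # 240.0
```

A task that needs 10 GB cannot run on a 2.31 GB worker no matter how much it spills, but fits comfortably once the same memory is given to a single worker.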

Dashboard Unreachable

Use the printed SSH tunnel command. For compute jobs, tunnel to the compute node hostname.

Shared Filesystem Thrashing

Confirm spills go to jobfs by checking the printed temp/spill directory path.

Design Limitations

  • Single node only: For multi-node, use dask-jobqueue
  • No GPU tuning: For CuPy/RAPIDS workloads, tune worker and memory settings manually
  • POSIX paths: Assumes POSIX filesystem

TL;DR

# Quick results:
setup_dask_client("cpu")    # Heavy math/computation
setup_dask_client("io")     # Large file I/O
setup_dask_client("mixed")  # Mixed workloads

# Or use optimized profiles:
setup_dask_client(profile="cpu_intensive")

Contributing

We welcome contributions! See the GitHub repository for guidelines.

License

Apache-2.0 License