{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
"[Home Page](../START_HERE.ipynb)\n",
"\n",
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
"[1]\n",
"[2](02-CuDF_and_Dask.ipynb)\n",
"[3](03-CuML_and_Dask.ipynb)\n",
"[4](04-Challenge.ipynb)\n",
"[5](05-Challenge_Solution.ipynb)\n",
" \n",
" \n",
" \n",
" \n",
"[Next Notebook](02-CuDF_and_Dask.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask Overview"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses Pandas (NumPy) to execute operations in parallel on DataFrame (array) partitions.\n",
"\n",
"Dask-cuDF extends Dask where necessary to allow its DataFrame partitions to be processed by cuDF GPU DataFrames as opposed to Pandas DataFrames. For instance, when you call dask_cudf.read_csv(…), your cluster’s GPUs do the work of parsing the CSV file(s) with underlying cudf.read_csv(). Dask also supports array based workflows using CuPy.\n",
"\n",
"## When to use Dask\n",
"If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF or CuPy. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask.\n",
"\n",
"One additional benefit Dask provides is that it lets us easily spill data between device and host memory. This can be very useful when we need to do work that would otherwise cause out of memory errors."
]
},
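{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "As a quick, concrete sketch of the Dask-cuDF DataFrame model described above, the cell below builds a toy cuDF DataFrame on the GPU and splits it into two Dask partitions with `dask_cudf.from_cudf`. This stands in for what `dask_cudf.read_csv(...)` would give us from CSV files on disk (the column names and values here are just made-up examples); we will work with Dask-cuDF properly in the next notebook."
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "import cudf\n",
  "import dask_cudf\n",
  "\n",
  "# A toy cuDF DataFrame on the GPU, standing in for data parsed from CSV files\n",
  "gdf = cudf.DataFrame({\"a\": [1, 2, 3, 4], \"b\": [10.0, 20.0, 30.0, 40.0]})\n",
  "\n",
  "# Split it into a 2-partition Dask-cuDF DataFrame; each partition is itself a cuDF DataFrame\n",
  "ddf = dask_cudf.from_cudf(gdf, npartitions=2)\n",
  "\n",
  "ddf.head()  # operations are lazy; head() triggers a small computation"
 ]
},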
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Here is the list of modules in the lab:\n",
"\n",
"- Creating a Local Cluster
Learn how to create a GPU cluster and find the amount of resources available to your cluster.\n",
"- Distributed GPU Arrays
We will study how Dask Array provides chunked algorithms on top of Numpy-like libraries like Numpy and CuPy, enabling us to operate on more data.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"# Creating a Local Cluster\n",
"\n",
"The easiest way to scale workflows on a single node is to use the `LocalCUDACluster` API. This lets us create a GPU cluster, using one worker per GPU by default."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask.distributed import Client, wait\n",
"from dask_cuda import LocalCUDACluster\n",
"\n",
"cluster = LocalCUDACluster(threads_per_worker=1)\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Workers provide two functions:\n",
"\n",
"- Compute tasks as directed by the scheduler\n",
"- Store and serve computed results to other workers or clients\n",
"\n",
"\n",
"Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler.\n",
"- It stores the results of these tasks locally and serves them to other workers or clients on demand. \n",
"- If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.\n",
"\n",
"Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other. For the purposes of data locality all threads within a worker are considered the same worker."
]
},
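{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "Before looking at the GPUs themselves with `nvidia-smi` below, we can ask the scheduler what it knows about the workers we just started. The next cell is a small sketch using the distributed client's `scheduler_info()` call; it lists each worker's address together with its thread count and host-memory limit (`None` is printed if a field is not reported)."
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Ask the scheduler for its view of the cluster: one entry per worker\n",
  "for address, worker in client.scheduler_info()[\"workers\"].items():\n",
  "    print(address,\n",
  "          \"| threads:\", worker.get(\"nthreads\"),\n",
  "          \"| host memory limit:\", worker.get(\"memory_limit\"))"
 ]
},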
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!nvidia-smi"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"## Distributed GPU Arrays\n",
"\n",
"Let's create a random matrix and calculate the singular value decomposition of it. This is a fairly complex calculation, so it's a great introduction to Dask. Dask can use `CuPy` to create random arrays."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import dask\n",
"import dask.array as da\n",
"import cupy as cp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rs = da.random.RandomState(RandomState=cp.random.RandomState, seed=12) # <-- we specify cupy here\n",
"\n",
"x = rs.random((100000, 1000), chunks=(10000,1000))\n",
"x = x.persist() # so quick we don't need to wait"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice the `persist` call. Like Apache Spark, Dask operations are lazy . Instead of being executed at that moment, most operations are added to a task graph and the actual evaluation is delayed until the result is needed.\n",
"\n",
"\n",
"Sometimes, though, we want to force the execution of operations. Calling `persist` on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we’re using distributed systems, we may want to wait until persist is finished before beginning any downstream operations. We can enforce this contract by using `wait`. Wrapping an operation with `wait` will ensure it doesn’t begin executing until all necessary upstream operations have finished.\n",
"\n",
"Let's look at our distributed array."
]
},
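{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "As a minimal sketch of this persist-and-wait pattern, the next cell derives a small throwaway array from `x`, persists it, and then blocks with `wait` (imported earlier from `dask.distributed`) until it has finished computing."
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# persist() starts computing y on the workers in the background;\n",
  "# wait() blocks until every chunk of y has actually been computed.\n",
  "y = (x + 1).persist()\n",
  "wait(y)"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "Now let's look at our distributed array."
 ]
},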
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"x"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask's visual class representation shows us some information about this distributed array. We can see the size of the array, and of individual chunks, among other things. Remember, a Dask array is made up of individual CuPy or NumPy arrays.\n",
"\n",
"Let's take the SVD now."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"u, s, v = da.linalg.svd(x)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"u"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We've just added several hundred tasks to the task graph. We can call `persist` to execute it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"u, s, v = dask.persist(u, s, v)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can take a look at the results."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"u[:5, :5].compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That's all there is to it. Dask lets us take array workloads and scale up to as many machines as we have!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Conclusion"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"RAPIDS lets us scale up and take advantage of GPU acceleration. Dask lets us scale out to multiple machines. Dask supports both CuPy arrays and cuDF DataFrames, with generally the same APIs as the single-machine libraries. We encourage you to read the Dask [documentation](https://docs.dask.org/en/latest/) to learn more, and also look at our [10 Minute Guide to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/nightly/10min.html). After this brief introduction on Dask, we will learn how to use CuDF with Dask in detail in the next lab, and perform data operations on multiple levels."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Licensing\n",
" \n",
"This material is released by NVIDIA Corporation under the Creative Commons Attribution 4.0 International (CC BY 4.0)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
"[1]\n",
"[2](02-CuDF_and_Dask.ipynb)\n",
"[3](03-CuML_and_Dask.ipynb)\n",
"[4](04-Challenge.ipynb)\n",
"[5](05-Challenge_Solution.ipynb)\n",
" \n",
" \n",
" \n",
" \n",
"[Next Notebook](02-CuDF_and_Dask.ipynb)\n",
"\n",
" \n",
" \n",
" \n",
" \n",
" \n",
" \n",
"[Home Page](../START_HERE.ipynb)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
}
},
"nbformat": 4,
"nbformat_minor": 4
}