&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&ensp;
[Home Page](../START_HERE.ipynb)

[Previous Notebook](02-CuDF_and_Dask.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
[1](01-Intro_to_Dask.ipynb)
[2](02-CuDF_and_Dask.ipynb)
[3]
[4](04-Challenge.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
[Next Notebook](04-Challenge.ipynb)

# Random Forests Multi-node, Multi-GPU demo

The experimental cuML multi-node, multi-GPU (MNMG) implementation of random forests leverages Dask to do embarrassingly-parallel model fitting. For a random forest with `N` trees being fit by `W` workers, each worker will build `N / W` trees. During inference, predictions from all `N` trees will be combined.

The caller is responsible for partitioning the data efficiently via Dask. To build an accurate model, it's important to ensure that each worker has a representative chunk of the data. This can come by distributing the data evenly after ensuring that it is well shuffled. Or, given sufficient memory capacity, the caller can replicate the data to all workers. This approach will most closely simulate the single-GPU building approach.

**Note:** cuML 0.9 contains the first, experimental preview release of the MNMG random forest model. The API is subject to change in future releases, and some known limitations remain (listed in the documentation).

For more information on MNMG Random Forest models, see the documentation:
 * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestClassifier
 * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestRegressor

## Here is the list of contents in the lab:


- <a href='#objcreation'>Start Dask Cluster</a><br> We will specify what resources we want to utilize in our GPU cluster for executing the ML algorithms.
- <a href='#viewing'>Define Parameters</a><br> Define the data and model parameters as per your choice for creating data and model.
- <a href='#selection'>Generation of Data</a><br> We will train our model on a custom dataset created using the parameters specified before.
- <a href='#sellabel'>Distribute Data</a><br> As we are working on a multi-GPU set we will be distributing the data evenly after ensuring that it is well shuffled.
- <a href='#selpos'>Scikit-learn</a><br> Create the scikit-learn implementation of the selection model.
- <a href='#boolean'> CuML</a><br> Convert the Scikit-learn model to CuML and use Dask.
- <a href='#missing'> Accuracy</a><br> Evaluate the performance of both models in terms of accuracy and timing output.

In [None]:
import numpy as np
import sklearn

import pandas as pd
import cudf
import cuml

from sklearn.metrics import accuracy_score
from sklearn import model_selection, datasets

from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf

from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from sklearn.ensemble import RandomForestClassifier as sklRF

<a id='objcreation'></a><br>

## Start Dask cluster

In [None]:
# This will use all GPUs on the local host by default
cluster = LocalCUDACluster(threads_per_worker=1)
c = Client(cluster)

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization

 <a id='viewing'></a><br>
## Define Parameters

In addition to the number of examples, random forest fitting performance depends heavily on the number of columns in a dataset and (especially) on the maximum depth to which trees are allowed to grow. Lower `max_depth` values can greatly speed up fitting, though going too low may reduce accuracy.

In [None]:
# Data parameters
train_size = 100000
test_size = 1000
n_samples = train_size + test_size
n_features = 20

# Random Forest building parameters
max_depth = 12
n_bins = 16
n_trees = 1000

<a id='selection'></a><br>

## Generate Data on host

In this case, we generate data on the client (initial process) and pass it to the workers. You could also load data directly onto the workers via, for example, `dask_cudf.read_csv()`. See also the k-means MNMG notebook (kmeans_mnmg_demo.ipynb) for an alternative method of generating data on the worker nodes.

In [None]:
X, y = datasets.make_classification(n_samples=n_samples, n_features=n_features,
                                 n_clusters_per_class=1, n_informative=int(n_features / 3),
                                 random_state=123, n_classes=5)
X = X.astype(np.float32)
y = y.astype(np.int32)
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size)

<a id='sellabel'></a><br>

## Distribute data to worker GPUs

In [None]:
n_partitions = n_workers

def distribute(X, y):
    # First convert to cudf (with real data, you would likely load in cuDF format to start)
    X_cudf = cudf.DataFrame.from_pandas(pd.DataFrame(X))
    y_cudf = cudf.Series(y)

    # Partition with Dask
    # In this case, each worker will train on 1/n_partitions fraction of the data
    X_dask = dask_cudf.from_cudf(X_cudf, npartitions=n_partitions)
    y_dask = dask_cudf.from_cudf(y_cudf, npartitions=n_partitions)

    # Persist to cache the data in active memory
    X_dask, y_dask = \
      dask_utils.persist_across_workers(c, [X_dask, y_dask], workers=workers)
    
    return X_dask, y_dask

X_train_dask, y_train_dask = distribute(X_train, y_train)
X_test_dask, y_test_dask = distribute(X_test, y_test)

<a id='selpos'></a><br>

## Build a scikit-learn model (single node)

Dask does not currently have a simple wrapper for scikit-learn's RandomForest, but scikit-learn does offer multi-CPU support via joblib, which we'll use.

In [None]:
%%time

# Use all avilable CPU cores
skl_model = sklRF(max_depth=max_depth, n_estimators=n_trees, n_jobs=-1)
skl_model.fit(X_train, y_train)

<a id='boolean'></a><br>

## Train the distributed cuML model

In [None]:
%%time

cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams)
cuml_model.fit(X_train_dask, y_train_dask)

wait(cuml_model.rfs) # Allow asynchronous training tasks to finish

<a id='missing'></a><br>

# Predict and check accuracy

In [None]:
skl_y_pred = skl_model.predict(X_test)
cuml_y_pred = cuml_model.predict(X_test_dask).compute().to_array()

# Due to randomness in the algorithm, you may see slight variation in accuracies
print("SKLearn accuracy:  ", accuracy_score(y_test, skl_y_pred))
print("CuML accuracy:     ", accuracy_score(y_test, cuml_y_pred))

# Conclusion

We can see that the timing output reduces and the accuracy is similar. We are familiar with CuML and Dask integration now. If you wish to explore this in detail, you can refer [here](https://github.com/rapidsai/cuml/tree/branch-0.18/notebooks). If you are confident with the concepts explained till now, you can move to the next lab and attempt to solve the Dask exercise.

## Licensing
  
This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0).

[Previous Notebook](02-CuDF_and_Dask.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
[1](01-Intro_to_Dask.ipynb)
[2](02-CuDF_and_Dask.ipynb)
[3]
[4](04-Challenge.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
[Next Notebook](04-Challenge.ipynb)

&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&ensp;
[Home Page](../START_HERE.ipynb)