[Home Page](../START_HERE.ipynb)

[Previous Notebook](01-Intro_to_Dask.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
[1](01-Intro_to_Dask.ipynb)
[2]
[3](03-CuML_and_Dask.ipynb)
[4](04-Challenge.ipynb)
[5](05-Challenge_Solution.ipynb)
&emsp;&emsp;&emsp;&emsp;&emsp;
[Next Notebook](03-CuML_and_Dask.ipynb)

# Introduction to cuDF and Dask-cuDF

Modeled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly toward new users. The tutorial is split into modules with runnable code cells for you to practice the concepts. Every concept is shown in both cuDF and Dask-cuDF syntax so you can compare the two side by side.



### What are these Libraries?

[cuDF](https://github.com/rapidsai/cudf) is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating data.

[Dask](https://dask.org/) is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple.

[Dask-cuDF](https://github.com/rapidsai/dask-cudf) is a library that provides a partitioned, GPU-backed dataframe, using Dask.



### When to use cuDF and Dask-cuDF

Use cuDF if your workflow is fast enough on a single GPU or your data fits comfortably in a single GPU's memory. Use Dask-cuDF if you want to distribute your workflow across multiple GPUs, have more data than fits in a single GPU's memory, or want to analyze data spread across many files at once.



## Contents of this lab


- <a href='#objcreation'>Creating Dask-cuDF Objects</a><br> This module shows you how to work with Dask-cuDF dataframes, the distributed, GPU-backed equivalent of pandas dataframes. It covers creating Dask-cuDF objects, viewing data, selecting data, Boolean indexing, and dealing with missing data.
- <a href='#operation'>Operations</a><br> Learn how to compute descriptive statistics, apply functions, count unique values (histogramming), use string methods, concatenate, join, append, group, and transpose data.
- <a href='#time'>TimeSeries</a><br> Working with datetime columns in cuDF and Dask-cuDF.
- <a href='#cat'>Categoricals</a><br> Working with categorical data in cuDF and Dask-cuDF.
- <a href='#condatarep'>Converting Data Representations</a><br> Converting data to the pandas, NumPy, and Arrow representations that data science pipelines commonly require.
- <a href='#datainout'>Getting Data In and Out</a><br> Transferring cuDF and Dask-cuDF dataframes to and from CSV and Parquet files.


In [None]:
import os

import numpy as np
import pandas as pd
import cudf
import dask_cudf

np.random.seed(12)

*Portions of this notebook were borrowed and adapted from the cuDF cheatsheet, existing cuDF documentation, and 10 Minutes to Pandas.*

<a id='objcreation'></a>

Object Creation
---------------

Creating a `cudf.Series` and `dask_cudf.Series`.

In [None]:
s = cudf.Series([1,2,3,None,4])
print(s)

In [None]:
ds = dask_cudf.from_cudf(s, npartitions=2) 
print(ds.compute())

Creating a `cudf.DataFrame` and a `dask_cudf.DataFrame` by specifying values for each column.

In [None]:
df = cudf.DataFrame({'a': list(range(20)),
                     'b': list(reversed(range(20))),
                     'c': list(range(20))
                    })

In [None]:
ddf = dask_cudf.from_cudf(df, npartitions=2) 
print(ddf.compute())

Creating a `cudf.DataFrame` from a pandas `DataFrame`, and a `dask_cudf.DataFrame` from a pandas-backed Dask `DataFrame`.

*Note that best practice for using Dask-cuDF is to read data directly into a `dask_cudf.DataFrame` with something like `read_csv` (discussed below).*

In [None]:
pdf = pd.DataFrame({'a': [0, 1, 2, 3], 'b': [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
print(gdf)

In [None]:
import dask.dataframe as dd

# Build a CPU (pandas-backed) Dask DataFrame, then move its partitions to the GPU
dask_df = dd.from_pandas(pdf, npartitions=2)
dask_gdf = dask_cudf.from_dask_dataframe(dask_df)
print(dask_gdf.compute())

<a id='viewing'></a><br>

Viewing Data
-------------

Viewing the top rows of a GPU dataframe.

In [None]:
print(df.head(2))

In [None]:
print(ddf.head(2))

Sorting by values.

In [None]:
print(df.sort_values(by='b'))

In [None]:
print(ddf.sort_values(by='b').compute())

<a id='selection'></a>

Selection
------------

## Getting

Selecting a single column yields a `cudf.Series` from a cuDF DataFrame, or a lazy `dask_cudf.Series` from a Dask-cuDF DataFrame; calling `compute` on the latter returns a `cudf.Series`. `df['a']` is equivalent to `df.a`.

In [None]:
print(df['a'])

In [None]:
print(ddf['a'].compute())

<a id='sellabel'></a>

## Selection by Label

Selecting the rows labeled 2 through 5 from columns 'a' and 'b'. As in pandas, label slices include both endpoints.

In [None]:
print(df.loc[2:5, ['a', 'b']])

In [None]:
print(ddf.loc[2:5, ['a', 'b']].compute())

<a id='selpos'></a>

## Selection by Position

Selecting via integers and integer slices, as in NumPy and pandas. Dask-cuDF DataFrames do not support row-wise positional selection: their `iloc` only accepts column positions (for example, `ddf.iloc[:, 0:2]`).

In [None]:
print(df.iloc[0])

In [None]:
print(df.iloc[0:3, 0:2])
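Label-based and positional selection differ in two ways that often trip up new users: label slices include their stop label while positional slices exclude it, and once rows are reordered, labels no longer line up with positions. The sketch below shows both. It assumes cuDF when it is installed and otherwise falls back to pandas under the hypothetical alias `xdf`, since both libraries accept these calls.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

df = xdf.DataFrame({'a': list(range(20)), 'b': list(reversed(range(20)))})

# Label slices include the stop label; positional slices exclude the stop position
by_label = df.loc[2:5, ['a']]      # labels 2, 3, 4, 5
by_position = df.iloc[2:5, 0:1]    # positions 2, 3, 4
print(len(by_label), len(by_position))

# Sorting reorders rows but keeps their labels, so labels and positions diverge
sorted_df = df.sort_values(by='b')        # labels now run 19, 18, ..., 0
label_zero = sorted_df['a'].loc[0]        # the row labeled 0 (now the last row)
position_zero = sorted_df['a'].iloc[0]    # the first row (labeled 19)
print(label_zero, position_zero)
```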

You can also select elements of a `DataFrame` or `Series` with direct index access.

In [None]:
print(df[3:5])

In [None]:
print(s[3:5])

<a id='boolean'></a>

## Boolean Indexing

Selecting rows in a `DataFrame` or `Series` by direct Boolean indexing.

In [None]:
print(df[df.b > 15])

In [None]:
print(ddf[ddf.b > 15].compute())

Selecting values from a `DataFrame` where a Boolean condition is met, via the `query` API.

In [None]:
print(df.query("b == 3"))  

In [None]:
print(ddf.query("b == 3").compute())  

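You can also reference local variables in queries with the `@` prefix. With cuDF, `@` can refer directly to a variable in your current scope (or you can supply values through the `local_dict` keyword). With Dask-cuDF, pass the values through `local_dict`, because the query runs later on each partition rather than in your current scope.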

In [None]:
cudf_comparator = 3
print(df.query("b == @cudf_comparator"))

In [None]:
dask_cudf_comparator = 3
print(ddf.query("b == @val", local_dict={'val':dask_cudf_comparator}).compute())  

Supported comparison operators include `>`, `<`, `>=`, `<=`, `==`, and `!=`.
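The sketch below checks that a `query` using `local_dict`, the form that also works with Dask-cuDF, selects the same rows as the equivalent Boolean mask. The threshold value and the `xdf` alias (cuDF if installed, pandas otherwise) are illustrative assumptions.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

df = xdf.DataFrame({'a': list(range(20)), 'b': list(reversed(range(20)))})
threshold = 15

# Pass the value explicitly through local_dict and reference it with @
via_query = df.query("b > @limit", local_dict={'limit': threshold})

# The same selection written as a Boolean mask
via_mask = df[df.b > threshold]

print(via_query)
print(via_query.equals(via_mask))
```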

<a id='multi'></a><br>

## MultiIndex

cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see `Grouping` below) automatically produces a DataFrame with a MultiIndex.

In [None]:
arrays = [['a', 'a', 'b', 'b'],
          [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx

This index can back either axis of a DataFrame.

In [None]:
gdf1 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)})
gdf1.index = idx
print(gdf1.to_pandas())

In [None]:
gdf2 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)}).T
gdf2.columns = idx
print(gdf2.to_pandas())

Accessing values of a DataFrame with a MultiIndex. Note that slicing is not yet supported.

In [None]:
print(gdf1.loc[('b', 3)].to_pandas())

<a id='missing'></a><br>

Missing Data
------------

Missing data can be replaced by using the `fillna` method.

In [None]:
print(s.fillna(999))

In [None]:
print(ds.fillna(999).compute())
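`fillna` is one of several missing-data methods that cuDF shares with pandas. As a minimal sketch, assuming the same small frame used earlier and the hypothetical `xdf` alias (cuDF if available, pandas otherwise), the cell below counts nulls per column, fills nulls in a single column through a per-column mapping, and drops incomplete rows.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

gdf = xdf.DataFrame({'a': [0, 1, 2, 3], 'b': [0.1, 0.2, None, 0.3]})

missing_per_column = gdf.isna().sum()   # number of nulls in each column
filled = gdf.fillna({'b': 0.0})         # fill nulls in column 'b' only
dropped = gdf.dropna()                  # drop every row that contains a null

print(missing_per_column)
print(filled)
print(dropped)
```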

<a id='operation'></a><br>

Operations
------------

<a id='stats'></a><br>

## Stats

Calculating descriptive statistics for a `Series`.

In [None]:
print(s.mean(), s.var())

In [None]:
print(ds.mean().compute(), ds.var().compute())
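As in pandas, these reductions skip nulls, and `var` computes the sample variance (`ddof=1`) by default. The sketch below recomputes them for the same values so you can check the numbers by hand: over `[1, 2, 3, 4]` the mean is 2.5 and the sample variance is 5/3. The `xdf` alias (cuDF if installed, pandas otherwise) is an assumption for portability.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

s = xdf.Series([1, 2, 3, None, 4])

# The null is ignored, so all three statistics are computed over [1, 2, 3, 4]
non_null = s.count()
mean = s.mean()
sample_var = s.var()   # divides by (n - 1)

print(non_null, mean, sample_var)
```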

<a id='applymap'></a><br>

## Applymap

Applying functions to a `Series`. Note that applying user defined functions directly with Dask-cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) to apply a function to each partition of the distributed dataframe.

In [None]:
def add_ten(num):
    return num + 10

print(df['a'].applymap(add_ten))

In [None]:
print(ddf['a'].map_partitions(add_ten).compute())
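`map_partitions` calls your function once per partition and stitches the results back together, so it matches a whole-column computation only when the function does not depend on data in other partitions. The CPU sketch below imitates that behavior with pandas and two hand-made chunks; it is not Dask's implementation, and `partitions` and `apply_per_partition` are hypothetical names. An elementwise function such as `add_ten` gives the same answer either way, while subtracting the mean uses each chunk's own mean and does not.

```python
import numpy as np
import pandas as pd


def add_ten(num):
    return num + 10


def demean(part):
    # Depends on every value it receives, not just on each element
    return part - part.mean()


series = pd.Series(range(20), name='a')

# Hypothetical stand-in for npartitions=2: two contiguous chunks of rows
partitions = [series.iloc[idx] for idx in np.array_split(np.arange(len(series)), 2)]


def apply_per_partition(func):
    # Apply func to each chunk independently, then concatenate the results
    return pd.concat([func(part) for part in partitions])


print(apply_per_partition(add_ten).equals(add_ten(series)))  # elementwise
print(apply_per_partition(demean).equals(demean(series)))    # per-chunk means
```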

<a id='histo'></a>

## Histogramming

Counting the number of occurrences of each unique value in a column.

In [None]:
print(df.a.value_counts())

In [None]:
print(ddf.a.value_counts().compute())

<a id='string'></a><br>

## String Methods

Like pandas, cuDF provides string processing methods in the `str` attribute of `Series`. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.

In [None]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
print(s.str.lower())

In [None]:
ds = dask_cudf.from_cudf(s, npartitions=2)
print(ds.str.lower().compute())

<a id='concat'></a><br>

## Concat

Concatenating `Series` and `DataFrames` row-wise.

In [None]:
s = cudf.Series([1, 2, 3, None, 5])
print(cudf.concat([s, s]))

In [None]:
ds2 = dask_cudf.from_cudf(s, npartitions=2)
print(dask_cudf.concat([ds2, ds2]).compute())
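Because `concat` keeps each input's index, the cuDF result above repeats the labels 0 through 4. Passing `ignore_index=True` renumbers the result instead. A minimal sketch, using the hypothetical alias `xdf` for cuDF (or pandas when cuDF is not installed):

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

s = xdf.Series([1, 2, 3, None, 5])

stacked = xdf.concat([s, s])                          # labels 0-4 appear twice
renumbered = xdf.concat([s, s], ignore_index=True)    # fresh labels 0-9

print(stacked.index.is_unique)
print(renumbered)
```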

<a id='join'></a><br>

## Join

Performing SQL-style merges. Note that cuDF and Dask-cuDF do not guarantee that a merge preserves the original row order; if order matters, sort the result afterwards (for example, by the join key).

In [None]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = df_a.merge(df_b, on=['key'], how='left')
print(merged)

In [None]:
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
print(merged)
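To get a deterministic order after a merge, sort the result explicitly. The sketch below rebuilds the two frames from above, performs the same left join, and sorts by the join key; keys with no match in `df_b` get nulls in `vals_b`. It assumes cuDF when available and falls back to pandas under the hypothetical alias `xdf`.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

df_a = xdf.DataFrame({'key': ['a', 'b', 'c', 'd', 'e'],
                      'vals_a': [10.0, 11.0, 12.0, 13.0, 14.0]})
df_b = xdf.DataFrame({'key': ['a', 'c', 'e'],
                      'vals_b': [100.0, 101.0, 102.0]})

merged = df_a.merge(df_b, on='key', how='left')

# Sort by the join key for a stable order, then renumber the rows
ordered = merged.sort_values('key').reset_index(drop=True)
print(ordered)
```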

<a id='append'></a><br>

## Append

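Appending values from another `Series` or array-like object. In newer releases of pandas, cuDF, and Dask, `append` has been deprecated in favor of `concat` (pandas 2.0 removed it entirely).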

In [None]:
print(s.append(s))

In [None]:
print(ds2.append(ds2).compute())

<a id='grouping'></a><br>

## Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.
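The three steps can be spelled out by hand: split the rows by key, apply a reduction to each group, and combine the per-group results. The sketch below does this with Boolean masks and compares it with `groupby`; it is a conceptual illustration rather than how cuDF implements grouping, and the `xdf` alias (cuDF if installed, pandas otherwise) is an assumption.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

df = xdf.DataFrame({'a': list(range(20))})
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]

# Split: select each group's rows with a Boolean mask
# Apply: sum column 'a' within that group
# Combine: collect the per-group results keyed by group value
manual = {key: int(df[df.agg_col1 == key]['a'].sum()) for key in (0, 1)}

# groupby performs the same split-apply-combine in one call
grouped = df.groupby('agg_col1')['a'].sum()

print(manual)
print(grouped)
```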

In [None]:
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping and then applying the `sum` function to the grouped data.

In [None]:
print(df.groupby('agg_col1').sum())

In [None]:
print(ddf.groupby('agg_col1').sum().compute())

Grouping hierarchically then applying the `sum` function to grouped data. We send the result to a pandas dataframe only for printing purposes.

In [None]:
print(df.groupby(['agg_col1', 'agg_col2']).sum().to_pandas())

In [None]:
ddf.groupby(['agg_col1', 'agg_col2']).sum().compute().to_pandas()
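If you would rather work with flat columns than with the MultiIndex produced by hierarchical grouping, `reset_index` turns the group keys back into ordinary columns. A short sketch, rebuilding a reduced version of the grouping columns above and using the hypothetical `xdf` alias (cuDF if available, pandas otherwise):

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

df = xdf.DataFrame({'a': list(range(20))})
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

nested = df.groupby(['agg_col1', 'agg_col2']).sum()  # indexed by (agg_col1, agg_col2)
flat = nested.reset_index()                          # group keys become regular columns

print(nested.index.nlevels)
print(flat)
```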

Grouping and applying statistical functions to specific columns, using `agg`.

In [None]:
print(df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}))

In [None]:
print(ddf.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute())

<a id='tran'></a><br>

## Transpose

Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF.

In [None]:
sample = cudf.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
print(sample)

In [None]:
print(sample.transpose())

<a id='time'></a><br>

Time Series
------------


`DataFrames` support `datetime`-typed columns, which let you filter and manipulate data based on specific timestamps.

In [None]:
import datetime as dt

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
print(date_df.query('date <= @search_date'))

In [None]:
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
print(date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute())
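Beyond comparing against a timestamp, datetime columns expose calendar fields through the `.dt` accessor, which makes filters such as "all rows in December" straightforward. The sketch below assumes the same 72-day range used above (November 20, 2018 through January 30, 2019) and the hypothetical `xdf` alias (cuDF if installed, pandas otherwise).

```python
import pandas as pd

try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    xdf = pd              # CPU fallback with the same API for these calls

date_df = xdf.DataFrame({'date': pd.date_range('11/20/2018', periods=72, freq='D')})

# Keep only rows whose calendar month is December
december = date_df[date_df['date'].dt.month == 12]

print(len(december))
print(december['date'].dt.day.min(), december['date'].dt.day.max())
```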

<a id='cat'></a><br>

Categoricals
------------

`DataFrames` support categorical columns.

In [None]:
pdf = pd.DataFrame({"id":[1,2,3,4,5,6], "grade":['a', 'b', 'b', 'a', 'a', 'e']})
pdf["grade"] = pdf["grade"].astype("category")

gdf = cudf.DataFrame.from_pandas(pdf)
print(gdf)

In [None]:
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
print(dgdf.compute())

Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.

In [None]:
gdf.grade.cat.categories

Accessing the underlying code values of each categorical observation.

In [None]:
print(gdf.grade.cat.codes)

In [None]:
print(dgdf.grade.cat.codes.compute())
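Each code is the position of that observation's value within `categories`. The sketch below makes the mapping visible on the same grades used above; categories are inferred from the data in sorted order, so `'a'`, `'b'`, and `'e'` get codes 0, 1, and 2. The `xdf` alias (cuDF if installed, pandas otherwise) is an assumption.

```python
try:
    import cudf as xdf    # GPU DataFrames when RAPIDS is installed
except ImportError:
    import pandas as xdf  # CPU fallback with the same API for these calls

grades = xdf.Series(['a', 'b', 'b', 'a', 'a', 'e']).astype('category')

categories = grades.cat.categories   # the distinct values, in sorted order
codes = grades.cat.codes             # each row's position within categories

print(categories)
print(codes)
```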

<a id='condatarep'></a><br>


Converting Data Representation
--------------------------------

<a id='pandas'></a><br>

## Pandas

Converting a cuDF and Dask-cuDF `DataFrame` to a pandas `DataFrame`.

In [None]:
print(df.head().to_pandas())

In [None]:
print(ddf.compute().head().to_pandas())

<a id='numpy'></a><br>

## Numpy

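Converting a cuDF or Dask-cuDF `DataFrame` to a NumPy `ndarray`. (`as_matrix` here and `to_array` below come from older cuDF releases; recent releases provide `to_numpy()` for both.)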

In [None]:
print(df.as_matrix())

In [None]:
print(ddf.compute().as_matrix())

Converting a cuDF or Dask-cuDF `Series` to a numpy `ndarray`.

In [None]:
print(df['a'].to_array())

In [None]:
print(ddf['a'].compute().to_array())

<a id='arrow'></a><br>   

## Arrow

Converting a cuDF or Dask-cuDF `DataFrame` to a PyArrow `Table`.

In [None]:
print(df.to_arrow())

In [None]:
print(ddf.compute().to_arrow())

<a id='datainout'></a><br>

Getting Data In/Out
------------------------


 <a id='csv'></a><br>

## CSV

Writing to a CSV file by first converting the data to a pandas `DataFrame` on the host.

In [None]:
# Create the output directory if it does not already exist
os.makedirs('example_output', exist_ok=True)

df.to_pandas().to_csv('example_output/foo.csv', index=False)

In [None]:
ddf.compute().to_pandas().to_csv('example_output/foo_dask.csv', index=False)

Reading from a CSV file.

In [None]:
df = cudf.read_csv('example_output/foo.csv')
print(df)

In [None]:
ddf = dask_cudf.read_csv('example_output/foo_dask.csv')
print(ddf.compute())

Reading all CSV files in a directory into a single `dask_cudf.DataFrame`, using the star wildcard.

In [None]:
ddf = dask_cudf.read_csv('example_output/*.csv')
print(ddf.compute())
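The wildcard matches both `foo.csv` and `foo_dask.csv` written earlier, so the combined `dask_cudf.DataFrame` holds the rows of both files. The CPU sketch below uses `glob` and pandas in a temporary directory purely to show which files such a pattern selects and how their rows add up; Dask-cuDF instead builds partitions from the matched files lazily, and the file contents here are illustrative.

```python
import glob
import os
import tempfile

import pandas as pd

with tempfile.TemporaryDirectory() as out_dir:
    frame = pd.DataFrame({'a': range(20)})

    # Two files that a single *.csv pattern will match, like the ones above
    frame.to_csv(os.path.join(out_dir, 'foo.csv'), index=False)
    frame.to_csv(os.path.join(out_dir, 'foo_dask.csv'), index=False)

    matched = sorted(glob.glob(os.path.join(out_dir, '*.csv')))
    names = [os.path.basename(path) for path in matched]

    # Reading every match and stacking the rows mirrors what the wildcard selects
    combined = pd.concat([pd.read_csv(path) for path in matched], ignore_index=True)

print(names)
print(len(combined))
```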

<a id='par'></a><br>

## Parquet

Writing to parquet files, using the CPU via PyArrow.

In [None]:
df.to_parquet('example_output/temp_parquet')

Reading parquet files with a GPU-accelerated parquet reader.

In [None]:
df = cudf.read_parquet('example_output/temp_parquet')
print(df.to_pandas())

Writing to parquet files from a `dask_cudf.DataFrame` using PyArrow under the hood.

In [None]:
ddf.to_parquet('example_files')  

# Conclusion

You are now familiar with creating Dask-cuDF dataframes and with selecting, viewing, and manipulating data. The operations closely mirror pandas, so they can replace pandas operations in a traditional data science pipeline with few code changes. For sufficiently large data, distributing the work across GPUs can substantially reduce runtime, often with the same code. The next tutorial shows how to use cuML with Dask, bringing distributed GPU computing to model training.

## Licensing
  
This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0).
