@@ -0,0 +1,372 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
+"http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
+<book xml:base="../">
+ <bookinfo>
+ <title>Spark-HPCCSystems Distributed Connector</title>
+
+ <mediaobject>
+ <imageobject>
+ <imagedata fileref="images/redswooshWithLogo3.jpg" />
+ </imageobject>
+ </mediaobject>
+
+ <author>
+ <surname>Boca Raton Documentation Team</surname>
+ </author>
+
+ <legalnotice>
+ <para>We welcome your comments and feedback about this document via
+ email to <email>docfeedback@hpccsystems.com</email></para>
+
+ <para>Please include <emphasis role="bold">Documentation
+ Feedback</emphasis> in the subject line and reference the document name,
+ page numbers, and current Version Number in the text of the
+ message.</para>
+
+ <para>LexisNexis and the Knowledge Burst logo are registered trademarks
+ of Reed Elsevier Properties Inc., used under license.</para>
+
+ <para>HPCC Systems<superscript>®</superscript> is a registered trademark
+ of LexisNexis Risk Data Management Inc.</para>
+
+ <para>Other products, logos, and services may be trademarks or
+ registered trademarks of their respective companies.</para>
+
+ <para>All names and example data used in this manual are fictitious. Any
+ similarity to actual persons, living or dead, is purely
+ coincidental.</para>
+
+ <para></para>
+ </legalnotice>
+
+ <xi:include href="common/Version.xml" xpointer="FooterInfo"
+ xmlns:xi="http://www.w3.org/2001/XInclude" />
+
+ <xi:include href="common/Version.xml" xpointer="DateVer"
+ xmlns:xi="http://www.w3.org/2001/XInclude" />
+
+ <corpname>HPCC Systems<superscript>®</superscript></corpname>
+
+ <xi:include href="common/Version.xml" xpointer="Copyright"
+ xmlns:xi="http://www.w3.org/2001/XInclude" />
+
+ <mediaobject role="logo">
+ <imageobject>
+ <imagedata fileref="images/LN_Rightjustified.jpg" />
+ </imageobject>
+ </mediaobject>
+ </bookinfo>
+
+ <chapter>
+ <title>The Spark HPCC Systems Connector</title>
+
+ <sect1 id="overview" role="nobrk">
+ <title>Overview</title>
+
+ <para>The Spark-HPCCSystems Distributed Connector is a Java library that
+ facilitates access from a Spark cluster to data stored on an HPCC
+ Systems cluster. The connector library employs the standard HPCC Systems
+ remote file read facility to read data from either sequential or indexed
+ HPCC datasets.</para>
+
+ <para>The data on an HPCC cluster is partitioned horizontally, with data
+ on each cluster node. Once configured, the HPCC data is available for
+ reading in parallel by the Spark cluster.</para>
+
+ <para>In the GitHub repository (<ulink
+ url="https://github.com/hpcc-systems/Spark-HPCC">https://github.com/hpcc-systems/Spark-HPCC</ulink>)
+ you can find the source code and examples. There are several artifacts
+ in the DataAccess/src/main/java folder of primary interest. The
+ <emphasis>org.hpccsystems.spark.HpccFile</emphasis> class is the façade
+ of a file on an HPCC Cluster. The
+ <emphasis>org.hpccsystems.spark.HpccRDD</emphasis> is a resilient
+ distributed dataset derived from the data on the HPCC Cluster and is
+ created by the
+ <emphasis>org.hpccsystems.spark.HpccFile.getRDD</emphasis>(…) method.
+ The <emphasis>HpccFile</emphasis> class uses the
+ <emphasis>org.hpccsystems.spark.HpccDataframeFactory</emphasis> class to
+ construct a <emphasis>Dataset&lt;Row&gt;</emphasis> object for the new
+ Spark interface.</para>
+
+ <para>There are several additional artifacts of some interest. The
+ <emphasis>org.hpccsystems.spark.ColumnPruner</emphasis> class is
+ provided to enable retrieving only the columns of interest from the HPCC
+ Cluster. The <emphasis>org.hpccsystems.spark.thor.FileFilter</emphasis>
+ class is provided to enable retrieving only records of interest from the
+ HPCC Cluster.</para>
+
+ <para>The git repository includes two examples under the
+ Examples/src/main/scala folder. The examples
+ (<emphasis>org.hpccsystems.spark_examples.Dataframe_Iris_LR</emphasis>
+ and <emphasis>org.hpccsystems.spark_examples.Iris_LR</emphasis>) are
+ Scala Objects with a main() function. Both examples use the classic Iris
+ dataset. The dataset can be obtained from a variety of sources,
+ including the HPCC-Systems/ecl-ml repository. IrisDS.ecl (which can be
+ found under the ML/Tests/Explanatory folder: <ulink
+ url="https://github.com/hpcc-systems/ecl-ml/blob/master/ML/Tests/Explanatory/IrisDS.ecl">https://github.com/hpcc-systems/ecl-ml/blob/master/ML/Tests/Explanatory/IrisDS.ecl</ulink>)
+ can be executed to generate the Iris dataset in HPCC. A walk-through of
+ the examples is provided in the Examples section.</para>
+ </sect1>
+
+ <sect1 id="primary-classes">
|
|
|
+ <title>Primary Classes</title>
|
|
|
+
|
|
|
+ <para>The <emphasis>HpccFile</emphasis> and
+ <emphasis>HpccRDD</emphasis> classes are discussed in more detail below.
+ These are the primary classes used to access data from an HPCC Cluster.
+ The <emphasis>org.hpccsystems.spark.HpccDataframeFactory</emphasis>
+ class is employed by the
+ <emphasis>org.hpccsystems.spark.HpccFile</emphasis> class to produce a
+ <emphasis>Dataset&lt;Row&gt;</emphasis> instance, using an
+ <emphasis>org.hpccsystems.spark.HpccRDD</emphasis> instance to generate
+ the rows.</para>
+
+ <para>The <emphasis>org.hpccsystems.spark.HpccFile</emphasis> class has
+ several constructors. All of the constructors take information about the
+ Cluster and the name of the dataset of interest. The JAPI WS-Client
+ classes are used to access file detail information. A definition used to
+ select the columns to be returned and a definition to select the rows to
+ be returned could also be supplied. These are discussed in the
+ <emphasis>Additional Classes of Interest</emphasis> section below. The
+ class has two methods of primary interest: the
+ <emphasis>getRDD(…)</emphasis> method and the
+ <emphasis>getDataframe(…)</emphasis> method, which are illustrated in
+ the <emphasis>Examples</emphasis> section.</para>
+
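+ <para>As a quick preview of these two methods, a minimal Spark shell
+ sketch is shown below. The file name, ESP address, and credentials are
+ the same placeholder values used in the Examples section; substitute
+ your own values.</para>
+
+ <programlisting> import org.hpccsystems.spark.HpccFile
+ val hpcc = new HpccFile("my_data", "http", "my_esp", "8010", "x", "*", "")
+ // sc is the SparkContext provided by the Spark shell
+ val myRDD = hpcc.getRDD(sc)
+ // spark is the SparkSession provided by the Spark shell
+ val my_df = hpcc.getDataframe(spark)</programlisting>
+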
+ <para>The <emphasis>HpccFile</emphasis> class
+ <emphasis>getRecordDefinition()</emphasis> method can be used to
+ retrieve a definition of the file. The
+ <emphasis>getFileParts()</emphasis> method can be used to see how the
+ file is partitioned on the HPCC Cluster. These methods return the same
+ information that is shown on the DEF and PARTS tabs of the ECL Watch
+ dataset details page, respectively.</para>
+
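+ <para>For example, a quick inspection of a file from the Spark shell
+ might look like the following sketch (what is printed depends on the
+ file; the <emphasis>hpcc</emphasis> value is the
+ <emphasis>HpccFile</emphasis> object constructed as shown
+ above).</para>
+
+ <programlisting> // the record layout, as shown on the ECL Watch DEF tab
+ val recordDef = hpcc.getRecordDefinition()
+ println(recordDef)
+ // the file parts and their locations, as shown on the PARTS tab
+ val parts = hpcc.getFileParts()
+ println(parts)</programlisting>
+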
+ <para>The <emphasis>org.hpccsystems.spark.HpccRDD</emphasis> class
+ extends the <emphasis>RDD&lt;Record&gt;</emphasis> templated class. The
+ class employs the <emphasis>org.hpccsystems.spark.HpccPart</emphasis>
+ class for the Spark partitions. The
+ <emphasis>org.hpccsystems.spark.Record</emphasis> class is used as the
+ container for the fields from the HPCC Cluster. The
+ <emphasis>Record</emphasis> class can create a <emphasis>Row</emphasis>
+ instance with a schema.</para>
+
+ <para>The <emphasis>HpccRDD</emphasis> <emphasis>HpccPart</emphasis>
+ partition objects each read blocks of data from the HPCC Cluster
+ independently of each other. The initial read fetches the first block of
+ data, requests the second block of data, and then returns the first
+ record. When a block is exhausted, the next block should already be
+ available on the socket, and a new read request is issued.</para>
+ </sect1>
+
+ <sect1 id="additional-classes-of-interest">
|
|
|
+ <title>Additional Classes of interest</title>
|
|
|
+
|
|
|
+ <para>The main classes of interest in this section support column
+ pruning and file filtering. In addition, there is a helper class to
+ remap IP information when required; this is also discussed below.</para>
+
+ <para>The column selection information is provided as a string to the
+ <emphasis>org.hpccsystems.spark.ColumnPruner</emphasis> object. The
+ string is a list of comma-separated field names. A field of interest
+ could contain a row or child dataset, and the dotted name notation is
+ used to support the selection of individual child fields. The
+ <emphasis>ColumnPruner</emphasis> parses the string into a root
+ <emphasis>TargetColumn</emphasis> class instance which holds the top
+ level target columns. A <emphasis>TargetColumn</emphasis> can be a
+ simple field or can be a child dataset and so be a root object for the
+ child record layout.</para>
+
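+ <para>A brief sketch of building a column selection for the Iris file
+ used in the Examples section is shown below. The exact constructor
+ overloads are not reproduced here, so treat the call shape as an
+ illustration of the selection string rather than a reference.</para>
+
+ <programlisting> import org.hpccsystems.spark.ColumnPruner
+ // comma-separated field names; dotted names select fields of a child row or dataset
+ val projectList = "class, petal_length, petal_width"
+ // assumed form: the selection string is handed to the ColumnPruner
+ val pruner = new ColumnPruner(projectList)</programlisting>
+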
+ <para>The row filter is implemented in the
+ <emphasis>org.hpccsystems.spark.thor.FileFilter</emphasis> class. A
+ <emphasis>FileFilter</emphasis> instance is constructed from an array of
+ <emphasis>org.hpccsystems.spark.thor.FieldFilter</emphasis> objects.
+ Each <emphasis>FieldFilter</emphasis> instance is composed of a field
+ name (in dotted notation for compound names) and an array of
+ <emphasis>org.hpccsystems.spark.thor.FieldFilterRange</emphasis>
+ objects. Each <emphasis>FieldFilterRange</emphasis> instance can be an
+ open or closed interval or a single value. The record is selected when
+ at least one <emphasis>FieldFilterRange</emphasis> matches for each of
+ the <emphasis>FieldFilter</emphasis> instances in the array.</para>
+
+ <para>The <emphasis>FieldFilterRange</emphasis> values may be either
+ strings or numbers. There are methods provided to construct the
+ following range tests: equals, not equals, less than, less than or
+ equals, greater than, and greater than or equals. In addition, a set
+ inclusion test is supported for strings. If the file is an index, the
+ filter fields that are key fields are used for an index lookup. Any key
+ field that is not mentioned in the filter is treated as a
+ wildcard.</para>
+
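+ <para>A sketch of how such a filter is assembled is shown below. The
+ range factory calls (<emphasis>makeEq</emphasis> and
+ <emphasis>makeGT</emphasis>) are placeholder names used only to
+ illustrate the structure; consult the
+ <emphasis>FieldFilterRange</emphasis> class in the repository for the
+ actual constructors and factory methods.</para>
+
+ <programlisting> import org.hpccsystems.spark.thor.{FileFilter, FieldFilter, FieldFilterRange}
+ // placeholder factory names, illustrative only
+ val classRange = FieldFilterRange.makeEq(1)
+ val widthRange = FieldFilterRange.makeGT(2.0)
+ // a record is kept when every FieldFilter has at least one matching range
+ val filters = Array(new FieldFilter("class", Array(classRange)),
+ new FieldFilter("petal_width", Array(widthRange)))
+ val rowFilter = new FileFilter(filters)</programlisting>
+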
+ <para>The usual deployment architecture for HPCC Clusters consists of a
+ collection of nodes on a network. The file management information
+ includes the IP addresses of the nodes that hold the partitions of the
+ file. The Spark-HPCC connector classes use these IP addresses to
+ establish socket connections for the remote read. An HPCC Cluster may be
+ deployed as a virtual cluster with private IP addresses. This works for
+ the cluster components because they are all on the same private LAN.
+ However, the Spark cluster nodes might not be on that same LAN. In this
+ case, the <emphasis>org.hpccsystems.spark.RemapInfo</emphasis> class is
+ used to define the information needed to change the addressing. There
+ are two options that can be used. The first option is that each Thor
+ slave node can be assigned an IP that is visible to the Spark cluster.
+ These addresses must be a contiguous range. The second option is to
+ assign an IP and a contiguous range of port numbers. The
+ <emphasis>RemapInfo</emphasis> object is supplied as a parameter.</para>
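+
+ <para>A sketch of supplying remap information is shown below. The
+ argument names and order are illustrative only; check the
+ <emphasis>RemapInfo</emphasis> class for the actual constructor
+ signatures.</para>
+
+ <programlisting> // option 2: a single visible IP plus a contiguous range of ports
+ // (illustrative values and argument order, not the actual signature)
+ val nodes = 4
+ val baseIp = "10.240.0.5"
+ val basePort = 7080
+ val remap = new RemapInfo(nodes, baseIp, basePort)
+ // the RemapInfo object is then supplied as a parameter
+ // (see the HpccFile constructors in the repository)</programlisting>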
+ </sect1>
+
+ <sect1 id="examples">
|
|
|
+ <title>Examples</title>
|
|
|
+
|
|
|
+ <para>We will walk through the two examples below utilizing a Spark
|
|
|
+ environment. Additionally, the repository provides testing programs (in
|
|
|
+ the DataAccess/src/test folder) that can be executed as stand-alone
|
|
|
+ examples. </para>
|
|
|
+
|
|
|
+ <para>These test programs are intended to be run from a development IDE
|
|
|
+ such as Eclipse whereas the examples below are dependent on the Spark
|
|
|
+ shell.</para>
|
|
|
+
|
|
|
+ <sect2 id="iris_lr">
|
|
|
+ <title>Iris_LR</title>
|
|
|
+
|
|
|
+ <para>The Iris_LR example assumes a Spark Shell. You can use the
|
|
|
+ spark-submit command as well. The Spark shell command will need a
|
|
|
+ parameter for the location of the JAPI Jar and the Spark-HPCC
|
|
|
+ Jar.</para>
|
|
|
+
|
|
|
+ <programlisting> spark-shell --jars /home/my_name/Spark-HPCC.jar,/home/my_name/japi.jar</programlisting>
+
+ <para>This brings up the Spark shell with the two jars prepended to
+ the classpath. The first thing needed is to establish the imports for
+ the classes.</para>
+
+ <programlisting> import org.hpccsystems.spark.HpccFile
+ import org.hpccsystems.spark.HpccRDD
+ import org.apache.spark.mllib.regression.LabeledPoint
+ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
+ import org.apache.spark.mllib.evaluation.MulticlassMetrics</programlisting>
+
+ <para>The next step is to establish your HpccFile and your RDD for
+ that file. You need the name of the file, the protocol (http or
+ https), the name or IP of the ESP, the port for the ESP (usually
+ 8010), and your user account and password. The <emphasis>sc</emphasis>
+ value is the <emphasis>SparkContext</emphasis> object provided by the
+ shell.</para>
+
+ <programlisting> val hpcc = new HpccFile("my_data", "http", "my_esp", "8010", "x", "*", "")
+ val myRDD = hpcc.getRDD(sc)</programlisting>
+
+ <para>Now we have an RDD of the data. Nothing has actually happened at
+ this point because Spark performs lazy evaluation and there is nothing
+ yet to trigger an evaluation.</para>
+
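+ <para>If you want to confirm the connection at this point, any Spark
+ action will force the read; for example, a simple count:</para>
+
+ <programlisting> myRDD.count()</programlisting>
+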
+ <para>The Spark MLLib has a Logistic Regression package. The MLLib
+ Logistic Regression expects that the data will be provided as
+ <emphasis>LabeledPoint</emphasis> formatted records. This is common for
+ supervised training implementations in MLLib. We need column labels, so
+ we make an array of names. We then make a labeled point RDD from our
+ RDD. This is also just a definition. Finally, we define the Logistic
+ Regression that we want to run. The column names are the field names in
+ the ECL record definition for the file, including the name “class” which
+ is the name of the field holding the classification code.</para>
+
+ <programlisting> val names = Array("petal_length","petal_width", "sepal_length",
+ "sepal_width")
+ var lpRDD = myRDD.makeMLLibLabeledPoint("class", names)
+ val lr = new LogisticRegressionWithLBFGS().setNumClasses(3)</programlisting>
+
+ <para>The next step is to define the model, which is an action and
+ will cause Spark to evaluate the definitions.</para>
+
+ <programlisting> val iris_model = lr.run(lpRDD)</programlisting>
+
+ <para>Now we have a model. We will use this model to take our
+ original dataset and produce new labels. The correct way to do this
+ would be to hold out some randomly sampled data; we are just going to
+ use the original dataset because it is easier and our goal is to show
+ how to use the connector. We then take our original data and use a map
+ function defined inline to create a new record with our prediction
+ value and the original classification.</para>
+
+ <programlisting> val predictionAndLabel = lpRDD.map {
+ case LabeledPoint(label, features) =>
+ val prediction = iris_model.predict(features)
+ (prediction, label)
+ }</programlisting>
+
+ <para>The <emphasis>MulticlassMetrics</emphasis> class can now be used
+ to produce a confusion matrix.</para>
+
+ <programlisting> val metrics = new MulticlassMetrics(predictionAndLabel)
+ metrics.confusionMatrix</programlisting>
+ </sect2>
+
+ <sect2 id="dataframe_iris_lr">
|
|
|
+ <title>Dataframe_Iris_LR</title>
|
|
|
+
|
|
|
+ <para>The Dataframe_Iris_LR is similar to the Iris_LR, except that a
|
|
|
+ Dataframe is used and the new ML Spark classes are used instead of the
|
|
|
+ old MLLib classes. Since ML is not completely done, we do fall back to
|
|
|
+ an MLLib class to create our confusion matrix.</para>
|
|
|
+
|
|
|
+ <para>Once the Spark shell is brought up, we need to import the
+ classes we will use.</para>
+
+ <para><programlisting> import org.hpccsystems.spark.HpccFile
+ import org.apache.spark.sql.Dataset
+ import org.apache.spark.ml.feature.VectorAssembler
+ import org.apache.spark.ml.classification.LogisticRegression
+ import org.apache.spark.mllib.evaluation.MulticlassMetrics</programlisting></para>
+
+ <para>The next step is to establish the <emphasis>HpccFile</emphasis>
+ object and create the Dataframe. The <emphasis>spark</emphasis> value
+ is a <emphasis>SparkSession</emphasis> object supplied by the shell
+ and is used instead of the <emphasis>SparkContext</emphasis>
+ object.</para>
+
+ <programlisting> val hpcc = new HpccFile("my_data", "http", "my_esp", "8010", "x", "*", "")
+ val my_df = hpcc.getDataframe(spark)</programlisting>
+
+ <para>The Spark <emphasis>ml</emphasis> Machine Learning classes use
+ different data container classes. In the case of Logistic Regression,
+ we need to transform our data rows into a row with a column named
+ “features” holding the features and a column named “label” holding the
+ classification label. Recall that our row has “class”, “sepal_width”,
+ “sepal_length”, “petal_width”, and “petal_length” as the column names.
+ This kind of transformation can be accomplished with a
+ <emphasis>VectorAssembler</emphasis> class.</para>
+
+ <programlisting> val assembler = new VectorAssembler()
+ assembler.setInputCols(Array("petal_length","petal_width",
+ "sepal_length", "sepal_width"))
+ assembler.setOutputCol("features")
+ val iris_fv = assembler.transform(my_df)
+ .withColumnRenamed("class", "label")</programlisting>
+
+ <para>Now that the data (<emphasis>iris_fv</emphasis>) is ready, we
+ define our model and fit the data.</para>
+
+ <programlisting> val lr = new LogisticRegression()
+ val iris_model = lr.fit(iris_fv)</programlisting>
+
+ <para>We now want to apply our prediction and evaluate the results.
+ As noted before, we would normally use a holdout dataset to perform
+ the evaluation; we are going to be lazy and just use the original data
+ to avoid the sampling task. We use the <emphasis>transform(…)</emphasis>
+ function of the model to add the prediction. The function adds a
+ column named “prediction” and defines a new dataset. The new Machine
+ Learning implementation lacks a metrics capability to produce a
+ confusion matrix, so we take our dataset with the
+ <emphasis>prediction</emphasis> column and create an RDD of
+ (prediction, label) pairs for the
+ <emphasis>MulticlassMetrics</emphasis> class.</para>
+
+ <programlisting> val with_preds = iris_model.transform(iris_fv)
+ val predictionAndLabel = with_preds.rdd.map(
+ r => (r.getDouble(r.fieldIndex("prediction")),
+ r.getDouble(r.fieldIndex("label"))))
+ val metrics = new MulticlassMetrics(predictionAndLabel)
+ metrics.confusionMatrix</programlisting>
+ </sect2>
+ </sect1>
+ </chapter>
+</book>