Apache Spark Basics with Scala

Prerequisite:

Dataset: Record Linkage Comparison Patterns Data Set (UCI Machine Learning Repository)

1. Download and Store

Download the dataset from the repository above and store it either on the local file system or in HDFS (for example, with hdfs dfs -put).

2. Launch Spark

Launch Spark on a local cluster by running the spark-shell command. This starts a Scala REPL (Read-Evaluate-Print Loop).

3. Spark Context

If you look at the startup log carefully, you will see that it indicates the Spark context is available as sc. The SparkContext coordinates the execution of Spark jobs on the cluster.
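A quick way to confirm this from the REPL (the values printed depend on your installation):

    // In the spark-shell, the SparkContext is pre-created and bound to sc.
    sc.version    // the Spark version the shell is running
    sc.master     // the master URL, e.g. local[*] for a local cluster
    sc.appName    // the application name of this shell session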

4. Methods for creating Resilient Distributed Datasets (RDDs)

An RDD in Spark is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat [1].

An RDD can be thought of as a collection of partitions, where each partition is a subset of the dataset. Partitions are processed in parallel, whereas the data within a partition is processed sequentially.

The first parameter to parallelize is the collection of objects to be distributed; the second parameter is the number of partitions. Execution/computation in Spark happens lazily: at the time of actual execution, the data/objects within each partition are retrieved from the driver process.
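A minimal sketch of parallelize (the collection and the partition count here are illustrative):

    // Distribute a local collection across the cluster.
    // First argument: the collection; second argument: the number of partitions.
    val fruits = List("apple", "banana", "cherry", "durian")
    val fruitsRdd = sc.parallelize(fruits, 2)
    fruitsRdd.partitions.length    // 2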

An RDD can also be created from a text file, or a directory of text files, residing on a file system (the local OS file system or HDFS).
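A minimal sketch of textFile; both paths are placeholders (the HDFS one is patterned on the output path used later in this post):

    // From a local file or directory of files:
    val rawblocks = sc.textFile("file:///path/to/linkage")

    // Or from HDFS (adjust host, port, and path to your setup):
    // val rawblocks = sc.textFile("hdfs://localhost:54310/user/ranjank/datasets/linkage")

The name rawblocks is illustrative; the later sketches in this post assume it refers to this RDD of raw lines.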

5. Bringing Data from the Cluster to the Client

  • first: returns the first element of the RDD to the client
  • take(n): returns the first n elements of the RDD (for example, the first n lines of the linkage data set) to the client
  • collect: returns the entire contents of the RDD to the client as an array (see the sketch after this list)
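A minimal sketch, assuming rawblocks is the RDD of raw lines created above:

    rawblocks.first()    // the first line of the data set (here, the CSV header)
    rawblocks.take(10)   // the first 10 lines, returned as an Array[String]
    rawblocks.collect()  // all lines; use with care, everything is pulled to the client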

6. Actions

Creating an RDD does not by itself trigger any distributed computation on the cluster; computation happens only when an action is invoked on the RDD. Example actions are (see the sketch after this list):

  • count()
  • collect()
  • saveAsTextFile("hdfs://localhost:54310/user/ranjank/datasets/linkage1")
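A minimal sketch of invoking these actions on rawblocks (the output path is the one shown in the list above):

    rawblocks.count()    // triggers a job and returns the number of lines
    rawblocks.collect()  // pulls the entire data set back to the client
    rawblocks.saveAsTextFile("hdfs://localhost:54310/user/ranjank/datasets/linkage1")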

Making it easier to read:

We can observe that the dataset also contains a header line. Let's define a method for removing the header from the data set:
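A minimal sketch, assuming the header line contains the name of the first column, id_1 (as it does in the linkage CSV files):

    // A line is a header if it contains the first column name.
    def isHeader(line: String): Boolean = line.contains("id_1")

    val noheader = rawblocks.filter(line => !isHeader(line))
    noheader.first()    // now returns the first data line instead of the header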

7. Structuring Data with Tuples and Case Classes

So far, the data we have processed has been in the form of arrays and RDDs of strings containing comma-separated fields. Data in this form is difficult to analyse. We need to parse these strings into a structured format so that each field/column/feature is converted to the correct data type.

Looking at the header and the data, the first two fields are integer IDs, followed by nine doubles representing the match scores (with NaN marking missing values), and finally a boolean field indicating whether the records matched.
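A minimal sketch of one way to do this parsing; the names (MatchData, parse, parsed) are illustrative, and the '?' handling assumes that is how the CSV encodes missing score values:

    case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

    // Missing score values are encoded as "?" and become NaN.
    def toDouble(s: String): Double = if ("?".equals(s)) Double.NaN else s.toDouble

    def parse(line: String): MatchData = {
      val pieces  = line.split(',')
      val id1     = pieces(0).toInt
      val id2     = pieces(1).toInt
      val scores  = pieces.slice(2, 11).map(toDouble)   // the nine match scores
      val matched = pieces(11).toBoolean
      MatchData(id1, id2, scores, matched)
    }

    val parsed = noheader.map(line => parse(line))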

8. Caching

The contents of an RDD are transient by default: each time an action is called, a set of computations is done on the cluster to reproduce them. Spark provides a way to persist the data contained in an RDD so that, once computed, its contents are stored in memory or on disk across the cluster.

A call to cache() indicates that the RDD should be stored the next time it is computed.

rdd.cache() is the same as calling rdd.persist(StorageLevel.MEMORY_ONLY). At this level, the RDD is stored as deserialized Java objects. When Spark finds that a partition will not fit in memory, it simply does not store it, and the partition is recomputed the next time it is needed. This is fine in cases where frequent and/or low-latency access is required, because it avoids costly serialization, but the downside is that it consumes more memory than the other alternatives.

When the StorageLevel.MEMORY_ONLY_SER storage level is used, Spark allocates large byte buffers in memory and serializes the RDD contents into them. When the right format is used, serialized data usually takes up two to five times less space than its raw equivalent.

The other storage levels are StorageLevel.MEMORY_AND_DISK and StorageLevel.MEMORY_AND_DISK_SER. They are similar to MEMORY_ONLY and MEMORY_ONLY_SER respectively, except that if a partition does not fit in memory, it is spilled to disk.
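A minimal sketch, assuming the parsed RDD from the previous section:

    import org.apache.spark.storage.StorageLevel

    parsed.cache()    // equivalent to parsed.persist(StorageLevel.MEMORY_ONLY)
    // or pick an explicit level instead of cache():
    // parsed.persist(StorageLevel.MEMORY_AND_DISK_SER)

    parsed.count()    // first action: computes the RDD and stores its partitions
    parsed.count()    // subsequent actions are served from the cached partitions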

9. Aggregation

We can aggregate data either locally on the client (for example, on an array brought back with take or collect) or on the cluster using RDD operations; the sketch below shows both.

Note: when performing aggregations on data in the cluster, it is good practice to apply as much filtering as we can to the data before performing the aggregation.
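A minimal sketch of both flavours, assuming the parsed RDD from earlier (the sample size and the filter are illustrative):

    // Aggregation on the client, over a small sample brought back with take:
    val sample = parsed.take(100)
    sample.groupBy(md => md.matched).mapValues(xs => xs.size).foreach(println)

    // Aggregation on the cluster; filter first so less data feeds the aggregation:
    val trueCount = parsed.filter(md => md.matched).count()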

10. Creating Histograms for Discrete Variables

Create a histogram that counts how many of the MatchData records in parsed have a value of true or false for the matched field.
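A minimal sketch, using countByValue on the matched field of the parsed RDD from earlier:

    // countByValue runs the count on the cluster and returns a local Map[Boolean, Long].
    val matchCounts = parsed.map(md => md.matched).countByValue()

    // Sort the entries by count, descending, for readability.
    matchCounts.toSeq.sortBy(_._2).reverse.foreach(println)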

11. Summary Statistics for Continuous Variables

For continuous variables, like the match scores for each of the fields in the patient records, we want a basic set of statistics such as the mean, the standard deviation, and extreme values like the maximum and minimum.
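A minimal sketch, using the stats() method that Spark provides on RDDs of doubles (shown here for the first score column of the parsed RDD):

    // stats() returns a StatCounter with count, mean, stdev, max, and min.
    parsed.map(md => md.scores(0)).stats()    // mean is NaN if any score is missing

    // Filter out the missing (NaN) values first to get usable statistics:
    parsed.map(md => md.scores(0)).filter(d => !d.isNaN).stats()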

12. Creating Reusable Code for Computing Summary Statistics
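One possible way to package this (illustrative names; note it makes one pass over the data per column, whereas a tuned version would fold all nine columns in a single pass with aggregate):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.StatCounter

    // Compute a StatCounter for each of the nine score columns, skipping NaNs.
    def scoreStats(parsed: RDD[MatchData]): Seq[StatCounter] =
      (0 until 9).map { i =>
        parsed.map(md => md.scores(i)).filter(d => !d.isNaN).stats()
      }

    scoreStats(parsed).zipWithIndex.foreach { case (stat, i) =>
      println(s"score $i: $stat")
    }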

Ref: 1, 2
