This tutorial focuses on getting started with Apache Spark on AWS EMR. In addition to Apache Spark, it touches Apache Zeppelin and S3 Storage. Apache Spark is a distributed computation engine designed to be a flexible, scalable and for the most part, cost-effective solution for distributed computing. The data used for this tutorial is in an S3 bucket, and the Spark application we are creating will pull this data from that bucket. Finally, we are going to build and run the Spark application in Apache Zeppelin.
Launching Spark and Zeppelin on AWS EMR
AWS offers a service called Elastic Map Reduce or EMR to launch big data software across a cluster of ec2 instances. When you create an EMR cluster, you have an option to pick a collection of software that gets bootstrapped across a cluster of nodes. EMR executes step by step operations to configure those tools during a roll-out. Additional steps can be added which may include installing software or perform server operations across all nodes. Since we only need Spark, Hadoop, and Zeppelin to do our data exploration and analysis, the default configurations will work just fine for us.
To launch the service, login to your AWS console and search for EMR. Set the necessary service configurations to get started.
It may take a few minutes for all of those nodes to boot and get configured. Once the cluster enters the ready state, navigate to your Zeppelin notebook by clicking on the Zeppelin link in your cluster summary tab.
Zeppelin is a data science notebook that enables you to submit spark applications into the cluster without going through a terminal. It is possible to submit an application without Zeppelin by SSH into the master node and running
./spark-submit --blah --blah. But for data analysis and exploration purposes, it is more convenient to use the Zeppelin notebook. Just FYI, Zeppelin uses
./spark-submit underneath the hood each time an application is submitted.
Click on Zeppelin Notebook and create a new notebook. You're going to get prompted to enter a name for it; I suggest you call it something useful. I named mine Milo. There is not much going on in this empty notebook we just created. Before we can start doing any analysis, we need to load data from our S3 bucket into the spark memory cluster. The data file I'm going to analyze is a pipe delimited text file called
sale.txt, and it is in an S3 bucket called
biggerdatadog. That file contains sales of all dog houses in Florida.
Installing Spark Packages Through Zeppelin
To work with CSV files it would be in our advantage to use a Spark package designed specifically to handle them. A really good and reputable CSV package is called Spark-CSV. The source for that package can be found on Github here: https://github.com/databricks/spark-csv. Spark-CSV was created and open sourced by a company called Databricks which is the same company that founded Apache Spark.
To install Spark Packages through Zeppelin, navigate to the interpreter's page
And scroll down to the Spark section. Next to the
args option set the following command :
That command tells Zeppelin to submit the Spark application along with Spark-CSV package. Click save and now you can use that package in your notebook.
Working With Spark
Let's get started with our data analyses. The following query will load our CSV file into the memory cluster and print out the inferred data schema. I'm using Spark-CSV to perform schema inferencing so that I don't have to set the dataTypes manually.
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema", "true") .option("delimiter","|") .load("s3n://biggerdatadog/sale.txt")
It appears that type inferencing had inferred the date as an integer type because the date format in the data file is a set of numbers i.e (20171010). To fix this, I explicitly cast that particular column as a dateType.
import org.apache.spark.sql.types._ val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema", "true") .option("delimiter","|") .load("s3n://biggerdatadog/sale.txt").withColumn("SaleDate", to_date($"SaleDate".cast("string"), "yyyyMMdd")) df.printSchema()
Lets count how many rows of data we have. Create a new paragraph and input the following:
Since this is dirty data, some rows are missing key pieces of information and we are better off filtering them out. I'm using the help of some imported functions
import org.apache.spark.sql.functions._ to do just that.
import org.apache.spark.sql.functions._ val notNullDF = df.filter($"SaleDate".isNotNull && length(trim($"SaleDate")) > 0) notNullDF.count()
Get Sale Total For Each Month
Let's find out what is the total of all sales per month.
To get this number, we need to group the rows by month and get the sum for each month.
val avgTable = notNullDF.groupBy(month($"SaleDate")).sum("Price") avgTable.show()
This gives us some meaningful insight. Let's chart those numbers so we have a visual illustration of the totals for each month. To create a chart from those results, we need to be stored the results in a temporary table. We can then query that table using spark SQL.
val avgTable = notNullDF.groupBy(month($"SaleDate")).sum("Price") avgTable.registerTempTable("Totals") avgTable.show()
%sql select * From Totals
Awesome right? You can now click through different chart options to select the visual that fits your data.