add job files and Dockerfiles

This commit is contained in:
Rob Genova
2017-06-24 16:28:12 -07:00
parent 547e7faae6
commit 68b97da606
6 changed files with 206 additions and 63 deletions

View File

# Nomad / Spark integration
The Nomad ecosystem includes a fork of Apache Spark that natively supports using a Nomad cluster to run Spark applications. When running on Nomad, the Spark executors that run Spark tasks for your application, and optionally the application driver itself, run as Nomad tasks in a Nomad job. See the [usage guide](./RunningSparkOnNomad.pdf) for more details.
Clusters provisioned with Nomad's Terraform templates are automatically configured to run the Spark integration. The sample job files found here are also provisioned onto every client and server.
## Setup
To give the Spark integration a test drive, provision a cluster and SSH to any one of the clients or servers (the public IPs are displayed when the Terraform provisioning process completes):
```bash
$ ssh -i /path/to/key ubuntu@PUBLIC_IP
```
The Spark history server and several of the sample Spark jobs below require HDFS. Using the included job file, deploy an HDFS cluster on Nomad:
```bash
$ cd $HOME/examples/spark
$ nomad run hdfs.nomad
$ nomad status hdfs
```
When the allocations are all in the `running` state (as shown by `nomad status hdfs`), query Consul to verify that the HDFS service has been registered:
```bash
$ dig hdfs.service.consul
```
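A scripted version of the same check can be convenient; here is a hedged sketch (it assumes you are on a cluster node with Consul DNS on the default resolver, and degrades gracefully anywhere else):

```shell
# Sketch: a non-empty answer from `dig +short` means the service is
# registered. `|| true` keeps the check safe to paste anywhere.
hdfs_ips="$(dig +short hdfs.service.consul 2>/dev/null || true)"
if [ -n "$hdfs_ips" ]; then
  echo "hdfs registered at: $hdfs_ips"
else
  echo "hdfs not registered (or not on a cluster node)"
fi
```

The equivalent query against Consul's HTTP catalog API is `curl http://localhost:8500/v1/catalog/service/hdfs`.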
Next, create directories and files in HDFS for use by the history server and the sample Spark jobs:
```bash
$ hdfs dfs -mkdir /foo
$ hdfs dfs -put /var/log/apt/history.log /foo
$ hdfs dfs -mkdir /spark-events
$ hdfs dfs -ls /
```
Finally, deploy the Spark history server:
```bash
$ cd $HOME/examples/spark
$ nomad run spark-history-server-hdfs.nomad
```
You can find the private IP for the service with a Consul DNS lookup:
```bash
$ dig spark-history.service.consul
```
Cross-reference the private IP with the `terraform apply` output to get the corresponding public IP. You can access the history server at http://PUBLIC_IP:18080.
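The lookup can also be scripted; a sketch (the `<PRIVATE_IP>` placeholder is ours, shown when the DNS lookup returns nothing, e.g. off-cluster):

```shell
# Resolve the history server's private IP via Consul DNS and print the UI
# address; off-cluster the lookup is empty and the placeholder is printed.
ip="$(dig +short spark-history.service.consul 2>/dev/null | head -n 1 || true)"
echo "History server UI (private address): http://${ip:-<PRIVATE_IP>}:18080"
```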
## Sample Spark jobs
A number of sample spark-submit commands are listed below that demonstrate several of the official Spark examples. Features like `spark-sql`, `spark-shell` and pyspark are included as well. The commands can be executed from any client or server.
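The cluster-mode commands below share a common set of flags; factoring them into a variable makes their structure easier to see. This is our own shorthand, not part of the guide (the variable names are assumptions):

```shell
# Shared pieces of the cluster-mode spark-submit examples in this guide.
SPARK_DIST="https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz"
EVENT_DIR="hdfs://hdfs.service.consul/spark-events"

# Flags common to the cluster-mode examples: run on Nomad, monitor the job
# until it completes, and write event logs to HDFS for the history server.
COMMON_FLAGS="--master nomad --deploy-mode cluster \
  --conf spark.executor.instances=4 \
  --conf spark.nomad.cluster.monitorUntil=complete \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=$EVENT_DIR \
  --conf spark.nomad.sparkDistribution=$SPARK_DIST"

# Each example then has the shape: spark-submit --class <MAIN> $COMMON_FLAGS <JAR> <ARGS>
echo "spark-submit --class org.apache.spark.examples.JavaSparkPi $COMMON_FLAGS <JAR> 100"
```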
### SparkPi (Java)
```bash
spark-submit --class org.apache.spark.examples.JavaSparkPi --master nomad --deploy-mode cluster --conf spark.executor.instances=4 --conf spark.nomad.cluster.monitorUntil=complete --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://hdfs.service.consul/spark-events --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz https://s3.amazonaws.com/rcgenova-nomad-spark/spark-examples_2.11-2.1.0-SNAPSHOT.jar 100
```
### Word count (Java)
```bash
spark-submit --class org.apache.spark.examples.JavaWordCount --master nomad --deploy-mode cluster --conf spark.executor.instances=4 --conf spark.nomad.cluster.monitorUntil=complete --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://hdfs.service.consul/spark-events --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz https://s3.amazonaws.com/rcgenova-nomad-spark/spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://hdfs.service.consul/foo/history.log
```
### DFSReadWriteTest (Scala)
```bash
spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master nomad --deploy-mode cluster --conf spark.executor.instances=4 --conf spark.nomad.cluster.monitorUntil=complete --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://hdfs.service.consul/spark-events --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz https://s3.amazonaws.com/rcgenova-nomad-spark/spark-examples_2.11-2.1.0-SNAPSHOT.jar /home/ubuntu/.bashrc hdfs://hdfs.service.consul/foo
```
### spark-shell
Start the shell:
```bash
spark-shell --master nomad --conf spark.executor.instances=4 --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz
```
Run a few commands:
```scala
:type spark
spark.version
val data = 1 to 10000
val distData = sc.parallelize(data)
distData.filter(_ < 10).collect()
```
### sql-shell
Start the shell:
```bash
spark-sql --master nomad --conf spark.executor.instances=4 --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz jars/spark-sql_2.11-2.1.0-SNAPSHOT.jar
```
Run a few commands:
```sql
CREATE TEMPORARY VIEW usersTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/usr/local/bin/spark/examples/src/main/resources/users.parquet"
);

SELECT * FROM usersTable;
```
### pyspark
Start the shell:
```bash
pyspark --master nomad --conf spark.executor.instances=4 --conf spark.nomad.sparkDistribution=https://s3.amazonaws.com/rcgenova-nomad-spark/spark-2.1.0-bin-nomad-preview-6.tgz
```
Run a few commands:
```python
df = spark.read.json("/usr/local/bin/spark/examples/src/main/resources/people.json")
df.show()
df.printSchema()
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
```

View File

# Hadoop 2.7.3 on OpenJDK 7; used by the HDFS job and HDFS-backed Spark examples
FROM openjdk:7
ENV HADOOP_VERSION 2.7.3
# Download Hadoop and put its binaries on the PATH
RUN wget -O - http://apache.mirror.iphh.net/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz | tar xz -C /usr/local/
ENV HADOOP_PREFIX /usr/local/hadoop-$HADOOP_VERSION
ENV PATH $PATH:$HADOOP_PREFIX/bin
# core-site.xml points fs.defaultFS at the Consul-registered HDFS service
COPY core-site.xml $HADOOP_PREFIX/etc/hadoop/

View File

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hdfs.service.consul/</value>
  </property>
</configuration>

View File

# Spark 2.1.1 with Nomad support, on a JRE-only base image
FROM openjdk:7-jre
RUN curl https://spark-nomad.s3.amazonaws.com/spark-2.1.1-bin-nomad.tgz | tar -xzC /tmp
RUN mv /tmp/spark* /opt/spark
ENV SPARK_HOME /opt/spark
ENV PATH $PATH:$SPARK_HOME/bin

View File

job "hdfs" {
  datacenters = ["dc1"]

  group "NameNode" {
    constraint {
      operator = "distinct_hosts"
      value    = "true"
    }

    task "NameNode" {
      resources {
        memory = 500
        network {
          port "ipc" {
            static = "8020"
          }
          port "ui" {
            static = "50070"
          }
        }
      }

      driver = "docker"

      config {
        image        = "rcgenova/hadoop-2.7.3"
        command      = "bash"
        args         = ["-c", "hdfs namenode -format && exec hdfs namenode -D fs.defaultFS=hdfs://${NOMAD_ADDR_ipc}/ -D dfs.permissions.enabled=false"]
        network_mode = "host"
        port_map {
          ipc = 8020
          ui  = 50070
        }
      }

      service {
        name = "hdfs"
        port = "ipc"
      }
    }
  }

  group "DataNode" {
    count = 3

    constraint {
      operator = "distinct_hosts"
      value    = "true"
    }

    task "DataNode" {
      resources {
        memory = 500
        network {
          port "data" {
            static = "50010"
          }
          port "ipc" {
            static = "50020"
          }
          port "ui" {
            static = "50075"
          }
        }
      }

      driver = "docker"

      config {
        network_mode = "host"
        image        = "rcgenova/hadoop-2.7.3"
        args = [
          "hdfs", "datanode",
          "-D", "fs.defaultFS=hdfs://hdfs.service.consul/",
          "-D", "dfs.permissions.enabled=false",
        ]
        port_map {
          data = 50010
          ipc  = 50020
          ui   = 50075
        }
      }
    }
  }
}

View File

job "spark-history-server" {
  datacenters = ["dc1"]
  type        = "service"

  group "server" {
    count = 1

    task "history-server" {
      driver = "docker"

      config {
        image   = "barnardb/spark"
        command = "/spark/spark-2.1.0-bin-nomad/bin/spark-class"
        args    = ["org.apache.spark.deploy.history.HistoryServer"]
        port_map {
          ui = 18080
        }
        network_mode = "host"
      }

      env {
        "SPARK_HISTORY_OPTS" = "-Dspark.history.fs.logDirectory=hdfs://hdfs.service.consul/spark-events/"
        "SPARK_PUBLIC_DNS"   = "spark-history.service.consul"
      }

      resources {
        cpu    = 500
        memory = 500
        network {
          mbits = 250
          port "ui" {
            static = 18080
          }
        }
      }

      service {
        name = "spark-history"
        tags = ["spark", "ui"]
        port = "ui"
      }
    }
  }
}