Wednesday, December 6, 2017

Hbase shell command - quickstart

  1. $ ./bin/hbase shell
    hbase(main):001:0>
  2. help
  3. You must specify the table name : 'test'
    the ColumnFamily name: 'cf'
    hbase(main):001:0> create 'test', 'cf'
  4. hbase(main):002:0> list 'test'
    
  5. hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
    hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
    hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
  6. hbase(main):006:0> scan 'test'
    ROW                                      COLUMN+CELL
     row1                                    column=cf:a, timestamp=1421762485768, value=value1
     row2                                    column=cf:b, timestamp=1421762491785, value=value2
     row3                                    column=cf:c, timestamp=1421762496210, value=value3
    3 row(s) in 0.0230 seconds
  7. hbase(main):007:0> get 'test', 'row1'
    COLUMN                                   CELL
     cf:a                                    timestamp=1421762485768, value=value1
    1 row(s) in 0.0350 seconds
  8. hbase(main):008:0> disable 'test'
    hbase(main):009:0> enable 'test'
    
  9. hbase(main):011:0> drop 'test'
  10. quit

Tuesday, November 21, 2017

HBase shell commands



As told in HBase introduction, HBase provides Extensible jruby-based (JIRB) shell as a feature to execute some commands(each command represents one functionality).
HBase shell commands are mainly categorized into 6 parts
1) General  HBase shell commands
status Show cluster status. Can be ‘summary’, ‘simple’, or ‘detailed’. The
default is ‘summary’.
hbase> status
hbase> status ‘simple’
hbase> status ‘summary’
hbase> status ‘detailed’
version Output this HBase versionUsage:
hbase> version
whoami Show the current hbase user.Usage:
hbase> whoami
2) Tables Management commands
alter Alter column family schema; pass table name and a dictionary
specifying new column family schema. Dictionaries are described
on the main help command output. Dictionary must include name
of column family to alter.For example, to change or add the ‘f1’ column family in table ‘t1’ from
current value to keep a maximum of 5 cell VERSIONS, do:
hbase> alter ‘t1’, NAME => ‘f1’, VERSIONS => 5
You can operate on several column families:
hbase> alter ‘t1’, ‘f1’, {NAME => ‘f2’, IN_MEMORY => true}, {NAME => ‘f3’, VERSIONS => 5}
To delete the ‘f1’ column family in table ‘t1’, use one of:hbase> alter ‘t1’, NAME => ‘f1’, METHOD => ‘delete’
hbase> alter ‘t1’, ‘delete’ => ‘f1’
You can also change table-scope attributes like MAX_FILESIZE, READONLY,
MEMSTORE_FLUSHSIZE, DEFERRED_LOG_FLUSH, etc. These can be put at the end;
for example, to change the max size of a region to 128MB, do:
hbase> alter ‘t1’, MAX_FILESIZE => ‘134217728’
You can add a table coprocessor by setting a table coprocessor attribute:
hbase> alter ‘t1’,
‘coprocessor’=>’hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2
Since you can have multiple coprocessors configured for a table, a
sequence number will be automatically appended to the attribute name
to uniquely identify it.
The coprocessor attribute must match the pattern below in order for
the framework to understand how to load the coprocessor classes:
[coprocessor jar file location] | class name | [priority] | [arguments]
You can also set configuration settings specific to this table or column family:
hbase> alter ‘t1’, CONFIGURATION => {‘hbase.hregion.scan.loadColumnFamiliesOnDemand’ => ‘true’}
hbase> alter ‘t1’, {NAME => ‘f2’, CONFIGURATION => {‘hbase.hstore.blockingStoreFiles’ => ’10’}}
You can also remove a table-scope attribute:
hbase> alter ‘t1’, METHOD => ‘table_att_unset’, NAME => ‘MAX_FILESIZE’
hbase> alter ‘t1’, METHOD => ‘table_att_unset’, NAME => ‘coprocessor$1’
There could be more than one alteration in one command:
hbase> alter ‘t1’, { NAME => ‘f1’, VERSIONS => 3 },
{ MAX_FILESIZE => ‘134217728’ }, { METHOD => ‘delete’, NAME => ‘f2’ },
OWNER => ‘johndoe’, METADATA => { ‘mykey’ => ‘myvalue’ }
create Create table; pass table name, a dictionary of specifications per
column family, and optionally a dictionary of table configuration.
hbase> create ‘t1’, {NAME => ‘f1’, VERSIONS => 5}
hbase> create ‘t1’, {NAME => ‘f1’}, {NAME => ‘f2’}, {NAME => ‘f3’}
hbase> # The above in shorthand would be the following:
hbase> create ‘t1’, ‘f1’, ‘f2’, ‘f3’
hbase> create ‘t1’, {NAME => ‘f1’, VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
hbase> create ‘t1’, {NAME => ‘f1’, CONFIGURATION => {‘hbase.hstore.blockingStoreFiles’ => ’10’}}
Table configuration options can be put at the end.
describe Describe the named table.
hbase> describe ‘t1’
disable Start disable of named table
hbase> disable ‘t1’
disable_all Disable all of tables matching the given regex
hbase> disable_all ‘t.*’
is_disabled verifies Is named table disabled
hbase> is_disabled ‘t1’
drop  Drop the named table. Table must first be disabled
hbase> drop ‘t1’
drop_all Drop all of the tables matching the given regex
hbase> drop_all ‘t.*’
enable Start enable of named table
hbase> enable ‘t1’
enable_all Enable all of the tables matching the given regex
hbase> enable_all ‘t.*’
is_enabled verifies Is named table enabled
hbase> is_enabled ‘t1’
exists Does the named table exist
hbase> exists ‘t1’
list List all tables in hbase. Optional regular expression parameter could
be used to filter the output
hbase> list
hbase> list ‘abc.*’
show_filters Show all the filters in hbase.
hbase> show_filters
alter_status Get the status of the alter command. Indicates the number of regions of the table that have received the updated schema Pass table name.
hbase> alter_status ‘t1’
alter_async Alter column family schema, does not wait for all regions to receive the
schema changes. Pass table name and a dictionary specifying new column
family schema. Dictionaries are described on the main help command output.
Dictionary must include name of column family to alter.
To change or add the ‘f1’ column family in table ‘t1’ from defaults
to instead keep a maximum of 5 cell VERSIONS, do:hbase> alter_async ‘t1’, NAME => ‘f1’, VERSIONS => 5To delete the ‘f1’ column family in table ‘t1’, do:
hbase> alter_async ‘t1’, NAME => ‘f1’, METHOD => ‘delete’or a shorter version:hbase> alter_async ‘t1’, ‘delete’ => ‘f1’
You can also change table-scope attributes like MAX_FILESIZE
MEMSTORE_FLUSHSIZE, READONLY, and DEFERRED_LOG_FLUSH.
For example, to change the max size of a family to 128MB, do:
hbase> alter ‘t1’, METHOD => ‘table_att’, MAX_FILESIZE => ‘134217728’
There could be more than one alteration in one command:
hbase> alter ‘t1’, {NAME => ‘f1’}, {NAME => ‘f2’, METHOD => ‘delete’}
To check if all the regions have been updated, use alter_status <table_name>
3) Data Manipulation commands  
count Count the number of rows in a table. Return value is the number of rows.
This operation may take a LONG time (Run ‘$HADOOP_HOME/bin/hadoop jar
hbase.jar rowcount’ to run a counting mapreduce job). Current count is shown
every 1000 rows by default. Count interval may be optionally specified. Scan
caching is enabled on count scans by default. Default cache size is 10 rows.
If your rows are small in size, you may want to increase this
parameter. Examples:hbase> count ‘t1’
hbase> count ‘t1’, INTERVAL => 100000
hbase> count ‘t1’, CACHE => 1000
hbase> count ‘t1’, INTERVAL => 10, CACHE => 1000
The same commands also can be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding commands would be:hbase> t.count
hbase> t.count INTERVAL => 100000
hbase> t.count CACHE => 1000
hbase> t.count INTERVAL => 10, CACHE => 1000
delete Put a delete cell value at specified table/row/column and optionally
timestamp coordinates. Deletes must match the deleted cell’s
coordinates exactly. When scanning, a delete cell suppresses older
versions. To delete a cell from ‘t1’ at row ‘r1’ under column ‘c1’
marked with the time ‘ts1’, do:
hbase> delete ‘t1’, ‘r1’, ‘c1’, ts1
The same command can also be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding command would be:hbase> t.delete ‘r1’, ‘c1’, ts1
deleteall Delete all cells in a given row; pass a table name, row, and optionally
a column and timestamp. Examples:hbase> deleteall ‘t1’, ‘r1’
hbase> deleteall ‘t1’, ‘r1’, ‘c1’
hbase> deleteall ‘t1’, ‘r1’, ‘c1’, ts1
The same commands also can be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding command would be:hbase> t.deleteall ‘r1’
hbase> t.deleteall ‘r1’, ‘c1’
hbase> t.deleteall ‘r1’, ‘c1’, ts1
get Get row or cell contents; pass table name, row, and optionally
a dictionary of column(s), timestamp, timerange and versions. Examples:
hbase> get ‘t1’, ‘r1’
hbase> get ‘t1’, ‘r1’, {TIMERANGE => [ts1, ts2]}
hbase> get ‘t1’, ‘r1’, {COLUMN => ‘c1’}
hbase> get ‘t1’, ‘r1’, {COLUMN => [‘c1’, ‘c2’, ‘c3’]}
hbase> get ‘t1’, ‘r1’, {COLUMN => ‘c1’, TIMESTAMP => ts1}
hbase> get ‘t1’, ‘r1’, {COLUMN => ‘c1’, TIMERANGE => [ts1, ts2], VERSIONS => 4}
hbase> get ‘t1’, ‘r1’, {COLUMN => ‘c1’, TIMESTAMP => ts1, VERSIONS => 4}
hbase> get ‘t1’, ‘r1’, {FILTER => “ValueFilter(=, ‘binary:abc’)”}
hbase> get ‘t1’, ‘r1’, ‘c1’
hbase> get ‘t1’, ‘r1’, ‘c1’, ‘c2’
hbase> get ‘t1’, ‘r1’, [‘c1’, ‘c2’]
Besides the default ‘toStringBinary’ format, ‘get’ also supports custom formatting by
column. A user can define a FORMATTER by adding it to the column name in the get
specification. The FORMATTER can be stipulated:1. either as a org.apache.hadoop.hbase.util.Bytes method name (e.g, toInt, toString)
2. or as a custom class followed by method name: e.g. ‘c(MyFormatterClass).format’.Example formatting cf:qualifier1 and cf:qualifier2 both as Integers:
hbase> get ‘t1’, ‘r1’ {COLUMN => [‘cf:qualifier1:toInt’,
‘cf:qualifier2:c(org.apache.hadoop.hbase.util.Bytes).toInt’] }
Note that you can specify a FORMATTER by column only (cf:qualifer). You cannot specify
a FORMATTER for all columns of a column family.The same commands also can be run on a reference to a table (obtained via get_table or
create_table). Suppose you had a reference t to table ‘t1’, the corresponding commands
would be:
hbase> t.get ‘r1’
hbase> t.get ‘r1’, {TIMERANGE => [ts1, ts2]}
hbase> t.get ‘r1’, {COLUMN => ‘c1’}
hbase> t.get ‘r1’, {COLUMN => [‘c1’, ‘c2’, ‘c3’]}
hbase> t.get ‘r1’, {COLUMN => ‘c1’, TIMESTAMP => ts1}
hbase> t.get ‘r1’, {COLUMN => ‘c1’, TIMERANGE => [ts1, ts2], VERSIONS => 4}
hbase> t.get ‘r1’, {COLUMN => ‘c1’, TIMESTAMP => ts1, VERSIONS => 4}
hbase> t.get ‘r1’, {FILTER => “ValueFilter(=, ‘binary:abc’)”}
hbase> t.get ‘r1’, ‘c1’
hbase> t.get ‘r1’, ‘c1’, ‘c2’
hbase> t.get ‘r1’, [‘c1’, ‘c2’]
get_counter Return a counter cell value at specified table/row/column coordinates.
A cell cell should be managed with atomic increment function oh HBase
and the data should be binary encoded. Example:
hbase> get_counter ‘t1’, ‘r1’, ‘c1’
The same commands also can be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding command would be:
hbase> t.get_counter ‘r1’, ‘c1’
incr Increments a cell ‘value’ at specified table/row/column coordinates.
To increment a cell value in table ‘t1’ at row ‘r1’ under column
‘c1’ by 1 (can be omitted) or 10 do:
hbase> incr ‘t1’, ‘r1’, ‘c1’
hbase> incr ‘t1’, ‘r1’, ‘c1’, 1
hbase> incr ‘t1’, ‘r1’, ‘c1’, 10
The same commands also can be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding command would be:hbase> t.incr ‘r1’, ‘c1’
hbase> t.incr ‘r1’, ‘c1’, 1
hbase> t.incr ‘r1’, ‘c1’, 10
put Put a cell ‘value’ at specified table/row/column and optionally
timestamp coordinates. To put a cell value into table ‘t1’ at
row ‘r1’ under column ‘c1’ marked with the time ‘ts1’, do:
hbase> put ‘t1’, ‘r1’, ‘c1’, ‘value’, ts1
The same commands also can be run on a table reference. Suppose you had a reference
t to table ‘t1’, the corresponding command would be:
hbase> t.put ‘r1’, ‘c1’, ‘value’, ts1
scan Scan a table; pass table name and optionally a dictionary of scanner
specifications. Scanner specifications may include one or more of:
TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
or COLUMNS, CACHEIf no columns are specified, all columns will be scanned.
To scan all members of a column family, leave the qualifier empty as in
‘col_family:’.The filter can be specified in two ways:
1. Using a filterString – more information on this is available in the
Filter Language document attached to the HBASE-4176 JIRA
2. Using the entire package name of the filter.Some examples:hbase> scan ‘.META.’
hbase> scan ‘.META.’, {COLUMNS => ‘info:regioninfo’}
hbase> scan ‘t1’, {COLUMNS => [‘c1’, ‘c2’], LIMIT => 10, STARTROW => ‘xyz’}
hbase> scan ‘t1’, {COLUMNS => ‘c1’, TIMERANGE => [1303668804, 1303668904]}
hbase> scan ‘t1’, {FILTER => “(PrefixFilter (‘row2’) AND
(QualifierFilter (>=, ‘binary:xyz’))) AND (TimestampsFilter ( 123, 456))”}
hbase> scan ‘t1’, {FILTER =>
org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
For experts, there is an additional option — CACHE_BLOCKS — which
switches block caching for the scanner on (true) or off (false). By
default it is enabled. Examples:hbase> scan ‘t1’, {COLUMNS => [‘c1’, ‘c2’], CACHE_BLOCKS => false}
Also for experts, there is an advanced option — RAW — which instructs the
scanner to return all cells (including delete markers and uncollected deleted
cells). This option cannot be combined with requesting specific COLUMNS.
Disabled by default. Example:
hbase> scan ‘t1’, {RAW => true, VERSIONS => 10}
Besides the default ‘toStringBinary’ format, ‘scan’ supports custom formatting
by column. A user can define a FORMATTER by adding it to the column name in
the scan specification. The FORMATTER can be stipulated:
1. either as a org.apache.hadoop.hbase.util.Bytes method name (e.g, toInt, toString)
2. or as a custom class followed by method name: e.g. ‘c(MyFormatterClass).format’.
Example formatting cf:qualifier1 and cf:qualifier2 both as Integers:
hbase> scan ‘t1’, {COLUMNS => [‘cf:qualifier1:toInt’,
‘cf:qualifier2:c(org.apache.hadoop.hbase.util.Bytes).toInt’] }
Note that you can specify a FORMATTER by column only (cf:qualifer). You cannot
specify a FORMATTER for all columns of a column family.
Scan can also be used directly from a table, by first getting a reference to a
table, like such:
hbase> t = get_table ‘t’
hbase> t.scan
Note in the above situation, you can still provide all the filtering, columns,
options, etc as described above.
truncate Disables, drops and recreates the specified table.
Examples:
hbase>truncate ‘t1’
4) HBase surgery tools
assign Assign a region. Use with caution. If region already assigned,
this command will do a force reassign. For experts only.
Examples:
hbase> assign ‘REGION_NAME’
balancer Trigger the cluster balancer. Returns true if balancer ran and was able to
tell the region servers to unassign all the regions to balance (the re-assignment itself is async).
Otherwise false (Will not run if regions in transition).
Examples:
hbase> balancer
balance_switch Enable/Disable balancer. Returns previous balancer state.
Examples:
hbase> balance_switch true
hbase> balance_switch false
close_region Close a single region. Ask the master to close a region out on the cluster
or if ‘SERVER_NAME’ is supplied, ask the designated hosting regionserver to
close the region directly. Closing a region, the master expects ‘REGIONNAME’
to be a fully qualified region name. When asking the hosting regionserver to
directly close a region, you pass the regions’ encoded name only. A region
name looks like this:TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.The trailing period is part of the regionserver name. A region’s encoded name
is the hash at the end of a region name; e.g. 527db22f95c8a9e0116f0cc13c680396
(without the period). A ‘SERVER_NAME’ is its host, port plus startcode. For
example: host187.example.com,60020,1289493121758 (find servername in master ui
or when you do detailed status in shell). This command will end up running
close on the region hosting regionserver. The close is done without the
master’s involvement (It will not know of the close). Once closed, region will
stay closed. Use assign to reopen/reassign. Use unassign or move to assign
the region elsewhere on cluster. Use with caution. For experts only.
Examples:hbase> close_region ‘REGIONNAME’
hbase> close_region ‘REGIONNAME’, ‘SERVER_NAME’
compact Compact all regions in passed table or pass a region row
to compact an individual region. You can also compact a single column
family within a region.
Examples:
Compact all regions in a table:
hbase> compact ‘t1’
Compact an entire region:
hbase> compact ‘r1’
Compact only a column family within a region:
hbase> compact ‘r1’, ‘c1’
Compact a column family within a table:
hbase> compact ‘t1’, ‘c1’
flush Flush all regions in passed table or pass a region row to
flush an individual region. For example:hbase> flush ‘TABLENAME’
hbase> flush ‘REGIONNAME’
major_compact Run major compaction on passed table or pass a region row
to major compact an individual region. To compact a single
column family within a region specify the region name
followed by the column family name.
Examples:
Compact all regions in a table:
hbase> major_compact ‘t1’
Compact an entire region:
hbase> major_compact ‘r1’
Compact a single column family within a region:
hbase> major_compact ‘r1’, ‘c1’
Compact a single column family within a table:
hbase> major_compact ‘t1’, ‘c1’
move Move a region. Optionally specify target regionserver else we choose one
at random. NOTE: You pass the encoded region name, not the region name so
this command is a little different to the others. The encoded region name
is the hash suffix on region names: e.g. if the region name were
TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then
the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396
A server name is its host, port plus startcode. For example:
host187.example.com,60020,1289493121758
Examples:hbase> move ‘ENCODED_REGIONNAME’
hbase> move ‘ENCODED_REGIONNAME’, ‘SERVER_NAME’
split Split entire table or pass a region to split individual region. With the
second parameter, you can specify an explicit split key for the region.
Examples:
split ‘tableName’
split ‘regionName’ # format: ‘tableName,startKey,id’
split ‘tableName’, ‘splitKey’
split ‘regionName’, ‘splitKey’
unassign Unassign a region. Unassign will close region in current location and then
reopen it again. Pass ‘true’ to force the unassignment (‘force’ will clear
all in-memory state in master before the reassign. If results in
double assignment use hbck -fix to resolve. To be used by experts).
Use with caution. For expert use only. Examples:hbase> unassign ‘REGIONNAME’
hbase> unassign ‘REGIONNAME’, true
hlog_roll Roll the log writer. That is, start writing log messages to a new file.
The name of the regionserver should be given as the parameter. A
‘server_name’ is the host, port plus startcode of a regionserver. For
example: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
hbase>hlog_roll
zk_dump Dump status of HBase cluster as seen by ZooKeeper. Example:
hbase>zk_dump

5) Cluster replication tools
add_peer Add a peer cluster to replicate to, the id must be a short and
the cluster key is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
This gives a full path for HBase to connect to another cluster.
Examples:hbase> add_peer ‘1’, “server1.cie.com:2181:/hbase
hbase> add_peer ‘2’, “zk1,zk2,zk3:2182:/hbase-prod”
remove_peer Stops the specified replication stream and deletes all the meta
information kept about it. Examples:
hbase> remove_peer ‘1’
list_peers List all replication peer clusters.
hbase> list_peers
enable_peer Restarts the replication to the specified peer cluster,
continuing from where it was disabled.Examples:
hbase> enable_peer ‘1’
disable_peer Stops the replication stream to the specified cluster, but still
keeps track of new edits to replicate.Examples:
hbase> disable_peer ‘1’
start_replication Restarts all the replication features. The state in which each
stream starts in is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> start_replication
stop_replication Stops all the replication features. The state in which each
stream stops in is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> stop_replication
6) Security tools
grant Grant users specific rights.
Syntax : grantpermissions is either zero or more letters from the set “RWXCA”.
READ(‘R’), WRITE(‘W’), EXEC(‘X’), CREATE(‘C’), ADMIN(‘A’)For example:hbase> grant ‘bobsmith’, ‘RWXCA’
hbase> grant ‘bobsmith’, ‘RW’, ‘t1’, ‘f1’, ‘col1’
revoke Revoke a user’s access rights.
Syntax : revoke
For example:
hbase> revoke ‘bobsmith’, ‘t1’, ‘f1’, ‘col1’
user_permission Show all permissions for the particular user.
Syntax : user_permission
For example:hbase> user_permission
hbase> user_permission ‘table1’

Command like SQL LIMIT in HBase

From the HBase shell you can use LIMIT:
hbase> scan 'test-table', {'LIMIT' => 5}
From the Java API you can use Scan.setMaxResultSize(N) or scan.setMaxResultsPerColumnFamily(N).

Get output from scans in hbase shell

I know that this post is quite old but i was searching something about HBase myself and came across with it.
Well i don't know if this is the best way to do it, but you can definitely use the scripting option HBase gives you. Just open a shell (preferably go to the directory bin of HBase) and run
echo "scan 'foo'" | ./hbase shell > myText
where foo is the name of the table you want to scan. If you then open myText you will see the results in there. Hope i helped!

Tuesday, October 24, 2017

Spark Programming Guide

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.2.0 supports lambda expressions for concisely writing functions, otherwise you can use the classes in theorg.apache.spark.api.java.function package.
Note that support for Java 7 was removed in Spark 2.2.0.
To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0
In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
Finally, you need to import some Spark classes into your program. Add the following lines:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

Initializing Spark

The first thing a Spark program must do is to create a JavaSparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:
$ ./bin/spark-shell --master local[4]
Or, to also add code.jar to its classpath, use:
$ ./bin/spark-shell --master local[4] --jars code.jar
To include a dependency using Maven coordinates:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling JavaSparkContext’s parallelize method on an existing Collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) -> a + b)to add up the elements of the list. We describe operations on distributed datasets later on.
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation:
JavaRDD<String> distFile = sc.textFile("data.txt");
Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduceoperations as follows: distFile.map(s -> s.length()).reduce((a, b) -> a + b).
Some notes on reading files with Spark:
  • If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
  • All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
Apart from text files, Spark’s Java API also supports several other data formats:
  • JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
  • For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text.
  • For other Hadoop InputFormats, you can use the JavaSparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use JavaSparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
  • JavaRDD.saveAsObjectFile and JavaSparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

Basics

To illustrate RDD basics, consider the simple program below:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.
If we also wanted to use lineLengths again later, we could add:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:
  • Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
  • Use lambda expressions to concisely define an implementation.
While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs in long-form. For example, we could have written our code above as follows:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
Or, if writing the functions inline is unwieldy:
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages.

Understanding closures 

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.
The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.
To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.
In Java, key-value pairs are represented using the scala.Tuple2 class from the Scala standard library. You can simply call new Tuple2(a, b) to create a tuple, and access its fields later with tuple._1() and tuple._2().
RDDs of key-value pairs are represented by the JavaPairRDD class. You can construct JavaPairRDDs from JavaRDDs using special versions of the map operations, like mapToPair and flatMapToPair. The JavaPairRDD will have both standard RDD functions and special key-value ones.
For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which funcreturns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R)
and pair RDD functions doc (Scala, Java) for details.
ActionMeaning
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) 
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) 
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.
In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:
  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD
Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.
Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.
Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dirconfiguration parameter when configuring the Spark context.
Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object (Scala,Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:
Storage LevelMeaning
MEMORY_ONLYStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISKStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER 
(Java and Scala)
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER 
(Java and Scala)
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLYStore the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental)Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:
  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance counter) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.
Accumulators in the Spark UI
Tracking accumulators in the UI can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).
A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
The code below shows an accumulator being used to add up the elements of an array:
LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10
While this code used the built-in support for accumulators of type Long, programmers can also create their own types by subclassing AccumulatorV2. The AccumulatorV2 abstract class has several methods which one has to override: reset for resetting the accumulator to zero, addfor adding another value into the accumulator, merge for merging another same-type accumulator into this one. Other methods that must be overridden are contained in the API documentation. For example, supposing we had a MyVector class representing mathematical vectors, we could write:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala,Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:
./bin/run-example SparkPi
For Python examples, use spark-submit instead:
./bin/spark-submit examples/src/main/python/pi.py
For R examples, use spark-submit instead:
./bin/spark-submit examples/src/main/r/dataframe.R
For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.
Finally, full API documentation is available in Scala, Java, Python and R.