Apache Spark

Deep dive into the Java API
for developers

Alexandre DuBreuil

Alexandre DuBreuil

LesFurets.com

First independent insurance comparison website in France, launched in September 2012

A single website to compare hundreds of offers (car, motorcycle, housing, health, loan insurance)

3M quotes/year, 40% market share, 4M customers

22 Developers, 2 DevOps, 4 Architects

450k lines of code, 60k unit tests, 150 selenium tests

1 release per day

Currently migrating to a lambda architecture...

Apache Spark

the honeymoon

Apache Spark is a fast and general-purpose cluster computing system

It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs

It also supports a rich set of higher-level tools including SparkSQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for micro-batching

Easy to start with: Spark in a Scala notebook

In the notebook you can

  • - write statements in a REPL
  • - export the execution result in a presentable format
  • - show swag graphs with minimum effort
  • - start cloud instances on the fly (databricks' notebook)

Well it's pretty cool ...

... and we have lots of usage for it

  • - support our migration to lambda architecture
  • - on-demand KPI and performance reports
  • - business alerting with Spark Streaming
  • - classify users with Spark MLlib
  • - ...

... but we realise that we don't know how to write Scala

... and a notebook is fine for prototyping but it is not industrial at all

  • - code versioning -> git
  • - continuous integration -> jenkins
  • - unit tests -> JUnit
  • - reuse our code base -> UDF
  • - IDE -> Intellij / Eclipse

Apache Spark

in real life

Add the dependency in Maven

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.2</version>
</dependency>

The 2.11 in the artifactId means that Spark was compiled with Scala 2.11 (your Spark cluster will need to be started with same version to avoid serialization problems between the executors)

Also add the DataFrame API (aptly placed in the sql package)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.2</version>
</dependency>

Pretty much all of those elements are imported with a maven dependency

The entry point is SparkSession

private static SparkSession spark = SparkSession.builder()
  .appName("LesFurets.com - Spark")
  .master("local[*]")
  .getOrCreate();

public static void main(String[] args) {
  spark.emptyDataFrame().show();
}

We call the machine that instantiates the SparkSession the driver, it contains the context and communicates with the cluster manager to launch the executions on the workers (or executors)

Apache Spark is a clustered engine that can start in 2 modes: local or standalone / cluster

  • - local: driver and worker on the same JVM
  • - standalone: driver and workers on separate JVMs

That means the jar containing your program is send by the cluster manager (Standalone, Apache Mesos, Hadoop YARN) to the workers and the datas are serialized between the JVMs

Implying : the workers don't have direct access to the driver's variables unless you explicitly broadcast them

Apache Spark

on the ferrets

Shall we live code a simple example? On a ferret dataset find the mean price, by product, for an insurer

DEMO TIME!
com.lesfurets.spark.examples.PricesRun

spark.udf()
     .register("readableProduct",
               (UDF1<String, String>) ProductMapper::english, 
               StringType);
Dataset<Row> averagePrice = prices
    .filter((FilterFunction<Row>) value ->
            value.<String>getAs("insurer")
                 .equals("COOL insurer"))
    .groupBy("product")
    .agg(avg("price").as("average"))
    .withColumn("readableProduct", 
                callUDF("readableProduct", col("product")))
    .orderBy(desc("average"));
averagePrice.show();

What is executed on the worker? And on the driver?

spark
    .read().csv(PATH)
    .filter((FilterFunction<Row>) value ->
            value.<String>getAs("insurer")
                 .equals("COOL insurer"))
    .groupBy("product")
    .agg(avg("price").as("average"))
    .withColumn("readableProduct",
                callUDF("readableProduct", col("product")))
    .orderBy(desc("average"))
    .show();

We call averagePrime.show() a terminal operation that will launch the calculation, the other operations are lazy (think Java 8 streams)

Between each step Spark might shuffle data between the workers

You can see data shuffling and execution plan in Spark UI

But what is that class called Dataset (also called DataFrame) we saw earlier?

A DataFrame is a typed and named column oriented distributed collection of data

From our SparkSession we get a Dataset<Row> (that is an untyped DataSet, also called DataFrame).

// Read a csv file with schema inference
Dataset<Row> data = spark.read()
        .option("inferSchema", true)
        .csv("data.csv");

DataFrame have a schema, even if their type argument is Row like Dataset<Row>

data.printSchema();
root
 |-- uid: string (nullable = true)
 |-- email_hash: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- heure: string (nullable = true)
 |-- module: string (nullable = true)

If you are using SparkSQL, you are also using DataFrame under the hood, and in both cases the execution plans are optimized by Catalyst

What about DataSet<Something>? You can get that from an untyped DataSet

Take Question a Java Bean that corresponds to a LesFurets form question

// Convert Dataset<Row> to Dataset<Question>
Dataset<Question> domainData = data
        .as(Encoders.bean(Question.class))

The DataSet is now typed with Question, on top of his existing schema

Spark 2.0 onwards: SparkSQL, DataFrames and DataSets represent the same component

Resilient Distributed Datasets (RDDs) are Spark's internal plumbing: no need to use them, unless you need to interact with legacy libraries or use low level functionalities (RDD#partitionBy)

// Get the dataset's underlying RDD
RDD<Question> rdd = domainData.rdd();
// RDD's Java API
JavaRDD<Question> javaRDD = domainData.javaRDD();

The interface between DataFrame and RDD is simple

Dataset<Row> dataFrame = spark.createDataFrame(rdd, structType);

Catalyst optimises the program's execution plan, viewable with: Dataset#explain

The generated code is optimized in many ways, this is the result of the Tungsten project (whole-stage codegen)

Apache Spark

unit tested

What if we test our code?

DEMO TIME!
com.lesfurets.spark.examples.PricesRunTest

@BeforeEach
public void before() { 
  List<Row> rows = Arrays.asList(
          RowFactory.create("F1", 50d, "COOL insurer"),
          RowFactory.create("F2", 100d, "COOL insurer"),
          RowFactory.create("F2", 70d, "COOL insurer"));

  StructField product = new StructField("product", IntegerType, ...);
  StructField price = new StructField("price", DoubleType,...);
  StructField insurer = new StructField("insurer", StringType,...);
  StructType structType = new StructType(
          new StructField[]{ product, price, insurer });

  prices = spark.createDataFrame(rows, structType);
}
@Test
public void should_average_price_return_correct_average() {
  Dataset<Row> averagePrice = PricesRun.averagePrice(prices);

  averagePrice.foreach((ForeachFunction<Row>) row 
                       -> assertNotNull(row.getAs("readableProduct")));

  assertEquals(2, averagePrice.count());
  assertEquals(1, averagePrice.first().getAs("product"));
  assertEquals("F2", averagePrice.first().getAs("readableProduct"));
  assertEquals(75, (double) averagePrice.first().<Double>getAs("average"));
}

Testing guidelines:

  • - test startup : start Spark workers before the tests and reuse them to save time (use our @SparkTest JUnit5 extension)
  • - test mode : use standalone mode when possible as it validates object serialization (you need to either have a test cluster, or start one with Maven before the test phase)

Apache Spark

Java VERSUS Scala

Is the Java API limited compared to Scala?

... yes, a little

- We'd like a Java notebook with a REPL (we can still prototype with Scala since it's the same API)

- The Java API is harder to learn since there's less documentation on it's usage (hence this talk)

- It's easy to do very verbose implementations

- Types... types everywhere (e.g. Encoders.STRING())

For example my first word count implementation...

Dataset<Row> wordCount = lines
  .flatMap((FlatMapFunction<Row, String>) row -> {
      String[] words = row.<String>getAs("line").split(" ");
      return asList(words).iterator();
  }, STRING())
  .map((MapFunction<String, Tuple2<String, Integer>>) word ->
          new Tuple2<>(word, 1), tuple(STRING(), INT()))
  .toDF("word", "count")
  .groupBy("word")
  .sum("count")
  .orderBy(desc("sum(count)"))

... I'm using flapMap and map with lambda as arguments (very useful but a bit verbose)

... that same word count can be written more concisely by knowing the API

Dataset<Row> wordCount = lines
  .select(split(col("lines"), " ").alias("words"))
  .select(explode(col("words")).alias("word"))
  .groupBy("word")
  .count()
  .orderBy(desc("count"));

... even if it feels a bit magical

Best tip of the month:

Most functions for select, map, flapMap, reduce, filter, etc., that you'll need are in org.apache.spark.sql.functions (like split and explode in previous slide)

Before writing a function by hand, check in that (non-documented) package

Unfortunately, Java 8 lambda usage has shortcomings, we need to cast them

For example, to get the last element in a group:

Dataset<Tuple2<String, QuoteJoin>> price = 
  quotes
    .groupByKey((MapFunction<QuoteJoin, String>)
      QuoteJoin::getUid, STRING())
    .reduceGroups((ReduceFunction<QuoteJoin>) (v1, v2) ->
      v1.getSnapshotId()
        .compareTo(v2.getSnapshotId()) > 0 ? v1 : v2);

However, these methods take Single Abstract Method interfaces (SAM Interfaces) as parameters, but can't be called directly because they are overloaded for the Scala calls. Last example should read:

Dataset<Tuple2<String, QuoteJoin>> price = 
  quotes
    .groupByKey(QuoteJoin::getUid, STRING())
    .reduceGroups((v1, v2) -> v1.getSnapshotId()
        .compareTo(v2.getSnapshotId()) > 0 ? v1 : v2);

This is a know problem that comes from bytecode compatibility between Scala and Java that is resolved in Scala 2.12. Spark's support of that Scala version is not trivial, see discussions SPARK-14220 and SPARK-14643.

Type serializer (org.apache.spark.sql.Encoders.*) are inferred in Scala, explicit in Java

Dataset<Tuple2<String, TarificationJoin>> tupleTarif = 
  tarification
    .groupByKey((MapFunction<TarificationJoin, String>)
      TarificationJoin::getOffreUid, STRING())
    .reduceGroups((ReduceFunction<TarificationJoin>) (v1, v2) ->
      v1.getSnapshotId()
        .compareTo(v2.getSnapshotId()) > 0 ? v1 : v2);

What we didn't talk about

  • Spark Streaming: similar API that is a little harder to use with Java
  • Cassandra Connector: (or HDFS) makes us use the RDD API (to give partitioning information, join datasets efficiently, etc.)
  • RDD API: is kind of a pain in Java, since it's Tuple oriented, which is difficult to use (but it can be avoided with the DataFrame API)

Apache Spark

Conclusion

Does Apache Spark fit in a Java ecosystem? Yes! Because

  • the unified DataFrame API is usable in Java
  • it's testable with any unit testing framework
  • it's launchable easily from Maven and your IDE
  • it's strongly typed and we like that... (?)
  • it integrates with our codebase
  • it supports declarative (Java 8!) style

Java API is coolest API

Ressources :

- Slides and code (with JUnit4 et JUnit5 Spark annotations)

https://github.com/lesfurets/lesfurets-conference

- (french) Articles about Spark's Java API and unit testing

https://beastie.lesfurets.com/articles

END