Apache Spark RDD / DataFrame re-calculation

package com.bawi.spark

import java.io.File
import java.nio.file.{Files, Paths}

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestRecalculation {
  def main(args: Array[String]): Unit = {

    val output = if (args.length > 0) args(0) else "numbers"
    
    val sparkConf = new SparkConf().setAppName(getClass.getName)
    if (isLocal()) {
      sparkConf.setMaster("local[*]")
      // when running locally, delete a previous output directory so that write.json does not fail
      val file = new File(output)
      if (file.exists())
        FileUtils.deleteDirectory(file)
    }

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sparkContext = sparkSession.sparkContext

    val rdd: RDD[Int] = sparkContext.parallelize(Seq(1, 2, 3))
    // lazy transformation: the function below only runs when an action is triggered
    val rdd2: RDD[Int] = rdd.map(n => {
      println(s"processing: $n")
      n
    })
    import sparkSession.implicits._
    val df: DataFrame = rdd2.toDF("numbers")

    df.write.json(output) // first action

    df.show() // second action: re-calculates the whole lineage

    sparkSession.close()
  }

  def isMac(): Boolean = {
    System.getProperty("os.name").contains("Mac")
  }

  def isUbuntu(): Boolean = {
    if (System.getProperty("os.name").contains("Linux")) {
      val path = Paths.get("/etc/os-release")
      if (path.toFile.exists()) {
        // compare against a lowercase literal, otherwise the lowercased file content never matches
        new String(Files.readAllBytes(path)).toLowerCase.contains("ubuntu")
      } else false
    } else false
  }

  def isLocal(): Boolean = {
    isMac() || isUbuntu()
  }
}

Output:

19/01/04 13:55:20 INFO SparkContext: Running Spark version 2.3.2
19/01/04 13:55:21 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/04 13:55:21 INFO SparkUI: Bound SparkUI to 127.0.0.1, and started at http://localhost:4040
19/01/04 13:55:21 INFO Executor: Starting executor ID driver on host localhost
19/01/04 13:55:24 INFO SparkContext: Starting job: json at TestRecalculation.scala:25
19/01/04 13:55:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
19/01/04 13:55:24 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
19/01/04 13:55:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/01/04 13:55:24 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
19/01/04 13:55:24 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
19/01/04 13:55:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/04 13:55:24 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
19/01/04 13:55:24 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
processing: 2
processing: 3
processing: 1
19/01/04 13:55:25 INFO SparkContext: Starting job: show at TestRecalculation.scala:27
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 1.0 (TID 8)
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 2.0 (TID 9)
19/01/04 13:55:25 INFO Executor: Running task 1.0 in stage 2.0 (TID 10)
19/01/04 13:55:25 INFO Executor: Running task 2.0 in stage 2.0 (TID 11)
19/01/04 13:55:25 INFO Executor: Running task 3.0 in stage 2.0 (TID 12)
processing: 1
19/01/04 13:55:25 INFO SparkContext: Starting job: show at TestRecalculation.scala:27
19/01/04 13:55:25 INFO Executor: Running task 1.0 in stage 3.0 (TID 14)
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 13)
19/01/04 13:55:25 INFO Executor: Running task 2.0 in stage 3.0 (TID 15)
processing: 3
processing: 2
+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
+-------+
19/01/04 13:55:25 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
19/01/04 13:55:25 INFO SparkContext: Successfully stopped SparkContext
Process finished with exit code 0

Note: the numbers were processed twice.
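
Each action (write.json and then show) starts its own Spark job and re-evaluates the whole lineage of rdd2, because nothing tells Spark to keep the intermediate result. As a minimal sketch (not part of the original program), the re-calculation can also be made visible with a hypothetical accumulator instead of println, reusing the same sparkContext and sparkSession.implicits._ as above:

    // hypothetical sketch: count how many times the map function actually runs
    val mapCalls = sparkContext.longAccumulator("mapCalls")
    val countedDf = sparkContext.parallelize(Seq(1, 2, 3))
      .map { n => mapCalls.add(1); n }
      .toDF("numbers")
    countedDf.write.json(output + "_counted") // first action: 3 map calls
    countedDf.show()                          // second action: the map runs again
    // expected 6 without caching (task retries could make it higher)
    println(s"map invocations: ${mapCalls.value}")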

Now with a cached DataFrame (df.cache() called before df.write):

    df.cache()
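
For a DataFrame, cache() should be equivalent to persisting with the MEMORY_AND_DISK storage level, so a more explicit form of the same change (just a sketch) would be:

    import org.apache.spark.storage.StorageLevel

    df.persist(StorageLevel.MEMORY_AND_DISK) // same effect as df.cache() for a DataFrame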

Output:

19/01/04 14:22:37 INFO SparkContext: Running Spark version 2.3.2
19/01/04 14:22:41 INFO SparkContext: Starting job: json at TestRecalculation.scala:37
19/01/04 14:22:41 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
19/01/04 14:22:41 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/01/04 14:22:41 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/04 14:22:41 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
19/01/04 14:22:41 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
19/01/04 14:22:41 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
19/01/04 14:22:41 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
19/01/04 14:22:41 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_3 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_6 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_1 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_3 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_6 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_4 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_1 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_4 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
processing: 3
processing: 1
processing: 2
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_7 stored as values in memory (estimated size 232.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_5 stored as values in memory (estimated size 232.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_7 in memory on localhost:63790 (size: 232.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_5 in memory on localhost:63790 (size: 232.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.0 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 1.0 (TID 8)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_0 locally
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 18.0 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 2.0 (TID 9)
19/01/04 14:22:42 INFO Executor: Running task 2.0 in stage 2.0 (TID 11)
19/01/04 14:22:42 INFO Executor: Running task 1.0 in stage 2.0 (TID 10)
19/01/04 14:22:42 INFO Executor: Running task 3.0 in stage 2.0 (TID 12)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_3 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_4 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_2 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_1 locally
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 18.0 KB, free 2004.3 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.3 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 3.0 (TID 13)
19/01/04 14:22:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 14)
19/01/04 14:22:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 15)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_5 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_7 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_6 locally
+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
+-------+
19/01/04 14:22:42 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
19/01/04 14:22:42 INFO SparkContext: Successfully stopped SparkContext
19/01/04 14:22:42 INFO ShutdownHookManager: Shutdown hook called
Process finished with exit code 0

Note: the numbers were processed only once.
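
The cached rdd_4_* blocks stay in memory until the application stops, so once the DataFrame is no longer needed they can be released with unpersist. Checkpointing (Spark 2.1+) is another way to avoid the re-calculation, at the cost of writing the data out to a checkpoint directory; the path below is only an example:

    df.unpersist() // drop the cached blocks once they are no longer needed

    // alternative sketch: truncate the lineage by checkpointing instead of caching
    sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // example path
    val checkpointedDf = df.checkpoint() // eagerly materializes df and cuts its lineage
    checkpointedDf.show()                // reads the checkpointed data, the map does not run again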
