[HUDI-3719] High performance costs of AvroSerizlizer in DataSource wr… (#5137)
* [HUDI-3719] High performance costs of AvroSerizlizer in DataSource writing * add benchmark framework which modify from spark add avroSerDerBenchmark
This commit is contained in:
@@ -0,0 +1,239 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.hudi.benchmark
|
||||
|
||||
|
||||
import java.io.{OutputStream, PrintStream}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Try
|
||||
|
||||
import org.apache.commons.io.output.TeeOutputStream
|
||||
import org.apache.commons.lang3.SystemUtils
|
||||
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* Reference from spark.
|
||||
* Utility class to benchmark components. An example of how to use this is:
|
||||
* val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
|
||||
* benchmark.addCase("V1")(<function>)
|
||||
* benchmark.addCase("V2")(<function>)
|
||||
* benchmark.run
|
||||
* This will output the average time to run each function and the rate of each function.
|
||||
*
|
||||
* The benchmark function takes one argument that is the iteration that's being run.
|
||||
*
|
||||
* @param name name of this benchmark.
|
||||
* @param valuesPerIteration number of values used in the test case, used to compute rows/s.
|
||||
* @param minNumIters the min number of iterations that will be run per case, not counting warm-up.
|
||||
* @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up.
|
||||
* @param minTime further iterations will be run for each case until this time is used up.
|
||||
* @param outputPerIteration if true, the timing for each run will be printed to stdout.
|
||||
* @param output optional output stream to write benchmark results to
|
||||
*/
|
||||
class HoodieBenchmark(
|
||||
name: String,
|
||||
valuesPerIteration: Long,
|
||||
minNumIters: Int = 2,
|
||||
warmupTime: FiniteDuration = 2.seconds,
|
||||
minTime: FiniteDuration = 2.seconds,
|
||||
outputPerIteration: Boolean = false,
|
||||
output: Option[OutputStream] = None) {
|
||||
import HoodieBenchmark._
|
||||
val benchmarks = mutable.ArrayBuffer.empty[HoodieBenchmark.Case]
|
||||
|
||||
val out = if (output.isDefined) {
|
||||
new PrintStream(new TeeOutputStream(System.out, output.get))
|
||||
} else {
|
||||
System.out
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a case to run when run() is called. The given function will be run for several
|
||||
* iterations to collect timing statistics.
|
||||
*
|
||||
* @param name of the benchmark case
|
||||
* @param numIters if non-zero, forces exactly this many iterations to be run
|
||||
*/
|
||||
def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
|
||||
addTimerCase(name, numIters) { timer =>
|
||||
timer.startTiming()
|
||||
f(timer.iteration)
|
||||
timer.stopTiming()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a case with manual timing control. When the function is run, timing does not start
|
||||
* until timer.startTiming() is called within the given function. The corresponding
|
||||
* timer.stopTiming() method must be called before the function returns.
|
||||
*
|
||||
* @param name of the benchmark case
|
||||
* @param numIters if non-zero, forces exactly this many iterations to be run
|
||||
*/
|
||||
def addTimerCase(name: String, numIters: Int = 0)(f: HoodieBenchmark.Timer => Unit): Unit = {
|
||||
benchmarks += HoodieBenchmark.Case(name, f, numIters)
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the benchmark and outputs the results to stdout. This should be copied and added as
|
||||
* a comment with the benchmark. Although the results vary from machine to machine, it should
|
||||
* provide some baseline.
|
||||
*/
|
||||
def run(): Unit = {
|
||||
require(benchmarks.nonEmpty)
|
||||
// scalastyle:off
|
||||
println("Running benchmark: " + name)
|
||||
|
||||
val results = benchmarks.map { c =>
|
||||
println(" Running case: " + c.name)
|
||||
measure(valuesPerIteration, c.numIters)(c.fn)
|
||||
}
|
||||
println
|
||||
|
||||
val firstBest = results.head.bestMs
|
||||
// The results are going to be processor specific so it is useful to include that.
|
||||
out.println(HoodieBenchmark.getJVMOSInfo())
|
||||
out.println(HoodieBenchmark.getProcessorName())
|
||||
val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
|
||||
out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
|
||||
name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
|
||||
out.println("-" * (nameLen + 80))
|
||||
results.zip(benchmarks).foreach { case (result, benchmark) =>
|
||||
out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
|
||||
benchmark.name,
|
||||
"%5.0f" format result.bestMs,
|
||||
"%4.0f" format result.avgMs,
|
||||
"%5.0f" format result.stdevMs,
|
||||
"%10.1f" format result.bestRate,
|
||||
"%6.1f" format (1000 / result.bestRate),
|
||||
"%3.1fX" format (firstBest / result.bestMs))
|
||||
}
|
||||
out.println
|
||||
// scalastyle:on
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a single function `f` for iters, returning the average time the function took and
|
||||
* the rate of the function.
|
||||
*/
|
||||
def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
|
||||
System.gc() // ensures garbage from previous cases don't impact this one
|
||||
val warmupDeadline = warmupTime.fromNow
|
||||
while (!warmupDeadline.isOverdue) {
|
||||
f(new HoodieBenchmark.Timer(-1))
|
||||
}
|
||||
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
|
||||
val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
|
||||
val runTimes = ArrayBuffer[Long]()
|
||||
var totalTime = 0L
|
||||
var i = 0
|
||||
while (i < minIters || totalTime < minDuration) {
|
||||
val timer = new HoodieBenchmark.Timer(i)
|
||||
f(timer)
|
||||
val runTime = timer.totalTime()
|
||||
runTimes += runTime
|
||||
totalTime += runTime
|
||||
|
||||
if (outputPerIteration) {
|
||||
// scalastyle:off
|
||||
println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
|
||||
// scalastyle:on
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
// scalastyle:off
|
||||
println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
|
||||
// scalastyle:on
|
||||
assert(runTimes.nonEmpty)
|
||||
val best = runTimes.min
|
||||
val avg = runTimes.sum / runTimes.size
|
||||
val stdev = if (runTimes.size > 1) {
|
||||
math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
|
||||
} else 0
|
||||
Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
|
||||
}
|
||||
}
|
||||
|
||||
object HoodieBenchmark {
|
||||
|
||||
/**
|
||||
* Object available to benchmark code to control timing e.g. to exclude set-up time.
|
||||
*
|
||||
* @param iteration specifies this is the nth iteration of running the benchmark case
|
||||
*/
|
||||
class Timer(val iteration: Int) {
|
||||
private var accumulatedTime: Long = 0L
|
||||
private var timeStart: Long = 0L
|
||||
|
||||
def startTiming(): Unit = {
|
||||
assert(timeStart == 0L, "Already started timing.")
|
||||
timeStart = System.nanoTime
|
||||
}
|
||||
|
||||
def stopTiming(): Unit = {
|
||||
assert(timeStart != 0L, "Have not started timing.")
|
||||
accumulatedTime += System.nanoTime - timeStart
|
||||
timeStart = 0L
|
||||
}
|
||||
|
||||
def totalTime(): Long = {
|
||||
assert(timeStart == 0L, "Have not stopped timing.")
|
||||
accumulatedTime
|
||||
}
|
||||
}
|
||||
|
||||
case class Case(name: String, fn: Timer => Unit, numIters: Int)
|
||||
case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
|
||||
|
||||
/**
|
||||
* This should return a user helpful processor information. Getting at this depends on the OS.
|
||||
* This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz"
|
||||
*/
|
||||
def getProcessorName(): String = {
|
||||
val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
|
||||
Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
|
||||
.stripLineEnd
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
Try {
|
||||
val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd
|
||||
Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo"))
|
||||
.stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "")
|
||||
}.getOrElse("Unknown processor")
|
||||
} else {
|
||||
System.getenv("PROCESSOR_IDENTIFIER")
|
||||
}
|
||||
cpu
|
||||
}
|
||||
|
||||
/**
|
||||
* This should return a user helpful JVM & OS information.
|
||||
* This should return something like
|
||||
* "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
|
||||
*/
|
||||
def getJVMOSInfo(): String = {
|
||||
val vmName = System.getProperty("java.vm.name")
|
||||
val runtimeVersion = System.getProperty("java.runtime.version")
|
||||
val osName = System.getProperty("os.name")
|
||||
val osVersion = System.getProperty("os.version")
|
||||
s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.hudi.benchmark
|
||||
|
||||
import java.io.{File, FileOutputStream, OutputStream}
|
||||
|
||||
/**
|
||||
* Reference from spark.
|
||||
* A base class for generate benchmark results to a file.
|
||||
* For JDK9+, JDK major version number is added to the file names to distinguish the results.
|
||||
*/
|
||||
abstract class HoodieBenchmarkBase {
|
||||
var output: Option[OutputStream] = None
|
||||
|
||||
/**
|
||||
* Main process of the whole benchmark.
|
||||
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
|
||||
* for each benchmark scenario.
|
||||
*/
|
||||
def runBenchmarkSuite(mainArgs: Array[String]): Unit
|
||||
|
||||
final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
|
||||
val separator = "=" * 96
|
||||
val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes
|
||||
output.foreach(_.write(testHeader))
|
||||
func
|
||||
output.foreach(_.write('\n'))
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
// turning this on so the behavior between running benchmark via `spark-submit` or SBT will
|
||||
// be consistent, also allow users to turn on/off certain behavior such as
|
||||
// `spark.sql.codegen.factoryMode`
|
||||
val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1"
|
||||
if (regenerateBenchmarkFiles) {
|
||||
val version = System.getProperty("java.version").split("\\D+")(0).toInt
|
||||
val jdkString = if (version > 8) s"-jdk$version" else ""
|
||||
val resultFileName =
|
||||
s"${this.getClass.getSimpleName.replace("$", "")}jdkStringsuffix-results.txt"
|
||||
val prefix = HoodieBenchmarks.currentProjectRoot.map(_ + "/").getOrElse("")
|
||||
val dir = new File(s"${prefix}benchmarks/")
|
||||
if (!dir.exists()) {
|
||||
// scalastyle:off println
|
||||
println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
|
||||
// scalastyle:on println
|
||||
dir.mkdirs()
|
||||
}
|
||||
val file = new File(dir, resultFileName)
|
||||
if (!file.exists()) {
|
||||
file.createNewFile()
|
||||
}
|
||||
output = Some(new FileOutputStream(file))
|
||||
}
|
||||
|
||||
runBenchmarkSuite(args)
|
||||
|
||||
output.foreach { o =>
|
||||
if (o != null) {
|
||||
o.close()
|
||||
}
|
||||
}
|
||||
|
||||
afterAll()
|
||||
}
|
||||
|
||||
def suffix: String = ""
|
||||
|
||||
/**
|
||||
* Any shutdown code to ensure a clean shutdown
|
||||
*/
|
||||
def afterAll(): Unit = {}
|
||||
}
|
||||
@@ -0,0 +1,143 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.hudi.benchmark
|
||||
|
||||
import java.io.File
|
||||
import java.lang.reflect.Modifier
|
||||
import java.nio.file.{FileSystems, Paths}
|
||||
import java.util.Locale
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.Try
|
||||
import org.apache.hbase.thirdparty.com.google.common.reflect.ClassPath
|
||||
|
||||
/**
|
||||
* Reference from spark.
|
||||
* Run all benchmarks. To run this benchmark, you should build Spark with either Maven or SBT.
|
||||
* After that, you can run as below:
|
||||
*
|
||||
* {{{
|
||||
* 1. with spark-submit
|
||||
* bin/spark-submit --class <this class>
|
||||
* --jars <all spark test jars>,<spark external package jars>
|
||||
* <spark core test jar> <glob pattern for class> <extra arguments>
|
||||
* 2. generate result:
|
||||
* SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
|
||||
* --jars <all spark test jars>,<spark external package jars>
|
||||
* <spark core test jar> <glob pattern for class> <extra arguments>
|
||||
* Results will be written to all corresponding files under "benchmarks/".
|
||||
* Notice that it detects the sub-project's directories from jar's paths so the provided jars
|
||||
* should be properly placed under target (Maven build) or target/scala-* (SBT) when you
|
||||
* generate the files.
|
||||
* }}}
|
||||
*
|
||||
* You can use a command as below to find all the test jars.
|
||||
* Make sure to do not select duplicated jars created by different versions of builds or tools.
|
||||
* {{{
|
||||
* find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
|
||||
* }}}
|
||||
*
|
||||
* The example below runs all benchmarks and generates the results:
|
||||
* {{{
|
||||
* SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
|
||||
* org.apache.spark.benchmark.Benchmarks --jars \
|
||||
* "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
|
||||
* "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
|
||||
* "*"
|
||||
* }}}
|
||||
*
|
||||
* The example below runs all benchmarks under "org.apache.spark.sql.execution.datasources"
|
||||
* {{{
|
||||
* bin/spark-submit --class \
|
||||
* org.apache.spark.benchmark.Benchmarks --jars \
|
||||
* "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
|
||||
* "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
|
||||
* "org.apache.spark.sql.execution.datasources.*"
|
||||
* }}}
|
||||
*/
|
||||
|
||||
object HoodieBenchmarks {
|
||||
var currentProjectRoot: Option[String] = None
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val isFailFast = sys.env.get(
|
||||
"SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
|
||||
val numOfSplits = sys.env.get(
|
||||
"SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1)
|
||||
val currentSplit = sys.env.get(
|
||||
"SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0)
|
||||
var numBenchmark = 0
|
||||
|
||||
var isBenchmarkFound = false
|
||||
val benchmarkClasses = ClassPath.from(
|
||||
Thread.currentThread.getContextClassLoader
|
||||
).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
|
||||
val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
|
||||
|
||||
benchmarkClasses.foreach { info =>
|
||||
lazy val clazz = info.load
|
||||
lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
|
||||
// isAssignableFrom seems not working with the reflected class from Guava's
|
||||
// getTopLevelClassesRecursive.
|
||||
require(args.length > 0, "Benchmark class to run should be specified.")
|
||||
if (
|
||||
info.getName.endsWith("Benchmark") &&
|
||||
// TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
|
||||
!info.getName.endsWith("TPCDSQueryBenchmark") &&
|
||||
matcher.matches(Paths.get(info.getName)) &&
|
||||
Try(runBenchmark).isSuccess && // Does this has a main method?
|
||||
!Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
|
||||
) {
|
||||
numBenchmark += 1
|
||||
if (numBenchmark % numOfSplits == currentSplit) {
|
||||
isBenchmarkFound = true
|
||||
|
||||
val targetDirOrProjDir =
|
||||
new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
|
||||
.getParentFile.getParentFile
|
||||
|
||||
// The root path to be referred in each benchmark.
|
||||
currentProjectRoot = Some {
|
||||
if (targetDirOrProjDir.getName == "target") {
|
||||
// SBT build
|
||||
targetDirOrProjDir.getParentFile.getCanonicalPath
|
||||
} else {
|
||||
// Maven build
|
||||
targetDirOrProjDir.getCanonicalPath
|
||||
}
|
||||
}
|
||||
|
||||
// scalastyle:off println
|
||||
println(s"Running ${clazz.getName}:")
|
||||
// scalastyle:on println
|
||||
// Force GC to minimize the side effect.
|
||||
System.gc()
|
||||
try {
|
||||
runBenchmark.invoke(null, args.tail.toArray)
|
||||
} catch {
|
||||
case e: Throwable if !isFailFast =>
|
||||
// scalastyle:off println
|
||||
println(s"${clazz.getName} failed with the exception below:")
|
||||
// scalastyle:on println
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.execution.benchmark
|
||||
|
||||
import org.apache.avro.generic.GenericRecord
|
||||
import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
|
||||
import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
|
||||
import org.apache.spark.sql.functions.lit
|
||||
import org.apache.spark.sql.{DataFrame, SparkSession}
|
||||
|
||||
/**
|
||||
* Benchmark to measure Avro SerDer performance.
|
||||
*/
|
||||
object AvroSerDerBenchmark extends HoodieBenchmarkBase {
|
||||
protected val spark: SparkSession = getSparkSession
|
||||
|
||||
def getSparkSession: SparkSession = SparkSession
|
||||
.builder()
|
||||
.master("local[1]")
|
||||
.config("spark.driver.memory", "8G")
|
||||
.appName(this.getClass.getCanonicalName)
|
||||
.getOrCreate()
|
||||
|
||||
def getDataFrame(numbers: Long): DataFrame = {
|
||||
spark.range(0, numbers).toDF("id")
|
||||
.withColumn("c1", lit("AvroSerDerBenchmark"))
|
||||
.withColumn("c2", lit(12.99d))
|
||||
.withColumn("c3", lit(1))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
|
||||
* Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
|
||||
* perf avro serializer for hoodie: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
* ------------------------------------------------------------------------------------------------------------------------
|
||||
* serialize internalRow to avro Record 6391 6683 413 7.8 127.8 1.0X
|
||||
*/
|
||||
private def avroSerializerBenchmark: Unit = {
|
||||
val benchmark = new HoodieBenchmark(s"perf avro serializer for hoodie", 50000000)
|
||||
benchmark.addCase("serialize internalRow to avro Record") { _ =>
|
||||
val df = getDataFrame(50000000)
|
||||
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
|
||||
spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
|
||||
HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema)).foreach(f => f)
|
||||
}
|
||||
benchmark.run()
|
||||
}
|
||||
|
||||
/**
|
||||
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
|
||||
* Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
|
||||
* perf avro deserializer for hoodie: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
* ------------------------------------------------------------------------------------------------------------------------
|
||||
* deserialize avro Record to internalRow 1340 1360 27 7.5 134.0 1.0X
|
||||
*/
|
||||
private def avroDeserializerBenchmark: Unit = {
|
||||
val benchmark = new HoodieBenchmark(s"perf avro deserializer for hoodie", 10000000)
|
||||
val df = getDataFrame(10000000)
|
||||
val sparkSchema = df.schema
|
||||
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
|
||||
val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema))
|
||||
testRdd.cache()
|
||||
testRdd.foreach(f => f)
|
||||
spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
|
||||
benchmark.addCase("deserialize avro Record to internalRow") { _ =>
|
||||
testRdd.mapPartitions { iter =>
|
||||
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
|
||||
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema)
|
||||
iter.map(record => avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get)
|
||||
}.foreach(f => f)
|
||||
}
|
||||
benchmark.run()
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
spark.stop()
|
||||
}
|
||||
|
||||
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
|
||||
avroSerializerBenchmark
|
||||
avroDeserializerBenchmark
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user