From a118d56b071bad6a4701c3578e43e5fdfe416e6c Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sun, 20 Mar 2022 21:56:30 -0700 Subject: [PATCH] [MINOR] Fixing sparkUpdateNode for record generation (#5079) --- .../hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala | 6 +++++- .../hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala index b8c46cad3..b0bec48a4 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala @@ -47,7 +47,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { override def execute(context: ExecutionContext, curItrCount: Int): Unit = { if (!config.isDisableGenerate) { println("Generating input data for node {}", this.getName) - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + writeRecords(context) } val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, context.getWriterContext.getHoodieTestSuiteWriter.getSchema, @@ -68,4 +68,8 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { def getOperation(): String = { DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL } + + def writeRecords(context: ExecutionContext): Unit = { + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala index 113de93ad..f83bc5563 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala @@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config +import org.apache.hudi.integ.testsuite.dag.ExecutionContext /** * Spark datasource based upsert node @@ -31,4 +32,8 @@ class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConf override def getOperation(): String = { DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL } + + override def writeRecords(context: ExecutionContext): Unit = { + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count() + } }