1
0

[MINOR] Fixing SparkUpsertNode for record generation (#5079)

This commit is contained in:
Sivabalan Narayanan
2022-03-20 21:56:30 -07:00
committed by GitHub
parent 799c78e688
commit a118d56b07
2 changed files with 10 additions and 1 deletion

View File

@@ -47,7 +47,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
if (!config.isDisableGenerate) {
println("Generating input data for node {}", this.getName)
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
writeRecords(context)
}
val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
@@ -68,4 +68,8 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
/**
 * Write operation this node performs when flushing the generated batch.
 *
 * @return the datasource insert operation option value
 */
def getOperation(): String = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
/**
 * Generates an insert batch from the node's config and persists it via the
 * delta generator. `count()` forces evaluation of the otherwise-lazy write.
 *
 * @param context execution context providing the delta generator
 */
def writeRecords(context: ExecutionContext): Unit = {
  val insertBatch = context.getDeltaGenerator().generateInserts(config)
  context.getDeltaGenerator().writeRecords(insertBatch).count()
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
/**
* Spark datasource based upsert node
@@ -31,4 +32,8 @@ class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConf
/**
 * Write operation this node performs when flushing the generated batch.
 *
 * @return the datasource upsert operation option value
 */
override def getOperation(): String = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
/**
 * Generates an update batch (rather than inserts, as the parent node does)
 * and persists it via the delta generator. `count()` forces evaluation of
 * the otherwise-lazy write.
 *
 * @param context execution context providing the delta generator
 */
override def writeRecords(context: ExecutionContext): Unit = {
  val updateBatch = context.getDeltaGenerator().generateUpdates(config)
  context.getDeltaGenerator().writeRecords(updateBatch).count()
}
}