1
0

[HUDI-4204] Fixing NPE with row writer path and with OCC (#5850)

This commit is contained in:
Sivabalan Narayanan
2022-07-21 18:57:34 -04:00
committed by GitHub
parent 50cdb867c7
commit 2e0dd29714
4 changed files with 17 additions and 4 deletions

View File

@@ -491,7 +491,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   * @param writeOperationType
   * @param metaClient
   */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+  public void preWrite(String instantTime, WriteOperationType writeOperationType,
                           HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);

View File

@@ -254,7 +254,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }

   @Override
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
+  public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     // Note: the code to read the commit metadata is not thread safe for JSON deserialization,
     // remove the table metadata sync

View File

@@ -68,6 +68,7 @@ public class DataSourceInternalWriterHelper {
     this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
     this.metaClient.validateTableProperties(writeConfig.getProps());
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
+    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
   }

   public boolean useCommitCoordinator() {

View File

@@ -167,14 +167,21 @@ class TestHoodieSparkSqlWriter {
   * @param sortMode Bulk insert sort mode
   * @param populateMetaFields Flag for populating meta fields
   */
-  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true): Unit = {
+  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true, enableOCCConfigs: Boolean = false): Unit = {
     //create a new table
-    val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+    var fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
       .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
       .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
       .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name())
+    if (enableOCCConfigs) {
+      fooTableModifier = fooTableModifier
+        .updated("hoodie.write.concurrency.mode","optimistic_concurrency_control")
+        .updated("hoodie.cleaner.policy.failed.writes","LAZY")
+        .updated("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+    }
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
@@ -306,6 +313,11 @@ class TestHoodieSparkSqlWriter {
     testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
   }

+  @Test
+  def testBulkInsertForSortModeWithOCC(): Unit = {
+    testBulkInsertWithSortMode(BulkInsertSortMode.GLOBAL_SORT, populateMetaFields = true, true)
+  }

   /**
    * Test case for Bulk insert with populating meta fields or
    * without populating meta fields.