1
0

[HUDI-115] Adding DefaultHoodieRecordPayload to honor ordering with combineAndGetUpdateValue (#2311)

* Added ability to pass in `properties` to payload methods, so they can perform table/record specific merges
* Added default methods so existing payload classes are backwards compatible. 
* Adding DefaultHoodiePayload to honor ordering while merging two records
* Fixing default payload based on feedback
This commit is contained in:
Sivabalan Narayanan
2020-12-19 22:19:42 -05:00
committed by GitHub
parent 5388c7f7a3
commit 33d338f392
14 changed files with 447 additions and 27 deletions

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.config.HoodieMemoryConfig.Builder;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
/**
* Hoodie payload related configs.
*/
public class HoodiePayloadConfig extends DefaultHoodieConfig {
public HoodiePayloadConfig(Properties props) {
super(props);
}
public static HoodiePayloadConfig.Builder newBuilder() {
return new HoodiePayloadConfig.Builder();
}
public static class Builder {
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
}
}
public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}
public Builder withPayloadOrderingField(String payloadOrderingField) {
props.setProperty(PAYLOAD_ORDERING_FIELD_PROP, String.valueOf(payloadOrderingField));
return this;
}
public HoodiePayloadConfig build() {
HoodiePayloadConfig config = new HoodiePayloadConfig(props);
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), DEFAULT_PAYLOAD_ORDERING_FIELD_VAL,
String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
return config;
}
}
}

View File

@@ -145,6 +145,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private EngineType engineType;
@@ -163,6 +164,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
}
public static HoodieWriteConfig.Builder newBuilder() {
@@ -744,6 +746,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return clientSpecifiedViewStorageConfig;
}
public HoodiePayloadConfig getPayloadConfig() {
return hoodiePayloadConfig;
}
/**
* Commit call back configs.
*/
@@ -804,6 +810,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
private boolean isCallbackConfigSet = false;
private boolean isPayloadConfigSet = false;
public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
@@ -944,6 +951,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) {
props.putAll(payloadConfig.getProps());
isPayloadConfigSet = true;
return this;
}
public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
@@ -1084,6 +1097,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCallbackConfigSet,
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isPayloadConfigSet,
HoodiePayloadConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);

View File

@@ -225,7 +225,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema);
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema,
config.getPayloadConfig().getProps());
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the the combined new