1
0

[HUDI-1234] Insert new records to data files without merging for "Insert" operation. (#2111)

* Added HoodieConcatHandle to skip merging for "insert" operation when the corresponding config is set

Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
SteNicholas
2021-01-28 02:09:51 +08:00
committed by GitHub
parent a54550d94f
commit 2ee1c3fb0c
7 changed files with 266 additions and 26 deletions

View File

@@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
// Allow duplicates with inserts while merging with existing records
private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
@@ -330,6 +334,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}
/**
 * Whether "insert" operations are allowed to write incoming records into new files
 * without de-duplicating them against records already in storage.
 *
 * @return {@code true} if duplicates on insert are permitted
 */
public boolean allowDuplicateInserts() {
  String configured = props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS);
  return Boolean.parseBoolean(configured);
}
/**
 * @return the configured engine type
 */
public EngineType getEngineType() {
  return this.engineType;
}
@@ -1180,6 +1188,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Controls whether "insert" operations may route incoming records to new files
 * instead of merging them into existing files.
 *
 * @param routeInsertsToNewFiles true to allow duplicates on insert
 * @return this builder, for chaining
 */
public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) {
  this.props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, Boolean.toString(routeInsertsToNewFiles));
  return this;
}
public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
@@ -1234,6 +1247,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());

View File

@@ -58,17 +58,45 @@ import java.util.Map;
import java.util.Set;
@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
* <p>
* Simplified Logic:
* For every existing record
* Check if there is a new record coming in. If yes, merge two records and write to file
* else write the record as is
* For all pending records from incoming batch, write to file.
*
* Illustration with simple data.
* Incoming data:
* rec1_2, rec4_2, rec5_1, rec6_1
* Existing data:
* rec1_1, rec2_1, rec3_1, rec4_1
*
* For every existing record, merge w/ incoming if required and write to storage.
* => rec1_1 and rec1_2 are merged to write rec1_2 to storage
* => rec2_1 is written as is
* => rec3_1 is written as is
* => rec4_2 and rec4_1 are merged to write rec4_2 to storage
* Write all pending records from incoming set to storage
* => rec5_1 and rec6_1
*
* Final snapshot in storage
* rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
*
* </p>
*/
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
private HoodieFileWriter<IndexedRecord> fileWriter;
protected HoodieFileWriter<IndexedRecord> fileWriter;
private Path newFilePath;
protected Path newFilePath;
private Path oldFilePath;
private long recordsWritten = 0;
protected long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;

View File

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
/**
 * Handle to concatenate new records to old records w/o any merging. If operation is set to "insert", and if
 * {@link HoodieWriteConfig#allowDuplicateInserts()} is set, this handle will be used instead of
 * {@link HoodieMergeHandle}.
 *
 * Simplified Logic:
 * For every existing record
 *     Write the record as is
 * For all incoming records, write to file as is.
 *
 * Illustration with simple data.
 * Incoming data:
 *     rec1_2, rec4_2, rec5_1, rec6_1
 * Existing data:
 *     rec1_1, rec2_1, rec3_1, rec4_1
 *
 * For every existing record, write to storage as is.
 *    => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
 * Write all records from incoming set to storage
 *    => rec1_2, rec4_2, rec5_1 and rec6_1
 *
 * Final snapshot in storage
 * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
 *
 * Users should ensure there are no duplicates when the "insert" operation is used with this config enabled,
 * so the above scenario should not happen and every batch should contain only new records to be inserted.
 * The example above is for illustration purposes only.
 */
public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

  private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);

  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
      TaskContextSupplier taskContextSupplier) {
    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
  }

  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
      Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged,
        taskContextSupplier);
  }

  /**
   * Write the old record as is, w/o merging with the incoming record.
   *
   * @param oldRecord existing record read from the base file being rewritten
   */
  @Override
  public void write(GenericRecord oldRecord) {
    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
    try {
      fileWriter.writeAvro(key, oldRecord);
    } catch (IOException | RuntimeException e) {
      String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
          key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
      LOG.debug("Old record is " + oldRecord);
      throw new HoodieUpsertException(errMsg, e);
    }
    recordsWritten++;
  }
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
@@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable {
*/
protected final WorkloadStat globalStat;
/**
* Write operation type.
*/
private WriteOperationType operationType;
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
}
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
this(profile);
this.operationType = operationType;
}
/**
 * @return the global workload stat
 */
public WorkloadStat getGlobalStat() {
  return this.globalStat;
}
@@ -62,11 +73,16 @@ public class WorkloadProfile implements Serializable {
return partitionPathStatMap.get(partitionPath);
}
/**
 * @return the write operation type, or {@code null} if it was not provided at construction
 */
public WriteOperationType getOperationType() {
  return this.operationType;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadProfile {");
sb.append("globalStat=").append(globalStat).append(", ");
sb.append("partitionStat=").append(partitionPathStatMap);
sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
sb.append("operationType=").append(operationType);
sb.append('}');
return sb.toString();
}