1
0

[HUDI-1902] Global index for flink writer (#2958)

Supports deduplication for record keys that arrive with different partition paths.
This commit is contained in:
Danny Chan
2021-05-18 13:55:38 +08:00
committed by GitHub
parent fcedbfcb58
commit 46a2399a45
10 changed files with 345 additions and 89 deletions

View File

@@ -37,7 +37,7 @@ public abstract class BaseAvroPayload implements Serializable {
/**
* For purposes of preCombining.
*/
protected final Comparable orderingVal;
public final Comparable orderingVal;
/**
* Instantiate {@link BaseAvroPayload}.

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import java.util.Objects;
/**
 * Similar to {@link org.apache.hudi.common.model.HoodieRecordLocation} but also carries
 * the partition path, so the location is unique within the whole table rather than
 * within a single partition.
 */
public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
  private static final long serialVersionUID = 1L;

  // Partition path of the record; together with the inherited instantTime and
  // fileId this pins the record location globally.
  private String partitionPath;

  // No-arg constructor kept for serialization frameworks.
  public HoodieRecordGlobalLocation() {
  }

  public HoodieRecordGlobalLocation(String partitionPath, String instantTime, String fileId) {
    super(instantTime, fileId);
    this.partitionPath = partitionPath;
  }

  @Override
  public String toString() {
    // Label now matches the class name (was mistakenly "HoodieGlobalRecordLocation").
    final StringBuilder sb = new StringBuilder("HoodieRecordGlobalLocation {");
    sb.append("partitionPath=").append(partitionPath).append(", ");
    sb.append("instantTime=").append(instantTime).append(", ");
    sb.append("fileId=").append(fileId);
    sb.append('}');
    return sb.toString();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o;
    return Objects.equals(partitionPath, otherLoc.partitionPath)
        && Objects.equals(instantTime, otherLoc.instantTime)
        && Objects.equals(fileId, otherLoc.fileId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(partitionPath, instantTime, fileId);
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  public void setPartitionPath(String partitionPath) {
    this.partitionPath = partitionPath;
  }

  /**
   * Returns the global record location composed from the given partition path
   * and the partition-local location.
   *
   * @param partitionPath partition path of the record
   * @param localLoc      partition-local location supplying instant time and file id
   */
  public static HoodieRecordGlobalLocation fromLocal(String partitionPath, HoodieRecordLocation localLoc) {
    return new HoodieRecordGlobalLocation(partitionPath, localLoc.getInstantTime(), localLoc.getFileId());
  }

  /**
   * Returns this location as a partition-local location, substituting the given
   * instant time (e.g. the "U"/"I" bucket markers) while keeping the file id.
   */
  public HoodieRecordLocation toLocal(String instantTime) {
    return new HoodieRecordLocation(instantTime, fileId);
  }

  /**
   * Copies this location with the given partition path, keeping instant time and file id.
   */
  public HoodieRecordGlobalLocation copy(String partitionPath) {
    return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
  }
}

View File

@@ -26,8 +26,8 @@ import java.util.Objects;
*/
public class HoodieRecordLocation implements Serializable {
private String instantTime;
private String fileId;
protected String instantTime;
protected String fileId;
public HoodieRecordLocation() {
}

View File

@@ -86,6 +86,13 @@ public class FlinkOptions {
.defaultValue(1.5D)
.withDescription("Index state ttl in days, default 1.5 day");
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
.key("index.global.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to update index for the old partition path\n"
+ "if same key record with different partition path came in, default true");
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------

View File

@@ -21,9 +21,11 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
@@ -90,7 +93,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul>
*/
private MapState<HoodieKey, HoodieRecordLocation> indexState;
private MapState<String, HoodieRecordGlobalLocation> indexState;
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -110,11 +113,23 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
*/
private MapState<String, Integer> partitionLoadState;
/**
* Used to create DELETE payload.
*/
private PayloadCreation payloadCreation;
/**
* If the index is global, update the index for the old partition path
* if same key record with different partition path came in.
*/
private final boolean globalIndex;
/**
 * Creates the bucket assign function with the given Flink configuration.
 *
 * <p>Only cheap, serializable flags are derived here; stateful members are
 * initialized later in {@code open()} / {@code initializeState()}.
 */
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
// Whether the configured write operation changes existing records
// (such records need an index lookup to find their current location).
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
// Whether to pre-load existing records of a partition into the index state.
this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
// Global index: handle records whose partition path changed for the same key.
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
}
@Override
@@ -132,6 +147,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
this.payloadCreation = PayloadCreation.instance(this.conf);
}
@Override
@@ -141,11 +157,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
@Override
public void initializeState(FunctionInitializationContext context) {
MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
MapStateDescriptor<String, HoodieRecordGlobalLocation> indexStateDesc =
new MapStateDescriptor<>(
"indexState",
TypeInformation.of(HoodieKey.class),
TypeInformation.of(HoodieRecordLocation.class));
Types.STRING,
TypeInformation.of(HoodieRecordGlobalLocation.class));
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
@@ -166,38 +182,41 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
HoodieRecord<?> record = (HoodieRecord<?>) value;
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// The dataset may be huge, thus the processing would block for long,
// disabled by default.
if (bootstrapIndex && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
loadRecords(partitionPath);
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
if (isChangingRecords && this.indexState.contains(hoodieKey)) {
if (isChangingRecords && indexState.contains(recordKey)) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId());
this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId());
} else {
bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
switch (bucketInfo.getBucketType()) {
case INSERT:
// This is an insert bucket, use HoodieRecordLocation instant time as "I".
// Downstream operators can then check the instant time to know whether
// a record belongs to an insert bucket.
location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
break;
case UPDATE:
location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
break;
default:
throw new AssertionError();
HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
updateIndexState(recordKey, partitionPath, location);
} else {
location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
location = getNewRecordLocation(partitionPath);
if (isChangingRecords) {
this.indexState.put(hoodieKey, location);
updateIndexState(recordKey, partitionPath, location);
}
}
record.unseal();
@@ -206,6 +225,32 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
out.collect((O) record);
}
/**
 * Assigns a fresh bucket for the given partition and returns the record location.
 *
 * <p>The instant time acts as a marker: "I" flags an insert bucket, "U" an update
 * bucket, so that downstream operators can tell the bucket kind from the location.
 *
 * @param partitionPath the partition path to assign a bucket for
 * @return the new location carrying the marker instant time and the file id prefix
 */
private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
  final BucketInfo newBucket = this.bucketAssigner.addInsert(partitionPath);
  final String instantMarker;
  switch (newBucket.getBucketType()) {
    case INSERT:
      // Insert bucket: mark the location with "I" so downstream operators
      // can recognize records that belong to an insert bucket.
      instantMarker = "I";
      break;
    case UPDATE:
      instantMarker = "U";
      break;
    default:
      throw new AssertionError();
  }
  return new HoodieRecordLocation(instantMarker, newBucket.getFileIdPrefix());
}
/**
 * Stores the given partition-local location into the index state keyed by record
 * key, widening it to a {@link HoodieRecordGlobalLocation} with the partition path.
 *
 * @param recordKey     key of the record (state key)
 * @param partitionPath partition path the record was assigned to
 * @param localLoc      partition-local location to widen and store
 * @throws Exception if the state update fails
 */
private void updateIndexState(
String recordKey,
String partitionPath,
HoodieRecordLocation localLoc) throws Exception {
this.indexState.put(recordKey, HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc));
}
@Override
public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
@@ -245,7 +290,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
if (shouldLoad) {
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
this.indexState.put(hoodieKey.getRecordKey(),
new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
}
} catch (Exception e) {
LOG.error("Error when putting record keys into the state from file: {}", baseFile);
@@ -265,7 +311,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
@VisibleForTesting
public boolean isKeyInState(HoodieKey hoodieKey) {
try {
return this.indexState.contains(hoodieKey);
return this.indexState.contains(hoodieKey.getRecordKey());
} catch (Exception e) {
throw new HoodieException(e);
}

View File

@@ -18,16 +18,12 @@
package org.apache.hudi.sink.transform;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -39,11 +35,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
/**
* Function that transforms RowData to HoodieRecord.
@@ -116,53 +108,4 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
return new HoodieRecord<>(hoodieKey, payload);
}
/**
 * Util to create hoodie payload instance via a reflectively-resolved constructor.
 *
 * <p>NOTE(review): superseded by the public {@code org.apache.hudi.sink.utils.PayloadCreation}.
 */
private static class PayloadCreation implements Serializable {
private static final long serialVersionUID = 1L;
// Whether an ordering (pre-combine) value must be passed to the payload constructor.
private final boolean shouldCombine;
// Cached payload constructor resolved from FlinkOptions.PAYLOAD_CLASS.
private final Constructor<?> constructor;
// Field whose value supplies the ordering value; null when shouldCombine is false.
private final String preCombineField;
private PayloadCreation(
boolean shouldCombine,
Constructor<?> constructor,
@Nullable String preCombineField) {
this.shouldCombine = shouldCombine;
this.constructor = constructor;
this.preCombineField = preCombineField;
}
/**
 * Resolves the payload constructor and pre-combine settings from the configuration.
 * Combining is enabled for INSERT_DROP_DUPS or the UPSERT operation.
 */
public static PayloadCreation instance(Configuration conf) throws Exception {
boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
|| WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
String preCombineField = null;
final Class<?>[] argTypes;
final Constructor<?> constructor;
if (shouldCombine) {
// Combining payloads are constructed from (GenericRecord, Comparable orderingVal).
preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
} else {
// Non-combining payloads are constructed from a single Option<GenericRecord>.
argTypes = new Class<?>[] {Option.class};
}
final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
return new PayloadCreation(shouldCombine, constructor, preCombineField);
}
/**
 * Creates a payload for the given Avro record; a delete passes null record data
 * but still extracts the ordering value for pre-combining.
 */
public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
if (shouldCombine) {
ValidationUtils.checkState(preCombineField != null);
Comparable<?> orderingVal = (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record,
preCombineField, false);
return (HoodieRecordPayload<?>) constructor.newInstance(
isDelete ? null : record, orderingVal);
} else {
return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.of(record));
}
}
}
}

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.lang.reflect.Constructor;
/**
 * Utility that instantiates {@link HoodieRecordPayload} objects through a cached,
 * reflectively-resolved constructor of the configured payload class.
 */
public class PayloadCreation implements Serializable {
  private static final long serialVersionUID = 1L;

  // True when an ordering (pre-combine) value must be supplied to the payload.
  private final boolean shouldCombine;
  // Constructor of the payload class configured via FlinkOptions.PAYLOAD_CLASS.
  private final Constructor<?> constructor;
  // Field supplying the ordering value; null when shouldCombine is false.
  private final String preCombineField;

  private PayloadCreation(
      boolean shouldCombine,
      Constructor<?> constructor,
      @Nullable String preCombineField) {
    this.shouldCombine = shouldCombine;
    this.constructor = constructor;
    this.preCombineField = preCombineField;
  }

  /**
   * Resolves the payload constructor and pre-combine settings from the given
   * Flink configuration. Combining is enabled for INSERT_DROP_DUPS or the
   * UPSERT operation.
   */
  public static PayloadCreation instance(Configuration conf) throws Exception {
    final boolean combine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
        || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
    final String orderingField;
    final Class<?>[] paramTypes;
    if (combine) {
      // Combining payloads are constructed from (GenericRecord, Comparable orderingVal).
      orderingField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
      paramTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
    } else {
      // Non-combining payloads are constructed from a single Option<GenericRecord>.
      orderingField = null;
      paramTypes = new Class<?>[] {Option.class};
    }
    final Constructor<?> ctor = ReflectionUtils
        .getClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
        .getConstructor(paramTypes);
    return new PayloadCreation(combine, ctor, orderingField);
  }

  /**
   * Creates a payload wrapping the given Avro record.
   *
   * @param record   the Avro record data
   * @param isDelete whether the record represents a deletion; the record data is
   *                 dropped but the ordering value is still extracted for combining
   */
  public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
    if (!shouldCombine) {
      return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.of(record));
    }
    ValidationUtils.checkState(preCombineField != null);
    final Comparable<?> orderingVal =
        (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record, preCombineField, false);
    return (HoodieRecordPayload<?>) constructor.newInstance(isDelete ? null : record, orderingVal);
  }

  /**
   * Creates a DELETE payload, reusing the ordering value of the given payload
   * so that pre-combining still works for the tombstone record.
   */
  public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload) throws Exception {
    return shouldCombine
        ? (HoodieRecordPayload<?>) constructor.newInstance(null, payload.orderingVal)
        : (HoodieRecordPayload<?>) this.constructor.newInstance(Option.empty());
  }
}

View File

@@ -392,6 +392,56 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
}
@Test
void testWriteGlobalIndex() {
// the source generates 4 commits
String createSource = TestConfigurations.getFileSourceDDL(
"source", "test_source_4.data", 4);
streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
// Drop duplicates on insert; with the global index (enabled by default, see
// FlinkOptions.INDEX_GLOBAL_ENABLED) only one row per key survives across partitions.
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
final String insertInto2 = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto2);
List<Row> result = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
// All source rows share key id1 across par1..par4; only the last write (par4) remains.
assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]");
}
@Test
void testWriteLocalIndex() {
// the source generates 4 commits
String createSource = TestConfigurations.getFileSourceDDL(
"source", "test_source_4.data", 4);
streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
// Disable the global index: deduplication then happens per partition only.
options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false");
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
final String insertInto2 = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto2);
List<Row> result = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
// One surviving row per partition (par1..par4) for the shared key id1.
final String expected = "["
+ "id1,Stephen,34,1970-01-01T00:00:02,par1, "
+ "id1,Fabian,32,1970-01-01T00:00:04,par2, "
+ "id1,Jane,19,1970-01-01T00:00:06,par3, "
+ "id1,Phoebe,52,1970-01-01T00:00:08,par4]";
// Order rows by field 3 (the ts field) since the record keys are identical.
assertRowsEquals(result, expected, 3);
}
@Test
void testStreamReadEmptyTablePath() throws Exception {
// create an empty table

View File

@@ -256,8 +256,20 @@ public class TestData {
* @param expected Expected string of the sorted rows
*/
// Convenience overload: orders rows by the first field (position 0).
public static void assertRowsEquals(List<Row> rows, String expected) {
assertRowsEquals(rows, expected, 0);
}
/**
 * Sorts the {@code rows} using the field at index {@code orderingPos} and asserts
 * the result equals the expected string {@code expected}.
 *
 * @param rows        Actual result rows
 * @param expected    Expected string of the sorted rows
 * @param orderingPos Field position used for ordering
 */
public static void assertRowsEquals(List<Row> rows, String expected, int orderingPos) {
  String rowsString = rows.stream()
      // Single sort on the requested field; the redundant pre-sort on field 0 is removed.
      .sorted(Comparator.comparing(o -> toStringSafely(o.getField(orderingPos))))
      .collect(Collectors.toList()).toString();
  assertThat(rowsString, is(expected));
}

View File

@@ -0,0 +1,8 @@
{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"}
{"uuid": "id1", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"}
{"uuid": "id1", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03", "partition": "par2"}
{"uuid": "id1", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04", "partition": "par2"}
{"uuid": "id1", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"}
{"uuid": "id1", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06", "partition": "par3"}
{"uuid": "id1", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07", "partition": "par4"}
{"uuid": "id1", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08", "partition": "par4"}