1
0

[HUDI-1624] The state based index should bootstrap from existing base files (#2581)

This commit is contained in:
Danny Chan
2021-02-23 13:37:44 +08:00
committed by GitHub
parent 43a0776c7c
commit 3ceb1b4c83
7 changed files with 261 additions and 23 deletions

View File

@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
@@ -37,6 +37,27 @@ import static java.util.stream.Collectors.toList;
*/
public class HoodieIndexUtils {
/**
 * Fetches the latest {@link HoodieBaseFile}s for the given partition.
 *
 * <p>Only base files committed on or before the latest completed instant are
 * considered, so files belonging to in-flight writes are never returned.
 *
 * @param partition   Partition of interest
 * @param hoodieTable Instance of {@link HoodieTable} of interest
 * @return the list of latest {@link HoodieBaseFile}s in the partition, or an
 *         empty list when the table has no completed commit yet
 */
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
    final String partition,
    final HoodieTable hoodieTable) {
  Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
      .filterCompletedInstants().lastInstant();
  if (latestCommitTime.isPresent()) {
    return hoodieTable.getBaseFileOnlyView()
        .getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp())
        .collect(toList());
  }
  return Collections.emptyList();
}
/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
*
@@ -50,15 +71,11 @@ public class HoodieIndexUtils {
final HoodieTable hoodieTable) {
context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
return context.flatMap(partitions, partitionPath -> {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.map(f -> Pair.of(partitionPath, f))
.collect(toList());
}
List<Pair<String, HoodieBaseFile>> filteredFiles =
getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()
.map(baseFile -> Pair.of(partitionPath, baseFile))
.collect(toList());
return filteredFiles.stream();
}, Math.max(partitions.size(), 1));
}

View File

@@ -46,7 +46,6 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.codehaus.jackson.node.NullNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -235,9 +234,9 @@ public class HoodieAvroUtils {
Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);
Schema.Field recordKeyField =
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field partitionPathField =
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
toBeAddedFields.add(recordKeyField);
toBeAddedFields.add(partitionPathField);

View File

@@ -21,25 +21,41 @@ package org.apache.hudi.operator.partitioner;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
/**
* The function to build the write profile incrementally for records within a checkpoint,
@@ -48,8 +64,8 @@ import org.apache.flink.util.Collector;
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
where the record should write to. The "I" and "U" tags are only used for downstream to decide whether
the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer
* supports specifying the bucket type explicitly.
*
* <p>The output records should then shuffle by the bucket ID and thus do scalable write.
@@ -60,14 +76,51 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);
private HoodieFlinkEngineContext context;
/**
* Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
* When a record came in, we do these check:
*
* <ul>
* <li>Try to load all the records in the partition path where the record belongs to</li>
* <li>Checks whether the state contains the record key</li>
* <li>If it does, tag the record with the location</li>
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul>
*/
private MapState<HoodieKey, HoodieRecordLocation> indexState;
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
*/
private BucketAssigner bucketAssigner;
private final Configuration conf;
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private final boolean isChangingRecords;
/**
* All the partition paths when the task starts. It is used to help checking whether all the partitions
* are loaded into the state.
*/
private transient List<String> initialPartitionsToLoad;
/**
* State to book-keep which partition is loaded into the index state {@code indexState}.
*/
private MapState<String, Integer> partitionLoadState;
/**
* Whether all partitions are loaded, if it is true,
* we can only check the state for locations.
*/
private boolean allPartitionsLoaded = false;
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -78,13 +131,20 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
  // Cache the hadoop conf and build the engine context ONCE: both are reused
  // later by the index bootstrap (loadRecords). The rendered diff left a second,
  // locally-scoped construction of the context and bucket assigner — dead work
  // that is removed here.
  this.hadoopConf = StreamerUtil.getHadoopConf();
  this.context = new HoodieFlinkEngineContext(
      new SerializableConfiguration(this.hadoopConf),
      new FlinkTaskContextSupplier(getRuntimeContext()));
  this.bucketAssigner = new BucketAssigner(this.context, writeConfig);
  // Compute the subset of partition paths this subtask must bootstrap.
  List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
      this.conf.getString(FlinkOptions.PATH), false, false, false);
  final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
  final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
  final int taskID = getRuntimeContext().getIndexOfThisSubtask();
  // Mirrors the key routing of org.apache.flink.streaming.api.datastream.KeyedStream
  // so each partition path is loaded by exactly the subtask that receives its records.
  this.initialPartitionsToLoad = allPartitionPaths.stream()
      .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
      .collect(Collectors.toList());
}
@Override
@@ -100,6 +160,12 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
TypeInformation.of(HoodieKey.class),
TypeInformation.of(HoodieRecordLocation.class));
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
if (context.isRestored()) {
checkPartitionsLoaded();
}
}
@SuppressWarnings("unchecked")
@@ -112,6 +178,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
if (isChangingRecords && this.indexState.contains(hoodieKey)) {
@@ -146,5 +216,69 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
checkPartitionsLoaded();
}
/**
 * Loads the record index of the given partition path into the keyed index state.
 *
 * <p>Reads the record key and partition path from every latest base file of the
 * partition and registers each key with the base file's commit time and file id,
 * then marks the partition as loaded in {@code partitionLoadState}.
 *
 * @param partitionPath The partition path
 * @throws Exception when error occurs for state update
 */
private void loadRecords(String partitionPath) throws Exception {
  HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
  List<HoodieBaseFile> latestBaseFiles =
      HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
  for (HoodieBaseFile baseFile : latestBaseFiles) {
    List<HoodieKey> hoodieKeys =
        ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
    hoodieKeys.forEach(hoodieKey -> {
      try {
        this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
      } catch (Exception e) {
        // Preserve the cause: the original rethrow dropped {@code e}, losing the
        // root failure. HoodieException (parent of HoodieIOException) carries it,
        // so existing catches of HoodieException keep working.
        throw new HoodieException("Error when loading record keys from file: " + baseFile, e);
      }
    });
  }
  // Mark the partition path as loaded.
  partitionLoadState.put(partitionPath, 0);
}
/**
 * Checks whether all the partitions this task is responsible for are loaded
 * into the state, and sets the flag {@code allPartitionsLoaded} to true if so.
 */
private void checkPartitionsLoaded() {
  for (String partition : this.initialPartitionsToLoad) {
    try {
      if (!this.partitionLoadState.contains(partition)) {
        // At least one partition is not bootstrapped yet; keep the flag false.
        return;
      }
    } catch (Exception e) {
      // The state read failed: surface the failure. (The original log message
      // claimed the error was "ignored" while the code still threw — fixed.)
      LOG.warn("Error when checking whether partition '" + partition + "' is loaded", e);
      throw new HoodieException(e);
    }
  }
  this.allPartitionsLoaded = true;
}
@VisibleForTesting
public boolean isAllPartitionsLoaded() {
  // True once every partition in initialPartitionsToLoad has been recorded
  // in partitionLoadState (see checkPartitionsLoaded).
  return this.allPartitionsLoaded;
}
@VisibleForTesting
public void clearIndexState() {
  // Reset the bootstrap flag and drop all cached locations so the next
  // incoming record triggers a fresh index load from the file system.
  this.allPartitionsLoaded = false;
  this.indexState.clear();
}
@VisibleForTesting
public boolean isKeyInState(HoodieKey hoodieKey) {
  try {
    return this.indexState.contains(hoodieKey);
  } catch (Exception e) {
    // MapState#contains declares a checked Exception; rethrow unchecked
    // so test callers need no try/catch.
    throw new HoodieException(e);
  }
}
}

View File

@@ -207,6 +207,10 @@ public class BucketAssigner {
this.table = HoodieFlinkTable.create(this.config, this.context);
}
/**
 * Returns the {@link HoodieTable} backing this assigner; exposed so callers
 * (e.g. the index state bootstrap) can query the table's file system view.
 */
public HoodieTable<?, ?, ?, ?> getTable() {
  return table;
}
/**
* Returns a list of small files in the given partition path.
*/

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.operator;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.exception.HoodieException;
@@ -48,6 +49,7 @@ import static org.apache.hudi.operator.utils.TestData.checkWrittenData;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -382,6 +384,68 @@ public class StreamWriteFunctionTest {
checkWrittenData(tempFile, expected, 1);
}
@Test
public void testIndexStateBootstrap() throws Exception {
  // open the function and ingest data
  funcWrapper.openFunction();
  for (RowData rowData : TestData.DATA_SET_ONE) {
    funcWrapper.invoke(rowData);
  }
  assertEmptyDataFiles();
  // this triggers the data write and event send
  funcWrapper.checkpointFunction(1);
  OperatorEvent nextEvent = funcWrapper.getNextEvent();
  assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
  funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
  assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
  funcWrapper.checkpointComplete(1);
  // Mark the index state as not fully loaded to trigger re-load from the filesystem.
  funcWrapper.clearIndexState();
  // upsert another data buffer
  for (RowData rowData : TestData.DATA_SET_TWO) {
    funcWrapper.invoke(rowData);
  }
  // invoking the records of DATA_SET_TWO must have bootstrapped the keys
  // of the first commit back into the index state
  checkIndexLoaded(
      new HoodieKey("id1", "par1"),
      new HoodieKey("id2", "par1"),
      new HoodieKey("id3", "par2"),
      new HoodieKey("id4", "par2"),
      new HoodieKey("id5", "par3"),
      new HoodieKey("id6", "par3"),
      new HoodieKey("id7", "par4"),
      new HoodieKey("id8", "par4"));
  // the data is not flushed yet
  checkWrittenData(tempFile, EXPECTED1);
  // this triggers the data write and event send
  funcWrapper.checkpointFunction(2);
  String instant = funcWrapper.getWriteClient()
      .getInflightAndRequestedInstant("COPY_ON_WRITE");
  nextEvent = funcWrapper.getNextEvent();
  assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
  checkWrittenData(tempFile, EXPECTED2);
  funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
  assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
  checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
  // NOTE: the original message asserted the opposite of the checked condition;
  // before checkpoint 2 completes, the flag must still be false.
  assertFalse(funcWrapper.isAllPartitionsLoaded(),
      "Not all partitions should be marked as loaded before the checkpoint completes");
  funcWrapper.checkpointComplete(2);
  // the coordinator checkpoint commits the inflight instant.
  checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
  checkWrittenData(tempFile, EXPECTED2);
  assertTrue(funcWrapper.isAllPartitionsLoaded(),
      "All partitions should be loaded into the index state");
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -419,4 +483,11 @@ public class StreamWriteFunctionTest {
assertNotNull(dataFiles);
assertThat(dataFiles.length, is(0));
}
/**
 * Asserts that every given key has been bootstrapped into the index state.
 */
private void checkIndexLoaded(HoodieKey... keys) {
  for (HoodieKey hoodieKey : keys) {
    String message = "Key: " + hoodieKey + " assumes to be in the index state";
    assertTrue(funcWrapper.isKeyInState(hoodieKey), message);
  }
}
}

View File

@@ -33,7 +33,7 @@ public class MockFunctionInitializationContext implements FunctionInitialization
@Override
public boolean isRestored() {
  // The mock never restores from a snapshot. (The rendered diff left the old
  // `throw` line above the new `return`, making the return unreachable — only
  // the post-image behavior is kept here.)
  return false;
}
@Override

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.operator.utils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.operator.StreamWriteFunction;
import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
@@ -162,4 +163,16 @@ public class StreamWriteFunctionWrapper<I> {
public StreamWriteOperatorCoordinator getCoordinator() {
return coordinator;
}
public void clearIndexState() {
  // Delegates to the bucket assign function; lets tests force a fresh
  // index bootstrap from the file system.
  this.bucketAssignerFunction.clearIndexState();
}
public boolean isKeyInState(HoodieKey hoodieKey) {
  // Delegates to the bucket assign function's index-state lookup.
  return this.bucketAssignerFunction.isKeyInState(hoodieKey);
}
public boolean isAllPartitionsLoaded() {
  // Delegates to the bucket assign function's bootstrap-complete flag.
  return this.bucketAssignerFunction.isAllPartitionsLoaded();
}
}