[HUDI-2847] Flink metadata table supports virtual keys (#4096)
This commit is contained in:
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
|||||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
@@ -38,12 +39,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
import org.apache.hudi.exception.HoodieUpsertException;
|
import org.apache.hudi.exception.HoodieUpsertException;
|
||||||
import org.apache.hudi.io.HoodieCreateHandle;
|
import org.apache.hudi.io.HoodieCreateHandle;
|
||||||
import org.apache.hudi.io.HoodieMergeHandle;
|
import org.apache.hudi.io.HoodieMergeHandle;
|
||||||
import org.apache.hudi.io.HoodieSortedMergeHandle;
|
import org.apache.hudi.io.HoodieSortedMergeHandle;
|
||||||
import org.apache.hudi.io.HoodieWriteHandle;
|
import org.apache.hudi.io.HoodieWriteHandle;
|
||||||
|
import org.apache.hudi.keygen.BaseKeyGenerator;
|
||||||
|
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
|
||||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||||
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
|
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
|
||||||
import org.apache.hudi.table.action.clean.CleanActionExecutor;
|
import org.apache.hudi.table.action.clean.CleanActionExecutor;
|
||||||
@@ -374,12 +378,21 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
|
|||||||
|
|
||||||
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
|
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
|
||||||
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
|
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
|
||||||
|
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
|
||||||
|
if (!config.populateMetaFields()) {
|
||||||
|
try {
|
||||||
|
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
|
||||||
|
+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (requireSortedRecords()) {
|
if (requireSortedRecords()) {
|
||||||
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
|
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
|
||||||
dataFileToBeMerged, taskContextSupplier, Option.empty());
|
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
|
||||||
} else {
|
} else {
|
||||||
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
|
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
|
||||||
dataFileToBeMerged, taskContextSupplier, Option.empty());
|
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -222,6 +222,32 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
|
||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
|
||||||
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
||||||
|
// write another 2 commits
|
||||||
|
for (int i = 6; i < 8; i++) {
|
||||||
|
instant = mockWriteWithMetadata();
|
||||||
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(i + 1L));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
||||||
|
}
|
||||||
|
|
||||||
|
// write another commit to trigger clean
|
||||||
|
instant = mockWriteWithMetadata();
|
||||||
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(10L));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "002"));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.CLEAN_ACTION));
|
||||||
|
|
||||||
|
// write another commit
|
||||||
|
mockWriteWithMetadata();
|
||||||
|
// write another commit to trigger compaction
|
||||||
|
instant = mockWriteWithMetadata();
|
||||||
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(13L));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user