diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 1315c9940..a33383a05 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -196,7 +196,7 @@ public class HoodieAppendHandle extends Option> recordMetadata = hoodieRecord.getData().getMetadata(); try { // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge - // Whether it is a update or insert record. + // Whether it is an update or insert record. boolean isUpdateRecord = isUpdateRecord(hoodieRecord); // If the format can not record the operation field, nullify the DELETE payload manually. boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); @@ -219,7 +219,7 @@ public class HoodieAppendHandle extends if (config.allowOperationMetadataField()) { HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation()); } - if (isUpdateRecord(hoodieRecord)) { + if (isUpdateRecord) { updatedRecordsWritten++; } else { insertRecordsWritten++; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index e95b0f823..33878eb15 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -35,7 +35,6 @@ import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import 
org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -70,7 +69,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.ParseException; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -506,45 +504,9 @@ public class HoodieFlinkWriteClient extends } else { writeTimer = metrics.getDeltaCommitCtx(); } - table.getHoodieView().sync(); return table; } - public String getLastPendingInstant(HoodieTableType tableType) { - final String actionType = CommitUtils.getCommitActionType(tableType); - return getLastPendingInstant(actionType); - } - - public String getLastPendingInstant(String actionType) { - HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath) - .getCommitsTimeline().filterInflightsAndRequested(); - return unCompletedTimeline.getInstants() - .filter(x -> x.getAction().equals(actionType) && x.isInflight()) - .map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()).stream() - .max(Comparator.naturalOrder()) - .orElse(null); - } - - public String getLastCompletedInstant(HoodieTableType tableType) { - final String commitType = CommitUtils.getCommitActionType(tableType); - HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath) - .getCommitsTimeline().filterCompletedInstants(); - return completedTimeline.getInstants() - .filter(x -> x.getAction().equals(commitType)) - .map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()).stream() - .max(Comparator.naturalOrder()) - .orElse(null); - } - - public void transitionRequestedToInflight(String commitType, String inFlightInstant) { - HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline(); - HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); - 
activeTimeline.transitionRequestedToInflight(requested, Option.empty(), - config.shouldAllowMultiWriteOnSameInstant()); - } - public HoodieFlinkTable getHoodieTable() { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 634eabaf9..c19c6fa45 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -85,7 +85,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) { // if this is a new commit being applied to metadata for the first time writeClient.startCommitWithTime(instantTime); - writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); + metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); } else { // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable. // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable. 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index ce63a2dd0..d7eed45df 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -63,14 +63,28 @@ public abstract class HoodieFlinkTable public static HoodieFlinkTable create(HoodieWriteConfig config, HoodieFlinkEngineContext context, HoodieTableMetaClient metaClient) { + return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled()); + } + + public static HoodieFlinkTable create(HoodieWriteConfig config, + HoodieFlinkEngineContext context, + HoodieTableMetaClient metaClient, + boolean refreshTimeline) { + final HoodieFlinkTable hoodieFlinkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: - return new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient); + hoodieFlinkTable = new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient); + break; case MERGE_ON_READ: - return new HoodieFlinkMergeOnReadTable<>(config, context, metaClient); + hoodieFlinkTable = new HoodieFlinkMergeOnReadTable<>(config, context, metaClient); + break; default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } + if (refreshTimeline) { + hoodieFlinkTable.getHoodieView().sync(); + } + return hoodieFlinkTable; } @Override diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index 91c5cbd26..a41e8c835 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -101,7 +101,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config); - HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); + HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false); HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index eca25e1e3..e586815d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -505,6 +505,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } } + public void transitionRequestedToInflight(String commitType, String inFlightInstant) { + HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); + transitionRequestedToInflight(requested, Option.empty(), false); + } + public void transitionRequestedToInflight(HoodieInstant requested, Option content) { transitionRequestedToInflight(requested, content, false); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index d510de2b1..0e7e35e7e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -139,7 +139,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { public void close() { if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); - // this.writeClient.close(); + this.writeClient.close(); } } @@ -155,7 +155,6 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { // ------------------------------------------------------------------------- // Getter/Setter // ------------------------------------------------------------------------- - @VisibleForTesting @SuppressWarnings("rawtypes") public Map> getDataBuffer() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index a7faeca5c..8c400b3bd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; @@ -41,6 +42,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +59,6 @@ import 
java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -95,6 +96,11 @@ public class StreamWriteOperatorCoordinator */ private transient HoodieFlinkWriteClient writeClient; + /** + * Meta client. + */ + private transient HoodieTableMetaClient metaClient; + /** * Current REQUESTED instant, for validation. */ @@ -153,10 +159,11 @@ public class StreamWriteOperatorCoordinator // initialize event buffer reset(); this.gateways = new SubtaskGateway[this.parallelism]; + // init table, create if not exists. + this.metaClient = initTableIfNotExists(this.conf); + // the write client must create after the table creation this.writeClient = StreamerUtil.createWriteClient(conf); this.tableState = TableState.create(conf); - // init table, create it if not exists. - initTableIfNotExists(this.conf); // start the executor this.executor = new CoordinatorExecutor(this.context, LOG); // start the executor if required @@ -171,15 +178,17 @@ public class StreamWriteOperatorCoordinator @Override public void close() throws Exception { // teardown the resource - if (writeClient != null) { - writeClient.close(); - } if (executor != null) { executor.close(); } if (hiveSyncExecutor != null) { hiveSyncExecutor.close(); } + // the write client must close after the executor service + // because the task in the service may send requests to the embedded timeline service. + if (writeClient != null) { + writeClient.close(); + } this.eventBuffer = null; } @@ -225,6 +234,14 @@ public class StreamWriteOperatorCoordinator ); } + @Override + public void notifyCheckpointAborted(long checkpointId) { + // once the checkpoint was aborted, unblock the writer tasks to + // reuse the last instant. 
+ executor.execute(this::sendCommitAckEvents, + "unblock data write with aborted checkpoint %s", checkpointId); + } + @Override public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation @@ -316,7 +333,7 @@ public class StreamWriteOperatorCoordinator final String instant = HoodieActiveTimeline.createNewInstantTime(); this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.instant = instant; - this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); + this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); this.writeClient.upgradeDowngrade(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); @@ -382,16 +399,28 @@ public class StreamWriteOperatorCoordinator * sends the commit ack events to unblock the flushing. */ private void sendCommitAckEvents() { - CompletableFuture[] futures = IntStream.range(0, this.parallelism) - .mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance())) + CompletableFuture[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) + .map(gw -> gw.sendEvent(CommitAckEvent.getInstance())) .toArray(CompletableFuture[]::new); try { CompletableFuture.allOf(futures).get(); - } catch (Exception e) { - throw new HoodieException("Error while waiting for the commit ack events to finish sending", e); + } catch (Throwable throwable) { + if (!sendToFinishedTasks(throwable)) { + throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable); + } } } + /** + * Decides whether the given exception is caused by sending events to FINISHED tasks. + * + *

Ugly impl: the exception may change in the future. + */ + private static boolean sendToFinishedTasks(Throwable throwable) { + return throwable.getCause() instanceof TaskNotRunningException + || throwable.getCause().getMessage().contains("running"); + } + /** * Commits the instant. * @@ -474,12 +503,6 @@ public class StreamWriteOperatorCoordinator return instant; } - @VisibleForTesting - @SuppressWarnings("rawtypes") - public HoodieFlinkWriteClient getWriteClient() { - return writeClient; - } - @VisibleForTesting public Context getContext() { return context; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java index 0c295781f..090ed29b8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java @@ -82,20 +82,11 @@ public class AppendWriteFunction extends AbstractStreamWriteFunction { this.writerHelper.write((RowData) value); } - @Override - public void close() { - if (this.writeClient != null) { - this.writeClient.cleanHandlesGracefully(); - this.writeClient.close(); - } - } - /** * End input action for batch source. 
*/ public void endInput() { flushData(true); - this.writeClient.cleanHandles(); this.writeStatuses.clear(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 0e7bb5472..81ab836bb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -19,7 +19,6 @@ package org.apache.hudi.sink.bootstrap; import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; @@ -27,14 +26,12 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -92,8 +89,6 @@ public class BootstrapOperator private transient ListState instantState; private final Pattern pattern; private String lastInstantTime; - private HoodieFlinkWriteClient writeClient; - private String actionType; public BootstrapOperator(Configuration conf) { this.conf = 
conf; @@ -102,7 +97,8 @@ public class BootstrapOperator @Override public void snapshotState(StateSnapshotContext context) throws Exception { - lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(this.conf); + lastInstantTime = StreamerUtil.getLastPendingInstant(metaClient); instantState.update(Collections.singletonList(lastInstantTime)); } @@ -122,12 +118,8 @@ public class BootstrapOperator } this.hadoopConf = StreamerUtil.getHadoopConf(); - this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = getTable(); - this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext()); - this.actionType = CommitUtils.getCommitActionType( - WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)), - HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE))); preLoadIndexRecords(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 7fce5c0a3..f3cfbae66 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -20,9 +20,8 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.common.AbstractWriteFunction; @@ -80,6 +79,11 @@ public class BulkInsertWriteFunction */ private 
int taskID; + /** + * Meta Client. + */ + private transient HoodieTableMetaClient metaClient; + /** * Write Client. */ @@ -95,11 +99,6 @@ public class BulkInsertWriteFunction */ private transient OperatorEventGateway eventGateway; - /** - * Commit action type. - */ - private transient String actionType; - /** * Constructs a StreamingSinkFunction. * @@ -113,12 +112,9 @@ public class BulkInsertWriteFunction @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.actionType = CommitUtils.getCommitActionType( - WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), - HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); - - this.initInstant = this.writeClient.getLastPendingInstant(this.actionType); + this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false); sendBootstrapEvent(); initWriterHelper(); } @@ -188,12 +184,13 @@ public class BulkInsertWriteFunction } private String instantToWrite() { - String instant = this.writeClient.getLastPendingInstant(this.actionType); + String instant = StreamerUtil.getLastPendingInstant(this.metaClient); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. 
TimeWait timeWait = TimeWait.builder() .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) .action("instant initialize") + .throwsT(true) .build(); while (instant == null || instant.equals(this.initInstant)) { // wait condition: @@ -202,7 +199,7 @@ public class BulkInsertWriteFunction // sleep for a while timeWait.waitFor(); // refresh the inflight instant - instant = this.writeClient.getLastPendingInstant(this.actionType); + instant = StreamerUtil.getLastPendingInstant(this.metaClient); } return instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 654f0b864..dd63d929a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -20,9 +20,7 @@ package org.apache.hudi.sink.common; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; @@ -70,6 +68,11 @@ public abstract class AbstractStreamWriteFunction */ protected int taskID; + /** + * Meta Client. + */ + protected transient HoodieTableMetaClient metaClient; + /** * Write Client. */ @@ -85,11 +88,6 @@ public abstract class AbstractStreamWriteFunction */ protected transient OperatorEventGateway eventGateway; - /** - * Commit action type. - */ - protected transient String actionType; - /** * Flag saying whether the write task is waiting for the checkpoint success notification * after it finished a checkpoint. 
@@ -128,11 +126,8 @@ public abstract class AbstractStreamWriteFunction @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.actionType = CommitUtils.getCommitActionType( - WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), - HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); - this.writeStatuses = new ArrayList<>(); this.writeMetadataState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( @@ -140,7 +135,7 @@ public abstract class AbstractStreamWriteFunction TypeInformation.of(WriteMetadataEvent.class) )); - this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); + this.currentInstant = lastPendingInstant(); if (context.isRestored()) { restoreWriteMetadata(); } else { @@ -162,12 +157,6 @@ public abstract class AbstractStreamWriteFunction // ------------------------------------------------------------------------- // Getter/Setter // ------------------------------------------------------------------------- - @VisibleForTesting - @SuppressWarnings("rawtypes") - public HoodieFlinkWriteClient getWriteClient() { - return writeClient; - } - @VisibleForTesting public boolean isConfirming() { return this.confirming; @@ -182,7 +171,7 @@ public abstract class AbstractStreamWriteFunction // ------------------------------------------------------------------------- private void restoreWriteMetadata() throws Exception { - String lastInflight = this.writeClient.getLastPendingInstant(this.actionType); + String lastInflight = lastPendingInstant(); boolean eventSent = false; for (WriteMetadataEvent event : this.writeMetadataState.get()) { if (Objects.equals(lastInflight, event.getInstantTime())) { @@ -224,6 +213,13 @@ public abstract class 
AbstractStreamWriteFunction this.confirming = false; } + /** + * Returns the last pending instant time. + */ + protected String lastPendingInstant() { + return StreamerUtil.getLastPendingInstant(this.metaClient); + } + /** * Prepares the instant time to write with for next checkpoint. * @@ -231,7 +227,7 @@ public abstract class AbstractStreamWriteFunction * @return The instant time */ protected String instantToWrite(boolean hasData) { - String instant = this.writeClient.getLastPendingInstant(this.actionType); + String instant = lastPendingInstant(); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() @@ -244,11 +240,18 @@ public abstract class AbstractStreamWriteFunction // 2. the inflight instant does not change and the checkpoint has buffering data if (instant == null || (instant.equals(this.currentInstant) && hasData)) { // sleep for a while - timeWait.waitFor(); - // refresh the inflight instant - instant = this.writeClient.getLastPendingInstant(this.actionType); + boolean timeout = timeWait.waitFor(); + if (timeout && instant != null) { + // if the timeout threshold hits but the last instant still not commit, + // and the task does not receive commit ask event(no data or aborted checkpoint), + // assumes the checkpoint was canceled silently and unblock the data flushing + confirming = false; + } else { + // refresh the inflight instant + instant = lastPendingInstant(); + } } else { - // the inflight instant changed, which means the last instant was committed + // the pending instant changed, that means the last instant was committed // successfully. 
confirming = false; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index b23168e2d..d904b6da9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -69,7 +69,7 @@ public class HoodieFlinkCompactor { // infer changelog mode CompactionUtil.inferChangelogMode(conf, metaClient); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); HoodieFlinkTable table = writeClient.getHoodieTable(); // judge whether have operation @@ -151,5 +151,6 @@ public class HoodieFlinkCompactor { .setParallelism(1); env.execute("flink_hudi_compaction"); + writeClient.close(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 15dbae6df..cff24d97f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -116,7 +116,7 @@ public class BucketAssignFunction> @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 
43168aef0..f9d5b1c1f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -94,6 +94,11 @@ public class BucketAssigner implements AutoCloseable { */ private final Map newFileAssignStates; + /** + * Num of accumulated successful checkpoints, used for cleaning the new file assign state. + */ + private int accCkp = 0; + public BucketAssigner( int taskID, int maxParallelism, @@ -117,7 +122,6 @@ public class BucketAssigner implements AutoCloseable { */ public void reset() { bucketInfoMap.clear(); - newFileAssignStates.clear(); } public BucketInfo addUpdate(String partitionPath, String fileIdHint) { @@ -136,16 +140,7 @@ public class BucketAssigner implements AutoCloseable { // first try packing this into one of the smallFiles if (smallFileAssign != null && smallFileAssign.assign()) { - final String key = StreamerUtil.generateBucketKey(partitionPath, smallFileAssign.getFileId()); - // create a new bucket or reuse an existing bucket - BucketInfo bucketInfo; - if (bucketInfoMap.containsKey(key)) { - // Assigns an inserts to existing update bucket - bucketInfo = bucketInfoMap.get(key); - } else { - bucketInfo = addUpdate(partitionPath, smallFileAssign.getFileId()); - } - return bucketInfo; + return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath); } // if we have anything more, create new insert buckets, like normal @@ -154,7 +149,20 @@ public class BucketAssigner implements AutoCloseable { if (newFileAssignState.canAssign()) { newFileAssignState.assign(); final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); - return bucketInfoMap.get(key); + if (bucketInfoMap.containsKey(key)) { + // the newFileAssignStates is cleaned asynchronously when received the checkpoint success notification, + // the records processed within the time range: + // (start checkpoint, checkpoint success(and 
instant committed)) + // should still be assigned to the small buckets of last checkpoint instead of new one. + + // the bucketInfoMap is cleaned when checkpoint starts. + + // A promotion: when the HoodieRecord can record whether it is an UPDATE or INSERT, + // we can always return an UPDATE BucketInfo here, and there is no need to record the + // UPDATE bucket through calling #addUpdate. + return bucketInfoMap.get(key); + } + return new BucketInfo(BucketType.UPDATE, newFileAssignState.fileId, partitionPath); } } BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath); @@ -166,7 +174,7 @@ public class BucketAssigner implements AutoCloseable { return bucketInfo; } - private SmallFileAssign getSmallFileAssign(String partitionPath) { + private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) { if (smallFileAssignMap.containsKey(partitionPath)) { return smallFileAssignMap.get(partitionPath); } @@ -186,7 +194,19 @@ public class BucketAssigner implements AutoCloseable { /** * Refresh the table state like TableFileSystemView and HoodieTimeline. */ - public void reload(long checkpointId) { + public synchronized void reload(long checkpointId) { + this.accCkp += 1; + if (this.accCkp > 1) { + // do not clean the new file assignment state for the first checkpoint, + // this #reload calling is triggered by checkpoint success event, the coordinator + // also relies on the checkpoint success event to commit the inflight instant, + // and very possibly this component receives the notification before the coordinator, + // if we do the cleaning, the records processed within the time range: + // (start checkpoint, checkpoint success(and instant committed)) + // would be assigned to a fresh new data bucket which is not the right behavior. 
+ this.newFileAssignStates.clear(); + this.accCkp = 0; + } this.smallFileAssignMap.clear(); this.writeProfile.reload(checkpointId); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 6b5e96eb8..922c056d2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.AbstractTableFileSystemView; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.commit.SmallFile; @@ -55,7 +56,7 @@ public class DeltaWriteProfile extends WriteProfile { if (!commitTimeline.empty()) { HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); // initialize the filesystem view based on the commit metadata - initFSViewIfNecessary(commitTimeline); + initFileSystemView(); // find smallest file in partition and append to it List allSmallFileSlices = new ArrayList<>(); // If we can index log files, we can add more inserts to log files for fileIds including those under @@ -90,6 +91,10 @@ public class DeltaWriteProfile extends WriteProfile { return smallFileLocations; } + protected AbstractTableFileSystemView getFileSystemView() { + return (AbstractTableFileSystemView) this.table.getSliceView(); + } + private long getTotalFileSize(FileSlice fileSlice) { if (!fileSlice.getBaseFile().isPresent()) { return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 441125b7d..d3de247ce 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.AbstractTableFileSystemView; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.table.HoodieFlinkTable; @@ -36,7 +36,6 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,7 +96,7 @@ public class WriteProfile { /** * The file system view cache for one checkpoint interval. */ - protected HoodieTableFileSystemView fsView; + protected AbstractTableFileSystemView fsView; /** * Hadoop configuration. 
@@ -194,7 +192,7 @@ public class WriteProfile { if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); // initialize the filesystem view based on the commit metadata - initFSViewIfNecessary(commitTimeline); + initFileSystemView(); List allFiles = fsView .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); @@ -214,22 +212,16 @@ public class WriteProfile { } @VisibleForTesting - public void initFSViewIfNecessary(HoodieTimeline commitTimeline) { + public void initFileSystemView() { if (fsView == null) { - cleanMetadataCache(commitTimeline.getInstants()); - List metadataList = commitTimeline.getInstants() - .map(instant -> - this.metadataCache.computeIfAbsent( - instant.getTimestamp(), - k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline) - .orElse(null))) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList, table.getMetaClient().getTableType()); - fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles); + fsView = getFileSystemView(); } } + protected AbstractTableFileSystemView getFileSystemView() { + return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView(); + } + /** * Remove the overdue metadata from the cache * whose instant does not belong to the given instants {@code instants}. 
@@ -261,8 +253,10 @@ public class WriteProfile { return; } this.table.getMetaClient().reloadActiveTimeline(); + this.table.getHoodieView().sync(); recordProfile(); this.fsView = null; + cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants()); this.smallFilesMap.clear(); this.reloadedCheckpointId = checkpointId; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java index 2ab0819ab..005b084bf 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java @@ -20,6 +20,9 @@ package org.apache.hudi.sink.utils; import org.apache.hudi.exception.HoodieException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -27,15 +30,19 @@ import java.util.concurrent.TimeUnit; * Tool used for time waiting. */ public class TimeWait { - private final long timeout; // timeout in SECONDS - private final long interval; // interval in MILLISECONDS - private final String action; // action to report error message + private static final Logger LOG = LoggerFactory.getLogger(TimeWait.class); + + private final long timeout; // timeout in SECONDS + private final long interval; // interval in MILLISECONDS + private final String action; // action to report error message + private final boolean throwsE; // whether to throw when timeout private long waitingTime = 0L; - private TimeWait(long timeout, long interval, String action) { + private TimeWait(long timeout, long interval, String action, boolean throwsE) { this.timeout = timeout; this.interval = interval; this.action = action; + this.throwsE = throwsE; } public static Builder builder() { @@ -44,14 +51,23 @@ public class TimeWait { /** * Wait for an interval time. 
+ * + * @return true if is timed out */ - public void waitFor() { + public boolean waitFor() { try { if (waitingTime > timeout) { - throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action); + final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action; + if (this.throwsE) { + throw new HoodieException(msg); + } else { + LOG.warn(msg); + return true; + } } TimeUnit.MILLISECONDS.sleep(interval); waitingTime += interval; + return false; } catch (InterruptedException e) { throw new HoodieException("Error while waiting for " + action, e); } @@ -61,17 +77,18 @@ public class TimeWait { * Builder. */ public static class Builder { - private long timeout; - private long interval; + private long timeout = 5 * 60 * 1000L; // default 5 minutes + private long interval = 1000; private String action; + private boolean throwsT = false; - public Builder() { - this.timeout = 3600; - this.interval = 500; + private Builder() { } public Builder timeout(long timeout) { - this.timeout = timeout; + if (timeout > 0) { + this.timeout = timeout; + } return this; } @@ -85,9 +102,14 @@ public class TimeWait { return this; } + public Builder throwsT(boolean throwsT) { + this.throwsT = throwsT; + return this; + } + public TimeWait build() { Objects.requireNonNull(this.action); - return new TimeWait(this.timeout, this.interval, this.action); + return new TimeWait(this.timeout, this.interval, this.action, this.throwsT); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index c6432e5b5..d43dfd090 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -63,9 +63,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, return (DataStreamSinkProvider) dataStream -> { // setup configuration - long ckpTimeout = 
dataStream.getExecutionEnvironment() - .getCheckpointConfig().getCheckpointTimeout(); - conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + long ckpInterval = dataStream.getExecutionEnvironment() + .getCheckpointConfig().getCheckpointInterval(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpInterval * 5); // five checkpoints interval RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 7e7bfaa3d..b77415a39 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -32,6 +32,8 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -149,7 +151,21 @@ public class StreamerUtil { return FlinkClientUtil.getHadoopConf(); } + /** + * Mainly used for tests. 
+ */ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { + return getHoodieClientConfig(conf, false, false); + } + + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) { + return getHoodieClientConfig(conf, false, loadFsViewStorageConfig); + } + + public static HoodieWriteConfig getHoodieClientConfig( + Configuration conf, + boolean enableEmbeddedTimelineService, + boolean loadFsViewStorageConfig) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) @@ -194,13 +210,20 @@ public class StreamerUtil { .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .build()) + .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) - .withProps(flinkConf2TypedProperties(conf)); + .withProps(flinkConf2TypedProperties(conf)) + .withSchema(getSourceSchema(conf).toString()); - builder = builder.withSchema(getSourceSchema(conf).toString()); - return builder.build(); + HoodieWriteConfig writeConfig = builder.build(); + if (loadFsViewStorageConfig) { + // do not use the builder to give a change for recovering the original fs view storage config + FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + writeConfig.setViewStorageConfig(viewStorageConfig); + } + return writeConfig; } /** @@ -235,11 +258,11 @@ public class StreamerUtil { * @param conf the configuration * @throws IOException if errors happens when writing metadata */ - public static void initTableIfNotExists(Configuration conf) throws IOException { + public static HoodieTableMetaClient initTableIfNotExists(Configuration 
conf) throws IOException { final String basePath = conf.getString(FlinkOptions.PATH); final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); if (!tableExists(basePath, hadoopConf)) { - HoodieTableMetaClient.withPropertyBuilder() + HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) @@ -250,9 +273,11 @@ public class StreamerUtil { .setTimelineLayoutVersion(1) .initTable(hadoopConf, basePath); LOG.info("Table initialized under base path {}", basePath); + return metaClient; } else { LOG.info("Table [{}/{}] already exists, no need to initialize the table", basePath, conf.getString(FlinkOptions.TABLE_NAME)); + return StreamerUtil.createMetaClient(basePath, hadoopConf); } // Do not close the filesystem in order to use the CACHE, // some filesystems release the handles in #close method. @@ -305,7 +330,7 @@ public class StreamerUtil { /** * Creates the meta client for reader. * - *

The streaming pipeline process is long running, so empty table path is allowed, + *

The streaming pipeline process is long-running, so empty table path is allowed, * the reader would then check and refresh the meta client. * * @see org.apache.hudi.source.StreamReadMonitoringFunction @@ -344,6 +369,8 @@ public class StreamerUtil { /** * Creates the Flink write client. + * + *

This expects to be used by client, the driver should start an embedded timeline server. */ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { HoodieFlinkEngineContext context = @@ -351,16 +378,22 @@ public class StreamerUtil { new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf)); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); + return new HoodieFlinkWriteClient<>(context, writeConfig); } /** * Creates the Flink write client. * + *

This expects to be used by the driver, the client can then send requests for files view. + * *

The task context supplier is a constant: the write token is always '0-1-0'. */ - public static HoodieFlinkWriteClient createWriteClient(Configuration conf) { - return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, getHoodieClientConfig(conf)); + public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); + // create the filesystem view storage properties for client + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), writeConfig.getViewStorageConfig()); + return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); } /** @@ -433,6 +466,27 @@ public class StreamerUtil { return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP); } + public static String getLastPendingInstant(HoodieTableMetaClient metaClient) { + return getLastPendingInstant(metaClient, true); + } + + public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boolean reloadTimeline) { + if (reloadTimeline) { + metaClient.reloadActiveTimeline(); + } + return metaClient.getCommitsTimeline().filterInflightsAndRequested() + .lastInstant() + .map(HoodieInstant::getTimestamp) + .orElse(null); + } + + public static String getLastCompletedInstant(HoodieTableMetaClient metaClient) { + return metaClient.getCommitsTimeline().filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::getTimestamp) + .orElse(null); + } + /** * Returns whether there are successful commits on the timeline. 
* @param metaClient The meta client diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java new file mode 100644 index 000000000..e05f09552 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; +import java.util.Properties; + +import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME; + +/** + * Helper class to read/write {@link FileSystemViewStorageConfig}. 
+ */ +public class ViewStorageProperties { + private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class); + + private static final String FILE_NAME = "view_storage_conf.properties"; + + /** + * Initialize the {@link #FILE_NAME} meta file. + */ + public static void createProperties( + String basePath, + FileSystemViewStorageConfig config) throws IOException { + Path propertyPath = getPropertiesFilePath(basePath); + FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + try (FSDataOutputStream outputStream = fs.create(propertyPath)) { + config.getProps().store(outputStream, + "Filesystem view storage properties saved on " + new Date(System.currentTimeMillis())); + } + } + + /** + * Read the {@link FileSystemViewStorageConfig} with given table base path. + */ + public static FileSystemViewStorageConfig loadFromProperties(String basePath) { + Path propertyPath = getPropertiesFilePath(basePath); + LOG.info("Loading filesystem view storage properties from " + propertyPath); + FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + Properties props = new Properties(); + try { + try (FSDataInputStream inputStream = fs.open(propertyPath)) { + props.load(inputStream); + } + return FileSystemViewStorageConfig.newBuilder().fromProperties(props).build(); + } catch (IOException e) { + throw new HoodieIOException("Could not load filesystem view storage properties from " + propertyPath, e); + } + } + + private static Path getPropertiesFilePath(String basePath) { + String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME; + return new Path(auxPath, FILE_NAME); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 1fdb5ca0a..3683f4888 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -20,7 +20,6 @@ package org.apache.hudi.sink; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -30,6 +29,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -94,8 +94,8 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(1, event1); coordinator.notifyCheckpointComplete(1); - String inflight = coordinator.getWriteClient().getLastPendingInstant(HoodieTableType.COPY_ON_WRITE); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); + String inflight = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); + String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); assertThat("Instant should be complete", lastCompleted, is(instant)); assertNotEquals("", inflight, "Should start a new instant"); assertNotEquals(instant, inflight, "Should start a new instant"); @@ -145,7 +145,7 @@ public class TestStreamWriteOperatorCoordinator { assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), "Returns early for empty write results"); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); + String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); assertNull(lastCompleted, "Returns early for empty write results"); 
assertNull(coordinator.getEventBuffer()[0]); @@ -153,7 +153,7 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(1, event1); assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), "Commits the instant with partial events anyway"); - lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); + lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index da418f965..29bb42487 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -27,13 +27,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.InsertFunctionWrapper; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; +import org.apache.hudi.utils.TestUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -45,22 +45,24 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.io.IOException; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -99,8 +101,7 @@ public class TestWriteCopyOnWrite { @BeforeEach public void before() throws Exception { - final String basePath = tempFile.getAbsolutePath(); - conf = TestConfigurations.getDefaultConf(basePath); + conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name()); setUp(conf); this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); @@ -134,7 +135,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); @@ -149,18 +150,17 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + checkInstantState(REQUESTED, 
instant); funcWrapper.checkpointComplete(1); // the coordinator checkpoint commits the inflight instant. - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, instant); // checkpoint for next round, no data input, so after the checkpoint, // there should not be REQUESTED Instant // this triggers the data write and event send funcWrapper.checkpointFunction(2); - String instant2 = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant2 = lastPendingInstant(); assertNotEquals(instant, instant2); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); @@ -174,12 +174,15 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointComplete(2); // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); } @Test public void testCheckpointFails() throws Exception { + // reset the config option + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); // no data written and triggers checkpoint fails, @@ -188,8 +191,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); assertNotNull(instant); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); @@ -204,18 +206,17 @@ public class TestWriteCopyOnWrite { "The last checkpoint was aborted, ignore the events"); // the instant metadata should be reused - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); - 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); + checkInstantState(REQUESTED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, null); for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } // this returns early because there is no inflight instant - assertThrows(HoodieException.class, - () -> funcWrapper.checkpointFunction(2), - "Timeout(0ms) while waiting for"); - // do not sent the write event and fails the checkpoint, + assertDoesNotThrow(() -> funcWrapper.checkpointFunction(2), + "The stream writer reuse the last instant time when waiting for the last instant commit timeout"); + // do not send the write event and fails the checkpoint, // behaves like the last checkpoint is successful. funcWrapper.checkpointFails(2); } @@ -231,16 +232,16 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); funcWrapper.getNextEvent(); - String instant1 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); + String instant1 = lastPendingInstant(); assertNotNull(instant1); // fails the subtask funcWrapper.subTaskFails(0); - String instant2 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); + String instant2 = lastPendingInstant(); assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); + checkInstantState(HoodieInstant.State.COMPLETED, null); } @Test @@ -255,8 +256,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); @@ -264,11 +264,11 @@ public class 
TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + checkInstantState(REQUESTED, instant); funcWrapper.checkpointComplete(1); checkWrittenData(tempFile, EXPECTED1); // the coordinator checkpoint commits the inflight instant. - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED1); } @@ -341,8 +341,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(2); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); @@ -350,10 +349,10 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + checkInstantState(REQUESTED, instant); funcWrapper.checkpointComplete(2); // the coordinator checkpoint commits the inflight instant. 
- checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); } @@ -386,8 +385,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(2); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); @@ -395,10 +393,10 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + checkInstantState(REQUESTED, instant); funcWrapper.checkpointComplete(2); // the coordinator checkpoint commits the inflight instant. 
- checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, instant); Map expected = getUpsertWithDeleteExpected(); checkWrittenData(tempFile, expected); @@ -437,8 +435,7 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, event2); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); funcWrapper.checkpointComplete(1); @@ -446,8 +443,8 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); // insert duplicates again for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { @@ -500,8 +497,7 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, event2); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); funcWrapper.checkpointComplete(1); @@ -511,8 +507,8 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); // insert duplicates again for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { @@ -552,8 +548,7 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, event1); 
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); funcWrapper.checkpointComplete(1); @@ -569,8 +564,8 @@ public class TestWriteCopyOnWrite { TestData.checkWrittenAllData(tempFile, expected, 1); // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); // insert duplicates again for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { @@ -631,8 +626,7 @@ public class TestWriteCopyOnWrite { } assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); funcWrapper.checkpointComplete(1); @@ -640,8 +634,8 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); // insert duplicates again for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { @@ -736,8 +730,7 @@ public class TestWriteCopyOnWrite { assertTrue(funcWrapper.isAlreadyBootstrap()); - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + String instant = lastPendingInstant(); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); @@ -748,18 +741,18 @@ public class TestWriteCopyOnWrite { funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); 
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + checkInstantState(REQUESTED, instant); funcWrapper.checkpointComplete(1); // the coordinator checkpoint commits the inflight instant. - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkInstantState(HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); } @Test public void testWriteExactlyOnce() throws Exception { // reset the config option - conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L); conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); @@ -779,7 +772,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit"); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 4; i++) { final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event); @@ -793,26 +786,25 @@ public class TestWriteCopyOnWrite { assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit"); - // checkpoint for the next round, when there is eager flush but the write - // task is waiting for the instant commit ack, should throw for timeout. 
+ // checkpoint for the next round funcWrapper.checkpointFunction(2); - assertThrows(HoodieException.class, () -> { + assertDoesNotThrow(() -> { for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } - }, "Timeout(500ms) while waiting for instant"); + }, "The stream writer reuses the last instant time when waiting for the last instant commit timeout"); } @Test - public void testReuseEmbeddedServer() { - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + public void testReuseEmbeddedServer() throws IOException { + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig(); assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); // get another write client - writeClient = StreamerUtil.createWriteClient(conf, null); + writeClient = StreamerUtil.createWriteClient(conf); assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort()); } @@ -821,24 +813,19 @@ // Utilities // ------------------------------------------------------------------------- - @SuppressWarnings("rawtypes") - private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { - final String instant = writeClient.getLastPendingInstant(getTableType()); + private void checkInflightInstant() { + final String instant = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); assertNotNull(instant); } - @SuppressWarnings("rawtypes") - private void checkInstantState( - HoodieFlinkWriteClient writeClient, - HoodieInstant.State state, - String instantStr) { + private void checkInstantState(HoodieInstant.State state, String instantStr) { final String instant; switch (state) { case 
REQUESTED: - instant = writeClient.getLastPendingInstant(getTableType()); + instant = lastPendingInstant(); break; case COMPLETED: - instant = writeClient.getLastCompletedInstant(getTableType()); + instant = lastCompleteInstant(); break; default: throw new AssertionError("Unexpected state"); @@ -846,6 +833,14 @@ public class TestWriteCopyOnWrite { assertThat(instant, is(instantStr)); } + protected String lastPendingInstant() { + return TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); + } + + protected String lastCompleteInstant() { + return TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); + } + protected HoodieTableType getTableType() { return HoodieTableType.COPY_ON_WRITE; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 07e23b56e..fd65914b2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -25,8 +25,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieFlinkTable; @@ -39,10 +37,8 @@ import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; import java.io.File; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; /** * Test cases for delta stream write. 
@@ -71,13 +67,7 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - String latestInstant = metaClient.getCommitsTimeline().filterCompletedInstants() - .getInstants() - .filter(x -> x.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) - .map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()).stream() - .max(Comparator.naturalOrder()) - .orElse(null); + String latestInstant = lastCompleteInstant(); TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index acce120f4..d3fac46eb 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -20,6 +20,7 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestUtils; import org.apache.flink.configuration.Configuration; @@ -53,4 +54,9 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; } + + @Override + protected String lastCompleteInstant() { + return TestUtils.getLastDeltaCompleteInstant(tempFile.getAbsolutePath()); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index ad1726bb7..52002b118 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -41,8 +41,9 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.util.Arrays; @@ -69,9 +70,8 @@ public class ITTestHoodieFlinkCompactor { @TempDir File tempFile; - //@ParameterizedTest - //@ValueSource(booleans = {true, false}) - @Disabled + @ParameterizedTest + @ValueSource(booleans = {true, false}) public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { // Create hoodie table and insert into data. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); @@ -112,7 +112,7 @@ public class ITTestHoodieFlinkCompactor { // judge whether have operation // To compute the compaction instant time and do compaction. 
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); assertTrue(scheduled, "The compaction plan should be scheduled"); @@ -141,6 +141,7 @@ public class ITTestHoodieFlinkCompactor { .setParallelism(1); env.execute("flink_hudi_compaction"); + writeClient.close(); TestData.checkWrittenFullData(tempFile, EXPECTED); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index d10421d66..053c2a39c 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.profile.WriteProfile; @@ -51,9 +48,9 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static 
org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -358,11 +355,11 @@ public class TestBucketAssigner { assertTrue(smallFiles1.isEmpty(), "Should have no small files"); TestData.writeData(TestData.DATA_SET_INSERT, conf); - Option instantOption = getLastCompleteInstant(writeProfile); - assertFalse(instantOption.isPresent()); + String instantOption = getLastCompleteInstant(writeProfile); + assertNull(instantOption); writeProfile.reload(1); - String instant1 = getLastCompleteInstant(writeProfile).orElse(null); + String instant1 = getLastCompleteInstant(writeProfile); assertNotNull(instant1); List smallFiles2 = writeProfile.getSmallFiles("par1"); assertThat("Should have 1 small file", smallFiles2.size(), is(1)); @@ -376,7 +373,7 @@ public class TestBucketAssigner { smallFiles3.get(0).location.getInstantTime(), is(instant1)); writeProfile.reload(2); - String instant2 = getLastCompleteInstant(writeProfile).orElse(null); + String instant2 = getLastCompleteInstant(writeProfile); assertNotEquals(instant2, instant1, "Should have new complete instant"); List smallFiles4 = writeProfile.getSmallFiles("par1"); assertThat("Should have 1 small file", smallFiles4.size(), is(1)); @@ -389,12 +386,11 @@ public class TestBucketAssigner { WriteProfile writeProfile = new WriteProfile(writeConfig, context); assertTrue(writeProfile.getMetadataCache().isEmpty(), "Empty table should no have any instant metadata"); - HoodieTimeline emptyTimeline = writeProfile.getTable().getActiveTimeline(); - // write 3 instants of data for (int i = 0; i < 3; i++) { TestData.writeData(TestData.DATA_SET_INSERT, conf); } + // the record profile triggers the metadata loading writeProfile.reload(1); assertThat("Metadata cache should have same number entries as timeline instants", writeProfile.getMetadataCache().size(), is(3)); @@ -402,15 +398,10 @@ public class TestBucketAssigner { writeProfile.getSmallFiles("par1"); assertThat("The metadata should be 
reused", writeProfile.getMetadataCache().size(), is(3)); - - writeProfile.reload(2); - writeProfile.initFSViewIfNecessary(emptyTimeline); - assertTrue(writeProfile.getMetadataCache().isEmpty(), "Metadata cache should be all cleaned"); } - private static Option getLastCompleteInstant(WriteProfile profile) { - return profile.getTable().getMetaClient().getCommitsTimeline() - .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp); + private static String getLastCompleteInstant(WriteProfile profile) { + return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient()); } private void assertBucketEquals( diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index ed23754d9..8e90438dc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -18,7 +18,6 @@ package org.apache.hudi.sink.utils; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.append.AppendWriteFunction; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; @@ -98,11 +97,6 @@ public class InsertFunctionWrapper { return this.gateway.getNextEvent(); } - @SuppressWarnings("rawtypes") - public HoodieFlinkWriteClient getWriteClient() { - return this.writeFunction.getWriteClient(); - } - public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 6b6bedea5..7ac81720a 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -18,7 +18,6 @@ package org.apache.hudi.sink.utils; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; @@ -211,11 +210,6 @@ public class StreamWriteFunctionWrapper { return this.writeFunction.getDataBuffer(); } - @SuppressWarnings("rawtypes") - public HoodieFlinkWriteClient getWriteClient() { - return this.writeFunction.getWriteClient(); - } - public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java index 3687e9d7c..541890f7b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java @@ -89,7 +89,7 @@ public class TestStreamReadMonitoringFunction { assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), "All the instants should have range limit"); - String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)), "All the splits should be with latestCommit instant time"); @@ -143,7 +143,7 @@ public class TestStreamReadMonitoringFunction { // all the splits should come from the second commit. 
TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); - String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + String specifiedCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { @@ -174,7 +174,7 @@ public class TestStreamReadMonitoringFunction { // all the splits should come from the earliest commit. TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); - String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + String specifiedCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 9b4ea0084..1a8c3ffff 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -111,7 +111,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String insertInto = "insert into t1 select * from source"; execInsertSql(streamTableEnv, insertInto); - String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); + String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath()); streamTableEnv.executeSql("drop table t1"); hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) @@ -181,7 +181,7 @@ public 
class HoodieDataSourceITCase extends AbstractTestBase { // execute 2 times execInsertSql(streamTableEnv, insertInto); // remember the commit - String specifiedCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); + String specifiedCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath()); // another update batch String insertInto2 = "insert into t1 select * from source2"; execInsertSql(streamTableEnv, insertInto2); @@ -264,8 +264,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { Map options1 = new HashMap<>(defaultConf.toMap()); options1.put(FlinkOptions.TABLE_NAME.key(), "t1"); Configuration conf = Configuration.fromMap(options1); - HoodieTimeline timeline = StreamerUtil.createWriteClient(conf, null) - .getHoodieTable().getActiveTimeline(); + HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline(); assertTrue(timeline.filterCompletedInstants() .getInstants().anyMatch(instant -> instant.getAction().equals("clean")), "some commits should be cleaned"); @@ -285,8 +284,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { // write another commit with deletes TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); - String latestCommit = StreamerUtil.createWriteClient(conf, null) - .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ); + String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) @@ -756,19 +754,17 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + " 'format' = 'debezium-json'\n" + ")"; streamTableEnv.executeSql(sourceDDL); - String hoodieTableDDL = "" - + "CREATE TABLE hoodie_sink(\n" - + " id INT NOT NULL,\n" - + " ts BIGINT,\n" - + " name STRING," - + " weight DOUBLE," - + " PRIMARY KEY (id) NOT ENFORCED" - + ") with (\n" - + " 'connector' = 'hudi',\n" - + " 'path' = '" + tempFile.getAbsolutePath() + "',\n" - + " 'read.streaming.enabled' = 
'" + (execMode == ExecMode.STREAM) + "',\n" - + " 'write.insert.drop.duplicates' = 'true'" - + ")"; + String hoodieTableDDL = sql("hoodie_sink") + .field("id INT NOT NULL") + .field("ts BIGINT") + .field("name STRING") + .field("weight DOUBLE") + .pkField("id") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.READ_AS_STREAMING, execMode == ExecMode.STREAM) + .option(FlinkOptions.PRE_COMBINE, true) + .noPartition() + .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source"; execInsertSql(streamTableEnv, insertInto); @@ -949,7 +945,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { TestData.writeData(TestData.dataSetInsert(3, 4), conf); TestData.writeData(TestData.dataSetInsert(5, 6), conf); - String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 9bd03e115..073ae27bc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -58,7 +58,7 @@ public class TestCompactionUtil { StreamerUtil.initTableIfNotExists(conf); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); HoodieFlinkTable table = writeClient.getHoodieTable(); HoodieTableMetaClient metaClient = table.getMetaClient(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index 14c9ac10f..92e16cd10 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -19,6 +19,8 @@ package org.apache.hudi.utils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; @@ -33,17 +35,33 @@ import static org.junit.jupiter.api.Assertions.assertTrue; * Common test utils. */ public class TestUtils { - - public static String getLatestCommit(String basePath) { + public static String getLastPendingInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); - return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + return StreamerUtil.getLastPendingInstant(metaClient); } - public static String getFirstCommit(String basePath) { + public static String getLastCompleteInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); - return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp(); + return StreamerUtil.getLastCompletedInstant(metaClient); + } + + public static String getLastDeltaCompleteInstant(String basePath) { + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + return metaClient.getCommitsTimeline().filterCompletedInstants() + .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) + .lastInstant() + .map(HoodieInstant::getTimestamp) + 
.orElse(null); + } + + public static String getFirstCompleteInstant(String basePath) { + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant() + .map(HoodieInstant::getTimestamp).orElse(null); } public static String getSplitPartitionPath(MergeOnReadInputSplit split) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java new file mode 100644 index 000000000..f80760bf1 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utils; + +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.util.ViewStorageProperties; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link ViewStorageProperties}. + */ +public class TestViewStorageProperties { + @TempDir + File tempFile; + + @Test + void testReadWriteProperties() throws IOException { + String basePath = tempFile.getAbsolutePath(); + FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) + .withRemoteServerHost("host1") + .withRemoteServerPort(1234).build(); + ViewStorageProperties.createProperties(basePath, config); + ViewStorageProperties.createProperties(basePath, config); + ViewStorageProperties.createProperties(basePath, config); + + FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath); + assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK)); + assertThat(readConfig.getRemoteViewServerHost(), is("host1")); + assertThat(readConfig.getRemoteViewServerPort(), is(1234)); + } +}