1
0

[HUDI-2562] Embedded timeline server on JobManager (#3812)

This commit is contained in:
Danny Chan
2021-10-18 10:45:39 +08:00
committed by GitHub
parent 9aa7cfb802
commit 2eda3de7f9
34 changed files with 540 additions and 334 deletions

View File

@@ -196,7 +196,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata(); Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
try { try {
// Pass the isUpdateRecord to the props for HoodieRecordPayload to judge // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
// Whether it is a update or insert record. // Whether it is an update or insert record.
boolean isUpdateRecord = isUpdateRecord(hoodieRecord); boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
// If the format can not record the operation field, nullify the DELETE payload manually. // If the format can not record the operation field, nullify the DELETE payload manually.
boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
@@ -219,7 +219,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
if (config.allowOperationMetadataField()) { if (config.allowOperationMetadataField()) {
HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation()); HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation());
} }
if (isUpdateRecord(hoodieRecord)) { if (isUpdateRecord) {
updatedRecordsWritten++; updatedRecordsWritten++;
} else { } else {
insertRecordsWritten++; insertRecordsWritten++;

View File

@@ -35,7 +35,6 @@ import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCommitException;
@@ -70,7 +69,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@@ -506,45 +504,9 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
} else { } else {
writeTimer = metrics.getDeltaCommitCtx(); writeTimer = metrics.getDeltaCommitCtx();
} }
table.getHoodieView().sync();
return table; return table;
} }
/**
 * Returns the last pending instant for the commit action type that
 * {@code CommitUtils.getCommitActionType} resolves from the given table type.
 *
 * @param tableType the table type used to look up the commit action type
 * @return whatever {@link #getLastPendingInstant(String)} returns for that action type
 */
public String getLastPendingInstant(HoodieTableType tableType) {
  return getLastPendingInstant(CommitUtils.getCommitActionType(tableType));
}
/**
 * Returns the timestamp of the latest inflight instant with the given action type.
 *
 * <p>The commits timeline is read through a meta client obtained from
 * {@code FlinkClientUtil.createMetaClient(basePath)}, narrowed to inflight/requested
 * instants, and then filtered to inflight instants whose action equals {@code actionType}.
 *
 * @param actionType the action each candidate instant must match
 * @return the greatest matching timestamp by natural string order,
 *         or {@code null} when no instant matches
 */
public String getLastPendingInstant(String actionType) {
  HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
      .getCommitsTimeline().filterInflightsAndRequested();
  // Reduce directly on the instants stream; collecting into a list only to
  // stream it again allocates a throwaway collection without changing the result.
  return unCompletedTimeline.getInstants()
      .filter(x -> x.getAction().equals(actionType) && x.isInflight())
      .map(HoodieInstant::getTimestamp)
      .max(Comparator.naturalOrder())
      .orElse(null);
}
/**
 * Returns the timestamp of the latest completed instant for the commit action type
 * that {@code CommitUtils.getCommitActionType} resolves from the given table type.
 *
 * <p>The commits timeline is read through a meta client obtained from
 * {@code FlinkClientUtil.createMetaClient(basePath)} and narrowed to completed instants.
 *
 * @param tableType the table type used to look up the commit action type
 * @return the greatest matching timestamp by natural string order,
 *         or {@code null} when no completed instant matches
 */
public String getLastCompletedInstant(HoodieTableType tableType) {
  final String commitType = CommitUtils.getCommitActionType(tableType);
  HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath)
      .getCommitsTimeline().filterCompletedInstants();
  // Reduce directly on the instants stream; collecting into a list only to
  // stream it again allocates a throwaway collection without changing the result.
  return completedTimeline.getInstants()
      .filter(x -> x.getAction().equals(commitType))
      .map(HoodieInstant::getTimestamp)
      .max(Comparator.naturalOrder())
      .orElse(null);
}
/**
 * Transitions the REQUESTED instant described by the given action type and
 * instant time to inflight on the table's active timeline.
 *
 * <p>The instant is transitioned with empty content; the last argument of the
 * timeline call is taken from {@code config.shouldAllowMultiWriteOnSameInstant()}.
 *
 * @param commitType      the action type of the requested instant
 * @param inFlightInstant the instant time of the requested instant
 */
public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
  HoodieActiveTimeline timeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline();
  timeline.transitionRequestedToInflight(
      new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant),
      Option.empty(),
      config.shouldAllowMultiWriteOnSameInstant());
}
public HoodieFlinkTable<T> getHoodieTable() { public HoodieFlinkTable<T> getHoodieTable() {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
} }

View File

@@ -85,7 +85,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) { if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time // if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime); writeClient.startCommitWithTime(instantTime);
writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
} else { } else {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable. // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable. // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.

View File

@@ -63,14 +63,28 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config, public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context, HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient) { HoodieTableMetaClient metaClient) {
return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
}
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
final HoodieFlinkTable<T> hoodieFlinkTable;
switch (metaClient.getTableType()) { switch (metaClient.getTableType()) {
case COPY_ON_WRITE: case COPY_ON_WRITE:
return new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient); hoodieFlinkTable = new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient);
break;
case MERGE_ON_READ: case MERGE_ON_READ:
return new HoodieFlinkMergeOnReadTable<>(config, context, metaClient); hoodieFlinkTable = new HoodieFlinkMergeOnReadTable<>(config, context, metaClient);
break;
default: default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
} }
if (refreshTimeline) {
hoodieFlinkTable.getHoodieView().sync();
}
return hoodieFlinkTable;
} }
@Override @Override

View File

@@ -101,7 +101,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config); FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config);
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
// Create some partitions, and put some files // Create some partitions, and put some files

View File

@@ -505,6 +505,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
} }
/**
 * Convenience overload that builds the REQUESTED instant from an action type and
 * an instant time, then transitions it to inflight with empty content and
 * {@code false} as the trailing flag of the three-argument overload.
 *
 * @param commitType      the action type of the requested instant
 * @param inFlightInstant the instant time of the requested instant
 */
public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
  transitionRequestedToInflight(
      new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant),
      Option.empty(),
      false);
}
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) { public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
transitionRequestedToInflight(requested, content, false); transitionRequestedToInflight(requested, content, false);
} }

View File

@@ -139,7 +139,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
public void close() { public void close() {
if (this.writeClient != null) { if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully(); this.writeClient.cleanHandlesGracefully();
// this.writeClient.close(); this.writeClient.close();
} }
} }
@@ -155,7 +155,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Getter/Setter // Getter/Setter
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@VisibleForTesting @VisibleForTesting
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public Map<String, List<HoodieRecord>> getDataBuffer() { public Map<String, List<HoodieRecord>> getDataBuffer() {

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.CommitUtils;
@@ -41,6 +42,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -57,7 +59,6 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -95,6 +96,11 @@ public class StreamWriteOperatorCoordinator
*/ */
private transient HoodieFlinkWriteClient writeClient; private transient HoodieFlinkWriteClient writeClient;
/**
* Meta client.
*/
private transient HoodieTableMetaClient metaClient;
/** /**
* Current REQUESTED instant, for validation. * Current REQUESTED instant, for validation.
*/ */
@@ -153,10 +159,11 @@ public class StreamWriteOperatorCoordinator
// initialize event buffer // initialize event buffer
reset(); reset();
this.gateways = new SubtaskGateway[this.parallelism]; this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf);
// the write client must be created after the table creation
this.writeClient = StreamerUtil.createWriteClient(conf); this.writeClient = StreamerUtil.createWriteClient(conf);
this.tableState = TableState.create(conf); this.tableState = TableState.create(conf);
// init table, create it if not exists.
initTableIfNotExists(this.conf);
// start the executor // start the executor
this.executor = new CoordinatorExecutor(this.context, LOG); this.executor = new CoordinatorExecutor(this.context, LOG);
// start the executor if required // start the executor if required
@@ -171,15 +178,17 @@ public class StreamWriteOperatorCoordinator
@Override @Override
public void close() throws Exception { public void close() throws Exception {
// teardown the resource // teardown the resource
if (writeClient != null) {
writeClient.close();
}
if (executor != null) { if (executor != null) {
executor.close(); executor.close();
} }
if (hiveSyncExecutor != null) { if (hiveSyncExecutor != null) {
hiveSyncExecutor.close(); hiveSyncExecutor.close();
} }
// the write client must be closed after the executor services
// because the tasks in those services may send requests to the embedded timeline service.
if (writeClient != null) {
writeClient.close();
}
this.eventBuffer = null; this.eventBuffer = null;
} }
@@ -225,6 +234,14 @@ public class StreamWriteOperatorCoordinator
); );
} }
/**
 * Reacts to an aborted checkpoint by submitting {@code sendCommitAckEvents}
 * to the coordinator executor.
 *
 * @param checkpointId id of the aborted checkpoint; passed to the executor
 *                     together with the action description format string
 */
@Override
public void notifyCheckpointAborted(long checkpointId) {
// once the checkpoint was aborted, unblock the writer tasks to
// reuse the last instant.
executor.execute(this::sendCommitAckEvents,
"unblock data write with aborted checkpoint %s", checkpointId);
}
@Override @Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
// no operation // no operation
@@ -316,7 +333,7 @@ public class StreamWriteOperatorCoordinator
final String instant = HoodieActiveTimeline.createNewInstantTime(); final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.instant = instant; this.instant = instant;
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant); this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
@@ -382,16 +399,28 @@ public class StreamWriteOperatorCoordinator
* sends the commit ack events to unblock the flushing. * sends the commit ack events to unblock the flushing.
*/ */
private void sendCommitAckEvents() { private void sendCommitAckEvents() {
CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism) CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
.mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance())) .map(gw -> gw.sendEvent(CommitAckEvent.getInstance()))
.toArray(CompletableFuture<?>[]::new); .toArray(CompletableFuture<?>[]::new);
try { try {
CompletableFuture.allOf(futures).get(); CompletableFuture.allOf(futures).get();
} catch (Exception e) { } catch (Throwable throwable) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", e); if (!sendToFinishedTasks(throwable)) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
}
} }
} }
/**
 * Decides whether the given exception is caused by sending events to FINISHED tasks.
 *
 * <p>Ugly impl: the exception may change in the future. The check inspects the
 * direct cause of {@code throwable}: it matches when the cause is a
 * {@link TaskNotRunningException} or when the cause's message contains "running".
 *
 * <p>A missing cause or a cause without a message is treated as "not a finished-task
 * failure" instead of raising a {@link NullPointerException}, so that the caller's
 * catch block can still report the original error.
 *
 * @param throwable the exception caught while waiting for the events to be sent
 * @return {@code true} if the failure looks like an event sent to a task that is not running
 */
private static boolean sendToFinishedTasks(Throwable throwable) {
  final Throwable cause = throwable.getCause();
  if (cause == null) {
    // nothing to inspect: not recognizable as a finished-task failure
    return false;
  }
  if (cause instanceof TaskNotRunningException) {
    return true;
  }
  // Throwable#getMessage may legitimately return null
  final String message = cause.getMessage();
  return message != null && message.contains("running");
}
/** /**
* Commits the instant. * Commits the instant.
* *
@@ -474,12 +503,6 @@ public class StreamWriteOperatorCoordinator
return instant; return instant;
} }
@VisibleForTesting
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return writeClient;
}
@VisibleForTesting @VisibleForTesting
public Context getContext() { public Context getContext() {
return context; return context;

View File

@@ -82,20 +82,11 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
this.writerHelper.write((RowData) value); this.writerHelper.write((RowData) value);
} }
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/** /**
* End input action for batch source. * End input action for batch source.
*/ */
public void endInput() { public void endInput() {
flushData(true); flushData(true);
this.writeClient.cleanHandles();
this.writeStatuses.clear(); this.writeStatuses.clear();
} }

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.sink.bootstrap; package org.apache.hudi.sink.bootstrap;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
@@ -27,14 +26,12 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -92,8 +89,6 @@ public class BootstrapOperator<I, O extends HoodieRecord>
private transient ListState<String> instantState; private transient ListState<String> instantState;
private final Pattern pattern; private final Pattern pattern;
private String lastInstantTime; private String lastInstantTime;
private HoodieFlinkWriteClient writeClient;
private String actionType;
public BootstrapOperator(Configuration conf) { public BootstrapOperator(Configuration conf) {
this.conf = conf; this.conf = conf;
@@ -102,7 +97,8 @@ public class BootstrapOperator<I, O extends HoodieRecord>
@Override @Override
public void snapshotState(StateSnapshotContext context) throws Exception { public void snapshotState(StateSnapshotContext context) throws Exception {
lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType); HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(this.conf);
lastInstantTime = StreamerUtil.getLastPendingInstant(metaClient);
instantState.update(Collections.singletonList(lastInstantTime)); instantState.update(Collections.singletonList(lastInstantTime));
} }
@@ -122,12 +118,8 @@ public class BootstrapOperator<I, O extends HoodieRecord>
} }
this.hadoopConf = StreamerUtil.getHadoopConf(); this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
this.hoodieTable = getTable(); this.hoodieTable = getTable();
this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
preLoadIndexRecords(); preLoadIndexRecords();
} }

View File

@@ -20,9 +20,8 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.common.AbstractWriteFunction;
@@ -80,6 +79,11 @@ public class BulkInsertWriteFunction<I>
*/ */
private int taskID; private int taskID;
/**
* Meta Client.
*/
private transient HoodieTableMetaClient metaClient;
/** /**
* Write Client. * Write Client.
*/ */
@@ -95,11 +99,6 @@ public class BulkInsertWriteFunction<I>
*/ */
private transient OperatorEventGateway eventGateway; private transient OperatorEventGateway eventGateway;
/**
* Commit action type.
*/
private transient String actionType;
/** /**
* Constructs a StreamingSinkFunction. * Constructs a StreamingSinkFunction.
* *
@@ -113,12 +112,9 @@ public class BulkInsertWriteFunction<I>
@Override @Override
public void open(Configuration parameters) throws IOException { public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType( this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
sendBootstrapEvent(); sendBootstrapEvent();
initWriterHelper(); initWriterHelper();
} }
@@ -188,12 +184,13 @@ public class BulkInsertWriteFunction<I>
} }
private String instantToWrite() { private String instantToWrite() {
String instant = this.writeClient.getLastPendingInstant(this.actionType); String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
// if exactly-once semantics turns on, // if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits. // waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder() TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize") .action("instant initialize")
.throwsT(true)
.build(); .build();
while (instant == null || instant.equals(this.initInstant)) { while (instant == null || instant.equals(this.initInstant)) {
// wait condition: // wait condition:
@@ -202,7 +199,7 @@ public class BulkInsertWriteFunction<I>
// sleep for a while // sleep for a while
timeWait.waitFor(); timeWait.waitFor();
// refresh the inflight instant // refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType); instant = StreamerUtil.getLastPendingInstant(this.metaClient);
} }
return instant; return instant;
} }

View File

@@ -20,9 +20,7 @@ package org.apache.hudi.sink.common;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -70,6 +68,11 @@ public abstract class AbstractStreamWriteFunction<I>
*/ */
protected int taskID; protected int taskID;
/**
* Meta Client.
*/
protected transient HoodieTableMetaClient metaClient;
/** /**
* Write Client. * Write Client.
*/ */
@@ -85,11 +88,6 @@ public abstract class AbstractStreamWriteFunction<I>
*/ */
protected transient OperatorEventGateway eventGateway; protected transient OperatorEventGateway eventGateway;
/**
* Commit action type.
*/
protected transient String actionType;
/** /**
* Flag saying whether the write task is waiting for the checkpoint success notification * Flag saying whether the write task is waiting for the checkpoint success notification
* after it finished a checkpoint. * after it finished a checkpoint.
@@ -128,11 +126,8 @@ public abstract class AbstractStreamWriteFunction<I>
@Override @Override
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.writeStatuses = new ArrayList<>(); this.writeStatuses = new ArrayList<>();
this.writeMetadataState = context.getOperatorStateStore().getListState( this.writeMetadataState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>( new ListStateDescriptor<>(
@@ -140,7 +135,7 @@ public abstract class AbstractStreamWriteFunction<I>
TypeInformation.of(WriteMetadataEvent.class) TypeInformation.of(WriteMetadataEvent.class)
)); ));
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); this.currentInstant = lastPendingInstant();
if (context.isRestored()) { if (context.isRestored()) {
restoreWriteMetadata(); restoreWriteMetadata();
} else { } else {
@@ -162,12 +157,6 @@ public abstract class AbstractStreamWriteFunction<I>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Getter/Setter // Getter/Setter
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@VisibleForTesting
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return writeClient;
}
@VisibleForTesting @VisibleForTesting
public boolean isConfirming() { public boolean isConfirming() {
return this.confirming; return this.confirming;
@@ -182,7 +171,7 @@ public abstract class AbstractStreamWriteFunction<I>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
private void restoreWriteMetadata() throws Exception { private void restoreWriteMetadata() throws Exception {
String lastInflight = this.writeClient.getLastPendingInstant(this.actionType); String lastInflight = lastPendingInstant();
boolean eventSent = false; boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) { for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) { if (Objects.equals(lastInflight, event.getInstantTime())) {
@@ -224,6 +213,13 @@ public abstract class AbstractStreamWriteFunction<I>
this.confirming = false; this.confirming = false;
} }
/**
 * Looks up the last pending instant time through
 * {@code StreamerUtil.getLastPendingInstant} using this function's meta client.
 *
 * @return the value reported by {@code StreamerUtil} for the current meta client
 */
protected String lastPendingInstant() {
  final String pendingInstant = StreamerUtil.getLastPendingInstant(this.metaClient);
  return pendingInstant;
}
/** /**
* Prepares the instant time to write with for next checkpoint. * Prepares the instant time to write with for next checkpoint.
* *
@@ -231,7 +227,7 @@ public abstract class AbstractStreamWriteFunction<I>
* @return The instant time * @return The instant time
*/ */
protected String instantToWrite(boolean hasData) { protected String instantToWrite(boolean hasData) {
String instant = this.writeClient.getLastPendingInstant(this.actionType); String instant = lastPendingInstant();
// if exactly-once semantics turns on, // if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits. // waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder() TimeWait timeWait = TimeWait.builder()
@@ -244,11 +240,18 @@ public abstract class AbstractStreamWriteFunction<I>
// 2. the inflight instant does not change and the checkpoint has buffering data // 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) { if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while // sleep for a while
timeWait.waitFor(); boolean timeout = timeWait.waitFor();
// refresh the inflight instant if (timeout && instant != null) {
instant = this.writeClient.getLastPendingInstant(this.actionType); // if the timeout threshold hits but the last instant still not commit,
// and the task does not receive commit ask event(no data or aborted checkpoint),
// assumes the checkpoint was canceled silently and unblock the data flushing
confirming = false;
} else {
// refresh the inflight instant
instant = lastPendingInstant();
}
} else { } else {
// the inflight instant changed, which means the last instant was committed // the pending instant changed, that means the last instant was committed
// successfully. // successfully.
confirming = false; confirming = false;
} }

View File

@@ -69,7 +69,7 @@ public class HoodieFlinkCompactor {
// infer changelog mode // infer changelog mode
CompactionUtil.inferChangelogMode(conf, metaClient); CompactionUtil.inferChangelogMode(conf, metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable(); HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// judge whether have operation // judge whether have operation
@@ -151,5 +151,6 @@ public class HoodieFlinkCompactor {
.setParallelism(1); .setParallelism(1);
env.execute("flink_hudi_compaction"); env.execute("flink_hudi_compaction");
writeClient.close();
} }
} }

View File

@@ -116,7 +116,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(StreamerUtil.getHadoopConf()), new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new FlinkTaskContextSupplier(getRuntimeContext())); new FlinkTaskContextSupplier(getRuntimeContext()));

View File

@@ -94,6 +94,11 @@ public class BucketAssigner implements AutoCloseable {
*/ */
private final Map<String, NewFileAssignState> newFileAssignStates; private final Map<String, NewFileAssignState> newFileAssignStates;
/**
* Num of accumulated successful checkpoints, used for cleaning the new file assign state.
*/
private int accCkp = 0;
public BucketAssigner( public BucketAssigner(
int taskID, int taskID,
int maxParallelism, int maxParallelism,
@@ -117,7 +122,6 @@ public class BucketAssigner implements AutoCloseable {
*/ */
public void reset() { public void reset() {
bucketInfoMap.clear(); bucketInfoMap.clear();
newFileAssignStates.clear();
} }
public BucketInfo addUpdate(String partitionPath, String fileIdHint) { public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
@@ -136,16 +140,7 @@ public class BucketAssigner implements AutoCloseable {
// first try packing this into one of the smallFiles // first try packing this into one of the smallFiles
if (smallFileAssign != null && smallFileAssign.assign()) { if (smallFileAssign != null && smallFileAssign.assign()) {
final String key = StreamerUtil.generateBucketKey(partitionPath, smallFileAssign.getFileId()); return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath);
// create a new bucket or reuse an existing bucket
BucketInfo bucketInfo;
if (bucketInfoMap.containsKey(key)) {
// Assigns an inserts to existing update bucket
bucketInfo = bucketInfoMap.get(key);
} else {
bucketInfo = addUpdate(partitionPath, smallFileAssign.getFileId());
}
return bucketInfo;
} }
// if we have anything more, create new insert buckets, like normal // if we have anything more, create new insert buckets, like normal
@@ -154,7 +149,20 @@ public class BucketAssigner implements AutoCloseable {
if (newFileAssignState.canAssign()) { if (newFileAssignState.canAssign()) {
newFileAssignState.assign(); newFileAssignState.assign();
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
return bucketInfoMap.get(key); if (bucketInfoMap.containsKey(key)) {
// the newFileAssignStates is cleaned asynchronously when the checkpoint success notification is received,
// so the records processed within the time range:
// (start checkpoint, checkpoint success(and instant committed))
// should still be assigned to the small buckets of the last checkpoint instead of a new one.
// the bucketInfoMap is cleaned when the checkpoint starts.
// A possible improvement: when the HoodieRecord can record whether it is an UPDATE or INSERT,
// we can always return an UPDATE BucketInfo here, and there is no need to record the
// UPDATE bucket through calling #addUpdate.
return bucketInfoMap.get(key);
}
return new BucketInfo(BucketType.UPDATE, newFileAssignState.fileId, partitionPath);
} }
} }
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath); BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath);
@@ -166,7 +174,7 @@ public class BucketAssigner implements AutoCloseable {
return bucketInfo; return bucketInfo;
} }
private SmallFileAssign getSmallFileAssign(String partitionPath) { private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
if (smallFileAssignMap.containsKey(partitionPath)) { if (smallFileAssignMap.containsKey(partitionPath)) {
return smallFileAssignMap.get(partitionPath); return smallFileAssignMap.get(partitionPath);
} }
@@ -186,7 +194,19 @@ public class BucketAssigner implements AutoCloseable {
/** /**
* Refresh the table state like TableFileSystemView and HoodieTimeline. * Refresh the table state like TableFileSystemView and HoodieTimeline.
*/ */
public void reload(long checkpointId) { public synchronized void reload(long checkpointId) {
this.accCkp += 1;
if (this.accCkp > 1) {
// do not clean the new file assignment state for the first checkpoint,
// this #reload calling is triggered by checkpoint success event, the coordinator
// also relies on the checkpoint success event to commit the inflight instant,
// and very possibly this component receives the notification before the coordinator,
// if we do the cleaning, the records processed within the time range:
// (start checkpoint, checkpoint success(and instant committed))
// would be assigned to a fresh new data bucket which is not the right behavior.
this.newFileAssignStates.clear();
this.accCkp = 0;
}
this.smallFileAssignMap.clear(); this.smallFileAssignMap.clear();
this.writeProfile.reload(checkpointId); this.writeProfile.reload(checkpointId);
} }

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
@@ -55,7 +56,7 @@ public class DeltaWriteProfile extends WriteProfile {
if (!commitTimeline.empty()) { if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// initialize the filesystem view based on the commit metadata // initialize the filesystem view based on the commit metadata
initFSViewIfNecessary(commitTimeline); initFileSystemView();
// find smallest file in partition and append to it // find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>(); List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we can index log files, we can add more inserts to log files for fileIds including those under // If we can index log files, we can add more inserts to log files for fileIds including those under
@@ -90,6 +91,10 @@ public class DeltaWriteProfile extends WriteProfile {
return smallFileLocations; return smallFileLocations;
} }
protected AbstractTableFileSystemView getFileSystemView() {
return (AbstractTableFileSystemView) this.table.getSliceView();
}
private long getTotalFileSize(FileSlice fileSlice) { private long getTotalFileSize(FileSlice fileSlice) {
if (!fileSlice.getBaseFile().isPresent()) { if (!fileSlice.getBaseFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
@@ -36,7 +36,6 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -45,7 +44,6 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -98,7 +96,7 @@ public class WriteProfile {
/** /**
* The file system view cache for one checkpoint interval. * The file system view cache for one checkpoint interval.
*/ */
protected HoodieTableFileSystemView fsView; protected AbstractTableFileSystemView fsView;
/** /**
* Hadoop configuration. * Hadoop configuration.
@@ -194,7 +192,7 @@ public class WriteProfile {
if (!commitTimeline.empty()) { // if we have some commits if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// initialize the filesystem view based on the commit metadata // initialize the filesystem view based on the commit metadata
initFSViewIfNecessary(commitTimeline); initFileSystemView();
List<HoodieBaseFile> allFiles = fsView List<HoodieBaseFile> allFiles = fsView
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
@@ -214,22 +212,16 @@ public class WriteProfile {
} }
@VisibleForTesting @VisibleForTesting
public void initFSViewIfNecessary(HoodieTimeline commitTimeline) { public void initFileSystemView() {
if (fsView == null) { if (fsView == null) {
cleanMetadataCache(commitTimeline.getInstants()); fsView = getFileSystemView();
List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
.map(instant ->
this.metadataCache.computeIfAbsent(
instant.getTimestamp(),
k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
.orElse(null)))
.filter(Objects::nonNull)
.collect(Collectors.toList());
FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList, table.getMetaClient().getTableType());
fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
} }
} }
protected AbstractTableFileSystemView getFileSystemView() {
return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView();
}
/** /**
* Remove the overdue metadata from the cache * Remove the overdue metadata from the cache
* whose instant does not belong to the given instants {@code instants}. * whose instant does not belong to the given instants {@code instants}.
@@ -261,8 +253,10 @@ public class WriteProfile {
return; return;
} }
this.table.getMetaClient().reloadActiveTimeline(); this.table.getMetaClient().reloadActiveTimeline();
this.table.getHoodieView().sync();
recordProfile(); recordProfile();
this.fsView = null; this.fsView = null;
cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
this.smallFilesMap.clear(); this.smallFilesMap.clear();
this.reloadedCheckpointId = checkpointId; this.reloadedCheckpointId = checkpointId;
} }

View File

@@ -20,6 +20,9 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -27,15 +30,19 @@ import java.util.concurrent.TimeUnit;
* Tool used for time waiting. * Tool used for time waiting.
*/ */
public class TimeWait { public class TimeWait {
private final long timeout; // timeout in SECONDS private static final Logger LOG = LoggerFactory.getLogger(TimeWait.class);
private final long interval; // interval in MILLISECONDS
private final String action; // action to report error message private final long timeout; // timeout in SECONDS
private final long interval; // interval in MILLISECONDS
private final String action; // action to report error message
private final boolean throwsE; // whether to throw when timeout
private long waitingTime = 0L; private long waitingTime = 0L;
private TimeWait(long timeout, long interval, String action) { private TimeWait(long timeout, long interval, String action, boolean throwsE) {
this.timeout = timeout; this.timeout = timeout;
this.interval = interval; this.interval = interval;
this.action = action; this.action = action;
this.throwsE = throwsE;
} }
public static Builder builder() { public static Builder builder() {
@@ -44,14 +51,23 @@ public class TimeWait {
/** /**
* Wait for an interval time. * Wait for an interval time.
*
* @return true if is timed out
*/ */
public void waitFor() { public boolean waitFor() {
try { try {
if (waitingTime > timeout) { if (waitingTime > timeout) {
throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action); final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action;
if (this.throwsE) {
throw new HoodieException(msg);
} else {
LOG.warn(msg);
return true;
}
} }
TimeUnit.MILLISECONDS.sleep(interval); TimeUnit.MILLISECONDS.sleep(interval);
waitingTime += interval; waitingTime += interval;
return false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new HoodieException("Error while waiting for " + action, e); throw new HoodieException("Error while waiting for " + action, e);
} }
@@ -61,17 +77,18 @@ public class TimeWait {
* Builder. * Builder.
*/ */
public static class Builder { public static class Builder {
private long timeout; private long timeout = 5 * 60 * 1000L; // default 5 minutes
private long interval; private long interval = 1000;
private String action; private String action;
private boolean throwsT = false;
public Builder() { private Builder() {
this.timeout = 3600;
this.interval = 500;
} }
public Builder timeout(long timeout) { public Builder timeout(long timeout) {
this.timeout = timeout; if (timeout > 0) {
this.timeout = timeout;
}
return this; return this;
} }
@@ -85,9 +102,14 @@ public class TimeWait {
return this; return this;
} }
public Builder throwsT(boolean throwsT) {
this.throwsT = throwsT;
return this;
}
public TimeWait build() { public TimeWait build() {
Objects.requireNonNull(this.action); Objects.requireNonNull(this.action);
return new TimeWait(this.timeout, this.interval, this.action); return new TimeWait(this.timeout, this.interval, this.action, this.throwsT);
} }
} }
} }

View File

@@ -63,9 +63,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
return (DataStreamSinkProvider) dataStream -> { return (DataStreamSinkProvider) dataStream -> {
// setup configuration // setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment() long ckpInterval = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout(); .getCheckpointConfig().getCheckpointInterval();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpInterval * 5); // five checkpoints interval
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();

View File

@@ -32,6 +32,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -149,7 +151,21 @@ public class StreamerUtil {
return FlinkClientUtil.getHadoopConf(); return FlinkClientUtil.getHadoopConf();
} }
/**
* Mainly used for tests.
*/
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
return getHoodieClientConfig(conf, false, false);
}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) {
return getHoodieClientConfig(conf, false, loadFsViewStorageConfig);
}
public static HoodieWriteConfig getHoodieClientConfig(
Configuration conf,
boolean enableEmbeddedTimelineService,
boolean loadFsViewStorageConfig) {
HoodieWriteConfig.Builder builder = HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder() HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK) .withEngineType(EngineType.FLINK)
@@ -194,13 +210,20 @@ public class StreamerUtil {
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.build()) .build())
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false) .withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(conf)); .withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());
builder = builder.withSchema(getSourceSchema(conf).toString()); HoodieWriteConfig writeConfig = builder.build();
return builder.build(); if (loadFsViewStorageConfig) {
// do not use the builder, to give a chance for recovering the original fs view storage config
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
writeConfig.setViewStorageConfig(viewStorageConfig);
}
return writeConfig;
} }
/** /**
@@ -235,11 +258,11 @@ public class StreamerUtil {
* @param conf the configuration * @param conf the configuration
* @throws IOException if errors happens when writing metadata * @throws IOException if errors happens when writing metadata
*/ */
public static void initTableIfNotExists(Configuration conf) throws IOException { public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH); final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
if (!tableExists(basePath, hadoopConf)) { if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
@@ -250,9 +273,11 @@ public class StreamerUtil {
.setTimelineLayoutVersion(1) .setTimelineLayoutVersion(1)
.initTable(hadoopConf, basePath); .initTable(hadoopConf, basePath);
LOG.info("Table initialized under base path {}", basePath); LOG.info("Table initialized under base path {}", basePath);
return metaClient;
} else { } else {
LOG.info("Table [{}/{}] already exists, no need to initialize the table", LOG.info("Table [{}/{}] already exists, no need to initialize the table",
basePath, conf.getString(FlinkOptions.TABLE_NAME)); basePath, conf.getString(FlinkOptions.TABLE_NAME));
return StreamerUtil.createMetaClient(basePath, hadoopConf);
} }
// Do not close the filesystem in order to use the CACHE, // Do not close the filesystem in order to use the CACHE,
// some filesystems release the handles in #close method. // some filesystems release the handles in #close method.
@@ -305,7 +330,7 @@ public class StreamerUtil {
/** /**
* Creates the meta client for reader. * Creates the meta client for reader.
* *
* <p>The streaming pipeline process is long running, so empty table path is allowed, * <p>The streaming pipeline process is long-running, so empty table path is allowed,
* the reader would then check and refresh the meta client. * the reader would then check and refresh the meta client.
* *
* @see org.apache.hudi.source.StreamReadMonitoringFunction * @see org.apache.hudi.source.StreamReadMonitoringFunction
@@ -344,6 +369,8 @@ public class StreamerUtil {
/** /**
* Creates the Flink write client. * Creates the Flink write client.
*
* <p>This expects to be used by client, the driver should start an embedded timeline server.
*/ */
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context = HoodieFlinkEngineContext context =
@@ -351,16 +378,22 @@ public class StreamerUtil {
new SerializableConfiguration(getHadoopConf()), new SerializableConfiguration(getHadoopConf()),
new FlinkTaskContextSupplier(runtimeContext)); new FlinkTaskContextSupplier(runtimeContext));
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf)); HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
return new HoodieFlinkWriteClient<>(context, writeConfig);
} }
/** /**
* Creates the Flink write client. * Creates the Flink write client.
* *
* <p>This expects to be used by the driver, the client can then send requests for files view.
*
* <p>The task context supplier is a constant: the write token is always '0-1-0'. * <p>The task context supplier is a constant: the write token is always '0-1-0'.
*/ */
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) { public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, getHoodieClientConfig(conf)); HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// create the filesystem view storage properties for client
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), writeConfig.getViewStorageConfig());
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
} }
/** /**
@@ -433,6 +466,27 @@ public class StreamerUtil {
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP); return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
} }
public static String getLastPendingInstant(HoodieTableMetaClient metaClient) {
return getLastPendingInstant(metaClient, true);
}
public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boolean reloadTimeline) {
if (reloadTimeline) {
metaClient.reloadActiveTimeline();
}
return metaClient.getCommitsTimeline().filterInflightsAndRequested()
.lastInstant()
.map(HoodieInstant::getTimestamp)
.orElse(null);
}
public static String getLastCompletedInstant(HoodieTableMetaClient metaClient) {
return metaClient.getCommitsTimeline().filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp)
.orElse(null);
}
/** /**
* Returns whether there are successful commits on the timeline. * Returns whether there are successful commits on the timeline.
* @param metaClient The meta client * @param metaClient The meta client

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
* Helper class to read/write {@link FileSystemViewStorageConfig}.
*/
public class ViewStorageProperties {
  private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class);

  private static final String FILE_NAME = "view_storage_conf.properties";

  /**
   * Writes the properties of the given view storage config into the
   * {@link #FILE_NAME} file located under the auxiliary folder of the table.
   *
   * @param basePath The table base path
   * @param config   The view storage config whose properties are stored
   * @throws IOException if the properties file can not be created or written
   */
  public static void createProperties(
      String basePath,
      FileSystemViewStorageConfig config) throws IOException {
    final Path target = getPropertiesFilePath(basePath);
    final FileSystem fileSystem = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
    try (FSDataOutputStream out = fileSystem.create(target)) {
      // the header records the wall-clock time of the save
      final String header = "Filesystem view storage properties saved on " + new Date();
      config.getProps().store(out, header);
    }
  }

  /**
   * Reads back the {@link #FILE_NAME} file of the table and builds a
   * {@link FileSystemViewStorageConfig} from its entries.
   *
   * @param basePath The table base path
   * @return The view storage config built from the stored properties
   * @throws HoodieIOException if the properties file can not be opened or read
   */
  public static FileSystemViewStorageConfig loadFromProperties(String basePath) {
    final Path source = getPropertiesFilePath(basePath);
    LOG.info("Loading filesystem view storage properties from " + source);
    final FileSystem fileSystem = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
    final Properties loaded = new Properties();
    try (FSDataInputStream in = fileSystem.open(source)) {
      loaded.load(in);
      return FileSystemViewStorageConfig.newBuilder().fromProperties(loaded).build();
    } catch (IOException e) {
      throw new HoodieIOException("Could not load filesystem view storage properties from " + source, e);
    }
  }

  /**
   * Resolves the location of the {@link #FILE_NAME} file:
   * {@code <basePath>/<auxiliary folder>/<FILE_NAME>}.
   */
  private static Path getPropertiesFilePath(String basePath) {
    return new Path(basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME, FILE_NAME);
  }
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -30,6 +29,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -94,8 +94,8 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.handleEventFromOperator(1, event1); coordinator.handleEventFromOperator(1, event1);
coordinator.notifyCheckpointComplete(1); coordinator.notifyCheckpointComplete(1);
String inflight = coordinator.getWriteClient().getLastPendingInstant(HoodieTableType.COPY_ON_WRITE); String inflight = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Instant should be complete", lastCompleted, is(instant)); assertThat("Instant should be complete", lastCompleted, is(instant));
assertNotEquals("", inflight, "Should start a new instant"); assertNotEquals("", inflight, "Should start a new instant");
assertNotEquals(instant, inflight, "Should start a new instant"); assertNotEquals(instant, inflight, "Should start a new instant");
@@ -145,7 +145,7 @@ public class TestStreamWriteOperatorCoordinator {
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
"Returns early for empty write results"); "Returns early for empty write results");
String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertNull(lastCompleted, "Returns early for empty write results"); assertNull(lastCompleted, "Returns early for empty write results");
assertNull(coordinator.getEventBuffer()[0]); assertNull(coordinator.getEventBuffer()[0]);
@@ -153,7 +153,7 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.handleEventFromOperator(1, event1); coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
"Commits the instant with partial events anyway"); "Commits the instant with partial events anyway");
lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
} }

View File

@@ -27,13 +27,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.InsertFunctionWrapper; import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -45,22 +45,24 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
@@ -99,8 +101,7 @@ public class TestWriteCopyOnWrite {
@BeforeEach @BeforeEach
public void before() throws Exception { public void before() throws Exception {
final String basePath = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name()); conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
setUp(conf); setUp(conf);
this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
@@ -134,7 +135,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); String instant = lastPendingInstant();
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
@@ -149,18 +150,17 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant. // the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
// checkpoint for next round, no data input, so after the checkpoint, // checkpoint for next round, no data input, so after the checkpoint,
// there should not be REQUESTED Instant // there should not be REQUESTED Instant
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(2); funcWrapper.checkpointFunction(2);
String instant2 = funcWrapper.getWriteClient() String instant2 = lastPendingInstant();
.getLastPendingInstant(getTableType());
assertNotEquals(instant, instant2); assertNotEquals(instant, instant2);
final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
@@ -174,12 +174,15 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointComplete(2); funcWrapper.checkpointComplete(2);
// started a new instant already // started a new instant already
checkInflightInstant(funcWrapper.getWriteClient()); checkInflightInstant();
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
} }
@Test @Test
public void testCheckpointFails() throws Exception { public void testCheckpointFails() throws Exception {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
// no data written and triggers checkpoint fails, // no data written and triggers checkpoint fails,
@@ -188,8 +191,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
assertNotNull(instant); assertNotNull(instant);
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
@@ -204,18 +206,17 @@ public class TestWriteCopyOnWrite {
"The last checkpoint was aborted, ignore the events"); "The last checkpoint was aborted, ignore the events");
// the instant metadata should be reused // the instant metadata should be reused
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); checkInstantState(HoodieInstant.State.COMPLETED, null);
for (RowData rowData : TestData.DATA_SET_INSERT) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
// this returns early because there is no inflight instant // this returns early because there is no inflight instant
assertThrows(HoodieException.class, assertDoesNotThrow(() -> funcWrapper.checkpointFunction(2),
() -> funcWrapper.checkpointFunction(2), "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
"Timeout(0ms) while waiting for"); // do not send the write event and fails the checkpoint,
// do not sent the write event and fails the checkpoint,
// behaves like the last checkpoint is successful. // behaves like the last checkpoint is successful.
funcWrapper.checkpointFails(2); funcWrapper.checkpointFails(2);
} }
@@ -231,16 +232,16 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
funcWrapper.getNextEvent(); funcWrapper.getNextEvent();
String instant1 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); String instant1 = lastPendingInstant();
assertNotNull(instant1); assertNotNull(instant1);
// fails the subtask // fails the subtask
funcWrapper.subTaskFails(0); funcWrapper.subTaskFails(0);
String instant2 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); String instant2 = lastPendingInstant();
assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant"); assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); checkInstantState(HoodieInstant.State.COMPLETED, null);
} }
@Test @Test
@@ -255,8 +256,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
@@ -264,11 +264,11 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
checkWrittenData(tempFile, EXPECTED1); checkWrittenData(tempFile, EXPECTED1);
// the coordinator checkpoint commits the inflight instant. // the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED1); checkWrittenData(tempFile, EXPECTED1);
} }
@@ -341,8 +341,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(2); funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
@@ -350,10 +349,10 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
funcWrapper.checkpointComplete(2); funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant. // the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2); checkWrittenData(tempFile, EXPECTED2);
} }
@@ -386,8 +385,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send // this triggers the data write and event send
funcWrapper.checkpointFunction(2); funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
@@ -395,10 +393,10 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
funcWrapper.checkpointComplete(2); funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant. // the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
Map<String, String> expected = getUpsertWithDeleteExpected(); Map<String, String> expected = getUpsertWithDeleteExpected();
checkWrittenData(tempFile, expected); checkWrittenData(tempFile, expected);
@@ -437,8 +435,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, event2); funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
@@ -446,8 +443,8 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, expected, 1); checkWrittenData(tempFile, expected, 1);
// started a new instant already // started a new instant already
checkInflightInstant(funcWrapper.getWriteClient()); checkInflightInstant();
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
@@ -500,8 +497,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, event2); funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
@@ -511,8 +507,8 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, expected, 1); checkWrittenData(tempFile, expected, 1);
// started a new instant already // started a new instant already
checkInflightInstant(funcWrapper.getWriteClient()); checkInflightInstant();
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
@@ -552,8 +548,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, event1); funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
@@ -569,8 +564,8 @@ public class TestWriteCopyOnWrite {
TestData.checkWrittenAllData(tempFile, expected, 1); TestData.checkWrittenAllData(tempFile, expected, 1);
// started a new instant already // started a new instant already
checkInflightInstant(funcWrapper.getWriteClient()); checkInflightInstant();
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
@@ -631,8 +626,7 @@ public class TestWriteCopyOnWrite {
} }
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
@@ -640,8 +634,8 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, expected, 1); checkWrittenData(tempFile, expected, 1);
// started a new instant already // started a new instant already
checkInflightInstant(funcWrapper.getWriteClient()); checkInflightInstant();
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
@@ -736,8 +730,7 @@ public class TestWriteCopyOnWrite {
assertTrue(funcWrapper.isAlreadyBootstrap()); assertTrue(funcWrapper.isAlreadyBootstrap());
String instant = funcWrapper.getWriteClient() String instant = lastPendingInstant();
.getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
@@ -748,18 +741,18 @@ public class TestWriteCopyOnWrite {
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(REQUESTED, instant);
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant. // the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2); checkWrittenData(tempFile, EXPECTED2);
} }
@Test @Test
public void testWriteExactlyOnce() throws Exception { public void testWriteExactlyOnce() throws Exception {
// reset the config option // reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
@@ -779,7 +772,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit"); assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
for (int i = 0; i < 2; i++) { for (int i = 0; i < 4; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event); funcWrapper.getCoordinator().handleEventFromOperator(0, event);
@@ -793,26 +786,25 @@ public class TestWriteCopyOnWrite {
assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit"); assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");
// checkpoint for the next round, when there is eager flush but the write // checkpoint for the next round
// task is waiting for the instant commit ack, should throw for timeout.
funcWrapper.checkpointFunction(2); funcWrapper.checkpointFunction(2);
assertThrows(HoodieException.class, () -> { assertDoesNotThrow(() -> {
for (RowData rowData : TestData.DATA_SET_INSERT) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
}, "Timeout(500ms) while waiting for instant"); }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
} }
@Test @Test
public void testReuseEmbeddedServer() { public void testReuseEmbeddedServer() throws IOException {
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig(); FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
// get another write client // get another write client
writeClient = StreamerUtil.createWriteClient(conf, null); writeClient = StreamerUtil.createWriteClient(conf);
assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort()); assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
} }
@@ -821,24 +813,19 @@ public class TestWriteCopyOnWrite {
// Utilities // Utilities
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@SuppressWarnings("rawtypes") private void checkInflightInstant() {
private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { final String instant = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
final String instant = writeClient.getLastPendingInstant(getTableType());
assertNotNull(instant); assertNotNull(instant);
} }
@SuppressWarnings("rawtypes") private void checkInstantState(HoodieInstant.State state, String instantStr) {
private void checkInstantState(
HoodieFlinkWriteClient writeClient,
HoodieInstant.State state,
String instantStr) {
final String instant; final String instant;
switch (state) { switch (state) {
case REQUESTED: case REQUESTED:
instant = writeClient.getLastPendingInstant(getTableType()); instant = lastPendingInstant();
break; break;
case COMPLETED: case COMPLETED:
instant = writeClient.getLastCompletedInstant(getTableType()); instant = lastCompleteInstant();
break; break;
default: default:
throw new AssertionError("Unexpected state"); throw new AssertionError("Unexpected state");
@@ -846,6 +833,14 @@ public class TestWriteCopyOnWrite {
assertThat(instant, is(instantStr)); assertThat(instant, is(instantStr));
} }
protected String lastPendingInstant() {
return TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
}
protected String lastCompleteInstant() {
return TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
}
protected HoodieTableType getTableType() { protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE; return HoodieTableType.COPY_ON_WRITE;
} }

View File

@@ -25,8 +25,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
@@ -39,10 +37,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import java.io.File; import java.io.File;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* Test cases for delta stream write. * Test cases for delta stream write.
@@ -71,13 +67,7 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception { protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
String latestInstant = metaClient.getCommitsTimeline().filterCompletedInstants() String latestInstant = lastCompleteInstant();
.getInstants()
.filter(x -> x.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()).stream()
.max(Comparator.naturalOrder())
.orElse(null);
TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema); TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
} }

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
@@ -53,4 +54,9 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
protected HoodieTableType getTableType() { protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ; return HoodieTableType.MERGE_ON_READ;
} }
@Override
protected String lastCompleteInstant() {
return TestUtils.getLastDeltaCompleteInstant(tempFile.getAbsolutePath());
}
} }

View File

@@ -41,8 +41,9 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
@@ -69,9 +70,8 @@ public class ITTestHoodieFlinkCompactor {
@TempDir @TempDir
File tempFile; File tempFile;
//@ParameterizedTest @ParameterizedTest
//@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
@Disabled
public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
// Create hoodie table and insert into data. // Create hoodie table and insert into data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
@@ -112,7 +112,7 @@ public class ITTestHoodieFlinkCompactor {
// judge whether have operation // judge whether have operation
// To compute the compaction instant time and do compaction. // To compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
assertTrue(scheduled, "The compaction plan should be scheduled"); assertTrue(scheduled, "The compaction plan should be scheduled");
@@ -141,6 +141,7 @@ public class ITTestHoodieFlinkCompactor {
.setParallelism(1); .setParallelism(1);
env.execute("flink_hudi_compaction"); env.execute("flink_hudi_compaction");
writeClient.close();
TestData.checkWrittenFullData(tempFile, EXPECTED); TestData.checkWrittenFullData(tempFile, EXPECTED);
} }
} }

View File

@@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.sink.partitioner.profile.WriteProfile;
@@ -51,9 +48,9 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
@@ -358,11 +355,11 @@ public class TestBucketAssigner {
assertTrue(smallFiles1.isEmpty(), "Should have no small files"); assertTrue(smallFiles1.isEmpty(), "Should have no small files");
TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
Option<String> instantOption = getLastCompleteInstant(writeProfile); String instantOption = getLastCompleteInstant(writeProfile);
assertFalse(instantOption.isPresent()); assertNull(instantOption);
writeProfile.reload(1); writeProfile.reload(1);
String instant1 = getLastCompleteInstant(writeProfile).orElse(null); String instant1 = getLastCompleteInstant(writeProfile);
assertNotNull(instant1); assertNotNull(instant1);
List<SmallFile> smallFiles2 = writeProfile.getSmallFiles("par1"); List<SmallFile> smallFiles2 = writeProfile.getSmallFiles("par1");
assertThat("Should have 1 small file", smallFiles2.size(), is(1)); assertThat("Should have 1 small file", smallFiles2.size(), is(1));
@@ -376,7 +373,7 @@ public class TestBucketAssigner {
smallFiles3.get(0).location.getInstantTime(), is(instant1)); smallFiles3.get(0).location.getInstantTime(), is(instant1));
writeProfile.reload(2); writeProfile.reload(2);
String instant2 = getLastCompleteInstant(writeProfile).orElse(null); String instant2 = getLastCompleteInstant(writeProfile);
assertNotEquals(instant2, instant1, "Should have new complete instant"); assertNotEquals(instant2, instant1, "Should have new complete instant");
List<SmallFile> smallFiles4 = writeProfile.getSmallFiles("par1"); List<SmallFile> smallFiles4 = writeProfile.getSmallFiles("par1");
assertThat("Should have 1 small file", smallFiles4.size(), is(1)); assertThat("Should have 1 small file", smallFiles4.size(), is(1));
@@ -389,12 +386,11 @@ public class TestBucketAssigner {
WriteProfile writeProfile = new WriteProfile(writeConfig, context); WriteProfile writeProfile = new WriteProfile(writeConfig, context);
assertTrue(writeProfile.getMetadataCache().isEmpty(), "Empty table should no have any instant metadata"); assertTrue(writeProfile.getMetadataCache().isEmpty(), "Empty table should no have any instant metadata");
HoodieTimeline emptyTimeline = writeProfile.getTable().getActiveTimeline();
// write 3 instants of data // write 3 instants of data
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
} }
// the record profile triggers the metadata loading
writeProfile.reload(1); writeProfile.reload(1);
assertThat("Metadata cache should have same number entries as timeline instants", assertThat("Metadata cache should have same number entries as timeline instants",
writeProfile.getMetadataCache().size(), is(3)); writeProfile.getMetadataCache().size(), is(3));
@@ -402,15 +398,10 @@ public class TestBucketAssigner {
writeProfile.getSmallFiles("par1"); writeProfile.getSmallFiles("par1");
assertThat("The metadata should be reused", assertThat("The metadata should be reused",
writeProfile.getMetadataCache().size(), is(3)); writeProfile.getMetadataCache().size(), is(3));
writeProfile.reload(2);
writeProfile.initFSViewIfNecessary(emptyTimeline);
assertTrue(writeProfile.getMetadataCache().isEmpty(), "Metadata cache should be all cleaned");
} }
private static Option<String> getLastCompleteInstant(WriteProfile profile) { private static String getLastCompleteInstant(WriteProfile profile) {
return profile.getTable().getMetaClient().getCommitsTimeline() return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient());
.filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
} }
private void assertBucketEquals( private void assertBucketEquals(

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink.utils; package org.apache.hudi.sink.utils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.append.AppendWriteFunction; import org.apache.hudi.sink.append.AppendWriteFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
@@ -98,11 +97,6 @@ public class InsertFunctionWrapper<I> {
return this.gateway.getNextEvent(); return this.gateway.getNextEvent();
} }
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return this.writeFunction.getWriteClient();
}
public void checkpointFunction(long checkpointId) throws Exception { public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first // checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink.utils; package org.apache.hudi.sink.utils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
@@ -211,11 +210,6 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.getDataBuffer(); return this.writeFunction.getDataBuffer();
} }
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return this.writeFunction.getWriteClient();
}
public void checkpointFunction(long checkpointId) throws Exception { public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first // checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());

View File

@@ -89,7 +89,7 @@ public class TestStreamReadMonitoringFunction {
assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
"All the instants should have range limit"); "All the instants should have range limit");
String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)), assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
"All the splits should be with latestCommit instant time"); "All the splits should be with latestCommit instant time");
@@ -143,7 +143,7 @@ public class TestStreamReadMonitoringFunction {
// all the splits should come from the second commit. // all the splits should come from the second commit.
TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); String specifiedCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit); conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) { try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
@@ -174,7 +174,7 @@ public class TestStreamReadMonitoringFunction {
// all the splits should come from the earliest commit. // all the splits should come from the earliest commit.
TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); String specifiedCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) { try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {

View File

@@ -111,7 +111,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String insertInto = "insert into t1 select * from source"; String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
streamTableEnv.executeSql("drop table t1"); streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = sql("t1") hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
@@ -181,7 +181,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
// execute 2 times // execute 2 times
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
// remember the commit // remember the commit
String specifiedCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); String specifiedCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
// another update batch // another update batch
String insertInto2 = "insert into t1 select * from source2"; String insertInto2 = "insert into t1 select * from source2";
execInsertSql(streamTableEnv, insertInto2); execInsertSql(streamTableEnv, insertInto2);
@@ -264,8 +264,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
Map<String, String> options1 = new HashMap<>(defaultConf.toMap()); Map<String, String> options1 = new HashMap<>(defaultConf.toMap());
options1.put(FlinkOptions.TABLE_NAME.key(), "t1"); options1.put(FlinkOptions.TABLE_NAME.key(), "t1");
Configuration conf = Configuration.fromMap(options1); Configuration conf = Configuration.fromMap(options1);
HoodieTimeline timeline = StreamerUtil.createWriteClient(conf, null) HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
.getHoodieTable().getActiveTimeline();
assertTrue(timeline.filterCompletedInstants() assertTrue(timeline.filterCompletedInstants()
.getInstants().anyMatch(instant -> instant.getAction().equals("clean")), .getInstants().anyMatch(instant -> instant.getAction().equals("clean")),
"some commits should be cleaned"); "some commits should be cleaned");
@@ -285,8 +284,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
// write another commit with deletes // write another commit with deletes
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
String latestCommit = StreamerUtil.createWriteClient(conf, null) String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
.getLastCompletedInstant(HoodieTableType.MERGE_ON_READ);
String hoodieTableDDL = sql("t1") String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
@@ -756,19 +754,17 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ " 'format' = 'debezium-json'\n" + " 'format' = 'debezium-json'\n"
+ ")"; + ")";
streamTableEnv.executeSql(sourceDDL); streamTableEnv.executeSql(sourceDDL);
String hoodieTableDDL = "" String hoodieTableDDL = sql("hoodie_sink")
+ "CREATE TABLE hoodie_sink(\n" .field("id INT NOT NULL")
+ " id INT NOT NULL,\n" .field("ts BIGINT")
+ " ts BIGINT,\n" .field("name STRING")
+ " name STRING," .field("weight DOUBLE")
+ " weight DOUBLE," .pkField("id")
+ " PRIMARY KEY (id) NOT ENFORCED" .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ ") with (\n" .option(FlinkOptions.READ_AS_STREAMING, execMode == ExecMode.STREAM)
+ " 'connector' = 'hudi',\n" .option(FlinkOptions.PRE_COMBINE, true)
+ " 'path' = '" + tempFile.getAbsolutePath() + "',\n" .noPartition()
+ " 'read.streaming.enabled' = '" + (execMode == ExecMode.STREAM) + "',\n" .end();
+ " 'write.insert.drop.duplicates' = 'true'"
+ ")";
streamTableEnv.executeSql(hoodieTableDDL); streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source"; String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source";
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
@@ -949,7 +945,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
TestData.writeData(TestData.dataSetInsert(3, 4), conf); TestData.writeData(TestData.dataSetInsert(3, 4), conf);
TestData.writeData(TestData.dataSetInsert(5, 6), conf); TestData.writeData(TestData.dataSetInsert(5, 6), conf);
String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
String hoodieTableDDL = sql("t1") String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.PATH, tempFile.getAbsolutePath())

View File

@@ -58,7 +58,7 @@ public class TestCompactionUtil {
StreamerUtil.initTableIfNotExists(conf); StreamerUtil.initTableIfNotExists(conf);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable table = writeClient.getHoodieTable(); HoodieFlinkTable table = writeClient.getHoodieTable();
HoodieTableMetaClient metaClient = table.getMetaClient(); HoodieTableMetaClient metaClient = table.getMetaClient();

View File

@@ -19,6 +19,8 @@
package org.apache.hudi.utils; package org.apache.hudi.utils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -33,17 +35,33 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Common test utils. * Common test utils.
*/ */
public class TestUtils { public class TestUtils {
public static String getLastPendingInstant(String basePath) {
public static String getLatestCommit(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); return StreamerUtil.getLastPendingInstant(metaClient);
} }
public static String getFirstCommit(String basePath) { public static String getLastCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp(); return StreamerUtil.getLastCompletedInstant(metaClient);
}
public static String getLastDeltaCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
return metaClient.getCommitsTimeline().filterCompletedInstants()
.filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
.lastInstant()
.map(HoodieInstant::getTimestamp)
.orElse(null);
}
public static String getFirstCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant()
.map(HoodieInstant::getTimestamp).orElse(null);
} }
public static String getSplitPartitionPath(MergeOnReadInputSplit split) { public static String getSplitPartitionPath(MergeOnReadInputSplit split) {

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.util.ViewStorageProperties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
 * Test cases for {@link ViewStorageProperties}.
 *
 * <p>Verifies that a {@link FileSystemViewStorageConfig} written through
 * {@code ViewStorageProperties.createProperties} can be read back through
 * {@code ViewStorageProperties.loadFromProperties} with the same storage type,
 * remote server host and remote server port.
 */
public class TestViewStorageProperties {
  /** Temporary directory supplied by JUnit; its absolute path is used as the base path. */
  @TempDir
  File tempFile;

  @Test
  void testReadWriteProperties() throws IOException {
    final String tablePath = tempFile.getAbsolutePath();

    final FileSystemViewStorageConfig writtenConfig =
        FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
            .withRemoteServerHost("host1")
            .withRemoteServerPort(1234)
            .build();

    // The same config is written three times in a row before reading it back.
    // NOTE(review): presumably this exercises re-creating an already existing
    // properties file -- confirm against ViewStorageProperties.
    for (int attempt = 0; attempt < 3; attempt++) {
      ViewStorageProperties.createProperties(tablePath, writtenConfig);
    }

    final FileSystemViewStorageConfig loadedConfig = ViewStorageProperties.loadFromProperties(tablePath);

    // Each value set on the builder must come back unchanged.
    assertThat(loadedConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
    assertThat(loadedConfig.getRemoteViewServerHost(), is("host1"));
    assertThat(loadedConfig.getRemoteViewServerPort(), is(1234));
  }
}