diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 7b553bc39..fdefd9050 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -448,11 +448,6 @@ public class HoodieFlinkWriteClient extends final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); final String partitionPath = record.getPartitionPath(); - // Always use FlinkCreateHandle when insert duplication turns on - if (config.allowDuplicateInserts()) { - return new FlinkCreateHandle<>(config, instantTime, table, partitionPath, - fileID, table.getTaskContextSupplier()); - } if (bucketToHandles.containsKey(fileID)) { MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 0e2b0b338..8504f6aa7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -222,10 +222,10 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ"); public static final ConfigOption INSERT_DEDUP = ConfigOptions - .key("write.insert.deduplicate") - .booleanType() - .defaultValue(true) - .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true"); + .key("write.insert.deduplicate") + .booleanType() + .defaultValue(true) + .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true"); public static final ConfigOption OPERATION = ConfigOptions .key("write.operation") @@ -370,6 +370,27 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(1024) .withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB"); + public static final ConfigOption WRITE_PARQUET_BLOCK_SIZE = ConfigOptions + .key("write.parquet.block.size") + .intType() + .defaultValue(120) + .withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be" + + " amortized by packing enough column values into a single row group."); + + public static final ConfigOption WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions + .key("write.parquet.max.file.size") + .intType() + .defaultValue(120) + .withDescription("Target size for parquet files produced by Hudi write phases. " + + "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance."); + + public static final ConfigOption WRITE_PARQUET_PAGE_SIZE = ConfigOptions + .key("hoodie.parquet.page.size") + .intType() + .defaultValue(1) + .withDescription("Parquet page size. Page is the unit of read within a parquet file. 
" + + "Within a block, pages are compressed separately."); + public static final ConfigOption WRITE_MERGE_MAX_MEMORY = ConfigOptions .key("write.merge.max_memory") .intType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 71b20ba27..a155fb52d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -18,38 +18,26 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.sink.event.CommitAckEvent; +import org.apache.hudi.sink.common.AbstractStreamWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +49,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Random; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -101,9 +88,7 @@ import java.util.stream.Collectors; * @param Type of the input record * @see StreamWriteOperatorCoordinator */ -public class StreamWriteFunction - extends KeyedProcessFunction - implements CheckpointedFunction { +public class StreamWriteFunction extends AbstractStreamWriteFunction { private static final long serialVersionUID = 1L; @@ -114,76 +99,20 @@ public class StreamWriteFunction */ private transient Map buckets; - /** - * Config options. - */ - private final Configuration config; - - /** - * Id of current subtask. - */ - private int taskID; - - /** - * Write Client. - */ - private transient HoodieFlinkWriteClient writeClient; - private transient BiFunction, String, List> writeFunction; - /** - * The REQUESTED instant we write the data. 
- */ - private volatile String currentInstant; - - /** - * Gateway to send operator events to the operator coordinator. - */ - private transient OperatorEventGateway eventGateway; - - /** - * Commit action type. - */ - private transient String actionType; - /** * Total size tracer. */ private transient TotalSizeTracer tracer; - /** - * Flag saying whether the write task is waiting for the checkpoint success notification - * after it finished a checkpoint. - * - *
<p>
The flag is needed because the write task does not block during the waiting time interval, - * some data buckets still flush out with old instant time. There are two cases that the flush may produce - * corrupted files if the old instant is committed successfully: - * 1) the write handle was writing data but interrupted, left a corrupted parquet file; - * 2) the write handle finished the write but was not closed, left an empty parquet file. - * - *
<p>
To solve, when this flag was set to true, we block the data flushing thus the #processElement method, - * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant - * time changed(the last instant committed successfully). - */ - private volatile boolean confirming = false; - - /** - * List state of the write metadata events. - */ - private transient ListState writeMetadataState; - - /** - * Write status list for the current checkpoint. - */ - private List writeStatuses; - /** * Constructs a StreamingSinkFunction. * * @param config The config options */ public StreamWriteFunction(Configuration config) { - this.config = config; + super(config); } @Override @@ -194,42 +123,15 @@ public class StreamWriteFunction } @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.actionType = CommitUtils.getCommitActionType( - WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), - HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); - - this.writeStatuses = new ArrayList<>(); - this.writeMetadataState = context.getOperatorStateStore().getListState( - new ListStateDescriptor<>( - "write-metadata-state", - TypeInformation.of(WriteMetadataEvent.class) - )); - - this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); - if (context.isRestored()) { - restoreWriteMetadata(); - } else { - sendBootstrapEvent(); - } - // blocks flushing until the coordinator starts a new instant - this.confirming = true; - } - - @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + public void snapshotState() { // Based on the fact that the coordinator starts the checkpoint first, // it would check the validity. // wait for the buffer data flush out and request a new instant flushRemaining(false); - // Reload the snapshot state as the current state. 
- reloadWriteMetaState(); } @Override - public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) { + public void processElement(I value, ProcessFunction.Context ctx, Collector out) throws Exception { bufferRecord((HoodieRecord) value); } @@ -264,21 +166,6 @@ public class StreamWriteFunction return ret; } - @VisibleForTesting - @SuppressWarnings("rawtypes") - public HoodieFlinkWriteClient getWriteClient() { - return writeClient; - } - - @VisibleForTesting - public boolean isConfirming() { - return this.confirming; - } - - public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { - this.eventGateway = operatorEventGateway; - } - // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -307,49 +194,6 @@ public class StreamWriteFunction } } - private void restoreWriteMetadata() throws Exception { - String lastInflight = this.writeClient.getLastPendingInstant(this.actionType); - boolean eventSent = false; - for (WriteMetadataEvent event : this.writeMetadataState.get()) { - if (Objects.equals(lastInflight, event.getInstantTime())) { - // The checkpoint succeed but the meta does not commit, - // re-commit the inflight instant - this.eventGateway.sendEventToCoordinator(event); - LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); - eventSent = true; - } - } - if (!eventSent) { - sendBootstrapEvent(); - } - } - - private void sendBootstrapEvent() { - this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); - LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); - } - - /** - * Reload the write metadata state as the current checkpoint. - */ - private void reloadWriteMetaState() throws Exception { - this.writeMetadataState.clear(); - WriteMetadataEvent event = WriteMetadataEvent.builder() - .taskID(taskID) - .instantTime(currentInstant) - .writeStatus(new ArrayList<>(writeStatuses)) - .bootstrap(true) - .build(); - this.writeMetadataState.add(event); - writeStatuses.clear(); - } - - public void handleOperatorEvent(OperatorEvent event) { - ValidationUtils.checkArgument(event instanceof CommitAckEvent, - "The write function can only handle CommitAckEvent"); - this.confirming = false; - } - /** * Represents a data item in the buffer, this is needed to reduce the * memory footprint. @@ -562,32 +406,6 @@ public class StreamWriteFunction && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); } - private String instantToWrite(boolean hasData) { - String instant = this.writeClient.getLastPendingInstant(this.actionType); - // if exactly-once semantics turns on, - // waits for the checkpoint notification until the checkpoint timeout threshold hits. - TimeWait timeWait = TimeWait.builder() - .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) - .action("instant initialize") - .build(); - while (confirming) { - // wait condition: - // 1. there is no inflight instant - // 2. the inflight instant does not change and the checkpoint has buffering data - if (instant == null || (instant.equals(this.currentInstant) && hasData)) { - // sleep for a while - timeWait.waitFor(); - // refresh the inflight instant - instant = this.writeClient.getLastPendingInstant(this.actionType); - } else { - // the inflight instant changed, which means the last instant was committed - // successfully. 
- confirming = false; - } - } - return instant; - } - @SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java index c16743e2a..9e39e3f26 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java @@ -18,12 +18,10 @@ package org.apache.hudi.sink; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; -import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.StreamSink; /** @@ -31,27 +29,13 @@ import org.apache.flink.streaming.api.operators.StreamSink; * * @param The input type */ -public class StreamWriteOperator - extends KeyedProcessOperator - implements OperatorEventHandler, BoundedOneInput { - private final StreamWriteFunction sinkFunction; +public class StreamWriteOperator extends AbstractWriteOperator { public StreamWriteOperator(Configuration conf) { super(new StreamWriteFunction<>(conf)); - this.sinkFunction = (StreamWriteFunction) getUserFunction(); } - @Override - public void handleOperatorEvent(OperatorEvent event) { - this.sinkFunction.handleOperatorEvent(event); - } - - void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { - sinkFunction.setOperatorEventGateway(operatorEventGateway); - } - - @Override - public void endInput() { - sinkFunction.endInput(); + public static WriteOperatorFactory getFactory(Configuration conf) { + return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf)); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java new file mode 100644 index 000000000..128c03010 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; +import org.apache.hudi.sink.common.AbstractStreamWriteFunction; +import org.apache.hudi.sink.event.WriteMetadataEvent; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; + +import java.util.List; + +/** + * Sink function to write the data to the underneath filesystem. + * + *
<p>
The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. + * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunction extends AbstractStreamWriteFunction { + + private static final long serialVersionUID = 1L; + + /** + * Helper class for log mode. + */ + private transient BulkInsertWriterHelper writerHelper; + + /** + * Table row type. + */ + private final RowType rowType; + + /** + * Constructs an AppendWriteFunction. + * + * @param config The config options + */ + public AppendWriteFunction(Configuration config, RowType rowType) { + super(config); + this.rowType = rowType; + } + + @Override + public void snapshotState() { + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. + // wait for the buffer data flush out and request a new instant + flushData(false); + // nullify the write helper for next ckp + this.writerHelper = null; + } + + @Override + public void processElement(I value, Context ctx, Collector out) throws Exception { + if (this.writerHelper == null) { + initWriterHelper(); + } + this.writerHelper.write((RowData) value); + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.cleanHandlesGracefully(); + this.writeClient.close(); + } + } + + /** + * End input action for batch source. + */ + public void endInput() { + flushData(true); + this.writeClient.cleanHandles(); + this.writeStatuses.clear(); + } + + // ------------------------------------------------------------------------- + // GetterSetter + // ------------------------------------------------------------------------- + @VisibleForTesting + public BulkInsertWriterHelper getWriterHelper() { + return this.writerHelper; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + private void initWriterHelper() { + this.currentInstant = instantToWrite(true); + if (this.currentInstant == null) { + // in case there are empty checkpoints that has no input data + throw new HoodieException("No inflight instant when flushing data!"); + } + this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), + this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), + this.rowType); + } + + private void flushData(boolean endInput) { + final List writeStatus = this.writerHelper.getWriteStatuses(this.taskID); + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(this.writerHelper.getInstantTime()) + .writeStatus(writeStatus) + .lastBatch(true) + .endInput(endInput) + .build(); + this.eventGateway.sendEventToCoordinator(event); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java new file mode 100644 index 000000000..ad1a00203 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.append; + +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.types.logical.RowType; + +/** + * Operator for {@link AppendWriteFunction}. + * + * @param The input type + */ +public class AppendWriteOperator extends AbstractWriteOperator { + + public AppendWriteOperator(Configuration conf, RowType rowType) { + super(new AppendWriteFunction<>(conf, rowType)); + } + + public static WriteOperatorFactory getFactory(Configuration conf, RowType rowType) { + return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType)); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index dd0f7bce2..7fce5c0a3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -19,21 +19,20 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; @@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; /** * Sink function to write the data to the underneath filesystem. 
@@ -55,8 +53,8 @@ import java.util.stream.Collectors; * @param Type of the input record * @see StreamWriteOperatorCoordinator */ -public class BulkInsertWriteFunction - extends ProcessFunction { +public class BulkInsertWriteFunction + extends AbstractWriteFunction { private static final long serialVersionUID = 1L; @@ -126,7 +124,7 @@ public class BulkInsertWriteFunction } @Override - public void processElement(I value, Context ctx, Collector out) throws IOException { + public void processElement(I value, Context ctx, Collector out) throws IOException { this.writerHelper.write((RowData) value); } @@ -142,14 +140,8 @@ public class BulkInsertWriteFunction * End input action for batch source. */ public void endInput() { - final List writeStatus; - try { - this.writerHelper.close(); - writeStatus = this.writerHelper.getWriteStatuses().stream() - .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList()); - } catch (IOException e) { - throw new HoodieException("Error collect the write status for task [" + this.taskID + "]"); - } + final List writeStatus = this.writerHelper.getWriteStatuses(this.taskID); + final WriteMetadataEvent event = WriteMetadataEvent.builder() .taskID(taskID) .instantTime(this.writerHelper.getInstantTime()) @@ -160,17 +152,9 @@ public class BulkInsertWriteFunction this.eventGateway.sendEventToCoordinator(event); } - /** - * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}. - */ - private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) { - WriteStatus writeStatus = new WriteStatus(false, 0.1); - writeStatus.setStat(internalWriteStatus.getStat()); - writeStatus.setFileId(internalWriteStatus.getFileId()); - writeStatus.setGlobalError(internalWriteStatus.getGlobalError()); - writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords()); - writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords()); - return writeStatus; + @Override + public void handleOperatorEvent(OperatorEvent event) { + // no operation } // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java index bc01622c8..16fb87fb3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java @@ -18,24 +18,12 @@ package org.apache.hudi.sink.bulk; -import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; -import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.ProcessOperator; -import 
org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; /** @@ -44,13 +32,11 @@ import org.apache.flink.table.types.logical.RowType; * @param The input type */ public class BulkInsertWriteOperator - extends ProcessOperator - implements OperatorEventHandler, BoundedOneInput { - private final BulkInsertWriteFunction sinkFunction; + extends AbstractWriteOperator + implements BoundedOneInput { public BulkInsertWriteOperator(Configuration conf, RowType rowType) { super(new BulkInsertWriteFunction<>(conf, rowType)); - this.sinkFunction = (BulkInsertWriteFunction) getUserFunction(); } @Override @@ -58,58 +44,7 @@ public class BulkInsertWriteOperator // no operation } - void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { - sinkFunction.setOperatorEventGateway(operatorEventGateway); - } - - @Override - public void endInput() { - sinkFunction.endInput(); - } - - public static OperatorFactory getFactory(Configuration conf, RowType rowType) { - return new OperatorFactory<>(conf, rowType); - } - - // ------------------------------------------------------------------------- - // Inner Class - // ------------------------------------------------------------------------- - - public static class OperatorFactory - extends SimpleUdfStreamOperatorFactory - implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { - private static final long serialVersionUID = 1L; - - private final BulkInsertWriteOperator operator; - private final Configuration conf; - - public OperatorFactory(Configuration conf, RowType rowType) { - super(new BulkInsertWriteOperator<>(conf, rowType)); - this.operator = (BulkInsertWriteOperator) getOperator(); - this.conf = conf; - } - - @Override - @SuppressWarnings("unchecked") - public > T createStreamOperator(StreamOperatorParameters parameters) { - final OperatorID operatorID = parameters.getStreamConfig().getOperatorID(); - final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher(); - - this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID)); - this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); - this.operator.setProcessingTimeService(this.processingTimeService); - eventDispatcher.registerEventHandler(operatorID, operator); - return (T) operator; - } - - @Override - public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { - return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); - } - - @Override - public void setProcessingTimeService(ProcessingTimeService processingTimeService) { - super.setProcessingTimeService(processingTimeService); - } + public static WriteOperatorFactory getFactory(Configuration conf, RowType rowType) { + return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType)); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java index fbe7678b7..e0cbab602 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -19,9 +19,11 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle; import org.apache.hudi.table.HoodieTable; @@ -39,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; /** * Helper class for bulk insert used by Flink. @@ -101,7 +104,7 @@ public class BulkInsertWriterHelper { } } - public List getWriteStatuses() throws IOException { + public List getHoodieWriteStatuses() throws IOException { close(); return writeStatusList; } @@ -172,5 +175,27 @@ public class BulkInsertWriterHelper { return new RowType(false, mergedFields); } + + public List getWriteStatuses(int taskID) { + try { + return getHoodieWriteStatuses().stream() + .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieException("Error collect the write status for task [" + taskID + "]"); + } + } + + /** + * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}. + */ + private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) { + WriteStatus writeStatus = new WriteStatus(false, 0.1); + writeStatus.setStat(internalWriteStatus.getStat()); + writeStatus.setFileId(internalWriteStatus.getFileId()); + writeStatus.setGlobalError(internalWriteStatus.getGlobalError()); + writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords()); + writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords()); + return writeStatus; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java index 7be2b410d..aa6224057 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java @@ -35,7 +35,6 @@ import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.runtime.util.StreamRecordCollector; import org.apache.flink.util.MutableObjectIterator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java new file mode 100644 index 000000000..654f0b864 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.common; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.event.CommitAckEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.utils.TimeWait; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Base infrastructures for streaming writer function. + * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public abstract class AbstractStreamWriteFunction + extends AbstractWriteFunction + implements CheckpointedFunction { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class); + + /** + * Config options. + */ + protected final Configuration config; + + /** + * Id of current subtask. + */ + protected int taskID; + + /** + * Write Client. + */ + protected transient HoodieFlinkWriteClient writeClient; + + /** + * The REQUESTED instant we write the data. + */ + protected volatile String currentInstant; + + /** + * Gateway to send operator events to the operator coordinator. + */ + protected transient OperatorEventGateway eventGateway; + + /** + * Commit action type. + */ + protected transient String actionType; + + /** + * Flag saying whether the write task is waiting for the checkpoint success notification + * after it finished a checkpoint. + * + *
<p>
The flag is needed because the write task does not block during the waiting time interval, + * some data buckets still flush out with old instant time. There are two cases that the flush may produce + * corrupted files if the old instant is committed successfully: + * 1) the write handle was writing data but interrupted, left a corrupted parquet file; + * 2) the write handle finished the write but was not closed, left an empty parquet file. + * + *
<p>
To solve, when this flag was set to true, we block the data flushing thus the #processElement method, + * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant + * time changed(the last instant committed successfully). + */ + protected volatile boolean confirming = false; + + /** + * List state of the write metadata events. + */ + private transient ListState writeMetadataState; + + /** + * Write status list for the current checkpoint. + */ + protected List writeStatuses; + + /** + * Constructs a StreamWriteFunctionBase. + * + * @param config The config options + */ + public AbstractStreamWriteFunction(Configuration config) { + this.config = config; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.actionType = CommitUtils.getCommitActionType( + WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), + HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); + + this.writeStatuses = new ArrayList<>(); + this.writeMetadataState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>( + "write-metadata-state", + TypeInformation.of(WriteMetadataEvent.class) + )); + + this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); + if (context.isRestored()) { + restoreWriteMetadata(); + } else { + sendBootstrapEvent(); + } + // blocks flushing until the coordinator starts a new instant + this.confirming = true; + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + snapshotState(); + // Reload the snapshot state as the current state. + reloadWriteMetaState(); + } + + public abstract void snapshotState(); + + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + @VisibleForTesting + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return writeClient; + } + + @VisibleForTesting + public boolean isConfirming() { + return this.confirming; + } + + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + this.eventGateway = operatorEventGateway; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void restoreWriteMetadata() throws Exception { + String lastInflight = this.writeClient.getLastPendingInstant(this.actionType); + boolean eventSent = false; + for (WriteMetadataEvent event : this.writeMetadataState.get()) { + if (Objects.equals(lastInflight, event.getInstantTime())) { + // The checkpoint succeed but the meta does not commit, + // re-commit the inflight instant + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); + eventSent = true; + } + } + if (!eventSent) { + sendBootstrapEvent(); + } + } + + private void sendBootstrapEvent() { + this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID)); + LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); + } + + /** + * Reload the write metadata state as the current checkpoint. 
+ */ + private void reloadWriteMetaState() throws Exception { + this.writeMetadataState.clear(); + WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(currentInstant) + .writeStatus(new ArrayList<>(writeStatuses)) + .bootstrap(true) + .build(); + this.writeMetadataState.add(event); + writeStatuses.clear(); + } + + public void handleOperatorEvent(OperatorEvent event) { + ValidationUtils.checkArgument(event instanceof CommitAckEvent, + "The write function can only handle CommitAckEvent"); + this.confirming = false; + } + + /** + * Prepares the instant time to write with for next checkpoint. + * + * @param hasData Whether the task has buffering data + * @return The instant time + */ + protected String instantToWrite(boolean hasData) { + String instant = this.writeClient.getLastPendingInstant(this.actionType); + // if exactly-once semantics turns on, + // waits for the checkpoint notification until the checkpoint timeout threshold hits. + TimeWait timeWait = TimeWait.builder() + .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) + .action("instant initialize") + .build(); + while (confirming) { + // wait condition: + // 1. there is no inflight instant + // 2. the inflight instant does not change and the checkpoint has buffering data + if (instant == null || (instant.equals(this.currentInstant) && hasData)) { + // sleep for a while + timeWait.waitFor(); + // refresh the inflight instant + instant = this.writeClient.getLastPendingInstant(this.actionType); + } else { + // the inflight instant changed, which means the last instant was committed + // successfully. + confirming = false; + } + } + return instant; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java new file mode 100644 index 000000000..8e776006f --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.common; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.operators.BoundedOneInput; + +/** + * Base class for write function. + * + * @param the input type + */ +public abstract class AbstractWriteFunction extends ProcessFunction implements BoundedOneInput { + /** + * Sets up the event gateway. + */ + public abstract void setOperatorEventGateway(OperatorEventGateway operatorEventGateway); + + /** + * Invoked when bounded source ends up. 
+ */ + public abstract void endInput(); + + /** + * Handles the operator event sent by the coordinator. + * @param event The event + */ + public abstract void handleOperatorEvent(OperatorEvent event); +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java new file mode 100644 index 000000000..e339ccb0b --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.common; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ProcessOperator; + +/** + * Base class for write operator. + * + * @param the input type + */ +public abstract class AbstractWriteOperator + extends ProcessOperator + implements OperatorEventHandler, BoundedOneInput { + private final AbstractWriteFunction function; + + public AbstractWriteOperator(AbstractWriteFunction function) { + super(function); + this.function = function; + } + + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + this.function.setOperatorEventGateway(operatorEventGateway); + } + + @Override + public void endInput() { + this.function.endInput(); + } + + @Override + public void handleOperatorEvent(OperatorEvent evt) { + this.function.handleOperatorEvent(evt); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java similarity index 83% rename from hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java index ce898866b..01a28debc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java @@ -16,7 +16,10 @@ * limitations under the License. */ -package org.apache.hudi.sink; +package org.apache.hudi.sink.common; + +import org.apache.hudi.sink.StreamWriteOperator; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -31,20 +34,24 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; /** * Factory class for {@link StreamWriteOperator}. 
*/ -public class StreamWriteOperatorFactory +public class WriteOperatorFactory extends SimpleUdfStreamOperatorFactory implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { private static final long serialVersionUID = 1L; - private final StreamWriteOperator operator; + private final AbstractWriteOperator operator; private final Configuration conf; - public StreamWriteOperatorFactory(Configuration conf) { - super(new StreamWriteOperator<>(conf)); - this.operator = (StreamWriteOperator) getOperator(); + public WriteOperatorFactory(Configuration conf, AbstractWriteOperator operator) { + super(operator); + this.operator = operator; this.conf = conf; } + public static WriteOperatorFactory instance(Configuration conf, AbstractWriteOperator operator) { + return new WriteOperatorFactory<>(conf, operator); + } + @Override @SuppressWarnings("unchecked") public > T createStreamOperator(StreamOperatorParameters parameters) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java index 93f74af43..541fd062f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java @@ -29,7 +29,8 @@ public class CommitAckEvent implements OperatorEvent { private static final CommitAckEvent INSTANCE = new CommitAckEvent(); // default constructor for efficient serialization - public CommitAckEvent() {} + public CommitAckEvent() { + } public static CommitAckEvent getInstance() { return INSTANCE; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 9f56bddeb..6b5e96eb8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -61,7 +61,7 @@ public class DeltaWriteProfile extends WriteProfile { // If we can index log files, we can add more inserts to log files for fileIds including those under // pending compaction. 
List allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) - .collect(Collectors.toList()); + .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(fileSlice)) { allSmallFileSlices.add(fileSlice); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java index 1fcf3f11c..0ab8f12de 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java @@ -52,7 +52,8 @@ public class WriteProfiles { private static final Map PROFILES = new HashMap<>(); - private WriteProfiles() {} + private WriteProfiles() { + } public static synchronized WriteProfile singleton( boolean ignoreSmallFiles, @@ -104,7 +105,6 @@ public class WriteProfiles { * @param basePath Table base path * @param metadata The metadata * @param fs The filesystem - * * @return the commit file status list */ private static List getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) { @@ -143,7 +143,6 @@ public class WriteProfiles { * @param basePath The table base path * @param instant The hoodie instant * @param timeline The timeline - * * @return the commit metadata or empty if any error occurs */ public static Option getCommitMetadataSafely( @@ -172,7 +171,6 @@ public class WriteProfiles { * @param basePath The table base path * @param instant The hoodie instant * @param timeline The timeline - * * @return the commit metadata */ public static HoodieCommitMetadata getCommitMetadata( diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java index 5b811a8b0..0007fd1e5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java @@ -29,7 +29,8 @@ import org.apache.flink.table.types.logical.RowType; * Utilities for {@link RowDataToHoodieFunction}. */ public abstract class RowDataToHoodieFunctions { - private RowDataToHoodieFunctions() {} + private RowDataToHoodieFunctions() { + } /** * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java index f40a838b0..282cca7cd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java @@ -28,6 +28,7 @@ public interface Transformer { /** * Transform source DataStream to target DataStream. 
+ * * @param source */ DataStream apply(DataStream source); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index f31645cc4..121118877 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -21,12 +21,14 @@ package org.apache.hudi.sink.utils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; -import org.apache.hudi.sink.StreamWriteOperatorFactory; +import org.apache.hudi.sink.StreamWriteOperator; +import org.apache.hudi.sink.append.AppendWriteOperator; import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator; import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; import org.apache.hudi.sink.bulk.RowDataKeyGen; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.common.WriteOperatorFactory; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -41,6 +43,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; @@ -52,7 +55,7 @@ import org.apache.flink.table.types.logical.RowType; public class Pipelines { public static DataStreamSink bulkInsert(Configuration conf, RowType rowType, DataStream dataStream) { - BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType); + WriteOperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType); final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); if (partitionFields.length > 0) { @@ -80,9 +83,17 @@ public class Pipelines { operatorFactory) // follow the parallelism of upstream operators to avoid shuffle .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) - .addSink(new CleanFunction<>(conf)) - .setParallelism(1) - .name("clean_commits"); + .addSink(DummySink.INSTANCE); + } + + public static DataStreamSink append(Configuration conf, RowType rowType, DataStream dataStream) { + WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType); + + return dataStream + .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory) + .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) + .addSink(DummySink.INSTANCE); } public static DataStream bootstrap( @@ -143,7 +154,7 @@ public class Pipelines { } public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) { - StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); + WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf); return dataStream // Key-by record key, to avoid multiple subtasks write to a bucket at the same time 
.keyBy(HoodieRecord::getRecordKey) @@ -180,4 +191,12 @@ public class Pipelines { .setParallelism(1) .name("clean_commits"); } + + /** + * Dummy sink that does nothing. + */ + public static class DummySink implements SinkFunction { + private static final long serialVersionUID = 1L; + public static DummySink INSTANCE = new DummySink(); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index ae6b3e1fe..112dfda54 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -270,16 +270,16 @@ public class StreamReadMonitoringFunction final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); List inputSplits = writePartitions.stream() .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue) - .map(fileSlice -> { - Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()) - .collect(Collectors.toList())); - String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); - return new MergeOnReadInputSplit(cnt.getAndAdd(1), - basePath, logPaths, commitToIssue, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); - }).collect(Collectors.toList())) + .map(fileSlice -> { + Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList())); + String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), + basePath, logPaths, commitToIssue, + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); + }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 75272abf4..3dc8cb27a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -142,22 +142,22 @@ public class FlinkStreamerConfig extends Configuration { public Integer writeTaskNum = 4; @Parameter(names = {"--partition-default-name"}, - description = "The default partition name in case the dynamic partition column value is null/empty string") + description = "The default partition name in case the dynamic partition column value is null/empty string") public String partitionDefaultName = "__DEFAULT_PARTITION__"; @Parameter(names = {"--index-bootstrap-enabled"}, - description = "Whether to bootstrap the index state from existing hoodie table, default false") + description = "Whether to bootstrap the index state from existing hoodie table, default false") public Boolean indexBootstrapEnabled = false; @Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day") public Double indexStateTtl = 1.5D; @Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path " - + "if same key record with different partition path came in, default false") + + "if same key record with different partition 
path came in, default false") public Boolean indexGlobalEnabled = false; @Parameter(names = {"--index-partition-regex"}, - description = "Whether to load partitions in state if partition path matching, default *") + description = "Whether to load partitions in state if partition path matching, default *") public String indexPartitionRegex = ".*"; @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization") @@ -167,8 +167,8 @@ public class FlinkStreamerConfig extends Configuration { public String sourceAvroSchema = ""; @Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch" - + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" - + " use UTC timezone, by default true") + + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" + + " use UTC timezone, by default true") public Boolean utcTimezone = true; @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false") @@ -180,18 +180,18 @@ public class FlinkStreamerConfig extends Configuration { public Boolean hiveStylePartitioning = false; @Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n" - + "it flushes the max size data bucket to avoid OOM, default 1GB") + + "it flushes the max size data bucket to avoid OOM, default 1GB") public Double writeTaskMaxSize = 1024D; @Parameter(names = {"--write-batch-size"}, - description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB") + description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB") public Double writeBatchSize = 64D; @Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB") public Integer writeLogBlockSize = 128; @Parameter(names = {"--write-log-max-size"}, - description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB") + description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB") public Integer writeLogMaxSize = 1024; @Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB") @@ -204,11 +204,11 @@ public class FlinkStreamerConfig extends Configuration { public Integer compactionTasks = 10; @Parameter(names = {"--compaction-trigger-strategy"}, - description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n" - + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n" - + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n" - + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n" - + "Default is 'num_commits'") + description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n" + + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n" + + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n" + + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n" + + "Default is 'num_commits'") public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS; 
@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits") @@ -227,16 +227,16 @@ public class FlinkStreamerConfig extends Configuration { public Boolean cleanAsyncEnabled = true; @Parameter(names = {"--clean-retain-commits"}, - description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + "This also directly translates into how much you can incrementally pull on this table, default 10") public Integer cleanRetainCommits = 10; @Parameter(names = {"--archive-max-commits"}, - description = "Max number of commits to keep before archiving older commits into a sequential log, default 30") + description = "Max number of commits to keep before archiving older commits into a sequential log, default 30") public Integer archiveMaxCommits = 30; @Parameter(names = {"--archive-min-commits"}, - description = "Min number of commits to keep before archiving older commits into a sequential log, default 20") + description = "Min number of commits to keep before archiving older commits into a sequential log, default 20") public Integer archiveMinCommits = 20; @Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false") @@ -270,7 +270,7 @@ public class FlinkStreamerConfig extends Configuration { public String hiveSyncPartitionFields = ""; @Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, " - + "default 'SlashEncodedDayPartitionValueExtractor'") + + "default 'SlashEncodedDayPartitionValueExtractor'") public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName(); @Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false") @@ -289,7 +289,7 @@ public class FlinkStreamerConfig extends Configuration { public Boolean hiveSyncSkipRoSuffix = false; @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "Disabled by default for backward compatibility.") + + "Disabled by default for backward compatibility.") public Boolean hiveSyncSupportTimestamp = false; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 89f7ed78c..034485746 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -105,7 +105,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab /** * The sanity check. * - * @param conf The table options + * @param conf The table options * @param schema The table schema */ private void sanityCheck(Configuration conf, ResolvedSchema schema) { @@ -217,7 +217,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab /** * Sets up the hive options from the table definition. 
- * */ + */ private static void setupHiveOptions(Configuration conf) { if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING) && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 2ced22aaf..2fdd0fd68 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -77,10 +77,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // default parallelism int parallelism = dataStream.getExecutionConfig().getParallelism(); + + DataStream pipeline; + // Append mode + if (StreamerUtil.allowDuplicateInserts(conf)) { + return Pipelines.append(conf, rowType, dataStream); + } + // bootstrap final DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded()); // write pipeline - DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); + pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); // compaction if (StreamerUtil.needsAsyncCompaction(conf)) { return Pipelines.compact(conf, pipeline); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 52cb76588..78d1db6c6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -303,7 +303,8 @@ public class HoodieTableSource implements .collect(Collectors.toList())); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); - }).collect(Collectors.toList()); }) + }).collect(Collectors.toList()); + }) .flatMap(Collection::stream) .collect(Collectors.toList()); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java index 8f1347f30..1eb7e2db3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java @@ -283,11 +283,11 @@ public class FilePathUtils { * *
The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}]. * - * @param path The base path - * @param hadoopConf The hadoop configuration - * @param partitionKeys The partition key list + * @param path The base path + * @param hadoopConf The hadoop configuration + * @param partitionKeys The partition key list * @param defaultParName The default partition name for nulls - * @param hivePartition Whether the partition path is in Hive style + * @param hivePartition Whether the partition path is in Hive style */ public static List> getPartitions( Path path, @@ -338,9 +338,9 @@ public class FilePathUtils { /** * Returns all the file paths that is the parents of the data files. * - * @param path The base path - * @param conf The Flink configuration - * @param hadoopConf The hadoop configuration + * @param path The base path + * @param conf The Flink configuration + * @param hadoopConf The hadoop configuration * @param partitionKeys The partition key list */ public static Path[] getReadPaths( @@ -362,11 +362,10 @@ public class FilePathUtils { /** * Transforms the given partition key value mapping to read paths. * - * @param path The base path - * @param partitionKeys The partition key list + * @param path The base path + * @param partitionKeys The partition key list * @param partitionPaths The partition key value mapping - * @param hivePartition Whether the partition path is in Hive style - * + * @param hivePartition Whether the partition path is in Hive style * @see #getReadPaths */ public static Path[] partitionPath2ReadPath( @@ -384,10 +383,9 @@ public class FilePathUtils { /** * Transforms the given partition key value mapping to relative partition paths. * - * @param partitionKeys The partition key list + * @param partitionKeys The partition key list * @param partitionPaths The partition key value mapping - * @param hivePartition Whether the partition path is in Hive style - * + * @param hivePartition Whether the partition path is in Hive style * @see #getReadPaths */ public static Set toRelativePartitionPaths( diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java index e6f40a557..efbe91404 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java @@ -296,7 +296,8 @@ public abstract class AbstractColumnReader /** * After read a page, we may need some initialization. */ - protected void afterReadPage() {} + protected void afterReadPage() { + } /** * Support lazy dictionary ids decode. See more in {@link ParquetDictionary}. 
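Editor note: the FilePathUtils javadoc above describes mapping relative partition paths to ordered key/value maps, substituting a default name for null or empty values. A small illustration of that mapping, assuming a hypothetical helper that only mirrors the behaviour the javadoc describes (it is not part of FilePathUtils):

import java.util.LinkedHashMap;
import java.util.Map;

class PartitionPathIllustration {
  // hiveStyle path looks like "dt=2021-03-01/hr=08"; slash-encoded is positional: "2021-03-01/08"
  static Map<String, String> toKeyValues(String relPath, String[] keys, boolean hiveStyle, String defaultParName) {
    Map<String, String> kv = new LinkedHashMap<>();
    String[] segments = relPath.split("/");
    for (int i = 0; i < keys.length; i++) {
      String val = hiveStyle ? segments[i].substring(segments[i].indexOf('=') + 1) : segments[i];
      kv.put(keys[i], val.isEmpty() ? defaultParName : val);
    }
    return kv;
  }
}
// toKeyValues("dt=2021-03-01/hr=08", new String[] {"dt", "hr"}, true, "__DEFAULT_PARTITION__")
//   returns {dt=2021-03-01, hr=08}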
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index b929e7bef..0c93eeac2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -97,15 +97,15 @@ public class MergeOnReadInputSplit implements InputSplit { @Override public String toString() { return "MergeOnReadInputSplit{" - + "splitNum=" + splitNum - + ", basePath=" + basePath - + ", logPaths=" + logPaths - + ", latestCommit='" + latestCommit + '\'' - + ", tablePath='" + tablePath + '\'' - + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes - + ", mergeType='" + mergeType + '\'' - + ", instantRange=" + instantRange - + '}'; + + "splitNum=" + splitNum + + ", basePath=" + basePath + + ", logPaths=" + logPaths + + ", latestCommit='" + latestCommit + '\'' + + ", tablePath='" + tablePath + '\'' + + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes + + ", mergeType='" + mergeType + '\'' + + ", instantRange=" + instantRange + + '}'; } - + } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index 0a63c9125..36dfecbb7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -106,7 +106,6 @@ public class MergeOnReadTableState implements Serializable { * * @param pkOffsets the pk offsets in required row type * @return pk field logical types - * * @see #getPkOffsetsInRequired() */ public LogicalType[] getPkTypes(int[] pkOffsets) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java index db72cc18e..df815a82e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -162,7 +162,7 @@ public class AvroSchemaConverter { *
Use "record" as the type name. * * @param schema the schema type, usually it should be the top level record type, e.g. not a - * nested type + * nested type * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType schema) { @@ -176,7 +176,7 @@ public class AvroSchemaConverter { * schema. Nested record type that only differs with type name is still compatible. * * @param logicalType logical type - * @param rowName the record name + * @param rowName the record name * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType, String rowName) { @@ -315,7 +315,9 @@ public class AvroSchemaConverter { return valueType; } - /** Returns schema with nullable true. */ + /** + * Returns schema with nullable true. + */ private static Schema nullableSchema(Schema schema) { return schema.isNullable() ? schema diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 9f625baa1..b95c9e1d3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -181,6 +181,9 @@ public class StreamerUtil { .withStorageConfig(HoodieStorageConfig.newBuilder() .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) + .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024) + .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024) + .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) .build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 659e0225e..1890d07d2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -245,8 +245,6 @@ public class StreamWriteITCase extends TestLogger { RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, @@ -302,8 +300,6 @@ public class StreamWriteITCase extends TestLogger { RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 0c5224159..b403f3c65 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -23,13 +23,13 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.utils.InsertFunctionWrapper; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -58,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -532,11 +533,7 @@ public class TestWriteCopyOnWrite { @Test public void testInsertAllowsDuplication() throws Exception { - // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - conf.setBoolean(FlinkOptions.INSERT_DEDUP, false); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + InsertFunctionWrapper funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); @@ -547,19 +544,16 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + assertNull(funcWrapper.getWriterHelper()); final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); + assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event1); - funcWrapper.getCoordinator().handleEventFromOperator(0, event2); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); + .getLastPendingInstant(getTableType()); funcWrapper.checkpointComplete(1); @@ -585,10 +579,8 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); - final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event4 = funcWrapper.getNextEvent(); - funcWrapper.getCoordinator().handleEventFromOperator(0, event3); - funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); funcWrapper.checkpointComplete(2); // same with the original base file content. 
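Editor note: the rewritten test above drives the append path through InsertFunctionWrapper, and each checkpoint now produces a single WriteMetadataEvent instead of the earlier two-event exchange. A condensed sketch of that handshake, reusing only the test utilities introduced in this diff (the helper class name is illustrative, and it assumes funcWrapper.openFunction() was already called):

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;

class AppendWriteHandshakeSketch {
  static void roundTrip(InsertFunctionWrapper<RowData> funcWrapper, RowData row) throws Exception {
    funcWrapper.invoke(row);                          // buffer one record in the writer helper
    funcWrapper.checkpointFunction(1);                // flush and emit the WriteMetadataEvent
    OperatorEvent event = funcWrapper.getNextEvent();
    funcWrapper.getCoordinator().handleEventFromOperator(0, event);
    funcWrapper.checkpointComplete(1);                // coordinator commits the instant
  }
}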
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java index 32ee72541..7a1eaeecf 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java @@ -77,7 +77,7 @@ public class TestRowDataKeyGen { assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001")); // null record key and partition path - final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null); + final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null); assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2)); assertThat(keyGen1.getPartitionPath(rowData2), is("default/default")); // empty record key and partition path diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index d53d58ebb..fe2ddad18 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -53,11 +53,17 @@ public class CompactFunctionWrapper { private final IOManager ioManager; private final StreamingRuntimeContext runtimeContext; - /** Function that generates the {@link HoodieCompactionPlan}. */ + /** + * Function that generates the {@link HoodieCompactionPlan}. + */ private CompactionPlanOperator compactionPlanOperator; - /** Function that executes the compaction task. */ + /** + * Function that executes the compaction task. + */ private CompactFunction compactFunction; - /** Stream sink to handle compaction commits. */ + /** + * Stream sink to handle compaction commits. + */ private CompactionCommitSink commitSink; public CompactFunctionWrapper(Configuration conf) throws Exception { @@ -120,7 +126,7 @@ public class CompactFunctionWrapper { compactionPlanOperator.notifyCheckpointComplete(checkpointID); // collect the CompactCommitEvents List compactCommitEvents = new ArrayList<>(); - for (CompactionPlanEvent event: events) { + for (CompactionPlanEvent event : events) { compactFunction.processElement(event, null, new Collector() { @Override public void collect(CompactionCommitEvent event) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java new file mode 100644 index 000000000..ed23754d9 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.append.AppendWriteFunction; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.util.concurrent.CompletableFuture; + +/** + * A wrapper class to manipulate the {@link AppendWriteFunction} instance for testing. + * + * @param Input type + */ +public class InsertFunctionWrapper { + private final Configuration conf; + private final RowType rowType; + + private final StreamingRuntimeContext runtimeContext; + private final MockOperatorEventGateway gateway; + private final MockOperatorCoordinatorContext coordinatorContext; + private final StreamWriteOperatorCoordinator coordinator; + private final MockStateInitializationContext stateInitializationContext; + + /** + * Append write function. 
+ */ + private AppendWriteFunction writeFunction; + + public InsertFunctionWrapper(String tablePath, Configuration conf) { + IOManager ioManager = new IOManagerAsync(); + MockEnvironment environment = new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .setIOManager(ioManager) + .build(); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.gateway = new MockOperatorEventGateway(); + this.conf = conf; + this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType(); + // one function + this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); + this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); + this.stateInitializationContext = new MockStateInitializationContext(); + } + + public void openFunction() throws Exception { + this.coordinator.start(); + this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); + + setupWriteFunction(); + } + + public void invoke(I record) throws Exception { + writeFunction.processElement((RowData) record, null, null); + } + + public WriteMetadataEvent[] getEventBuffer() { + return this.coordinator.getEventBuffer(); + } + + public OperatorEvent getNextEvent() { + return this.gateway.getNextEvent(); + } + + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return this.writeFunction.getWriteClient(); + } + + public void checkpointFunction(long checkpointId) throws Exception { + // checkpoint the coordinator first + this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); + + writeFunction.snapshotState(null); + stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); + } + + public void checkpointComplete(long checkpointId) { + stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); + coordinator.notifyCheckpointComplete(checkpointId); + } + + public StreamWriteOperatorCoordinator getCoordinator() { + return coordinator; + } + + public BulkInsertWriterHelper getWriterHelper() { + return this.writeFunction.getWriterHelper(); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void setupWriteFunction() throws Exception { + writeFunction = new AppendWriteFunction<>(conf, rowType); + writeFunction.setRuntimeContext(runtimeContext); + writeFunction.setOperatorEventGateway(gateway); + writeFunction.initializeState(this.stateInitializationContext); + writeFunction.open(conf); + + // handle the bootstrap event + coordinator.handleEventFromOperator(0, getNextEvent()); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 4ada5172b..6b6bedea5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -95,7 +95,7 @@ public class StreamWriteFunctionWrapper { /** * Stream write function. 
*/ - private StreamWriteFunction, Object> writeFunction; + private StreamWriteFunction> writeFunction; private CompactFunctionWrapper compactFunctionWrapper; diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 8096d5e33..233e6fa7e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class TestStreamReadOperator { private static final Map EXPECTED = new HashMap<>(); + static { EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]"); EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]"); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index f8fc42ac3..a04e7bb99 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -796,6 +796,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3); } + @Test + void testAppendWrite() { + TableEnvironment tableEnv = batchTableEnv; + // csv source + String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); + tableEnv.executeSql(csvSourceDDL); + + String hoodieTableDDL = sql("hoodie_sink") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.OPERATION, "insert") + .option(FlinkOptions.INSERT_DEDUP, false) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + String insertInto = "insert into hoodie_sink select * from csv_source"; + execInsertSql(tableEnv, insertInto); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect()); + assertRowsEquals(result2, "[" + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -874,7 +903,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } private List execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) - throws InterruptedException { + throws InterruptedException { tEnv.executeSql("DROP TABLE IF EXISTS sink"); tEnv.executeSql(sinkDDL); TableResult tableResult = tEnv.executeSql("insert into sink " + select); @@ -883,7 +912,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { tableResult.getJobClient().ifPresent(JobClient::cancel); tEnv.executeSql("DROP TABLE IF EXISTS sink"); return CollectSinkTableFactory.RESULT.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + .flatMap(Collection::stream) + .collect(Collectors.toList()); } } diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 6824090a2..231d42c26 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -128,18 +128,18 @@ public class TestConfigurations { DataType[] fieldTypes = tableSchema.getFieldDataTypes(); for (int i = 0; i < fieldNames.length; i++) { builder.append(" `") - .append(fieldNames[i]) - .append("` ") - .append(fieldTypes[i].toString()); + .append(fieldNames[i]) + .append("` ") + .append(fieldTypes[i].toString()); if (i != fieldNames.length - 1) { builder.append(","); } builder.append("\n"); } final String withProps = "" - + ") with (\n" - + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n" - + ")"; + + ") with (\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n" + + ")"; builder.append(withProps); return builder.toString(); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java index 6b0b71c71..8822a6f79 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -22,7 +22,8 @@ package org.apache.hudi.utils; * Test sql statements. */ public class TestSQL { - private TestSQL() {} + private TestSQL() { + } public static final String INSERT_T1 = "insert into t1 values\n" + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index 7459f3a2b..af40a8dd8 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -57,12 +57,12 @@ public class TestStreamerUtil { // Validate the partition fields & preCombineField in hoodie.properties. HoodieTableMetaClient metaClient1 = HoodieTableMetaClient.builder() - .setBasePath(tempFile.getAbsolutePath()) - .setConf(new org.apache.hadoop.conf.Configuration()) - .build(); + .setBasePath(tempFile.getAbsolutePath()) + .setConf(new org.apache.hadoop.conf.Configuration()) + .build(); assertTrue(metaClient1.getTableConfig().getPartitionFields().isPresent(), - "Missing partition columns in the hoodie.properties."); - assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] { "p0", "p1" }); + "Missing partition columns in the hoodie.properties."); + assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] {"p0", "p1"}); assertEquals(metaClient1.getTableConfig().getPreCombineField(), "ts"); // Test for non-partitioned table. @@ -70,9 +70,9 @@ public class TestStreamerUtil { FileIOUtils.deleteDirectory(tempFile); StreamerUtil.initTableIfNotExists(conf); HoodieTableMetaClient metaClient2 = HoodieTableMetaClient.builder() - .setBasePath(tempFile.getAbsolutePath()) - .setConf(new org.apache.hadoop.conf.Configuration()) - .build(); + .setBasePath(tempFile.getAbsolutePath()) + .setConf(new org.apache.hadoop.conf.Configuration()) + .build(); assertFalse(metaClient2.getTableConfig().getPartitionFields().isPresent()); }
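Editor note: taken together, the sink changes above mean an INSERT operation with deduplication disabled bypasses the keyed stream-write pipeline. A sketch of the resulting wiring, using only names introduced in this diff; the wrapper class and method are illustrative, and environment/source setup is omitted:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;

class AppendPipelineSketch {
  static void wire(Configuration conf, RowType rowType, DataStream<RowData> input, String basePath) {
    conf.setString(FlinkOptions.PATH, basePath);
    conf.setString(FlinkOptions.OPERATION, "insert");
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);              // routes HoodieTableSink to Pipelines.append
    conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 120); // MB, forwarded to HoodieStorageConfig in StreamerUtil
    Pipelines.append(conf, rowType, input);                         // hoodie_append_write operator + DummySink
  }
}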