diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 1ce6dfb28..889d7945b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -170,6 +170,7 @@ public abstract class HoodieAsyncService implements Serializable { if (null != callback) { callback.apply(null != error); } + this.started = false; }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index eca2a3d67..fb703e37a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -835,11 +835,18 @@ public abstract class BaseHoodieWriteClient rollbackFailedWrites()); String instantTime = HoodieActiveTimeline.createNewInstantTime(); - HoodieTableMetaClient metaClient = createMetaClient(true); - startCommit(instantTime, metaClient.getCommitActionType(), metaClient); + startCommit(instantTime, actionType, metaClient); return instantTime; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 65f9eec9a..c4f2e771c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -364,7 +364,7 @@ public class StreamWriteOperatorCoordinator private void startInstant() { // put the assignment in front of metadata generation, // because the instant request from write task is asynchronous. - this.instant = this.writeClient.startCommit(); + this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); this.ckpMetadata.startInstant(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 7fb0b9d5c..d0d374693 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.format.cow; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.FilePathFilter; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index ca1408dcb..e112bcf24 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -22,8 +22,12 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.table.format.cow.vector.HeapArrayVector; import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector; import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector; +import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector; import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader; +import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader; +import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader; import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader; import org.apache.flink.core.fs.Path; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java similarity index 97% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java index 4705b2f63..a2f6d5b0c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.vector.BytesColumnVector; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java similarity index 99% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java index efbe91404..07416a371 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector.reader; import org.apache.flink.formats.parquet.vector.ParquetDictionary; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java index 256d4c1bb..d94c1e1da 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java @@ -18,8 +18,8 @@ package org.apache.hudi.table.format.cow.vector.reader; -import org.apache.hudi.table.format.cow.ParquetDecimalVector; import org.apache.hudi.table.format.cow.vector.HeapArrayVector; +import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; import org.apache.flink.table.data.TimestampData; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java similarity index 98% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java index 07a93e19c..61461a728 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector.reader; import org.apache.flink.table.data.vector.writable.WritableBytesVector; import org.apache.flink.table.data.vector.writable.WritableColumnVector; import org.apache.flink.table.data.vector.writable.WritableIntVector; - import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.io.api.Binary; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java similarity index 98% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java index 024567c83..555853bda 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector.reader; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.vector.writable.WritableIntVector; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java similarity index 99% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java index 3cb491cfa..92f5d1e19 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector.reader; + +import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; import org.apache.flink.table.data.ColumnarRowData; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/RunLengthDecoder.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java similarity index 99% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/cow/RunLengthDecoder.java rename to hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java index 159574714..f13340ced 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/RunLengthDecoder.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.format.cow; +package org.apache.hudi.table.format.cow.vector.reader; import org.apache.flink.table.data.vector.writable.WritableColumnVector; import org.apache.flink.table.data.vector.writable.WritableIntVector; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 4ea202a70..8283b5c3c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -31,7 +31,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; -import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.RowDataProjection; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java similarity index 99% rename from hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java rename to hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index eaa2d6ced..4864696da 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -66,7 +66,7 @@ import java.util.concurrent.TimeUnit; /** * Integration test for Flink Hoodie stream sink. */ -public class StreamWriteITCase extends TestLogger { +public class ITTestDataStreamWrite extends TestLogger { private static final Map> EXPECTED = new HashMap<>(); private static final Map> EXPECTED_TRANSFORMER = new HashMap<>(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java similarity index 99% rename from hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java rename to hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 7c9b0bb6a..903be90b9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -73,7 +73,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** * IT cases for Hoodie table source and sink. */ -public class HoodieDataSourceITCase extends AbstractTestBase { +public class ITTestHoodieDataSource extends AbstractTestBase { private TableEnvironment streamTableEnv; private TableEnvironment batchTableEnv;