1
0

[HUDI-3581] Reorganize some clazz for hudi flink (#4983)

This commit is contained in:
Danny Chan
2022-03-10 15:55:15 +08:00
committed by GitHub
parent 034addaef5
commit ec24407191
15 changed files with 28 additions and 14 deletions

View File

@@ -170,6 +170,7 @@ public abstract class HoodieAsyncService implements Serializable {
       if (null != callback) {
         callback.apply(null != error);
       }
+      this.started = false;
     });
   }

View File

@@ -835,11 +835,18 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * Provides a new commit time for a write operation (insert/update/delete).
    */
   public String startCommit() {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    return startCommit(metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action.
+   */
+  public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
+    startCommit(instantTime, actionType, metaClient);
     return instantTime;
   }

View File

@@ -364,7 +364,7 @@ public class StreamWriteOperatorCoordinator
   private void startInstant() {
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
-    this.instant = this.writeClient.startCommit();
+    this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
     this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
     this.ckpMetadata.startInstant(this.instant);
     LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,

View File

@@ -19,6 +19,7 @@
 package org.apache.hudi.table.format.cow;

 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;

 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;

View File

@@ -22,8 +22,12 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;

 import org.apache.flink.core.fs.Path;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector;

 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.vector.BytesColumnVector;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector.reader;

 import org.apache.flink.formats.parquet.vector.ParquetDictionary;
 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;

View File

@@ -18,8 +18,8 @@
 package org.apache.hudi.table.format.cow.vector.reader;

-import org.apache.hudi.table.format.cow.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;

 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
 import org.apache.flink.table.data.TimestampData;

View File

@@ -15,12 +15,11 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector.reader;

 import org.apache.flink.table.data.vector.writable.WritableBytesVector;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.data.vector.writable.WritableIntVector;

 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.io.api.Binary;

View File

@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector.reader;

 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.vector.writable.WritableIntVector;

View File

@@ -16,7 +16,9 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;

 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
 import org.apache.flink.table.data.ColumnarRowData;

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.hudi.table.format.cow;
+package org.apache.hudi.table.format.cow.vector.reader;

 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.data.vector.writable.WritableIntVector;

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataProjection;

View File

@@ -66,7 +66,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Integration test for Flink Hoodie stream sink.
  */
-public class StreamWriteITCase extends TestLogger {
+public class ITTestDataStreamWrite extends TestLogger {

   private static final Map<String, List<String>> EXPECTED = new HashMap<>();

   private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();

View File

@@ -73,7 +73,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * IT cases for Hoodie table source and sink.
  */
-public class HoodieDataSourceITCase extends AbstractTestBase {
+public class ITTestHoodieDataSource extends AbstractTestBase {

   private TableEnvironment streamTableEnv;
   private TableEnvironment batchTableEnv;