From d415d45416707ca4d5b1dbad65dc80e6fccfa378 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 29 Mar 2021 10:47:29 +0800 Subject: [PATCH] [HUDI-1729] Asynchronous Hive sync and commits cleaning for Flink writer (#2732) --- .../hudi/client/HoodieFlinkWriteClient.java | 91 ++++++++++++-- hudi-flink/pom.xml | 10 ++ .../hudi/configuration/FlinkOptions.java | 117 +++++++++++++++++- .../org/apache/hudi/sink/CleanFunction.java | 89 +++++++++++++ .../sink/StreamWriteOperatorCoordinator.java | 47 ++++++- .../sink/compact/CompactionCommitSink.java | 8 +- .../partitioner/BucketAssignFunction.java | 17 ++- .../hudi/sink/utils/HiveSyncContext.java | 89 +++++++++++++ .../hudi/sink/utils/NonThrownExecutor.java | 78 ++++++++++++ .../hudi/streamer/HoodieFlinkStreamerV2.java | 12 +- .../apache/hudi/table/HoodieTableSink.java | 9 +- .../org/apache/hudi/util/StreamerUtil.java | 22 ++++ .../apache/hudi/sink/StreamWriteITCase.java | 15 ++- .../TestStreamWriteOperatorCoordinator.java | 32 ++++- .../hudi/sink/TestWriteCopyOnWrite.java | 4 +- .../hudi/table/HoodieDataSourceITCase.java | 35 +++++- .../apache/hudi/utils/TestConfigurations.java | 11 +- .../java/org/apache/hudi/utils/TestData.java | 2 +- .../factory/ContinuousFileSourceFactory.java | 11 +- .../utils/source/ContinuousFileSource.java | 4 +- .../{test_source2.data => test_source_2.data} | 0 .../src/test/resources/test_source_3.data | 8 ++ packaging/hudi-flink-bundle/pom.xml | 34 +++++ 23 files changed, 704 insertions(+), 41 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java rename hudi-flink/src/test/resources/{test_source2.data => test_source_2.data} (100%) create mode 100644 hudi-flink/src/test/resources/test_source_3.data diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 21a396c78..de4d8ca65 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -20,6 +20,7 @@ package org.apache.hudi.client; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; @@ -38,6 +39,7 @@ import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; @@ -49,6 +51,8 @@ import org.apache.hudi.io.MiniBatchHandle; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; @@ -117,7 +121,7 @@ public class HoodieFlinkWriteClient extends @Override public List> filterExists(List> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible - HoodieFlinkTable table = HoodieFlinkTable.create(config, 
(HoodieFlinkEngineContext) context); + HoodieFlinkTable table = getHoodieTable(); Timer.Context indexTimer = metrics.getIndexCtx(); List> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); @@ -198,6 +202,47 @@ public class HoodieFlinkWriteClient extends return postWrite(result, instantTime, table); } + @Override + protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) { + setOperationType(writeOperationType); + this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); + this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata + .isPresent() + ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty()); + try { + syncTableMetadata(); + } finally { + this.txnManager.endTransaction(); + } + // remove the async cleaning + } + + /** + * Starts async cleaning service for finished commits. + * + *

The Flink write client is designed to write data set as buckets + * but cleaning action should trigger after all the write actions within a + * checkpoint finish. + */ + public void startAsyncCleaning() { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } + + /** + * Blocks and waits for the async cleaning service to finish. + * + *

The Flink write client is designed to write data set as buckets + * but cleaning action should trigger after all the write actions within a + * checkpoint finish. + */ + public void waitForCleaningFinish() { + if (this.asyncCleanerService != null) { + LOG.info("Cleaner has been spawned already. Waiting for it to finish"); + AsyncCleanerService.waitForCompletion(asyncCleanerService); + LOG.info("Cleaner has finished"); + } + } + @Override protected List postWrite(HoodieWriteMetadata> result, String instantTime, @@ -208,12 +253,37 @@ public class HoodieFlinkWriteClient extends return result.getWriteStatuses(); } + /** + * Post commit is rewritten to be invoked after a successful commit. + * + *

The Flink write client is designed to write data set as buckets + * but cleaning action should trigger after all the write actions within a + * checkpoint finish. + * + * @param instantTime The latest successful commit time + */ + public void postCommit(String instantTime) { + try { + HoodieTable table = createTable(config, hadoopConf); + // Delete the marker directory for the instant. + new MarkerFiles(createTable(config, hadoopConf), instantTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + // We cannot have unbounded commit files. Archive commits if we have to archive + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); + archiveLog.archiveIfRequired(context); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } finally { + this.heartbeatClient.stop(instantTime); + } + } + @Override public void commitCompaction( String compactionInstantTime, List writeStatuses, Option> extraMetadata) throws IOException { - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieFlinkTable table = getHoodieTable(); HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata( table, compactionInstantTime, writeStatuses, config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); @@ -318,7 +388,7 @@ public class HoodieFlinkWriteClient extends setWriteSchemaForDeletes(metaClient); } // Create a Hoodie table which encapsulated the commits and files visible - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient); + HoodieFlinkTable table = getHoodieTable(); if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeTimer = metrics.getCommitCtx(); } else { @@ -328,16 +398,14 @@ public class HoodieFlinkWriteClient extends } public List getInflightsAndRequestedInstants(String commitType) { - HoodieFlinkTable 
table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); + HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); } public String getInflightAndRequestedInstant(String tableType) { final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); + HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); return unCompletedTimeline.getInstants() .filter(x -> x.getAction().equals(commitType)) .map(HoodieInstant::getTimestamp) @@ -348,8 +416,7 @@ public class HoodieFlinkWriteClient extends public String getLastCompletedInstant(String tableType) { final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - HoodieTimeline completedTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants(); return completedTimeline.getInstants() .filter(x -> x.getAction().equals(commitType)) .map(HoodieInstant::getTimestamp) @@ -359,7 +426,7 @@ public class HoodieFlinkWriteClient extends } public void deletePendingInstant(String tableType, String instant) { - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieFlinkTable table = 
getHoodieTable(); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant); @@ -367,7 +434,7 @@ public class HoodieFlinkWriteClient extends } public void transitionRequestedToInflight(String tableType, String inFlightInstant) { - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieFlinkTable table = getHoodieTable(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); @@ -376,7 +443,7 @@ public class HoodieFlinkWriteClient extends } public void rollbackInflightCompaction(HoodieInstant inflightInstant) { - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieFlinkTable table = getHoodieTable(); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { rollbackInflightCompaction(inflightInstant, table); diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index c81734b8d..a95404cba 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -97,6 +97,16 @@ hudi-hadoop-mr ${project.version} + + org.apache.hudi + hudi-hive-sync + ${project.version} + + + org.apache.hudi + hudi-sync-common + ${project.version} + diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c13d7823b..8691d91b5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -22,6 
+22,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.streamer.FlinkStreamerConfig; @@ -64,7 +65,7 @@ public class FlinkOptions { // ------------------------------------------------------------------------ public static final ConfigOption PARTITION_DEFAULT_NAME = ConfigOptions - .key("partition.default.name") + .key("partition.default_name") .stringType() .defaultValue("__DEFAULT_PARTITION__") .withDescription("The default partition name in case the dynamic partition" @@ -233,6 +234,12 @@ public class FlinkOptions { .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n" + "Actual value obtained by invoking .toString()"); + public static final ConfigOption PARTITION_PATH_URL_ENCODE = ConfigOptions + .key("write.partition.url_encode") + .booleanType() + .defaultValue(false) + .withDescription("Whether to encode the partition path url, default false"); + public static final ConfigOption KEYGEN_CLASS = ConfigOptions .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP) .stringType() @@ -287,6 +294,114 @@ public class FlinkOptions { .defaultValue(3600) // default 1 hour .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour"); + public static final ConfigOption CLEAN_ASYNC_ENABLED = ConfigOptions + .key("clean.async.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default"); + + public static final ConfigOption CLEAN_RETAIN_COMMITS = ConfigOptions + .key("clean.retain_commits") + .intType() + .defaultValue(10)// default 10 
commits + .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + + "This also directly translates into how much you can incrementally pull on this table, default 10"); + + // ------------------------------------------------------------------------ + // Hive Sync Options + // ------------------------------------------------------------------------ + public static final ConfigOption HIVE_SYNC_ENABLED = ConfigOptions + .key("hive_sync.enable") + .booleanType() + .defaultValue(false) + .withDescription("Asynchronously sync Hive meta to HMS, default false"); + + public static final ConfigOption HIVE_SYNC_DB = ConfigOptions + .key("hive_sync.db") + .stringType() + .defaultValue("default") + .withDescription("Database name for hive sync, default 'default'"); + + public static final ConfigOption HIVE_SYNC_TABLE = ConfigOptions + .key("hive_sync.table") + .stringType() + .defaultValue("unknown") + .withDescription("Table name for hive sync, default 'unknown'"); + + public static final ConfigOption HIVE_SYNC_FILE_FORMAT = ConfigOptions + .key("hive_sync.file_format") + .stringType() + .defaultValue("PARQUET") + .withDescription("File format for hive sync, default 'PARQUET'"); + + public static final ConfigOption HIVE_SYNC_USERNAME = ConfigOptions + .key("hive_sync.username") + .stringType() + .defaultValue("hive") + .withDescription("Username for hive sync, default 'hive'"); + + public static final ConfigOption HIVE_SYNC_PASSWORD = ConfigOptions + .key("hive_sync.password") + .stringType() + .defaultValue("hive") + .withDescription("Password for hive sync, default 'hive'"); + + public static final ConfigOption HIVE_SYNC_JDBC_URL = ConfigOptions + .key("hive_sync.jdbc_url") + .stringType() + .defaultValue("jdbc:hive2://localhost:10000") + .withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'"); + + public static final ConfigOption HIVE_SYNC_PARTITION_FIELDS = ConfigOptions 
+ .key("hive_sync.partition_fields") + .stringType() + .defaultValue("") + .withDescription("Partition fields for hive sync, default ''"); + + public static final ConfigOption HIVE_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigOptions + .key("hive_sync.partition_extractor_class") + .stringType() + .defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName()) + .withDescription("Tool to extract the partition value from HDFS path, " + + "default 'SlashEncodedDayPartitionValueExtractor'"); + + public static final ConfigOption HIVE_SYNC_ASSUME_DATE_PARTITION = ConfigOptions + .key("hive_sync.assume_date_partitioning") + .booleanType() + .defaultValue(false) + .withDescription("Assume partitioning is yyyy/mm/dd, default false"); + + public static final ConfigOption HIVE_SYNC_USE_JDBC = ConfigOptions + .key("hive_sync.use_jdbc") + .booleanType() + .defaultValue(true) + .withDescription("Use JDBC when hive synchronization is enabled, default true"); + + public static final ConfigOption HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions + .key("hive_sync.auto_create_db") + .booleanType() + .defaultValue(true) + .withDescription("Auto create hive database if it does not exists, default true"); + + public static final ConfigOption HIVE_SYNC_IGNORE_EXCEPTIONS = ConfigOptions + .key("hive_sync.ignore_exceptions") + .booleanType() + .defaultValue(false) + .withDescription("Ignore exceptions during hive synchronization, default false"); + + public static final ConfigOption HIVE_SYNC_SKIP_RO_SUFFIX = ConfigOptions + .key("hive_sync.skip_ro_suffix") + .booleanType() + .defaultValue(false) + .withDescription("Skip the _ro suffix for Read optimized table when registering, default false"); + + public static final ConfigOption HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions + .key("hive_sync.support_timestamp") + .booleanType() + .defaultValue(false) + .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" + + "Disabled by default for backward 
compatibility."); + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java new file mode 100644 index 000000000..7b875ff8e --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.NonThrownExecutor; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * Sink function that cleans the old commits. + * + *

It starts a cleaning task on new checkpoints, there is only one cleaning task + * at a time, a new task can not be scheduled until the last task finished(fails or normally succeed). + * The cleaning task never expects to throw but only log. + */ +public class CleanFunction extends AbstractRichFunction + implements SinkFunction, CheckpointedFunction, CheckpointListener { + private final Configuration conf; + + private HoodieFlinkWriteClient writeClient; + private NonThrownExecutor executor; + + private volatile boolean isCleaning; + + public CleanFunction(Configuration conf) { + this.conf = conf; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.executor = new NonThrownExecutor(); + } + } + + @Override + public void notifyCheckpointComplete(long l) throws Exception { + if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) { + executor.execute(() -> { + try { + this.writeClient.waitForCleaningFinish(); + } finally { + // ensure to switch the isCleaning flag + this.isCleaning = false; + } + }, "wait for cleaning finish", ""); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) { + this.writeClient.startAsyncCleaning(); + this.isCleaning = true; + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // no operation + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 659bb2ea2..ad0661771 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -26,6 +26,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.utils.HiveSyncContext; +import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -107,6 +109,16 @@ public class StreamWriteOperatorCoordinator */ private final boolean needsScheduleCompaction; + /** + * A single-thread executor to handle all the asynchronous jobs of the coordinator. + */ + private NonThrownExecutor executor; + + /** + * Context that holds variables for asynchronous hive sync. + */ + private HiveSyncContext hiveSyncContext; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -131,14 +143,21 @@ public class StreamWriteOperatorCoordinator initTableIfNotExists(this.conf); // start a new instant startInstant(); + // start the executor if required + if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { + initHiveSync(); + } } @Override - public void close() { + public void close() throws Exception { // teardown the resource if (writeClient != null) { writeClient.close(); } + if (executor != null) { + executor.close(); + } this.eventBuffer = null; } @@ -164,10 +183,25 @@ public class StreamWriteOperatorCoordinator if (needsScheduleCompaction) { writeClient.scheduleCompaction(Option.empty()); } + // sync Hive if is enabled + syncHiveIfEnabled(); // start new instant. startInstant(); } + private void syncHiveIfEnabled() { + if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { + this.executor.execute(this::syncHive, "sync hive metadata", this.instant); + } + } + + /** + * Sync hoodie table metadata to Hive metastore. 
+ */ + public void syncHive() { + hiveSyncContext.hiveSyncTool().syncHoodieTable(); + } + private void startInstant() { this.instant = this.writeClient.startCommit(); this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant); @@ -232,6 +266,11 @@ public class StreamWriteOperatorCoordinator true); } + private void initHiveSync() { + this.executor = new NonThrownExecutor(); + this.hiveSyncContext = HiveSyncContext.create(conf); + } + static byte[] readBytes(DataInputStream in, int size) throws IOException { byte[] bytes = new byte[size]; in.readFully(bytes); @@ -299,6 +338,7 @@ public class StreamWriteOperatorCoordinator doCommit(); } + @SuppressWarnings("unchecked") private void checkAndCommitWithRetry() { int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES); if (retryTimes < 0) { @@ -378,6 +418,7 @@ public class StreamWriteOperatorCoordinator boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); if (success) { + writeClient.postCommit(this.instant); reset(); LOG.info("Commit instant [{}] success!", this.instant); } else { @@ -415,6 +456,10 @@ public class StreamWriteOperatorCoordinator return writeClient; } + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + /** * Provider for {@link StreamWriteOperatorCoordinator}. 
*/ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index da077a0e7..86529a124 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -30,10 +30,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +51,11 @@ import java.util.stream.Collectors; * it loads and checks the compaction plan {@link HoodieCompactionPlan}, * if all the compaction operations {@link org.apache.hudi.common.model.CompactionOperation} * of the plan are finished, tries to commit the compaction action. + * + *

It also inherits the {@link CleanFunction} cleaning ability. This is needed because + * the SQL API does not allow multiple sinks in one table sink provider. */ -public class CompactionCommitSink extends RichSinkFunction { +public class CompactionCommitSink extends CleanFunction { private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class); /** @@ -76,6 +79,7 @@ public class CompactionCommitSink extends RichSinkFunction> */ private boolean allPartitionsLoaded = false; + /** + * Flag saying whether to check that all the partitions are loaded. + * So that there is chance that flag {@code allPartitionsLoaded} becomes true. + */ + private boolean checkPartition = true; + public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( @@ -174,6 +180,13 @@ public class BucketAssignFunction> final HoodieKey hoodieKey = record.getKey(); final BucketInfo bucketInfo; final HoodieRecordLocation location; + + // Checks whether all the partitions are loaded first. + if (checkPartition && !allPartitionsLoaded) { + checkPartitionsLoaded(); + checkPartition = false; + } + if (!allPartitionsLoaded && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition && !partitionLoadState.contains(hoodieKey.getPartitionPath())) { @@ -213,7 +226,9 @@ public class BucketAssignFunction> public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. 
this.bucketAssigner.refreshTable(); - checkPartitionsLoaded(); + if (!allPartitionsLoaded) { + checkPartition = true; + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java new file mode 100644 index 000000000..a791076c1 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HiveSyncTool; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Hive synchronization context. + * + *

Use this context to create the {@link HiveSyncTool} for synchronization. + */ +public class HiveSyncContext { + private final HiveSyncConfig syncConfig; + private final HiveConf hiveConf; + private final FileSystem fs; + + private HiveSyncContext(HiveSyncConfig syncConfig, HiveConf hiveConf, FileSystem fs) { + this.syncConfig = syncConfig; + this.hiveConf = hiveConf; + this.fs = fs; + } + + public HiveSyncTool hiveSyncTool() { + return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs); + } + + public static HiveSyncContext create(Configuration conf) { + HiveSyncConfig syncConfig = buildSyncConfig(conf); + org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); + String path = conf.getString(FlinkOptions.PATH); + FileSystem fs = FSUtils.getFs(path, hadoopConf); + HiveConf hiveConf = new HiveConf(); + hiveConf.addResource(fs.getConf()); + return new HiveSyncContext(syncConfig, hiveConf, fs); + } + + private static HiveSyncConfig buildSyncConfig(Configuration conf) { + HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); + hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH); + hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT); + hiveSyncConfig.usePreApacheInputFormat = false; + hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB); + hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE); + hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME); + hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD); + hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL); + hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS) + .split(",")).map(String::trim).collect(Collectors.toList()); + hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS); + hiveSyncConfig.useJdbc = 
conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC); + // needs to support metadata table for flink + hiveSyncConfig.useFileListingFromMetadata = false; + hiveSyncConfig.verifyMetadataFileListing = false; + hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS); + hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP); + hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB); + hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE); + hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX); + hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION); + return hiveSyncConfig; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java new file mode 100644 index 000000000..1d0542e46 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * An executor service that catches all the throwable with logging. + */ +public class NonThrownExecutor implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(NonThrownExecutor.class); + + /** + * A single-thread executor to handle all the asynchronous jobs. + */ + private final ExecutorService executor; + + public NonThrownExecutor() { + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Run the action in a loop. + */ + public void execute( + final ThrowingRunnable action, + final String actionName, + final String instant) { + + executor.execute( + () -> { + try { + action.run(); + LOG.info("Executor executes action [{}] for instant [{}] success!", actionName, instant); + } catch (Throwable t) { + // if we have a JVM critical error, promote it immediately, there is a good + // chance the + // logging or job failing will not succeed any more + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + final String errMsg = String.format("Executor executes action [%s] error", actionName); + LOG.error(errMsg, t); + } + }); + } + + @Override + public void close() throws Exception { + if (executor != null) { + executor.shutdownNow(); + // We do not expect this to actually block for long. At this point, there should + // be very few task running in the executor, if any. 
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index 60edfa44b..10c0b419e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -20,6 +20,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; @@ -32,7 +33,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -76,7 +76,7 @@ public class HoodieFlinkStreamerV2 { StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); - DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>( + env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, new JsonRowDataDeserializationSchema( rowType, @@ -99,9 +99,11 @@ public class HoodieFlinkStreamerV2 { .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", null, operatorFactory) .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTask); - - env.addOperator(dataStream.getTransformation()); + .setParallelism(numWriteTask) + 
.addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits") + .uid("uid_clean_commits"); env.execute(cfg.targetTableName); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 81aaf59f4..28568f719 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; @@ -34,7 +35,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -95,9 +95,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { .name("compact_commit") .setParallelism(1); // compaction commit should be singleton } else { - return pipeline.addSink(new DummySinkFunction<>()) + return pipeline.addSink(new CleanFunction<>(conf)) .setParallelism(1) - .name("dummy").uid("uid_dummy"); + .name("clean_commits").uid("uid_clean_commits"); } }; } @@ -131,7 +131,4 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { public void applyStaticPartition(Map map) { // no operation } - - // Dummy sink function that does nothing. 
- private static class DummySinkFunction implements SinkFunction {} } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4756c2434..900ec4165 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,7 +18,11 @@ package org.apache.hudi.util; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; @@ -42,6 +46,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; @@ -204,6 +209,11 @@ public class StreamerUtil { CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) + .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) + .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) + // override and hardcode to 20, + // actually Flink cleaning is always with parallelism 1 now + .withCleanerParallelism(20) .build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) @@ -302,4 +312,16 @@ public 
class StreamerUtil { .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); } + + /** + * Creates the Flink write client. + */ + public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(getHadoopConf()), + new FlinkTaskContextSupplier(runtimeContext)); + + return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index bcd6d5428..94e09758a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -243,7 +243,7 @@ public class StreamWriteITCase extends TestLogger { TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; format.setCharsetName("UTF-8"); - execEnv + DataStream pipeline = execEnv // use PROCESS_CONTINUOUSLY mode to trigger checkpoint .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) @@ -259,10 +259,15 @@ public class StreamWriteITCase extends TestLogger { // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .uid("uid_hoodie_stream_write") - .transform("compact_plan_generate", - TypeInformation.of(CompactionPlanEvent.class), - new CompactionPlanOperator(conf)) + .uid("uid_hoodie_stream_write"); + + pipeline.addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits").uid("uid_clean_commits"); + + pipeline.transform("compact_plan_generate", + TypeInformation.of(CompactionPlanEvent.class), + new 
CompactionPlanOperator(conf)) .uid("uid_compact_plan_generate") .setParallelism(1) // plan generate must be singleton .keyBy(event -> event.getOperation().hashCode()) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index eab1374f2..90874f1e6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -22,11 +22,13 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -63,7 +66,7 @@ public class TestStreamWriteOperatorCoordinator { } @AfterEach - public void after() { + public void after() throws Exception { coordinator.close(); } @@ -148,4 +151,31 @@ public class TestStreamWriteOperatorCoordinator { () -> coordinator.notifyCheckpointComplete(1), "Try 3 to commit instant"); } + + @Test + public void 
testHiveSyncInvoked() throws Exception { + // override the default configuration + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true); + coordinator = new StreamWriteOperatorCoordinator(conf, 1); + coordinator.start(); + + String instant = coordinator.getInstant(); + assertNotEquals("", instant); + + WriteStatus writeStatus = new WriteStatus(true, 0.1D); + writeStatus.setPartitionPath("par1"); + writeStatus.setStat(new HoodieWriteStat()); + OperatorEvent event0 = BatchWriteSuccessEvent.builder() + .taskID(0) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus)) + .isLastBatch(true) + .build(); + + coordinator.handleEventFromOperator(0, event0); + + // never throw for hive synchronization now + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index a98bd3d38..0e53cfaca 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -491,7 +491,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -507,6 +507,8 @@ public class TestWriteCopyOnWrite { // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); + // next element triggers all partitions load check + funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0)); assertTrue(funcWrapper.isAllPartitionsLoaded(), "All partitions assume to be loaded into the index state"); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 291c582e0..218c7420f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -18,12 +18,15 @@ package org.apache.hudi.table; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestUtils; import org.apache.hudi.utils.factory.CollectSinkTableFactory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -49,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.hudi.utils.TestData.assertRowsEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * IT cases for Hoodie table source and sink. 
@@ -105,7 +109,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { void testStreamReadAppendData() throws Exception { // create filesystem table named source String createSource = TestConfigurations.getFileSourceDDL("source"); - String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source2.data"); + String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data"); streamTableEnv.executeSql(createSource); streamTableEnv.executeSql(createSource2); @@ -175,6 +179,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } + @Test + void testStreamWriteWithCleaning() throws InterruptedException { + // create filesystem table named source + + // the source generates 4 commits but the cleaning task + // would always try to keep the remaining commits number as 1 + String createSource = TestConfigurations.getFileSourceDDL( + "source", "test_source_3.data", 4); + streamTableEnv.executeSql(createSource); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "1"); // only keep 1 commits + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 select * from source"; + execInsertSql(streamTableEnv, insertInto); + + Configuration defaultConf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + Map options1 = new HashMap<>(defaultConf.toMap()); + options1.put(FlinkOptions.TABLE_NAME.key(), "t1"); + Configuration conf = Configuration.fromMap(options1); + HoodieTimeline timeline = StreamerUtil.createWriteClient(conf, null) + .getHoodieTable().getActiveTimeline(); + assertTrue(timeline.filterCompletedInstants() + .getInstants().anyMatch(instant -> instant.getAction().equals("clean")), + "some commits should be cleaned"); + } + 
@ParameterizedTest @EnumSource(value = ExecMode.class) void testWriteAndRead(ExecMode execMode) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 392cf5f85..f5b9fb4b3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -91,7 +91,15 @@ public class TestConfigurations { return getFileSourceDDL(tableName, "test_source.data"); } + public static String getFileSourceDDL(String tableName, int checkpoints) { + return getFileSourceDDL(tableName, "test_source.data", checkpoints); + } + public static String getFileSourceDDL(String tableName, String fileName) { + return getFileSourceDDL(tableName, fileName, 2); + } + + public static String getFileSourceDDL(String tableName, String fileName, int checkpoints) { String sourcePath = Objects.requireNonNull(Thread.currentThread() .getContextClassLoader().getResource(fileName)).toString(); return "create table " + tableName + "(\n" @@ -102,7 +110,8 @@ public class TestConfigurations { + " `partition` varchar(20)\n" + ") with (\n" + " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n" - + " 'path' = '" + sourcePath + "'\n" + + " 'path' = '" + sourcePath + "',\n" + + " 'checkpoints' = '" + checkpoints + "'\n" + ")"; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index d1e310382..f8d4e7d6a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -141,7 +141,7 @@ public class TestData { TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); - // merged data set of test_source.data and test_source2.data + // merged data set of test_source.data and test_source_2.data public static List DATA_SET_SOURCE_MERGED = 
Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java index bdffcd553..92d9c5572 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java @@ -22,6 +22,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.utils.source.ContinuousFileSource; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.ValidationException; @@ -38,6 +39,12 @@ import java.util.Set; public class ContinuousFileSourceFactory implements DynamicTableSourceFactory { public static final String FACTORY_ID = "continuous-file-source"; + public static final ConfigOption CHECKPOINTS = ConfigOptions + .key("checkpoints") + .intType() + .defaultValue(2) + .withDescription("Number of checkpoints to write the data set as, default 2"); + @Override public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); @@ -56,11 +63,11 @@ public class ContinuousFileSourceFactory implements DynamicTableSourceFactory { @Override public Set> requiredOptions() { - return Collections.emptySet(); + return Collections.singleton(FlinkOptions.PATH); } @Override public Set> optionalOptions() { - return Collections.singleton(FlinkOptions.PATH); + return Collections.singleton(CHECKPOINTS); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java index d94351320..19b23f54f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java @@ -42,6 +42,8 @@ import java.nio.file.Paths; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.hudi.utils.factory.ContinuousFileSourceFactory.CHECKPOINTS; + /** * A continuous file source that can trigger checkpoints continuously. * @@ -89,7 +91,7 @@ public class ContinuousFileSource implements ScanTableSource { true, TimestampFormat.ISO_8601); - return execEnv.addSource(new BoundedSourceFunction(path, 2)) + return execEnv.addSource(new BoundedSourceFunction(path, conf.getInteger(CHECKPOINTS))) .name("continuous_file_source") .setParallelism(1) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)), diff --git a/hudi-flink/src/test/resources/test_source2.data b/hudi-flink/src/test/resources/test_source_2.data similarity index 100% rename from hudi-flink/src/test/resources/test_source2.data rename to hudi-flink/src/test/resources/test_source_2.data diff --git a/hudi-flink/src/test/resources/test_source_3.data b/hudi-flink/src/test/resources/test_source_3.data new file mode 100644 index 000000000..18f0a9d0e --- /dev/null +++ b/hudi-flink/src/test/resources/test_source_3.data @@ -0,0 +1,8 @@ +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", 
"age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 7a30b74fb..26af7a3b6 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -31,6 +31,7 @@ true ${project.parent.basedir} + provided org.apache.hudi. 3.1.0 @@ -74,6 +75,8 @@ org.apache.hudi:hudi-client-common org.apache.hudi:hudi-flink-client org.apache.hudi:hudi-flink_${scala.binary.version} + org.apache.hudi:hudi-hive-sync + org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service @@ -123,6 +126,8 @@ org.apache.flink:flink-parquet_${scala.binary.version} org.apache.hive:hive-common + org.apache.hive:hive-service + org.apache.hive:hive-service-rpc org.apache.hive:hive-exec org.apache.hive:hive-metastore org.apache.hive:hive-jdbc @@ -242,6 +247,11 @@ hudi-hadoop-mr ${project.version} + + org.apache.hudi + hudi-hive-sync + ${project.version} + org.apache.hudi hudi-timeline-service @@ -349,6 +359,18 @@ + + ${hive.groupid} + hive-service + ${hive.version} + ${flink.bundle.hive.scope} + + + ${hive.groupid} + hive-service-rpc + ${hive.version} + ${flink.bundle.hive.scope} + ${hive.groupid} hive-exec @@ -368,6 +390,7 @@ ${hive.groupid} hive-metastore ${hive.version} + ${flink.bundle.hive.scope} javax.servlet @@ -383,6 +406,7 @@ ${hive.groupid} hive-jdbc ${hive.version} + ${flink.bundle.hive.scope} javax.servlet @@ -398,6 +422,7 @@ ${hive.groupid} hive-common ${hive.version} + ${flink.bundle.hive.scope} org.eclipse.jetty.orbit @@ -436,4 +461,13 @@ ${hbase.version} + + + + flink-bundle-shade-hive + + compile + + + \ No newline at end of file