1
0

[HUDI-1729] Asynchronous Hive sync and commits cleaning for Flink writer (#2732)

This commit is contained in:
Danny Chan
2021-03-29 10:47:29 +08:00
committed by GitHub
parent ecbd389a3f
commit d415d45416
23 changed files with 704 additions and 41 deletions

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
@@ -49,6 +51,8 @@ import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
@@ -117,7 +121,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieFlinkTable<T> table = getHoodieTable();
Timer.Context indexTimer = metrics.getIndexCtx();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -198,6 +202,47 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return postWrite(result, instantTime, table);
}
@Override
protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata
.isPresent()
? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
syncTableMetadata();
} finally {
this.txnManager.endTransaction();
}
// remove the async cleaning
}
/**
* Starts async cleaning service for finished commits.
*
* <p>The Flink write client is designed to write data set as buckets
* but cleaning action should trigger after all the write actions within a
* checkpoint finish.
*/
public void startAsyncCleaning() {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
}
/**
* Blocks and wait for the async cleaning service to finish.
*
* <p>The Flink write client is designed to write data set as buckets
* but cleaning action should trigger after all the write actions within a
* checkpoint finish.
*/
public void waitForCleaningFinish() {
if (this.asyncCleanerService != null) {
LOG.info("Cleaner has been spawned already. Waiting for it to finish");
AsyncCleanerService.waitForCompletion(asyncCleanerService);
LOG.info("Cleaner has finished");
}
}
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
@@ -208,12 +253,37 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return result.getWriteStatuses();
}
/**
* Post commit is rewrite to be invoked after a successful commit.
*
* <p>The Flink write client is designed to write data set as buckets
* but cleaning action should trigger after all the write actions within a
* checkpoint finish.
*
* @param instantTime The latest successful commit time
*/
public void postCommit(String instantTime) {
try {
HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
// Delete the marker directory for the instant.
new MarkerFiles(createTable(config, hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} finally {
this.heartbeatClient.stop(instantTime);
}
}
@Override
public void commitCompaction(
String compactionInstantTime,
List<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieFlinkTable<T> table = getHoodieTable();
HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
table, compactionInstantTime, writeStatuses, config.getSchema());
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
@@ -318,7 +388,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
HoodieFlinkTable<T> table = getHoodieTable();
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else {
@@ -328,16 +398,14 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
public List<String> getInflightsAndRequestedInstants(String commitType) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
}
public String getInflightAndRequestedInstant(String tableType) {
final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType))
.map(HoodieInstant::getTimestamp)
@@ -348,8 +416,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
public String getLastCompletedInstant(String tableType) {
final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieTimeline completedTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants();
return completedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType))
.map(HoodieInstant::getTimestamp)
@@ -359,7 +426,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
public void deletePendingInstant(String tableType, String instant) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieFlinkTable<T> table = getHoodieTable();
String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant);
@@ -367,7 +434,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieFlinkTable<T> table = getHoodieTable();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
@@ -376,7 +443,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
HoodieFlinkTable<T> table = getHoodieTable();
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
rollbackInflightCompaction(inflightInstant, table);

View File

@@ -97,6 +97,16 @@
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Flink -->
<dependency>

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
@@ -64,7 +65,7 @@ public class FlinkOptions {
// ------------------------------------------------------------------------
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default.name")
.key("partition.default_name")
.stringType()
.defaultValue("__DEFAULT_PARTITION__")
.withDescription("The default partition name in case the dynamic partition"
@@ -233,6 +234,12 @@ public class FlinkOptions {
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString()");
public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
.key("write.partition.url_encode")
.booleanType()
.defaultValue(false)
.withDescription("Whether to encode the partition path url, default false");
public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
.stringType()
@@ -287,6 +294,114 @@ public class FlinkOptions {
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
.intType()
.defaultValue(10)// default 10 commits
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10");
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()
.defaultValue(false)
.withDescription("Asynchronously sync Hive meta to HMS, default false");
public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions
.key("hive_sync.db")
.stringType()
.defaultValue("default")
.withDescription("Database name for hive sync, default 'default'");
public static final ConfigOption<String> HIVE_SYNC_TABLE = ConfigOptions
.key("hive_sync.table")
.stringType()
.defaultValue("unknown")
.withDescription("Table name for hive sync, default 'unknown'");
public static final ConfigOption<String> HIVE_SYNC_FILE_FORMAT = ConfigOptions
.key("hive_sync.file_format")
.stringType()
.defaultValue("PARQUET")
.withDescription("File format for hive sync, default 'PARQUET'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
.key("hive_sync.username")
.stringType()
.defaultValue("hive")
.withDescription("Username for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_PASSWORD = ConfigOptions
.key("hive_sync.password")
.stringType()
.defaultValue("hive")
.withDescription("Password for hive sync, default 'hive'");
public static final ConfigOption<String> HIVE_SYNC_JDBC_URL = ConfigOptions
.key("hive_sync.jdbc_url")
.stringType()
.defaultValue("jdbc:hive2://localhost:10000")
.withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions
.key("hive_sync.partition_fields")
.stringType()
.defaultValue("")
.withDescription("Partition fields for hive sync, default ''");
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'");
public static final ConfigOption<Boolean> HIVE_SYNC_ASSUME_DATE_PARTITION = ConfigOptions
.key("hive_sync.assume_date_partitioning")
.booleanType()
.defaultValue(false)
.withDescription("Assume partitioning is yyyy/mm/dd, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
.key("hive_sync.use_jdbc")
.booleanType()
.defaultValue(true)
.withDescription("Use JDBC when hive synchronization is enabled, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
.key("hive_sync.auto_create_db")
.booleanType()
.defaultValue(true)
.withDescription("Auto create hive database if it does not exists, default true");
public static final ConfigOption<Boolean> HIVE_SYNC_IGNORE_EXCEPTIONS = ConfigOptions
.key("hive_sync.ignore_exceptions")
.booleanType()
.defaultValue(false)
.withDescription("Ignore exceptions during hive synchronization, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SKIP_RO_SUFFIX = ConfigOptions
.key("hive_sync.skip_ro_suffix")
.booleanType()
.defaultValue(false)
.withDescription("Skip the _ro suffix for Read optimized table when registering, default false");
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
.key("hive_sync.support_timestamp")
.booleanType()
.defaultValue(false)
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.");
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* Sink function that cleans the old commits.
*
* <p>It starts a cleaning task on new checkpoints, there is only one cleaning task
* at a time, a new task can not be scheduled until the last task finished(fails or normally succeed).
* The cleaning task never expects to throw but only log.
*/
public class CleanFunction<T> extends AbstractRichFunction
implements SinkFunction<T>, CheckpointedFunction, CheckpointListener {
private final Configuration conf;
private HoodieFlinkWriteClient writeClient;
private NonThrownExecutor executor;
private volatile boolean isCleaning;
public CleanFunction(Configuration conf) {
this.conf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = new NonThrownExecutor();
}
}
@Override
public void notifyCheckpointComplete(long l) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
executor.execute(() -> {
try {
this.writeClient.waitForCleaningFinish();
} finally {
// ensure to switch the isCleaning flag
this.isCleaning = false;
}
}, "wait for cleaning finish", "");
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
this.writeClient.startAsyncCleaning();
this.isCleaning = true;
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation
}
}

View File

@@ -26,6 +26,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -107,6 +109,16 @@ public class StreamWriteOperatorCoordinator
*/
private final boolean needsScheduleCompaction;
/**
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
*/
private NonThrownExecutor executor;
/**
* Context that holds variables for asynchronous hive sync.
*/
private HiveSyncContext hiveSyncContext;
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -131,14 +143,21 @@ public class StreamWriteOperatorCoordinator
initTableIfNotExists(this.conf);
// start a new instant
startInstant();
// start the executor if required
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
initHiveSync();
}
}
@Override
public void close() {
public void close() throws Exception {
// teardown the resource
if (writeClient != null) {
writeClient.close();
}
if (executor != null) {
executor.close();
}
this.eventBuffer = null;
}
@@ -164,10 +183,25 @@ public class StreamWriteOperatorCoordinator
if (needsScheduleCompaction) {
writeClient.scheduleCompaction(Option.empty());
}
// sync Hive if is enabled
syncHiveIfEnabled();
// start new instant.
startInstant();
}
private void syncHiveIfEnabled() {
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
this.executor.execute(this::syncHive, "sync hive metadata", this.instant);
}
}
/**
* Sync hoodie table metadata to Hive metastore.
*/
public void syncHive() {
hiveSyncContext.hiveSyncTool().syncHoodieTable();
}
private void startInstant() {
this.instant = this.writeClient.startCommit();
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
@@ -232,6 +266,11 @@ public class StreamWriteOperatorCoordinator
true);
}
private void initHiveSync() {
this.executor = new NonThrownExecutor();
this.hiveSyncContext = HiveSyncContext.create(conf);
}
static byte[] readBytes(DataInputStream in, int size) throws IOException {
byte[] bytes = new byte[size];
in.readFully(bytes);
@@ -299,6 +338,7 @@ public class StreamWriteOperatorCoordinator
doCommit();
}
@SuppressWarnings("unchecked")
private void checkAndCommitWithRetry() {
int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
if (retryTimes < 0) {
@@ -378,6 +418,7 @@ public class StreamWriteOperatorCoordinator
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
if (success) {
writeClient.postCommit(this.instant);
reset();
LOG.info("Commit instant [{}] success!", this.instant);
} else {
@@ -415,6 +456,10 @@ public class StreamWriteOperatorCoordinator
return writeClient;
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* Provider for {@link StreamWriteOperatorCoordinator}.
*/

View File

@@ -30,10 +30,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,8 +51,11 @@ import java.util.stream.Collectors;
* it loads and checks the compaction plan {@link HoodieCompactionPlan},
* if all the compaction operations {@link org.apache.hudi.common.model.CompactionOperation}
* of the plan are finished, tries to commit the compaction action.
*
* <p>It also inherits the {@link CleanFunction} cleaning ability. This is needed because
* the SQL API does not allow multiple sinks in one table sink provider.
*/
public class CompactionCommitSink extends RichSinkFunction<CompactionCommitEvent> {
public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);
/**
@@ -76,6 +79,7 @@ public class CompactionCommitSink extends RichSinkFunction<CompactionCommitEvent
private String compactionInstantTime;
public CompactionCommitSink(Configuration conf) {
super(conf);
this.conf = conf;
}

View File

@@ -123,6 +123,12 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
*/
private boolean allPartitionsLoaded = false;
/**
* Flag saying whether to check that all the partitions are loaded.
* So that there is chance that flag {@code allPartitionsLoaded} becomes true.
*/
private boolean checkPartition = true;
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -174,6 +180,13 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
// Checks whether all the partitions are loaded first.
if (checkPartition && !allPartitionsLoaded) {
checkPartitionsLoaded();
checkPartition = false;
}
if (!allPartitionsLoaded
&& initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition
&& !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
@@ -213,7 +226,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.refreshTable();
checkPartitionsLoaded();
if (!allPartitionsLoaded) {
checkPartition = true;
}
}
/**

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* Hive synchronization context.
*
* <p>Use this context to create the {@link HiveSyncTool} for synchronization.
*/
public class HiveSyncContext {
private final HiveSyncConfig syncConfig;
private final HiveConf hiveConf;
private final FileSystem fs;
private HiveSyncContext(HiveSyncConfig syncConfig, HiveConf hiveConf, FileSystem fs) {
this.syncConfig = syncConfig;
this.hiveConf = hiveConf;
this.fs = fs;
}
public HiveSyncTool hiveSyncTool() {
return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs);
}
public static HiveSyncContext create(Configuration conf) {
HiveSyncConfig syncConfig = buildSyncConfig(conf);
org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
String path = conf.getString(FlinkOptions.PATH);
FileSystem fs = FSUtils.getFs(path, hadoopConf);
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
return new HiveSyncContext(syncConfig, hiveConf, fs);
}
private static HiveSyncConfig buildSyncConfig(Configuration conf) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH);
hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)
.split(",")).map(String::trim).collect(Collectors.toList());
hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS);
hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
// needs to support metadata table for flink
hiveSyncConfig.useFileListingFromMetadata = false;
hiveSyncConfig.verifyMetadataFileListing = false;
hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE);
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
return hiveSyncConfig;
}
}

View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* An executor service that catches all the throwable with logging.
*/
public class NonThrownExecutor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NonThrownExecutor.class);
/**
* A single-thread executor to handle all the asynchronous jobs.
*/
private final ExecutorService executor;
public NonThrownExecutor() {
this.executor = Executors.newSingleThreadExecutor();
}
/**
* Run the action in a loop.
*/
public void execute(
final ThrowingRunnable<Throwable> action,
final String actionName,
final String instant) {
executor.execute(
() -> {
try {
action.run();
LOG.info("Executor executes action [{}] for instant [{}] success!", actionName, instant);
} catch (Throwable t) {
// if we have a JVM critical error, promote it immediately, there is a good
// chance the
// logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String errMsg = String.format("Executor executes action [%s] error", actionName);
LOG.error(errMsg, t);
}
});
}
@Override
public void close() throws Exception {
if (executor != null) {
executor.shutdownNow();
// We do not expect this to actually block for long. At this point, there should
// be very few task running in the executor, if any.
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.streamer;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -32,7 +33,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -76,7 +76,7 @@ public class HoodieFlinkStreamerV2 {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
@@ -99,9 +99,11 @@ public class HoodieFlinkStreamerV2 {
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", null, operatorFactory)
.uid("uid_hoodie_stream_write")
.setParallelism(numWriteTask);
env.addOperator(dataStream.getTransformation());
.setParallelism(numWriteTask)
.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits")
.uid("uid_clean_commits");
env.execute(cfg.targetTableName);
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -95,9 +95,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
.name("compact_commit")
.setParallelism(1); // compaction commit should be singleton
} else {
return pipeline.addSink(new DummySinkFunction<>())
return pipeline.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("dummy").uid("uid_dummy");
.name("clean_commits").uid("uid_clean_commits");
}
};
}
@@ -131,7 +131,4 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
public void applyStaticPartition(Map<String, String> map) {
// no operation
}
// Dummy sink function that does nothing.
private static class DummySinkFunction<T> implements SinkFunction<T> {}
}

View File

@@ -18,7 +18,11 @@
package org.apache.hudi.util;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
@@ -42,6 +46,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
@@ -204,6 +209,11 @@ public class StreamerUtil {
CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
.withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
.withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
.retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
// override and hardcode to 20,
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
@@ -302,4 +312,16 @@ public class StreamerUtil {
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
/**
* Creates the Flink write client.
*/
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(
new SerializableConfiguration(getHadoopConf()),
new FlinkTaskContextSupplier(runtimeContext));
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
}
}

View File

@@ -243,7 +243,7 @@ public class StreamWriteITCase extends TestLogger {
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
execEnv
DataStream<Object> pipeline = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
@@ -259,10 +259,15 @@ public class StreamWriteITCase extends TestLogger {
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write")
.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.uid("uid_hoodie_stream_write");
pipeline.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits").uid("uid_clean_commits");
pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.uid("uid_compact_plan_generate")
.setParallelism(1) // plan generate must be singleton
.keyBy(event -> event.getOperation().hashCode())

View File

@@ -22,11 +22,13 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,6 +44,7 @@ import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -63,7 +66,7 @@ public class TestStreamWriteOperatorCoordinator {
}
@AfterEach
public void after() {
public void after() throws Exception {
coordinator.close();
}
@@ -148,4 +151,31 @@ public class TestStreamWriteOperatorCoordinator {
() -> coordinator.notifyCheckpointComplete(1),
"Try 3 to commit instant");
}
@Test
public void testHiveSyncInvoked() throws Exception {
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
coordinator = new StreamWriteOperatorCoordinator(conf, 1);
coordinator.start();
String instant = coordinator.getInstant();
assertNotEquals("", instant);
WriteStatus writeStatus = new WriteStatus(true, 0.1D);
writeStatus.setPartitionPath("par1");
writeStatus.setStat(new HoodieWriteStat());
OperatorEvent event0 = BatchWriteSuccessEvent.builder()
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true)
.build();
coordinator.handleEventFromOperator(0, event0);
// never throw for hive synchronization now
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
}
}

View File

@@ -491,7 +491,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
.getInflightAndRequestedInstant("COPY_ON_WRITE");
.getInflightAndRequestedInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
@@ -507,6 +507,8 @@ public class TestWriteCopyOnWrite {
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);
// next element triggers all partitions load check
funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0));
assertTrue(funcWrapper.isAllPartitionsLoaded(),
"All partitions assume to be loaded into the index state");
}

View File

@@ -18,12 +18,15 @@
package org.apache.hudi.table;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -49,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* IT cases for Hoodie table source and sink.
@@ -105,7 +109,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
void testStreamReadAppendData() throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source2.data");
String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
streamTableEnv.executeSql(createSource);
streamTableEnv.executeSql(createSource2);
@@ -175,6 +179,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@Test
void testStreamWriteWithCleaning() throws InterruptedException {
// create filesystem table named source
// the source generates 4 commits but the cleaning task
// would always try to keep the remaining commits number as 1
String createSource = TestConfigurations.getFileSourceDDL(
"source", "test_source_3.data", 4);
streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "1"); // only keep 1 commits
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
Configuration defaultConf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
Map<String, String> options1 = new HashMap<>(defaultConf.toMap());
options1.put(FlinkOptions.TABLE_NAME.key(), "t1");
Configuration conf = Configuration.fromMap(options1);
HoodieTimeline timeline = StreamerUtil.createWriteClient(conf, null)
.getHoodieTable().getActiveTimeline();
assertTrue(timeline.filterCompletedInstants()
.getInstants().anyMatch(instant -> instant.getAction().equals("clean")),
"some commits should be cleaned");
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndRead(ExecMode execMode) {

View File

@@ -91,7 +91,15 @@ public class TestConfigurations {
return getFileSourceDDL(tableName, "test_source.data");
}
public static String getFileSourceDDL(String tableName, int checkpoints) {
return getFileSourceDDL(tableName, "test_source.data", checkpoints);
}
public static String getFileSourceDDL(String tableName, String fileName) {
return getFileSourceDDL(tableName, fileName, 2);
}
public static String getFileSourceDDL(String tableName, String fileName, int checkpoints) {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource(fileName)).toString();
return "create table " + tableName + "(\n"
@@ -102,7 +110,8 @@ public class TestConfigurations {
+ " `partition` varchar(20)\n"
+ ") with (\n"
+ " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n"
+ " 'path' = '" + sourcePath + "'\n"
+ " 'path' = '" + sourcePath + "',\n"
+ " 'checkpoints' = '" + checkpoints + "'\n"
+ ")";
}

View File

@@ -141,7 +141,7 @@ public class TestData {
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
// merged data set of test_source.data and test_source2.data
// merged data set of test_source.data and test_source_2.data
public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.source.ContinuousFileSource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
@@ -38,6 +39,12 @@ import java.util.Set;
public class ContinuousFileSourceFactory implements DynamicTableSourceFactory {
public static final String FACTORY_ID = "continuous-file-source";
public static final ConfigOption<Integer> CHECKPOINTS = ConfigOptions
.key("checkpoints")
.intType()
.defaultValue(2)
.withDescription("Number of checkpoints to write the data set as, default 2");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -56,11 +63,11 @@ public class ContinuousFileSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
return Collections.singleton(FlinkOptions.PATH);
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.singleton(FlinkOptions.PATH);
return Collections.singleton(CHECKPOINTS);
}
}

View File

@@ -42,6 +42,8 @@ import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hudi.utils.factory.ContinuousFileSourceFactory.CHECKPOINTS;
/**
* A continuous file source that can trigger checkpoints continuously.
*
@@ -89,7 +91,7 @@ public class ContinuousFileSource implements ScanTableSource {
true,
TimestampFormat.ISO_8601);
return execEnv.addSource(new BoundedSourceFunction(path, 2))
return execEnv.addSource(new BoundedSourceFunction(path, conf.getInteger(CHECKPOINTS)))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)),

View File

@@ -0,0 +1,8 @@
{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"}
{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"}
{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"}
{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"}
{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"}
{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"}
{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"}
{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"}

View File

@@ -31,6 +31,7 @@
<properties>
<checkstyle.skip>true</checkstyle.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
<flink.bundle.hive.scope>provided</flink.bundle.hive.scope>
<flink.bundle.hive.shade.prefix>org.apache.hudi.</flink.bundle.hive.shade.prefix>
<javax.servlet.version>3.1.0</javax.servlet.version>
<!-- override to be same with flink 1.12.2 -->
@@ -74,6 +75,8 @@
<include>org.apache.hudi:hudi-client-common</include>
<include>org.apache.hudi:hudi-flink-client</include>
<include>org.apache.hudi:hudi-flink_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-hive-sync</include>
<include>org.apache.hudi:hudi-sync-common</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-timeline-service</include>
@@ -123,6 +126,8 @@
<include>org.apache.flink:flink-parquet_${scala.binary.version}</include>
<include>org.apache.hive:hive-common</include>
<include>org.apache.hive:hive-service</include>
<include>org.apache.hive:hive-service-rpc</include>
<include>org.apache.hive:hive-exec</include>
<include>org.apache.hive:hive-metastore</include>
<include>org.apache.hive:hive-jdbc</include>
@@ -242,6 +247,11 @@
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
@@ -349,6 +359,18 @@
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-exec</artifactId>
@@ -368,6 +390,7 @@
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
@@ -383,6 +406,7 @@
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
@@ -398,6 +422,7 @@
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
@@ -436,4 +461,13 @@
<version>${hbase.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>flink-bundle-shade-hive</id>
<properties>
<flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>
</profile>
</profiles>
</project>