[HUDI-4484] Add default lock config options for flink metadata table (#6222)
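Summary: the flink metadata table writer is now guarded by a file system based lock, and the Flink write config wires in default lock options: a FileSystemBasedLockProvider whose lock files live under the table's auxiliary folder, a 2 second lock wait time, and a 1 minute lock expiry. To support that, TransactionManager exposes its LockManager, HoodieLockConfig gains builder options for the file system lock path and expiry, StreamerUtil resolves the auxiliary path, and the flink integration tests are adjusted.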
@@ -85,6 +85,10 @@ public class TransactionManager implements Serializable {
     }
   }
 
+  public LockManager getLockManager() {
+    return lockManager;
+  }
+
   public Option<HoodieInstant> getLastCompletedTransactionOwner() {
     return lastCompletedTxnOwnerInstant;
   }
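The new accessor lets callers that already hold a TransactionManager take the table lock directly, which is what the Flink write client does further down. A minimal usage sketch, assuming a txnManager instance is in scope (not part of this commit):

    LockManager lockManager = txnManager.getLockManager();
    lockManager.lock();
    try {
      // critical section guarded by the table lock
    } finally {
      lockManager.unlock();
    }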
@@ -311,6 +311,16 @@ public class HoodieLockConfig extends HoodieConfig {
       return this;
     }
 
+    public HoodieLockConfig.Builder withFileSystemLockPath(String path) {
+      lockConfig.setValue(FILESYSTEM_LOCK_PATH, path);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withFileSystemLockExpire(Integer expireTime) {
+      lockConfig.setValue(FILESYSTEM_LOCK_EXPIRE, String.valueOf(expireTime));
+      return this;
+    }
+
     public HoodieLockConfig build() {
       lockConfig.setDefaults(HoodieLockConfig.class.getName());
       return lockConfig;
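For illustration, a rough sketch of how the two new builder options combine with the existing ones (withLockProvider and withLockWaitTimeInMillis already exist and also appear in the StreamerUtil hunk below; the path and expiry values here are examples only, not project defaults):

    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
        .withLockProvider(FileSystemBasedLockProvider.class)
        .withLockWaitTimeInMillis(2000L)
        .withFileSystemLockPath("/tmp/hoodie_table/.hoodie/.aux") // example path
        .withFileSystemLockExpire(1) // interpreted as minutes, per the usage below
        .build();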
@@ -267,14 +267,21 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     if (this.metadataWriter == null) {
       initMetadataWriter();
     }
-    // refresh the timeline
-
-    // Note: the data meta client is not refreshed currently, some code path
-    // relies on the meta client for resolving the latest data schema,
-    // the schema expects to be immutable for SQL jobs but may be not for non-SQL
-    // jobs.
-    this.metadataWriter.initTableMetadata();
-    this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+    try {
+      // guard the metadata writer with concurrent lock
+      this.txnManager.getLockManager().lock();
+
+      // refresh the timeline
+
+      // Note: the data meta client is not refreshed currently, some code path
+      // relies on the meta client for resolving the latest data schema,
+      // the schema expects to be immutable for SQL jobs but may be not for non-SQL
+      // jobs.
+      this.metadataWriter.initTableMetadata();
+      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+    } finally {
+      this.txnManager.getLockManager().unlock();
+    }
   }
 
   /**
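With this guard, the main write pipeline and any concurrent table service writers serialize their metadata table updates through the same lock manager, backed by the file system lock configured by default in StreamerUtil below.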
@@ -152,11 +152,6 @@ public class StreamWriteOperatorCoordinator
    */
   private CkpMetadata ckpMetadata;
 
-  /**
-   * Current checkpoint.
-   */
-  private long checkpointId = -1;
-
   /**
   * Constructs a StreamingSinkOperatorCoordinator.
   *
@@ -219,7 +214,6 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
-    this.checkpointId = checkpointId;
     executor.execute(
         () -> {
           try {
@@ -21,6 +21,7 @@ package org.apache.hudi.util;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
@@ -43,6 +44,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
@@ -88,6 +90,7 @@ import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
  * Utilities for Flink stream read and write.
@@ -170,7 +173,7 @@ public class StreamerUtil {
         .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
         .withClusteringConfig(
             HoodieClusteringConfig.newBuilder()
-                .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
+                .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
                 .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
                 .withClusteringPlanPartitionFilterMode(
                     ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
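Besides the lock wiring, this hunk also corrects the async clustering toggle: the write config previously read CLUSTERING_SCHEDULE_ENABLED where CLUSTERING_ASYNC_ENABLED was intended.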
@@ -218,6 +221,12 @@ public class StreamerUtil {
             .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
             .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
             .build())
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(FileSystemBasedLockProvider.class)
+            .withLockWaitTimeInMillis(2000L) // 2s
+            .withFileSystemLockExpire(1) // 1 minute
+            .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+            .build())
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
             .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
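These defaults mean a metadata table writer waits up to 2 seconds to acquire the file system lock and treats a lock older than one minute as expired, with the lock directory defaulting to the auxiliary path resolved by the new helper at the end of this file.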
@@ -231,6 +240,7 @@ public class StreamerUtil {
         .withProps(flinkConf2TypedProperties(conf))
         .withSchema(getSourceSchema(conf).toString());
 
+    // do not configure cleaning strategy as LAZY until multi-writers is supported.
     HoodieWriteConfig writeConfig = builder.build();
     if (loadFsViewStorageConfig) {
       // do not use the builder to give a change for recovering the original fs view storage config
@@ -548,4 +558,11 @@ public class StreamerUtil {
       throw new HoodieException("Exception while checking file " + path + " existence", e);
     }
   }
+
+  /**
+   * Returns the auxiliary path.
+   */
+  public static String getAuxiliaryPath(Configuration conf) {
+    return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
+  }
 }
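A quick sketch of what the helper resolves to, assuming AUXILIARYFOLDER_NAME is the usual .hoodie/.aux folder and using an example table path (not from this commit):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/data/warehouse/t1"); // example table path
    String lockDir = StreamerUtil.getAuxiliaryPath(conf);    // -> /data/warehouse/t1/.hoodie/.aux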
@@ -382,13 +382,9 @@ public class ITTestDataStreamWrite extends TestLogger {
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
-    options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
-    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
-    options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
-    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
     options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source
@@ -42,7 +42,6 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -166,7 +165,6 @@ public class ITTestHoodieFlinkCompactor {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
-  @Disabled
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
@@ -201,14 +199,13 @@ public class ITTestHoodieFlinkCompactor {
     asyncCompactionService.start(null);
 
     // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(5);
+    TimeUnit.SECONDS.sleep(10);
 
     asyncCompactionService.shutDown();
 
     TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
   }
 
-  @Disabled
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
@@ -218,7 +215,6 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.getConfig().getConfiguration()
         .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
-    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -227,9 +223,6 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.executeSql(hoodieTableDDL);
     tableEnv.executeSql(TestSQL.INSERT_T1).await();
 
-    // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(3);
-
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
@@ -253,9 +246,13 @@ public class ITTestHoodieFlinkCompactor {
         + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
     tableEnv.executeSql(insertT1ForNewPartition).await();
 
-    // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(3);
+    writeClient.close();
+    // re-create the write client/fs view server
+    // or there is low probability that connection refused occurs then
+    // the reader metadata view is not complete
+    writeClient = StreamerUtil.createWriteClient(conf);
 
+    metaClient.reloadActiveTimeline();
     compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
@@ -222,7 +222,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   }
 
   @Test
-  void testStreamWriteBatchReadOptimized() {
+  void testStreamWriteBatchReadOptimized() throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -236,11 +236,16 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
         .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
         .option(FlinkOptions.COMPACTION_TASKS, 1)
+        // disable the metadata table because
+        // the lock conflicts resolution takes time
+        .option(FlinkOptions.METADATA_ENABLED, false)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
+    // give some buffer time for finishing the async compaction tasks
+    TimeUnit.SECONDS.sleep(5);
     List<Row> rows = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
 