diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index dce924586..80bcf3adf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -96,7 +96,7 @@ public class HoodieFlinkWriteClient extends /** * Cached metadata writer for coordinator to reuse for each commit. */ - private Option metadataWriterOption = Option.empty(); + private HoodieBackedTableMetadataWriter metadataWriter; public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance()); @@ -264,10 +264,17 @@ public class HoodieFlinkWriteClient extends @Override protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { - this.metadataWriterOption.ifPresent(w -> { - w.initTableMetadata(); // refresh the timeline - w.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); - }); + if (this.metadataWriter == null) { + initMetadataWriter(); + } + // refresh the timeline + + // Note: the data meta client is not refreshed currently, some code path + // relies on the meta client for resolving the latest data schema, + // the schema expects to be immutable for SQL jobs but may be not for non-SQL + // jobs. + this.metadataWriter.initTableMetadata(); + this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); } /** @@ -275,9 +282,24 @@ public class HoodieFlinkWriteClient extends * from the filesystem if it does not exist. 
*/ public void initMetadataWriter() { - HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( + this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT); - this.metadataWriterOption = Option.of(metadataWriter); + } + + /** + * Initializes the metadata table on start up, should only be called once on the driver. + */ + public void initMetadataTable() { + HoodieFlinkTable table = getHoodieTable(); + if (config.isMetadataTableEnabled()) { + // initialize the metadata table path + initMetadataWriter(); + // clean the obsolete index stats + table.deleteMetadataIndexIfNecessary(); + } else { + // delete the metadata table if it was enabled but is now disabled + table.maybeDeleteMetadataTable(); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 14a055cb1..2cd08b9ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -441,9 +441,9 @@ public final class HoodieMetadataConfig extends HoodieConfig { private boolean getDefaultMetadataEnable(EngineType engineType) { switch (engineType) { + case FLINK: case SPARK: return ENABLE.defaultValue(); - case FLINK: case JAVA: return false; default: diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index b1a3372e0..7425540de 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -103,8 +103,8 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption METADATA_ENABLED = ConfigOptions .key("metadata.enabled") .booleanType() - .defaultValue(false) - .withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false"); + .defaultValue(true) + .withDescription("Enable the internal metadata table which serves table metadata like level file listings, default enabled"); public static final ConfigOption METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions .key("metadata.compaction.delta_commits") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index b726b02ca..d5ca307a0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -51,6 +51,7 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -181,8 +182,10 @@ public class StreamWriteOperatorCoordinator this.gateways = new SubtaskGateway[this.parallelism]; // init table, create if not exists. 
this.metaClient = initTableIfNotExists(this.conf); + this.ckpMetadata = initCkpMetadata(this.metaClient); // the write client must create after the table creation this.writeClient = StreamerUtil.createWriteClient(conf); + initMetadataTable(this.writeClient); this.tableState = TableState.create(conf); // start the executor this.executor = NonThrownExecutor.builder(LOG) @@ -192,11 +195,6 @@ public class StreamWriteOperatorCoordinator if (tableState.syncHive) { initHiveSync(); } - if (tableState.syncMetadata) { - initMetadataSync(); - } - this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath()); - this.ckpMetadata.bootstrap(this.metaClient); } @Override @@ -352,8 +350,14 @@ public class StreamWriteOperatorCoordinator hiveSyncContext.hiveSyncTool().syncHoodieTable(); } - private void initMetadataSync() { - this.writeClient.initMetadataWriter(); + private static void initMetadataTable(HoodieFlinkWriteClient writeClient) { + writeClient.initMetadataTable(); + } + + private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException { + CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath()); + ckpMetadata.bootstrap(metaClient); + return ckpMetadata; } private void reset() { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java index a0162856f..efe027526 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java @@ -87,8 +87,8 @@ public class ExpressionEvaluator { * 2. bind the field reference; * 3. bind the column stats. * - *

Normalize the expression to simplify the following decision logic: - * always put the literal expression in the right. + *

Normalize the expression to simplify the subsequent decision logic: + * always put the literal expression in the RHS. */ public static Evaluator bindCall(CallExpression call, RowData indexRow, RowType.RowField[] queryFields) { FunctionDefinition funDef = call.getFunctionDefinition(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 7e3b055ad..5c763e055 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -102,6 +102,7 @@ public class ITTestHoodieFlinkCompactor { tableEnv.getConfig().getConfiguration() .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); Map options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false"); options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); @@ -214,6 +215,7 @@ public class ITTestHoodieFlinkCompactor { tableEnv.getConfig().getConfiguration() .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); Map options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false"); options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); @@ -222,6 +224,7 @@ public class ITTestHoodieFlinkCompactor { tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(TestSQL.INSERT_T1).await(); + // wait for the asynchronous commit to finish TimeUnit.SECONDS.sleep(3); StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); @@ -247,6 +250,7 @@ public class ITTestHoodieFlinkCompactor { + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')"; tableEnv.executeSql(insertT1ForNewPartition).await(); + // wait for the asynchronous commit to finish TimeUnit.SECONDS.sleep(3); compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 3bd00634f..b663a4af3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -480,6 +480,8 @@ public class TestInputFormat { options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3"); options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4"); options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2"); + // disable the metadata table to make the archiving behavior deterministic + options.put(FlinkOptions.METADATA_ENABLED.key(), "false"); options.put("hoodie.commits.archival.batch", "1"); beforeEach(HoodieTableType.COPY_ON_WRITE, options); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 9559b8c8c..87c8379d6 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieIOException; +import 
org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.FlinkTables; @@ -77,6 +78,11 @@ public class TestCompactionUtil { this.table = FlinkTables.createTable(conf); this.metaClient = table.getMetaClient(); + // initialize the metadata table path + if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) { + FlinkHoodieBackedTableMetadataWriter.create(table.getHadoopConf(), table.getConfig(), + table.getContext(), Option.empty(), Option.empty()); + } } @Test