1
0

[HUDI-4372] Enable metadata table by default for Flink (#6066)

This commit is contained in:
Danny Chan
2022-07-20 16:10:19 +08:00
committed by GitHub
parent 6c3578069e
commit e3675fe9b0
8 changed files with 57 additions and 19 deletions

View File

@@ -96,7 +96,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
/** /**
* Cached metadata writer for coordinator to reuse for each commit. * Cached metadata writer for coordinator to reuse for each commit.
*/ */
private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty(); private HoodieBackedTableMetadataWriter metadataWriter;
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance()); super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
@@ -264,10 +264,17 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override @Override
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
this.metadataWriterOption.ifPresent(w -> { if (this.metadataWriter == null) {
w.initTableMetadata(); // refresh the timeline initMetadataWriter();
w.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); }
}); // refresh the timeline
// Note: the data meta client is not refreshed currently, some code path
// relies on the meta client for resolving the latest data schema,
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
} }
/** /**
@@ -275,9 +282,24 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* from the filesystem if it does not exist. * from the filesystem if it does not exist.
*/ */
public void initMetadataWriter() { public void initMetadataWriter() {
HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT); FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
this.metadataWriterOption = Option.of(metadataWriter); }
/**
* Initialized the metadata table on start up, should only be called once on driver.
*/
public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
initMetadataWriter();
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
} else {
// delete the metadata table if it was enabled but is now disabled
table.maybeDeleteMetadataTable();
}
} }
/** /**

View File

@@ -441,9 +441,9 @@ public final class HoodieMetadataConfig extends HoodieConfig {
private boolean getDefaultMetadataEnable(EngineType engineType) { private boolean getDefaultMetadataEnable(EngineType engineType) {
switch (engineType) { switch (engineType) {
case FLINK:
case SPARK: case SPARK:
return ENABLE.defaultValue(); return ENABLE.defaultValue();
case FLINK:
case JAVA: case JAVA:
return false; return false;
default: default:

View File

@@ -103,8 +103,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions
.key("metadata.enabled") .key("metadata.enabled")
.booleanType() .booleanType()
.defaultValue(false) .defaultValue(true)
.withDescription("Enable the internal metadata table which serves table metadata like level file listings, default false"); .withDescription("Enable the internal metadata table which serves table metadata like level file listings, default enabled");
public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
.key("metadata.compaction.delta_commits") .key("metadata.compaction.delta_commits")

View File

@@ -51,6 +51,7 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@@ -181,8 +182,10 @@ public class StreamWriteOperatorCoordinator
this.gateways = new SubtaskGateway[this.parallelism]; this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists. // init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf); this.metaClient = initTableIfNotExists(this.conf);
this.ckpMetadata = initCkpMetadata(this.metaClient);
// the write client must create after the table creation // the write client must create after the table creation
this.writeClient = StreamerUtil.createWriteClient(conf); this.writeClient = StreamerUtil.createWriteClient(conf);
initMetadataTable(this.writeClient);
this.tableState = TableState.create(conf); this.tableState = TableState.create(conf);
// start the executor // start the executor
this.executor = NonThrownExecutor.builder(LOG) this.executor = NonThrownExecutor.builder(LOG)
@@ -192,11 +195,6 @@ public class StreamWriteOperatorCoordinator
if (tableState.syncHive) { if (tableState.syncHive) {
initHiveSync(); initHiveSync();
} }
if (tableState.syncMetadata) {
initMetadataSync();
}
this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath());
this.ckpMetadata.bootstrap(this.metaClient);
} }
@Override @Override
@@ -352,8 +350,14 @@ public class StreamWriteOperatorCoordinator
hiveSyncContext.hiveSyncTool().syncHoodieTable(); hiveSyncContext.hiveSyncTool().syncHoodieTable();
} }
private void initMetadataSync() { private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
this.writeClient.initMetadataWriter(); writeClient.initMetadataTable();
}
private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
ckpMetadata.bootstrap(metaClient);
return ckpMetadata;
} }
private void reset() { private void reset() {

View File

@@ -87,8 +87,8 @@ public class ExpressionEvaluator {
* 2. bind the field reference; * 2. bind the field reference;
* 3. bind the column stats. * 3. bind the column stats.
* *
* <p>Normalize the expression to simplify the following decision logic: * <p>Normalize the expression to simplify the subsequent decision logic:
* always put the literal expression in the right. * always put the literal expression in the RHS.
*/ */
public static Evaluator bindCall(CallExpression call, RowData indexRow, RowType.RowField[] queryFields) { public static Evaluator bindCall(CallExpression call, RowData indexRow, RowType.RowField[] queryFields) {
FunctionDefinition funDef = call.getFunctionDefinition(); FunctionDefinition funDef = call.getFunctionDefinition();

View File

@@ -102,6 +102,7 @@ public class ITTestHoodieFlinkCompactor {
tableEnv.getConfig().getConfiguration() tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>(); Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -214,6 +215,7 @@ public class ITTestHoodieFlinkCompactor {
tableEnv.getConfig().getConfiguration() tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>(); Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -222,6 +224,7 @@ public class ITTestHoodieFlinkCompactor {
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await(); tableEnv.executeSql(TestSQL.INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -247,6 +250,7 @@ public class ITTestHoodieFlinkCompactor {
+ "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')"; + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
tableEnv.executeSql(insertT1ForNewPartition).await(); tableEnv.executeSql(insertT1ForNewPartition).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));

View File

@@ -480,6 +480,8 @@ public class TestInputFormat {
options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3"); options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4"); options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2"); options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
// disable the metadata table to make the archiving behavior deterministic
options.put(FlinkOptions.METADATA_ENABLED.key(), "false");
options.put("hoodie.commits.archival.batch", "1"); options.put("hoodie.commits.archival.batch", "1");
beforeEach(HoodieTableType.COPY_ON_WRITE, options); beforeEach(HoodieTableType.COPY_ON_WRITE, options);

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables; import org.apache.hudi.util.FlinkTables;
@@ -77,6 +78,11 @@ public class TestCompactionUtil {
this.table = FlinkTables.createTable(conf); this.table = FlinkTables.createTable(conf);
this.metaClient = table.getMetaClient(); this.metaClient = table.getMetaClient();
// initialize the metadata table path
if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
FlinkHoodieBackedTableMetadataWriter.create(table.getHadoopConf(), table.getConfig(),
table.getContext(), Option.empty(), Option.empty());
}
} }
@Test @Test