[HUDI-4439] Fix Amazon CloudWatch reporter for metadata enabled tables (#6164)

Co-authored-by: Udit Mehrotra <uditme@amazon.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Rahil C
2022-07-23 21:08:21 -07:00
committed by GitHub
parent ba11082282
commit 1a5a9f7f03
2 changed files with 47 additions and 22 deletions


@@ -294,7 +294,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     builder.withProperties(properties);
     if (writeConfig.isMetricsOn()) {
+      // Table Name is needed for metric reporters prefix
+      Properties commonProperties = new Properties();
+      commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName);
       builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
+          .fromProperties(commonProperties)
           .withReporterType(writeConfig.getMetricsReporterType().toString())
           .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
           .on(true).build());
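For context, the added lines thread the data table's name into the metadata table's metrics configuration so that reporters such as the CloudWatch one can derive a per-table metric prefix. Below is a minimal, hedged sketch of that property flow in isolation; the import path for HoodieMetricsConfig, the CLOUDWATCH reporter type name, and the standalone class are illustrative assumptions and are not part of this change.

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;

public class MetadataMetricsConfigSketch {
  public static void main(String[] args) {
    // Name of the data table; the metadata writer now forwards this so the
    // metric reporter can prefix metrics with it (assumed example name).
    String tableName = "my_hudi_table";

    // Same pattern as the patched HoodieBackedTableMetadataWriter: carry the
    // table name as a common property into the metrics config builder.
    Properties commonProperties = new Properties();
    commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName);

    HoodieMetricsConfig metricsConfig = HoodieMetricsConfig.newBuilder()
        .fromProperties(commonProperties)
        .withReporterType("CLOUDWATCH") // assumed reporter type name
        .on(true)
        .build();

    // The table name is now readable from the resulting config's properties.
    System.out.println(metricsConfig.getProps().getProperty(HoodieWriteConfig.TBL_NAME.key()));
  }
}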


@@ -18,14 +18,6 @@
 package org.apache.hudi.client.functional;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.util.Time;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -81,14 +73,14 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieHFileReader;
@@ -107,6 +99,15 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -178,6 +179,19 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     );
   }
+  public static List<Arguments> tableOperationsTestArgs() {
+    return asList(
+        Arguments.of(COPY_ON_WRITE, true, true),
+        Arguments.of(COPY_ON_WRITE, true, false),
+        Arguments.of(COPY_ON_WRITE, false, true),
+        Arguments.of(COPY_ON_WRITE, false, false),
+        Arguments.of(MERGE_ON_READ, true, true),
+        Arguments.of(MERGE_ON_READ, true, false),
+        Arguments.of(MERGE_ON_READ, false, true),
+        Arguments.of(MERGE_ON_READ, false, false)
+    );
+  }
   /**
    * Metadata Table bootstrap scenarios.
    */
@@ -441,28 +455,34 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @MethodSource("tableTypeAndEnableOperationArgs")
-  public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
-    init(tableType, true, enableFullScan, false, false);
-    doWriteInsertAndUpsert(testTable);
+  @MethodSource("tableOperationsTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableMetrics) throws Exception {
+    List<Long> commitTimeList = new ArrayList<>();
+    commitTimeList.add(Long.parseLong(HoodieActiveTimeline.createNewInstantTime()));
+    for (int i = 0; i < 8; i++) {
+      long nextCommitTime = getNextCommitTime(commitTimeList.get(commitTimeList.size() - 1));
+      commitTimeList.add(nextCommitTime);
+    }
+    init(tableType, true, enableFullScan, enableMetrics, false);
+    doWriteInsertAndUpsert(testTable, commitTimeList.get(0).toString(), commitTimeList.get(1).toString(), false);

     // trigger an upsert
-    doWriteOperationAndValidate(testTable, "0000003");
+    doWriteOperationAndValidate(testTable, commitTimeList.get(2).toString());

     // trigger compaction
     if (MERGE_ON_READ.equals(tableType)) {
-      doCompactionAndValidate(testTable, "0000004");
+      doCompactionAndValidate(testTable, commitTimeList.get(3).toString());
     }

     // trigger an upsert
-    doWriteOperation(testTable, "0000005");
+    doWriteOperation(testTable, commitTimeList.get(4).toString());

     // trigger clean
-    doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
+    doCleanAndValidate(testTable, commitTimeList.get(5).toString(), singletonList(commitTimeList.get(0).toString()));

     // trigger few upserts and validate
-    doWriteOperation(testTable, "0000007");
-    doWriteOperation(testTable, "0000008");
+    doWriteOperation(testTable, commitTimeList.get(6).toString());
+    doWriteOperation(testTable, commitTimeList.get(7).toString());
     validateMetadata(testTable, emptyList(), true);
   }