1
0

[HUDI-2009] Fixing extra commit metadata in row writer path (#3075)

This commit is contained in:
Sivabalan Narayanan
2021-07-08 03:07:27 -04:00
committed by GitHub
parent 1d3cd06572
commit 8c0dbaa9b3
12 changed files with 180 additions and 29 deletions

View File

@@ -53,6 +53,6 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
// 1st arg to createHooodieConfig is not really reuqired to be set. but passing it anyways.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA.key()), path, tblName, properties);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
getConfiguration(), arePartitionRecordsSorted);
getConfiguration(), properties, arePartitionRecordsSorted);
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.spark3.internal;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,7 +34,9 @@ import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -47,15 +50,17 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
private final StructType structType;
private final boolean arePartitionRecordsSorted;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private Map<String, String> extraMetadata = new HashMap<>();
public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.extraMetadata = DataSourceUtils.getExtraMetadata(properties);
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
jss, hadoopConfiguration);
jss, hadoopConfiguration, extraMetadata);
}
@Override

View File

@@ -26,6 +26,8 @@ import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import java.util.Map;
/**
* Implementation of {@link WriteBuilder} for datasource "hudi.spark3.internal" to be used in datasource implementation
* of bulk insert.
@@ -38,20 +40,22 @@ public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
private final SparkSession jss;
private final Configuration hadoopConfiguration;
private final boolean arePartitionRecordsSorted;
private final Map<String, String> properties;
public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.jss = jss;
this.hadoopConfiguration = hadoopConfiguration;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.properties = properties;
}
@Override
public BatchWrite buildForBatch() {
return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
hadoopConfiguration, arePartitionRecordsSorted);
hadoopConfiguration, properties, arePartitionRecordsSorted);
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@@ -42,15 +43,18 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
private final SparkSession jss;
private final Configuration hadoopConfiguration;
private final boolean arePartitionRecordsSorted;
private final Map<String, String> properties;
public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
StructType schema, SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
StructType schema, SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties,
boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = config;
this.structType = schema;
this.jss = jss;
this.hadoopConfiguration = hadoopConfiguration;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.properties = properties;
}
@Override
@@ -75,6 +79,6 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
hadoopConfiguration, arePartitionRecordsSorted);
hadoopConfiguration, properties, arePartitionRecordsSorted);
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.spark3.internal;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,13 +36,18 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests {@link HoodieDataSourceInternalBatchWrite}.
@@ -50,13 +57,17 @@ public class TestHoodieDataSourceInternalBatchWrite extends
@Test
public void testDataSourceWriter() throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime = "001";
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, extraMetadata, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -84,11 +95,49 @@ public class TestHoodieDataSourceInternalBatchWrite extends
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
commitMessages.add(commitMetadata);
dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// verify extra metadata
Option<HoodieCommitMetadata> commitMetadataOption = HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient);
assertTrue(commitMetadataOption.isPresent());
Map<String, String> actualExtraMetadata = new HashMap<>();
commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry ->
!entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)).forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue()));
assertEquals(actualExtraMetadata, expectedExtraMetadata);
}
@Test
public void testDataSourceWriterExtraCommitMetadata() throws Exception {
String commitExtraMetaPrefix = "commit_extra_meta_";
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix);
extraMeta.put(commitExtraMetaPrefix + "a", "valA");
extraMeta.put(commitExtraMetaPrefix + "b", "valB");
extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata
Map<String, String> expectedMetadata = new HashMap<>();
expectedMetadata.putAll(extraMeta);
expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key());
expectedMetadata.remove("commit_extra_c");
testDataSourceWriterInternal(extraMeta, expectedMetadata);
}
@Test
public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception {
String commitExtraMetaPrefix = "commit_extra_meta_";
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix);
extraMeta.put("keyA", "valA");
extraMeta.put("keyB", "valB");
extraMeta.put("commit_extra_c", "valC");
// none of the keys has commit metadata key prefix.
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP);
}
@Test
@@ -103,8 +152,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -148,8 +196,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -195,8 +242,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
String instantTime0 = "00" + 0;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -234,7 +280,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
for (int j = 0; j < batches; j++) {