From 8c0dbaa9b3b6ced3826d0bc04e0a91272bbcab73 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Thu, 8 Jul 2021 03:07:27 -0400
Subject: [PATCH] [HUDI-2009] Fixing extra commit metadata in row writer path
 (#3075)

---
 .../hudi/testutils/HoodieClientTestUtils.java | 19 ++++
 .../java/org/apache/hudi/DataSourceUtils.java | 13 ++++
 .../DataSourceInternalWriterHelper.java | 9 ++-
 .../apache/hudi/internal/DefaultSource.java | 2 +-
 .../HoodieDataSourceInternalWriter.java | 12 +++-
 .../internal/HoodieWriterCommitMessage.java | 1 +
 .../TestHoodieDataSourceInternalWriter.java | 64 ++++++++++++++++---
 .../hudi/spark3/internal/DefaultSource.java | 2 +-
 .../HoodieDataSourceInternalBatchWrite.java | 9 ++-
 ...ieDataSourceInternalBatchWriteBuilder.java | 8 ++-
 .../HoodieDataSourceInternalTable.java | 8 ++-
 ...estHoodieDataSourceInternalBatchWrite.java | 62 +++++++++++++++---
 12 files changed, 180 insertions(+), 29 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 55c5aa72f..58722053e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.avro.Schema;
@@ -211,4 +212,22 @@ public class HoodieClientTestUtils {
     return valuesAsList.stream();
   }
 
+  public static Option<HoodieCommitMetadata> getCommitMetadataForLatestInstant(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    if (timeline.lastInstant().isPresent()) {
+      return getCommitMetadataForInstant(metaClient, timeline.lastInstant().get());
+    } else {
+      return Option.empty();
+    }
+  }
+
+  private static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      byte[] data = timeline.getInstantDetails(instant).get();
+      return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read commit metadata for instant " + instant, e);
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 0cdeeaef0..352a0ca7b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -55,6 +55,7 @@ import org.apache.spark.sql.Row;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -132,6 +133,18 @@ public class DataSourceUtils {
     }
   }
 
+  public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
+    Map<String, String> extraMetadataMap = new HashMap<>();
+    if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key())) {
+      properties.entrySet().forEach(entry -> {
+        if (entry.getKey().startsWith(properties.get(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key()))) {
+          extraMetadataMap.put(entry.getKey(), entry.getValue());
+        }
+      });
+    }
+    return extraMetadataMap;
+  }
+
   /**
    * Create a payload class via reflection, do not ordering/precombine value.
    */
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 0f27ee9b8..3dc8f6055 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -38,8 +38,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
@@ -54,14 +54,17 @@ public class DataSourceInternalWriterHelper {
   private final SparkRDDWriteClient writeClient;
   private final HoodieTable hoodieTable;
   private final WriteOperationType operationType;
+  private Map<String, String> extraMetadata;
 
   public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                        SparkSession sparkSession, Configuration configuration) {
+                                        SparkSession sparkSession, Configuration configuration, Map<String, String> extraMetadata) {
     this.instantTime = instantTime;
     this.operationType = WriteOperationType.BULK_INSERT;
+    this.extraMetadata = extraMetadata;
     this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig);
     writeClient.setOperationType(operationType);
     writeClient.startCommitWithTime(instantTime);
+
     this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
   }
@@ -76,7 +79,7 @@ public class DataSourceInternalWriterHelper {
 
   public void commit(List<HoodieWriteStat> writeStatList) {
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()),
+      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 6ced37149..6867ef89b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -67,6 +67,6 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
         options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).isPresent() ?
            options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).get() : null);
     return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
-        getConfiguration(), arePartitionRecordsSorted));
+        getConfiguration(), options, arePartitionRecordsSorted));
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index 6aa5329f8..96e5f89db 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.internal;
 
+import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -26,13 +27,16 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -46,15 +50,18 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   private final StructType structType;
   private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
   private final Boolean arePartitionRecordsSorted;
+  private Map<String, String> extraMetadataMap = new HashMap<>();
 
   public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                        SparkSession sparkSession, Configuration configuration, boolean arePartitionRecordsSorted) {
+                                        SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
+                                        boolean arePartitionRecordsSorted) {
     this.instantTime = instantTime;
     this.writeConfig = writeConfig;
     this.structType = structType;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
+    this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
     this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
-        sparkSession, configuration);
+        sparkSession, configuration, extraMetadataMap);
   }
 
   @Override
@@ -89,4 +96,5 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   public void abort(WriterCommitMessage[] messages) {
     dataSourceInternalWriterHelper.abort();
   }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
index 240e4b981..1644a6d4c 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.client.HoodieInternalWriteStatus;
+
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 import java.util.List;
diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index ca8058ff0..e498febc6 100644
--- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.internal;
 
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,18 +28,24 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
 import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Unit tests {@link HoodieDataSourceInternalWriter}.
@@ -47,12 +55,16 @@ public class TestHoodieDataSourceInternalWriter extends @Test public void testDataSourceWriter() throws Exception { + testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP); + } + + private void testDataSourceWriterInternal(Map extraMetadata, Map expectedExtraMetadata) throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); String instantTime = "001"; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); + new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(extraMetadata), false); DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; @@ -64,7 +76,6 @@ public class TestHoodieDataSourceInternalWriter extends int size = 10 + RANDOM.nextInt(1000); int batches = 5; Dataset totalInputRows = null; - for (int j = 0; j < batches; j++) { String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); @@ -80,11 +91,50 @@ public class TestHoodieDataSourceInternalWriter extends List commitMessages = new ArrayList<>(); commitMessages.add(commitMetadata); dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); // verify output assertOutput(totalInputRows, result, instantTime, Option.empty()); assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + + // verify extra metadata + Option commitMetadataOption = HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient); + assertTrue(commitMetadataOption.isPresent()); + Map actualExtraMetadata = new HashMap<>(); + commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry -> + !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)).forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue())); + assertEquals(actualExtraMetadata, expectedExtraMetadata); + } + + @Test + public void testDataSourceWriterExtraCommitMetadata() throws Exception { + + String commitExtraMetaPrefix = "commit_extra_meta_"; + Map extraMeta = new HashMap<>(); + extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix); + extraMeta.put(commitExtraMetaPrefix + "a", "valA"); + extraMeta.put(commitExtraMetaPrefix + "b", "valB"); + extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata + + Map expectedMetadata = new HashMap<>(); + expectedMetadata.putAll(extraMeta); + expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key()); + expectedMetadata.remove("commit_extra_c"); + + testDataSourceWriterInternal(extraMeta, expectedMetadata); + } + + @Test + public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception { + String commitExtraMetaPrefix = "commit_extra_meta_"; + Map extraMeta = new HashMap<>(); + extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix); + extraMeta.put("keyA", "valA"); + 
extraMeta.put("keyB", "valB"); + extraMeta.put("commit_extra_c", "valC"); + // none of the keys has commit metadata key prefix. + testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP); } @Test @@ -98,8 +148,7 @@ public class TestHoodieDataSourceInternalWriter extends String instantTime = "00" + i; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); - + new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong()); @@ -142,8 +191,7 @@ public class TestHoodieDataSourceInternalWriter extends String instantTime = "00" + i; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); - + new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong()); @@ -189,7 +237,7 @@ public class TestHoodieDataSourceInternalWriter extends String instantTime0 = "00" + 0; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); + new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false); DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); @@ -227,7 +275,7 @@ public class TestHoodieDataSourceInternalWriter extends // 2nd batch. abort in the end String instantTime1 = "00" + 1; dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); + new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false); writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong()); for (int j = 0; j < batches; j++) { diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java index d6188f43a..6860b7e76 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java @@ -53,6 +53,6 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider { // 1st arg to createHooodieConfig is not really reuqired to be set. but passing it anyways. 
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA.key()), path, tblName, properties); return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(), - getConfiguration(), arePartitionRecordsSorted); + getConfiguration(), properties, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java index c89758a37..a40053e97 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java @@ -18,6 +18,7 @@ package org.apache.hudi.spark3.internal; +import org.apache.hudi.DataSourceUtils; import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; @@ -33,7 +34,9 @@ import org.apache.spark.sql.connector.write.PhysicalWriteInfo; import org.apache.spark.sql.types.StructType; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -47,15 +50,17 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite { private final StructType structType; private final boolean arePartitionRecordsSorted; private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper; + private Map extraMetadata = new HashMap<>(); public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) { + SparkSession jss, Configuration hadoopConfiguration, Map properties, boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; this.arePartitionRecordsSorted = arePartitionRecordsSorted; + this.extraMetadata = DataSourceUtils.getExtraMetadata(properties); this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType, - jss, hadoopConfiguration); + jss, hadoopConfiguration, extraMetadata); } @Override diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java index 243b04d29..727861d87 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java @@ -26,6 +26,8 @@ import org.apache.spark.sql.connector.write.BatchWrite; import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.types.StructType; +import java.util.Map; + /** * Implementation of {@link WriteBuilder} for datasource "hudi.spark3.internal" to be used in datasource implementation * of bulk insert. 
@@ -38,20 +40,22 @@ public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder { private final SparkSession jss; private final Configuration hadoopConfiguration; private final boolean arePartitionRecordsSorted; + private final Map properties; public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) { + SparkSession jss, Configuration hadoopConfiguration, Map properties, boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = writeConfig; this.structType = structType; this.jss = jss; this.hadoopConfiguration = hadoopConfiguration; this.arePartitionRecordsSorted = arePartitionRecordsSorted; + this.properties = properties; } @Override public BatchWrite buildForBatch() { return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss, - hadoopConfiguration, arePartitionRecordsSorted); + hadoopConfiguration, properties, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java index 436b9c3b2..f950787b5 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.types.StructType; import java.util.HashSet; +import java.util.Map; import java.util.Set; /** @@ -42,15 +43,18 @@ class HoodieDataSourceInternalTable implements SupportsWrite { private final SparkSession jss; private final Configuration hadoopConfiguration; private final boolean arePartitionRecordsSorted; + private final Map properties; public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config, - StructType schema, SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) { + StructType schema, SparkSession jss, Configuration hadoopConfiguration, Map properties, + boolean arePartitionRecordsSorted) { this.instantTime = instantTime; this.writeConfig = config; this.structType = schema; this.jss = jss; this.hadoopConfiguration = hadoopConfiguration; this.arePartitionRecordsSorted = arePartitionRecordsSorted; + this.properties = properties; } @Override @@ -75,6 +79,6 @@ class HoodieDataSourceInternalTable implements SupportsWrite { @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) { return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss, - hadoopConfiguration, arePartitionRecordsSorted); + hadoopConfiguration, properties, arePartitionRecordsSorted); } } diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java index bd02663b5..a9caebfa2 100644 --- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java +++ 
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java @@ -18,6 +18,8 @@ package org.apache.hudi.spark3.internal; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; @@ -34,13 +36,18 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Unit tests {@link HoodieDataSourceInternalBatchWrite}. @@ -50,13 +57,17 @@ public class TestHoodieDataSourceInternalBatchWrite extends @Test public void testDataSourceWriter() throws Exception { + testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP); + } + + private void testDataSourceWriterInternal(Map extraMetadata, Map expectedExtraMetadata) throws Exception { // init config and table HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); String instantTime = "001"; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, extraMetadata, false); DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; @@ -84,11 +95,49 @@ public class TestHoodieDataSourceInternalBatchWrite extends List commitMessages = new ArrayList<>(); commitMessages.add(commitMetadata); dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); + metaClient.reloadActiveTimeline(); Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); // verify output assertOutput(totalInputRows, result, instantTime, Option.empty()); assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty()); + + // verify extra metadata + Option commitMetadataOption = HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient); + assertTrue(commitMetadataOption.isPresent()); + Map actualExtraMetadata = new HashMap<>(); + commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry -> + !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)).forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue())); + assertEquals(actualExtraMetadata, expectedExtraMetadata); + } + + @Test + public void testDataSourceWriterExtraCommitMetadata() throws Exception { + String commitExtraMetaPrefix = "commit_extra_meta_"; + Map extraMeta = new 
HashMap<>(); + extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix); + extraMeta.put(commitExtraMetaPrefix + "a", "valA"); + extraMeta.put(commitExtraMetaPrefix + "b", "valB"); + extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata + + Map expectedMetadata = new HashMap<>(); + expectedMetadata.putAll(extraMeta); + expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key()); + expectedMetadata.remove("commit_extra_c"); + + testDataSourceWriterInternal(extraMeta, expectedMetadata); + } + + @Test + public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception { + String commitExtraMetaPrefix = "commit_extra_meta_"; + Map extraMeta = new HashMap<>(); + extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix); + extraMeta.put("keyA", "valA"); + extraMeta.put("keyB", "valB"); + extraMeta.put("commit_extra_c", "valC"); + // none of the keys has commit metadata key prefix. + testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP); } @Test @@ -103,8 +152,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends String instantTime = "00" + i; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); - + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); @@ -148,8 +196,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends String instantTime = "00" + i; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); - + new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false); List commitMessages = new ArrayList<>(); Dataset totalInputRows = null; DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); @@ -195,8 +242,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends String instantTime0 = "00" + 0; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = - new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false); - + new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false); DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong()); List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); @@ -234,7 +280,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends // 2nd batch. 
abort in the end
     String instantTime1 = "00" + 1;
     dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
+        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
     writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
     for (int j = 0; j < batches; j++) {
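
Note on the behavior this patch introduces (this note and the sketch below are editorial, not part of the patch): DataSourceUtils.getExtraMetadata reads the commit-metadata key prefix option from the datasource options and copies only the options whose keys start with that prefix; DataSourceInternalWriterHelper then writes those entries into the commit metadata alongside the schema entry, which is what the new tests assert. The following self-contained Java sketch illustrates that filtering, using an assumed stand-in constant for the value of DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key():

// Illustrative sketch only: mirrors the prefix-based filtering added in
// DataSourceUtils.getExtraMetadata. The option key below is an assumption
// standing in for DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key().
import java.util.HashMap;
import java.util.Map;

public class ExtraCommitMetadataSketch {

  // Assumed option key used to configure the commit-metadata key prefix.
  private static final String KEY_PREFIX_OPT = "hoodie.datasource.write.commitmeta.key.prefix";

  // Keep only the entries whose keys start with the configured prefix.
  static Map<String, String> filterExtraMetadata(Map<String, String> options) {
    Map<String, String> extra = new HashMap<>();
    String prefix = options.get(KEY_PREFIX_OPT);
    if (prefix != null) {
      options.forEach((key, value) -> {
        if (key.startsWith(prefix)) {
          extra.put(key, value);
        }
      });
    }
    return extra;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put(KEY_PREFIX_OPT, "commit_extra_meta_");
    options.put("commit_extra_meta_a", "valA"); // kept: carries the prefix
    options.put("commit_extra_meta_b", "valB"); // kept: carries the prefix
    options.put("commit_extra_c", "valC");      // dropped: does not carry the prefix
    // Expected to contain only the two prefixed entries.
    System.out.println(filterExtraMetadata(options));
  }
}

Because the prefix option key itself and unrelated keys such as "commit_extra_c" do not start with the configured prefix, they never reach the commit metadata, matching what testDataSourceWriterExtraCommitMetadata and testDataSourceWriterEmptyExtraCommitMetadata verify above.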