From 3c90d252cc464fbd4ec3554fc930e41a0fcaa29f Mon Sep 17 00:00:00 2001 From: pratyakshsharma Date: Thu, 9 Jan 2020 18:43:36 +0530 Subject: [PATCH] [HUDI-114]: added option to overwrite payload implementation in hoodie.properties file --- .../hudi/config/HoodieCompactionConfig.java | 4 +- .../hudi/common/model}/BaseAvroPayload.java | 6 +- .../OverwriteWithLatestAvroPayload.java | 14 ++-- .../hudi/common/table/HoodieTableConfig.java | 16 ++-- .../common/table/HoodieTableMetaClient.java | 25 ++++-- .../java/org/apache/hudi/QuickstartUtils.java | 1 + .../hudi/payload/AWSDmsAvroPayload.java | 2 +- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../apache/hudi/HoodieSparkSqlWriter.scala | 2 +- .../test/scala/TestDataSourceDefaults.scala | 4 +- .../utilities/deltastreamer/DeltaSync.java | 7 +- .../deltastreamer/HoodieDeltaStreamer.java | 2 +- .../utilities/TestHoodieDeltaStreamer.java | 84 +++++++++++++++++-- 13 files changed, 129 insertions(+), 39 deletions(-) rename {hudi-spark/src/main/java/org/apache/hudi => hudi-common/src/main/java/org/apache/hudi/common/model}/BaseAvroPayload.java (92%) rename {hudi-spark/src/main/java/org/apache/hudi => hudi-common/src/main/java/org/apache/hudi/common/model}/OverwriteWithLatestAvroPayload.java (84%) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 54d65356e..376f299f6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -18,8 +18,8 @@ package org.apache.hudi.config; -import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.io.compact.strategy.CompactionStrategy; import org.apache.hudi.io.compact.strategy.LogFileSizeBasedCompactionStrategy; @@ -82,7 +82,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { // 200GB of target IO per compaction public static final String DEFAULT_COMPACTION_STRATEGY = LogFileSizeBasedCompactionStrategy.class.getName(); // used to merge records written to log file - public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName(); + public static final String DEFAULT_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName(); public static final String PAYLOAD_CLASS_PROP = "hoodie.compaction.payload.class"; // used to choose a trade off between IO vs Memory when performing compaction process diff --git a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java similarity index 92% rename from hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java rename to hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index 3bf07d98b..8ed925bd4 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.common.model; import org.apache.hudi.common.util.HoodieAvroUtils; import org.apache.hudi.exception.HoodieException; @@ -34,7 +34,7 @@ public abstract class BaseAvroPayload implements Serializable { /** * Avro data extracted from the source converted to bytes. */ - protected final byte[] recordBytes; + public final byte[] recordBytes; /** * For purposes of preCombining. @@ -49,7 +49,7 @@ public abstract class BaseAvroPayload implements Serializable { */ public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { try { - this.recordBytes = HoodieAvroUtils.avroToBytes(record); + this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0]; } catch (IOException io) { throw new HoodieIOException("Cannot convert GenericRecord to bytes", io); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java similarity index 84% rename from hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java rename to hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 32d584eab..88811f53b 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.common.model; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieAvroUtils; import org.apache.hudi.common.util.Option; @@ -45,7 +44,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload } public OverwriteWithLatestAvroPayload(Option record) { - this(record.get(), (record1) -> 0); // natural order + this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order } @Override @@ -61,7 +60,12 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get(); + Option recordOption = getInsertValue(schema); + if (!recordOption.isPresent()) { + return Option.empty(); + } + + GenericRecord genericRecord = (GenericRecord) recordOption.get(); // combining strategy here trivially ignores currentValue on disk and writes this record Object deleteMarker = genericRecord.get("_hoodie_is_deleted"); if (deleteMarker instanceof Boolean && (boolean) deleteMarker) { @@ -73,6 +77,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload @Override public Option getInsertValue(Schema schema) throws IOException { - return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); + return recordBytes.length == 0 ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index a63b88558..b19456536 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -18,9 +18,9 @@ package org.apache.hudi.common.table; -import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.TimelineLayoutVersion; import org.apache.hudi.exception.HoodieIOException; @@ -62,13 +62,12 @@ public class HoodieTableConfig implements Serializable { public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET; public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG; + public static final String DEFAULT_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName(); public static final Integer DEFAULT_TIMELINE_LAYOUT_VERSION = TimelineLayoutVersion.VERSION_0; - - public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName(); public static final String DEFAULT_ARCHIVELOG_FOLDER = ""; private Properties props; - public HoodieTableConfig(FileSystem fs, String metaPath) { + public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { Properties props = new Properties(); Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); LOG.info("Loading table properties from " + propertyPath); @@ -76,6 +75,13 @@ public class HoodieTableConfig implements Serializable { try (FSDataInputStream inputStream = fs.open(propertyPath)) { props.load(inputStream); } + if (props.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME) && payloadClassName != null + && !props.getProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME).equals(payloadClassName)) { + props.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName); + try (FSDataOutputStream outputStream = fs.create(propertyPath)) { + props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); + } + } } catch (IOException e) { throw new HoodieIOException("Could not load Hoodie properties from " + propertyPath, e); } @@ -109,7 +115,7 @@ public class HoodieTableConfig implements Serializable { if (!properties.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) { properties.setProperty(HOODIE_TABLE_TYPE_PROP_NAME, DEFAULT_TABLE_TYPE.name()); } - if (properties.getProperty(HOODIE_TABLE_TYPE_PROP_NAME) == HoodieTableType.MERGE_ON_READ.name() + if (properties.getProperty(HOODIE_TABLE_TYPE_PROP_NAME).equals(HoodieTableType.MERGE_ON_READ.name()) && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) { properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 24432739e..1d27d81b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -89,13 +89,22 @@ public class HoodieTableMetaClient implements Serializable { this(conf, basePath, false); } - public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad) { - this(conf, basePath, loadActiveTimelineOnLoad, ConsistencyGuardConfig.newBuilder().build(), - Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION)); + public HoodieTableMetaClient(Configuration conf, String basePath, String payloadClassName) { + this(conf, basePath, false, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), + payloadClassName); } public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, - ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion) + ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion) { + this(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, null); + } + + public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad) { + this(conf, basePath, loadActiveTimelineOnLoad, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null); + } + + public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, + ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, String payloadClassName) throws TableNotFoundException { LOG.info("Loading HoodieTableMetaClient from " + basePath); this.basePath = basePath; @@ -106,7 +115,7 @@ public class HoodieTableMetaClient implements Serializable { Path metaPathDir = new Path(this.metaPath); this.fs = getFs(); TableNotFoundException.checkTableValidity(fs, basePathDir, metaPathDir); - this.tableConfig = new HoodieTableConfig(fs, metaPath); + this.tableConfig = new HoodieTableConfig(fs, metaPath, payloadClassName); this.tableType = tableConfig.getTableType(); this.timelineLayoutVersion = layoutVersion.orElse(tableConfig.getTimelineLayoutVersion()); this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad; @@ -127,7 +136,7 @@ public class HoodieTableMetaClient implements Serializable { public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { return new HoodieTableMetaClient(oldMetaClient.hadoopConf.get(), oldMetaClient.basePath, oldMetaClient.loadActiveTimelineOnLoad, oldMetaClient.consistencyGuardConfig, - Option.of(oldMetaClient.timelineLayoutVersion)); + Option.of(oldMetaClient.timelineLayoutVersion), null); } /** @@ -284,9 +293,9 @@ public class HoodieTableMetaClient implements Serializable { * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder. */ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType, - String tableName, String archiveLogFolder) throws IOException { + String tableName, String archiveLogFolder, String payloadClassName) throws IOException { return initTableType(hadoopConf, basePath, HoodieTableType.valueOf(tableType), tableName, - archiveLogFolder, null, null); + archiveLogFolder, payloadClassName, null); } /** diff --git a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 22276e254..0cdd26fc7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.HoodieAvroUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index cc6801633..975151c3c 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -18,7 +18,7 @@ package org.apache.hudi.payload; -import org.apache.hudi.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.avro.Schema; diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 16b5d7925..e21e025a0 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -18,6 +18,7 @@ package org.apache.hudi import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor /** diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index a8f14b8fb..5e6100280 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -120,7 +120,7 @@ private[hudi] object HoodieSparkSqlWriter { // Create the table if not present if (!exists) { HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType, - tblName.get, "archived") + tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY)) } // Create a HoodieWriteClient & issue the write. diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index 0b550dc8c..f4b4fecd6 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -16,10 +16,10 @@ */ import org.apache.avro.generic.GenericRecord -import org.apache.hudi.common.model.EmptyHoodieRecordPayload +import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties} import org.apache.hudi.exception.{HoodieException, HoodieKeyException} -import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} +import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, SimpleKeyGenerator} import org.junit.Assert._ import org.junit.{Before, Test} import org.scalatest.junit.AssertionsForJUnit diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 621f1baa8..291b4b8db 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -193,12 +193,13 @@ public class DeltaSync implements Serializable { */ private void refreshTimeline() throws IOException { if (fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath); + HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, + cfg.payloadClassName); this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); } else { this.commitTimelineOpt = Option.empty(); HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - cfg.storageType, cfg.targetTableName, "archived"); + cfg.storageType, cfg.targetTableName, "archived", cfg.payloadClassName); } } @@ -260,7 +261,7 @@ public class DeltaSync implements Serializable { } } else { HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - cfg.storageType, cfg.targetTableName, "archived"); + cfg.storageType, cfg.targetTableName, "archived", cfg.payloadClassName); } if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 4d05092fd..a24add722 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -19,8 +19,8 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.HoodieWriteClient; -import org.apache.hudi.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 4e879defc..36ebb41ca 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -22,10 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.SimpleKeyGenerator; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.DFSPropertiesConfiguration; +import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; @@ -48,6 +51,7 @@ import org.apache.hudi.utilities.transform.Transformer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -71,6 +75,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -79,6 +84,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -149,13 +155,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { @Before public void setup() throws Exception { super.setup(); - TestDataSource.initDataGen(); } @After public void teardown() throws Exception { super.teardown(); - TestDataSource.resetDataGen(); } static class TestHelpers { @@ -174,15 +178,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName, String propsFilename, boolean enableHiveSync) { - return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true); + return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true, + false, null, null); } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName, - String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) { + String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, + String payloadClassName, String storageType) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; - cfg.storageType = "COPY_ON_WRITE"; + cfg.storageType = storageType == null ? "COPY_ON_WRITE" : storageType; cfg.sourceClassName = TestDataSource.class.getName(); cfg.transformerClassName = transformerClassName; cfg.operation = op; @@ -190,6 +196,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.sourceOrderingField = "timestamp"; cfg.propsFilePath = dfsBasePath + "/" + propsFilename; cfg.sourceLimit = 1000; + if (updatePayloadClass) { + cfg.payloadClassName = payloadClassName; + } if (useSchemaProviderClass) { cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); } @@ -491,7 +500,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String tableBasePath = dfsBasePath + "/test_table"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, - false); + false, false, null, null); try { new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); fail("Should error out when schema provider is not provided"); @@ -501,6 +510,58 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } } + @Test + public void testPayloadClassUpdate() throws Exception { + String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, + true, false, null, "MERGE_ON_READ"); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); + TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext); + + //now create one more deltaStreamer instance and update payload class + cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, + true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ"); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()); + + //now assert that hoodie.properties file now has updated payload class name + Properties props = new Properties(); + String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties"; + FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()); + try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) { + props.load(inputStream); + } + + assertEquals(props.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME), DummyAvroPayload.class.getName()); + } + + @Test + public void testPayloadClassUpdateWithCOWTable() throws Exception { + String dataSetBasePath = dfsBasePath + "/test_dataset_cow"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, + true, false, null, null); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); + TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext); + + //now create one more deltaStreamer instance and update payload class + cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, + true, true, DummyAvroPayload.class.getName(), null); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()); + + //now assert that hoodie.properties file does not have payload class prop since it is a COW table + Properties props = new Properties(); + String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties"; + FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()); + try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) { + props.load(inputStream); + } + + assertFalse(props.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME)); + } + @Test public void testFilterDupes() throws Exception { String tableBasePath = dfsBasePath + "/test_dupes_table"; @@ -535,14 +596,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { ds2.sync(); mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); - Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(), + assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(), HoodieTimeline.GREATER)); // Ensure it is empty HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class); System.out.println("New Commit Metadata=" + commitMetadata); - Assert.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty()); + assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty()); } @Test @@ -598,6 +659,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } } + public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload { + + public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) { + super(gr, orderingVal); + } + } + /** * Return empty table. */