diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java index 6e5027b7f..0387731d6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java @@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.deltastreamer.DeltaSync; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -38,30 +39,30 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer { super(cfg, jssc); } - public JavaRDD upsert(Operation operation) throws Exception { + public JavaRDD upsert(WriteOperationType operation) throws Exception { cfg.operation = operation; return deltaSyncService.get().getDeltaSync().syncOnce().getRight(); } public JavaRDD insert() throws Exception { - return upsert(Operation.INSERT); + return upsert(WriteOperationType.INSERT); } public JavaRDD bulkInsert() throws Exception { - return upsert(Operation.BULK_INSERT); + return upsert(WriteOperationType.BULK_INSERT); } public void scheduleCompact() throws Exception { // Since we don't support scheduleCompact() operation in delta-streamer, assume upsert without any data that will // trigger scheduling compaction - upsert(Operation.UPSERT); + upsert(WriteOperationType.UPSERT); } public JavaRDD compact() throws Exception { // Since we don't support compact() operation in delta-streamer, assume upsert without any data that will trigger // inline compaction - return upsert(Operation.UPSERT); + return upsert(WriteOperationType.UPSERT); } public Pair>> fetchSource() throws Exception { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index a5c1b0ef2..bf6fca70e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -25,6 +25,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -38,7 +39,6 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode; import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode; import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; -import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -126,7 +126,7 @@ public class HoodieTestSuiteWriter { public JavaRDD upsert(Option instantTime) throws Exception { if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.upsert(Operation.UPSERT); + return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT); } else { Pair>> nextBatch = fetchSource(); lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a268c7baa..a27ce996d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -378,7 +379,7 @@ public class DeltaSync implements Serializable { return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD())); } - boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(HoodieDeltaStreamer.Operation.UPSERT); + boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); JavaRDD avroRDD = avroRDDOptional.get(); JavaRDD records = avroRDD.map(gr -> { HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a6de17d81..6d25f4665 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -192,15 +193,11 @@ public class HoodieDeltaStreamer implements Serializable { return true; } - public enum Operation { - UPSERT, INSERT, BULK_INSERT - } - - protected static class OperationConverter implements IStringConverter { + protected static class OperationConverter implements IStringConverter { @Override - public Operation convert(String value) throws ParameterException { - return Operation.valueOf(value); + public WriteOperationType convert(String value) throws ParameterException { + return WriteOperationType.valueOf(value); } } @@ -272,7 +269,7 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + "is purely new data/inserts to gain speed)", converter = OperationConverter.class) - public Operation operation = Operation.UPSERT; + public WriteOperationType operation = WriteOperationType.UPSERT; @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") @@ -552,7 +549,7 @@ public class HoodieDeltaStreamer implements Serializable { } } - ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT, + ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != WriteOperationType.UPSERT, "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); this.props = properties.get(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 5333b64fe..9d5ca3ca8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -22,6 +22,7 @@ import com.beust.jcommander.Parameter; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.config.TypedProperties; @@ -69,7 +70,7 @@ public class HoodieMultiTableDeltaStreamer { this.jssc = jssc; String commonPropsFile = config.propsFilePath; String configFolder = config.configFolder; - ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT, + ValidationUtils.checkArgument(!config.filterDupes || config.operation != WriteOperationType.UPSERT, "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration()); configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; @@ -268,7 +269,7 @@ public class HoodieMultiTableDeltaStreamer { @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class) - public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT; + public WriteOperationType operation = WriteOperationType.UPSERT; @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index e7bb15a3c..9b0097e8b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -41,7 +42,6 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; -import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; @@ -264,32 +264,32 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { static class TestHelpers { - static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) { + static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName())); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) { + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) { return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName())); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List transformerClassNames) { + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List transformerClassNames) { return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List transformerClassNames, + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List transformerClassNames, String propsFilename, boolean enableHiveSync) { return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, false, null, null); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List transformerClassNames, + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) { return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp"); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName, + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); @@ -312,7 +312,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { return cfg; } - static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op, + static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; @@ -509,7 +509,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String partitionPath = bootstrapPath + "/year=2016/month=05/day=01"; String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet"; String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider"; - HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT); + HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); TypedProperties props = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig(); props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath); @@ -529,7 +529,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { Exception e = assertThrows(IOException.class, () -> { String tableBasePath = dfsBasePath + "/test_table"; HoodieDeltaStreamer deltaStreamer = - new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT, + new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc); deltaStreamer.sync(); }, "Should error out when setting the key generator class property to an invalid value"); @@ -543,7 +543,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { Exception e = assertThrows(TableNotFoundException.class, () -> { dfs.mkdirs(new Path(dfsBasePath + "/not_a_table")); HoodieDeltaStreamer deltaStreamer = - new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", Operation.BULK_INSERT), jsc); + new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", WriteOperationType.BULK_INSERT), jsc); deltaStreamer.sync(); }, "Should error out when pointed out at a dir thats not a table"); // expected @@ -555,7 +555,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String tableBasePath = dfsBasePath + "/test_table"; // Initial bulk insert - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT); + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext); @@ -570,7 +570,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // upsert() #1 cfg.sourceLimit = 2000; - cfg.operation = Operation.UPSERT; + cfg.operation = WriteOperationType.UPSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext); @@ -624,7 +624,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { int totalRecords = 3000; // Initial bulk insert - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT); + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); @@ -666,7 +666,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips"); // Initial bulk insert to ingest to first hudi table - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT, + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext); @@ -676,7 +676,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Now incrementally pull from the above hudi table and ingest to second table HoodieDeltaStreamer.Config downstreamCfg = - TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.BULK_INSERT, + TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null); new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext); @@ -695,7 +695,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // with no change in upstream table, no change in downstream too when pulled. HoodieDeltaStreamer.Config downstreamCfg1 = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, - Operation.BULK_INSERT, true, DummySchemaProvider.class.getName()); + WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName()); new HoodieDeltaStreamer(downstreamCfg1, jsc).sync(); TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext); @@ -704,7 +704,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // upsert() #1 on upstream hudi table cfg.sourceLimit = 2000; - cfg.operation = Operation.UPSERT; + cfg.operation = WriteOperationType.UPSERT; new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext); @@ -715,7 +715,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Incrementally pull changes in upstream hudi table and apply to downstream table downstreamCfg = - TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.UPSERT, + TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT, false, null); downstreamCfg.sourceLimit = 2000; new HoodieDeltaStreamer(downstreamCfg, jsc).sync(); @@ -740,7 +740,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { @Test public void testNullSchemaProvider() throws Exception { String tableBasePath = dfsBasePath + "/test_table"; - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT, + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true, false, false, null, null); Exception e = assertThrows(HoodieException.class, () -> { @@ -753,14 +753,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { @Test public void testPayloadClassUpdate() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true, true, false, null, "MERGE_ON_READ"); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext); //now create one more deltaStreamer instance and update payload class - cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ"); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()); @@ -779,14 +779,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { @Test public void testPayloadClassUpdateWithCOWTable() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_cow"; - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true, true, false, null, null); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext); //now create one more deltaStreamer instance and update payload class - cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true, true, true, DummyAvroPayload.class.getName(), null); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()); @@ -807,7 +807,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String tableBasePath = dfsBasePath + "/test_dupes_table"; // Initial bulk insert - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT); + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); @@ -815,7 +815,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Generate the same 1000 records + 1000 new ones for upsert cfg.filterDupes = true; cfg.sourceLimit = 2000; - cfg.operation = Operation.INSERT; + cfg.operation = WriteOperationType.INSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); @@ -827,10 +827,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Test with empty commits HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); - HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT); + HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); cfg2.filterDupes = false; cfg2.sourceLimit = 2000; - cfg2.operation = Operation.UPSERT; + cfg2.operation = WriteOperationType.UPSERT; cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc); ds2.sync(); @@ -847,7 +847,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Try UPSERT with filterDupes true. Expect exception cfg2.filterDupes = true; - cfg2.operation = Operation.UPSERT; + cfg2.operation = WriteOperationType.UPSERT; try { new HoodieDeltaStreamer(cfg2, jsc).sync(); } catch (IllegalArgumentException e) { @@ -897,7 +897,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null); String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( - TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(), + TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, 100000, false, null, null, "timestamp"), jsc); deltaStreamer.sync(); @@ -971,7 +971,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0"; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig( - tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(), + tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_CSV, false, useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc); deltaStreamer.sync();