1
0

[HUDI-1400] Replace Operation enum with WriteOperationType (#2259)

This commit is contained in:
wangxianghu
2020-11-19 13:40:04 +08:00
committed by GitHub
parent 4d05680038
commit a23230c8c2
6 changed files with 50 additions and 50 deletions

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -41,7 +42,6 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
@@ -264,32 +264,32 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames) {
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync) {
return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
false, null, null);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
@@ -312,7 +312,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
return cfg;
}
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op,
boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
@@ -509,7 +509,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
@@ -529,7 +529,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an invalid value");
@@ -543,7 +543,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
Exception e = assertThrows(TableNotFoundException.class, () -> {
dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", Operation.BULK_INSERT), jsc);
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", WriteOperationType.BULK_INSERT), jsc);
deltaStreamer.sync();
}, "Should error out when pointed out at a dir thats not a table");
// expected
@@ -555,7 +555,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String tableBasePath = dfsBasePath + "/test_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT);
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
@@ -570,7 +570,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
cfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
@@ -624,7 +624,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT);
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
@@ -666,7 +666,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
@@ -676,7 +676,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Now incrementally pull from the above hudi table and ingest to second table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.BULK_INSERT,
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT,
true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
@@ -695,7 +695,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// with no change in upstream table, no change in downstream too when pulled.
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
@@ -704,7 +704,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
cfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
@@ -715,7 +715,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.UPSERT,
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT,
false, null);
downstreamCfg.sourceLimit = 2000;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
@@ -740,7 +740,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testNullSchemaProvider() throws Exception {
String tableBasePath = dfsBasePath + "/test_table";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
false, false, null, null);
Exception e = assertThrows(HoodieException.class, () -> {
@@ -753,14 +753,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
@@ -779,14 +779,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testPayloadClassUpdateWithCOWTable() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, false, null, null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
//now create one more deltaStreamer instance and update payload class
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
true, true, DummyAvroPayload.class.getName(), null);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
@@ -807,7 +807,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String tableBasePath = dfsBasePath + "/test_dupes_table";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT);
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
@@ -815,7 +815,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Generate the same 1000 records + 1000 new ones for upsert
cfg.filterDupes = true;
cfg.sourceLimit = 2000;
cfg.operation = Operation.INSERT;
cfg.operation = WriteOperationType.INSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
@@ -827,10 +827,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Test with empty commits
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = Operation.UPSERT;
cfg2.operation = WriteOperationType.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
@@ -847,7 +847,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Try UPSERT with filterDupes true. Expect exception
cfg2.filterDupes = true;
cfg2.operation = Operation.UPSERT;
cfg2.operation = WriteOperationType.UPSERT;
try {
new HoodieDeltaStreamer(cfg2, jsc).sync();
} catch (IllegalArgumentException e) {
@@ -897,7 +897,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
@@ -971,7 +971,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
deltaStreamer.sync();