[HUDI-114]: Added an option to overwrite the payload implementation in the hoodie.properties file
This commit is contained in:
committed by
Balaji Varadarajan
parent
5af3dc6aed
commit
3c90d252cc
@@ -193,12 +193,13 @@ public class DeltaSync implements Serializable {
|
||||
*/
|
||||
private void refreshTimeline() throws IOException {
|
||||
if (fs.exists(new Path(cfg.targetBasePath))) {
|
||||
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath);
|
||||
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
|
||||
cfg.payloadClassName);
|
||||
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
||||
} else {
|
||||
this.commitTimelineOpt = Option.empty();
|
||||
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
|
||||
cfg.storageType, cfg.targetTableName, "archived");
|
||||
cfg.storageType, cfg.targetTableName, "archived", cfg.payloadClassName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,7 +261,7 @@ public class DeltaSync implements Serializable {
|
||||
}
|
||||
} else {
|
||||
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
|
||||
cfg.storageType, cfg.targetTableName, "archived");
|
||||
cfg.storageType, cfg.targetTableName, "archived", cfg.payloadClassName);
|
||||
}
|
||||
|
||||
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
|
||||
|
||||
@@ -19,8 +19,8 @@
|
||||
package org.apache.hudi.utilities.deltastreamer;
|
||||
|
||||
import org.apache.hudi.HoodieWriteClient;
|
||||
import org.apache.hudi.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
|
||||
@@ -22,10 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions;
|
||||
import org.apache.hudi.SimpleKeyGenerator;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.DFSPropertiesConfiguration;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.TypedProperties;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
@@ -48,6 +51,7 @@ import org.apache.hudi.utilities.transform.Transformer;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -71,6 +75,7 @@ import org.junit.Test;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
@@ -79,6 +84,7 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@@ -149,13 +155,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
TestDataSource.initDataGen();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
super.teardown();
|
||||
TestDataSource.resetDataGen();
|
||||
}
|
||||
|
||||
static class TestHelpers {
|
||||
@@ -174,15 +178,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
|
||||
String propsFilename, boolean enableHiveSync) {
|
||||
return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true);
|
||||
return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true,
|
||||
false, null, null);
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
|
||||
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
|
||||
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
|
||||
String payloadClassName, String storageType) {
|
||||
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
||||
cfg.targetBasePath = basePath;
|
||||
cfg.targetTableName = "hoodie_trips";
|
||||
cfg.storageType = "COPY_ON_WRITE";
|
||||
cfg.storageType = storageType == null ? "COPY_ON_WRITE" : storageType;
|
||||
cfg.sourceClassName = TestDataSource.class.getName();
|
||||
cfg.transformerClassName = transformerClassName;
|
||||
cfg.operation = op;
|
||||
@@ -190,6 +196,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
cfg.sourceOrderingField = "timestamp";
|
||||
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
|
||||
cfg.sourceLimit = 1000;
|
||||
if (updatePayloadClass) {
|
||||
cfg.payloadClassName = payloadClassName;
|
||||
}
|
||||
if (useSchemaProviderClass) {
|
||||
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
|
||||
}
|
||||
@@ -491,7 +500,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
String tableBasePath = dfsBasePath + "/test_table";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
false);
|
||||
false, false, null, null);
|
||||
try {
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
fail("Should error out when schema provider is not provided");
|
||||
@@ -501,6 +510,58 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPayloadClassUpdate() throws Exception {
|
||||
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, false, null, "MERGE_ON_READ");
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
|
||||
|
||||
//now create one more deltaStreamer instance and update payload class
|
||||
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
|
||||
|
||||
//now assert that hoodie.properties file now has updated payload class name
|
||||
Properties props = new Properties();
|
||||
String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
|
||||
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
|
||||
try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
|
||||
props.load(inputStream);
|
||||
}
|
||||
|
||||
assertEquals(props.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME), DummyAvroPayload.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPayloadClassUpdateWithCOWTable() throws Exception {
|
||||
String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, false, null, null);
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
|
||||
|
||||
//now create one more deltaStreamer instance and update payload class
|
||||
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, true, DummyAvroPayload.class.getName(), null);
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
|
||||
|
||||
//now assert that hoodie.properties file does not have payload class prop since it is a COW table
|
||||
Properties props = new Properties();
|
||||
String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
|
||||
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
|
||||
try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
|
||||
props.load(inputStream);
|
||||
}
|
||||
|
||||
assertFalse(props.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterDupes() throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/test_dupes_table";
|
||||
@@ -535,14 +596,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
ds2.sync();
|
||||
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
|
||||
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
||||
Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
|
||||
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
|
||||
HoodieTimeline.GREATER));
|
||||
|
||||
// Ensure it is empty
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
|
||||
System.out.println("New Commit Metadata=" + commitMetadata);
|
||||
Assert.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
|
||||
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -598,6 +659,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
|
||||
|
||||
public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
|
||||
super(gr, orderingVal);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return empty table.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user