1
0

[HUDI-227] : DeltaStreamer Improvements : Commit empty input batch with progressing checkpoints and allow users to override configs through properties. Original PR : PR-805 and PR-806 (#863)

This commit is contained in:
Balaji Varadarajan
2019-08-30 09:13:34 -07:00
committed by vinoth chandar
parent a6908ef44d
commit 376b59ae5f
4 changed files with 77 additions and 20 deletions

View File

@@ -22,11 +22,13 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.function.Function; import java.util.function.Function;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@@ -281,6 +283,7 @@ public class DeltaSync implements Serializable {
avroRDDOptional = transformed.map(t -> avroRDDOptional = transformed.map(t ->
AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
); );
// Use Transformed Row's schema if not overridden // Use Transformed Row's schema if not overridden
// Use Transformed Row's schema if not overridden. If target schema is not specified // Use Transformed Row's schema if not overridden. If target schema is not specified
// default to RowBasedSchemaProvider // default to RowBasedSchemaProvider
@@ -299,11 +302,17 @@ public class DeltaSync implements Serializable {
schemaProvider = dataAndCheckpoint.getSchemaProvider(); schemaProvider = dataAndCheckpoint.getSchemaProvider();
} }
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
log.info("No new data, nothing to commit.. "); log.info("No new data, source checkpoint has not changed. Nothing to commit."
+ "Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null; return null;
} }
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
log.info("No new data, perform empty commit.");
return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
}
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
@@ -332,13 +341,10 @@ public class DeltaSync implements Serializable {
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(),
writeClient.getTimelineServer()); writeClient.getTimelineServer());
if (records.isEmpty()) {
log.info("No new data, nothing to commit.. ");
return Option.empty();
}
} }
boolean isEmpty = records.isEmpty();
String commitTime = startCommit(); String commitTime = startCommit();
log.info("Starting commit : " + commitTime); log.info("Starting commit : " + commitTime);
@@ -379,10 +385,12 @@ public class DeltaSync implements Serializable {
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata)); scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata));
} }
// Sync to hive if enabled if (!isEmpty) {
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); // Sync to hive if enabled
syncHive(); Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; syncHive();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
}
} else { } else {
log.info("Commit " + commitTime + " failed!"); log.info("Commit " + commitTime + " failed!");
throw new HoodieException("Commit " + commitTime + " failed!"); throw new HoodieException("Commit " + commitTime + " failed!");
@@ -467,7 +475,6 @@ public class DeltaSync implements Serializable {
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
HoodieWriteConfig.Builder builder = HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder() HoodieWriteConfig.newBuilder()
.withProps(props)
.withPath(cfg.targetBasePath) .withPath(cfg.targetBasePath)
.combineInput(cfg.filterDupes, true) .combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -476,12 +483,21 @@ public class DeltaSync implements Serializable {
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName) .forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(false); .withAutoCommit(false)
.withProps(props);
if (null != schemaProvider && null != schemaProvider.getTargetSchema()) { if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
} }
HoodieWriteConfig config = builder.build();
return builder.build(); // Validate what deltastreamer assumes of write-config to be really safe
Preconditions.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled());
Preconditions.checkArgument(!config.shouldAutoCommit());
Preconditions.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes);
Preconditions.checkArgument(config.shouldCombineBeforeUpsert());
return config;
} }
/** /**

View File

@@ -33,11 +33,11 @@ public interface Transformer {
* Transform source RDD to target RDD * Transform source RDD to target RDD
* *
* @param jsc JavaSparkContext * @param jsc JavaSparkContext
* @param rowDataset Source DataSet
* @param sparkSession Spark Session * @param sparkSession Spark Session
* @param rowDataset Source DataSet
* @param properties Config properties * @param properties Config properties
* @return Transformed Dataset * @return Transformed Dataset
*/ */
Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties); Dataset<Row> rowDataset, TypedProperties properties);
} }

View File

@@ -81,6 +81,7 @@ import org.junit.Test;
* upserts, inserts. Check counts at the end. * upserts, inserts. Check counts at the end.
*/ */
public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class); private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@@ -156,6 +157,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} }
static class TestHelpers { static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
return makeConfig(basePath, op, DropAllTransformer.class.getName());
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) { static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName()); return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
} }
@@ -392,9 +398,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline
* The first step involves using a SQL template to transform a source * The first step involves using a SQL template to transform a source
* TEST-DATA-SOURCE ============================> HUDI TABLE 1 ===============> HUDI TABLE 2 * TEST-DATA-SOURCE ============================> HUDI TABLE 1 ===============> HUDI TABLE 2
* (incr-pull with transform) (incr-pull) * (incr-pull with transform) (incr-pull)
* Hudi Table 1 is synced with Hive. * Hudi Table 1 is synced with Hive.
* @throws Exception
*/ */
@Test @Test
public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception { public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
@@ -491,6 +496,29 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(1000, counts.get(0).getLong(1)); assertEquals(1000, counts.get(0).getLong(1));
assertEquals(1000, counts.get(1).getLong(1)); assertEquals(1000, counts.get(1).getLong(1));
// Test with empty commits
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
HoodieInstant lastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(datasetBasePath, Operation.UPSERT);
cfg2.filterDupes = true;
cfg2.sourceLimit = 2000;
cfg2.operation = Operation.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
HoodieInstant newLastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
HoodieTimeline.GREATER));
// Ensure it is empty
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
Assert.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
} }
@Test @Test
@@ -513,7 +541,6 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> { public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
/** /**
*
* Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude- * Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude-
* longitude-what-am-i-doi * longitude-what-am-i-doi
* Calculate distance between two points in latitude and longitude taking * Calculate distance between two points in latitude and longitude taking
@@ -522,6 +549,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
* *
* lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters * lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters
* el2 End altitude in meters * el2 End altitude in meters
*
* @returns Distance in Meters * @returns Distance in Meters
*/ */
@Override @Override
@@ -566,4 +594,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
super(props); super(props);
} }
} }
/**
* Return empty dataset
*/
public static class DropAllTransformer implements Transformer {
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
System.out.println("DropAllTransformer called !!");
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
} }

View File

@@ -53,7 +53,7 @@ public class TestDataSource extends AbstractBaseTestSource {
// No new data. // No new data.
if (sourceLimit <= 0) { if (sourceLimit <= 0) {
return new InputBatch<>(Option.empty(), commitTime); return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null));
} }
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM) List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)