diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index b09301075..446fd9cfc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -22,11 +22,13 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -281,6 +283,7 @@ public class DeltaSync implements Serializable { avroRDDOptional = transformed.map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() ); + // Use Transformed Row's schema if not overridden // Use Transformed Row's schema if not overridden. If target schema is not specified // default to RowBasedSchemaProvider @@ -299,11 +302,17 @@ public class DeltaSync implements Serializable { schemaProvider = dataAndCheckpoint.getSchemaProvider(); } - if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { - log.info("No new data, nothing to commit.. "); + if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { + log.info("No new data, source checkpoint has not changed. Nothing to commit." + + "Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); return null; } + if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { + log.info("No new data, perform empty commit."); + return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD())); + } + JavaRDD avroRDD = avroRDDOptional.get(); JavaRDD records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, @@ -332,13 +341,10 @@ public class DeltaSync implements Serializable { cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer()); - - if (records.isEmpty()) { - log.info("No new data, nothing to commit.. "); - return Option.empty(); - } } + boolean isEmpty = records.isEmpty(); + String commitTime = startCommit(); log.info("Starting commit : " + commitTime); @@ -379,10 +385,12 @@ public class DeltaSync implements Serializable { scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata)); } - // Sync to hive if enabled - Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); - syncHive(); - hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; + if (!isEmpty) { + // Sync to hive if enabled + Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); + syncHive(); + hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; + } } else { log.info("Commit " + commitTime + " failed!"); throw new HoodieException("Commit " + commitTime + " failed!"); @@ -467,7 +475,6 @@ public class DeltaSync implements Serializable { private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() - .withProps(props) .withPath(cfg.targetBasePath) .combineInput(cfg.filterDupes, true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -476,12 +483,21 @@ public class DeltaSync implements Serializable { .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) .forTable(cfg.targetTableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withAutoCommit(false); + .withAutoCommit(false) + .withProps(props); + if (null != schemaProvider && null != schemaProvider.getTargetSchema()) { builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); } + HoodieWriteConfig config = builder.build(); - return builder.build(); + // Validate what deltastreamer assumes of write-config to be really safe + Preconditions.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled()); + Preconditions.checkArgument(!config.shouldAutoCommit()); + Preconditions.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes); + Preconditions.checkArgument(config.shouldCombineBeforeUpsert()); + + return config; } /** diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java index 6983ec3d7..8b3e42a08 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java @@ -33,11 +33,11 @@ public interface Transformer { * Transform source RDD to target RDD * * @param jsc JavaSparkContext - * @param rowDataset Source DataSet * @param sparkSession Spark Session + * @param rowDataset Source DataSet * @param properties Config properties * @return Transformed Dataset */ - Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, + Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset rowDataset, TypedProperties properties); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index cce1034bd..7f513692c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -81,6 +81,7 @@ import org.junit.Test; * upserts, inserts. Check counts at the end. */ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { + private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class); @@ -156,6 +157,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static class TestHelpers { + + static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) { + return makeConfig(basePath, op, DropAllTransformer.class.getName()); + } + static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) { return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName()); } @@ -392,9 +398,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline * The first step involves using a SQL template to transform a source * TEST-DATA-SOURCE ============================> HUDI TABLE 1 ===============> HUDI TABLE 2 - * (incr-pull with transform) (incr-pull) + * (incr-pull with transform) (incr-pull) * Hudi Table 1 is synced with Hive. - * @throws Exception */ @Test public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception { @@ -491,6 +496,29 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { List counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); assertEquals(1000, counts.get(0).getLong(1)); assertEquals(1000, counts.get(1).getLong(1)); + + // Test with empty commits + HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true); + HoodieInstant lastFinished = + mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(datasetBasePath, Operation.UPSERT); + cfg2.filterDupes = true; + cfg2.sourceLimit = 2000; + cfg2.operation = Operation.UPSERT; + cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); + HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc); + ds2.sync(); + mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true); + HoodieInstant newLastFinished = + mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(), + HoodieTimeline.GREATER)); + + // Ensure it is empty + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class); + System.out.println("New Commit Metadata=" + commitMetadata); + Assert.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty()); } @Test @@ -513,7 +541,6 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public static class DistanceUDF implements UDF4 { /** - * * Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude- * longitude-what-am-i-doi * Calculate distance between two points in latitude and longitude taking @@ -522,6 +549,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { * * lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters * el2 End altitude in meters + * * @returns Distance in Meters */ @Override @@ -566,4 +594,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { super(props); } } + + /** + * Return empty dataset + */ + public static class DropAllTransformer implements Transformer { + + @Override + public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset rowDataset, + TypedProperties properties) { + System.out.println("DropAllTransformer called !!"); + return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema()); + } + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index 61a0c5c51..ec5f7db94 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -53,7 +53,7 @@ public class TestDataSource extends AbstractBaseTestSource { // No new data. if (sourceLimit <= 0) { - return new InputBatch<>(Option.empty(), commitTime); + return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null)); } List records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)