diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 91f09da1d..c376243ba 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -460,7 +460,7 @@ public class DeltaSync implements Serializable { schemaProvider = dataAndCheckpoint.getSchemaProvider(); } - if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { + if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); @@ -596,7 +596,9 @@ public class DeltaSync implements Serializable { long metaSyncTimeMs = 0; if (!hasErrors || cfg.commitOnErrors) { HashMap checkpointCommitMetadata = new HashMap<>(); - checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); + if (checkpointStr != null) { + checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); + } if (cfg.checkpoint != null) { checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 8edd45d44..65cf2c3d3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -378,6 +378,10 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--max-retry-count"}, 
description = "the max retry count if --retry-on-source-failures is enabled") public Integer maxRetryCount = 3; + @Parameter(names = {"--allow-commit-on-no-checkpoint-change"}, description = "allow commits even if checkpoint has not changed before and after fetching data " + + "from the source. This might be useful in sources like SqlSource where there is no checkpoint. It is not recommended to enable this in continuous mode.") + public Boolean allowCommitOnNoCheckpointChange = false; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java index d832e43d2..056e03517 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java @@ -48,6 +48,8 @@ import java.util.Collections; *

To fetch and use the latest incremental checkpoint, you need to also set this hoodie_conf for deltastremer jobs: * *

hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key' + * + * Also, users are expected to set --allow-commit-on-no-checkpoint-change while using this SqlSource. */ public class SqlSource extends RowSource { private static final long serialVersionUID = 1L; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index b24faf7c0..9b7ee3b30 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -61,6 +61,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties"; static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties"; + static final String PROPS_FILENAME_TEST_SQL_SOURCE = "test-sql-source-source.properties"; static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties"; static final String FIRST_PARQUET_FILE_NAME = "1.parquet"; static final String FIRST_ORC_FILE_NAME = "1.orc"; @@ -71,6 +72,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static final int ORC_NUM_RECORDS = 5; static final int CSV_NUM_RECORDS = 3; static final int JSON_KAFKA_NUM_RECORDS = 5; + static final int SQL_SOURCE_NUM_RECORDS = 1000; String kafkaCheckpointType = "string"; // Required fields static final String TGT_BASE_PATH_PARAM = "--target-base-path"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 7fabcaeb6..afd888e96 100644 --- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -64,6 +64,7 @@ import org.apache.hudi.utilities.sources.JdbcSource; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ORCDFSSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; +import org.apache.hudi.utilities.sources.SqlSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.testutils.JdbcTestUtils; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -208,6 +209,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint) { + return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, + tableType, sourceOrderingField, checkpoint, false); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, + List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, + String checkpoint, boolean allowCommitOnNoCheckpointChange) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; @@ -226,6 +235,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { if (useSchemaProviderClass) { cfg.schemaProviderClassName = defaultSchemaProviderClassName; } + 
cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange; return cfg; } @@ -1701,6 +1711,41 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName())); } + private void prepareSqlSource() throws IOException { + String sourceRoot = dfsBasePath + "sqlSourceFiles"; + TypedProperties sqlSourceProps = new TypedProperties(); + sqlSourceProps.setProperty("include", "base.properties"); + sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false"); + sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select * from test_sql_table"); + + UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SQL_SOURCE); + + // Data generation + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + generateSqlSourceTestTable(sourceRoot, "1", "1000", SQL_SOURCE_NUM_RECORDS, dataGenerator); + } + + private void generateSqlSourceTestTable(String dfsRoot, String filename, String instantTime, int n, HoodieTestDataGenerator dataGenerator) throws IOException { + Path path = new Path(dfsRoot, filename); + Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, n, false)), path); + sparkSession.read().parquet(dfsRoot).createOrReplaceTempView("test_sql_table"); + } + + @Test + public void testSqlSourceSource() throws Exception { + prepareSqlSource(); + String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++; + HoodieDeltaStreamer deltaStreamer = + new HoodieDeltaStreamer(TestHelpers.makeConfig( + tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(), + Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false, + false, 1000, 
false, null, null, "timestamp", null, true), jsc); deltaStreamer.sync(); + TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + } + @Test public void testJdbcSourceIncrementalFetchInContinuousMode() { try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 9c3d5584a..e4ca51842 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -40,6 +40,7 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.junit.jupiter.api.Test; import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -135,6 +136,23 @@ public class TestSqlSource extends UtilitiesTestBase { assertEquals(10000, fetch1AsRows.getBatch().get().count()); } + /** + * Runs the test scenario of reading data from the source in row format. + * Source has no records, so the returned checkpoint should be null. + * + * @throws IOException + */ + @Test + public void testSqlSourceCheckpoint() throws IOException { + props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0"); + sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider); + sourceFormatAdapter = new SourceFormatAdapter(sqlSource); + + InputBatch> fetch1AsRows = + sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); + assertNull(fetch1AsRows.getCheckpointForNextBatch()); + } + /** + * Runs the test scenario of reading data from the source in row format. + * Source has more records than source limit.