[HUDI-2413] fix Sql source's checkpoint issue (#3648)
* [HUDI-2413] fix Sql source's checkpoint * Fixing sql source checkpoint handling * Fixing docs Co-authored-by: jian.feng <fengjian428@gmial.com> Co-authored-by: sivabalan <n.siva.b@gmail.com>
This commit is contained in:
@@ -460,7 +460,7 @@ public class DeltaSync implements Serializable {
|
||||
schemaProvider = dataAndCheckpoint.getSchemaProvider();
|
||||
}
|
||||
|
||||
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
|
||||
if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
|
||||
LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
|
||||
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
|
||||
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
|
||||
@@ -596,7 +596,9 @@ public class DeltaSync implements Serializable {
|
||||
long metaSyncTimeMs = 0;
|
||||
if (!hasErrors || cfg.commitOnErrors) {
|
||||
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
|
||||
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
|
||||
if (checkpointStr != null) {
|
||||
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
|
||||
}
|
||||
if (cfg.checkpoint != null) {
|
||||
checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
|
||||
}
|
||||
|
||||
@@ -378,6 +378,10 @@ public class HoodieDeltaStreamer implements Serializable {
|
||||
@Parameter(names = {"--max-retry-count"}, description = "the max retry count if --retry-on-source-failures is enabled")
|
||||
public Integer maxRetryCount = 3;
|
||||
|
||||
@Parameter(names = {"--allow-commit-on-no-checkpoint-change"}, description = "allow commits even if checkpoint has not changed before and after fetch data"
|
||||
+ "from souce. This might be useful in sources like SqlSource where there is not checkpoint. And is not recommended to enable in continuous mode.")
|
||||
public Boolean allowCommitOnNoCheckpointChange = false;
|
||||
|
||||
@Parameter(names = {"--help", "-h"}, help = true)
|
||||
public Boolean help = false;
|
||||
|
||||
|
||||
@@ -48,6 +48,8 @@ import java.util.Collections;
|
||||
* <p>To fetch and use the latest incremental checkpoint, you need to also set this hoodie_conf for deltastremer jobs:
|
||||
*
|
||||
* <p>hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'
|
||||
*
|
||||
* Also, users are expected to set --allow-commit-on-no-checkpoint-change while using this SqlSource.
|
||||
*/
|
||||
public class SqlSource extends RowSource {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@@ -61,6 +61,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
||||
static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
|
||||
static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
|
||||
static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
|
||||
static final String PROPS_FILENAME_TEST_SQL_SOURCE = "test-sql-source-source.properties";
|
||||
static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
|
||||
static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
|
||||
static final String FIRST_ORC_FILE_NAME = "1.orc";
|
||||
@@ -71,6 +72,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
||||
static final int ORC_NUM_RECORDS = 5;
|
||||
static final int CSV_NUM_RECORDS = 3;
|
||||
static final int JSON_KAFKA_NUM_RECORDS = 5;
|
||||
static final int SQL_SOURCE_NUM_RECORDS = 1000;
|
||||
String kafkaCheckpointType = "string";
|
||||
// Required fields
|
||||
static final String TGT_BASE_PATH_PARAM = "--target-base-path";
|
||||
|
||||
@@ -64,6 +64,7 @@ import org.apache.hudi.utilities.sources.JdbcSource;
|
||||
import org.apache.hudi.utilities.sources.JsonKafkaSource;
|
||||
import org.apache.hudi.utilities.sources.ORCDFSSource;
|
||||
import org.apache.hudi.utilities.sources.ParquetDFSSource;
|
||||
import org.apache.hudi.utilities.sources.SqlSource;
|
||||
import org.apache.hudi.utilities.sources.TestDataSource;
|
||||
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
|
||||
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
|
||||
@@ -208,6 +209,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
|
||||
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
|
||||
String checkpoint) {
|
||||
return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName,
|
||||
tableType, sourceOrderingField, checkpoint, false);
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
|
||||
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
|
||||
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
|
||||
String checkpoint, boolean allowCommitOnNoCheckpointChange) {
|
||||
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
||||
cfg.targetBasePath = basePath;
|
||||
cfg.targetTableName = "hoodie_trips";
|
||||
@@ -226,6 +235,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
if (useSchemaProviderClass) {
|
||||
cfg.schemaProviderClassName = defaultSchemaProviderClassName;
|
||||
}
|
||||
cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -1701,6 +1711,41 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
private void prepareSqlSource() throws IOException {
|
||||
String sourceRoot = dfsBasePath + "sqlSourceFiles";
|
||||
TypedProperties sqlSourceProps = new TypedProperties();
|
||||
sqlSourceProps.setProperty("include", "base.properties");
|
||||
sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
|
||||
sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
|
||||
sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
|
||||
sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select * from test_sql_table");
|
||||
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
|
||||
|
||||
// Data generation
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||
generateSqlSourceTestTable(sourceRoot, "1", "1000", SQL_SOURCE_NUM_RECORDS, dataGenerator);
|
||||
}
|
||||
|
||||
private void generateSqlSourceTestTable(String dfsRoot, String filename, String instantTime, int n, HoodieTestDataGenerator dataGenerator) throws IOException {
|
||||
Path path = new Path(dfsRoot, filename);
|
||||
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, n, false)), path);
|
||||
sparkSession.read().parquet(dfsRoot).createOrReplaceTempView("test_sql_table");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSqlSourceSource() throws Exception {
|
||||
prepareSqlSource();
|
||||
String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
|
||||
HoodieDeltaStreamer deltaStreamer =
|
||||
new HoodieDeltaStreamer(TestHelpers.makeConfig(
|
||||
tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(),
|
||||
Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
|
||||
false, 1000, false, null, null, "timestamp", null, true), jsc);
|
||||
deltaStreamer.sync();
|
||||
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJdbcSourceIncrementalFetchInContinuousMode() {
|
||||
try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
/**
|
||||
@@ -135,6 +136,23 @@ public class TestSqlSource extends UtilitiesTestBase {
|
||||
assertEquals(10000, fetch1AsRows.getBatch().get().count());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the test scenario of reading data from the source in row format.
|
||||
* Source has no records.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testSqlSourceCheckpoint() throws IOException {
|
||||
props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0");
|
||||
sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
|
||||
sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
|
||||
|
||||
InputBatch<Dataset<Row>> fetch1AsRows =
|
||||
sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
|
||||
assertNull(fetch1AsRows.getCheckpointForNextBatch());
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the test scenario of reading data from the source in row format.
|
||||
* Source has more records than source limit.
|
||||
|
||||
Reference in New Issue
Block a user