diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 222a3916d..fa261d47a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.transform.ChainedTransformer; @@ -85,7 +86,7 @@ public class UtilHelpers { private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException { + SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException { try { return (Source) ReflectionUtils.loadClass(sourceClass, new Class[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg, @@ -96,7 +97,7 @@ public class UtilHelpers { } public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, - JavaSparkContext jssc) throws IOException { + JavaSparkContext jssc) throws IOException { try { return schemaProviderClass == null ? null : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); @@ -117,7 +118,17 @@ public class UtilHelpers { } } + public static InitialCheckPointProvider createInitialCheckpointProvider( + String className, TypedProperties props) throws IOException { + try { + return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[] {TypedProperties.class}, props); + } catch (Throwable e) { + throw new IOException("Could not load initial checkpoint provider class " + className, e); + } + } + /** + * */ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List overriddenProps) { DFSPropertiesConfiguration conf; @@ -157,7 +168,7 @@ public class UtilHelpers { /** * Parse Schema from file. * - * @param fs File System + * @param fs File System * @param schemaFile Schema File */ public static String parseSchema(FileSystem fs, String schemaFile) throws Exception { @@ -207,7 +218,7 @@ public class UtilHelpers { /** * Build Spark Context for ingestion/compaction. - * + * * @return */ public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) { @@ -219,13 +230,13 @@ public class UtilHelpers { /** * Build Hoodie write client. * - * @param jsc Java Spark Context - * @param basePath Base Path - * @param schemaStr Schema + * @param jsc Java Spark Context + * @param basePath Base Path + * @param schemaStr Schema * @param parallelism Parallelism */ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, - int parallelism, Option compactionStrategyClass, TypedProperties properties) { + int parallelism, Option compactionStrategyClass, TypedProperties properties) { HoodieCompactionConfig compactionConfig = compactionStrategyClass .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) @@ -264,6 +275,7 @@ public class UtilHelpers { /** * Returns a factory for creating connections to the given JDBC URL. + * * @param options - JDBC options that contains url, table and other information. * @return * @throws SQLException if the driver could not open a JDBC connection. @@ -323,7 +335,7 @@ public class UtilHelpers { Connection conn = createConnectionFactory(options); String url = options.get(JDBCOptions.JDBC_URL()); String table = options.get(JDBCOptions.JDBC_TABLE_NAME()); - boolean tableExists = tableExists(conn,options); + boolean tableExists = tableExists(conn, options); if (tableExists) { JdbcDialect dialect = JdbcDialects.get(url); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java index 741b05c45..4cdc01ece 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java @@ -18,14 +18,43 @@ package org.apache.hudi.utilities.checkpointing; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + /** * Provide the initial checkpoint for delta streamer. */ -public interface InitialCheckPointProvider { +public abstract class InitialCheckPointProvider { + protected transient Path path; + protected transient FileSystem fs; + protected transient TypedProperties props; + + static class Config { + private static String CHECKPOINT_PROVIDER_PATH_PROP = "hoodie.deltastreamer.checkpoint.provider.path"; + } + + /** + * Construct InitialCheckPointProvider. + * @param props All properties passed to Delta Streamer + */ + public InitialCheckPointProvider(TypedProperties props) { + this.props = props; + this.path = new Path(props.getString(Config.CHECKPOINT_PROVIDER_PATH_PROP)); + } + + /** + * Initialize the class with the current filesystem. + * + * @param config Hadoop configuration + */ + public abstract void init(Configuration config) throws HoodieException; + /** * Get checkpoint string recognizable for delta streamer. */ - String getCheckpoint() throws HoodieException; + public abstract String getCheckpoint() throws HoodieException; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java index f464f686e..8e8af55a3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java @@ -18,8 +18,10 @@ package org.apache.hudi.utilities.checkpointing; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,15 +37,20 @@ import java.util.regex.Pattern; * Generate checkpoint from Kafka-Connect-HDFS managed data set. * Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html */ -public class KafkaConnectHdfsProvider implements InitialCheckPointProvider { - private final Path path; - private final FileSystem fs; +public class KafkaConnectHdfsProvider extends InitialCheckPointProvider { + private static String FILENAME_SEPARATOR = "[\\+\\.]"; - private static final String FILENAME_SEPARATOR = "[\\+\\.]"; + public KafkaConnectHdfsProvider(TypedProperties props) { + super(props); + } - public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) { - this.path = basePath; - this.fs = fileSystem; + @Override + public void init(Configuration config) throws HoodieException { + try { + this.fs = FileSystem.get(config); + } catch (IOException e) { + throw new HoodieException("KafkaConnectHdfsProvider initialization failed"); + } } /** @@ -72,7 +79,8 @@ public class KafkaConnectHdfsProvider implements InitialCheckPointProvider { /** * Convert map contains max offset of each partition to string. - * @param topic Topic name + * + * @param topic Topic name * @param checkpoint Map with partition as key and max offset as value * @return Checkpoint string */ @@ -88,8 +96,9 @@ public class KafkaConnectHdfsProvider implements InitialCheckPointProvider { /** * List file status recursively. + * * @param curPath Current Path - * @param filter PathFilter + * @param filter PathFilter * @return All file status match kafka connect naming convention * @throws IOException */ diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 7ec730394..210c94849 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -130,7 +130,7 @@ public class DeltaSync implements Serializable { /** * Hive Config. */ - private transient HiveConf hiveConf; + private transient Configuration conf; /** * Bag of properties with source, hoodie client, key generator etc. @@ -153,7 +153,7 @@ public class DeltaSync implements Serializable { private transient HoodieWriteClient writeClient; public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, - TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, Function onInitializingHoodieWriteClient) throws IOException { this.cfg = cfg; @@ -172,7 +172,7 @@ public class DeltaSync implements Serializable { this.formatAdapter = new SourceFormatAdapter( UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider)); - this.hiveConf = hiveConf; + this.conf = conf; // If schemaRegistry already resolved, setup write-client setupWriteClient(); @@ -449,8 +449,7 @@ public class DeltaSync implements Serializable { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); - - new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable(); + new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 0325eaf48..ee0d68c0a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.HiveIncrementalPuller; import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.JsonDFSSource; @@ -45,7 +46,6 @@ import com.beust.jcommander.ParameterException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -90,38 +90,37 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), - getDefaultHiveConf(jssc.hadoopConfiguration())); + jssc.hadoopConfiguration(), null); } public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), - getDefaultHiveConf(jssc.hadoopConfiguration()), props); + jssc.hadoopConfiguration(), props); } - public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException { + this(cfg, jssc, fs, conf, null); + } + + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, TypedProperties properties) throws IOException { + if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { + InitialCheckPointProvider checkPointProvider = + UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, properties); + checkPointProvider.init(conf); + cfg.checkpoint = checkPointProvider.getCheckpoint(); + } this.cfg = cfg; - this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, properties); - } - - public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException { - this.cfg = cfg; - this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf); + this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, conf, properties); } public void shutdownGracefully() { deltaSyncService.shutdown(false); } - private static HiveConf getDefaultHiveConf(Configuration cfg) { - HiveConf hiveConf = new HiveConf(); - hiveConf.addResource(cfg); - return hiveConf; - } - /** * Main method to start syncing. - * + * * @throws Exception */ public void sync() throws Exception { @@ -143,6 +142,10 @@ public class HoodieDeltaStreamer implements Serializable { } } + public Config getConfig() { + return cfg; + } + private boolean onDeltaSyncShutdown(boolean error) { LOG.info("DeltaSync shutdown. Closing write client. Error?" + error); deltaSyncService.close(); @@ -293,6 +296,12 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.") public String checkpoint = null; + @Parameter(names = {"--initial-checkpoint-provider"}, description = "subclass of " + + "org.apache.hudi.utilities.checkpointing.InitialCheckpointProvider. Generate check point for delta streamer " + + "for the first run. This field will override the checkpoint of last commit using the checkpoint field. " + + "Use this field only when switching source, for example, from DFS source to Kafka Source.") + public String initialCheckpointProvider = null; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; @@ -371,7 +380,7 @@ public class HoodieDeltaStreamer implements Serializable { */ private transient DeltaSync deltaSync; - public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, TypedProperties properties) throws IOException { this.cfg = cfg; this.jssc = jssc; @@ -395,13 +404,13 @@ public class HoodieDeltaStreamer implements Serializable { LOG.info("Creating delta streamer with configs : " + props.toString()); this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, - this::onInitializingWriteClient); + deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, + this::onInitializingWriteClient); } - public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) + public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException { - this(cfg, jssc, fs, hiveConf, null); + this(cfg, jssc, fs, conf, null); } public DeltaSync getDeltaSync() { @@ -461,7 +470,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Callback to initialize write client and start compaction service if required. - * + * * @param writeClient HoodieWriteClient * @return */ @@ -545,7 +554,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Wait till outstanding pending compactions reduces to the passed in value. - * + * * @param numPendingCompactions Maximum pending compactions allowed * @throws InterruptedException */ @@ -562,7 +571,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Fetch Next pending compaction if available. - * + * * @return * @throws InterruptedException */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 8b661e729..1232f16d7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -256,21 +256,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List transformerClassNames, - String propsFilename, boolean enableHiveSync) { + String propsFilename, boolean enableHiveSync) { return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, false, null, null); } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List transformerClassNames, - String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, - String payloadClassName, String tableType) { + String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, + String payloadClassName, String tableType) { return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp"); } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName, - List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, - int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) { + List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; @@ -394,6 +394,28 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.getString("hoodie.datasource.write.keygenerator.class")); } + @Test + public void testKafkaConnectCheckpointProvider() throws IOException { + String tableBasePath = dfsBasePath + "/test_table"; + String bootstrapPath = dfsBasePath + "/kafka_topic1"; + String partitionPath = bootstrapPath + "/year=2016/month=05/day=01"; + String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet"; + String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT); + TypedProperties props = + new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig(); + props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath); + cfg.initialCheckpointProvider = checkpointProviderClass; + // create regular kafka connect hdfs dirs + dfs.mkdirs(new Path(bootstrapPath)); + dfs.mkdirs(new Path(partitionPath)); + // generate parquet files using kafka connect naming convention + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath)); + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props); + assertEquals(deltaStreamer.getConfig().checkpoint, "kafka_topic1,0:200"); + } + @Test public void testPropsWithInvalidKeyGenerator() throws Exception { try { @@ -595,7 +617,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertTrue(e.getMessage().contains("Please provide a valid schema provider class!")); } } - + @Test public void testPayloadClassUpdate() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java index fed8e46e0..e79a574a9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java @@ -19,13 +19,11 @@ package org.apache.hudi.utilities.checkpointing; import org.apache.hudi.common.HoodieCommonTestHarness; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieTestUtils; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; @@ -34,15 +32,14 @@ import java.io.File; import static org.junit.Assert.assertEquals; public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { - private FileSystem fs = null; private String topicPath = null; + private Configuration hadoopConf = null; @Before public void init() { // Prepare directories initPath(); - final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); - fs = FSUtils.getFs(basePath, hadoopConf); + hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); } @Test @@ -70,7 +67,10 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { + "random_snappy_1.parquet").createNewFile(); new File(topicPath + "/year=2016/month=05/day=02/" + "random_snappy_2.parquet").createNewFile(); - InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs); + final TypedProperties props = new TypedProperties(); + props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath); + final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props); + provider.init(hadoopConf); assertEquals(provider.getCheckpoint(), "topic1,0:300,1:200"); } @@ -88,7 +88,10 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { + "topic1+2+100+200.parquet").createNewFile(); new File(topicPath + "/year=2016/month=05/day=02/" + "topic1+0+201+300.parquet").createNewFile(); - InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs); + final TypedProperties props = new TypedProperties(); + props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath); + final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props); + provider.init(hadoopConf); provider.getCheckpoint(); } }