1
0

[HUDI-759] Integrate checkpoint provider with delta streamer (#1486)

This commit is contained in:
Gary Li
2020-04-14 14:51:04 -07:00
committed by GitHub
parent 644c1cc8bd
commit 14d4fea833
7 changed files with 147 additions and 64 deletions

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.transform.ChainedTransformer;
@@ -85,7 +86,7 @@ public class UtilHelpers {
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
try {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
@@ -96,7 +97,7 @@ public class UtilHelpers {
}
public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
JavaSparkContext jssc) throws IOException {
JavaSparkContext jssc) throws IOException {
try {
return schemaProviderClass == null ? null
: (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
@@ -117,7 +118,17 @@ public class UtilHelpers {
}
}
/**
 * Instantiate the configured {@link InitialCheckPointProvider} implementation via reflection.
 *
 * @param className fully-qualified name of the provider class to load; it must expose a
 *                  constructor taking a single {@code TypedProperties} argument
 * @param props     properties passed to that constructor
 * @return the constructed provider instance
 * @throws IOException if the class cannot be loaded or instantiated (original cause is chained)
 */
public static InitialCheckPointProvider createInitialCheckpointProvider(
String className, TypedProperties props) throws IOException {
try {
return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
} catch (Throwable e) {
throw new IOException("Could not load initial checkpoint provider class " + className, e);
}
}
/**
 * Read the properties file at the given path on the supplied filesystem, layering the
 * provided overridden properties on top of the file's contents.
 */
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
@@ -157,7 +168,7 @@ public class UtilHelpers {
/**
* Parse Schema from file.
*
* @param fs File System
* @param fs File System
* @param schemaFile Schema File
*/
public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
@@ -207,7 +218,7 @@ public class UtilHelpers {
/**
* Build Spark Context for ingestion/compaction.
*
*
* @return
*/
public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) {
@@ -219,13 +230,13 @@ public class UtilHelpers {
/**
* Build Hoodie write client.
*
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param parallelism Parallelism
*/
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -264,6 +275,7 @@ public class UtilHelpers {
/**
* Returns a factory for creating connections to the given JDBC URL.
*
* @param options - JDBC options that contains url, table and other information.
* @return
* @throws SQLException if the driver could not open a JDBC connection.
@@ -323,7 +335,7 @@ public class UtilHelpers {
Connection conn = createConnectionFactory(options);
String url = options.get(JDBCOptions.JDBC_URL());
String table = options.get(JDBCOptions.JDBC_TABLE_NAME());
boolean tableExists = tableExists(conn,options);
boolean tableExists = tableExists(conn, options);
if (tableExists) {
JdbcDialect dialect = JdbcDialects.get(url);

View File

@@ -18,14 +18,43 @@
package org.apache.hudi.utilities.checkpointing;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Provide the initial checkpoint for delta streamer.
*/
public interface InitialCheckPointProvider {
public abstract class InitialCheckPointProvider {
protected transient Path path;
protected transient FileSystem fs;
protected transient TypedProperties props;
/**
 * Configuration keys consumed by {@link InitialCheckPointProvider}.
 */
static class Config {
  // Base path the provider scans to derive the initial checkpoint.
  // Declared final: this is a constant key and must never be reassigned.
  private static final String CHECKPOINT_PROVIDER_PATH_PROP = "hoodie.deltastreamer.checkpoint.provider.path";
}
/**
 * Construct InitialCheckPointProvider.
 *
 * @param props All properties passed to Delta Streamer; expected to contain
 *              {@code hoodie.deltastreamer.checkpoint.provider.path}
 */
public InitialCheckPointProvider(TypedProperties props) {
this.props = props;
// NOTE(review): presumably getString throws if the path property is absent — verify TypedProperties contract.
this.path = new Path(props.getString(Config.CHECKPOINT_PROVIDER_PATH_PROP));
}
/**
* Initialize the class with the current filesystem.
*
* @param config Hadoop configuration
*/
public abstract void init(Configuration config) throws HoodieException;
/**
* Get checkpoint string recognizable for delta streamer.
*/
String getCheckpoint() throws HoodieException;
public abstract String getCheckpoint() throws HoodieException;
}

View File

@@ -18,8 +18,10 @@
package org.apache.hudi.utilities.checkpointing;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -35,15 +37,20 @@ import java.util.regex.Pattern;
* Generate checkpoint from Kafka-Connect-HDFS managed data set.
* Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html
*/
public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
private final Path path;
private final FileSystem fs;
public class KafkaConnectHdfsProvider extends InitialCheckPointProvider {
private static String FILENAME_SEPARATOR = "[\\+\\.]";
private static final String FILENAME_SEPARATOR = "[\\+\\.]";
/**
 * Construct the provider from delta streamer properties.
 *
 * @param props properties passed to Delta Streamer; the superclass reads
 *              {@code hoodie.deltastreamer.checkpoint.provider.path} from them
 */
public KafkaConnectHdfsProvider(TypedProperties props) {
super(props);
}
public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) {
this.path = basePath;
this.fs = fileSystem;
/**
 * Bind this provider to the filesystem resolved from the given Hadoop configuration.
 *
 * @param config Hadoop configuration used to resolve the target FileSystem
 * @throws HoodieException if the filesystem cannot be obtained
 */
@Override
public void init(Configuration config) throws HoodieException {
  try {
    this.fs = FileSystem.get(config);
  } catch (IOException e) {
    // Chain the underlying IOException so the root cause is not lost.
    throw new HoodieException("KafkaConnectHdfsProvider initialization failed", e);
  }
}
/**
@@ -72,7 +79,8 @@ public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
/**
* Convert map contains max offset of each partition to string.
* @param topic Topic name
*
* @param topic Topic name
* @param checkpoint Map with partition as key and max offset as value
* @return Checkpoint string
*/
@@ -88,8 +96,9 @@ public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
/**
* List file status recursively.
*
* @param curPath Current Path
* @param filter PathFilter
* @param filter PathFilter
* @return All file status match kafka connect naming convention
* @throws IOException
*/

View File

@@ -130,7 +130,7 @@ public class DeltaSync implements Serializable {
/**
* Hive Config.
*/
private transient HiveConf hiveConf;
private transient Configuration conf;
/**
* Bag of properties with source, hoodie client, key generator etc.
@@ -153,7 +153,7 @@ public class DeltaSync implements Serializable {
private transient HoodieWriteClient writeClient;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
this.cfg = cfg;
@@ -172,7 +172,7 @@ public class DeltaSync implements Serializable {
this.formatAdapter = new SourceFormatAdapter(
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
this.hiveConf = hiveConf;
this.conf = conf;
// If schemaRegistry already resolved, setup write-client
setupWriteClient();
@@ -449,8 +449,7 @@ public class DeltaSync implements Serializable {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable();
}
}

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
@@ -45,7 +46,6 @@ import com.beust.jcommander.ParameterException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -90,38 +90,37 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
getDefaultHiveConf(jssc.hadoopConfiguration()));
jssc.hadoopConfiguration(), null);
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
getDefaultHiveConf(jssc.hadoopConfiguration()), props);
jssc.hadoopConfiguration(), props);
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException {
this(cfg, jssc, fs, conf, null);
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
TypedProperties properties) throws IOException {
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, properties);
checkPointProvider.init(conf);
cfg.checkpoint = checkPointProvider.getCheckpoint();
}
this.cfg = cfg;
this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, properties);
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
this.cfg = cfg;
this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, conf, properties);
}
public void shutdownGracefully() {
deltaSyncService.shutdown(false);
}
private static HiveConf getDefaultHiveConf(Configuration cfg) {
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(cfg);
return hiveConf;
}
/**
* Main method to start syncing.
*
*
* @throws Exception
*/
public void sync() throws Exception {
@@ -143,6 +142,10 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
public Config getConfig() {
return cfg;
}
private boolean onDeltaSyncShutdown(boolean error) {
LOG.info("DeltaSync shutdown. Closing write client. Error?" + error);
deltaSyncService.close();
@@ -293,6 +296,12 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
public String checkpoint = null;
@Parameter(names = {"--initial-checkpoint-provider"}, description = "subclass of "
+ "org.apache.hudi.utilities.checkpointing.InitialCheckpointProvider. Generate check point for delta streamer "
+ "for the first run. This field will override the checkpoint of last commit using the checkpoint field. "
+ "Use this field only when switching source, for example, from DFS source to Kafka Source.")
public String initialCheckpointProvider = null;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@@ -371,7 +380,7 @@ public class HoodieDeltaStreamer implements Serializable {
*/
private transient DeltaSync deltaSync;
public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
TypedProperties properties) throws IOException {
this.cfg = cfg;
this.jssc = jssc;
@@ -395,13 +404,13 @@ public class HoodieDeltaStreamer implements Serializable {
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf,
this::onInitializingWriteClient);
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
this::onInitializingWriteClient);
}
public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf)
throws IOException {
this(cfg, jssc, fs, hiveConf, null);
this(cfg, jssc, fs, conf, null);
}
public DeltaSync getDeltaSync() {
@@ -461,7 +470,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Callback to initialize write client and start compaction service if required.
*
*
* @param writeClient HoodieWriteClient
* @return
*/
@@ -545,7 +554,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Wait till outstanding pending compactions reduces to the passed in value.
*
*
* @param numPendingCompactions Maximum pending compactions allowed
* @throws InterruptedException
*/
@@ -562,7 +571,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Fetch Next pending compaction if available.
*
*
* @return
* @throws InterruptedException
*/

View File

@@ -256,21 +256,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync) {
String propsFilename, boolean enableHiveSync) {
return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
false, null, null);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) {
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -394,6 +394,28 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.getString("hoodie.datasource.write.keygenerator.class"));
}
@Test
public void testKafkaConnectCheckpointProvider() throws IOException {
  String tableBasePath = dfsBasePath + "/test_table";
  String bootstrapPath = dfsBasePath + "/kafka_topic1";
  String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
  // Kafka Connect HDFS naming: <topic>+<kafkaPartition>+<startOffset>+<endOffset>.parquet
  String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
  String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
  HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
  TypedProperties props =
      new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
  props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
  cfg.initialCheckpointProvider = checkpointProviderClass;
  // create regular kafka connect hdfs dirs
  dfs.mkdirs(new Path(bootstrapPath));
  dfs.mkdirs(new Path(partitionPath));
  // generate parquet files using kafka connect naming convention
  HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
  Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
  HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props);
  // JUnit convention: expected value first, actual second (was reversed, which garbles failure messages).
  assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
try {
@@ -595,7 +617,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
}
}
@Test
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";

View File

@@ -19,13 +19,11 @@
package org.apache.hudi.utilities.checkpointing;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
@@ -34,15 +32,14 @@ import java.io.File;
import static org.junit.Assert.assertEquals;
public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
private FileSystem fs = null;
private String topicPath = null;
private Configuration hadoopConf = null;
@Before
public void init() {
// Prepare directories
initPath();
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
}
@Test
@@ -70,7 +67,10 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
+ "random_snappy_1.parquet").createNewFile();
new File(topicPath + "/year=2016/month=05/day=02/"
+ "random_snappy_2.parquet").createNewFile();
InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
final TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath);
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
provider.init(hadoopConf);
assertEquals(provider.getCheckpoint(), "topic1,0:300,1:200");
}
@@ -88,7 +88,10 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
+ "topic1+2+100+200.parquet").createNewFile();
new File(topicPath + "/year=2016/month=05/day=02/"
+ "topic1+0+201+300.parquet").createNewFile();
InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
final TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath);
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
provider.init(hadoopConf);
provider.getCheckpoint();
}
}