[HUDI-2272] Pass base file format to sync clients (#3397)
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
@@ -574,22 +574,19 @@ public class DeltaSync implements Serializable {
|
|||||||
for (String impl : syncClientToolClasses) {
|
for (String impl : syncClientToolClasses) {
|
||||||
Timer.Context syncContext = metrics.getMetaSyncTimerContext();
|
Timer.Context syncContext = metrics.getMetaSyncTimerContext();
|
||||||
impl = impl.trim();
|
impl = impl.trim();
|
||||||
AbstractSyncTool syncTool = null;
|
|
||||||
switch (impl) {
|
switch (impl) {
|
||||||
case "org.apache.hudi.hive.HiveSyncTool":
|
case "org.apache.hudi.hive.HiveSyncTool":
|
||||||
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
|
syncHive();
|
||||||
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
|
|
||||||
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
|
|
||||||
syncTool = new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs);
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
|
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.putAll(props);
|
properties.putAll(props);
|
||||||
properties.put("basePath", cfg.targetBasePath);
|
properties.put("basePath", cfg.targetBasePath);
|
||||||
syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[]{Properties.class, FileSystem.class}, properties, fs);
|
properties.put("baseFileFormat", cfg.baseFileFormat);
|
||||||
}
|
AbstractSyncTool syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[]{Properties.class, FileSystem.class}, properties, fs);
|
||||||
syncTool.syncHoodieTable();
|
syncTool.syncHoodieTable();
|
||||||
|
}
|
||||||
long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
|
long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
|
||||||
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs);
|
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user