[HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (#2443)
Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
@@ -293,6 +293,8 @@ public class DataSourceUtils {
|
|||||||
DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
|
DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
|
||||||
hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
|
hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
|
||||||
DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
|
DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
|
||||||
|
hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS_OPT_KEY(),
|
||||||
|
DataSourceWriteOptions.DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY()));
|
||||||
hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
|
hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
|
||||||
DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
|
DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
|
||||||
hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
|
hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
|
||||||
|
|||||||
@@ -347,6 +347,7 @@ object DataSourceWriteOptions {
|
|||||||
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
|
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
|
||||||
val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
|
val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
|
||||||
val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
|
val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
|
||||||
|
val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
|
||||||
val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
|
val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
|
||||||
val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
|
val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
|
||||||
|
|
||||||
@@ -365,6 +366,7 @@ object DataSourceWriteOptions {
|
|||||||
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
|
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
|
||||||
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
|
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
|
||||||
val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
|
val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
|
||||||
|
val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
|
||||||
val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
|
val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
|
||||||
val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
|
val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
|
||||||
|
|
||||||
|
|||||||
@@ -374,6 +374,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
|
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
|
||||||
hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
|
hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
|
||||||
hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
|
hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
|
||||||
|
hiveSyncConfig.ignoreExceptions = parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)
|
||||||
hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
|
hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
|
||||||
hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
|
hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
|
||||||
hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
|
hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
|
||||||
|
|||||||
@@ -76,6 +76,9 @@ public class HiveSyncConfig implements Serializable {
|
|||||||
@Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
|
@Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
|
||||||
public Boolean autoCreateDatabase = true;
|
public Boolean autoCreateDatabase = true;
|
||||||
|
|
||||||
|
@Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
|
||||||
|
public Boolean ignoreExceptions = false;
|
||||||
|
|
||||||
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
|
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
|
||||||
public Boolean skipROSuffix = false;
|
public Boolean skipROSuffix = false;
|
||||||
|
|
||||||
@@ -130,6 +133,7 @@ public class HiveSyncConfig implements Serializable {
|
|||||||
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
|
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
|
||||||
+ ", useJdbc=" + useJdbc
|
+ ", useJdbc=" + useJdbc
|
||||||
+ ", autoCreateDatabase=" + autoCreateDatabase
|
+ ", autoCreateDatabase=" + autoCreateDatabase
|
||||||
|
+ ", ignoreExceptions=" + ignoreExceptions
|
||||||
+ ", skipROSuffix=" + skipROSuffix
|
+ ", skipROSuffix=" + skipROSuffix
|
||||||
+ ", help=" + help
|
+ ", help=" + help
|
||||||
+ ", supportTimestamp=" + supportTimestamp
|
+ ", supportTimestamp=" + supportTimestamp
|
||||||
|
|||||||
@@ -58,56 +58,72 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
|
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
|
||||||
|
|
||||||
private final HiveSyncConfig cfg;
|
private final HiveSyncConfig cfg;
|
||||||
private final HoodieHiveClient hoodieHiveClient;
|
private HoodieHiveClient hoodieHiveClient = null;
|
||||||
private final String snapshotTableName;
|
private String snapshotTableName = null;
|
||||||
private final Option<String> roTableTableName;
|
private Option<String> roTableName = null;
|
||||||
|
|
||||||
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
|
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
|
||||||
super(configuration.getAllProperties(), fs);
|
super(configuration.getAllProperties(), fs);
|
||||||
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
|
|
||||||
|
try {
|
||||||
|
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
if (cfg.ignoreExceptions) {
|
||||||
|
LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
|
||||||
|
} else {
|
||||||
|
throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.cfg = cfg;
|
this.cfg = cfg;
|
||||||
// Set partitionFields to empty, when the NonPartitionedExtractor is used
|
// Set partitionFields to empty, when the NonPartitionedExtractor is used
|
||||||
if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
|
if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
|
||||||
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
|
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
|
||||||
cfg.partitionFields = new ArrayList<>();
|
cfg.partitionFields = new ArrayList<>();
|
||||||
}
|
}
|
||||||
switch (hoodieHiveClient.getTableType()) {
|
if (hoodieHiveClient != null) {
|
||||||
case COPY_ON_WRITE:
|
switch (hoodieHiveClient.getTableType()) {
|
||||||
this.snapshotTableName = cfg.tableName;
|
case COPY_ON_WRITE:
|
||||||
this.roTableTableName = Option.empty();
|
this.snapshotTableName = cfg.tableName;
|
||||||
break;
|
this.roTableName = Option.empty();
|
||||||
case MERGE_ON_READ:
|
break;
|
||||||
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
|
case MERGE_ON_READ:
|
||||||
this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
|
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
|
||||||
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
|
this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
|
||||||
break;
|
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
|
||||||
default:
|
break;
|
||||||
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
default:
|
||||||
throw new InvalidTableException(hoodieHiveClient.getBasePath());
|
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
||||||
|
throw new InvalidTableException(hoodieHiveClient.getBasePath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void syncHoodieTable() {
|
public void syncHoodieTable() {
|
||||||
try {
|
try {
|
||||||
switch (hoodieHiveClient.getTableType()) {
|
if (hoodieHiveClient != null) {
|
||||||
case COPY_ON_WRITE:
|
switch (hoodieHiveClient.getTableType()) {
|
||||||
syncHoodieTable(snapshotTableName, false);
|
case COPY_ON_WRITE:
|
||||||
break;
|
syncHoodieTable(snapshotTableName, false);
|
||||||
case MERGE_ON_READ:
|
break;
|
||||||
// sync a RO table for MOR
|
case MERGE_ON_READ:
|
||||||
syncHoodieTable(roTableTableName.get(), false);
|
// sync a RO table for MOR
|
||||||
// sync a RT table for MOR
|
syncHoodieTable(roTableName.get(), false);
|
||||||
syncHoodieTable(snapshotTableName, true);
|
// sync a RT table for MOR
|
||||||
break;
|
syncHoodieTable(snapshotTableName, true);
|
||||||
default:
|
break;
|
||||||
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
default:
|
||||||
throw new InvalidTableException(hoodieHiveClient.getBasePath());
|
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
||||||
|
throw new InvalidTableException(hoodieHiveClient.getBasePath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (RuntimeException re) {
|
} catch (RuntimeException re) {
|
||||||
throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
|
throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
|
||||||
} finally {
|
} finally {
|
||||||
hoodieHiveClient.close();
|
if (hoodieHiveClient != null) {
|
||||||
|
hoodieHiveClient.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -613,4 +614,25 @@ public class TestHiveSyncTool {
|
|||||||
"The last commit that was sycned should be 103");
|
"The last commit that was sycned should be 103");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
|
||||||
|
HiveTestUtil.hiveSyncConfig.useJdbc = true;
|
||||||
|
String instantTime = "100";
|
||||||
|
HiveTestUtil.createCOWTable(instantTime, 5, false);
|
||||||
|
HoodieHiveClient hiveClient =
|
||||||
|
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
|
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
||||||
|
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||||
|
// Lets do the sync
|
||||||
|
|
||||||
|
HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
|
||||||
|
syncToolConfig.ignoreExceptions = true;
|
||||||
|
syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999","9031");
|
||||||
|
HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
|
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
||||||
|
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user