[HUDI-2447] Extract common business logic & Fix typo (#3683)
This commit is contained in:
@@ -19,18 +19,17 @@
|
||||
package org.apache.hudi.dla;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import java.util.HashMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
|
||||
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.dla.util.Utils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.InvalidTableException;
|
||||
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
||||
import org.apache.hudi.hive.SchemaDifference;
|
||||
import org.apache.hudi.hive.util.HiveSchemaUtil;
|
||||
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
|
||||
@@ -39,6 +38,7 @@ import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
@@ -147,18 +147,14 @@ public class DLASyncTool extends AbstractSyncTool {
|
||||
// Check and sync schema
|
||||
if (!tableExists) {
|
||||
LOG.info("DLA table " + tableName + " is not found. Creating it");
|
||||
if (!useRealTimeInputFormat) {
|
||||
String inputFormatClassName = HoodieParquetInputFormat.class.getName();
|
||||
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
|
||||
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
|
||||
} else {
|
||||
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
|
||||
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
|
||||
// /ql/exec/DDLTask.java#L3488
|
||||
String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName();
|
||||
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
|
||||
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
|
||||
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat);
|
||||
|
||||
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
|
||||
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
|
||||
// /ql/exec/DDLTask.java#L3488
|
||||
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
|
||||
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
|
||||
} else {
|
||||
// Check if the table schema has evolved
|
||||
Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
|
||||
@@ -173,7 +169,7 @@ public class DLASyncTool extends AbstractSyncTool {
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs the list of storage parititions passed in (checks if the partition is in dla, if not adds it or if the
|
||||
* Syncs the list of storage partitions passed in (checks if the partition is in dla, if not adds it or if the
|
||||
* partition path does not match, it updates the partition path).
|
||||
*/
|
||||
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
|
||||
|
||||
Reference in New Issue
Block a user