From 3a150ee1812993553f36cdc8d321766c267d3f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?= Date: Fri, 17 Sep 2021 19:45:22 +0800 Subject: [PATCH] [HUDI-2447] Extract common business logic & Fix typo (#3683) --- .../java/org/apache/hudi/dla/DLASyncTool.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java index 786c72085..bf0369ae2 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java @@ -19,18 +19,17 @@ package org.apache.hudi.dla; import com.beust.jcommander.JCommander; -import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.Option; import org.apache.hudi.dla.util.Utils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; -import org.apache.hudi.hadoop.HoodieParquetInputFormat; -import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.AbstractSyncHoodieClient; @@ -39,6 +38,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -147,18 +147,14 @@ public class DLASyncTool extends AbstractSyncTool { // Check and sync schema if (!tableExists) { LOG.info("DLA table " + tableName + " is not found. Creating it"); - if (!useRealTimeInputFormat) { - String inputFormatClassName = HoodieParquetInputFormat.class.getName(); - hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), - ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); - } else { - // Custom serde will not work with ALTER TABLE REPLACE COLUMNS - // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive - // /ql/exec/DDLTask.java#L3488 - String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName(); - hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), - ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); - } + + String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat); + + // Custom serde will not work with ALTER TABLE REPLACE COLUMNS + // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive + // /ql/exec/DDLTask.java#L3488 + hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), + ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); } else { // Check if the table schema has evolved Map tableSchema = hoodieDLAClient.getTableSchema(tableName); @@ -173,7 +169,7 @@ public class DLASyncTool extends AbstractSyncTool { } /** - * Syncs the list of storage parititions passed in (checks if the partition is in dla, if not adds it or if the + * Syncs the list of storage partitions passed in (checks if the partition is in dla, if not adds it or if the * partition path does not match, it updates the partition path). */ private void syncPartitions(String tableName, List writtenPartitionsSince) {