
[HUDI-321] Support bulkinsert in HDFSParquetImporter (#987)

- Add bulk insert feature
- Fix some minor issues
Raymond Xu
2019-11-02 23:12:44 -07:00
committed by vinoth chandar
parent bd77dc792c
commit 91740635b2
2 changed files with 47 additions and 25 deletions

HDFSParquetImporter.java

@@ -25,10 +25,11 @@ import com.beust.jcommander.ParameterException;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.Serializable;
-import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import org.apache.avro.Schema;
@@ -64,8 +65,8 @@ public class HDFSParquetImporter implements Serializable {
   private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);

-  public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
-  private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
+  private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
+      .withZone(ZoneId.systemDefault());
   private final Config cfg;
   private transient FileSystem fs;

   /**
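The formatter swap is worth noting: the old shared static SimpleDateFormat is not thread-safe, while DateTimeFormatter is immutable and can safely be used inside the Spark lambda that builds partition paths below; the .withZone(...) call is also what lets it format a bare Instant. A standalone sketch of the conversion the importer performs (the sample epoch value is made up, and the printed path depends on the JVM's default zone):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionFormatSketch {
  // Same pattern and zone handling as the new PARTITION_FORMATTER above.
  private static final DateTimeFormatter PARTITION_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());

  public static void main(String[] args) {
    double partitionFieldValue = 1572707564.0;       // hypothetical epoch-seconds partition value
    long ts = (long) (partitionFieldValue * 1000L);  // same seconds-to-millis conversion as the importer
    System.out.println(PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)));  // e.g. 2019/11/02
  }
}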
@@ -73,11 +74,8 @@ public class HDFSParquetImporter implements Serializable {
    */
   private TypedProperties props;

-  public HDFSParquetImporter(Config cfg) throws IOException {
+  public HDFSParquetImporter(Config cfg) {
     this.cfg = cfg;
-    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
-    log.info("Creating Cleaner with configs : " + props.toString());
   }

   public static void main(String[] args) throws Exception {
@@ -98,8 +96,11 @@ public class HDFSParquetImporter implements Serializable {
   }

-  public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
+  public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
+        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    log.info("Starting data import with configs : " + props.toString());
     int ret = -1;
     try {
       // Verify that targetPath is not present.
@@ -110,7 +111,7 @@ public class HDFSParquetImporter implements Serializable {
         ret = dataImport(jsc);
       } while (ret != 0 && retry-- > 0);
     } catch (Throwable t) {
-      logger.error(t);
+      log.error(t);
     }
     return ret;
   }
@@ -141,7 +142,7 @@ public class HDFSParquetImporter implements Serializable {
       JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
       return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
     } catch (Throwable t) {
-      logger.error("Error occurred.", t);
+      log.error("Error occurred.", t);
     }
     return -1;
   }
@@ -159,8 +160,7 @@ public class HDFSParquetImporter implements Serializable {
     return jsc
         .newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
             job.getConfiguration())
-        // To reduce large number of
-        // tasks.
+        // To reduce large number of tasks.
         .coalesce(16 * cfg.parallelism).map(entry -> {
           GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
           Object partitionField = genericRecord.get(cfg.partitionKey);
@@ -172,16 +172,16 @@ public class HDFSParquetImporter implements Serializable {
             throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
           }
           String partitionPath = partitionField.toString();
-          logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+          log.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
           if (partitionField instanceof Number) {
             try {
               long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-              partitionPath = PARTITION_FORMATTER.format(new Date(ts));
+              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
             } catch (NumberFormatException nfe) {
-              logger.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+              log.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
             }
           }
-          return new HoodieRecord<>(new HoodieKey((String) rowField, partitionPath),
+          return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
               new HoodieJsonPayload(genericRecord.toString()));
         });
   }
@@ -195,11 +195,31 @@ public class HDFSParquetImporter implements Serializable {
    * @param <T> Type
    */
   protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client, String instantTime,
-      JavaRDD<HoodieRecord<T>> hoodieRecords) {
-    if (cfg.command.toLowerCase().equals("insert")) {
-      return client.insert(hoodieRecords, instantTime);
+      JavaRDD<HoodieRecord<T>> hoodieRecords) throws Exception {
+    switch (cfg.command.toLowerCase()) {
+      case "upsert": {
+        return client.upsert(hoodieRecords, instantTime);
+      }
+      case "bulkinsert": {
+        return client.bulkInsert(hoodieRecords, instantTime);
+      }
+      default: {
+        return client.insert(hoodieRecords, instantTime);
+      }
+    }
+  }
+
+  public static class CommandValidator implements IValueValidator<String> {
+
+    List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");
+
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+      if (value == null || !validCommands.contains(value.toLowerCase())) {
+        throw new ParameterException(
+            String.format("Invalid command: value:%s: supported commands:%s", value, validCommands));
+      }
     }
-    return client.upsert(hoodieRecords, instantTime);
   }

   public static class FormatValidator implements IValueValidator<String> {
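With validateValueWith wired up in Config below, JCommander runs CommandValidator while parsing --command, so an unsupported value now fails fast at the CLI instead of silently falling through to upsert the way the old if/else did. A quick illustrative check of the validator on its own (values are made up):

// Assumes hudi-utilities and com.beust.jcommander on the classpath.
HDFSParquetImporter.CommandValidator validator = new HDFSParquetImporter.CommandValidator();
validator.validate("--command", "BulkInsert");      // passes: comparison is case-insensitive
try {
  validator.validate("--command", "bulk_insert");   // not in [insert, upsert, bulkinsert]
} catch (ParameterException e) {
  // "Invalid command: value:bulk_insert: supported commands:[insert, upsert, bulkinsert]"
  System.out.println(e.getMessage());
}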
@@ -217,8 +237,8 @@ public class HDFSParquetImporter implements Serializable {
   public static class Config implements Serializable {
-    @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert",
-        required = false)
+    @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert",
+        required = false, validateValueWith = CommandValidator.class)
     public String command = "INSERT";
     @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true)
     public String srcPath = null;
@@ -233,7 +253,7 @@ public class HDFSParquetImporter implements Serializable {
     public String rowKey = null;
     @Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
     public String partitionKey = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert(default)/upsert/bulkinsert", required = true)
     public int parallelism = 1;
     @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
     public String schemaFile = null;
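End to end, a driver can now request the bulk-insert path through the same Config/dataImport entry points. A hypothetical snippet using only members visible in this diff (paths, values, and the local Spark context are placeholders, not taken from the patch):

import org.apache.spark.api.java.JavaSparkContext;

HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.command = "bulkinsert";                      // checked by CommandValidator
cfg.srcPath = "file:///tmp/input-parquet";       // --src-path
cfg.targetPath = "file:///tmp/hoodie-target";    // target dataset path
cfg.rowKey = "_row_key";                         // record key field
cfg.partitionKey = "timestamp";                  // --partition-key-field
cfg.parallelism = 500;                           // reused as bulk-insert parallelism (see UtilHelpers below)
cfg.schemaFile = "file:///tmp/schema.avsc";      // --schema-file

JavaSparkContext jsc = new JavaSparkContext("local[2]", "hdfs-parquet-importer-example");
int exitCode = new HDFSParquetImporter(cfg).dataImport(jsc, /* retry */ 1);
jsc.stop();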

UtilHelpers.java

@@ -190,7 +190,9 @@ public class UtilHelpers {
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
         .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath).withParallelism(parallelism, parallelism)
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withParallelism(parallelism, parallelism)
+            .withBulkInsertParallelism(parallelism)
             .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withProps(properties).build();
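Insert/upsert parallelism and bulk-insert parallelism are separate knobs on HoodieWriteConfig, so without the extra withBulkInsertParallelism(...) call the new bulkinsert command would fall back to the builder's built-in default rather than the --parallelism passed to the importer. Condensed, the write config now carries (placeholder values; only builder methods that appear in this diff are used):

int parallelism = 500;                                // from --parallelism
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie-target")                   // placeholder base path
    .withParallelism(parallelism, parallelism)        // insert/upsert shuffle parallelism
    .withBulkInsertParallelism(parallelism)           // bulk-insert parallelism (new here)
    .build();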