[HUDI-344] Add partitioner param to Exporter (#1405)
This commit is contained in:
@@ -18,16 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hudi.utilities;
|
package org.apache.hudi.utilities;
|
||||||
|
|
||||||
import com.beust.jcommander.JCommander;
|
|
||||||
import com.beust.jcommander.Parameter;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
|
||||||
import org.apache.hudi.common.SerializableConfiguration;
|
import org.apache.hudi.common.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.HoodieTimeline;
|
import org.apache.hudi.common.table.HoodieTimeline;
|
||||||
@@ -36,6 +29,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.util.FSUtils;
|
import org.apache.hudi.common.util.FSUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
|
|
||||||
|
import com.beust.jcommander.IValueValidator;
|
||||||
|
import com.beust.jcommander.JCommander;
|
||||||
|
import com.beust.jcommander.Parameter;
|
||||||
|
import com.beust.jcommander.ParameterException;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
@@ -45,41 +50,66 @@ import org.apache.spark.sql.Dataset;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.sql.execution.datasources.DataSource;
|
|
||||||
|
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.collection.JavaConversions;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
import scala.collection.JavaConversions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
|
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
|
||||||
*
|
*
|
||||||
* @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
|
* @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
|
||||||
*/
|
*/
|
||||||
public class HoodieSnapshotExporter {
|
public class HoodieSnapshotExporter {
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface Partitioner {
|
||||||
|
|
||||||
|
DataFrameWriter<Row> partition(Dataset<Row> source);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
|
||||||
|
|
||||||
|
public static class OutputFormatValidator implements IValueValidator<String> {
|
||||||
|
|
||||||
|
static final String HUDI = "hudi";
|
||||||
|
static final List<String> FORMATS = ImmutableList.of("json", "parquet", HUDI);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(String name, String value) {
|
||||||
|
if (value == null || !FORMATS.contains(value)) {
|
||||||
|
throw new ParameterException(
|
||||||
|
String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class Config implements Serializable {
|
public static class Config implements Serializable {
|
||||||
|
|
||||||
@Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
|
@Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
|
||||||
String sourceBasePath = null;
|
String sourceBasePath;
|
||||||
|
|
||||||
@Parameter(names = {"--target-base-path"}, description = "Base path for the target output files (snapshots)", required = true)
|
@Parameter(names = {"--target-output-path"}, description = "Base path for the target output files (snapshots)", required = true)
|
||||||
String targetOutputPath = null;
|
String targetOutputPath;
|
||||||
|
|
||||||
@Parameter(names = {"--output-format"}, description = "e.g. Hudi or Parquet", required = true)
|
@Parameter(names = {"--output-format"}, description = "Output format for the exported dataset; accept these values: json|parquet|hudi", required = true,
|
||||||
|
validateValueWith = OutputFormatValidator.class)
|
||||||
String outputFormat;
|
String outputFormat;
|
||||||
|
|
||||||
@Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
|
@Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
|
||||||
String outputPartitionField;
|
String outputPartitionField = null;
|
||||||
|
|
||||||
|
@Parameter(names = {"--output-partitioner"}, description = "A class to facilitate custom repartitioning")
|
||||||
|
String outputPartitioner = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int export(SparkSession spark, Config cfg) throws IOException {
|
public void export(SparkSession spark, Config cfg) throws IOException {
|
||||||
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
|
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
|
||||||
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
|
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
@@ -92,7 +122,7 @@ public class HoodieSnapshotExporter {
|
|||||||
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
||||||
if (!latestCommit.isPresent()) {
|
if (!latestCommit.isPresent()) {
|
||||||
LOG.error("No commits present. Nothing to snapshot");
|
LOG.error("No commits present. Nothing to snapshot");
|
||||||
return -1;
|
return;
|
||||||
}
|
}
|
||||||
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
|
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
|
||||||
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
|
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
|
||||||
@@ -106,28 +136,8 @@ public class HoodieSnapshotExporter {
|
|||||||
dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
|
dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
|
||||||
DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
|
exportAsNonHudi(spark, cfg, dataFiles);
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error(String.format("The %s output format is not supported! ", cfg.outputFormat));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
|
|
||||||
// Do transformation
|
|
||||||
// A field to do simple Spark repartitioning
|
|
||||||
DataFrameWriter<Row> write = null;
|
|
||||||
Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
|
|
||||||
List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
|
|
||||||
Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
|
|
||||||
if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
|
|
||||||
write = reader.repartition(new Column(cfg.outputPartitionField))
|
|
||||||
.write().partitionBy(cfg.outputPartitionField);
|
|
||||||
} else {
|
|
||||||
write = reader.write();
|
|
||||||
}
|
|
||||||
write.format(cfg.outputFormat)
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.save(cfg.targetOutputPath);
|
|
||||||
} else {
|
} else {
|
||||||
// No transformation is needed for output format "HUDI", just copy the original files.
|
// No transformation is needed for output format "HUDI", just copy the original files.
|
||||||
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
|
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
|
||||||
@@ -136,7 +146,25 @@ public class HoodieSnapshotExporter {
|
|||||||
} else {
|
} else {
|
||||||
LOG.info("The job has 0 partition to copy.");
|
LOG.info("The job has 0 partition to copy.");
|
||||||
}
|
}
|
||||||
return 0;
|
}
|
||||||
|
|
||||||
|
private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
|
||||||
|
Partitioner defaultPartitioner = dataset -> {
|
||||||
|
Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
|
||||||
|
return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
|
||||||
|
? hoodieDroppedDataset.write()
|
||||||
|
: hoodieDroppedDataset.repartition(new Column(cfg.outputPartitionField)).write().partitionBy(cfg.outputPartitionField);
|
||||||
|
};
|
||||||
|
|
||||||
|
Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
|
||||||
|
? defaultPartitioner
|
||||||
|
: ReflectionUtils.loadClass(cfg.outputPartitioner);
|
||||||
|
|
||||||
|
Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
|
||||||
|
partitioner.partition(sourceDataset)
|
||||||
|
.format(cfg.outputFormat)
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.save(cfg.targetOutputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void copySnapshot(JavaSparkContext jsc,
|
private void copySnapshot(JavaSparkContext jsc,
|
||||||
|
|||||||
@@ -29,13 +29,20 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||||
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
|
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
|
||||||
|
import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
|
||||||
|
import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
|
||||||
|
|
||||||
|
import com.beust.jcommander.ParameterException;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.sql.Column;
|
||||||
|
import org.apache.spark.sql.DataFrameWriter;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@@ -52,6 +59,7 @@ import java.util.List;
|
|||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@RunWith(Enclosed.class)
|
@RunWith(Enclosed.class)
|
||||||
@@ -62,7 +70,7 @@ public class TestHoodieSnapshotExporter {
|
|||||||
static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
|
static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
|
||||||
static final int NUM_RECORDS = 100;
|
static final int NUM_RECORDS = 100;
|
||||||
static final String COMMIT_TIME = "20200101000000";
|
static final String COMMIT_TIME = "20200101000000";
|
||||||
static final String PARTITION_PATH = "2020/01/01";
|
static final String PARTITION_PATH = "2020";
|
||||||
static final String TABLE_NAME = "testing";
|
static final String TABLE_NAME = "testing";
|
||||||
String sourcePath;
|
String sourcePath;
|
||||||
String targetPath;
|
String targetPath;
|
||||||
@@ -119,12 +127,19 @@ public class TestHoodieSnapshotExporter {
|
|||||||
|
|
||||||
public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
|
public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
|
||||||
|
|
||||||
@Test
|
private HoodieSnapshotExporter.Config cfg;
|
||||||
public void testExportAsHudi() throws IOException {
|
|
||||||
HoodieSnapshotExporter.Config cfg = new Config();
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
cfg = new Config();
|
||||||
cfg.sourceBasePath = sourcePath;
|
cfg.sourceBasePath = sourcePath;
|
||||||
cfg.targetOutputPath = targetPath;
|
cfg.targetOutputPath = targetPath;
|
||||||
cfg.outputFormat = "hudi";
|
cfg.outputFormat = OutputFormatValidator.HUDI;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExportAsHudi() throws IOException {
|
||||||
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
||||||
|
|
||||||
// Check results
|
// Check results
|
||||||
@@ -151,10 +166,6 @@ public class TestHoodieSnapshotExporter {
|
|||||||
dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
|
dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
|
||||||
|
|
||||||
// export
|
// export
|
||||||
HoodieSnapshotExporter.Config cfg = new Config();
|
|
||||||
cfg.sourceBasePath = sourcePath;
|
|
||||||
cfg.targetOutputPath = targetPath;
|
|
||||||
cfg.outputFormat = "hudi";
|
|
||||||
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
||||||
|
|
||||||
// Check results
|
// Check results
|
||||||
@@ -185,4 +196,85 @@ public class TestHoodieSnapshotExporter {
|
|||||||
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
|
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestHoodieSnapshotExporterForRepartitioning extends ExporterTestHarness {
|
||||||
|
|
||||||
|
private static final String PARTITION_NAME = "year";
|
||||||
|
|
||||||
|
public static class UserDefinedPartitioner implements Partitioner {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataFrameWriter<Row> partition(Dataset<Row> source) {
|
||||||
|
return source
|
||||||
|
.withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME)
|
||||||
|
.repartition(new Column(PARTITION_NAME))
|
||||||
|
.write()
|
||||||
|
.partitionBy(PARTITION_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieSnapshotExporter.Config cfg;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
cfg = new Config();
|
||||||
|
cfg.sourceBasePath = sourcePath;
|
||||||
|
cfg.targetOutputPath = targetPath;
|
||||||
|
cfg.outputFormat = "json";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExportWithPartitionField() throws IOException {
|
||||||
|
// `driver` field is set in HoodieTestDataGenerator
|
||||||
|
cfg.outputPartitionField = "driver";
|
||||||
|
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
||||||
|
|
||||||
|
assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
|
||||||
|
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
|
assertTrue(dfs.listStatus(new Path(targetPath)).length > 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExportForUserDefinedPartitioner() throws IOException {
|
||||||
|
cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
|
||||||
|
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
|
||||||
|
|
||||||
|
assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
|
||||||
|
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
|
assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, PARTITION_NAME, PARTITION_PATH))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public static class TestHoodieSnapshotExporterInputValidation {
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Iterable<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{"json", true}, {"parquet", true}, {"hudi", true},
|
||||||
|
{"JSON", false}, {"foo", false}, {null, false}, {"", false}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameter
|
||||||
|
public String format;
|
||||||
|
@Parameter(1)
|
||||||
|
public boolean isValid;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateOutputFormat() {
|
||||||
|
Throwable t = null;
|
||||||
|
try {
|
||||||
|
new OutputFormatValidator().validate(null, format);
|
||||||
|
} catch (Exception e) {
|
||||||
|
t = e;
|
||||||
|
}
|
||||||
|
if (isValid) {
|
||||||
|
assertNull(t);
|
||||||
|
} else {
|
||||||
|
assertTrue(t instanceof ParameterException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user