1
0

[HUDI-711] Refactor exporter main logic (#1436)

* Refactor exporter main logic
* break main method into multiple readable methods
* fix bug of passing wrong file list
* avoid deleting output path when exists
* throw exception to early abort on multiple cases
* use JavaSparkContext instead of SparkSession
* improve unit test for expected exceptions
This commit is contained in:
Raymond Xu
2020-03-25 03:02:24 -07:00
committed by GitHub
parent cafc87041b
commit bc82e2be6c
3 changed files with 154 additions and 88 deletions

View File

@@ -19,18 +19,20 @@
package org.apache.hudi.utilities;
import org.apache.hudi.common.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
@@ -43,19 +45,21 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Tuple2;
import scala.collection.JavaConversions;
@@ -109,46 +113,56 @@ public class HoodieSnapshotExporter {
String outputPartitioner = null;
}
public void export(SparkSession spark, Config cfg) throws IOException {
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
public void export(JavaSparkContext jsc, Config cfg) throws IOException {
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
// Get the latest commit
Option<HoodieInstant> latestCommit =
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
LOG.error("No commits present. Nothing to snapshot");
return;
if (outputPathExists(fs, cfg)) {
throw new HoodieSnapshotExporterException("The target output path already exists.");
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
});
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
if (partitions.size() > 0) {
List<String> dataFiles = new ArrayList<>();
final List<String> partitions = getPartitions(fs, cfg);
if (partitions.isEmpty()) {
throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
}
LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
for (String partition : partitions) {
dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
}
if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
exportAsNonHudi(spark, cfg, dataFiles);
} else {
// No transformation is needed for output format "HUDI", just copy the original files.
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
}
createSuccessTag(fs, cfg.targetOutputPath);
if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
} else {
LOG.info("The job has 0 partition to copy.");
exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
}
createSuccessTag(fs, cfg);
}
private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
return fs.exists(new Path(cfg.targetOutputPath));
}
private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
}
private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
}
private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
fs.createNewFile(successTagPath);
}
}
private void exportAsNonHudi(SparkSession spark, Config cfg, List<String> dataFiles) {
private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
Partitioner defaultPartitioner = dataset -> {
Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -160,37 +174,35 @@ public class HoodieSnapshotExporter {
? defaultPartitioner
: ReflectionUtils.loadClass(cfg.outputPartitioner);
Dataset<Row> sourceDataset = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
Iterator<String> exportingFilePaths = jsc
.parallelize(partitions, partitions.size())
.flatMap(partition -> fsView
.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
.map(HoodieBaseFile::getPath).iterator())
.toLocalIterator();
Dataset<Row> sourceDataset = new SQLContext(jsc).read().parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
partitioner.partition(sourceDataset)
.format(cfg.outputFormat)
.mode(SaveMode.Overwrite)
.save(cfg.targetOutputPath);
}
private void copySnapshot(JavaSparkContext jsc,
FileSystem fs,
Config cfg,
List<String> partitions,
List<String> dataFiles,
String latestCommitTimestamp,
SerializableConfiguration serConf) throws IOException {
// Make sure the output directory is empty
Path outputPath = new Path(cfg.targetOutputPath);
if (fs.exists(outputPath)) {
LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
fs.delete(new Path(cfg.targetOutputPath), true);
}
private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
// Only take latest version files <= latestCommit.
FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
List<Tuple2<String, String>> filePaths = new ArrayList<>();
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
Stream<HoodieBaseFile> dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
// also need to copy over partition metadata
Path partitionMetaFile =
new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
if (fs1.exists(partitionMetaFile)) {
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
if (fs.exists(partitionMetaFile)) {
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
}
@@ -199,19 +211,20 @@ public class HoodieSnapshotExporter {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
if (!ifs.exists(toPartitionPath)) {
ifs.mkdirs(toPartitionPath);
if (!fs.exists(toPartitionPath)) {
fs.mkdirs(toPartitionPath);
}
FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
ifs.getConf());
FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
fs.getConf());
});
// Also copy the .commit files
LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
FileStatus[] commitFilesToCopy =
fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
@@ -223,39 +236,37 @@ public class HoodieSnapshotExporter {
for (FileStatus commitStatus : commitFilesToCopy) {
Path targetFilePath =
new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
if (!fs.exists(targetFilePath.getParent())) {
fs.mkdirs(targetFilePath.getParent());
if (!fileSystem.exists(targetFilePath.getParent())) {
fileSystem.mkdirs(targetFilePath.getParent());
}
if (fs.exists(targetFilePath)) {
if (fileSystem.exists(targetFilePath)) {
LOG.error(
String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
}
}
private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
fs.createNewFile(successTagPath);
}
private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
}
public static void main(String[] args) throws IOException {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, null, args);
// Create a spark job to do the snapshot export
SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-exporter");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
LOG.info("Initializing spark job.");
HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
hoodieSnapshotExporter.export(spark, cfg);
// Stop the job
spark.stop();
try {
new HoodieSnapshotExporter().export(jsc, cfg);
} finally {
jsc.stop();
}
}
}

View File

@@ -0,0 +1,10 @@
package org.apache.hudi.utilities.exception;
import org.apache.hudi.exception.HoodieException;
public class HoodieSnapshotExporterException extends HoodieException {
public HoodieSnapshotExporterException(String msg) {
super(msg);
}
}

View File

@@ -31,8 +31,10 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import com.beust.jcommander.ParameterException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -43,11 +45,12 @@ import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -56,9 +59,9 @@ import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -85,7 +88,6 @@ public class TestHoodieSnapshotExporter {
sourcePath = dfsBasePath + "/source/";
targetPath = dfsBasePath + "/target/";
dfs.mkdirs(new Path(sourcePath));
dfs.mkdirs(new Path(targetPath));
HoodieTableMetaClient
.initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
HoodieAvroPayload.class.getName());
@@ -140,7 +142,7 @@ public class TestHoodieSnapshotExporter {
@Test
public void testExportAsHudi() throws IOException {
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
new HoodieSnapshotExporter().export(jsc, cfg);
// Check results
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
@@ -159,18 +161,61 @@ public class TestHoodieSnapshotExporter {
assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
}
public static class TestHoodieSnapshotExporterForEarlyAbort extends ExporterTestHarness {
private HoodieSnapshotExporter.Config cfg;
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
@Before
public void setUp() throws Exception {
super.setUp();
cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = OutputFormatValidator.HUDI;
}
@Test
public void testExportEmptyDataset() throws IOException {
public void testExportWhenTargetPathExists() throws IOException {
// make target output path present
dfs.mkdirs(new Path(targetPath));
// export
exceptionRule.expect(HoodieSnapshotExporterException.class);
exceptionRule.expectMessage("The target output path already exists.");
new HoodieSnapshotExporter().export(jsc, cfg);
}
@Test
public void testExportDatasetWithNoCommit() throws IOException {
// delete commit files
List<Path> commitFiles = Arrays.stream(dfs.listStatus(new Path(sourcePath + "/.hoodie")))
.map(FileStatus::getPath)
.filter(filePath -> filePath.getName().endsWith(".commit"))
.collect(Collectors.toList());
for (Path p : commitFiles) {
dfs.delete(p, false);
}
// export
exceptionRule.expect(HoodieSnapshotExporterException.class);
exceptionRule.expectMessage("No commits present. Nothing to snapshot.");
new HoodieSnapshotExporter().export(jsc, cfg);
}
@Test
public void testExportDatasetWithNoPartition() throws IOException {
// delete all source data
dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
// export
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
// Check results
assertEquals("Target path should be empty.", 0, dfs.listStatus(new Path(targetPath)).length);
assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
exceptionRule.expect(HoodieSnapshotExporterException.class);
exceptionRule.expectMessage("The source dataset has 0 partition to snapshot.");
new HoodieSnapshotExporter().export(jsc, cfg);
}
}
@@ -191,7 +236,7 @@ public class TestHoodieSnapshotExporter {
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = format;
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
new HoodieSnapshotExporter().export(jsc, cfg);
assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
@@ -228,7 +273,7 @@ public class TestHoodieSnapshotExporter {
public void testExportWithPartitionField() throws IOException {
// `driver` field is set in HoodieTestDataGenerator
cfg.outputPartitionField = "driver";
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
new HoodieSnapshotExporter().export(jsc, cfg);
assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
@@ -238,7 +283,7 @@ public class TestHoodieSnapshotExporter {
@Test
public void testExportForUserDefinedPartitioner() throws IOException {
cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
new HoodieSnapshotExporter().export(jsc, cfg);
assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));