1
0

[HUDI-344] Hudi Dataset Snapshot Exporter (#1360)

Co-authored-by: jason1993 <261049174@qq.com>
This commit is contained in:
openopen2
2020-03-10 09:17:51 +08:00
committed by GitHub
parent f93e64fee4
commit 44700d531a
3 changed files with 496 additions and 0 deletions

View File

@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.SerializableConfiguration;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.DataSource;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
*
* @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
*/
public class HoodieSnapshotExporter {
private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
public static class Config implements Serializable {
@Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
String sourceBasePath = null;
@Parameter(names = {"--target-base-path"}, description = "Base path for the target output files (snapshots)", required = true)
String targetOutputPath = null;
@Parameter(names = {"--output-format"}, description = "e.g. Hudi or Parquet", required = true)
String outputFormat;
@Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
String outputPartitionField;
}
public int export(SparkSession spark, Config cfg) throws IOException {
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
// Get the latest commit
Option<HoodieInstant> latestCommit =
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
LOG.error("No commits present. Nothing to snapshot");
return -1;
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
if (partitions.size() > 0) {
List<String> dataFiles = new ArrayList<>();
for (String partition : partitions) {
dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
}
try {
DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
} catch (Exception e) {
LOG.error(String.format("The %s output format is not supported! ", cfg.outputFormat));
return -1;
}
if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
// Do transformation
// A field to do simple Spark repartitioning
DataFrameWriter<Row> write = null;
Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
write = reader.repartition(new Column(cfg.outputPartitionField))
.write().partitionBy(cfg.outputPartitionField);
} else {
write = reader.write();
}
write.format(cfg.outputFormat)
.mode(SaveMode.Overwrite)
.save(cfg.targetOutputPath);
} else {
// No transformation is needed for output format "HUDI", just copy the original files.
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
}
} else {
LOG.info("The job has 0 partition to copy.");
}
return 0;
}
private void copySnapshot(JavaSparkContext jsc,
FileSystem fs,
Config cfg,
List<String> partitions,
List<String> dataFiles,
String latestCommitTimestamp,
SerializableConfiguration serConf) throws IOException {
// Make sure the output directory is empty
Path outputPath = new Path(cfg.targetOutputPath);
if (fs.exists(outputPath)) {
LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
fs.delete(new Path(cfg.targetOutputPath), true);
}
jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
// Only take latest version files <= latestCommit.
FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
List<Tuple2<String, String>> filePaths = new ArrayList<>();
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
// also need to copy over partition metadata
Path partitionMetaFile =
new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
if (fs1.exists(partitionMetaFile)) {
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
}
return filePaths.iterator();
}).foreach(tuple -> {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
if (!ifs.exists(toPartitionPath)) {
ifs.mkdirs(toPartitionPath);
}
FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
ifs.getConf());
});
// Also copy the .commit files
LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy =
fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp,
HoodieTimeline.LESSER_OR_EQUAL);
}
});
for (FileStatus commitStatus : commitFilesToCopy) {
Path targetFilePath =
new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
if (!fs.exists(targetFilePath.getParent())) {
fs.mkdirs(targetFilePath.getParent());
}
if (fs.exists(targetFilePath)) {
LOG.error(
String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
}
}
public static void main(String[] args) throws IOException {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, null, args);
// Create a spark job to do the snapshot export
SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
LOG.info("Initializing spark job.");
HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
hoodieSnapshotExporter.export(spark, cfg);
// Stop the job
spark.stop();
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Test utils for data source tests.
*/
public class DataSourceTestUtils {
public static Option<String> convertToString(HoodieRecord record) {
try {
String str = ((TestRawTripPayload) record.getData()).getJsonData();
str = "{" + str.substring(str.indexOf("\"timestamp\":"));
// Remove the last } bracket
str = str.substring(0, str.length() - 1);
return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
} catch (IOException e) {
return Option.empty();
}
}
public static List<String> convertToStringList(List<HoodieRecord> records) {
return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
.collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
private static String TEST_WRITE_TOKEN = "1-0-1";
private SparkSession spark = null;
private HoodieTestDataGenerator dataGen = null;
private String outputPath = null;
private String rootPath = null;
private FileSystem fs = null;
private Map commonOpts;
private HoodieSnapshotExporter.Config cfg;
private JavaSparkContext jsc = null;
@Before
public void initialize() throws IOException {
spark = SparkSession.builder()
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
jsc = new JavaSparkContext(spark.sparkContext());
dataGen = new HoodieTestDataGenerator();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
commonOpts = new HashMap();
commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
cfg = new HoodieSnapshotExporter.Config();
cfg.sourceBasePath = basePath;
cfg.targetOutputPath = outputPath = basePath + "/target";
cfg.outputFormat = "json";
cfg.outputPartitionField = "partition";
}
@After
public void cleanup() {
if (spark != null) {
spark.stop();
}
}
@Test
public void testSnapshotExporter() throws IOException {
// Insert Operation
List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
inputDF.write().format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
.mode(SaveMode.Overwrite)
.save(basePath);
long sourceCount = inputDF.count();
HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
hoodieSnapshotExporter.export(spark, cfg);
long targetCount = spark.read().json(outputPath).count();
assertTrue(sourceCount == targetCount);
// Test Invalid OutputFormat
cfg.outputFormat = "foo";
int isError = hoodieSnapshotExporter.export(spark, cfg);
assertTrue(isError == -1);
}
// for testEmptySnapshotCopy
public void init() throws IOException {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
rootPath = "file://" + folder.getRoot().getAbsolutePath();
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
outputPath = rootPath + "/output";
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath);
}
@Test
public void testEmptySnapshotCopy() throws IOException {
init();
// There is no real data (only .hoodie directory)
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath)));
// Do the snapshot
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, true);
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
// TODO - uncomment this after fixing test failures
// @Test
public void testSnapshotCopy() throws Exception {
// Generate some commits and corresponding parquets
String commitTime1 = "20160501010101";
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
// Only first two have commit files
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
// Some parquet files
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
file11.createNewFile();
File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
file12.createNewFile();
File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
file21.createNewFile();
File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
file22.createNewFile();
File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
file23.createNewFile();
// Make commit3
File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
file31.createNewFile();
File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
file32.createNewFile();
File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
file33.createNewFile();
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, false);
// Check results
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
}