1
0

[HUDI-344] Improve exporter tests (#1404)

This commit is contained in:
Raymond Xu
2020-03-15 05:24:30 -07:00
committed by GitHub
parent 99b7e9eb9e
commit 14323cb100
3 changed files with 141 additions and 216 deletions

View File

@@ -132,6 +132,7 @@ public class HoodieSnapshotExporter {
// No transformation is needed for output format "HUDI", just copy the original files.
copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
}
createSuccessTag(fs, cfg.targetOutputPath);
} else {
LOG.info("The job has 0 partition to copy.");
}
@@ -205,6 +206,14 @@ public class HoodieSnapshotExporter {
}
}
private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
fs.createNewFile(successTagPath);
}
}
public static void main(String[] args) throws IOException {
// Take input configs
final Config cfg = new Config();

View File

@@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Test utils for data source tests.
*/
public class DataSourceTestUtils {
public static Option<String> convertToString(HoodieRecord record) {
try {
String str = ((TestRawTripPayload) record.getData()).getJsonData();
str = "{" + str.substring(str.indexOf("\"timestamp\":"));
// Remove the last } bracket
str = str.substring(0, str.length() - 1);
return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
} catch (IOException e) {
return Option.empty();
}
}
public static List<String> convertToStringList(List<HoodieRecord> records) {
return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
.collect(Collectors.toList());
}
}

View File

@@ -18,205 +18,171 @@
package org.apache.hudi.utilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
private static String TEST_WRITE_TOKEN = "1-0-1";
@RunWith(Enclosed.class)
public class TestHoodieSnapshotExporter {
private SparkSession spark = null;
private HoodieTestDataGenerator dataGen = null;
private String outputPath = null;
private String rootPath = null;
private FileSystem fs = null;
private Map commonOpts;
private HoodieSnapshotExporter.Config cfg;
private JavaSparkContext jsc = null;
static class ExporterTestHarness extends HoodieClientTestHarness {
@Before
public void initialize() throws IOException {
spark = SparkSession.builder()
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
jsc = new JavaSparkContext(spark.sparkContext());
dataGen = new HoodieTestDataGenerator();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
commonOpts = new HashMap();
static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
static final int NUM_RECORDS = 100;
static final String COMMIT_TIME = "20200101000000";
static final String PARTITION_PATH = "2020/01/01";
static final String TABLE_NAME = "testing";
String sourcePath;
String targetPath;
commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
@Before
public void setUp() throws Exception {
initSparkContexts();
initDFS();
dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
// Initialize test data dirs
sourcePath = dfsBasePath + "/source/";
targetPath = dfsBasePath + "/target/";
dfs.mkdirs(new Path(sourcePath));
dfs.mkdirs(new Path(targetPath));
HoodieTableMetaClient
.initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
HoodieAvroPayload.class.getName());
cfg = new HoodieSnapshotExporter.Config();
// Prepare data as source Hudi dataset
HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
hdfsWriteClient.close();
cfg.sourceBasePath = basePath;
cfg.targetOutputPath = outputPath = basePath + "/target";
cfg.outputFormat = "json";
cfg.outputPartitionField = "partition";
RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
while (itr.hasNext()) {
LOG.info(">>> Prepared test file: " + itr.next().getPath());
}
}
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupDFS();
cleanupTestDataGenerator();
}
@After
public void cleanup() {
if (spark != null) {
spark.stop();
private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withEmbeddedTimelineServerEnabled(false)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withBulkInsertParallelism(2)
.forTable(TABLE_NAME)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
.build();
}
}
@Test
public void testSnapshotExporter() throws IOException {
// Insert Operation
List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
inputDF.write().format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
.mode(SaveMode.Overwrite)
.save(basePath);
long sourceCount = inputDF.count();
public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
hoodieSnapshotExporter.export(spark, cfg);
@Test
public void testExportAsHudi() throws IOException {
HoodieSnapshotExporter.Config cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = "hudi";
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
long targetCount = spark.read().json(outputPath).count();
// Check results
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.inflight")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.requested")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
String partition = targetPath + "/" + PARTITION_PATH;
long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
.filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
.count();
assertTrue("There should exist at least 1 parquet file.", numParquetFiles >= 1);
assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
assertTrue(sourceCount == targetCount);
@Test
public void testExportEmptyDataset() throws IOException {
// delete all source data
dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
// Test Invalid OutputFormat
cfg.outputFormat = "foo";
int isError = hoodieSnapshotExporter.export(spark, cfg);
assertTrue(isError == -1);
// export
HoodieSnapshotExporter.Config cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = "hudi";
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
// Check results
assertEquals("Target path should be empty.", 0, dfs.listStatus(new Path(targetPath)).length);
assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
}
// for testEmptySnapshotCopy
public void init() throws IOException {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
rootPath = "file://" + folder.getRoot().getAbsolutePath();
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
outputPath = rootPath + "/output";
@RunWith(Parameterized.class)
public static class TestHoodieSnapshotExporterForNonHudi extends ExporterTestHarness {
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath);
}
@Parameters
public static Iterable<String[]> formats() {
return Arrays.asList(new String[][] {{"json"}, {"parquet"}});
}
@Test
public void testEmptySnapshotCopy() throws IOException {
init();
// There is no real data (only .hoodie directory)
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath)));
@Parameter
public String format;
// Do the snapshot
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, true);
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
// TODO - uncomment this after fixing test failures
// @Test
public void testSnapshotCopy() throws Exception {
// Generate some commits and corresponding parquets
String commitTime1 = "20160501010101";
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
new File(basePath + "/.hoodie").mkdirs();
new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
// Only first two have commit files
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
// Some parquet files
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
file11.createNewFile();
File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
file12.createNewFile();
File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
file21.createNewFile();
File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
file22.createNewFile();
File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
file23.createNewFile();
// Make commit3
File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
file31.createNewFile();
File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
file32.createNewFile();
File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
file33.createNewFile();
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, false);
// Check results
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
@Test
public void testExportAsNonHudi() throws IOException {
HoodieSnapshotExporter.Config cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = format;
new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
}
}