1
0

[HUDI-1989] Fix flakiness in TestHoodieMergeOnReadTable (#3574)

* [HUDI-1989] Refactor clustering tests for MoR table

* refactor assertion helper

* add CheckedFunction

* SparkClientFunctionalTestHarness.java

* put back original test case

* move testcases out from TestHoodieMergeOnReadTable.java

* add TestHoodieSparkMergeOnReadTableRollback.java

* use SparkClientFunctionalTestHarness

* add tag
This commit is contained in:
Raymond Xu
2021-09-03 13:17:17 -07:00
committed by GitHub
parent 11398e8480
commit 6bd3ca98d6
10 changed files with 1722 additions and 1310 deletions

View File

@@ -20,10 +20,12 @@
package org.apache.hudi.testutils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.testutils.CheckedFunction;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
/**
@@ -38,4 +40,15 @@ public class Assertions {
assertAll(statuses.stream().map(status -> () ->
assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId())));
}
/**
* Assert each file size equal to its source of truth.
*
* @param fileSizeGetter to retrieve the source of truth of file size.
*/
public static void assertFileSizesEqual(List<WriteStatus> statuses, CheckedFunction<WriteStatus, Long> fileSizeGetter) {
assertAll(statuses.stream().map(status -> () ->
assertEquals(fileSizeGetter.apply(status), status.getStat().getFileSizeInBytes())));
}
}

View File

@@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.hudi.client.functional;
package org.apache.hudi.functional;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.platform.suite.api.IncludeTags;
@@ -25,7 +25,7 @@ import org.junit.platform.suite.api.SelectPackages;
import org.junit.runner.RunWith;
@RunWith(JUnitPlatform.class)
@SelectPackages("org.apache.hudi.client.functional")
@SelectPackages({"org.apache.hudi.client.functional", "org.apache.hudi.table.functional"})
@IncludeTags("functional")
public class SparkClientFunctionalTestSuite {

View File

@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("functional")
class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness {
private static Stream<Arguments> testClustering() {
return Stream.of(
Arguments.of(true, true, true),
Arguments.of(true, true, false),
Arguments.of(true, false, true),
Arguments.of(true, false, false),
Arguments.of(false, true, true),
Arguments.of(false, true, false),
Arguments.of(false, false, true),
Arguments.of(false, false, false)
);
}
@ParameterizedTest
@MethodSource
void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(10L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024)
.parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true)
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
.withRollbackUsingMarkers(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
/*
* Write 2 (more inserts to create new files)
*/
// we already set small file size to small number to force inserts to go into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
if (doUpdates) {
/*
* Write 3 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecords(metaClient, records, client, cfg, newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
// expect 2 base files for each partition
assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
// verify all files are included in clustering plan.
assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// Do the clustering and validate
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
// verify there should be only one base file per partition after clustering.
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 200 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
}
}
}

View File

@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientFunctionalTestHarness {
private JobConf roSnapshotJobConf;
private JobConf roJobConf;
private JobConf rtJobConf;
@BeforeEach
void setUp() {
roSnapshotJobConf = new JobConf(hadoopConf());
roJobConf = new JobConf(hadoopConf());
rtJobConf = new JobConf(hadoopConf());
}
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
@Test
public void testIncrementalReadsWithCompaction() throws Exception {
final String partitionPath = "2020/02/20"; // use only one partition for this test
final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
/*
* Write 1 (only inserts)
*/
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertRecords(metaClient, records001, client, cfg, commitTime1);
// verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1);
FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
Path firstFilePath = incrementalROFiles[0].getPath();
FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf,200, commitTime1);
assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
/*
* Write 2 (updates)
*/
String updateTime = "004";
client.startCommitWithTime(updateTime);
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
updateRecords(metaClient, records004, client, cfg, updateTime);
// verify RO incremental reads - only one base file shows up because updates to into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// request compaction, but do not perform compaction
String compactionCommitTime = "005";
client.scheduleCompactionAtInstant("005", Option.empty());
// verify RO incremental reads - only one base file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify RT incremental reads includes updates also
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime);
// write 3 - more inserts
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
insertRecords(metaClient, records006, client, cfg, insertsTime);
// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify 006 does not show up in RO mode because of pending compaction
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1);
// verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateFiles(partitionPath,2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime);
// verify 006 shows up in RT views
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime);
// perform the scheduled compaction
client.compact(compactionCommitTime);
// verify new write shows up in snapshot mode after compaction is complete
snapshotROFiles = getROSnapshotFiles(partitionPath);
validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime,
insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up because of pending compaction
validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
insertsTime);
}
}
private FileStatus[] getROSnapshotFiles(String partitionPath)
throws Exception {
FileInputFormat.setInputPaths(roSnapshotJobConf, Paths.get(basePath(), partitionPath).toString());
return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), roSnapshotJobConf, false);
}
private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
throws Exception {
return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction);
}
private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception {
setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath(), partitionPath).toString());
return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), roJobConf, false);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath)
throws Exception {
return getRTIncrementalFiles(partitionPath, "000", -1);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception {
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath(), partitionPath).toString());
return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), rtJobConf, true);
}
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
String modePropertyName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
String stopAtCompactionPropName =
String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
}
private void validateFiles(String partitionPath, int expectedNumFiles,
FileStatus[] files, boolean realtime, JobConf jobConf,
int expectedRecords, String... expectedCommits) {
assertEquals(expectedNumFiles, files.length);
Set<String> expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
Collections.singletonList(Paths.get(basePath(), partitionPath).toString()), basePath(), jobConf, realtime);
assertEquals(expectedRecords, records.size());
Set<String> actualCommits = records.stream().map(r ->
r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
assertEquals(expectedCommitsSet, actualCommits);
}
private FileStatus[] listStatus(HoodieFileFormat baseFileFormat, JobConf jobConf, boolean realtime) throws IOException {
// This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's
// listStatus() is protected.
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf);
switch (baseFileFormat) {
case PARQUET:
if (realtime) {
return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf);
} else {
return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf);
}
case HFILE:
if (realtime) {
return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf);
} else {
return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf);
}
default:
throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
}
}
}

View File

@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClientFunctionalTestHarness {
private static Stream<Arguments> testSimpleInsertAndUpdate() {
return Stream.of(
Arguments.of(HoodieFileFormat.PARQUET, true),
Arguments.of(HoodieFileFormat.PARQUET, false),
Arguments.of(HoodieFileFormat.HFILE, true)
);
}
@ParameterizedTest
@MethodSource
public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean populateMetaFields) throws Exception {
Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertRecords(metaClient, records, client, cfg, newCommitTime);
/*
* Write 2 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecords(metaClient, records, client, cfg, newCommitTime);
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
if (cfg.populateMetaFields()) {
assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 200 records");
} else {
assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts, written as base file)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the base files we wrote in the delta commit");
/*
* Write 2 (only updates, written to .log file)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, records);
writeRecords = jsc().parallelize(records, 1);
statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
/*
* Write 2 (only deletes, written to .log file)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
statuses = client.upsert(jsc().parallelize(fewRecordsForDelete, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004");
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
allFiles = listAllBaseFilesInPath(hoodieTable);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath(), new JobConf(hadoopConf()), true, false);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}
}
@Test
public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).build();
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
writeClient.commit(newCommitTime, statuses);
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
if (logFileCount > 0) {
// check the log versions start from the base version
assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion())
.allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
}
numLogFiles += logFileCount;
}
assertTrue(numLogFiles > 0);
// Do a compaction
String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
statuses = (JavaRDD<WriteStatus>) writeClient.compact(instantTime);
String extension = table.getBaseFileExtension();
assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
assertEquals(numLogFiles, statuses.count());
writeClient.commitCompaction(instantTime, statuses, Option.empty());
}
}
}

View File

@@ -0,0 +1,643 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exception {
// Set TableType to COW
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
// verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
metaClient = HoodieTableMetaClient.reload(metaClient);
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent());
assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
/*
* Write 2 (updates)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc().parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
// Set TableType to MOR
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
// rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(absentCommit, file.getCommitTime())));
}
}
private static Stream<Arguments> testRollbackWithDeltaAndCompactionCommit() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false)
);
}
@ParameterizedTest
@MethodSource
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers, boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// Test delta commit rollback
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the base files we wrote in the delta commit");
/*
* Write 2 (inserts + updates - testing failed delta commit)
*/
final String commitTime1 = "002";
// WriteClient with custom config (disable small file handling)
try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) {
secondClient.startCommitWithTime(commitTime1);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
basePath());
assertEquals(200, recordsRead.size());
statuses = secondClient.upsert(jsc().parallelize(copyOfRecords, 1), commitTime1).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
// Test failed delta commit rollback
secondClient.rollback(commitTime1);
allFiles = listAllBaseFilesInPath(hoodieTable);
// After rollback, there should be no base file with the failed commit time
List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
.contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
assertEquals(0, remainingFiles.size(), "There files should have been rolled-back "
+ "when rolling back commit " + commitTime1 + " but are still remaining. Files: " + remainingFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath());
assertEquals(200, recordsRead.size());
}
/*
* Write 3 (inserts + updates - testing successful delta commit)
*/
final String commitTime2 = "002";
try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) {
thirdClient.startCommitWithTime(commitTime2);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
basePath());
assertEquals(200, recordsRead.size());
writeRecords = jsc().parallelize(copyOfRecords, 1);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
// Test successful delta commit rollback
thirdClient.rollback(commitTime2);
allFiles = listAllBaseFilesInPath(hoodieTable);
// After rollback, there should be no base file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime2)).count());
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath());
// check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size());
// Test compaction commit rollback
/*
* Write 4 (updates)
*/
newCommitTime = "003";
thirdClient.startCommitWithTime(newCommitTime);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
statuses = writeStatusJavaRDD.collect();
thirdClient.commit(newCommitTime, writeStatusJavaRDD);
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
thirdClient.compact(compactionInstantTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
.anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
hoodieTable);
allFiles = listAllBaseFilesInPath(hoodieTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(compactedCommitTime, file.getCommitTime())));
}
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testMultiRollbackWithDeltaAndCompactionCommit(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
client.close();
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"Should list the base files we wrote in the delta commit");
/*
* Write 2 (inserts + updates)
*/
newCommitTime = "002";
// WriteClient with custom config (disable small file handling)
try (SparkRDDWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(populateMetaFields))) {
nClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
basePath());
assertEquals(200, recordsRead.size());
statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
nClient.commit(newCommitTime, writeStatusJavaRDD);
copyOfRecords.clear();
}
// Schedule a compaction
/*
* Write 3 (inserts + updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> newInserts = dataGen.generateInserts(newCommitTime, 100);
records = dataGen.generateUpdates(newCommitTime, records);
records.addAll(newInserts);
writeRecords = jsc().parallelize(records, 1);
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = "004";
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
// Compaction commit
/*
* Write 4 (updates)
*/
newCommitTime = "005";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, records);
writeRecords = jsc().parallelize(records, 1);
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
compactionInstantTime = "006";
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
client.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = listAllBaseFilesInPath(hoodieTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
/*
* Write 5 (updates)
*/
newCommitTime = "007";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
statuses = client.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, writeStatusJavaRDD);
copyOfRecords.clear();
// Rollback latest commit first
client.restoreToInstant("000");
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = listAllBaseFilesInPath(hoodieTable);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
TableFileSystemView.SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
assertTrue(fileGroups.isEmpty());
// make sure there are no log files remaining
assertEquals(0L, ((HoodieTableFileSystemView) rtView).getAllFileGroups()
.filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f -> f.getLogFiles().count() == 0))
.count());
}
}
private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withEmbeddedTimelineServerEnabled(true)
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table");
if (!populateMetaFields) {
addConfigsForPopulateMetaFields(cfgBuilder, false);
}
return cfgBuilder.build();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) throws Exception {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
// trigger an action
List<WriteStatus> writeStatuses = statuses.collect();
// Ensure that inserts are written to only log files
assertEquals(0,
writeStatuses.stream().filter(writeStatus -> !writeStatus.getStat().getPath().contains("log")).count());
assertTrue(
writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPath().contains("log")));
// rollback a failed commit
boolean rollback = writeClient.rollback(newCommitTime);
assertTrue(rollback);
// insert 100 records
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 100);
recordsRDD = jsc().parallelize(records, 1);
writeClient.insert(recordsRDD, newCommitTime).collect();
// Sleep for small interval (at least 1 second) to force a new rollback start time.
Thread.sleep(1000);
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice
final String lastCommitTime = newCommitTime;
// Save the .commit file to local directory.
// Rollback will be called twice to test the case where rollback failed first time and retried.
// We got the "BaseCommitTime cannot be null" exception before the fix
java.nio.file.Path tempFolder = Files.createTempDirectory(this.getClass().getCanonicalName());
Map<String, String> fileNameMap = new HashMap<>();
for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
File file = Files.createTempFile(tempFolder, null, null).toFile();
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()),
new Path(file.getAbsolutePath()));
fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName());
}
Path markerDir = new Path(Files.createTempDirectory(tempFolder, null).toAbsolutePath().toString());
if (rollbackUsingMarkers) {
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)),
markerDir);
}
writeClient.rollback(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context());
TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
}
assertEquals(0, numLogFiles);
for (Map.Entry<String, String> entry : fileNameMap.entrySet()) {
try {
metaClient.getFs().copyFromLocalFile(new Path(entry.getKey()),
new Path(metaClient.getMetaPath(), entry.getValue()));
} catch (IOException e) {
throw new HoodieIOException("Error copying state from local disk.", e);
}
}
if (rollbackUsingMarkers) {
metaClient.getFs().copyFromLocalFile(markerDir,
new Path(metaClient.getMarkerFolderPath(lastCommitTime)));
}
Thread.sleep(1000);
// Rollback again to pretend the first rollback failed partially. This should not error out
writeClient.rollback(newCommitTime);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(boolean rollbackUsingMarkers) throws Exception {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
writeClient.commit(newCommitTime, statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
}
assertTrue(numLogFiles > 0);
// Do a compaction
newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
statuses = (JavaRDD<WriteStatus>) writeClient.compact(newCommitTime);
// Ensure all log files have been compacted into base files
String extension = table.getBaseFileExtension();
assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
assertEquals(numLogFiles, statuses.count());
//writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
table.getActiveTimeline().reload();
writeClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(config, context(), metaClient);
tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(metaClient)
.getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
}
}
}
private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
try {
return new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline(),
HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
);
} catch (IOException ioe) {
throw new HoodieIOException("Error getting file system view", ioe);
}
}
}

View File

@@ -385,7 +385,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
return tableView;
}
protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
WorkloadStat globalStat = new WorkloadStat();

View File

@@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
private static transient JavaSparkContext jsc;
private static transient HoodieSparkEngineContext context;
/**
* An indicator of the initialization status.
*/
protected boolean initialized = false;
@TempDir
protected java.nio.file.Path tempDir;
public String basePath() {
return tempDir.toAbsolutePath().toUri().toString();
}
@Override
public SparkSession spark() {
return spark;
}
@Override
public SQLContext sqlContext() {
return sqlContext;
}
@Override
public JavaSparkContext jsc() {
return jsc;
}
public Configuration hadoopConf() {
return jsc.hadoopConfiguration();
}
@Override
public HoodieSparkEngineContext context() {
return context;
}
public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType) throws IOException {
return getHoodieMetaClient(tableType, new Properties());
}
public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType, Properties props) throws IOException {
return getHoodieMetaClient(hadoopConf(), basePath(), tableType, props);
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, HoodieTableType tableType, Properties props) throws IOException {
props = HoodieTableMetaClient.withPropertyBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class)
.fromProperties(props)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
return getHoodieMetaClient(hadoopConf, basePath, new Properties());
}
@Override
public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
props = HoodieTableMetaClient.withPropertyBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(COPY_ON_WRITE)
.setPayloadClass(HoodieAvroPayload.class)
.fromProperties(props)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
}
@Override
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
return new SparkRDDWriteClient(context(), cfg);
}
@BeforeEach
public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
context = new HoodieSparkEngineContext(jsc);
}
}
@AfterAll
public static synchronized void cleanUpAfterAll() {
if (spark != null) {
spark.close();
spark = null;
}
}
protected void insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses);
assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), reloadedMetaClient);
Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals(commitTime, deltaCommit.get().getTimestamp(), "Delta commit should be specified value");
Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
TableFileSystemView.BaseFileOnlyView roView =
getHoodieTableFileSystemView(reloadedMetaClient, reloadedMetaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the base files we wrote in the delta commit");
}
protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
for (HoodieRecord rec : records) {
if (!recordsMap.containsKey(rec.getKey())) {
recordsMap.put(rec.getKey(), rec);
}
}
List<WriteStatus> statuses = client.upsert(jsc().parallelize(records, 1), commitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals(commitTime, deltaCommit.get().getTimestamp(),
"Latest Delta commit should match specified time");
Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
}
protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
}
protected Properties getPropertiesForKeyGen() {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
return properties;
}
protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
if (!populateMetaFields) {
configBuilder.withProperties(getPropertiesForKeyGen())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
}
}
protected HoodieWriteConfig getConfig(Boolean autoCommit) {
return getConfigBuilder(autoCommit).build();
}
protected HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, HoodieIndex.IndexType.BLOOM).build();
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
return getConfigBuilder(autoCommit, HoodieIndex.IndexType.BLOOM);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, false, indexType);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withClusteringConfig(clusteringConfig)
.withRollbackUsingMarkers(rollbackUsingMarkers);
}
}

View File

@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.testutils;
@FunctionalInterface
public interface CheckedFunction<T, R> {
R apply(T t) throws Exception;
}