diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/Assertions.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/Assertions.java
index ad6561025..bb2ba84f8 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/Assertions.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/Assertions.java
@@ -20,10 +20,12 @@ package org.apache.hudi.testutils;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.testutils.CheckedFunction;
 
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
@@ -38,4 +40,15 @@ public class Assertions {
     assertAll(statuses.stream().map(status -> () ->
         assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId())));
   }
+
+  /**
+   * Assert that each file size equals its source of truth.
+   *
+   * @param fileSizeGetter function to retrieve the source-of-truth file size.
+   */
+  public static void assertFileSizesEqual(List<WriteStatus> statuses, CheckedFunction<WriteStatus, Long> fileSizeGetter) {
+    assertAll(statuses.stream().map(status -> () ->
+        assertEquals(fileSizeGetter.apply(status), status.getStat().getFileSizeInBytes())));
+  }
+
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/SparkClientFunctionalTestSuite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java
similarity index 89%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/SparkClientFunctionalTestSuite.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java
index 03baff441..ee7427866 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/SparkClientFunctionalTestSuite.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java
@@ -17,7 +17,7 @@
  * under the License.
*/ -package org.apache.hudi.client.functional; +package org.apache.hudi.functional; import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.suite.api.IncludeTags; @@ -25,7 +25,7 @@ import org.junit.platform.suite.api.SelectPackages; import org.junit.runner.RunWith; @RunWith(JUnitPlatform.class) -@SelectPackages("org.apache.hudi.client.functional") +@SelectPackages({"org.apache.hudi.client.functional", "org.apache.hudi.table.functional"}) @IncludeTags("functional") public class SparkClientFunctionalTestSuite { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 023913813..f0046afe0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -21,15 +21,9 @@ package org.apache.hudi.table; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieFileGroup; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; @@ -39,415 +33,55 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; -import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestTable; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.Transformations; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.HoodieHFileInputFormat; -import org.apache.hudi.hadoop.HoodieParquetInputFormat; -import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat; -import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; -import org.apache.hudi.hadoop.utils.HoodieHiveUtils; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; 
import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor; -import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaRDD; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; -import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.apache.hudi.testutils.HoodieClientTestHarness.buildProfile; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { - private JobConf roSnapshotJobConf; - private JobConf roJobConf; - private JobConf rtJobConf; +public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness { - @TempDir - public java.nio.file.Path tempFolder; - - private HoodieFileFormat baseFileFormat; - - public void init(HoodieFileFormat baseFileFormat, boolean populateMetaFields) throws IOException { - this.baseFileFormat = baseFileFormat; - initDFS(); - initSparkContexts("TestHoodieMergeOnReadTable"); - hadoopConf.addResource(dfs.getConf()); - jsc.hadoopConfiguration().addResource(dfs.getConf()); - context = new HoodieSparkEngineContext(jsc); - initPath(); - dfs.mkdirs(new Path(basePath)); - - Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); - properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString()); - - metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, properties); - initTestDataGenerator(); - - roSnapshotJobConf = new JobConf(hadoopConf); - roJobConf = new JobConf(hadoopConf); - rtJobConf = new JobConf(hadoopConf); - } + private HoodieTableMetaClient metaClient; + private HoodieTestDataGenerator dataGen; @BeforeEach - public void init() throws IOException { - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), true); - } - - @AfterEach - public void clean() throws IOException { - cleanupResources(); - } - - private static Stream populateMetaFieldsParams() { - return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); - } - - private static Stream populateMetaFieldsAndPreserveMetadataParams() { - return Arrays.stream(new Boolean[][] { - {true, true}, - {false, true}, - {true, false}, - {false, false} - }).map(Arguments::of); - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testSimpleInsertAndUpdate(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); - addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - HoodieWriteConfig cfg = cfgBuilder.build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - insertRecords(records, client, cfg, newCommitTime); - - /** - * Write 2 (updates) - */ - newCommitTime = "004"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 100); - updateRecords(records, client, cfg, newCommitTime); - - String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString(); - client.compact(compactionCommitTime); - - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - Stream dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent()); - - // verify that there is a commit - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), - "Expecting a single commit."); - String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); - assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime)); - - if (cfg.populateMetaFields()) { - assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), - "Must contain 200 records"); - } else { - assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.empty())); - } - } - } - - @Test - public void testSimpleInsertAndUpdateHFile() throws Exception { - clean(); - init(HoodieFileFormat.HFILE, true); - HoodieWriteConfig cfg = getConfigBuilder(true).build(); - try (SparkRDDWriteClient client = 
getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - insertRecords(records, client, cfg, newCommitTime); - - /** - * Write 2 (updates) - */ - newCommitTime = "004"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 100); - updateRecords(records, client, cfg, newCommitTime); - - String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString(); - client.compact(compactionCommitTime); - - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - Stream dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent()); - - // verify that there is a commit - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), - "Expecting a single commit."); - String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); - assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime)); - - assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), - "Must contain 200 records"); - } - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsAndPreserveMetadataParams") - public void testSimpleClusteringNoUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - testClustering(false, populateMetaFields, preserveCommitMetadata); - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsAndPreserveMetadataParams") - public void testSimpleClusteringWithUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - testClustering(true, populateMetaFields, preserveCommitMetadata); - } - - private void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { - // set low compaction small File Size to generate more file groups. 
- HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, 10L, clusteringConfig); - addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - HoodieWriteConfig cfg = cfgBuilder.build(); - - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 400); - insertRecords(records.subList(0, 200), client, cfg, newCommitTime); - - /** - * Write 2 (more inserts to create new files) - */ - // we already set small file size to small number to force inserts to go into new file. - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - insertRecords(records.subList(200, 400), client, cfg, newCommitTime); - - if (doUpdates) { - /** - * Write 3 (updates) - */ - newCommitTime = "003"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 100); - updateRecords(records, client, cfg, newCommitTime); - } - - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - // expect 2 base files for each partition - assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length); - - String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); - metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - // verify all files are included in clustering plan. - assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); - - // Do the clustering and validate - client.cluster(clusteringCommitTime, true); - - metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context, metaClient); - Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()) - .flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p)); - // verify there should be only one base file per partition after clustering. 
- assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count()); - - HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), - "Expecting a single commit."); - assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); - assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); - if (cfg.populateMetaFields()) { - assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), - "Must contain 200 records"); - } else { - assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.empty())); - } - } - } - - // test incremental read does not go past compaction instant for RO views - // For RT views, incremental read can go past compaction - @Test - public void testIncrementalReadsWithCompaction() throws Exception { - String partitionPath = "2020/02/20"; // use only one partition for this test - dataGen = new HoodieTestDataGenerator(new String[] { partitionPath }); - HoodieWriteConfig cfg = getConfig(true); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts) - */ - String commitTime1 = "001"; - client.startCommitWithTime(commitTime1); - - List records001 = dataGen.generateInserts(commitTime1, 200); - insertRecords(records001, client, cfg, commitTime1); - - // verify only one base file shows up with commit time 001 - FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath); - validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1); - - FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true); - validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); - Path firstFilePath = incrementalROFiles[0].getPath(); - - FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf,200, commitTime1); - - assertEquals(firstFilePath, incrementalRTFiles[0].getPath()); - - /** - * Write 2 (updates) - */ - String updateTime = "004"; - client.startCommitWithTime(updateTime); - List records004 = dataGen.generateUpdates(updateTime, 100); - updateRecords(records004, client, cfg, updateTime); - - // verify RO incremental reads - only one base file shows up because updates to into log files - incrementalROFiles = getROIncrementalFiles(partitionPath, false); - validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); - assertEquals(firstFilePath, incrementalROFiles[0].getPath()); - - // verify RT incremental reads includes updates also - incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime); - - // request compaction, but do not perform compaction - String compactionCommitTime = "005"; - client.scheduleCompactionAtInstant("005", Option.empty()); - - // verify RO incremental reads - only one base file shows up because updates go into log files - incrementalROFiles = getROIncrementalFiles(partitionPath, true); - validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1); - - // verify RT incremental reads includes updates also - incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateFiles(partitionPath, 1, incrementalRTFiles, true, 
rtJobConf, 200, commitTime1, updateTime); - - // write 3 - more inserts - String insertsTime = "006"; - List records006 = dataGen.generateInserts(insertsTime, 200); - client.startCommitWithTime(insertsTime); - insertRecords(records006, client, cfg, insertsTime); - - // verify new write shows up in snapshot mode even though there is pending compaction - snapshotROFiles = getROSnapshotFiles(partitionPath); - validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, insertsTime); - - incrementalROFiles = getROIncrementalFiles(partitionPath, true); - assertEquals(firstFilePath, incrementalROFiles[0].getPath()); - // verify 006 does not show up in RO mode because of pending compaction - - validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); - - // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up - incrementalROFiles = getROIncrementalFiles(partitionPath, false); - validateFiles(partitionPath,2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime); - - // verify 006 shows up in RT views - incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime); - - // perform the scheduled compaction - client.compact(compactionCommitTime); - - // verify new write shows up in snapshot mode after compaction is complete - snapshotROFiles = getROSnapshotFiles(partitionPath); - validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime, - insertsTime); - - incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true); - assertTrue(incrementalROFiles.length == 2); - // verify 006 shows up because of pending compaction - validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime, - insertsTime); - } + void setUp() throws IOException { + Properties properties = new Properties(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); + dataGen = new HoodieTestDataGenerator(); } // Check if record level metadata is aggregated properly at the end of write. 
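Editor's note: to make the harness-based pattern introduced above concrete, here is a minimal sketch of a functional test written against SparkClientFunctionalTestHarness, combining the setUp() shown in this hunk with the new Assertions.assertFileSizesEqual helper from the first file in this diff. The write-config options, the FSUtils.getFileSize call, and the test class itself are illustrative assumptions, not part of this patch; only the harness accessors (jsc(), basePath(), getHoodieMetaClient(), getHoodieWriteClient()) are taken from the diff.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

    import org.apache.hadoop.fs.Path;
    import org.junit.jupiter.api.Test;

    import java.util.List;
    import java.util.Properties;

    import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual;
    import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;

    // Hypothetical test class for illustration; not part of this patch.
    public class TestFileSizesExample extends SparkClientFunctionalTestHarness {

      @Test
      public void testReportedFileSizesMatchFileSystem() throws Exception {
        HoodieTableMetaClient metaClient =
            getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        // Minimal config; real tests in this diff use richer builders (compaction, storage, etc.).
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(basePath())
            .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
            .forTable("test-trip-table")
            .build();

        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
          String commitTime = "001";
          client.startCommitWithTime(commitTime);
          List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
          List<WriteStatus> statuses =
              client.upsert(jsc().parallelize(records, 1), commitTime).collect();

          assertNoWriteErrors(statuses);
          // Compare each reported file size against the file system, the source of truth here.
          assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(metaClient.getFs(),
              new Path(metaClient.getBasePath(), status.getStat().getPath())));
        }
      }
    }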
@@ -458,7 +92,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { String newCommitTime = "001"; List records = dataGen.generateInserts(newCommitTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); client.startCommitWithTime(newCommitTime); @@ -475,498 +109,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); - addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - HoodieWriteConfig cfg = cfgBuilder.build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts, written as base file) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 20); - JavaRDD writeRecords = jsc.parallelize(records, 1); - - List statuses = client.upsert(writeRecords, newCommitTime).collect(); - assertNoWriteErrors(statuses); - - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); - - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream dataFilesToRead = tableView.getLatestBaseFiles(); - assertFalse(dataFilesToRead.findAny().isPresent()); - - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the base files we wrote in the delta commit"); - - /** - * Write 2 (only updates, written to .log file) - */ - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - - records = dataGen.generateUpdates(newCommitTime, records); - writeRecords = jsc.parallelize(records, 1); - statuses = client.upsert(writeRecords, newCommitTime).collect(); - assertNoWriteErrors(statuses); - - /** - * Write 2 (only deletes, written to .log file) - */ - newCommitTime = "004"; - client.startCommitWithTime(newCommitTime); - - List fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); - - statuses = client.upsert(jsc.parallelize(fewRecordsForDelete, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004"); - - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); - - allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = 
getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent()); - - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath, new JobConf(hadoopConf), true, false); - // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 - assertEquals(0, recordsRead.size(), "Must contain 0 records"); - } - } - - private void testCOWToMORConvertedTableRollback(Boolean rollbackUsingMarkers) throws Exception { - // Set TableType to COW - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - - HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers); - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); - - List statuses = client.upsert(writeRecords, newCommitTime).collect(); - // verify there are no errors - assertNoWriteErrors(statuses); - client.commit(newCommitTime, jsc.parallelize(statuses)); - - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertTrue(commit.isPresent()); - assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); - - /** - * Write 2 (updates) - */ - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - - records = dataGen.generateUpdates(newCommitTime, records); - - statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - // Set TableType to MOR - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); - - // rollback a COW commit when TableType is MOR - client.rollback(newCommitTime); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - - final String absentCommit = newCommitTime; - assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime()))); - } - } - - @Test - public void testCOWToMORConvertedTableRollbackUsingFileList() throws Exception { - testCOWToMORConvertedTableRollback(false); - } - - @Test - public void testCOWToMORConvertedTableRollbackUsingMarkers() throws Exception { - testCOWToMORConvertedTableRollback(true); - } - - private void testRollbackWithDeltaAndCompactionCommit(Boolean rollbackUsingMarkers, boolean populateMetaFields) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, IndexType.SIMPLE); - addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - HoodieWriteConfig cfg = cfgBuilder.build(); - - try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - - // Test delta commit rollback - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - JavaRDD writeRecords 
= jsc.parallelize(records, 1); - - JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); - List statuses = writeStatusJavaRDD.collect(); - assertNoWriteErrors(statuses); - - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); - - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = - getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(!dataFilesToRead.findAny().isPresent()); - - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the base files we wrote in the delta commit"); - - /** - * Write 2 (inserts + updates - testing failed delta commit) - */ - final String commitTime1 = "002"; - // WriteClient with custom config (disable small file handling) - try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) { - secondClient.startCommitWithTime(commitTime1); - - List copyOfRecords = new ArrayList<>(records); - copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); - copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); - - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, - basePath); - assertEquals(200, recordsRead.size()); - - statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - // Test failed delta commit rollback - secondClient.rollback(commitTime1); - allFiles = listAllBaseFilesInPath(hoodieTable); - // After rollback, there should be no base file with the failed commit time - List remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName() - .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList()); - assertEquals(0, remainingFiles.size(), "There files should have been rolled-back " - + "when rolling back commit " + commitTime1 + " but are still remaining. 
Files: " + remainingFiles); - dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath); - assertEquals(200, recordsRead.size()); - } - - /** - * Write 3 (inserts + updates - testing successful delta commit) - */ - final String commitTime2 = "002"; - try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) { - thirdClient.startCommitWithTime(commitTime2); - - List copyOfRecords = new ArrayList<>(records); - copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); - copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); - - List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, - basePath); - assertEquals(200, recordsRead.size()); - - writeRecords = jsc.parallelize(copyOfRecords, 1); - writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2); - statuses = writeStatusJavaRDD.collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - // Test successful delta commit rollback - thirdClient.rollback(commitTime2); - allFiles = listAllBaseFilesInPath(hoodieTable); - // After rollback, there should be no base file with the failed commit time - assertEquals(0, Arrays.stream(allFiles) - .filter(file -> file.getPath().getName().contains(commitTime2)).count()); - - metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath); - // check that the number of records read is still correct after rollback operation - assertEquals(200, recordsRead.size()); - - // Test compaction commit rollback - /** - * Write 4 (updates) - */ - newCommitTime = "003"; - thirdClient.startCommitWithTime(newCommitTime); - - writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); - statuses = writeStatusJavaRDD.collect(); - thirdClient.commit(newCommitTime, writeStatusJavaRDD); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - - String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString(); - thirdClient.compact(compactionInstantTime); - - allFiles = listAllBaseFilesInPath(hoodieTable); - metaClient = HoodieTableMetaClient.reload(metaClient); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); - - final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp(); - assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable)) - .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime()))); - thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime), - hoodieTable); - allFiles = listAllBaseFilesInPath(hoodieTable); - metaClient = HoodieTableMetaClient.reload(metaClient); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); - - 
assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); - } - } - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testRollbackWithDeltaAndCompactionCommitUsingFileList(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - testRollbackWithDeltaAndCompactionCommit(false, populateMetaFields); - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - testRollbackWithDeltaAndCompactionCommit(true, populateMetaFields); - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testMultiRollbackWithDeltaAndCompactionCommit(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false); - addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); - HoodieWriteConfig cfg = cfgBuilder.build(); - - try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); - - JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); - List statuses = writeStatusJavaRDD.collect(); - assertNoWriteErrors(statuses); - - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); - - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream dataFilesToRead = tableView.getLatestBaseFiles(); - assertFalse(dataFilesToRead.findAny().isPresent()); - - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = tableView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent(), - "Should list the base files we wrote in the delta commit"); - - /** - * Write 2 (inserts + updates) - */ - newCommitTime = "002"; - // WriteClient with custom config (disable small file handling) - SparkRDDWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(populateMetaFields)); - nClient.startCommitWithTime(newCommitTime); - - List copyOfRecords = new ArrayList<>(records); - copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); - copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); - - List dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, - 
basePath); - assertEquals(200, recordsRead.size()); - - statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - nClient.commit(newCommitTime, writeStatusJavaRDD); - copyOfRecords.clear(); - - // Schedule a compaction - /** - * Write 3 (inserts + updates) - */ - newCommitTime = "003"; - client.startCommitWithTime(newCommitTime); - - List newInserts = dataGen.generateInserts(newCommitTime, 100); - records = dataGen.generateUpdates(newCommitTime, records); - records.addAll(newInserts); - writeRecords = jsc.parallelize(records, 1); - - writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); - statuses = writeStatusJavaRDD.collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - - String compactionInstantTime = "004"; - client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - - // Compaction commit - /** - * Write 4 (updates) - */ - newCommitTime = "005"; - client.startCommitWithTime(newCommitTime); - - records = dataGen.generateUpdates(newCommitTime, records); - writeRecords = jsc.parallelize(records, 1); - - writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); - statuses = writeStatusJavaRDD.collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - - compactionInstantTime = "006"; - client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - JavaRDD ws = (JavaRDD) client.compact(compactionInstantTime); - client.commitCompaction(compactionInstantTime, ws, Option.empty()); - - allFiles = listAllBaseFilesInPath(hoodieTable); - metaClient = HoodieTableMetaClient.reload(metaClient); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); - - final String compactedCommitTime = - metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); - - assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); - - /** - * Write 5 (updates) - */ - newCommitTime = "007"; - client.startCommitWithTime(newCommitTime); - copyOfRecords = new ArrayList<>(records); - copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); - copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); - - statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - client.commit(newCommitTime, writeStatusJavaRDD); - copyOfRecords.clear(); - - // Rollback latest commit first - client.restoreToInstant("000"); - - metaClient = HoodieTableMetaClient.reload(metaClient); - allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - dataFilesToRead = tableView.getLatestBaseFiles(); - assertFalse(dataFilesToRead.findAny().isPresent()); - SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - List fileGroups = - ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList()); - assertTrue(fileGroups.isEmpty()); - - // make sure there are no log files remaining - assertEquals(0L, 
((HoodieTableFileSystemView) rtView).getAllFileGroups() - .filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f -> f.getLogFiles().count() == 0)) - .count()); - - } - } - - protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) { - HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) - .withDeleteParallelism(2) - .withAutoCommit(false) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024) - .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withEmbeddedTimelineServerEnabled(true) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table"); - - if (!populateMetaFields) { - addConfigsForPopulateMetaFields(cfgBuilder, false); - } - return cfgBuilder.build(); - } - - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") + @ValueSource(booleans = {true, false}) public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { - clean(); - init(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), populateMetaFields); HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -979,13 +123,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 20); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -1016,7 +159,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { List newRecords = dataGen.generateUpdates(newCommitTime, records); newRecords.addAll(dataGen.generateInserts(newCommitTime, 20)); - statuses = client.upsert(jsc.parallelize(newRecords), newCommitTime).collect(); + statuses = client.upsert(jsc().parallelize(newRecords), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); @@ -1039,15 +182,15 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue())); List dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, - basePath, new JobConf(hadoopConf), true, false); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + basePath(), new JobConf(hadoopConf()), true, false); // Wrote 20 records in 2 batches assertEquals(40, recordsRead.size(), "Must contain 40 records"); } } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") + @ValueSource(booleans = {true, false}) public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception { 
// insert 100 records HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); @@ -1059,24 +202,22 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { writeClient.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD recordsRDD = jsc.parallelize(records, 1); + JavaRDD recordsRDD = jsc().parallelize(records, 1); writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - metaClient = getHoodieMetaClient(hadoopConf, basePath); - newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); List updatedRecords = dataGen.generateUpdates(newCommitTime, records); - JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); + JavaRDD updatedRecordsRDD = jsc().parallelize(updatedRecords, 1); - HoodieReadClient readClient = new HoodieReadClient(context, config); + HoodieReadClient readClient = new HoodieReadClient(context(), config); updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect(); // Write them to corresponding avro logfiles metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(config, context, metaClient); + HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS) .withLogAppends(updatedRecords); // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state @@ -1103,7 +244,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // Verify that recently written compacted data file has no log file metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieSparkTable.create(config, context, metaClient); + table = HoodieSparkTable.create(config, context(), metaClient); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); assertTrue(HoodieTimeline @@ -1122,218 +263,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } - @Test - public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { - // insert 100 records - // Setting IndexType to be InMemory to simulate Global Index nature - HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { - String newCommitTime = "100"; - writeClient.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD recordsRDD = jsc.parallelize(records, 1); - JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); - writeClient.commit(newCommitTime, statuses); - - HoodieTable table = - HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath)); - SliceView tableRTFileSystemView = table.getSliceView(); - - long numLogFiles = 0; - for (String partitionPath : dataGen.getPartitionPaths()) { - List allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); - assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); - assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); - if (logFileCount > 0) { - // check the log versions start from the base version - assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion()) 
- .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION))); - } - numLogFiles += logFileCount; - } - - assertTrue(numLogFiles > 0); - // Do a compaction - String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); - statuses = (JavaRDD) writeClient.compact(instantTime); - String extension = table.getBaseFileExtension(); - assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); - assertEquals(numLogFiles, statuses.count()); - writeClient.commitCompaction(instantTime, statuses, Option.empty()); - } - } - - private void testInsertsGeneratedIntoLogFilesRollback(Boolean rollbackUsingMarkers) throws Exception { - // insert 100 records - // Setting IndexType to be InMemory to simulate Global Index nature - HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build(); - - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { - String newCommitTime = "100"; - writeClient.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD recordsRDD = jsc.parallelize(records, 1); - JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); - // trigger an action - List writeStatuses = statuses.collect(); - - // Ensure that inserts are written to only log files - assertEquals(0, - writeStatuses.stream().filter(writeStatus -> !writeStatus.getStat().getPath().contains("log")).count()); - assertTrue( - writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPath().contains("log"))); - - // rollback a failed commit - boolean rollback = writeClient.rollback(newCommitTime); - assertTrue(rollback); - - // insert 100 records - newCommitTime = "101"; - writeClient.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 100); - recordsRDD = jsc.parallelize(records, 1); - writeClient.insert(recordsRDD, newCommitTime).collect(); - - // Sleep for small interval (at least 1 second) to force a new rollback start time. - Thread.sleep(1000); - - // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs - // and calling rollback twice - final String lastCommitTime = newCommitTime; - metaClient = getHoodieMetaClient(hadoopConf, basePath); - - // Save the .commit file to local directory. - // Rollback will be called twice to test the case where rollback failed first time and retried. 
- // We got the "BaseCommitTime cannot be null" exception before the fix - Map fileNameMap = new HashMap<>(); - for (State state : Arrays.asList(State.REQUESTED, State.INFLIGHT)) { - HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime); - File file = Files.createTempFile(tempFolder, null, null).toFile(); - metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()), - new Path(file.getAbsolutePath())); - fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName()); - } - Path markerDir = new Path(Files.createTempDirectory(tempFolder,null).toAbsolutePath().toString()); - if (rollbackUsingMarkers) { - metaClient.getFs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)), - markerDir); - } - - writeClient.rollback(newCommitTime); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(config, context); - SliceView tableRTFileSystemView = table.getSliceView(); - - long numLogFiles = 0; - for (String partitionPath : dataGen.getPartitionPaths()) { - assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent())); - assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath) - .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); - } - assertEquals(0, numLogFiles); - fileNameMap.forEach((key, value) -> { - try { - metaClient.getFs().copyFromLocalFile(new Path(key), - new Path(metaClient.getMetaPath(), value)); - } catch (IOException e) { - throw new HoodieIOException("Error copying state from local disk.", e); - } - }); - if (rollbackUsingMarkers) { - metaClient.getFs().copyFromLocalFile(markerDir, - new Path(metaClient.getMarkerFolderPath(lastCommitTime))); - } - Thread.sleep(1000); - // Rollback again to pretend the first rollback failed partially. 
This should not error out - writeClient.rollback(newCommitTime); - } - } - - @Test - public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList() throws Exception { - testInsertsGeneratedIntoLogFilesRollback(false); - } - - @Test - public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers() throws Exception { - testInsertsGeneratedIntoLogFilesRollback(true); - } - - private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(Boolean rollbackUsingMarkers) throws Exception { - // insert 100 records - // Setting IndexType to be InMemory to simulate Global Index nature - HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build(); - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { - String newCommitTime = "100"; - writeClient.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD recordsRDD = jsc.parallelize(records, 1); - JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); - writeClient.commit(newCommitTime, statuses); - - HoodieTable table = HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath)); - SliceView tableRTFileSystemView = table.getSliceView(); - - long numLogFiles = 0; - for (String partitionPath : dataGen.getPartitionPaths()) { - assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent())); - assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath) - .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); - } - - assertTrue(numLogFiles > 0); - // Do a compaction - newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); - statuses = (JavaRDD) writeClient.compact(newCommitTime); - // Ensure all log files have been compacted into base files - String extension = table.getBaseFileExtension(); - assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); - assertEquals(numLogFiles, statuses.count()); - //writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); - // Trigger a rollback of compaction - table.getActiveTimeline().reload(); - writeClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table); - - table = HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath)); - tableRTFileSystemView = table.getSliceView(); - ((SyncableFileSystemView) tableRTFileSystemView).reset(); - - for (String partitionPath : dataGen.getPartitionPaths()) { - List fileSlices = getFileSystemViewWithUnCommittedSlices(getHoodieMetaClient(hadoopConf, basePath)) - .getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList()); - assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent())); - assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - } - } - } - - @Test - public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList() throws Exception { - testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(false); - } - - @Test - public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers() throws Exception { - testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(true); - } - 
/** * Test to ensure metadata stats are correctly written to metadata file. */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY) .withAutoCommit(false).build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { - metaClient = getHoodieMetaClient(hadoopConf, basePath); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient); // Create a commit without metadata stats in metadata to test backwards compatibility HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); @@ -1348,13 +288,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(instantTime); List records = dataGen.generateInserts(instantTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); JavaRDD statuses = client.insert(writeRecords, instantTime); assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieSparkTable.create(cfg, context); + table = HoodieSparkTable.create(cfg, context()); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); @@ -1369,7 +309,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { instantTime = "002"; client.startCommitWithTime(instantTime); records = dataGen.generateUpdates(instantTime, records); - writeRecords = jsc.parallelize(records, 1); + writeRecords = jsc().parallelize(records, 1); statuses = client.upsert(writeRecords, instantTime); //assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); inserts = 0; @@ -1387,7 +327,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.rollback(instantTime); // Read from commit file - table = HoodieSparkTable.create(cfg, context); + table = HoodieSparkTable.create(cfg, context()); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1405,19 +345,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } - /** - * Test to ensure rolling stats are correctly written to metadata file. - */ - @Test - public void testMetadataStatsOnCommitUsingFileList() throws Exception { - testMetadataStatsOnCommit(false); - } - - @Test - public void testMetadataStatsOnCommitUsingMarkers() throws Exception { - testMetadataStatsOnCommit(true); - } - /** * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them. 
*/ @@ -1432,13 +359,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(instantTime); List records = dataGen.generateInserts(instantTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); JavaRDD statuses = client.insert(writeRecords, instantTime); assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - HoodieTable table = HoodieSparkTable.create(cfg, context); + HoodieTable table = HoodieSparkTable.create(cfg, context()); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1458,12 +385,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // generate updates + inserts. inserts should be handled into small files records = dataGen.generateUpdates(instantTime, records); records.addAll(dataGen.generateInserts(instantTime, 200)); - writeRecords = jsc.parallelize(records, 1); + writeRecords = jsc().parallelize(records, 1); statuses = client.upsert(writeRecords, instantTime); assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieSparkTable.create(cfg, context); + table = HoodieSparkTable.create(cfg, context()); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1489,7 +416,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.commitCompaction(instantTime, statuses, Option.empty()); // Read from commit file - table = HoodieSparkTable.create(cfg, context); + table = HoodieSparkTable.create(cfg, context()); HoodieCommitMetadata metadata1 = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(), @@ -1508,12 +435,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // generate updates + inserts. 
inserts should be handled into small files records = dataGen.generateUpdates(instantTime, records); records.addAll(dataGen.generateInserts(instantTime, 200)); - writeRecords = jsc.parallelize(records, 1); + writeRecords = jsc().parallelize(records, 1); statuses = client.upsert(writeRecords, instantTime); assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieSparkTable.create(cfg, context); + table = HoodieSparkTable.create(cfg, context()); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1548,13 +475,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 20); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieSparkMergeOnReadTable hoodieTable = (HoodieSparkMergeOnReadTable) HoodieSparkTable.create(cfg, context, metaClient); + HoodieSparkMergeOnReadTable hoodieTable = (HoodieSparkMergeOnReadTable) HoodieSparkTable.create(cfg, context(), metaClient); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -1581,7 +507,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(newCommitTime); metaClient.reloadActiveTimeline(); records = dataGen.generateUpdates(newCommitTime, records); - writeRecords = jsc.parallelize(records, 1); + writeRecords = jsc().parallelize(records, 1); statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); @@ -1595,13 +521,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { metaClient.reloadActiveTimeline(); List fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); - JavaRDD deleteRDD = jsc.parallelize(fewRecordsForDelete, 1); + JavaRDD deleteRDD = jsc().parallelize(fewRecordsForDelete, 1); // initialize partitioner - AbstractSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context, cfg, hoodieTable, + AbstractSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable, newDeleteTime, deleteRDD); actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD))); - final List> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { + final List> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> { return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()); }).map(Transformations::flatten).collect(); @@ -1614,194 +540,5 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords()); } } - - private HoodieWriteConfig getConfig(Boolean autoCommit) { - return getConfigBuilder(autoCommit).build(); - } - - private HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) { - return getConfigBuilder(autoCommit, rollbackUsingMarkers, IndexType.BLOOM).build(); - } - - protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) { - return 
getConfigBuilder(autoCommit, IndexType.BLOOM); - } - - protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) { - return getConfigBuilder(autoCommit, false, indexType); - } - - protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) { - return getConfigBuilder(autoCommit, false, IndexType.BLOOM, compactionSmallFileSize, clusteringConfig); - } - - protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) { - return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build()); - } - - protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType, - long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) - .withDeleteParallelism(2) - .withAutoCommit(autoCommit) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize) - .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build()) - .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") - .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() - .withEnableBackupForRemoteFileSystemView(false).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) - .withClusteringConfig(clusteringConfig) - .withRollbackUsingMarkers(rollbackUsingMarkers); - } - - private void insertRecords(List records, SparkRDDWriteClient client, - HoodieWriteConfig cfg, String commitTime) throws IOException { - JavaRDD writeRecords = jsc.parallelize(records, 1); - - List statuses = client.insert(writeRecords, commitTime).collect(); - assertNoWriteErrors(statuses); - assertFileSizes(statuses); - - metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath()); - HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals(commitTime, deltaCommit.get().getTimestamp(), "Delta commit should be specified value"); - - Option commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant(); - assertFalse(commit.isPresent()); - - FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - BaseFileOnlyView roView = - getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); - Stream dataFilesToRead = roView.getLatestBaseFiles(); - assertTrue(!dataFilesToRead.findAny().isPresent()); - - roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); - dataFilesToRead = roView.getLatestBaseFiles(); - assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the base files we wrote in the delta commit"); - } - - private void updateRecords(List records, SparkRDDWriteClient client, - HoodieWriteConfig cfg, String commitTime) throws IOException { - Map recordsMap = new HashMap<>(); - for (HoodieRecord rec : records) { - if (!recordsMap.containsKey(rec.getKey())) { - 
recordsMap.put(rec.getKey(), rec); - } - } - - List statuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - assertFileSizes(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals(commitTime, deltaCommit.get().getTimestamp(), - "Latest Delta commit should match specified time"); - - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); - } - - private void assertFileSizes(List statuses) throws IOException { - for (WriteStatus status: statuses) { - assertEquals(FSUtils.getFileSize(metaClient.getFs(), new Path(metaClient.getBasePath(), status.getStat().getPath())), - status.getStat().getFileSizeInBytes()); - } - } - - private FileStatus[] getROSnapshotFiles(String partitionPath) - throws Exception { - FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath); - return listStatus(roSnapshotJobConf, false); - } - - private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction) - throws Exception { - return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction); - } - - private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction) - throws Exception { - setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction); - FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString()); - return listStatus(roJobConf, false); - } - - private FileStatus[] getRTIncrementalFiles(String partitionPath) - throws Exception { - return getRTIncrementalFiles(partitionPath, "000", -1); - } - - private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) - throws Exception { - setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false); - FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString()); - return listStatus(rtJobConf, true); - } - - private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) { - String modePropertyName = - String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); - - String startCommitTimestampName = - String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.set(startCommitTimestampName, startCommit); - - String maxCommitPulls = - String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); - - String stopAtCompactionPropName = - String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction); - } - - private void validateFiles(String partitionPath, int expectedNumFiles, - FileStatus[] files, boolean realtime, JobConf jobConf, - int expectedRecords, String... 
expectedCommits) { - - assertEquals(expectedNumFiles, files.length); - Set expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet()); - List records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, - Collections.singletonList(Paths.get(basePath, partitionPath).toString()), basePath, jobConf, realtime); - assertEquals(expectedRecords, records.size()); - Set actualCommits = records.stream().map(r -> - r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet()); - assertEquals(expectedCommitsSet, actualCommits); - } - - private FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException { - return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension()); - } - - private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException { - // This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's - // listStatus() is protected. - FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf); - switch (baseFileFormat) { - case PARQUET: - if (realtime) { - return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf); - } else { - return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf); - } - case HFILE: - if (realtime) { - return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf); - } else { - return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf); - } - default: - throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat); - } - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java new file mode 100644 index 000000000..03dd3b055 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.hadoop.fs.FileStatus; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness { + + private static Stream testClustering() { + return Stream.of( + Arguments.of(true, true, true), + Arguments.of(true, true, false), + Arguments.of(true, false, true), + Arguments.of(true, false, false), + Arguments.of(false, true, true), + Arguments.of(false, true, false), + Arguments.of(false, false, true), + Arguments.of(false, false, false) + ); + } + + @ParameterizedTest + @MethodSource + void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { + // set low compaction small File Size to generate more file groups. 
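Editor's aside, a hedged sketch rather than part of this change: the eight Arguments enumerated by the testClustering() provider above form the Cartesian product {true, false}^3, which could equivalently be generated with streams (imports as already present in this file). The patch's test body resumes with the write-config builder below.

import java.util.stream.Stream;
import org.junit.jupiter.params.provider.Arguments;

private static Stream<Arguments> testClustering() {
  // flatMap twice so every (doUpdates, populateMetaFields) pair meets each preserveCommitMetadata value
  return Stream.of(true, false).flatMap(a ->
      Stream.of(true, false).flatMap(b ->
          Stream.of(true, false).map(c -> Arguments.of(a, b, c))));
}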
+ HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder() + .forTable("test-trip-table") + .withPath(basePath()) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withDeleteParallelism(2) + .withAutoCommit(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(10L) + .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .hfileMaxFileSize(1024 * 1024 * 1024) + .parquetMaxFileSize(1024 * 1024 * 1024).build()) + .withEmbeddedTimelineServerEnabled(true) + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0) + .withInlineClusteringNumCommits(1) + .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build()) + .withRollbackUsingMarkers(false); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps()); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + + /* + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 400); + insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + + /* + * Write 2 (more inserts to create new files) + */ + // we already set small file size to small number to force inserts to go into new file. + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + + if (doUpdates) { + /* + * Write 3 (updates) + */ + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); + updateRecords(metaClient, records, client, cfg, newCommitTime); + } + + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + // expect 2 base files for each partition + assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length); + + String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + // verify all files are included in clustering plan. + assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); + + // Do the clustering and validate + client.cluster(clusteringCommitTime, true); + + metaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient); + Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()) + .flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p)); + // verify there should be only one base file per partition after clustering. 
+ assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count()); + + HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); + assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), + "Expecting a single commit."); + assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); + if (cfg.populateMetaFields()) { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")), + "Must contain 400 records"); + } else { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty())); + } + } + } + +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java new file mode 100644 index 000000000..e7e707409 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.HoodieHFileInputFormat; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("functional") +public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientFunctionalTestHarness { + + private JobConf roSnapshotJobConf; + private JobConf roJobConf; + private JobConf rtJobConf; + + @BeforeEach + void setUp() { + roSnapshotJobConf = new JobConf(hadoopConf()); + roJobConf = new JobConf(hadoopConf()); + rtJobConf = new JobConf(hadoopConf()); + } + + // test incremental read does not go past compaction instant for RO views + // For RT views, incremental read can go past compaction + @Test + public void testIncrementalReadsWithCompaction() throws Exception { + final String partitionPath = "2020/02/20"; // use only one partition for this test + final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath }); + Properties props = new Properties(); + props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); + HoodieWriteConfig cfg = getConfig(true); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + + /* + * Write 1 (only inserts) + */ + String commitTime1 = "001"; + client.startCommitWithTime(commitTime1); + + List records001 = dataGen.generateInserts(commitTime1, 200); + insertRecords(metaClient, records001, client, cfg, commitTime1); + + // verify only one base file shows up with commit time 001 + FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath, 1, snapshotROFiles, false, roSnapshotJobConf, 200, commitTime1); + + FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true); + 
validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); + Path firstFilePath = incrementalROFiles[0].getPath(); + + FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath); + validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1); + + assertEquals(firstFilePath, incrementalRTFiles[0].getPath()); + + /* + * Write 2 (updates) + */ + String updateTime = "004"; + client.startCommitWithTime(updateTime); + List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100); + updateRecords(metaClient, records004, client, cfg, updateTime); + + // verify RO incremental reads - only one base file shows up because updates go into log files + incrementalROFiles = getROIncrementalFiles(partitionPath, false); + validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); + assertEquals(firstFilePath, incrementalROFiles[0].getPath()); + + // verify RT incremental reads include updates also + incrementalRTFiles = getRTIncrementalFiles(partitionPath); + validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime); + + // request compaction, but do not perform compaction + String compactionCommitTime = "005"; + client.scheduleCompactionAtInstant("005", Option.empty()); + + // verify RO incremental reads - only one base file shows up because updates go into log files + incrementalROFiles = getROIncrementalFiles(partitionPath, true); + validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); + + // verify RT incremental reads include updates also + incrementalRTFiles = getRTIncrementalFiles(partitionPath); + validateFiles(partitionPath, 1, incrementalRTFiles, true, rtJobConf, 200, commitTime1, updateTime); + + // write 3 - more inserts + String insertsTime = "006"; + List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200); + client.startCommitWithTime(insertsTime); + insertRecords(metaClient, records006, client, cfg, insertsTime); + + // verify new write shows up in snapshot mode even though there is pending compaction + snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf, 400, commitTime1, insertsTime); + + incrementalROFiles = getROIncrementalFiles(partitionPath, true); + assertEquals(firstFilePath, incrementalROFiles[0].getPath()); + // verify 006 does not show up in RO mode because of pending compaction + + validateFiles(partitionPath, 1, incrementalROFiles, false, roJobConf, 200, commitTime1); + + // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up + incrementalROFiles = getROIncrementalFiles(partitionPath, false); + validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, insertsTime); + + // verify 006 shows up in RT views + incrementalRTFiles = getRTIncrementalFiles(partitionPath); + validateFiles(partitionPath, 2, incrementalRTFiles, true, rtJobConf, 400, commitTime1, updateTime, insertsTime); + + // perform the scheduled compaction + client.compact(compactionCommitTime); + + // verify new write shows up in snapshot mode after compaction is complete + snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath, 2, snapshotROFiles, false, roSnapshotJobConf, 400, commitTime1, compactionCommitTime, + insertsTime); + + incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true); + assertEquals(2, incrementalROFiles.length); + // verify 006 shows up now that there is no longer a
pending compaction + validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime, + insertsTime); + } + } + + private FileStatus[] getROSnapshotFiles(String partitionPath) + throws Exception { + FileInputFormat.setInputPaths(roSnapshotJobConf, Paths.get(basePath(), partitionPath).toString()); + return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), roSnapshotJobConf, false); + } + + private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction) + throws Exception { + return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction); + } + + private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction) + throws Exception { + setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction); + FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath(), partitionPath).toString()); + return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), roJobConf, false); + } + + private FileStatus[] getRTIncrementalFiles(String partitionPath) + throws Exception { + return getRTIncrementalFiles(partitionPath, "000", -1); + } + + private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) + throws Exception { + setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false); + FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath(), partitionPath).toString()); + return listStatus(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue(), rtJobConf, true); + } + + private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) { + String modePropertyName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); + + String startCommitTimestampName = + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(startCommitTimestampName, startCommit); + + String maxCommitPulls = + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); + + String stopAtCompactionPropName = + String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction); + } + + private void validateFiles(String partitionPath, int expectedNumFiles, + FileStatus[] files, boolean realtime, JobConf jobConf, + int expectedRecords, String... expectedCommits) { + + assertEquals(expectedNumFiles, files.length); + Set expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet()); + List records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), + Collections.singletonList(Paths.get(basePath(), partitionPath).toString()), basePath(), jobConf, realtime); + assertEquals(expectedRecords, records.size()); + Set actualCommits = records.stream().map(r -> + r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet()); + assertEquals(expectedCommitsSet, actualCommits); + } + + private FileStatus[] listStatus(HoodieFileFormat baseFileFormat, JobConf jobConf, boolean realtime) throws IOException { + // This is required as Hoodie InputFormats do not extend a common base class and FileInputFormat's + // listStatus() is protected. 
+ FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(baseFileFormat, realtime, jobConf); + switch (baseFileFormat) { + case PARQUET: + if (realtime) { + return ((HoodieParquetRealtimeInputFormat)inputFormat).listStatus(jobConf); + } else { + return ((HoodieParquetInputFormat)inputFormat).listStatus(jobConf); + } + case HFILE: + if (realtime) { + return ((HoodieHFileRealtimeInputFormat)inputFormat).listStatus(jobConf); + } else { + return ((HoodieHFileInputFormat)inputFormat).listStatus(jobConf); + } + default: + throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java new file mode 100644 index 000000000..caecbef52 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("functional") +public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClientFunctionalTestHarness { + + private static Stream testSimpleInsertAndUpdate() { + return Stream.of( + Arguments.of(HoodieFileFormat.PARQUET, true), + Arguments.of(HoodieFileFormat.PARQUET, false), + Arguments.of(HoodieFileFormat.HFILE, true) + ); + } + + @ParameterizedTest + @MethodSource + public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean populateMetaFields) throws Exception { + Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); + + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + /* + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 200); + insertRecords(metaClient, records, client, cfg, newCommitTime); + + /* + * Write 2 (updates) + */ + newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); + updateRecords(metaClient, records, client, cfg, newCommitTime); + + String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString(); + client.compact(compactionCommitTime); + + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + Stream dataFilesToRead = tableView.getLatestBaseFiles(); + assertTrue(dataFilesToRead.findAny().isPresent()); + + // verify that there is a commit + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); + assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), + "Expecting a single commit."); + String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); + assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime)); + + if (cfg.populateMetaFields()) { + assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")), + "Must contain 200 records"); + } else { + assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty())); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception { + Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); + + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + /* + * Write 1 (only inserts, written as base file) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20); + JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1); + + List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + + Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); + + Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles(); + assertFalse(dataFilesToRead.findAny().isPresent()); + + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); + assertTrue(dataFilesToRead.findAny().isPresent(), + "should list the base files we wrote in the delta commit"); + + /* + * Write 2 (only updates, written to .log file) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, records); + writeRecords = jsc().parallelize(records, 1); + statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + /* + * Write 3 (only deletes, written to .log file) + */ + newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + + List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); + + statuses = client.upsert(jsc().parallelize(fewRecordsForDelete, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = HoodieTableMetaClient.reload(metaClient); + deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004"); + + commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + allFiles = listAllBaseFilesInPath(hoodieTable); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = tableView.getLatestBaseFiles(); + assertTrue(dataFilesToRead.findAny().isPresent()); + + List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + List<GenericRecord> recordsRead =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath(), new JobConf(hadoopConf()), true, false); + // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 + assertEquals(0, recordsRead.size(), "Must contain 0 records"); + } + } + + @Test + public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { + // insert 100 records + // Setting IndexType to be InMemory to simulate Global Index nature + HoodieWriteConfig config = getConfigBuilder(false, HoodieIndex.IndexType.INMEMORY).build(); + Properties properties = new Properties(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); + + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { + String newCommitTime = "100"; + writeClient.startCommitWithTime(newCommitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc().parallelize(records, 1); + JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); + writeClient.commit(newCommitTime, statuses); + + HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); + TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView(); + + long numLogFiles = 0; + for (String partitionPath : dataGen.getPartitionPaths()) { + List allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); + assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); + assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); + long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + if (logFileCount > 0) { + // check the log versions start from the base version + assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion()) + .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION))); + } + numLogFiles += logFileCount; + } + + assertTrue(numLogFiles > 0); + // Do a compaction + String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); + statuses = (JavaRDD) writeClient.compact(instantTime); + String extension = table.getBaseFileExtension(); + assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); + assertEquals(numLogFiles, statuses.count()); + writeClient.commitCompaction(instantTime, statuses, Option.empty()); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java new file mode 100644 index 000000000..7ab5a80e5 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("functional") +public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exception 
{ + // Set TableType to COW + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); + + HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + /* + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc().parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + // verify there are no errors + assertNoWriteErrors(statuses); + client.commit(newCommitTime, jsc().parallelize(statuses)); + + metaClient = HoodieTableMetaClient.reload(metaClient); + Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertTrue(commit.isPresent()); + assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); + + /* + * Write 2 (updates) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, records); + + statuses = client.upsert(jsc().parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + // Set TableType to MOR + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ); + + // rollback a COW commit when TableType is MOR + client.rollback(newCommitTime); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + + final String absentCommit = newCommitTime; + assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(absentCommit, file.getCommitTime()))); + } + } + + private static Stream testRollbackWithDeltaAndCompactionCommit() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + + @ParameterizedTest + @MethodSource + void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers, boolean populateMetaFields) throws Exception { + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + + Properties properties = populateMetaFields ? 
+    Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+      // Test delta commit rollback
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+
+      JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+      client.commit(newCommitTime, writeStatusJavaRDD);
+      List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+      assertNoWriteErrors(statuses);
+
+      HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+
+      Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+      assertTrue(deltaCommit.isPresent());
+      assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
+
+      Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      assertFalse(commit.isPresent());
+
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+      HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
+      assertFalse(dataFilesToRead.findAny().isPresent());
+
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
+      assertTrue(dataFilesToRead.findAny().isPresent(),
+          "should list the base files we wrote in the delta commit");
+
+      /*
+       * Write 2 (inserts + updates - testing failed delta commit)
+       */
+      final String commitTime1 = "002";
+      // WriteClient with custom config (disable small file handling)
+      try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false))) {
+        secondClient.startCommitWithTime(commitTime1);
+
+        List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
+        copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
+        copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
+
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+            basePath());
+        assertEquals(200, recordsRead.size());
+
+        statuses = secondClient.upsert(jsc().parallelize(copyOfRecords, 1), commitTime1).collect();
+        // Verify there are no errors
+        assertNoWriteErrors(statuses);
+
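+        // The delta commit above was never completed (no commit() call), so rolling it
+        // back must remove every file written under commitTime1 while leaving the data
+        // committed in write 1 readable.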
Files: " + remainingFiles); + dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); + assertEquals(200, recordsRead.size()); + } + + /* + * Write 3 (inserts + updates - testing successful delta commit) + */ + final String commitTime2 = "002"; + try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) { + thirdClient.startCommitWithTime(commitTime2); + + List copyOfRecords = new ArrayList<>(records); + copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); + copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); + + List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + basePath()); + assertEquals(200, recordsRead.size()); + + writeRecords = jsc().parallelize(copyOfRecords, 1); + writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2); + statuses = writeStatusJavaRDD.collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + // Test successful delta commit rollback + thirdClient.rollback(commitTime2); + allFiles = listAllBaseFilesInPath(hoodieTable); + // After rollback, there should be no base file with the failed commit time + assertEquals(0, Arrays.stream(allFiles) + .filter(file -> file.getPath().getName().contains(commitTime2)).count()); + + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); + // check that the number of records read is still correct after rollback operation + assertEquals(200, recordsRead.size()); + + // Test compaction commit rollback + /* + * Write 4 (updates) + */ + newCommitTime = "003"; + thirdClient.startCommitWithTime(newCommitTime); + + writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); + statuses = writeStatusJavaRDD.collect(); + thirdClient.commit(newCommitTime, writeStatusJavaRDD); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString(); + thirdClient.compact(compactionInstantTime); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp(); + assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable)) + .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime()))); + thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime), + hoodieTable); + allFiles = listAllBaseFilesInPath(hoodieTable); + metaClient = HoodieTableMetaClient.reload(metaClient); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + + assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); + 
+        thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
+            hoodieTable);
+        allFiles = listAllBaseFilesInPath(hoodieTable);
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+
+        assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+        assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(compactedCommitTime, file.getCommitTime())));
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMultiRollbackWithDeltaAndCompactionCommit(boolean populateMetaFields) throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false);
+    addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+
+      JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+      client.commit(newCommitTime, writeStatusJavaRDD);
+      List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+      assertNoWriteErrors(statuses);
+      client.close();
+
+      HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+
+      Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+      assertTrue(deltaCommit.isPresent());
+      assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
+
+      Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      assertFalse(commit.isPresent());
+
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+      HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
+      assertFalse(dataFilesToRead.findAny().isPresent());
+
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
+      assertTrue(dataFilesToRead.findAny().isPresent(),
+          "Should list the base files we wrote in the delta commit");
+      /*
+       * Write 2 (inserts + updates)
+       */
+      newCommitTime = "002";
+      // WriteClient with custom config (disable small file handling)
+      try (SparkRDDWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(populateMetaFields))) {
+        nClient.startCommitWithTime(newCommitTime);
+
+        List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
+        copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
+        copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
+
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+            basePath());
+        assertEquals(200, recordsRead.size());
+
+        statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
+        // Verify there are no errors
+        assertNoWriteErrors(statuses);
+        nClient.commit(newCommitTime, writeStatusJavaRDD);
+        copyOfRecords.clear();
+      }
+
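+      // From here the test builds up a timeline of delta commits ("003", "005", "007"),
+      // a scheduled-but-pending compaction ("004"), and a completed compaction ("006"),
+      // then restores the table to instant "000" and verifies nothing survives.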
+      // Schedule a compaction
+      /*
+       * Write 3 (inserts + updates)
+       */
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> newInserts = dataGen.generateInserts(newCommitTime, 100);
+      records = dataGen.generateUpdates(newCommitTime, records);
+      records.addAll(newInserts);
+      writeRecords = jsc().parallelize(records, 1);
+
+      writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+      client.commit(newCommitTime, writeStatusJavaRDD);
+      statuses = writeStatusJavaRDD.collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+
+      String compactionInstantTime = "004";
+      client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+
+      // Compaction commit
+      /*
+       * Write 4 (updates)
+       */
+      newCommitTime = "005";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      writeRecords = jsc().parallelize(records, 1);
+
+      writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+      client.commit(newCommitTime, writeStatusJavaRDD);
+      statuses = writeStatusJavaRDD.collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+
+      compactionInstantTime = "006";
+      client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+      JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
+      client.commitCompaction(compactionInstantTime, ws, Option.empty());
+
+      allFiles = listAllBaseFilesInPath(hoodieTable);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+
+      final String compactedCommitTime =
+          metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
+
+      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+
+      /*
+       * Write 5 (updates)
+       */
+      newCommitTime = "007";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
+      copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
+      copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
+
+      statuses = client.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
+      // Verify there are no errors
+      assertNoWriteErrors(statuses);
+      client.commit(newCommitTime, writeStatusJavaRDD);
+      copyOfRecords.clear();
+
+      // Restore to the very first instant, rolling back every commit and compaction above
+      client.restoreToInstant("000");
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      allFiles = listAllBaseFilesInPath(hoodieTable);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
+      assertFalse(dataFilesToRead.findAny().isPresent());
+      TableFileSystemView.SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      List<HoodieFileGroup> fileGroups =
+          ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
+      assertTrue(fileGroups.isEmpty());
+
+      // make sure there are no log files remaining
+      assertEquals(0L, ((HoodieTableFileSystemView) rtView).getAllFileGroups()
+          .filter(fileGroup -> fileGroup.getAllRawFileSlices().noneMatch(f -> f.getLogFiles().count() == 0))
+          .count());
+
+    }
+  }
+
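+  // Small file handling is effectively disabled by capping base files at 1 KB: with
+  // files that tiny, upserts cannot be routed into existing "small" files and must
+  // create new file groups or log files, which the rollback tests above rely on.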
+  private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
+    HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2)
+        .withAutoCommit(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withEmbeddedTimelineServerEnabled(true)
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table");
+
+    if (!populateMetaFields) {
+      addConfigsForPopulateMetaFields(cfgBuilder, false);
+    }
+    return cfgBuilder.build();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    // insert 100 records
+    // Setting IndexType to be InMemory to simulate Global Index nature
+    HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
+      JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
+      // trigger an action
+      List<WriteStatus> writeStatuses = statuses.collect();
+
+      // Ensure that inserts are written to only log files
+      assertEquals(0,
+          writeStatuses.stream().filter(writeStatus -> !writeStatus.getStat().getPath().contains("log")).count());
+      assertTrue(
+          writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPath().contains("log")));
+
+      // rollback a failed commit
+      boolean rollback = writeClient.rollback(newCommitTime);
+      assertTrue(rollback);
+
+      // insert 100 records
+      newCommitTime = "101";
+      writeClient.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 100);
+      recordsRDD = jsc().parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // Sleep for small interval (at least 1 second) to force a new rollback start time.
+      Thread.sleep(1000);
+
+      // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
+      // and calling rollback twice
+      final String lastCommitTime = newCommitTime;
+
+ // We got the "BaseCommitTime cannot be null" exception before the fix + java.nio.file.Path tempFolder = Files.createTempDirectory(this.getClass().getCanonicalName()); + Map fileNameMap = new HashMap<>(); + for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) { + HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime); + File file = Files.createTempFile(tempFolder, null, null).toFile(); + metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()), + new Path(file.getAbsolutePath())); + fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName()); + } + Path markerDir = new Path(Files.createTempDirectory(tempFolder, null).toAbsolutePath().toString()); + if (rollbackUsingMarkers) { + metaClient.getFs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)), + markerDir); + } + + writeClient.rollback(newCommitTime); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieSparkTable.create(config, context()); + TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView(); + + long numLogFiles = 0; + for (String partitionPath : dataGen.getPartitionPaths()) { + assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent())); + assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); + numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath) + .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + } + assertEquals(0, numLogFiles); + for (Map.Entry entry : fileNameMap.entrySet()) { + try { + metaClient.getFs().copyFromLocalFile(new Path(entry.getKey()), + new Path(metaClient.getMetaPath(), entry.getValue())); + } catch (IOException e) { + throw new HoodieIOException("Error copying state from local disk.", e); + } + } + if (rollbackUsingMarkers) { + metaClient.getFs().copyFromLocalFile(markerDir, + new Path(metaClient.getMarkerFolderPath(lastCommitTime))); + } + Thread.sleep(1000); + // Rollback again to pretend the first rollback failed partially. 
+      // Rollback again to pretend the first rollback failed partially. This should not error out.
+      writeClient.rollback(newCommitTime);
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(boolean rollbackUsingMarkers) throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    // insert 100 records
+    // Setting IndexType to be InMemory to simulate Global Index nature
+    HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.INMEMORY).build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
+      JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
+      writeClient.commit(newCommitTime, statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
+      TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
+
+      long numLogFiles = 0;
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
+        assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
+        numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
+            .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+      }
+
+      assertTrue(numLogFiles > 0);
+      // Do a compaction
+      newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
+      statuses = (JavaRDD<WriteStatus>) writeClient.compact(newCommitTime);
+      // Ensure all log files have been compacted into base files
+      String extension = table.getBaseFileExtension();
+      assertEquals(numLogFiles, statuses.filter(status -> status.getStat().getPath().contains(extension)).count());
+      assertEquals(numLogFiles, statuses.count());
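+      // commitCompaction is deliberately skipped here so the compaction stays inflight
+      // and can be rolled back below.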
+      //writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
+      // Trigger a rollback of compaction
+      table.getActiveTimeline().reload();
+      writeClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(config, context(), metaClient);
+      tableRTFileSystemView = table.getSliceView();
+      ((SyncableFileSystemView) tableRTFileSystemView).reset();
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(metaClient)
+            .getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
+        assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
+        assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
+      }
+    }
+  }
+
+  private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
+    try {
+      return new HoodieTableFileSystemView(metaClient,
+          metaClient.getActiveTimeline(),
+          HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
+      );
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error getting file system view", ioe);
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index a6ac67b85..1e52e4494 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -385,7 +385,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     return tableView;
   }
 
-  protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
+  public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
     HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
     WorkloadStat globalStat = new WorkloadStat();
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
new file mode 100644
index 000000000..b8b470434
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
+import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
+import org.apache.hudi.testutils.providers.SparkProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
+
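+  // The Spark session and contexts below are static and lazily created in runBeforeEach,
+  // so a single SparkSession is shared by every test in a class and only torn down in
+  // the @AfterAll hook.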
+  private static transient SparkSession spark;
+  private static transient SQLContext sqlContext;
+  private static transient JavaSparkContext jsc;
+  private static transient HoodieSparkEngineContext context;
+
+  /**
+   * An indicator of the initialization status.
+   */
+  protected boolean initialized = false;
+  @TempDir
+  protected java.nio.file.Path tempDir;
+
+  public String basePath() {
+    return tempDir.toAbsolutePath().toUri().toString();
+  }
+
+  @Override
+  public SparkSession spark() {
+    return spark;
+  }
+
+  @Override
+  public SQLContext sqlContext() {
+    return sqlContext;
+  }
+
+  @Override
+  public JavaSparkContext jsc() {
+    return jsc;
+  }
+
+  public Configuration hadoopConf() {
+    return jsc.hadoopConfiguration();
+  }
+
+  @Override
+  public HoodieSparkEngineContext context() {
+    return context;
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType) throws IOException {
+    return getHoodieMetaClient(tableType, new Properties());
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType tableType, Properties props) throws IOException {
+    return getHoodieMetaClient(hadoopConf(), basePath(), tableType, props);
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, HoodieTableType tableType, Properties props) throws IOException {
+    props = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableName(RAW_TRIPS_TEST_NAME)
+        .setTableType(tableType)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
+    return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+  }
+
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+    props = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableName(RAW_TRIPS_TEST_NAME)
+        .setTableType(COPY_ON_WRITE)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  @Override
+  public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
+    return new SparkRDDWriteClient(context(), cfg);
+  }
+
+  @BeforeEach
+  public synchronized void runBeforeEach() {
+    initialized = spark != null;
+    if (!initialized) {
+      SparkConf sparkConf = conf();
+      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieReadClient.addHoodieSupport(sparkConf);
+      spark = SparkSession.builder().config(sparkConf).getOrCreate();
+      sqlContext = spark.sqlContext();
+      jsc = new JavaSparkContext(spark.sparkContext());
+      context = new HoodieSparkEngineContext(jsc);
+    }
+  }
+
+  @AfterAll
+  public static synchronized void cleanUpAfterAll() {
+    if (spark != null) {
+      spark.close();
+      spark = null;
+    }
+  }
+
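+  // insertRecords/updateRecords are shared fixtures: each writes a batch, asserts the
+  // write succeeded, and cross-checks every reported file size against the file system
+  // via Assertions.assertFileSizesEqual.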
+  protected void insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
+
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+    assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), reloadedMetaClient);
+
+    Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+    assertTrue(deltaCommit.isPresent());
+    assertEquals(commitTime, deltaCommit.get().getTimestamp(), "Delta commit should be specified value");
+
+    Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
+    assertFalse(commit.isPresent());
+
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+    TableFileSystemView.BaseFileOnlyView roView =
+        getHoodieTableFileSystemView(reloadedMetaClient, reloadedMetaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+    Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+    assertFalse(dataFilesToRead.findAny().isPresent());
+
+    roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    dataFilesToRead = roView.getLatestBaseFiles();
+    assertTrue(dataFilesToRead.findAny().isPresent(),
+        "should list the base files we wrote in the delta commit");
+  }
+
+  protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
+
+    Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
+    for (HoodieRecord rec : records) {
+      if (!recordsMap.containsKey(rec.getKey())) {
+        recordsMap.put(rec.getKey(), rec);
+      }
+    }
+
+    List<WriteStatus> statuses = client.upsert(jsc().parallelize(records, 1), commitTime).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+    assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
+
+    Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+    assertTrue(deltaCommit.isPresent());
+    assertEquals(commitTime, deltaCommit.get().getTimestamp(),
+        "Latest Delta commit should match specified time");
+
+    Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+    assertFalse(commit.isPresent());
+  }
+
+  protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
+    return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
+  }
+
+  protected Properties getPropertiesForKeyGen() {
+    Properties properties = new Properties();
+    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
+    properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
+    properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
+    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
+    properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
+    properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
+    return properties;
+  }
+
+  protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
+    if (!populateMetaFields) {
+      configBuilder.withProperties(getPropertiesForKeyGen())
+          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
+    }
+  }
+
+  protected HoodieWriteConfig getConfig(Boolean autoCommit) {
+    return getConfigBuilder(autoCommit).build();
+  }
+
+  protected HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) {
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, HoodieIndex.IndexType.BLOOM).build();
+  }
+
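+  // The getConfigBuilder overloads below all funnel into the last one: BLOOM index and a
+  // 1 GB small-file size are the defaults unless a caller overrides them.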
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+    return getConfigBuilder(autoCommit, HoodieIndex.IndexType.BLOOM);
+  }
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
+    return getConfigBuilder(autoCommit, false, indexType);
+  }
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
+    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
+  }
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
+  }
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
+      long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2)
+        .withAutoCommit(autoCommit)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
+        .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withClusteringConfig(clusteringConfig)
+        .withRollbackUsingMarkers(rollbackUsingMarkers);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CheckedFunction.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CheckedFunction.java
new file mode 100644
index 000000000..b0b3588c6
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CheckedFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+@FunctionalInterface
+public interface CheckedFunction<T, R> {
+  R apply(T t) throws Exception;
+}