1
0

[HUDI-781] Add HoodieWriteableTestTable (#2040)

- Introduce HoodieWriteableTestTable for writing records into files
- Migrate writeParquetFiles() in HoodieClientTestUtils to HoodieWriteableTestTable
- Adopt HoodieWriteableTestTable for test cases in
  - ITTestRepairsCommand.java
  - TestHoodieIndex.java
  - TestHoodieKeyLocationFetchHandle.java
  - TestHoodieGlobalBloomIndex.java
  - TestHoodieBloomIndex.java
- Renamed HoodieTestTable and FileCreateUtils APIs
  - dataFile changed to baseFile
This commit is contained in:
Raymond Xu
2020-09-07 02:54:36 -07:00
committed by GitHub
parent 6537af2676
commit 83e39e2b17
13 changed files with 426 additions and 467 deletions

View File

@@ -23,9 +23,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.RepairsCommand;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +31,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
@@ -43,17 +41,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.spark.sql.functions.lit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -69,10 +62,10 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
private String repairedOutputPath;
@BeforeEach
public void init() throws IOException, URISyntaxException {
String tablePath = basePath + File.separator + "test_table";
duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
repairedOutputPath = basePath + File.separator + "tmp";
public void init() throws Exception {
final String tablePath = Paths.get(basePath, "test_table").toString();
duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
repairedOutputPath = Paths.get(basePath, "tmp").toString();
HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -83,33 +76,19 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
// generate 200 records
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
String fileName1 = "1_0_20160401010101.parquet";
String fileName2 = "2_0_20160401010101.parquet";
List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileName1, hoodieRecords1, schema, null, false);
List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileName2, hoodieRecords2, schema, null, false);
// generate commit file
String fileId1 = UUID.randomUUID().toString();
String testWriteToken = "1-0-1";
String commitTime = FSUtils.getCommitTime(fileName1);
Files.createFile(Paths.get(duplicatedPartitionPath + "/"
+ FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
testTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
.withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
// read records and get 10 to generate duplicates
Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
String fileName3 = "3_0_20160401010202.parquet";
commitTime = FSUtils.getCommitTime(fileName3);
df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
.write().parquet(duplicatedPartitionPath + File.separator + fileName3);
Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
testTable.addCommit("20160401010202")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords);
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
}

View File

@@ -20,10 +20,8 @@ package org.apache.hudi.index;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -39,7 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.Schema;
@@ -50,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -67,6 +64,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class TestHoodieIndex extends HoodieClientTestHarness {
@@ -247,6 +245,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",})
public void testTagLocationAndFetchRecordLocations(IndexType indexType) throws Exception {
setUp(indexType);
String p1 = "2016/01/31";
String p2 = "2015/01/31";
String rowKey1 = UUID.randomUUID().toString();
String rowKey2 = UUID.randomUUID().toString();
String rowKey3 = UUID.randomUUID().toString();
@@ -279,12 +279,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
}
// We create three parquet file, each having one record. (two different partitions)
String filename1 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true);
String filename2 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true);
String filename3 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
String fileId1 = testTable.addCommit("001").withInserts(p1, record1);
String fileId2 = testTable.addCommit("002").withInserts(p1, record2);
String fileId3 = testTable.addCommit("003").withInserts(p2, record4);
// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -295,13 +293,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals(rowKey1)) {
if (record.getPartitionPath().equals("2015/01/31")) {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
if (record.getPartitionPath().equals(p2)) {
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
} else {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1));
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
}
} else if (record.getRecordKey().equals(rowKey2)) {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
assertEquals(record.getCurrentLocation().getFileId(), fileId2);
} else if (record.getRecordKey().equals(rowKey3)) {
assertFalse(record.isCurrentLocationKnown());
}
@@ -312,15 +310,15 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
for (Tuple2<HoodieKey, Option<Pair<String, String>>> entry : recordLocations.collect()) {
if (entry._1.getRecordKey().equals(rowKey1)) {
assertTrue(entry._2.isPresent(), "Row1 should have been present ");
if (entry._1.getPartitionPath().equals("2015/01/31")) {
if (entry._1.getPartitionPath().equals(p2)) {
assertTrue(entry._2.isPresent(), "Row1 should have been present ");
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename3));
assertEquals(entry._2.get().getRight(), fileId3);
} else {
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename1));
assertEquals(entry._2.get().getRight(), fileId1);
}
} else if (entry._1.getRecordKey().equals(rowKey2)) {
assertTrue(entry._2.isPresent(), "Row2 should have been present ");
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2));
assertEquals(entry._2.get().getRight(), fileId2);
} else if (entry._1.getRecordKey().equals(rowKey3)) {
assertFalse(entry._2.isPresent(), "Row3 should have been absent ");
}
@@ -338,12 +336,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
.build()).build();
writeClient = getHoodieWriteClient(config);
index = writeClient.getIndex();
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
final String p1 = "2016/01/31";
final String p2 = "2016/02/28";
// Create the original partition, and put a record, along with the meta file
// "2016/01/31": 1 file (1_0_20160131101010.parquet)
new File(basePath + "/2016/01/31").mkdirs();
new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
// this record will be saved in table and will be tagged to an empty record
RawTripTestPayload originalPayload =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -359,7 +358,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
- tag the new partition of the incomingRecord
*/
RawTripTestPayload incomingPayload =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
HoodieRecord incomingRecord =
new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
incomingPayload);
@@ -376,67 +375,42 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
incomingPayloadSamePartition);
// We have some records to be tagged (two different partitions)
HoodieClientTestUtils
.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
// Add some commits
new File(basePath + "/.hoodie").mkdirs();
testTable.addCommit("1000").withInserts(p1, originalRecord);
// test against incoming record with a different partition
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
assertEquals(2, taggedRecordRDD.count());
for (HoodieRecord record : taggedRecordRDD.collect()) {
switch (record.getPartitionPath()) {
case "2016/01/31":
case p1:
assertEquals("000", record.getRecordKey());
assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
break;
case "2016/02/31":
case p2:
assertEquals("000", record.getRecordKey());
assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData());
break;
default:
assertFalse(true, String.format("Should not get partition path: %s", record.getPartitionPath()));
fail(String.format("Should not get partition path: %s", record.getPartitionPath()));
}
}
// test against incoming record with the same partition
JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
.parallelize(Collections.singletonList(incomingRecordSamePartition));
JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, hoodieTable);
assertEquals(1, taggedRecordRDDSamePartition.count());
HoodieRecord record = taggedRecordRDDSamePartition.first();
assertEquals("000", record.getRecordKey());
assertEquals("2016/01/31", record.getPartitionPath());
assertEquals(p1, record.getPartitionPath());
assertEquals(incomingPayloadSamePartition.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData());
}
/**
* Get Config builder with default configs set.
*
* @return Config Builder
*/
public HoodieWriteConfig.Builder getConfigBuilder() {
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
}
HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
return getConfigBuilder(schemaStr, indexType);
}
/**
* Get Config builder with default configs set.
*
* @return Config Builder
*/
private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.index.bloom;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieKeyLookupHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
@@ -46,12 +45,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -105,18 +100,17 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws IOException {
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieBloomIndex index = new HoodieBloomIndex(config);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
// Create some partitions, and put some files
// "2016/01/21": 0 file
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet,
// 4_0_20150312101010.parquet)
Files.createDirectories(Paths.get(basePath, "2016", "01", "21"));
Files.createDirectories(Paths.get(basePath, "2016", "04", "01"));
Files.createDirectories(Paths.get(basePath, "2015", "03", "12"));
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
RawTripTestPayload rowChange1 =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -135,29 +129,17 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieRecord record4 =
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet",
Arrays.asList(record2, record3, record4), SCHEMA, null, false);
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
// Still 0, as no valid commit
assertEquals(0, filesList.size());
// Add some commits
java.nio.file.Path hoodieDir = Files.createDirectories(Paths.get(basePath, ".hoodie"));
Files.createFile(hoodieDir.resolve("20160401010101.commit"));
Files.createFile(hoodieDir.resolve("20150312101010.commit"));
testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
.withInserts("2015/03/12", "3", record1)
.withInserts("2015/03/12", "4", record2, record3, record4);
table = HoodieTable.create(metaClient, config, hadoopConf);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
assertEquals(4, filesList.size());
if (rangePruning) {
@@ -211,8 +193,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
}
@Test
public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException {
public void testCheckUUIDsAgainstOneFile() throws Exception {
final String partition = "2016/01/31";
// Create some records to use
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -239,8 +221,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// record2, record3).
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record3.getRecordKey());
String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2),
SCHEMA, filter, true);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
String fileId = testTable.addCommit("000").withInserts(partition, record1, record2);
String filename = testTable.getBaseFileNameById(fileId);
// The bloom filter contains 3 records
assertTrue(filter.mightContain(record1.getRecordKey()));
@@ -254,10 +237,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table,
Pair.of("2016/01/31/", FSUtils.getFileId(filename)));
HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table, Pair.of(partition, fileId));
List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
new Path(basePath + "/2016/01/31/" + filename));
new Path(Paths.get(basePath, partition, filename).toString()));
assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
|| results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
@@ -314,12 +296,12 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, hoodieTable);
// Should not find any files
for (HoodieRecord record : taggedRecordRDD.collect()) {
@@ -327,29 +309,23 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
}
// We create three parquet file, each having one record. (two different partitions)
String filename1 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true);
String filename2 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true);
String filename3 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true);
String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.create(metaClient, config, hadoopConf);
taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, HoodieTable.create(metaClient, config, hadoopConf));
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals(rowKey1)) {
if (record.getPartitionPath().equals("2015/01/31")) {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
} else {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1));
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
}
} else if (record.getRecordKey().equals(rowKey2)) {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
assertEquals(record.getCurrentLocation().getFileId(), fileId2);
} else if (record.getRecordKey().equals(rowKey3)) {
assertFalse(record.isCurrentLocationKnown());
}
@@ -385,13 +361,13 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> taggedRecordRDD =
bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
bloomIndex.fetchRecordLocation(keysRDD, jsc, hoodieTable);
// Should not find any files
for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : taggedRecordRDD.collect()) {
@@ -399,29 +375,26 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
}
// We create three parquet file, each having one record. (two different partitions)
String filename1 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true);
String filename2 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true);
String filename3 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true);
String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.create(metaClient, config, hadoopConf);
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, hoodieTable);
// Check results
for (Tuple2<HoodieKey, Option<Pair<String, String>>> record : taggedRecordRDD.collect()) {
if (record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight());
assertEquals(fileId1, record._2.get().getRight());
} else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
if (record._1.getPartitionPath().equals("2015/01/31")) {
assertEquals(FSUtils.getFileId(filename3), record._2.get().getRight());
assertEquals(fileId3, record._2.get().getRight());
} else {
assertEquals(FSUtils.getFileId(filename2), record._2.get().getRight());
assertEquals(fileId2, record._2.get().getRight());
}
} else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
assertFalse(record._2.isPresent());
@@ -431,7 +404,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws IOException, InterruptedException {
public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
// We have two hoodie records
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -449,8 +422,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1,
BloomFilterTypeCode.SIMPLE.name());
filter.add(record2.getRecordKey());
String filename =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, filter, true);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
String fileId = testTable.addCommit("000").withInserts("2016/01/31", record1);
assertTrue(filter.mightContain(record1.getRecordKey()));
assertTrue(filter.mightContain(record2.getRecordKey()));
@@ -466,7 +439,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename));
assertEquals(record.getCurrentLocation().getFileId(), fileId);
} else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertFalse(record.isCurrentLocationKnown());
}

View File

@@ -18,18 +18,15 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaPairRDD;
@@ -39,10 +36,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -78,21 +71,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
}
@Test
public void testLoadInvolvedFiles() throws IOException {
public void testLoadInvolvedFiles() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
// Create some partitions, and put some files, along with the meta file
// "2016/01/21": 0 file
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet,
// 4_0_20150312101010.parquet)
Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", "21"));
Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", "01"));
Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", "12"));
Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
RawTripTestPayload rowChange1 =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -111,31 +100,19 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
HoodieRecord record4 =
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1),
SCHEMA, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet",
Arrays.asList(record2, record3, record4), SCHEMA, null, false);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
// Still 0, as no valid commit
assertEquals(0, filesList.size());
// Add some commits
Path hoodieDir = Files.createDirectories(Paths.get(basePath, ".hoodie"));
Files.createFile(hoodieDir.resolve("20160401010101.commit"));
Files.createFile(hoodieDir.resolve("20150312101010.commit"));
testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
.withInserts("2015/03/12", "3", record1)
.withInserts("2015/03/12", "4", record2, record3, record4);
table = HoodieTable.create(metaClient, config, hadoopConf);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable);
assertEquals(4, filesList.size());
Map<String, BloomIndexFileInfo> filesMap = toFileMap(filesList);
@@ -201,18 +178,14 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public void testTagLocation() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
// Create some partitions, and put some files, along with the meta file
// "2016/01/21": 0 file
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet,
// 4_0_20150312101010.parquet)
Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", "21"));
Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", "01"));
Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", "12"));
Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
RawTripTestPayload rowChange1 =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -242,33 +215,23 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
String filename0 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), SCHEMA, null, false);
String filename1 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(), SCHEMA, null, false);
String filename2 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), SCHEMA, null, false);
String filename3 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), SCHEMA, null, false);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
// Add some commits
Files.createDirectories(Paths.get(basePath, ".hoodie"));
String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", record1);
String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12");
String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", record2);
String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", record4);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
for (HoodieRecord record : taggedRecordRDD.collect()) {
switch (record.getRecordKey()) {
case "000":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0));
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
break;
case "001":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
break;
case "002":
@@ -276,11 +239,11 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
break;
case "003":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
assertEquals(record.getCurrentLocation().getFileId(), fileId4);
assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
break;
case "004":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
assertEquals(record.getCurrentLocation().getFileId(), fileId4);
assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
break;
default:
@@ -296,12 +259,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
.build();
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
final String p1 = "2016/01/31";
final String p2 = "2016/02/28";
// Create the original partition, and put a record, along with the meta file
// "2016/01/31": 1 file (1_0_20160131101010.parquet)
Path dir = Files.createDirectories(Paths.get(basePath, "2016", "01", "31"));
Files.createFile(dir.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
// this record will be saved in table and will be tagged to an empty record
RawTripTestPayload originalPayload =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -317,7 +281,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
- tag the new partition of the incomingRecord
*/
RawTripTestPayload incomingPayload =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
HoodieRecord incomingRecord =
new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
incomingPayload);
@@ -334,27 +298,20 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
incomingPayloadSamePartition);
HoodieClientTestUtils
.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
// Add some commits
Files.createDirectories(Paths.get(basePath, ".hoodie"));
testTable.addCommit("1000").withInserts(p1, originalRecord);
// test against incoming record with a different partition
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
assertEquals(2, taggedRecordRDD.count());
for (HoodieRecord record : taggedRecordRDD.collect()) {
switch (record.getPartitionPath()) {
case "2016/01/31":
case p1:
assertEquals("000", record.getRecordKey());
assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
break;
case "2016/02/31":
case p2:
assertEquals("000", record.getRecordKey());
assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData());
break;
@@ -366,17 +323,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
// test against incoming record with the same partition
JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
.parallelize(Collections.singletonList(incomingRecordSamePartition));
JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table);
JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, hoodieTable);
assertEquals(1, taggedRecordRDDSamePartition.count());
HoodieRecord record = taggedRecordRDDSamePartition.first();
assertEquals("000", record.getRecordKey());
assertEquals("2016/01/31", record.getPartitionPath());
assertEquals(p1, record.getPartitionPath());
assertEquals(incomingPayloadSamePartition.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData());
}
// convert list to map to avoid sorting order dependencies
private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, BloomIndexFileInfo>> filesList) {
private static Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, BloomIndexFileInfo>> filesList) {
Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
for (Tuple2<String, BloomIndexFileInfo> t : filesList) {
filesMap.put(t._1() + "/" + t._2().getFileId(), t._2());

View File

@@ -19,16 +19,12 @@
package org.apache.hudi.io;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -37,7 +33,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaSparkContext;
@@ -46,19 +42,18 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import scala.Tuple2;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestUtils.makeNewCommitTime;
import static org.apache.hudi.common.testutils.Transformations.recordsToPartitionRecordsMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -88,17 +83,12 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
@Test
public void testFetchHandle() throws Exception {
String commitTime = "000";
List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100);
List<HoodieRecord> records = dataGen.generateInserts(makeNewCommitTime(), 100);
Map<String, List<HoodieRecord>> partitionRecordsMap = recordsToPartitionRecordsMap(records);
Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
Files.createDirectories(Paths.get(basePath, ".hoodie"));
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, AVRO_SCHEMA_WITH_METADATA_FIELDS);
Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList =
writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap, testTable);
List<Tuple2<String, HoodieBaseFile>> partitionPathFileIdPairs = loadAllFilesForPartitions(new ArrayList<>(partitionRecordsMap.keySet()), jsc, hoodieTable);
@@ -112,7 +102,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
}
private Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> writeToParquetAndGetExpectedRecordLocations(
Map<String, List<HoodieRecord>> partitionRecordsMap) throws Exception {
Map<String, List<HoodieRecord>> partitionRecordsMap, HoodieWriteableTestTable testTable) throws Exception {
Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = new HashMap<>();
for (Map.Entry<String, List<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
int totalRecordsPerPartition = entry.getValue().size();
@@ -138,7 +128,9 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
}
for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
Tuple2<String, String> fileIdInstantTimePair = writeToParquet(entry.getKey(), recordsPerSlice);
String instantTime = makeNewCommitTime();
String fileId = testTable.addCommit(instantTime).withInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, instantTime);
List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>();
for (HoodieRecord record : recordsPerSlice) {
expectedEntries.add(new Tuple2<>(record.getKey(), new HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
@@ -149,31 +141,16 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
return expectedList;
}
protected List<Tuple2<String, HoodieBaseFile>> loadAllFilesForPartitions(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
private static List<Tuple2<String, HoodieBaseFile>> loadAllFilesForPartitions(List<String> partitions, JavaSparkContext jsc,
HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, HoodieBaseFile>> partitionPathFileIDList = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable);
return partitionPathFileIDList.stream()
.map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList());
}
/**
* Get Config builder with default configs set.
*
* @return Config Builder
*/
public HoodieWriteConfig.Builder getConfigBuilder() {
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
}
/**
* Get Config builder with default configs set.
*
* @return Config Builder
*/
private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -184,15 +161,4 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
}
private Tuple2<String, String> writeToParquet(String partitionPath, List<HoodieRecord> records) throws Exception {
Thread.sleep(100);
String instantTime = HoodieTestUtils.makeNewCommitTime();
String fileId = UUID.randomUUID().toString();
String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
HoodieTestUtils.createCommitFiles(basePath, instantTime);
HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null,
true);
return new Tuple2<>(fileId, instantTime);
}
}

View File

@@ -527,18 +527,18 @@ public class TestCleaner extends HoodieClientTestBase {
: UUID.randomUUID().toString();
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
: UUID.randomUUID().toString();
testTable.addCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
testTable.addCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 1 insert & 1 update per partition
Map<String, String> partitionAndFileId002 = testTable.addCommit("00000000000002")
.withUpdates(p0, file1P0C0)
.withUpdates(p1, file1P1C0)
.withInserts(p0, p1);
.withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p1, file1P1C0)
.withBaseFilesInPartitions(p0, p1);
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
// enableBootstrapSourceClean would delete the bootstrap base file as the same time
@@ -559,10 +559,10 @@ public class TestCleaner extends HoodieClientTestBase {
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
String file2P0C1 = partitionAndFileId002.get(p0);
String file2P1C1 = partitionAndFileId002.get(p1);
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
@@ -579,21 +579,21 @@ public class TestCleaner extends HoodieClientTestBase {
// make next commit, with 2 updates to existing files, and 1 insert
String file3P0C2 = testTable.addCommit("00000000000003")
.withUpdates(p0, file1P0C0, file2P0C1)
.withInserts(p0).get(p0);
.withBaseFilesInPartition(p0, file1P0C0, file2P0C1)
.withBaseFilesInPartitions(p0).get(p0);
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
assertEquals(2,
getCleanStat(hoodieCleanStatsThree, p0)
.getSuccessDeleteFiles().size(), "Must clean two files");
assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0));
assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
// No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2);
testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2);
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
}
/**
@@ -613,23 +613,23 @@ public class TestCleaner extends HoodieClientTestBase {
String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0);
String file1P0 = testTable.addDeltaCommit("000").withBaseFilesInPartitions(p0).get(p0);
testTable.forDeltaCommit("000")
.withLogFile(p0, file1P0, 1)
.withLogFile(p0, file1P0, 2);
// Make 2 files, one base file and 1 log files associated with base file
testTable.addDeltaCommit("001")
.withUpdates(p0, file1P0)
.withBaseFilesInPartition(p0, file1P0)
.withLogFile(p0, file1P0, 3);
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
assertEquals(3,
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
.size(), "Must clean three files, one parquet and 2 log files");
assertFalse(testTable.fileExists(p0, "000", file1P0));
assertFalse(testTable.baseFileExists(p0, "000", file1P0));
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
assertTrue(testTable.fileExists(p0, "001", file1P0));
assertTrue(testTable.baseFileExists(p0, "001", file1P0));
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
}
@@ -831,7 +831,7 @@ public class TestCleaner extends HoodieClientTestBase {
: UUID.randomUUID().toString();
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
: UUID.randomUUID().toString();
testTable.addInflightCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
@@ -849,14 +849,14 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 1 insert & 1 update per partition
Map<String, String> partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withInserts(p0, p1);
Map<String, String> partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withBaseFilesInPartitions(p0, p1);
String file2P0C1 = partitionAndFileId002.get(p0);
String file2P1C1 = partitionAndFileId002.get(p1);
testTable.forCommit("00000000000002").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
@@ -868,16 +868,16 @@ public class TestCleaner extends HoodieClientTestBase {
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1));
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert
String file3P0C2 = testTable.addInflightCommit("00000000000003")
.withUpdates(p0, file1P0C0)
.withUpdates(p0, file2P0C1)
.withInserts(p0).get(p0);
.withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1)
.withBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils
.createImmutableMap(p0,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
@@ -888,13 +888,13 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time to keep");
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert
String file4P0C3 = testTable.addInflightCommit("00000000000004")
.withUpdates(p0, file1P0C0)
.withUpdates(p0, file2P0C1)
.withInserts(p0).get(p0);
.withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1)
.withBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete(
@@ -908,20 +908,20 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
+ (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0));
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1));
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3));
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
if (enableBootstrapSourceClean) {
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p0).get(0).getBoostrapFileStatus().getPath().getUri())));
}
// No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2);
testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, file3P0C2);
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant(
@@ -932,9 +932,9 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0);
assertNull(cleanStat, "Must not clean any files");
assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
}
/**

View File

@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("consistencyGuardType")
public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f2");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f3");
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingAppearFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingAppearTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingAppearsFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingAppearsTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
}
@Test
public void testCheckFailingDisappearFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearsFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearsTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
}

View File

@@ -75,7 +75,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
.build();
FileCreateUtils.createCommit(basePath, "001");
FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", "file1", fileSize);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

View File

@@ -58,10 +58,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
// given: wrote some base files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.withInserts("partA").get("partA");
.withBaseFilesInPartitions("partA").get("partA");
String f1 = testTable.addCommit("001")
.withUpdates("partA", f0)
.withInserts("partB").get("partB");
.withBaseFilesInPartition("partA", f0)
.withBaseFilesInPartitions("partB").get("partB");
String f2 = "f2";
testTable.forCommit("001")
.withMarkerFile("partA", f0, IOType.MERGE)
@@ -89,10 +89,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
// given: wrote some base + log files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f2 = testTable.addRequestedDeltaCommit("000")
.withInserts("partA").get("partA");
.withBaseFilesInPartitions("partA").get("partA");
String f1 = testTable.addDeltaCommit("001")
.withLogFile("partA", f2)
.withInserts("partB").get("partB");
.withBaseFilesInPartitions("partB").get("partB");
String f3 = "f3";
String f4 = "f4";
testTable.forDeltaCommit("001")

View File

@@ -19,12 +19,7 @@
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -36,11 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -52,9 +43,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -62,13 +50,11 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -214,54 +200,4 @@ public class HoodieClientTestUtils {
return valuesAsList.stream();
}
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/
public static String writeParquetFile(String basePath, String partitionPath, String filename,
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
if (filter == null) {
filter = BloomFilterFactory
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
}
HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
String instantTime = FSUtils.getCommitTime(filename);
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
HoodieParquetWriter writer =
new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config,
schema, new SparkTaskContextSupplier());
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, instantTime, "" + seqId++);
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename);
writer.writeAvro(record.getRecordKey(), avroRecord);
filter.add(record.getRecordKey());
}
writer.close();
if (createCommitTime) {
HoodieTestUtils.createMetadataFolder(basePath);
HoodieTestUtils.createCommitFiles(basePath, instantTime);
}
return filename;
}
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/
public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records,
Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
Thread.sleep(1000);
String instantTime = HoodieTestUtils.makeNewCommitTime();
String fileId = UUID.randomUUID().toString();
String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
HoodieTestUtils.createCommitFiles(basePath, instantTime);
return HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, schema, filter,
createCommitTime);
}
}

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.nio.file.Paths;
import java.util.UUID;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
/**
 * A {@link HoodieTestTable} that writes real Avro records into parquet base files,
 * populating the given {@link BloomFilter} with each record key as it goes.
 * Intended for index/repair tests that need readable data files rather than
 * the zero-length placeholders created by {@code FileCreateUtils}.
 */
public class HoodieWriteableTestTable extends HoodieTestTable {

  // Schema used to serialize records into parquet.
  private final Schema schema;
  // Bloom filter embedded into each written base file's footer.
  private final BloomFilter filter;

  private HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
    super(basePath, fs, metaClient);
    this.schema = schema;
    this.filter = filter;
  }

  /** Builds a writeable test table for the given meta client, schema and bloom filter. */
  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
    return new HoodieWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
  }

  /** Same as the 3-arg factory but with a default SIMPLE bloom filter (10000 entries, 1e-7 FPP). */
  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
    BloomFilter defaultFilter = BloomFilterFactory
        .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
    return of(metaClient, schema, defaultFilter);
  }

  /** Convenience factory taking a {@link HoodieTable}; uses a default bloom filter. */
  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
    return of(hoodieTable.getMetaClient(), schema);
  }

  /** Convenience factory taking a {@link HoodieTable} and an explicit bloom filter. */
  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) {
    return of(hoodieTable.getMetaClient(), schema, filter);
  }

  @Override
  public HoodieWriteableTestTable addCommit(String instantTime) throws Exception {
    // Narrow the return type so fluent chains keep the writeable API.
    return (HoodieWriteableTestTable) super.addCommit(instantTime);
  }

  @Override
  public HoodieWriteableTestTable forCommit(String instantTime) {
    // Narrow the return type so fluent chains keep the writeable API.
    return (HoodieWriteableTestTable) super.forCommit(instantTime);
  }

  /** Writes an empty base file into {@code partition} and returns its generated file id. */
  public String withInserts(String partition) throws Exception {
    return withInserts(partition, new HoodieRecord[0]);
  }

  /** Writes {@code records} into a freshly generated file id under {@code partition}; returns that file id. */
  public String withInserts(String partition, HoodieRecord... records) throws Exception {
    String newFileId = UUID.randomUUID().toString();
    withInserts(partition, newFileId, records);
    return newFileId;
  }

  /** Writes an empty base file for the given file id under {@code partition}. */
  public HoodieWriteableTestTable withInserts(String partition, String fileId) throws Exception {
    return withInserts(partition, fileId, new HoodieRecord[0]);
  }

  /**
   * Writes {@code records} as a parquet base file named after the current instant and
   * {@code fileId} under {@code partition}, creating the partition metafile first.
   * Each record gets commit metadata and hoodie key fields injected, and its key is
   * added to this table's bloom filter.
   */
  public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
    FileCreateUtils.createPartitionMetaFile(basePath, partition);
    String baseFile = baseFileName(currentInstantTime, fileId);
    Path targetPath = new Path(Paths.get(basePath, partition, baseFile).toString());

    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
        new AvroSchemaConverter().convert(schema), schema, filter);
    // Mirror production writer settings: GZIP, default parquet block/page sizes, 120MB max file size.
    HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
        new Configuration(), Double.parseDouble(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));

    try (HoodieParquetWriter writer = new HoodieParquetWriter(
        currentInstantTime, targetPath, parquetConfig, schema, new SparkTaskContextSupplier())) {
      for (int i = 0; i < records.length; i++) {
        HoodieRecord record = records[i];
        GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
        // Sequence ids are 1-based, matching the legacy HoodieClientTestUtils.writeParquetFile behavior.
        HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(i + 1));
        HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), baseFile);
        writer.writeAvro(record.getRecordKey(), avroRecord);
        filter.add(record.getRecordKey());
      }
    }
    return this;
  }
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -40,6 +41,32 @@ import java.util.Map;
public class FileCreateUtils {
private static final String WRITE_TOKEN = "1-0-1";
public static String baseFileName(String instantTime, String fileId) {
return baseFileName(instantTime, fileId, HoodieFileFormat.PARQUET.getFileExtension());
}
public static String baseFileName(String instantTime, String fileId, String fileExtension) {
return FSUtils.makeDataFileName(instantTime, WRITE_TOKEN, fileId, fileExtension);
}
public static String logFileName(String instantTime, String fileId, int version) {
return logFileName(instantTime, fileId, version, HoodieFileFormat.HOODIE_LOG.getFileExtension());
}
public static String logFileName(String instantTime, String fileId, int version, String fileExtension) {
return FSUtils.makeLogFileName(fileId, fileExtension, instantTime, version, WRITE_TOKEN);
}
public static String markerFileName(String instantTime, String fileId, IOType ioType) {
return markerFileName(instantTime, fileId, ioType, HoodieFileFormat.PARQUET.getFileExtension());
}
public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension) {
return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType);
}
private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
Files.createDirectories(parentPath);
@@ -73,45 +100,52 @@ public class FileCreateUtils {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
}
public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId)
throws Exception {
createDataFile(basePath, partitionPath, instantTime, fileId, 0);
public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
Path metaFilePath = parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
if (Files.notExists(metaFilePath)) {
Files.createFile(metaFilePath);
}
}
public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId)
throws Exception {
createBaseFile(basePath, partitionPath, instantTime, fileId, 0);
}
public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
throws Exception {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
if (Files.notExists(dataFilePath)) {
Files.createFile(dataFilePath);
Path baseFilePath = parentPath.resolve(baseFileName(instantTime, fileId));
if (Files.notExists(baseFilePath)) {
Files.createFile(baseFilePath);
}
new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length);
new RandomAccessFile(baseFilePath.toFile(), "rw").setLength(length);
}
public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version)
throws Exception {
createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
createLogFile(basePath, partitionPath, instantTime, fileId, version, 0);
}
public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version, int length)
throws Exception {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
Path logFilePath = parentPath.resolve(logFileName(instantTime, fileId, version));
if (Files.notExists(logFilePath)) {
Files.createFile(logFilePath);
}
new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length);
}
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType)
throws IOException {
Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
Files.createDirectories(folderPath);
String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime,
HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
Path markerFilePath = folderPath.resolve(markerFileName);
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
Files.createDirectories(parentPath);
Path markerFilePath = parentPath.resolve(markerFileName(instantTime, fileId, ioType));
if (Files.notExists(markerFilePath)) {
Files.createFile(markerFilePath);
}
@@ -119,11 +153,11 @@ public class FileCreateUtils {
}
public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException {
Path markerDir = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
if (Files.notExists(markerDir)) {
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
if (Files.notExists(parentPath)) {
return 0;
}
return Files.list(markerDir).filter(p -> p.getFileName().toString()
return Files.list(parentPath).filter(p -> p.getFileName().toString()
.endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count();
}

View File

@@ -19,8 +19,6 @@
package org.apache.hudi.common.testutils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
@@ -39,6 +37,7 @@ import java.util.Objects;
import java.util.UUID;
import java.util.stream.IntStream;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
@@ -46,15 +45,16 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDel
import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
public class HoodieTestTable {
private final String basePath;
private final FileSystem fs;
private HoodieTableMetaClient metaClient;
private String currentInstantTime;
protected final String basePath;
protected final FileSystem fs;
protected HoodieTableMetaClient metaClient;
protected String currentInstantTime;
private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
protected HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath()));
ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
this.basePath = basePath;
@@ -124,6 +124,13 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException {
for (String partitionPath : partitionPaths) {
FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
}
return this;
}
public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
}
@@ -150,19 +157,19 @@ public class HoodieTestTable {
*
* @return A {@link Map} of partition and its newly inserted file's id.
*/
public Map<String, String> withInserts(String... partitions) throws Exception {
public Map<String, String> withBaseFilesInPartitions(String... partitions) throws Exception {
Map<String, String> partitionFileIdMap = new HashMap<>();
for (String p : partitions) {
String fileId = UUID.randomUUID().toString();
FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId);
FileCreateUtils.createBaseFile(basePath, p, currentInstantTime, fileId);
partitionFileIdMap.put(p, fileId);
}
return partitionFileIdMap;
}
public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception {
public HoodieTestTable withBaseFilesInPartition(String partition, String... fileIds) throws Exception {
for (String f : fileIds) {
FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f);
FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, f);
}
return this;
}
@@ -182,35 +189,37 @@ public class HoodieTestTable {
return this;
}
public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) {
public boolean baseFilesExist(Map<String, String> partitionAndFileId, String instantTime) {
return partitionAndFileId.entrySet().stream().allMatch(entry -> {
String partition = entry.getKey();
String fileId = entry.getValue();
return fileExists(partition, instantTime, fileId);
return baseFileExists(partition, instantTime, fileId);
});
}
public boolean filesExist(String partition, String instantTime, String... fileIds) {
return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f));
public boolean baseFilesExist(String partition, String instantTime, String... fileIds) {
return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, instantTime, f));
}
public boolean fileExists(String partition, String instantTime, String fileId) {
public boolean baseFileExists(String partition, String instantTime, String fileId) {
try {
return fs.exists(new Path(Paths.get(basePath, partition,
FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString()));
return fs.exists(new Path(Paths.get(basePath, partition, baseFileName(instantTime, fileId)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public String getBaseFileNameById(String fileId) {
return baseFileName(currentInstantTime, fileId);
}
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
}
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
try {
return fs.exists(new Path(Paths.get(basePath, partition,
FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString()));
return fs.exists(new Path(Paths.get(basePath, partition, logFileName(instantTime, fileId, version)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}