[HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)

[HUDI-1394] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)
Author: Udit Mehrotra (2020-12-31 01:20:02 -08:00)
Committed by: vinoth chandar
parent 298808baaf
commit 4e64226844
38 changed files with 308 additions and 102 deletions
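
The two messages above amount to one API-level change that the diffs below exercise: FSUtils.getAllPartitionPaths now takes flags for whether to read partition paths from the metadata table and whether to validate that result against a direct file listing. A minimal Scala sketch of the new call, with names taken from the diff (metaClient and basePath are the test's meta client and table base path; per my reading, the trailing boolean is the pre-existing assume-date-partitioning flag, which this diff leaves as false):

    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.hudi.common.fs.FSUtils

    // Old form, removed below: always lists partitions from the file system.
    //   FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false)
    // New form: may serve the listing from the metadata table instead.
    val partitions = FSUtils.getAllPartitionPaths(
      metaClient.getFs(),                                       // FileSystem of the table
      basePath,                                                 // table base path
      HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, // consult metadata table?
      HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE,           // validate against listing?
      false)                                                    // assume date partitioning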

TestBootstrap.java

@@ -28,6 +28,7 @@ import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelect
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
@@ -372,7 +373,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -390,7 +392,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -406,7 +409,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
         true, HoodieRecord.HOODIE_META_COLUMNS);
@@ -423,7 +427,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
         HoodieRecord.HOODIE_META_COLUMNS);
@@ -438,7 +443,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
@@ -455,7 +461,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
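
The Scala datasource test below replaces a plain @Test with a JUnit 5 parameterized test so the same end-to-end scenario runs once with the metadata table enabled and once without it. A minimal sketch of that pattern (assumes junit-jupiter-params on the test classpath; class and method names here are illustrative):

    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    class ExampleMetadataToggleTest {
      // JUnit invokes this twice: isMetadataEnabled = true, then false.
      @ParameterizedTest
      @ValueSource(booleans = Array(true, false))
      def readsSameData(isMetadataEnabled: Boolean): Unit = {
        // drive both the metadata-table path and the plain-listing path here
      }
    }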

TestCOWDataSource.scala

@@ -22,6 +22,7 @@ import java.util.function.Supplier
 import java.util.stream.Stream
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -34,6 +35,8 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import scala.collection.JavaConversions._
@@ -82,13 +85,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
-  @Test def testCopyOnWriteStorage() {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
     // Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -96,7 +102,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     // Snapshot query
-    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
+      .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF1.count())
     // Upsert based on the written table with Hudi metadata columns
@@ -120,6 +128,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -128,6 +137,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
@@ -149,6 +159,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
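
Taken together, the option the test threads through every write and read is the user-facing switch for this feature. A minimal sketch of the round trip (df, spark, commonOpts, and basePath are stand-ins for the test fixtures; the option key HoodieMetadataConfig.METADATA_ENABLE_PROP comes from the diff above):

    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.spark.sql.SaveMode

    // Write with the metadata table enabled so partition info is indexed.
    df.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")
      .mode(SaveMode.Append)
      .save(basePath)

    // On read, HoodieROTablePathFilter and getAllPartitionPaths can answer
    // from the metadata table instead of recursively listing the file system.
    val snapshot = spark.read.format("org.apache.hudi")
      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")
      .load(basePath + "/*/*/*/*")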