[HUDI-2903] get table schema from the last commit with data written (#4180)
This commit is contained in:
@@ -21,8 +21,10 @@ package org.apache.hudi.common.table;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.SchemaCompatibility;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
@@ -40,8 +42,10 @@ import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.InvalidTableException;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.format.converter.ParquetMetadataConverter;
|
||||
import org.apache.parquet.hadoop.ParquetFileReader;
|
||||
@@ -73,72 +77,41 @@ public class TableSchemaResolver {
|
||||
* commit. We will assume that the schema has not changed within a single atomic write.
|
||||
*
|
||||
* @return Parquet schema for this table
|
||||
* @throws Exception
|
||||
*/
|
||||
private MessageType getTableParquetSchemaFromDataFile() throws Exception {
|
||||
private MessageType getTableParquetSchemaFromDataFile() {
|
||||
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
||||
|
||||
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata =
|
||||
activeTimeline.getLastCommitMetadataWithValidData();
|
||||
try {
|
||||
switch (metaClient.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
// If this is COW, get the last commit and read the schema from a file written in the
|
||||
// last commit
|
||||
HoodieInstant lastCommit =
|
||||
activeTimeline.getCommitsTimeline().filterCompletedInstantsWithCommitMetadata()
|
||||
.lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
|
||||
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
|
||||
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
|
||||
+ lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
|
||||
+ commitMetadata));
|
||||
return readSchemaFromBaseFile(new Path(filePath));
|
||||
case MERGE_ON_READ:
|
||||
// If this is MOR, depending on whether the latest commit is a delta commit or
|
||||
// compaction commit
|
||||
// Get a datafile written and get the schema from that file
|
||||
Option<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline().getCommitTimeline()
|
||||
.filterCompletedInstantsWithCommitMetadata().lastInstant();
|
||||
LOG.info("Found the last compaction commit as " + lastCompactionCommit);
|
||||
|
||||
Option<HoodieInstant> lastDeltaCommit;
|
||||
if (lastCompactionCommit.isPresent()) {
|
||||
lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
|
||||
.findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
|
||||
// For a COW table, a file with data written must currently be in parquet format.
|
||||
if (instantAndCommitMetadata.isPresent()) {
|
||||
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
||||
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
|
||||
return readSchemaFromBaseFile(new Path(filePath));
|
||||
} else {
|
||||
lastDeltaCommit =
|
||||
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
|
||||
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
||||
+ "so could not get schema for table " + metaClient.getBasePath());
|
||||
}
|
||||
LOG.info("Found the last delta commit " + lastDeltaCommit);
|
||||
|
||||
if (lastDeltaCommit.isPresent()) {
|
||||
HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
|
||||
// read from the log file wrote
|
||||
commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
|
||||
HoodieCommitMetadata.class);
|
||||
Pair<String, HoodieFileFormat> filePathWithFormat =
|
||||
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
|
||||
.filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
|
||||
.map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
|
||||
// No Log files in Delta-Commit. Check if there are any parquet files
|
||||
return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
|
||||
.filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
|
||||
.findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
|
||||
new IllegalArgumentException("Could not find any data file written for commit "
|
||||
+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
|
||||
+ ", CommitMetadata :" + commitMetadata));
|
||||
});
|
||||
switch (filePathWithFormat.getRight()) {
|
||||
case HOODIE_LOG:
|
||||
return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
|
||||
case PARQUET:
|
||||
return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
|
||||
+ " for file " + filePathWithFormat.getLeft());
|
||||
case MERGE_ON_READ:
|
||||
// For a MOR table, the file with data written may be either a parquet file or a .log file.
|
||||
// Determine the file format based on the file name, and then extract schema from it.
|
||||
if (instantAndCommitMetadata.isPresent()) {
|
||||
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
||||
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
|
||||
if (filePath.contains(HoodieLogFile.DELTA_EXTENSION)) {
|
||||
// this is a log file
|
||||
return readSchemaFromLogFile(new Path(filePath));
|
||||
} else if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
|
||||
// this is a parquet file
|
||||
return readSchemaFromBaseFile(new Path(filePath));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown file format :" + filePath);
|
||||
}
|
||||
} else {
|
||||
return readSchemaFromLastCompaction(lastCompactionCommit);
|
||||
throw new IllegalArgumentException("Could not find any data file written for commit, "
|
||||
+ "so could not get schema for table " + metaClient.getBasePath());
|
||||
}
|
||||
default:
|
||||
LOG.error("Unknown table type " + metaClient.getTableType());
|
||||
@@ -484,21 +457,6 @@ public class TableSchemaResolver {
|
||||
return readSchemaFromLogFile(metaClient.getRawFs(), path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
* @throws Exception
|
||||
*/
|
||||
public MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path)
|
||||
throws Exception {
|
||||
MessageType messageType = readSchemaFromLogFile(path);
|
||||
// Fall back to read the schema from last compaction
|
||||
if (messageType == null) {
|
||||
LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
|
||||
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
|
||||
}
|
||||
return messageType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
*
|
||||
|
||||
@@ -19,11 +19,13 @@
|
||||
package org.apache.hudi.common.table.timeline;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@@ -39,11 +41,14 @@ import java.io.Serializable;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
|
||||
@@ -254,6 +259,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
return readDataFromPath(detailPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last instant with valid data, and convert this to HoodieCommitMetadata
|
||||
*/
|
||||
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidData() {
|
||||
List<HoodieInstant> completed = getCommitsTimeline().filterCompletedInstants().getInstants()
|
||||
.sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList());
|
||||
for (HoodieInstant instant : completed) {
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||
getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
if (!commitMetadata.getFileIdAndRelativePaths().isEmpty()) {
|
||||
return Option.of(Pair.of(instant, commitMetadata));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString());
|
||||
}
|
||||
}
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
public Option<byte[]> readCleanerInfoAsBytes(HoodieInstant instant) {
|
||||
// Cleaner metadata are always stored only in timeline .hoodie
|
||||
return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
|
||||
|
||||
@@ -18,8 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.table.timeline;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -102,12 +100,6 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieTimeline filterCompletedInstantsWithCommitMetadata() {
|
||||
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted)
|
||||
.filter(i -> !isDeletePartitionType(i)), details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieTimeline filterCompletedAndCompactionInstants() {
|
||||
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
|
||||
@@ -359,21 +351,6 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
return details.apply(instant);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDeletePartitionType(HoodieInstant instant) {
|
||||
Option<WriteOperationType> operationType;
|
||||
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
operationType = Option.of(commitMetadata.getOperationType());
|
||||
} catch (Exception e) {
|
||||
operationType = Option.empty();
|
||||
}
|
||||
|
||||
return operationType.isPresent() && WriteOperationType.DELETE_PARTITION.equals(operationType.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty(HoodieInstant instant) {
|
||||
return getInstantDetails(instant).get().length == 0;
|
||||
|
||||
@@ -131,14 +131,6 @@ public interface HoodieTimeline extends Serializable {
|
||||
*/
|
||||
HoodieTimeline filterCompletedAndCompactionInstants();
|
||||
|
||||
/**
|
||||
* Filter this timeline to include only completed instants, excluding those whose operation type is delete partition.
|
||||
*
|
||||
* @return New instance of HoodieTimeline that includes only the completed instants
|
||||
* whose operation type is not delete partition
|
||||
*/
|
||||
HoodieTimeline filterCompletedInstantsWithCommitMetadata();
|
||||
|
||||
/**
|
||||
* Timeline to just include commits (commit/deltacommit), compaction and replace actions.
|
||||
*
|
||||
@@ -291,11 +283,6 @@ public interface HoodieTimeline extends Serializable {
|
||||
|
||||
boolean isEmpty(HoodieInstant instant);
|
||||
|
||||
/**
|
||||
* Check whether the WriteOperationType of the given instant is DELETE_PARTITION.
|
||||
*/
|
||||
boolean isDeletePartitionType(HoodieInstant instant);
|
||||
|
||||
/**
|
||||
* Helper methods to compare instants.
|
||||
**/
|
||||
|
||||
@@ -0,0 +1,233 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.functional
|
||||
|
||||
import org.apache.hudi.common.model.HoodieFileFormat
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||
|
||||
import org.apache.log4j.LogManager
|
||||
|
||||
import org.apache.spark.sql._
|
||||
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
/**
|
||||
* Tests on HoodieActiveTimeline using a real Hudi table.
|
||||
*/
|
||||
class TestHoodieActiveTimeline extends HoodieClientTestBase {

  var spark: SparkSession = null

  private val log = LogManager.getLogger(classOf[TestHoodieActiveTimeline])

  // Write options shared by every write issued from these tests.
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
  )

  @BeforeEach
  override def setUp(): Unit = {
    setTableName("hoodie_test")
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach
  override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

  /**
   * Asserts what `getLastCommitMetadataWithValidData` returns for the given timeline.
   *
   * @param timeline               timeline to query
   * @param expectedInstantTime    timestamp the returned instant must carry
   * @param expectedTimeInFileName commit time that must appear in the relative path of a written file
   * @param expectedFormat         file format whose extension must appear in that relative path
   */
  private def assertLastCommitWithValidData(timeline: org.apache.hudi.common.table.timeline.HoodieActiveTimeline,
                                            expectedInstantTime: String,
                                            expectedTimeInFileName: String,
                                            expectedFormat: HoodieFileFormat): Unit = {
    val lastValid = timeline.getLastCommitMetadataWithValidData()
    assert(lastValid.isPresent)
    val (instant, commitMetadata) = (lastValid.get().getLeft, lastValid.get().getRight)
    // JUnit's signature is assertEquals(expected, actual).
    assertEquals(expectedInstantTime, instant.getTimestamp)
    val relativePath = commitMetadata.getFileIdAndRelativePaths.values().stream().findAny().get()
    assert(relativePath.contains(expectedTimeInFileName))
    assert(relativePath.contains(expectedFormat.getFileExtension))
  }

  @Test
  def testGetLastCommitMetadataWithValidDataForCOW(): Unit = {
    // First Operation:
    // Insert 100 records into a new COW table.
    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save(basePath)
    val commit1Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    val partitionsForCommit1 = spark.read.format("org.apache.hudi").load(basePath)
      .select("_hoodie_partition_path")
      .distinct().collect()
      .map(_.get(0).toString).sorted
    assert(Array("2015/03/16", "2015/03/17", "2016/03/15").sameElements(partitionsForCommit1))

    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
    var activeTimeline = metaClient.getActiveTimeline

    // The first commit wrote parquet files, so it is the last commit with valid data.
    assertLastCommitWithValidData(activeTimeline, commit1Time, commit1Time, HoodieFileFormat.PARQUET)

    // Second Operation:
    // Drop Partition on 2015/03/16
    spark.emptyDataFrame.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, "2015/03/16")
      .mode(SaveMode.Append)
      .save(basePath)
    val countPartitionDropped = spark.read.format("org.apache.hudi").load(basePath)
      .where("_hoodie_partition_path = '2015/03/16'").count()
    assertEquals(0L, countPartitionDropped)

    // DropPartition will not generate a file with valid data. Get the prev instant and metadata.
    activeTimeline = activeTimeline.reload()
    assertLastCommitWithValidData(activeTimeline, commit1Time, commit1Time, HoodieFileFormat.PARQUET)

    // Third Operation:
    // Upsert 50 existing records. The assertions below expect parquet files stamped with this commit.
    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList
    val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
    inputDF3.write.format("org.apache.hudi")
      .options(commonOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

    // The upsert commit is now the last commit with valid data.
    activeTimeline = activeTimeline.reload()
    assertLastCommitWithValidData(activeTimeline, commit3Time, commit3Time, HoodieFileFormat.PARQUET)
  }

  @Test
  def testGetLastCommitMetadataWithValidDataForMOR(): Unit = {
    // First Operation:
    // Insert 100 records into a new MOR table with inline compaction disabled.
    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option("hoodie.compact.inline", "false")
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save(basePath)
    val commit1Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
    var activeTimeline = metaClient.getActiveTimeline

    // check that get the latest parquet file
    assertLastCommitWithValidData(activeTimeline, commit1Time, commit1Time, HoodieFileFormat.PARQUET)

    // Second Operation:
    // Upsert with duplicate records. Produced a log file for each parquet.
    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
    inputDF2.write.format("org.apache.hudi")
      .options(commonOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    val commit2Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

    // check that get the latest .log file
    // deltacommit: .log file should contain the timestamp from base parquet file.
    activeTimeline = activeTimeline.reload()
    assertLastCommitWithValidData(activeTimeline, commit2Time, commit1Time, HoodieFileFormat.HOODIE_LOG)

    // Third Operation:
    // Upsert with 50 duplicate records. Produced the second log file for each parquet.
    // And trigger compaction.
    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList
    val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
    inputDF3.write.format("org.apache.hudi")
      .options(commonOpts).option("hoodie.compact.inline", "true")
      .option("hoodie.compact.inline.max.delta.commits", "1")
      .mode(SaveMode.Append).save(basePath)
    val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

    // check that get the latest parquet file generated by compaction
    activeTimeline = activeTimeline.reload()
    assertLastCommitWithValidData(activeTimeline, commit3Time, commit3Time, HoodieFileFormat.PARQUET)

    // Fourth Operation:
    // Upsert with 50 duplicate records.
    val records4 = recordsToStrings(dataGen.generateUniqueUpdates("004", 50)).toList
    val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
    inputDF4.write.format("org.apache.hudi")
      .options(commonOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    val commit4Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

    // deltacommit: .log file should contain the timestamp from base parquet file.
    activeTimeline = activeTimeline.reload()
    assertLastCommitWithValidData(activeTimeline, commit4Time, commit3Time, HoodieFileFormat.HOODIE_LOG)
  }
}
|
||||
Reference in New Issue
Block a user