[HUDI-3001] Clean up the marker directory when finish bootstrap operation. (#4298)
This commit is contained in:
@@ -77,6 +77,7 @@ import org.apache.avro.generic.GenericRecord;
|
|||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.parquet.avro.AvroParquetReader;
|
import org.apache.parquet.avro.AvroParquetReader;
|
||||||
@@ -144,6 +145,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
|
|||||||
Option<HoodieWriteMetadata> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
|
Option<HoodieWriteMetadata> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
|
||||||
// if there are full bootstrap to be performed, perform that too
|
// if there are full bootstrap to be performed, perform that too
|
||||||
Option<HoodieWriteMetadata> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
|
Option<HoodieWriteMetadata> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
|
||||||
|
// Delete the marker directory for the instant
|
||||||
|
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
|
||||||
|
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||||
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
|
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.functional
|
package org.apache.hudi.functional
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem
|
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||||
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
|
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
|
||||||
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
|
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
|
||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
@@ -31,9 +31,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
|
|||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.io.TempDir
|
import org.junit.jupiter.api.io.TempDir
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.Collections
|
import java.util.Collections
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
class TestDataSourceForBootstrap {
|
class TestDataSourceForBootstrap {
|
||||||
@@ -106,6 +106,8 @@ class TestDataSourceForBootstrap {
|
|||||||
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
|
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
|
||||||
extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
|
extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
|
||||||
)
|
)
|
||||||
|
// check marked directory clean up
|
||||||
|
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
|
||||||
|
|
||||||
// Read bootstrapped table and verify count
|
// Read bootstrapped table and verify count
|
||||||
var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
|
var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
|
||||||
@@ -161,6 +163,9 @@ class TestDataSourceForBootstrap {
|
|||||||
Some("datestr"),
|
Some("datestr"),
|
||||||
Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
|
Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
|
||||||
|
|
||||||
|
// check marked directory clean up
|
||||||
|
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
|
||||||
|
|
||||||
// Read bootstrapped table and verify count
|
// Read bootstrapped table and verify count
|
||||||
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
|
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
|
||||||
assertEquals(numRecords, hoodieROViewDF1.count())
|
assertEquals(numRecords, hoodieROViewDF1.count())
|
||||||
|
|||||||
Reference in New Issue
Block a user