[HUDI-2151] Part3 Enabling marker based rollback as default rollback strategy (#3950)
* Enabling timeline server based markers * Enabling timeline server based markers and marker based rollback * Removing constraint that timeline server can be enabled only for hdfs * Fixing tests
This commit is contained in:
committed by
GitHub
parent
04eb5fdc65
commit
ce7d233307
@@ -90,6 +90,7 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
|
||||
.withBaseFilesInPartitions(partitionAndFileId);
|
||||
// generate two rollback
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
|
||||
try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
|
||||
|
||||
@@ -120,9 +120,9 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
|
||||
public static final ConfigProperty<String> ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty
|
||||
.key("hoodie.rollback.using.markers")
|
||||
.defaultValue("false")
|
||||
.defaultValue("true")
|
||||
.withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated "
|
||||
+ "during the writes. Turned off by default.");
|
||||
+ "during the writes. Turned on by default.");
|
||||
|
||||
public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty
|
||||
.key("hoodie.timeline.layout.version")
|
||||
|
||||
@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
|
||||
import org.apache.hudi.common.table.marker.MarkerType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
@@ -180,6 +181,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
.withAutoCommit(true)
|
||||
.withAvroSchemaValidate(true)
|
||||
.withEmbeddedTimelineServerEnabled(false)
|
||||
.withMarkersType(MarkerType.DIRECT.name())
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
|
||||
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
|
||||
.forTable(tableName)
|
||||
|
||||
@@ -18,8 +18,6 @@
|
||||
|
||||
package org.apache.hudi.table.marker;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.fs.StorageSchemes;
|
||||
import org.apache.hudi.common.table.marker.MarkerType;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
@@ -45,12 +43,6 @@ public class WriteMarkersFactory {
|
||||
case DIRECT:
|
||||
return new DirectWriteMarkers(table, instantTime);
|
||||
case TIMELINE_SERVER_BASED:
|
||||
String basePath = table.getMetaClient().getBasePath();
|
||||
if (StorageSchemes.HDFS.getScheme().equals(
|
||||
FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) {
|
||||
throw new HoodieException("Timeline-server-based markers are not supported for HDFS: "
|
||||
+ "base path " + basePath);
|
||||
}
|
||||
return new TimelineServerBasedWriteMarkers(table, instantTime);
|
||||
default:
|
||||
throw new HoodieException("The marker type \"" + markerType.name() + "\" is not supported.");
|
||||
|
||||
@@ -201,6 +201,7 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
.withBaseFilesInPartitions(partitionAndFileId3);
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
@@ -308,6 +309,7 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
|
||||
// Set Failed Writes rollback to EAGER
|
||||
config = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
final String commitTime5 = "20160506030631";
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
|
||||
|
||||
@@ -316,7 +316,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
||||
.setTimelineLayoutVersion(VERSION_1)
|
||||
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
|
||||
|
||||
HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
|
||||
HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build();
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
|
||||
|
||||
// Initial inserts with TRIP_EXAMPLE_SCHEMA
|
||||
@@ -507,11 +507,14 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getWriteConfig(String schema) {
|
||||
return getWriteConfigBuilder(schema).build();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
|
||||
return getConfigBuilder(schema)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withAvroSchemaValidate(true)
|
||||
.build();
|
||||
.withAvroSchemaValidate(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -278,7 +278,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
||||
final int numRecords = 10;
|
||||
final String oldPartitionPath = "1970/01/01";
|
||||
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
|
||||
HoodieWriteConfig config = getConfig(true, true);
|
||||
HoodieWriteConfig config = getConfigBuilder(100, true, true).withRollbackUsingMarkers(false).build();
|
||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||
|
||||
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
@@ -337,6 +337,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
||||
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
|
||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
||||
@@ -383,7 +384,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
||||
@Test
|
||||
public void testSimpleTagLocationWithInvalidCommit() throws Exception {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HoodieWriteConfig config = getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build();
|
||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
@@ -425,6 +426,7 @@ public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
||||
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfigBuilder(100, false, false)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
|
||||
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
|
||||
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
@@ -98,10 +98,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||
private HoodieWriteConfig config;
|
||||
|
||||
private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception {
|
||||
setUp(indexType, populateMetaFields, true);
|
||||
setUp(indexType, populateMetaFields, true, true);
|
||||
}
|
||||
|
||||
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata) throws Exception {
|
||||
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata, boolean rollbackUsingMarkers) throws Exception {
|
||||
this.indexType = indexType;
|
||||
initPath();
|
||||
initSparkContexts();
|
||||
@@ -111,6 +111,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||
: getPropertiesForKeyGen());
|
||||
config = getConfigBuilder()
|
||||
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
|
||||
.withRollbackUsingMarkers(rollbackUsingMarkers)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
|
||||
.build()).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build();
|
||||
writeClient = getHoodieWriteClient(config);
|
||||
@@ -225,7 +226,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||
@ParameterizedTest
|
||||
@MethodSource("indexTypeParams")
|
||||
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception {
|
||||
setUp(indexType, populateMetaFields, true);
|
||||
setUp(indexType, populateMetaFields, true, false);
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
int totalRecords = 20 + random.nextInt(20);
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
|
||||
|
||||
@@ -162,8 +162,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
|
||||
public void testRollbackWhenFirstCommitFail() throws Exception {
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withPath(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
|
||||
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
|
||||
client.startCommitWithTime("001");
|
||||
client.insert(jsc.emptyRDD(), "001");
|
||||
|
||||
Reference in New Issue
Block a user