
[HUDI-1513] Introduce WriteClient#preWrite() and relocate metadata table syncing (#2413)

- Syncing to metadata table, setting operation type, starting async cleaner done in preWrite()
 - Fixes an issue where delete() was not starting the async cleaner correctly
 - Fixed tests and enabled metadata table for TestAsyncCompaction
vinoth chandar
2021-01-06 23:08:30 -08:00
committed by GitHub
parent b593f10629
commit 5ff8e88d58
8 changed files with 54 additions and 57 deletions
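For readers skimming the diff, the core of the refactor is easier to see in one place. The sketch below is a simplified, self-contained Java illustration using stub types (WriteClient, AsyncCleanerService, and the enum are stand-ins, not the real Hudi classes): before this change each write path repeated setOperationType() plus the async-cleaner startup, and delete() skipped the cleaner entirely; after this change every write path calls a single preWrite() hook, which also syncs the metadata table.

// Simplified stand-ins; the real AbstractHoodieWriteClient carries far more state and generics.
import java.util.Collections;
import java.util.List;

public class PreWriteSketch {

  enum WriteOperationType { UPSERT, INSERT, DELETE }

  // Stub for Hudi's AsyncCleanerService, reduced to the single call used here.
  static class AsyncCleanerService {
    static AsyncCleanerService startAsyncCleaningIfEnabled(Object writeClient) {
      System.out.println("async cleaner started");
      return new AsyncCleanerService();
    }
  }

  static class WriteClient {
    private WriteOperationType operationType;
    private AsyncCleanerService asyncCleanerService;

    void setOperationType(WriteOperationType type) { this.operationType = type; }

    void syncTableMetadata() { System.out.println("metadata table synced"); }

    // Before this commit, each write path repeated the calls below, and delete()
    // only set the operation type and never started the async cleaner.
    // After this commit, every write path funnels through one hook:
    protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
      setOperationType(writeOperationType);
      syncTableMetadata();
      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
    }

    // The delete path now gets the cleaner and the metadata sync for free.
    List<String> delete(List<String> keys, String instantTime) {
      preWrite(instantTime, WriteOperationType.DELETE);
      // ... perform the actual delete and return write statuses ...
      return Collections.emptyList();
    }
  }

  public static void main(String[] args) {
    new WriteClient().delete(Collections.singletonList("someKey"), "20210106230830");
  }
}

The per-operation table calls (upsert/insert/delete) are untouched; only the pre-write bookkeeping is centralized, so a path can no longer forget one of the steps.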


@@ -96,8 +96,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
private transient AsyncCleanerService asyncCleanerService;
protected final boolean rollbackPending;
protected transient AsyncCleanerService asyncCleanerService;
/**
* Create a write client, without cleaning up failed/inflight commits.
@@ -134,7 +134,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
this.index = createIndex(writeConfig);
syncTableMetadata();
}
protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
@@ -368,6 +367,19 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
*/
public abstract O delete(K keys, final String instantTime);
/**
* Common method containing steps to be performed before write (upsert/insert/...) operations.
*
* @param instantTime Instant Time
* @param writeOperationType Write Operation Type
*/
protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
setOperationType(writeOperationType);
syncTableMetadata();
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
}
/**
* Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
* @param result Commit Action Result


@@ -36,7 +36,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -365,7 +364,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
// Read each instant in order and sync it to metadata table
final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");


@@ -106,8 +106,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -125,8 +124,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -158,7 +156,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
setOperationType(WriteOperationType.DELETE);
preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}


@@ -97,8 +97,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -112,8 +111,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -123,8 +121,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -138,8 +135,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -169,7 +165,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
setOperationType(WriteOperationType.DELETE);
preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}


@@ -45,7 +45,7 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends Abstrac
@Override
public void compact(HoodieInstant instant) throws IOException {
LOG.info("Compactor executing compaction " + instant);
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)compactionClient;
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();


@@ -140,8 +140,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -154,8 +153,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -165,8 +163,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
return postWrite(result, instantTime, table);
}
@@ -176,8 +173,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -192,8 +188,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE);
HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -209,8 +204,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE);
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -225,8 +219,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.BULK_INSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -236,8 +229,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -245,14 +237,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
setOperationType(WriteOperationType.DELETE);
preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}
public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
setOperationType(WriteOperationType.DELETE_PARTITION);
preWrite(instantTime, WriteOperationType.DELETE_PARTITION);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context,instantTime, partitions);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}


@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -62,7 +61,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.file.Files;
@@ -125,9 +123,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
}
// Metadata table created when enabled by config
// Metadata table created when enabled by config & sync is called
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
client.startCommitWithTime("001");
client.syncTableMetadata();
assertTrue(fs.exists(new Path(metadataTableBasePath)));
validateMetadata(client);
}
@@ -504,11 +503,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Enable metadata table and ensure it is synced
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
// Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
client.syncTableMetadata();
client.restoreToInstant(restoreToInstant);
assertFalse(metadata(client).isInSync());
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
client.syncTableMetadata();
validateMetadata(client);
assertTrue(metadata(client).isInSync());
@@ -519,9 +520,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
* Instants on Metadata Table should be archived as per config.
* Metadata Table should be automatically compacted as per config.
*/
@ParameterizedTest
@ValueSource(booleans = {false})
public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Exception {
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
@@ -530,8 +530,9 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(6, 8).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
// don't archive the data timeline at all.
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build())
.build();
List<HoodieRecord> records;
@@ -551,17 +552,16 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath());
HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
// check that there are 2 compactions.
assertEquals(2, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
// check that cleaning has, once after each compaction. There will be more instances on the timeline, since it's less aggressively archived
assertEquals(4, metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants());
// check that there are compactions.
assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
// check that cleaning has happened, once after each compaction.
assertTrue(metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants() > 0);
// ensure archiving has happened
List<HoodieInstant> instants = metadataTimeline.getCommitsAndCompactionTimeline()
.getInstants().collect(Collectors.toList());
Collections.reverse(instants);
long numDeltaCommits = instants.stream().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)).count();
assertEquals(5, numDeltaCommits);
long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants();
long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants();
assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have less delta commits than total completed instants on data timeline.");
}
/**

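The test changes above assume the metadata table is enabled through HoodieMetadataConfig and synced on demand via syncTableMetadata(). A minimal configuration sketch, assuming a Hudi 0.7-era dependency on the classpath and an already-constructed SparkRDDWriteClient; the base path and table name are placeholders:

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MetadataSyncSketch {

  // Build a write config with the internal metadata table enabled and validated,
  // mirroring the config used by the updated tests.
  static HoodieWriteConfig metadataEnabledConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable("test_table") // placeholder table name
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).validate(true).build())
        .build();
  }

  // The metadata table is only created/updated when a sync runs; the tests above
  // trigger one explicitly after starting a commit.
  static void startCommitAndSync(SparkRDDWriteClient<?> client, String instantTime) {
    client.startCommitWithTime(instantTime);
    client.syncTableMetadata();
  }
}

With syncing relocated into preWrite(), regular write paths pick this up automatically; the explicit syncTableMetadata() call is mainly useful in tests or when no write has been issued yet.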

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -50,7 +51,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAsyncCompaction extends CompactionTestBase {
private HoodieWriteConfig getConfig(Boolean autoCommit) {
return getConfigBuilder(autoCommit).build();
return getConfigBuilder(autoCommit)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).validate(true).build())
.build();
}
@Test
@@ -85,8 +88,6 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
// hoodieTable.rollback(jsc,
// new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);