1
0

[HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS (#4739)

- This change makes sure MT records are updated appropriately on HDFS: previously, after Log File append operations, MT records were updated with just the size of the deltas being appended to the original files — a practice that was found to cause issues during Rollbacks, which instead updated the MT with records bearing the full file-size.

- To hedge against similar issues going forward, this PR eliminates this discrepancy and streamlines the flow so that the Metadata Table always ingests records bearing full file-sizes.
This commit is contained in:
Alexey Kudinkin
2022-03-07 12:38:27 -08:00
committed by GitHub
parent f0bcee3c01
commit a66fd40692
18 changed files with 415 additions and 255 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -45,6 +47,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -73,9 +76,8 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -107,15 +109,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
protected final transient HoodieMetrics metrics;
private final transient HoodieIndex<?, ?> index;
private final SupportsUpgradeDowngrade upgradeDowngradeHelper;
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
protected final transient HoodieMetrics metrics;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
protected transient Timer.Context clusteringTimer;
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
protected transient AsyncCleanerService asyncCleanerService;
protected transient AsyncArchiveService asyncArchiveService;
protected final TransactionManager txnManager;
@@ -125,25 +128,32 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* Create a write client, with new hudi index.
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param upgradeDowngradeHelper engine-specific instance of {@link SupportsUpgradeDowngrade}
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
this(context, writeConfig, Option.empty());
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
this(context, writeConfig, Option.empty(), upgradeDowngradeHelper);
}
/**
* Create a write client, allows to specify all parameters.
*
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param timelineService Timeline Service that runs as part of write client.
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config);
this.index = createIndex(writeConfig);
this.txnManager = new TransactionManager(config, fs);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
protected abstract HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig);
@@ -291,7 +301,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
}
HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
rollbackFailedBootstrap();
table.bootstrap(context, extraMetadata);
}
@@ -299,7 +309,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/**
* Main API to rollback failed bootstrap.
*/
public void rollbackFailedBootstrap() {
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
@@ -628,7 +638,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -636,25 +646,11 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
return rollback(commitInstantTime, pendingRollbackInfo, false);
}
/**
 * Rollback the inflight record changes with the given commit time. This
 * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}.
 * Kept only for backward compatibility.
 *
 * @param commitInstantTime Instant time of the commit
 * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
 * @return true if the rollback completed successfully
 * @throws HoodieRollbackException if rollback cannot be performed successfully
 * @deprecated use {@link BaseHoodieWriteClient#restoreToInstant(String)} instead
 */
@Deprecated
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
// Delegates to the main rollback overload with no known pending-rollback plan
return rollback(commitInstantTime, Option.empty(), skipLocking);
}
/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
@@ -711,7 +707,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -988,7 +984,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/**
* Rollback all failed writes.
*/
public Boolean rollbackFailedWrites() {
protected Boolean rollbackFailedWrites() {
return rollbackFailedWrites(false);
}
@@ -996,7 +992,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* Rollback all failed writes.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
protected Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
@@ -1246,13 +1242,75 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
/**
* Get HoodieTable and init {@link Timer.Context}.
* Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
* bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
*
* @param operationType write operation type
* NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
* @return HoodieTable
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
* operations such as:
*
* NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
* {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
*
* <ul>
* <li>Checking whether upgrade/downgrade is required</li>
* <li>Bootstrapping Metadata Table (if required)</li>
* <li>Initializing metrics contexts</li>
* </ul>
*/
// Engine-agnostic entry point: resolves the table, runs upgrade/downgrade and
// engine-specific init under the transaction lock, then starts the metrics timer
// matching the requested write operation. See class-level Javadoc note: the
// locked section must avoid any work not requiring external synchronization.
protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
HoodieTable<T, I, K, O> table;
// Upgrade/downgrade and engine-specific table init (incl. MT bootstrapping in
// doInitTable) run under the transaction so they don't race with other writers
this.txnManager.beginTransaction();
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime);
} finally {
// Always release the transaction, even if upgrade or init failed
this.txnManager.endTransaction();
}
// Validate table properties
metaClient.validateTableProperties(config.getProps(), operationType);
// Make sure that FS View is in sync
table.getHoodieView().sync();
// Start the metrics context appropriate for this operation type; operations
// not listed (e.g. UNKNOWN) intentionally start no timer
switch (operationType) {
case INSERT:
case INSERT_PREPPED:
case UPSERT:
case UPSERT_PREPPED:
case BULK_INSERT:
case BULK_INSERT_PREPPED:
case INSERT_OVERWRITE:
case INSERT_OVERWRITE_TABLE:
setWriteTimer(table);
break;
case CLUSTER:
clusteringTimer = metrics.getClusteringCtx();
break;
case COMPACT:
compactionTimer = metrics.getCompactionCtx();
break;
default:
}
return table;
}
/**
* Sets write schema from last instant since deletes may not have schema set in the config.
@@ -1301,4 +1359,33 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.heartbeatClient.stop();
this.txnManager.close();
}
// Starts the write-latency timer that matches the table's commit action type.
// Any other action type intentionally leaves writeTimer untouched.
private void setWriteTimer(HoodieTable<T, I, K, O> table) {
String actionType = table.getMetaClient().getCommitActionType();
boolean isCommit = actionType.equals(HoodieTimeline.COMMIT_ACTION);
boolean isDeltaCommit = actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
if (isCommit) {
writeTimer = metrics.getCommitCtx();
} else if (isDeltaCommit) {
writeTimer = metrics.getDeltaCommitCtx();
}
}
// Checks whether the on-disk table version lags (or leads) the current writer
// version and, if so, runs the upgrade/downgrade flow before any new writes land.
// NOTE: invoked from initTable() under the transaction lock.
private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
rollbackFailedWrites(pendingRollbacks, true);
// Reuse the instance built above rather than constructing a second,
// identically-configured UpgradeDowngrade (previously duplicated here)
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime.orElse(null));
// The upgrade may have rewritten the timeline; reload so later reads see it
metaClient.reloadActiveTimeline();
}
}
}

View File

@@ -417,6 +417,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
writer = null;
// update final size, once for all log files
// TODO we can actually deduce file size purely from AppendResult (based on offset and size
// of the appended block)
for (WriteStatus status: statuses) {
long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
status.getStat().setFileSizeInBytes(logFileSize);

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -35,11 +39,6 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -235,20 +234,14 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(computeTotalWriteBytes());
stat.setFileSizeInBytes(computeFileSizeInBytes());
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
long fileSize = FSUtils.getFileSize(fs, path);
stat.setTotalWriteBytes(fileSize);
stat.setFileSizeInBytes(fileSize);
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
}
protected long computeTotalWriteBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}
protected long computeFileSizeInBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieList;
@@ -62,9 +64,6 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +92,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig);
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
this.bucketToHandles = new HashMap<>();
}
@@ -136,7 +135,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
@@ -152,7 +151,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
// only used for metadata table, the upsert happens in single thread
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(),
@@ -164,7 +163,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// create the write handle if not exists
@@ -187,7 +186,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> insertOverwrite(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
// create the write handle if not exists
@@ -206,7 +205,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
*/
public List<WriteStatus> insertOverwriteTable(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
// create the write handle if not exists
@@ -239,7 +238,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
return postWrite(result, instantTime, table);
@@ -397,11 +396,9 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
return getTableAndInitCtx(metaClient, operationType);
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}
/**
@@ -488,20 +485,6 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return writeHandle;
}
// Resolves the Flink table for the given operation and starts the matching
// write-latency timer as a side effect.
private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
// Deletes may not carry a write schema in the config; derive it from the table
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
// Start the commit timer matching the table's commit action type
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
return table;
}
/**
 * Creates a fresh {@link HoodieFlinkTable} from the current write config and the
 * Flink engine context (note: a new instance on every call, not cached).
 */
public HoodieFlinkTable<T> getHoodieTable() {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieList;
@@ -30,7 +32,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -40,9 +41,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.table.upgrade.JavaUpgradeDowngradeHelper;
import java.util.List;
import java.util.Map;
@@ -52,14 +51,14 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
super(context, clientConfig, JavaUpgradeDowngradeHelper.getInstance());
}
public HoodieJavaWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
}
@Override
@@ -99,7 +98,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
@@ -113,7 +112,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
@@ -123,7 +122,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
@@ -137,7 +136,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
@@ -169,7 +168,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
String instantTime,
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.bulkInsertPrepped(context, instantTime, preppedRecords, bulkInsertPartitioner);
@@ -180,7 +179,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
public List<WriteStatus> delete(List<HoodieKey> keys,
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
@@ -233,23 +232,11 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
}
@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
return getTableAndInitCtx(metaClient, operationType);
// Create a Hoodie table which encapsulated the commits and files visible
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}
// Resolves the Java-engine table for the given operation and starts the
// matching write-latency timer as a side effect.
private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
// Deletes may not carry a write schema in the config; derive it from the table
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
// Start the commit timer matching the table's commit action type
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
return table;
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.HoodieTable;
/**
 * {@link SupportsUpgradeDowngrade} implementation for the Java engine.
 *
 * <p>Stateless, hence exposed as a singleton via {@link #getInstance()}.
 */
public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
private static final JavaUpgradeDowngradeHelper INSTANCE = new JavaUpgradeDowngradeHelper();
private JavaUpgradeDowngradeHelper() {
// Singleton: obtain via getInstance()
}
public static JavaUpgradeDowngradeHelper getInstance() {
return INSTANCE;
}
@Override
public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
return HoodieJavaTable.create(config, context);
}
@Override
public String getPartitionColumns(HoodieWriteConfig config) {
return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
}
}

View File

@@ -18,15 +18,15 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -35,7 +35,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -55,10 +54,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
@@ -94,7 +89,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
}
/**
@@ -147,13 +142,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
*/
@Override
public void bootstrap(Option<Map<String, String>> extraMetadata) {
getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata);
initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);
}
@Override
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
@@ -166,7 +161,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
@@ -176,7 +171,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
@@ -186,7 +181,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
@@ -201,7 +196,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
@@ -216,7 +211,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
@@ -231,7 +226,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
@@ -241,7 +236,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
@@ -250,14 +245,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}
public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
@@ -420,34 +415,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType,
String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
try {
this.txnManager.beginTransaction();
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(
metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
this.rollbackFailedWrites(pendingRollbacks, true);
new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
metaClient.reloadActiveTimeline();
}
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(Option.of(instantTime));
} finally {
this.txnManager.endTransaction();
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);
initializeMetadataTable(instantTime);
// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
}
/**
@@ -480,22 +455,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
}
private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(
HoodieTableMetaClient metaClient, WriteOperationType operationType, String instantTime) {
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieSparkTable<T> table = HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
table.getHoodieView().sync();
return table;
}
@Override
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.

View File

@@ -1077,6 +1077,8 @@ public class TestCleaner extends HoodieClientTestBase {
writeStat.setPartitionPath(partition);
writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId));
writeStat.setFileId(newFileId);
writeStat.setTotalWriteBytes(1);
writeStat.setFileSizeInBytes(1);
replaceMetadata.addWriteStat(partition, writeStat);
}
return Pair.of(requestedReplaceMetadata, replaceMetadata);
@@ -1756,6 +1758,8 @@ public class TestCleaner extends HoodieClientTestBase {
writeStat.setPartitionPath(partitionPath);
writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f));
writeStat.setFileId(f);
writeStat.setTotalWriteBytes(1);
writeStat.setFileSizeInBytes(1);
metadata.addWriteStat(partitionPath, writeStat);
}));
return metadata;

View File

@@ -102,6 +102,7 @@ import scala.Tuple2;
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -617,21 +618,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
for (String fileName : fsFileNames) {
if (!metadataFilenames.contains(fileName)) {
LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
}
}
for (String fileName : metadataFilenames) {
if (!fsFileNames.contains(fileName)) {
LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
}
}
}
assertLinesMatch(fsFileNames, metadataFilenames);
assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
// Block sizes should be valid

View File

@@ -77,8 +77,8 @@ public class CollectionUtils {
* NOTE: That values associated with overlapping keys from the second map, will override
* values from the first one
*/
public static <K, V> Map<K, V> combine(Map<K, V> one, Map<K, V> another) {
Map<K, V> combined = new HashMap<>(one.size() + another.size());
public static <K, V> HashMap<K, V> combine(Map<K, V> one, Map<K, V> another) {
HashMap<K, V> combined = new HashMap<>(one.size() + another.size());
combined.putAll(one);
combined.putAll(another);
return combined;

View File

@@ -52,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -239,10 +240,22 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
Option<Map<String, Long>> filesAdded,
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true))));
filesAdded.ifPresent(filesMap ->
fileInfo.putAll(
filesMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, (entry) -> {
long fileSize = entry.getValue();
// Assert that the file-size of the file being added is positive, since Hudi
// should not be creating empty files
checkState(fileSize > 0);
return new HoodieMetadataFileInfo(fileSize, false);
})))
);
filesDeleted.ifPresent(filesList ->
fileInfo.putAll(
filesList.stream().collect(
Collectors.toMap(Function.identity(), (ignored) -> new HoodieMetadataFileInfo(0L, true))))
);
HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
@@ -288,7 +301,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
switch (type) {
case METADATA_TYPE_PARTITION_LIST:
case METADATA_TYPE_FILE_LIST:
Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord);
Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFileSystemMetadata(previousRecord);
return new HoodieMetadataPayload(key, type, combinedFileInfo);
case METADATA_TYPE_BLOOM_FILTER:
HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord);
@@ -392,28 +405,53 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
}
private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
// First, add all files listed in the previous record
if (previousRecord.filesystemMetadata != null) {
combinedFileInfo.putAll(previousRecord.filesystemMetadata);
}
// Second, merge in the files listed in the new record
if (filesystemMetadata != null) {
filesystemMetadata.forEach((filename, fileInfo) -> {
// If the filename wasn't present then we carry it forward
if (!combinedFileInfo.containsKey(filename)) {
combinedFileInfo.put(filename, fileInfo);
} else {
if (fileInfo.getIsDeleted()) {
// file deletion
combinedFileInfo.remove(filename);
} else {
// file appends.
combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
});
}
}
validatePayload(type, filesystemMetadata);
filesystemMetadata.forEach((key, fileInfo) -> {
combinedFileInfo.merge(key, fileInfo,
// Combine previous record w/ the new one, new records taking precedence over
// the old one
//
// NOTE: That if previous listing contains the file that is being deleted by the tombstone
// record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
// listing as well as drop the tombstone itself.
// However, if file is not present in the previous record we have to persist tombstone
// record in the listing to make sure we carry forward information that this file
// was deleted. This special case could occur since the merging flow is 2-stage:
// - First we merge records from all of the delta log-files
// - Then we merge records from base-files with the delta ones (coming as a result
// of the previous step)
(oldFileInfo, newFileInfo) ->
// NOTE: We can't assume that MT update records will be ordered the same way as actual
// FS operations (since they are not atomic), therefore MT record merging should be a
// _commutative_ & _associative_ operation (ie one that would work even in case records
// will get re-ordered), which is
// - Possible for file-sizes (since file-sizes only ever grow, we can simply
// take max of the old and new records)
// - Not possible for is-deleted flags*
//
// *However, we're assuming that the case of concurrent write and deletion of the same
// file is _impossible_ -- it would only be possible with concurrent upsert and
// rollback operation (affecting the same log-file), which is implausible, b/c either
// of the following have to be true:
// - We're appending to failed log-file (then the other writer is trying to
// rollback it concurrently, before its own write)
// - Rollback (of completed instant) is running concurrently with append (meaning
// that restore is running concurrently with a write, which is also not supported
// currently)
newFileInfo.getIsDeleted()
? null
: new HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false));
});
}
@@ -509,6 +547,14 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return sb.toString();
}
private static void validatePayload(int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
if (type == METADATA_TYPE_FILE_LIST) {
filesystemMetadata.forEach((fileName, fileInfo) -> {
checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files should have size > 0");
});
}
}
private static <T> T getNestedFieldValue(GenericRecord record, String fieldName) {
// NOTE: This routine is more lightweight than {@code HoodieAvroUtils.getNestedFieldVal}
if (record.getSchema().getField(fieldName) == null) {

View File

@@ -40,6 +40,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -147,40 +148,58 @@ public class HoodieTableMetadataUtil {
*/
public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata,
String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
List<String> allPartitions = new LinkedList<>();
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
allPartitions.add(partition);
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
Map<String, Long> newFiles = new HashMap<>(writeStats.size());
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
// Add record bearing partitions list
ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
// Update files listing records for each individual partition
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
commitMetadata.getPartitionToWriteStats().entrySet()
.stream()
.map(entry -> {
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();
String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
HashMap<String, Long> updatedFilesToSizesMapping =
writeStats.stream().reduce(new HashMap<>(writeStats.size()),
(map, stat) -> {
String pathWithPartition = stat.getPath();
if (pathWithPartition == null) {
// Empty partition
LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
return;
LOG.warn("Unable to find path in write stat to update metadata table " + stat);
return map;
}
int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
int offset = partition.equals(NON_PARTITIONED_NAME)
? (pathWithPartition.startsWith("/") ? 1 : 0)
: partition.length() + 1;
String filename = pathWithPartition.substring(offset);
long totalWriteBytes = newFiles.containsKey(filename)
? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
: hoodieWriteStat.getTotalWriteBytes();
newFiles.put(filename, totalWriteBytes);
});
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
partition, Option.of(newFiles), Option.empty());
records.add(record);
});
// New partitions created
HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
records.add(record);
// Since write-stats are coming in no particular order, if the same
// file have previously been appended to w/in the txn, we simply pick max
// of the sizes as reported after every write, since file-sizes are
// monotonically increasing (ie file-size never goes down, unless deleted)
map.merge(filename, stat.getFileSizeInBytes(), Math::max);
return map;
},
CollectionUtils::combine);
return HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(updatedFilesToSizesMapping),
Option.empty());
})
.collect(Collectors.toList());
records.addAll(updatedPartitionFilesRecords);
LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
return records;
}

View File

@@ -285,7 +285,7 @@ public class FileCreateUtils {
public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId)
throws Exception {
createBaseFile(basePath, partitionPath, instantTime, fileId, 0);
createBaseFile(basePath, partitionPath, instantTime, fileId, 1);
}
public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length)

View File

@@ -1057,6 +1057,7 @@ public class HoodieTestTable {
writeStat.setPartitionPath(partition);
writeStat.setPath(partition + "/" + fileName);
writeStat.setTotalWriteBytes(fileIdInfo.getValue());
writeStat.setFileSizeInBytes(fileIdInfo.getValue());
writeStats.add(writeStat);
}
}
@@ -1082,6 +1083,7 @@ public class HoodieTestTable {
writeStat.setPartitionPath(partition);
writeStat.setPath(partition + "/" + fileName);
writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]);
writeStat.setFileSizeInBytes(fileIdInfo.getValue()[1]);
writeStats.add(writeStat);
}
}

View File

@@ -18,6 +18,14 @@
package org.apache.hudi.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -30,15 +38,6 @@ import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -276,6 +275,9 @@ public class TestStreamWriteOperatorCoordinator {
writeStat.setPartitionPath(partitionPath);
writeStat.setFileId("fileId123");
writeStat.setPath("path123");
writeStat.setFileSizeInBytes(123);
writeStat.setTotalWriteBytes(123);
writeStat.setNumWrites(1);
writeStatus.setStat(writeStat);

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
// Sole purpose of this class is to provide access to otherwise API inaccessible from the tests.
// While it's certainly not a great pattern, it would require substantial test restructuring to
// eliminate such access to an internal API, so this is considered acceptable given its very limited
// scope (w/in the current package)
/**
 * Test-only subclass that widens the visibility of {@code rollbackFailedBootstrap()}
 * so functional tests in this package can invoke it directly.
 *
 * NOTE(review): extends the raw type {@code SparkRDDWriteClient} — consider
 * parameterizing the supertype to avoid unchecked warnings.
 */
class SparkRDDWriteClientOverride extends org.apache.hudi.client.SparkRDDWriteClient {
public SparkRDDWriteClientOverride(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
}
// Re-declared public purely to widen access for tests; delegates straight to the parent.
@Override
public void rollbackFailedBootstrap() {
super.rollbackFailedBootstrap();
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -253,7 +252,8 @@ public class TestBootstrap extends HoodieClientTestBase {
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.build();
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -272,7 +272,7 @@ public class TestBootstrap extends HoodieClientTestBase {
assertFalse(index.useIndex());
// Run bootstrap again
client = new SparkRDDWriteClient(context, config);
client = new SparkRDDWriteClientOverride(context, config);
client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -245,7 +244,8 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.build();
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -266,7 +266,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
assertFalse(index.useIndex());
// Run bootstrap again
client = new SparkRDDWriteClient(context, config);
client = new SparkRDDWriteClientOverride(context, config);
client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();