1
0

[HUDI-1477] Support copyOnWriteTable in java client (#2382)

This commit is contained in:
Shen Hong
2021-02-23 20:50:55 +08:00
committed by GitHub
parent 3ceb1b4c83
commit 2efd0760ac
19 changed files with 1644 additions and 10 deletions

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
/**
* A factory to generate built-in partitioner to repartition input records into at least
* expected number of output spark partitions for bulk insert operation.
*/
public abstract class JavaBulkInsertInternalPartitionerFactory {
public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
switch (sortMode) {
case NONE:
return new JavaNonSortPartitioner();
case GLOBAL_SORT:
return new JavaGlobalSortPartitioner();
default:
throw new HoodieException("The bulk insert sort mode \"" + sortMode.name()
+ "\" is not supported in java client.");
}
}
}

View File

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import java.util.Comparator;
import java.util.List;
/**
* A built-in partitioner that does global sorting for the input records across partitions
* after repartition for bulk insert operation, corresponding to the
* {@code BulkInsertSortMode.GLOBAL_SORT} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
@Override
public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
int outputSparkPartitions) {
// Now, sort the records and line them up nicely for loading.
records.sort(new Comparator() {
@Override
public int compare(Object o1, Object o2) {
HoodieRecord o11 = (HoodieRecord) o1;
HoodieRecord o22 = (HoodieRecord) o2;
String left = new StringBuilder()
.append(o11.getPartitionPath())
.append("+")
.append(o11.getRecordKey())
.toString();
String right = new StringBuilder()
.append(o22.getPartitionPath())
.append("+")
.append(o22.getRecordKey())
.toString();
return left.compareTo(right);
}
});
return records;
}
@Override
public boolean arePartitionRecordsSorted() {
return true;
}
}

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import java.util.List;
/**
* A built-in partitioner that only does coalesce for input records for bulk insert operation,
* corresponding to the {@code BulkInsertSortMode.NONE} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class JavaNonSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
@Override
public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
int outputPartitions) {
return records;
}
@Override
public boolean arePartitionRecordsSorted() {
return false;
}
}

View File

@@ -39,10 +39,17 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import java.util.List;
import java.util.Map;
@@ -75,7 +82,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String instantTime,
List<HoodieRecord<T>> records,
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsert is not supported yet");
return new JavaBulkInsertCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, records, bulkInsertPartitioner).execute();
}
@Override
@@ -112,21 +120,24 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String instantTime,
List<HoodieRecord<T>> preppedRecords,
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records) {
throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
return new JavaInsertOverwriteCommitActionExecutor(
context, config, this, instantTime, records).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records) {
throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
return new JavaInsertOverwriteTableCommitActionExecutor(
context, config, this, instantTime, records).execute();
}
@Override
@@ -175,7 +186,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
throw new HoodieNotSupportedException("Rollback is not supported yet");
return new JavaCopyOnWriteRollbackActionExecutor(
context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override
@@ -183,13 +195,15 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String instantToSavepoint,
String user,
String comment) {
throw new HoodieNotSupportedException("Savepoint is not supported yet");
return new SavepointActionExecutor(
context, config, this, instantToSavepoint, user, comment).execute();
}
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context,
String restoreInstantTime,
String instantToRestore) {
throw new HoodieNotSupportedException("Restore is not supported yet");
return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context,
config, this, restoreInstantTime, instantToRestore).execute();
}
}

View File

@@ -121,6 +121,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
}
});
updateIndex(writeStatuses, result);
updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
@@ -297,8 +298,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
@@ -325,4 +325,13 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
return getUpsertPartitioner(profile);
}
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
commitOnAutoCommit(result);
}
}

View File

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
import java.util.Map;
public class JavaBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> inputRecords;
private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner;
public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords,
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecords, bulkInsertPartitioner, Option.empty());
}
public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords,
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecords = inputRecords;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
try {
return JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, table, config,
this, true, bulkInsertPartitioner);
} catch (HoodieInsertException ie) {
throw ie;
} catch (Throwable e) {
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}

View File

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.ArrayList;
import java.util.List;
/**
* A java implementation of {@link AbstractBulkInsertHelper}.
*
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends AbstractBulkInsertHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>, R> {
private JavaBulkInsertHelper() {
}
private static class BulkInsertHelperHolder {
private static final JavaBulkInsertHelper JAVA_BULK_INSERT_HELPER = new JavaBulkInsertHelper();
}
public static JavaBulkInsertHelper newInstance() {
return BulkInsertHelperHolder.JAVA_BULK_INSERT_HELPER;
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord<T>> inputRecords,
final String instantTime,
final HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
final boolean performDedupe,
final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
//transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
@Override
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism) {
// De-dupe/merge if needed
List<HoodieRecord<T>> dedupedRecords = inputRecords;
if (performDedupe) {
dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
parallelism, table);
}
final List<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
String idPfx = FSUtils.createNewFileIdPfx();
List<WriteStatus> writeStatuses = new ArrayList<>();
new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, idPfx,
table.getTaskContextSupplier(), new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
return writeStatuses;
}
}

View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> preppedInputRecord;
private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> preppedInputRecord,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecord = preppedInputRecord;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
try {
return JavaBulkInsertHelper.newInstance().bulkInsert(preppedInputRecord, instantTime, table, config,
this, false, userDefinedBulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
}

View File

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class JavaInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> inputRecords;
public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords) {
this(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE);
}
public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords,
WriteOperationType writeOperationType) {
super(context, config, table, instantTime, writeOperationType);
this.inputRecords = inputRecords;
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
@Override
protected String getCommitActionType() {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
return context.mapToPair(
writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
partitionPath ->
Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1
);
}
private List<String> getAllExistingFileIds(String partitionPath) {
// because new commit is not complete. it is safe to mark all existing file Ids as old files
return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class JavaInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends JavaInsertOverwriteCommitActionExecutor<T> {
public JavaInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords) {
super(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE);
}
protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath)
.map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
if (partitionPaths != null && partitionPaths.size() > 0) {
partitionToExistingFileIds = context.mapToPair(partitionPaths,
partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1);
}
return partitionToExistingFileIds;
}
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
import java.util.List;
public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends
BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
String restoreInstantTime) {
super(context, config, table, instantTime, restoreInstantTime);
}
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor(
context,
config,
table,
HoodieActiveTimeline.createNewInstantTime(),
instantToRollback,
true,
true,
false);
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
&& !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
}
return rollbackActionExecutor.execute();
}
}

View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
@SuppressWarnings("checkstyle:LineLength")
public class JavaCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
if (useMarkerBasedStrategy) {
return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime);
} else {
return this::executeRollbackUsingFileListing;
}
}
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
context, table.getMetaClient().getBasePath(), config);
return new JavaListingBasedRollbackHelper(table.getMetaClient(), config)
.performRollback(context, instantToRollback, rollbackRequests);
}
}

View File

@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Performs Rollback of Hoodie Tables.
*/
public class JavaListingBasedRollbackHelper implements Serializable {
private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class);
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
this.metaClient = metaClient;
this.config = config;
}
/**
* Performs all rollback actions that we have collected in parallel.
*/
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
.stream()
.map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
return collect.values().stream()
.map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* Collect all file info that needs to be rollbacked.
*/
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
return new ArrayList<>(partitionPathRollbackStatsPairs.values());
}
/**
* May be delete interested files and collect stats or collect stats only.
*
* @param context instance of {@link HoodieEngineContext} to use.
* @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
* @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
* @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
* @return stats collected with or w/o actual deletions.
*/
Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
HoodieInstant instantToRollback,
List<ListingBasedRollbackRequest> rollbackRequests,
boolean doDelete) {
return context.mapToPair(rollbackRequests, rollbackRequest -> {
switch (rollbackRequest.getType()) {
case DELETE_DATA_FILES_ONLY: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
HoodieLogFormat.Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(rollbackRequest.getFileId().get())
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block..", io);
}
}
// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L
);
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
}
}, 0);
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
LOG.info("Cleaning path " + partitionPath);
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
SerializablePathFilter filter = (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
} else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
};
final Map<FileStatus, Boolean> results = new HashMap<>();
FileSystem fs = metaClient.getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
String commit, String partitionPath, boolean doDelete) throws IOException {
final Map<FileStatus, Boolean> results = new HashMap<>();
LOG.info("Cleaning path " + partitionPath);
FileSystem fs = metaClient.getFs();
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
if (doDelete) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
LOG.info("Delete file " + file.getPath() + "\t" + success);
} else {
results.put(file, true);
}
}
return results;
}
private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
return header;
}
public interface SerializablePathFilter extends PathFilter, Serializable {
}
}

View File

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import java.util.List;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieEngineContext context,
HoodieWriteConfig config,
String instantTime) {
super(table, context, config, instantTime);
}
@Override
public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
try {
MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
List<HoodieRollbackStat> rollbackStats = context.map(markerFiles.allMarkerFilePaths(), markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
switch (type) {
case MERGE:
return undoMerge(MarkerFiles.stripMarkerSuffix(markerFilePath));
case APPEND:
return undoAppend(MarkerFiles.stripMarkerSuffix(markerFilePath), instantToRollback);
case CREATE:
return undoCreate(MarkerFiles.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
}
}, 0);
return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat))
.collect(Collectors.groupingBy(Pair::getKey))
.values()
.stream()
.map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get())
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
}