[HUDI-332] Add operation type (insert/upsert/bulkinsert/delete) to HoodieCommitMetadata (#1157)

hongdd
2020-03-04 02:10:29 +08:00
committed by GitHub
parent 2d04014581
commit 8306205d7a
10 changed files with 179 additions and 12 deletions
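Every write path in this change stamps its WriteOperationType onto the HoodieCommitMetadata it commits, and the field survives the JSON serialization used on the timeline. A minimal sketch of that round trip, using only the APIs introduced in this diff (the wrapper class name is hypothetical):

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.WriteOperationType;

    public class OperationTypeRoundTrip {
      public static void main(String[] args) throws Exception {
        // What the write client now does before completing a commit.
        HoodieCommitMetadata metadata = new HoodieCommitMetadata();
        metadata.setOperationType(WriteOperationType.BULK_INSERT);

        // The timeline stores commit metadata as JSON; the new field rides along.
        String json = metadata.toJsonString();
        HoodieCommitMetadata read =
            HoodieCommitMetadata.fromJsonString(json, HoodieCommitMetadata.class);
        System.out.println(read.getOperationType()); // BULK_INSERT

        // Commits written before this change deserialize with the UNKNOWN default.
      }
    }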


@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -71,6 +72,15 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
private final transient HoodieIndex<T> index;
private transient Timer.Context writeContext = null;
private transient WriteOperationType operationType;
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
}
public WriteOperationType getOperationType() {
return this.operationType;
}
protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig) {
super(jsc, clientConfig);
@@ -149,6 +159,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
extraMetadata.get().forEach(metadata::addMetadata);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
metadata.setOperationType(operationType);
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
@@ -255,9 +266,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
return index;
}
-  protected HoodieTable getTableAndInitCtx(OperationType operationType) {
+  protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
-    if (operationType == OperationType.DELETE) {
+    if (operationType == WriteOperationType.DELETE) {
setWriteSchemaFromLastInstant(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible


@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
@@ -98,7 +99,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private final transient HoodieCleanClient<T> cleanClient;
private transient Timer.Context compactionTimer;
/**
* Create a write client, without cleaning up failed/inflight commits.
*
@@ -174,7 +174,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
+    setOperationType(WriteOperationType.UPSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -203,7 +204,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT_PREPPED);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
+    setOperationType(WriteOperationType.UPSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, true);
} catch (Throwable e) {
@@ -225,7 +227,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
+    setOperationType(WriteOperationType.INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -252,7 +255,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT_PREPPED);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
+    setOperationType(WriteOperationType.INSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, false);
} catch (Throwable e) {
@@ -295,7 +299,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
+    setOperationType(WriteOperationType.BULK_INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -328,7 +333,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
+    setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
try {
return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
@@ -341,14 +347,15 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime {@link HoodieKey}s will be
-   * deduped and non existant keys will be removed before deleting.
+   * de-duped and non existent keys will be removed before deleting.
*
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @param commitTime Commit time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(OperationType.DELETE);
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.DELETE);
+    setOperationType(WriteOperationType.DELETE);
try {
// De-dupe/merge if needed
JavaRDD<HoodieKey> dedupedKeys =
@@ -435,6 +442,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metadata.addWriteStat(path.toString(), writeStat);
});
});
metadata.setOperationType(getOperationType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
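With the hooks above, each ingest API records how the data was written before its commit completes. A usage sketch, assuming an initialized JavaSparkContext jsc, a HoodieWriteConfig config, a JavaRDD of HoodieRecords named records, and an existing table at basePath (the read-back portion uses timeline APIs that this diff does not touch):

    HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, config);
    String commitTime = client.startCommit();
    client.upsert(records, commitTime);

    // Read the completed commit's metadata back from the timeline.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieInstant instant = metaClient.getActiveTimeline().getCommitTimeline()
        .filterCompletedInstants().lastInstant().get();
    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(
        metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
    System.out.println(meta.getOperationType()); // UPSERT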


@@ -331,7 +331,7 @@ public class HoodieCommitArchiveLog {
return archivedMetaWrapper;
}
-  private org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter(
+  public org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter(
HoodieCommitMetadata hoodieCommitMetadata) {
ObjectMapper mapper = new ObjectMapper();
// Need this to ignore other public get() methods
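The converter relies on Jackson to map the POJO onto the Avro-generated model, which is why the new operationType field flows into the archived record without converter changes. One common shape for such a conversion (a sketch of the technique, not the exact method body):

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    // Skip POJO getters that have no counterpart in the Avro model.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    org.apache.hudi.avro.model.HoodieCommitMetadata avroMetadata =
        mapper.convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);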


@@ -20,7 +20,9 @@ package org.apache.hudi.io;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -413,4 +415,20 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
assertEquals("Loaded inflight clean actions and the count should match", expectedTotalInstants,
timeline.countInstants());
}
@Test
public void testCommitMetadataConverter() {
HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT);
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-commitMetadata-converter")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
org.apache.hudi.avro.model.HoodieCommitMetadata convertedCommitMetadata = archiveLog.commitMetadataConverter(hoodieCommitMetadata);
assertEquals(WriteOperationType.INSERT.toString(), convertedCommitMetadata.getOperationType());
}
}


@@ -133,6 +133,11 @@
"name":"version",
"type":["int", "null"],
"default": 1
},
{
"name":"operationType",
"type": ["null","string"],
"default":null
}
]
}
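Declaring the field as a union with null and a null default is what keeps previously archived commits readable: a record written before the field existed decodes with operationType = null. A self-contained illustration of that schema-evolution rule with Avro's generic API (schemas trimmed to the relevant fields):

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class OptionalFieldDemo {
      public static void main(String[] args) throws Exception {
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"M\",\"fields\":["
                + "{\"name\":\"version\",\"type\":[\"int\",\"null\"],\"default\":1}]}");
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"M\",\"fields\":["
                + "{\"name\":\"version\",\"type\":[\"int\",\"null\"],\"default\":1},"
                + "{\"name\":\"operationType\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Encode a record with the old (writer) schema.
        GenericRecord rec = new GenericData.Record(writer);
        rec.put("version", 1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
        enc.flush();

        // Decode with the new (reader) schema; the missing field takes its default.
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(writer, reader).read(null, dec);
        System.out.println(decoded.get("operationType")); // null
      }
    }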


@@ -49,6 +49,8 @@ public class HoodieCommitMetadata implements Serializable {
private Map<String, String> extraMetadata;
private WriteOperationType operationType = WriteOperationType.UNKNOWN;
// for ser/deser
public HoodieCommitMetadata() {
this(false);
@@ -106,6 +108,14 @@ public class HoodieCommitMetadata implements Serializable {
return filePaths;
}
public void setOperationType(WriteOperationType type) {
this.operationType = type;
}
public WriteOperationType getOperationType() {
return this.operationType;
}
public HashMap<String, String> getFileIdAndFullPaths(String basePath) {
HashMap<String, String> fullPaths = new HashMap<>();
for (Map.Entry<String, String> entry : getFileIdAndRelativePaths().entrySet()) {


@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import org.apache.hudi.exception.HoodieException;
import java.util.Locale;
/**
* The supported write operation types, used by commitMetadata.
*/
public enum WriteOperationType {
// directly insert
INSERT("insert"),
INSERT_PREPPED("insert_prepped"),
// update and insert
UPSERT("upsert"),
UPSERT_PREPPED("upsert_prepped"),
// bulk insert
BULK_INSERT("bulk_insert"),
BULK_INSERT_PREPPED("bulk_insert_prepped"),
// delete
DELETE("delete"),
// used for old version
UNKNOWN("unknown");
private final String value;
WriteOperationType(String value) {
this.value = value;
}
/**
* Convert string value to WriteOperationType.
*/
public static WriteOperationType fromValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "insert":
return INSERT;
case "insert_prepped":
return INSERT_PREPPED;
case "upsert":
return UPSERT;
case "upsert_prepped":
return UPSERT_PREPPED;
case "bulk_insert":
return BULK_INSERT;
case "bulk_insert_prepped":
return BULK_INSERT_PREPPED;
case "delete":
return DELETE;
default:
throw new HoodieException("Invalid value of Type: " + value);
}
}
}
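For reference, fromValue maps the lower-case wire strings back to constants, while the enum's default toString() yields the constant name (INSERT, not insert). Note that per the switch above, neither "unknown" nor any unrecognized input maps to UNKNOWN; both raise a HoodieException. A quick illustration:

    WriteOperationType op = WriteOperationType.fromValue("bulk_insert");
    System.out.println(op); // BULK_INSERT
    // WriteOperationType.fromValue("compact"); // would throw HoodieException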


@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.util.FileIOUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -46,4 +47,25 @@ public class TestHoodieCommitMetadata {
Assert.assertEquals(0, (long) metadata.getTotalScanTime());
Assert.assertTrue(metadata.getTotalLogFilesCompacted() > 0);
}
@Test
public void testCompatibilityWithoutOperationType() throws Exception {
// test compatibility with an old-version commit file that lacks the field
String serializedCommitMetadata =
FileIOUtils.readAsUTFString(TestHoodieCommitMetadata.class.getResourceAsStream("/old-version.commit"));
HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class);
Assert.assertTrue(metadata.getOperationType() == WriteOperationType.UNKNOWN);
// test operation type
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.setOperationType(WriteOperationType.INSERT);
Assert.assertTrue(commitMetadata.getOperationType() == WriteOperationType.INSERT);
// test serialization round trip
serializedCommitMetadata = commitMetadata.toJsonString();
metadata =
HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class);
Assert.assertTrue(metadata.getOperationType() == WriteOperationType.INSERT);
}
}


@@ -0,0 +1,20 @@
{
"partitionToWriteStats" : {
"americas/brazil/sao_paulo" : []
},
"compacted" : false,
"extraMetadataMap" : {
},
"extraMetadata" : {
},
"fileIdAndRelativePaths" : {
},
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 4895,
"totalUpsertTime" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesCompacted" : 0,
"totalLogFilesSize" : 0
}


@@ -345,6 +345,7 @@
<exclude>**/*LICENSE*</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/test/resources/*.data</exclude>
<exclude>**/test/resources/*.commit</exclude>
<exclude>**/target/**</exclude>
<exclude>**/generated-sources/**</exclude>
</excludes>