diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index b55d3f864..ad56762ee 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRollingStat; import org.apache.hudi.common.model.HoodieRollingStatMetadata; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -71,6 +72,15 @@ public abstract class AbstractHoodieWriteClient e private final transient HoodieIndex index; private transient Timer.Context writeContext = null; + private transient WriteOperationType operationType; + + public void setOperationType(WriteOperationType operationType) { + this.operationType = operationType; + } + + public WriteOperationType getOperationType() { + return this.operationType; + } protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig) { super(jsc, clientConfig); @@ -149,6 +159,7 @@ public abstract class AbstractHoodieWriteClient e extraMetadata.get().forEach(metadata::addMetadata); } metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); + metadata.setOperationType(operationType); try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime), @@ -255,9 +266,9 @@ public abstract class AbstractHoodieWriteClient e return index; } - protected HoodieTable getTableAndInitCtx(OperationType operationType) { + protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) { 
HoodieTableMetaClient metaClient = createMetaClient(true); - if (operationType == OperationType.DELETE) { + if (operationType == WriteOperationType.DELETE) { setWriteSchemaFromLastInstant(metaClient); } // Create a Hoodie table which encapsulated the commits and files visible diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 40be5444d..6423d31a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView; @@ -98,7 +99,6 @@ public class HoodieWriteClient extends AbstractHo private final transient HoodieCleanClient cleanClient; private transient Timer.Context compactionTimer; - /** * Create a write client, without cleaning up failed/inflight commits. 
* @@ -174,7 +174,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { - HoodieTable table = getTableAndInitCtx(OperationType.UPSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + setOperationType(WriteOperationType.UPSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -203,7 +204,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { - HoodieTable table = getTableAndInitCtx(OperationType.UPSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED); + setOperationType(WriteOperationType.UPSERT_PREPPED); try { return upsertRecordsInternal(preppedRecords, commitTime, table, true); } catch (Throwable e) { @@ -225,7 +227,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String commitTime) { - HoodieTable table = getTableAndInitCtx(OperationType.INSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT); + setOperationType(WriteOperationType.INSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -252,7 +255,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { - HoodieTable table = getTableAndInitCtx(OperationType.INSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED); + setOperationType(WriteOperationType.INSERT_PREPPED); try { return upsertRecordsInternal(preppedRecords, commitTime, table, false); } 
catch (Throwable e) { @@ -295,7 +299,8 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime, Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(OperationType.BULK_INSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT); + setOperationType(WriteOperationType.BULK_INSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -328,7 +333,8 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime, Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED); + setOperationType(WriteOperationType.BULK_INSERT_PREPPED); try { return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner); } catch (Throwable e) { @@ -341,14 +347,15 @@ public class HoodieWriteClient extends AbstractHo /** * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime {@link HoodieKey}s will be - * deduped and non existant keys will be removed before deleting. + * de-duped and non existent keys will be removed before deleting. 
* * @param keys {@link List} of {@link HoodieKey}s to be deleted * @param commitTime Commit time handle * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD delete(JavaRDD keys, final String commitTime) { - HoodieTable table = getTableAndInitCtx(OperationType.DELETE); + HoodieTable table = getTableAndInitCtx(WriteOperationType.DELETE); + setOperationType(WriteOperationType.DELETE); try { // De-dupe/merge if needed JavaRDD dedupedKeys = @@ -435,6 +442,7 @@ public class HoodieWriteClient extends AbstractHo metadata.addWriteStat(path.toString(), writeStat); }); }); + metadata.setOperationType(getOperationType()); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitActionType = table.getMetaClient().getCommitActionType(); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java index fcc9673c5..e99c1e797 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java @@ -331,7 +331,7 @@ public class HoodieCommitArchiveLog { return archivedMetaWrapper; } - private org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter( + public org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter( HoodieCommitMetadata hoodieCommitMetadata) { ObjectMapper mapper = new ObjectMapper(); // Need this to ignore other public get() methods diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 0972385c1..8d00d3820 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -20,7 +20,9 @@ package org.apache.hudi.io; import 
org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -413,4 +415,20 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { assertEquals("Loaded inflight clean actions and the count should match", expectedTotalInstants, timeline.countInstants()); } + + @Test + public void testCommitMetadataConverter() { + HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata(); + hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT); + + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-commitMetadata-converter") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + + org.apache.hudi.avro.model.HoodieCommitMetadata actualCommitMetadata = archiveLog.commitMetadataConverter(hoodieCommitMetadata); + assertEquals(WriteOperationType.INSERT.toString(), actualCommitMetadata.getOperationType()); + } } diff --git a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc index bdd2aca11..b7e736945 100644 --- a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc @@ -133,6 +133,11 @@ "name":"version", "type":["int", "null"], "default": 1 + }, + { + "name":"operationType", + "type": ["null","string"], + "default":null } ] 
} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 3097052b7..d40a77014 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -49,6 +49,8 @@ public class HoodieCommitMetadata implements Serializable { private Map extraMetadata; + private WriteOperationType operationType = WriteOperationType.UNKNOWN; + // for ser/deser public HoodieCommitMetadata() { this(false); @@ -106,6 +108,14 @@ public class HoodieCommitMetadata implements Serializable { return filePaths; } + public void setOperationType(WriteOperationType type) { + this.operationType = type; + } + + public WriteOperationType getOperationType() { + return this.operationType; + } + public HashMap getFileIdAndFullPaths(String basePath) { HashMap fullPaths = new HashMap<>(); for (Map.Entry entry : getFileIdAndRelativePaths().entrySet()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java new file mode 100644 index 000000000..868af2080 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.exception.HoodieException; + +import java.util.Locale; + +/** + * The supported write operation types, used by commitMetadata. + */ +public enum WriteOperationType { + // directly insert + INSERT("insert"), + INSERT_PREPPED("insert_prepped"), + // update and insert + UPSERT("upsert"), + UPSERT_PREPPED("upsert_prepped"), + // bulk insert + BULK_INSERT("bulk_insert"), + BULK_INSERT_PREPPED("bulk_insert_prepped"), + // delete + DELETE("delete"), + // used for old version + UNKNOWN("unknown"); + + private final String value; + + WriteOperationType(String value) { + this.value = value; + } + + /** + * Convert string value to WriteOperationType. 
+ */ + public static WriteOperationType fromValue(String value) { + switch (value.toLowerCase(Locale.ROOT)) { + case "insert": + return INSERT; + case "insert_prepped": + return INSERT_PREPPED; + case "upsert": + return UPSERT; + case "upsert_prepped": + return UPSERT_PREPPED; + case "bulk_insert": + return BULK_INSERT; + case "bulk_insert_prepped": + return BULK_INSERT_PREPPED; + case "delete": + return DELETE; + default: + throw new HoodieException("Invalid value of WriteOperationType: " + value); + } + } +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java index 8cfaf55ac..e6b395b81 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.util.FileIOUtils; import org.junit.Assert; import org.junit.Test; @@ -46,4 +47,25 @@ public class TestHoodieCommitMetadata { Assert.assertEquals(0, (long) metadata.getTotalScanTime()); Assert.assertTrue(metadata.getTotalLogFilesCompacted() > 0); } + + @Test + public void testCompatibilityWithoutOperationType() throws Exception { + // test compatibility of old version file + String serializedCommitMetadata = + FileIOUtils.readAsUTFString(TestHoodieCommitMetadata.class.getResourceAsStream("/old-version.commit")); + HoodieCommitMetadata metadata = + HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class); + Assert.assertTrue(metadata.getOperationType() == WriteOperationType.UNKNOWN); + + // test operation type + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.setOperationType(WriteOperationType.INSERT); + Assert.assertTrue(commitMetadata.getOperationType() == WriteOperationType.INSERT); + + // test serialized + 
serializedCommitMetadata = commitMetadata.toJsonString(); + metadata = + HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class); + Assert.assertTrue(metadata.getOperationType() == WriteOperationType.INSERT); + } } diff --git a/hudi-common/src/test/resources/old-version.commit b/hudi-common/src/test/resources/old-version.commit new file mode 100644 index 000000000..c5e263506 --- /dev/null +++ b/hudi-common/src/test/resources/old-version.commit @@ -0,0 +1,20 @@ +{ + "partitionToWriteStats" : { + "americas/brazil/sao_paulo" : [] + }, + "compacted" : false, + "extraMetadataMap" : { + }, + "extraMetadata" : { + }, + "fileIdAndRelativePaths" : { + }, + "totalRecordsDeleted" : 0, + "totalLogRecordsCompacted" : 0, + "totalScanTime" : 0, + "totalCreateTime" : 4895, + "totalUpsertTime" : 0, + "totalCompactedRecordsUpdated" : 0, + "totalLogFilesCompacted" : 0, + "totalLogFilesSize" : 0 +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 104de8814..69f566ac8 100644 --- a/pom.xml +++ b/pom.xml @@ -345,6 +345,7 @@ **/*LICENSE* **/dependency-reduced-pom.xml **/test/resources/*.data + **/test/resources/*.commit **/target/** **/generated-sources/**