1
0

Preparation for Avro update (#2650)

This commit is contained in:
Sebastian Bernauer
2021-03-31 06:50:17 +02:00
committed by GitHub
parent 8bc65b9318
commit aa0da72c59
8 changed files with 14 additions and 17 deletions

View File

@@ -34,8 +34,6 @@ import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieKeyLocationFetchHandle; import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import avro.shaded.com.google.common.collect.Lists;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@@ -135,8 +133,8 @@ public class FlinkHoodieSimpleIndex<T extends HoodieRecordPayload> extends Flink
context.map(latestBaseFiles, partitionPathBaseFile -> new HoodieKeyLocationFetchHandle<>(config, hoodieTable, partitionPathBaseFile), parallelism); context.map(latestBaseFiles, partitionPathBaseFile -> new HoodieKeyLocationFetchHandle<>(config, hoodieTable, partitionPathBaseFile), parallelism);
Map<HoodieKey, HoodieRecordLocation> recordLocations = new HashMap<>(); Map<HoodieKey, HoodieRecordLocation> recordLocations = new HashMap<>();
hoodieKeyLocationFetchHandles.stream() hoodieKeyLocationFetchHandles.stream()
.flatMap(handle -> Lists.newArrayList(handle.locations()).stream()) .flatMap(handle -> handle.locations())
.forEach(x -> x.forEach(y -> recordLocations.put(y.getKey(), y.getRight()))); .forEach(x -> recordLocations.put(x.getKey(), x.getRight()));
return recordLocations; return recordLocations;
} }
} }

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.index.hbase; package org.apache.hudi.index.hbase;
import avro.shaded.com.google.common.collect.Maps;
import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -65,6 +64,7 @@ import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -469,7 +469,7 @@ public class TestHBaseIndex extends FunctionalTestHarness {
@Test @Test
public void testHbaseTagLocationForArchivedCommits() throws Exception { public void testHbaseTagLocationForArchivedCommits() throws Exception {
// Load to memory // Load to memory
Map<String, String> params = Maps.newHashMap(); Map<String, String> params = new HashMap<String, String>();
params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1"); params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3"); params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2"); params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");

View File

@@ -30,7 +30,7 @@
}, },
{ {
"name":"strategy", "name":"strategy",
"type":["HoodieClusteringStrategy", "null"], "type":["null", "HoodieClusteringStrategy"],
"default": null "default": null
}, },
{ {

View File

@@ -28,22 +28,22 @@
{ {
"name":"userAction", "name":"userAction",
"type":[ "null", "string" ], "type":[ "null", "string" ],
"default": "null" "default": null
}, },
{ {
"name":"groupAction", "name":"groupAction",
"type":[ "null", "string" ], "type":[ "null", "string" ],
"default": "null" "default": null
}, },
{ {
"name":"otherAction", "name":"otherAction",
"type":[ "null", "string" ], "type":[ "null", "string" ],
"default": "null" "default": null
}, },
{ {
"name":"stickyBit", "name":"stickyBit",
"type":[ "null", "boolean" ], "type":[ "null", "boolean" ],
"default": "null" "default": null
} }
] ]
} }

View File

@@ -23,11 +23,11 @@
{ {
"name":"operationType", "name":"operationType",
"type":["null", "string"], "type":["null", "string"],
"default": "" "default": null
}, },
{ {
"name":"clusteringPlan", /* only set if operationType == clustering" */ "name":"clusteringPlan", /* only set if operationType == clustering" */
"type":["HoodieClusteringPlan", "null"], "type":["null", "HoodieClusteringPlan"],
"default": null "default": null
}, },
{ {

View File

@@ -38,7 +38,7 @@
/* overlaps with 'instantsToRollback' field. Adding this to track action type for all the instants being rolled back. */ /* overlaps with 'instantsToRollback' field. Adding this to track action type for all the instants being rolled back. */
{ {
"name": "restoreInstantInfo", "name": "restoreInstantInfo",
"default": null, "default": [],
"type": { "type": {
"type": "array", "type": "array",
"default": null, "default": null,

View File

@@ -330,7 +330,7 @@ public class TableSchemaResolver {
for (final Field newSchemaField : newSchema.getFields()) { for (final Field newSchemaField : newSchema.getFields()) {
final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField); final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
if (oldSchemaField == null) { if (oldSchemaField == null) {
if (newSchemaField.defaultValue() == null) { if (newSchemaField.defaultVal() == null) {
// C3: newly added field in newSchema does not have a default value // C3: newly added field in newSchema does not have a default value
return false; return false;
} }

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.codehaus.jackson.node.NullNode;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.ArrayList; import java.util.ArrayList;
@@ -184,7 +183,7 @@ public class TestHoodieAvroUtils {
Schema.Field evolvedField1 = new Schema.Field("key", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema.Field evolvedField1 = new Schema.Field("key", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field evolvedField2 = new Schema.Field("key1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema.Field evolvedField2 = new Schema.Field("key1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field evolvedField3 = new Schema.Field("key2", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema.Field evolvedField3 = new Schema.Field("key2", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field evolvedField4 = new Schema.Field("evolved_field", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); Schema.Field evolvedField4 = new Schema.Field("evolved_field", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field evolvedField5 = new Schema.Field("evolved_field1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema.Field evolvedField5 = new Schema.Field("evolved_field1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
evolvedFields.add(evolvedField1); evolvedFields.add(evolvedField1);
evolvedFields.add(evolvedField2); evolvedFields.add(evolvedField2);