diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index d680b1268..420c6cce7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -28,23 +28,30 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.nio.file.FileSystem; import java.nio.file.FileSystemNotFoundException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -196,4 +203,123 @@ public final class SchemaTestUtil { public static Schema getSchemaFromResource(Class clazz, String name) { return getSchemaFromResource(clazz, name, false); } + + public static List generateTestRecordsForSchema(Schema schema) { + RandomData generator = new RandomData(schema, 1000); + List records = new ArrayList<>(); + for (Object o : generator) { + IndexedRecord record = (IndexedRecord) o; + records.add(record); + } + return records; + } + + //Taken from test pkg 1.8.2 avro. This is available as a util class in latest versions. 
When we upgrade avro we can remove this + static class RandomData implements Iterable { + private final Schema root; + private final long seed; + private final int count; + + public RandomData(Schema schema, int count) { + this(schema, count, System.currentTimeMillis()); + } + + public RandomData(Schema schema, int count, long seed) { + this.root = schema; + this.seed = seed; + this.count = count; + } + + @SuppressWarnings(value = "unchecked") + private static Object generate(Schema schema, Random random, int d) { + switch (schema.getType()) { + case RECORD: + GenericRecord record = new GenericData.Record(schema); + for (Schema.Field field : schema.getFields()) { + record.put(field.name(), generate(field.schema(), random, d + 1)); + } + return record; + case ENUM: + List symbols = schema.getEnumSymbols(); + return new GenericData.EnumSymbol(schema, symbols.get(random.nextInt(symbols.size()))); + case ARRAY: + int length = (random.nextInt(5) + 2) - d; + GenericArray array = + new GenericData.Array(length <= 0 ? 0 : length, schema); + for (int i = 0; i < length; i++) { + array.add(generate(schema.getElementType(), random, d + 1)); + } + return array; + case MAP: + length = (random.nextInt(5) + 2) - d; + Map map = new HashMap(length <= 0 ? 0 : length); + for (int i = 0; i < length; i++) { + map.put(randomUtf8(random, 40), + generate(schema.getValueType(), random, d + 1)); + } + return map; + case UNION: + List types = schema.getTypes(); + //Dropping the null at the end. 
+ return generate(types.get(random.nextInt(types.size() - 1)), random, d); + case FIXED: + byte[] bytes = new byte[schema.getFixedSize()]; + random.nextBytes(bytes); + return new GenericData.Fixed(schema, bytes); + case STRING: + return randomUtf8(random, 40); + case BYTES: + return randomBytes(random, 40); + case INT: + return random.nextInt(); + case LONG: + return random.nextLong(); + case FLOAT: + return random.nextFloat(); + case DOUBLE: + return random.nextDouble(); + case BOOLEAN: + return random.nextBoolean(); + case NULL: + return null; + default: + throw new RuntimeException("Unknown type: " + schema); + } + } + + private static Utf8 randomUtf8(Random rand, int maxLength) { + Utf8 utf8 = new Utf8().setLength(rand.nextInt(maxLength)); + for (int i = 0; i < utf8.getLength(); i++) { + utf8.getBytes()[i] = (byte) ('a' + rand.nextInt('z' - 'a')); + } + return utf8; + } + + private static ByteBuffer randomBytes(Random rand, int maxLength) { + ByteBuffer bytes = ByteBuffer.allocate(rand.nextInt(maxLength)); + bytes.limit(bytes.capacity()); + rand.nextBytes(bytes.array()); + return bytes; + } + + public Iterator iterator() { + return new Iterator() { + private int n; + private Random random = new Random(seed); + + public boolean hasNext() { + return n < count; + } + + public Object next() { + n++; + return generate(root, random, 0); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } } diff --git a/hudi-common/src/test/resources/complex.schema.avsc b/hudi-common/src/test/resources/complex.schema.avsc new file mode 100644 index 000000000..1672415bd --- /dev/null +++ b/hudi-common/src/test/resources/complex.schema.avsc @@ -0,0 +1,1882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +{ + "type": "record", + "name": "field_20", + "namespace": "hoodie.complex", + "fields": [ + { + "name": "field_24", + "type": [ + "string", + "null" + ], + "default": "null" + }, + { + "name": "field_31", + "type": [ + { + "type": "record", + "name": "field_35", + "namespace": "hoodie.complex.complex_record", + "fields": [ + { + "name": "field_39", + "type": [ + { + "type": "record", + "name": "field_43", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_47", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_54", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_61", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_68", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_75", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_82", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_89", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_96", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_103", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_110", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_117", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_124", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_131", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_138", + "type": [ + 
"string", + "null" + ] + }, + { + "name": "field_145", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_152", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_165", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_172", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_184", + "type": [ + { + "type": "record", + "name": "field_188", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_192", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_199", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_206", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_213", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_220", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_227", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_234", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_241", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_248", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_255", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_262", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_269", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_276", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_283", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_290", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_297", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_310", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_317", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_329", + "type": [ + { + "type": "record", + "name": "field_333", + "namespace": "hoodie.complex.complex_record.metadata", + 
"fields": [ + { + "name": "field_337", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_344", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_351", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_358", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_365", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_372", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_379", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_386", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_393", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_400", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_407", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_414", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_421", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_428", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_435", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_442", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_455", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_462", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_474", + "type": [ + { + "type": "record", + "name": "field_478", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_482", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_489", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_496", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_503", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_510", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_517", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_524", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_531", + "type": [ + 
"boolean", + "null" + ] + }, + { + "name": "field_538", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_545", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_552", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_559", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_566", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_573", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_580", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_587", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_600", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_607", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_619", + "type": [ + { + "type": "record", + "name": "field_623", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_627", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_634", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_641", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_648", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_655", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_662", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_669", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_676", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_683", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_690", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_697", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_704", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_711", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_718", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_725", + "type": [ + "boolean", + "null" + ] + }, + { + "name": 
"field_732", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_745", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_752", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_764", + "type": [ + { + "type": "record", + "name": "field_768", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_772", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_779", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_786", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_793", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_800", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_807", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_814", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_821", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_828", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_835", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_842", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_849", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_856", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_863", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_870", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_877", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_890", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_897", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_909", + "type": [ + { + "type": "record", + "name": "field_913", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_917", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_924", + "type": 
[ + "long", + "null" + ] + }, + { + "name": "field_931", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_938", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_945", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_952", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_959", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_966", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_973", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_980", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_987", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_994", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1001", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1008", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1015", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1022", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_1035", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_1042", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1054", + "type": [ + { + "type": "record", + "name": "field_1058", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_1062", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1069", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1076", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1083", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1090", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1097", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1104", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1111", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1118", + "type": [ + "string", + "null" + ] + }, + 
{ + "name": "field_1125", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1132", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1139", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1146", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1153", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1160", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1167", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_1180", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_1187", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1199", + "type": [ + { + "type": "record", + "name": "field_1203", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_1207", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1214", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1221", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1228", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1235", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1242", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1249", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1256", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1263", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1270", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1277", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1284", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1291", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1298", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1305", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1312", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" 
+ ] + }, + "null" + ] + }, + { + "name": "field_1325", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_1332", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1344", + "type": [ + { + "type": "record", + "name": "field_1348", + "namespace": "hoodie.complex.complex_record.metadata", + "fields": [ + { + "name": "field_1352", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1359", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1366", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1373", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1380", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1387", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1394", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1401", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1408", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1415", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1422", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1429", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1436", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_1443", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1450", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1457", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_1470", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_1477", + "type": [ + "long", + "null" + ] + } + ] + }, + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1494", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1501", + "type": [ + "double", + "null" + ] + }, + { + "name": "field_1508", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1515", + "type": [ + "int", + "null" + ] + }, + { + "name": 
"field_1522", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1529", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1536", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1543", + "type": [ + { + "type": "record", + "name": "field_1547", + "namespace": "hoodie.complex.complex_record", + "fields": [ + { + "name": "field_1551", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1558", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1565", + "type": [ + "long", + "null" + ] + }, + { + "name": "field_1572", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1579", + "type": [ + "string", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1591", + "type": [ + { + "type": "record", + "name": "field_1595", + "namespace": "hoodie.complex.complex_record", + "fields": [ + { + "name": "field_1599", + "type": [ + "boolean", + "null" + ] + }, + { + "name": "field_1606", + "type": [ + { + "type": "record", + "name": "field_1610", + "namespace": "hoodie.complex.complex_record.FLAGS", + "fields": [ + { + "name": "field_1614", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1621", + "type": [ + "string", + "null" + ] + } + ] + }, + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1638", + "type": [ + { + "type": "record", + "name": "field_1642", + "namespace": "hoodie.complex.complex_record", + "fields": [ + { + "name": "field_1646", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1653", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_1666", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1673", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1680", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1687", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1694", + "type": [ + "string", + "null" + ] + }, + { + "name": 
"field_1701", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1708", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1715", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1722", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1729", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1736", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1743", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1750", + "type": [ + "string", + "null" + ] + } + ] + }, + "null" + ] + }, + { + "name": "field_1762", + "type": [ + { + "type": "record", + "name": "field_1766", + "namespace": "hoodie.complex.complex_record", + "fields": [ + { + "name": "field_1770", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1777", + "type": [ + { + "type": "array", + "items": [ + "string", + "null" + ] + }, + "null" + ] + }, + { + "name": "field_1790", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1797", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1804", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1811", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1818", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1825", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1832", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1839", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1846", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1853", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1860", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_1867", + "type": [ + "string", + "null" + ] + } + ] + }, + "null" + ] + } + ] +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 47f23864c..560563cb7 100644 
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -70,9 +70,13 @@ public class HiveSyncConfig implements Serializable { + "org.apache.hudi input format.") public Boolean usePreApacheInputFormat = false; + @Deprecated @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url") public Boolean useJdbc = true; + @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql") + public String syncMode; + @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database") public Boolean autoCreateDatabase = true; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 26a213b40..71f0b994e 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -26,17 +26,18 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; + import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; -import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncTool; import com.beust.jcommander.JCommander; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hudi.sync.common.AbstractSyncTool; 
import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.GroupType; @@ -149,7 +150,7 @@ public class HiveSyncTool extends AbstractSyncTool { // check if the database exists else create it if (cfg.autoCreateDatabase) { try { - hoodieHiveClient.updateHiveSQL("create database if not exists " + cfg.databaseName); + hoodieHiveClient.createDatabase(cfg.databaseName); } catch (Exception e) { // this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing LOG.warn("Unable to create database", e); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index ad219e4f9..66a41d6f5 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -19,12 +19,13 @@ package org.apache.hudi.hive; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.PartitionPathEncodeUtils; -import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.hive.ddl.DDLExecutor; +import org.apache.hudi.hive.ddl.HMSDDLExecutor; +import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; +import org.apache.hudi.hive.ddl.JDBCExecutor; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.AbstractSyncHoodieClient; @@ -33,33 +34,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import 
org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; import org.apache.thrift.TException; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP; @@ -70,48 +57,37 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class); private final PartitionValueExtractor partitionValueExtractor; + private final HoodieTimeline activeTimeline; + DDLExecutor ddlExecutor; private IMetaStoreClient client; - private SessionState sessionState; - private Driver hiveDriver; - private HiveSyncConfig syncConfig; - private FileSystem fs; - private Connection connection; - private HoodieTimeline activeTimeline; - private HiveConf configuration; + private final HiveSyncConfig syncConfig; public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs); this.syncConfig = cfg; - this.fs = fs; - this.configuration = 
configuration; - // Support both JDBC and metastore based implementations for backwards compatiblity. Future users should + // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should // disable jdbc and depend on metastore client for all hive registrations - if (cfg.useJdbc) { - LOG.info("Creating hive connection " + cfg.jdbcUrl); - createHiveConnection(); - } try { - HoodieTimer timer = new HoodieTimer().startTimer(); - this.sessionState = new SessionState(configuration, - UserGroupInformation.getCurrentUser().getShortUserName()); - SessionState.start(this.sessionState); - this.sessionState.setCurrentDatabase(syncConfig.databaseName); - hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); - this.client = Hive.get(configuration).getMSC(); - LOG.info(String.format("Time taken to start SessionState and create Driver and client: " - + "%s ms", (timer.endTimer()))); - } catch (Exception e) { - if (this.sessionState != null) { - try { - this.sessionState.close(); - } catch (IOException ioException) { - LOG.error("Error while closing SessionState", ioException); + if (!StringUtils.isNullOrEmpty(cfg.syncMode)) { + switch (cfg.syncMode.toLowerCase()) { + case "hms": + ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs); + break; + case "hiveql": + ddlExecutor = new HiveQueryDDLExecutor(cfg, fs, configuration); + break; + case "jdbc": + ddlExecutor = new JDBCExecutor(cfg, fs); + break; + default: + throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.syncMode); } + } else { + ddlExecutor = cfg.useJdbc ? 
new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration); } - if (this.hiveDriver != null) { - this.hiveDriver.close(); - } + this.client = Hive.get(configuration).getMSC(); + } catch (Exception e) { throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e); } @@ -135,13 +111,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { */ @Override public void addPartitionsToTable(String tableName, List partitionsToAdd) { - if (partitionsToAdd.isEmpty()) { - LOG.info("No partitions to add for " + tableName); - return; - } - LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName); - List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> updateHiveSQL(sql)); + ddlExecutor.addPartitionsToTable(tableName, partitionsToAdd); } /** @@ -149,15 +119,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { */ @Override public void updatePartitionsToTable(String tableName, List changedPartitions) { - if (changedPartitions.isEmpty()) { - LOG.info("No partitions to change for " + tableName); - return; - } - LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName); - List sqls = constructChangePartitions(tableName, changedPartitions); - for (String sql : sqls) { - updateHiveSQL(sql); - } + ddlExecutor.updatePartitionsToTable(tableName, changedPartitions); } /** @@ -170,7 +132,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { } try { Table table = client.getTable(syncConfig.databaseName, tableName); - for (Map.Entry entry: tableProperties.entrySet()) { + for (Map.Entry entry : tableProperties.entrySet()) { table.putToParameters(entry.getKey(), entry.getValue()); } client.alter_table(syncConfig.databaseName, tableName, table); @@ -180,81 +142,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { } } - private StringBuilder getAlterTablePrefix(String tableName) { - StringBuilder alterSQL = new 
StringBuilder("ALTER TABLE "); - alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName) - .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER) - .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS "); - return alterSQL; - } - - private List constructAddPartitions(String tableName, List partitions) { - if (syncConfig.batchSyncNum <= 0) { - throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter"); - } - List result = new ArrayList<>(); - int batchSyncPartitionNum = syncConfig.batchSyncNum; - StringBuilder alterSQL = getAlterTablePrefix(tableName); - for (int i = 0; i < partitions.size(); i++) { - String partitionClause = getPartitionClause(partitions.get(i)); - String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partitions.get(i)).toString(); - alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath) - .append("' "); - if ((i + 1) % batchSyncPartitionNum == 0) { - result.add(alterSQL.toString()); - alterSQL = getAlterTablePrefix(tableName); - } - } - // add left partitions to result - if (partitions.size() % batchSyncPartitionNum != 0) { - result.add(alterSQL.toString()); - } - return result; - } - - /** - * Generate Hive Partition from partition values. - * - * @param partition Partition path - * @return - */ - private String getPartitionClause(String partition) { - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - ValidationUtils.checkArgument(syncConfig.partitionFields.size() == partitionValues.size(), - "Partition key parts " + syncConfig.partitionFields + " does not match with partition values " + partitionValues - + ". Check partition strategy. 
"); - List partBuilder = new ArrayList<>(); - for (int i = 0; i < syncConfig.partitionFields.size(); i++) { - String partitionValue = partitionValues.get(i); - // decode the partition before sync to hive to prevent multiple escapes of HIVE - if (syncConfig.decodePartition) { - // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath - partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue); - } - partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValue + "'"); - } - return String.join(",", partBuilder); - } - - private List constructChangePartitions(String tableName, List partitions) { - List changePartitions = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first - String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + syncConfig.databaseName + HIVE_ESCAPE_CHARACTER; - changePartitions.add(useDatabase); - String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; - for (String partition : partitions) { - String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition); - String partitionScheme = partitionPath.toUri().getScheme(); - String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) - ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString(); - String changePartition = - alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; - changePartitions.add(changePartition); - } - return changePartitions; - } - /** * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. * Generate a list of PartitionEvent based on the changes required. 
@@ -294,35 +181,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { } void updateTableDefinition(String tableName, MessageType newSchema) { - try { - String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields, syncConfig.supportTimestamp); - // Cascade clause should not be present for non-partitioned tables - String cascadeClause = syncConfig.partitionFields.size() > 0 ? " cascade" : ""; - StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER) - .append(syncConfig.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".") - .append(HIVE_ESCAPE_CHARACTER).append(tableName) - .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(") - .append(newSchemaStr).append(" )").append(cascadeClause); - LOG.info("Updating table definition with " + sqlBuilder); - updateHiveSQL(sqlBuilder.toString()); - } catch (IOException e) { - throw new HoodieHiveSyncException("Failed to update table for " + tableName, e); - } + ddlExecutor.updateTableDefinition(tableName, newSchema); } @Override public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map serdeProperties, Map tableProperties) { - try { - String createSQLQuery = - HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, - outputFormatClass, serdeClass, serdeProperties, tableProperties); - LOG.info("Creating table with " + createSQLQuery); - updateHiveSQL(createSQLQuery); - } catch (IOException e) { - throw new HoodieHiveSyncException("Failed to create table " + tableName, e); - } + ddlExecutor.createTable(tableName, storageSchema, inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties); } /** @@ -330,51 +196,11 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { */ @Override public Map getTableSchema(String tableName) { - if (syncConfig.useJdbc) { - if (!doesTableExist(tableName)) { 
- throw new IllegalArgumentException( - "Failed to get schema for table " + tableName + " does not exist"); - } - Map schema = new HashMap<>(); - ResultSet result = null; - try { - DatabaseMetaData databaseMetaData = connection.getMetaData(); - result = databaseMetaData.getColumns(null, syncConfig.databaseName, tableName, null); - while (result.next()) { - TYPE_CONVERTOR.doConvert(result, schema); - } - return schema; - } catch (SQLException e) { - throw new HoodieHiveSyncException("Failed to get table schema for " + tableName, e); - } finally { - closeQuietly(result, null); - } - } else { - return getTableSchemaUsingMetastoreClient(tableName); - } - } - - public Map getTableSchemaUsingMetastoreClient(String tableName) { - try { - // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to - // get the Schema of the table. - final long start = System.currentTimeMillis(); - Table table = this.client.getTable(syncConfig.databaseName, tableName); - Map partitionKeysMap = - table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); - - Map columnsMap = - table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); - - Map schema = new HashMap<>(); - schema.putAll(columnsMap); - schema.putAll(partitionKeysMap); - final long end = System.currentTimeMillis(); - LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start))); - return schema; - } catch (Exception e) { - throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e); + if (!doesTableExist(tableName)) { + throw new IllegalArgumentException( + "Failed to get schema for table " + tableName + " does not exist"); } + return ddlExecutor.getTableSchema(tableName); } /** @@ -391,7 +217,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { /** * @param databaseName - * @return true if the configured database exists + * 
@return true if the configured database exists */ public boolean doesDataBaseExist(String databaseName) { try { @@ -405,97 +231,8 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { return false; } - /** - * Execute a update in hive metastore with this SQL. - * - * @param s SQL to execute - */ - public void updateHiveSQL(String s) { - if (syncConfig.useJdbc) { - Statement stmt = null; - try { - stmt = connection.createStatement(); - LOG.info("Executing SQL " + s); - stmt.execute(s); - } catch (SQLException e) { - throw new HoodieHiveSyncException("Failed in executing SQL " + s, e); - } finally { - closeQuietly(null, stmt); - } - } else { - CommandProcessorResponse response = updateHiveSQLUsingHiveDriver(s); - if (response == null) { - throw new HoodieHiveSyncException("Failed in executing SQL null response" + s); - } - if (response.getResponseCode() != 0) { - LOG.error(String.format("Failure in SQL response %s", response.toString())); - if (response.getException() != null) { - throw new HoodieHiveSyncException( - String.format("Failed in executing SQL %s", s), response.getException()); - } else { - throw new HoodieHiveSyncException(String.format("Failed in executing SQL %s", s)); - } - } - } - } - - /** - * Execute a update in hive using Hive Driver. 
- * - * @param sql SQL statement to execute - */ - public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) { - List responses = updateHiveSQLs(Collections.singletonList(sql)); - return responses.get(responses.size() - 1); - } - - private List updateHiveSQLs(List sqls) { - List responses = new ArrayList<>(); - try { - for (String sql : sqls) { - if (hiveDriver != null) { - final long start = System.currentTimeMillis(); - responses.add(hiveDriver.run(sql)); - final long end = System.currentTimeMillis(); - LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start))); - } - } - } catch (Exception e) { - throw new HoodieHiveSyncException("Failed in executing SQL", e); - } - return responses; - } - - private void createHiveConnection() { - if (connection == null) { - try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); - } catch (ClassNotFoundException e) { - LOG.error("Unable to load Hive driver class", e); - return; - } - - try { - this.connection = DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass); - LOG.info("Successfully established Hive connection to " + syncConfig.jdbcUrl); - } catch (SQLException e) { - throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e); - } - } - } - - private String getHiveJdbcUrlWithDefaultDBName() { - String hiveJdbcUrl = syncConfig.jdbcUrl; - String urlAppend = null; - // If the hive url contains addition properties like ;transportMode=http;httpPath=hs2 - if (hiveJdbcUrl.contains(";")) { - urlAppend = hiveJdbcUrl.substring(hiveJdbcUrl.indexOf(";")); - hiveJdbcUrl = hiveJdbcUrl.substring(0, hiveJdbcUrl.indexOf(";")); - } - if (!hiveJdbcUrl.endsWith("/")) { - hiveJdbcUrl = hiveJdbcUrl + "/"; - } - return hiveJdbcUrl + (urlAppend == null ? 
"" : urlAppend); + public void createDatabase(String databaseName) { + ddlExecutor.createDatabase(databaseName); } @Override @@ -556,14 +293,12 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { public void close() { try { - if (connection != null) { - connection.close(); - } + ddlExecutor.close(); if (client != null) { client.close(); client = null; } - } catch (SQLException e) { + } catch (Exception e) { LOG.error("Could not close connection ", e); } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java new file mode 100644 index 000000000..0e1e223aa --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.ddl; + +import org.apache.parquet.schema.MessageType; + +import java.util.List; +import java.util.Map; + +/** + * DDLExceutor is the interface which defines the ddl functions for Hive. 
+ * There are two main implementations one is QueryBased other is based on HiveMetaStore + * QueryBasedDDLExecutor also has two impls namely HiveQL based and other JDBC based. + */ +public interface DDLExecutor { + /** + * @param databaseName name of database to be created. + */ + public void createDatabase(String databaseName); + + /** + * Creates a table with the following properties. + * + * @param tableName + * @param storageSchema + * @param inputFormatClass + * @param outputFormatClass + * @param serdeClass + * @param serdeProperties + * @param tableProperties + */ + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, + String outputFormatClass, String serdeClass, + Map serdeProperties, Map tableProperties); + + /** + * Updates the table with the newSchema. + * + * @param tableName + * @param newSchema + */ + public void updateTableDefinition(String tableName, MessageType newSchema); + + /** + * Fetches tableSchema for a table. + * + * @param tableName + * @return + */ + public Map getTableSchema(String tableName); + + /** + * Adds partition to table. + * + * @param tableName + * @param partitionsToAdd + */ + public void addPartitionsToTable(String tableName, List partitionsToAdd); + + /** + * Updates partitions for a given table. + * + * @param tableName + * @param changedPartitions + */ + public void updatePartitionsToTable(String tableName, List changedPartitions); + + public void close(); +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java new file mode 100644 index 000000000..b31c62ec5 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.ddl; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.StorageSchemes; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.PartitionValueExtractor; +import org.apache.hudi.hive.util.HiveSchemaUtil; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import 
org.apache.parquet.schema.MessageType; +import org.apache.thrift.TException; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * DDLExecutor impl based on HMS which use HMS apis directly for all DDL tasks. + */ +public class HMSDDLExecutor implements DDLExecutor { + private static final Logger LOG = LogManager.getLogger(HMSDDLExecutor.class); + private final HiveSyncConfig syncConfig; + private final PartitionValueExtractor partitionValueExtractor; + private final FileSystem fs; + private final IMetaStoreClient client; + + public HMSDDLExecutor(HiveConf conf, HiveSyncConfig syncConfig, FileSystem fs) throws HiveException, MetaException { + this.client = Hive.get(conf).getMSC(); + this.syncConfig = syncConfig; + this.fs = fs; + try { + this.partitionValueExtractor = + (PartitionValueExtractor) Class.forName(syncConfig.partitionValueExtractorClass).newInstance(); + } catch (Exception e) { + throw new HoodieHiveSyncException( + "Failed to initialize PartitionValueExtractor class " + syncConfig.partitionValueExtractorClass, e); + } + } + + @Override + public void createDatabase(String databaseName) { + try { + Database database = new Database(databaseName, "automatically created by hoodie", null, null); + client.createDatabase(database); + } catch (Exception e) { + LOG.error("Failed to create database " + databaseName, e); + throw new HoodieHiveSyncException("Failed to create database " + databaseName, e); + } + } + + @Override + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map serdeProperties, + Map tableProperties) { + try { + LinkedHashMap mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false); + + List fieldSchema = HiveSchemaUtil.convertMapSchemaToHiveFieldSchema(mapSchema, syncConfig); + + List partitionSchema = 
syncConfig.partitionFields.stream().map(partitionKey -> { + String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey); + return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), ""); + }).collect(Collectors.toList()); + Table newTb = new Table(); + newTb.setDbName(syncConfig.databaseName); + newTb.setTableName(tableName); + newTb.setCreateTime((int) System.currentTimeMillis()); + StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(fieldSchema); + storageDescriptor.setInputFormat(inputFormatClass); + storageDescriptor.setOutputFormat(outputFormatClass); + storageDescriptor.setLocation(syncConfig.basePath); + serdeProperties.put("serialization.format", "1"); + storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties)); + newTb.setSd(storageDescriptor); + newTb.setPartitionKeys(partitionSchema); + + if (!syncConfig.createManagedTable) { + newTb.putToParameters("EXTERNAL", "TRUE"); + } + + for (Map.Entry entry : tableProperties.entrySet()) { + newTb.putToParameters(entry.getKey(), entry.getValue()); + } + newTb.setTableType(TableType.EXTERNAL_TABLE.toString()); + client.createTable(newTb); + } catch (Exception e) { + LOG.error("failed to create table " + tableName, e); + throw new HoodieHiveSyncException("failed to create table " + tableName, e); + } + } + + @Override + public void updateTableDefinition(String tableName, MessageType newSchema) { + try { + boolean cascade = syncConfig.partitionFields.size() > 0; + List fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig); + Table table = client.getTable(syncConfig.databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setCols(fieldSchema); + table.setSd(sd); + EnvironmentContext environmentContext = new EnvironmentContext(); + if (cascade) { + LOG.info("partition table,need cascade"); + environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE); 
+ } + client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext); + } catch (Exception e) { + LOG.error("Failed to update table for " + tableName, e); + throw new HoodieHiveSyncException("Failed to update table for " + tableName, e); + } + } + + @Override + public Map getTableSchema(String tableName) { + try { + // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to + // get the Schema of the table. + final long start = System.currentTimeMillis(); + Table table = this.client.getTable(syncConfig.databaseName, tableName); + Map partitionKeysMap = + table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); + + Map columnsMap = + table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); + + Map schema = new HashMap<>(); + schema.putAll(columnsMap); + schema.putAll(partitionKeysMap); + final long end = System.currentTimeMillis(); + LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start))); + return schema; + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e); + } + } + + @Override + public void addPartitionsToTable(String tableName, List partitionsToAdd) { + if (partitionsToAdd.isEmpty()) { + LOG.info("No partitions to add for " + tableName); + return; + } + LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName); + try { + StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd(); + List partitionList = partitionsToAdd.stream().map(partition -> { + StorageDescriptor partitionSd = new StorageDescriptor(); + partitionSd.setCols(sd.getCols()); + partitionSd.setInputFormat(sd.getInputFormat()); + partitionSd.setOutputFormat(sd.getOutputFormat()); + partitionSd.setSerdeInfo(sd.getSerdeInfo()); + String fullPartitionPath = 
FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + partitionSd.setLocation(fullPartitionPath); + return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, partitionSd, null); + }).collect(Collectors.toList()); + client.add_partitions(partitionList, true, false); + } catch (TException e) { + LOG.error(syncConfig.databaseName + "." + tableName + " add partition failed", e); + throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " add partition failed", e); + } + } + + @Override + public void updatePartitionsToTable(String tableName, List changedPartitions) { + if (changedPartitions.isEmpty()) { + LOG.info("No partitions to change for " + tableName); + return; + } + LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName); + try { + StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd(); + List partitionList = changedPartitions.stream().map(partition -> { + Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + sd.setLocation(fullPartitionPath); + return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, sd, null); + }).collect(Collectors.toList()); + client.alter_partitions(syncConfig.databaseName, tableName, partitionList, null); + } catch (TException e) { + LOG.error(syncConfig.databaseName + "." + tableName + " update partition failed", e); + throw new HoodieHiveSyncException(syncConfig.databaseName + "." 
+ tableName + " update partition failed", e); + } + } + + @Override + public void close() { + if (client != null) { + Hive.closeCurrent(); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java new file mode 100644 index 000000000..e2635eef0 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hive.ddl; + +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class offers DDL executor backed by the hive.ql Driver This class preserves the old useJDBC = false way of doing things. 
+ */ +public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { + private static final Logger LOG = LogManager.getLogger(HiveQueryDDLExecutor.class); + private final HiveSyncConfig config; + private final IMetaStoreClient metaStoreClient; + private SessionState sessionState = null; + private Driver hiveDriver = null; + + public HiveQueryDDLExecutor(HiveSyncConfig config, FileSystem fs, HiveConf configuration) throws HiveException, MetaException { + super(config, fs); + this.config = config; + this.metaStoreClient = Hive.get(configuration).getMSC(); + try { + this.sessionState = new SessionState(configuration, + UserGroupInformation.getCurrentUser().getShortUserName()); + SessionState.start(this.sessionState); + this.sessionState.setCurrentDatabase(config.databaseName); + hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); + } catch (Exception e) { + if (sessionState != null) { + try { + this.sessionState.close(); + } catch (IOException ioException) { + LOG.error("Error while closing SessionState", ioException); + } + } + if (this.hiveDriver != null) { + this.hiveDriver.close(); + } + throw new HoodieHiveSyncException("Failed to create HiveQueryDDL object", e); + } + } + + @Override + public void runSQL(String sql) { + updateHiveSQLs(Collections.singletonList(sql)); + } + + private List updateHiveSQLs(List sqls) { + List responses = new ArrayList<>(); + try { + for (String sql : sqls) { + if (hiveDriver != null) { + HoodieTimer timer = new HoodieTimer().startTimer(); + responses.add(hiveDriver.run(sql)); + LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, timer.endTimer())); + } + } + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed in executing SQL", e); + } + return responses; + } + + //TODO Duplicating it here from HMSDLExecutor as HiveQueryQL has no way of doing it on its own currently. 
Need to refactor it + @Override + public Map getTableSchema(String tableName) { + try { + // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to + // get the Schema of the table. + final long start = System.currentTimeMillis(); + Table table = metaStoreClient.getTable(config.databaseName, tableName); + Map partitionKeysMap = + table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); + + Map columnsMap = + table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase())); + + Map schema = new HashMap<>(); + schema.putAll(columnsMap); + schema.putAll(partitionKeysMap); + final long end = System.currentTimeMillis(); + LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start))); + return schema; + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e); + } + } + + @Override + public void close() { + if (metaStoreClient != null) { + Hive.closeCurrent(); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java new file mode 100644 index 000000000..f28c3e6b6 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.ddl; + +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +/** + * This class offers DDL executor backed by the jdbc This class preserves the old useJDBC = true way of doing things. 
+ */ +public class JDBCExecutor extends QueryBasedDDLExecutor { + private static final Logger LOG = LogManager.getLogger(QueryBasedDDLExecutor.class); + private final HiveSyncConfig config; + private Connection connection; + + public JDBCExecutor(HiveSyncConfig config, FileSystem fs) { + super(config, fs); + this.config = config; + createHiveConnection(config.jdbcUrl, config.hiveUser, config.hivePass); + } + + @Override + public void runSQL(String s) { + Statement stmt = null; + try { + stmt = connection.createStatement(); + LOG.info("Executing SQL " + s); + stmt.execute(s); + } catch (SQLException e) { + throw new HoodieHiveSyncException("Failed in executing SQL " + s, e); + } finally { + closeQuietly(null, stmt); + } + } + + private void closeQuietly(ResultSet resultSet, Statement stmt) { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + LOG.warn("Could not close the statement opened ", e); + } + + try { + if (resultSet != null) { + resultSet.close(); + } + } catch (SQLException e) { + LOG.warn("Could not close the resultset opened ", e); + } + } + + private void createHiveConnection(String jdbcUrl, String hiveUser, String hivePass) { + if (connection == null) { + try { + Class.forName("org.apache.hive.jdbc.HiveDriver"); + } catch (ClassNotFoundException e) { + LOG.error("Unable to load Hive driver class", e); + return; + } + + try { + this.connection = DriverManager.getConnection(jdbcUrl, hiveUser, hivePass); + LOG.info("Successfully established Hive connection to " + jdbcUrl); + } catch (SQLException e) { + throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(jdbcUrl), e); + } + } + } + + private String getHiveJdbcUrlWithDefaultDBName(String jdbcUrl) { + String hiveJdbcUrl = jdbcUrl; + String urlAppend = null; + // If the hive url contains addition properties like ;transportMode=http;httpPath=hs2 + if (hiveJdbcUrl.contains(";")) { + urlAppend = 
hiveJdbcUrl.substring(hiveJdbcUrl.indexOf(";")); + hiveJdbcUrl = hiveJdbcUrl.substring(0, hiveJdbcUrl.indexOf(";")); + } + if (!hiveJdbcUrl.endsWith("/")) { + hiveJdbcUrl = hiveJdbcUrl + "/"; + } + return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend); + } + + @Override + public Map getTableSchema(String tableName) { + Map schema = new HashMap<>(); + ResultSet result = null; + try { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + result = databaseMetaData.getColumns(null, config.databaseName, tableName, null); + while (result.next()) { + String columnName = result.getString(4); + String columnType = result.getString(6); + if ("DECIMAL".equals(columnType)) { + int columnSize = result.getInt("COLUMN_SIZE"); + int decimalDigits = result.getInt("DECIMAL_DIGITS"); + columnType += String.format("(%s,%s)", columnSize, decimalDigits); + } + schema.put(columnName, columnType); + } + return schema; + } catch (SQLException e) { + throw new HoodieHiveSyncException("Failed to get table schema for " + tableName, e); + } finally { + closeQuietly(result, null); + } + } + + @Override + public void close() { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.error("Could not close connection ", e); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java new file mode 100644 index 000000000..6fbcfa937 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.ddl; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.StorageSchemes; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.PartitionValueExtractor; +import org.apache.hudi.hive.util.HiveSchemaUtil; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER; +/* +This class adds functionality for all query based DDLExecutors. The classes extending it only have to provide runSQL(sql) functions. 
+ */ +public abstract class QueryBasedDDLExecutor implements DDLExecutor { + private static final Logger LOG = LogManager.getLogger(QueryBasedDDLExecutor.class); + private final HiveSyncConfig config; + private final PartitionValueExtractor partitionValueExtractor; + private final FileSystem fs; + + public QueryBasedDDLExecutor(HiveSyncConfig config, FileSystem fs) { + this.fs = fs; + this.config = config; + try { + this.partitionValueExtractor = + (PartitionValueExtractor) Class.forName(config.partitionValueExtractorClass).newInstance(); + } catch (Exception e) { + throw new HoodieHiveSyncException( + "Failed to initialize PartitionValueExtractor class " + config.partitionValueExtractorClass, e); + } + } + + /** + * All implementations of QueryBasedDDLExecutor must supply the runSQL function. + * @param sql is the sql query which needs to be run + */ + public abstract void runSQL(String sql); + + @Override + public void createDatabase(String databaseName) { + runSQL("create database if not exists " + databaseName); + } + + @Override + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map serdeProperties, + Map tableProperties) { + try { + String createSQLQuery = + HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, config, inputFormatClass, + outputFormatClass, serdeClass, serdeProperties, tableProperties); + LOG.info("Creating table with " + createSQLQuery); + runSQL(createSQLQuery); + } catch (IOException e) { + throw new HoodieHiveSyncException("Failed to create table " + tableName, e); + } + } + + @Override + public void updateTableDefinition(String tableName, MessageType newSchema) { + try { + String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.partitionFields, config.supportTimestamp); + // Cascade clause should not be present for non-partitioned tables + String cascadeClause = config.partitionFields.size() > 0 ? 
" cascade" : ""; + StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER) + .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".") + .append(HIVE_ESCAPE_CHARACTER).append(tableName) + .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(") + .append(newSchemaStr).append(" )").append(cascadeClause); + LOG.info("Updating table definition with " + sqlBuilder); + runSQL(sqlBuilder.toString()); + } catch (IOException e) { + throw new HoodieHiveSyncException("Failed to update table for " + tableName, e); + } + } + + @Override + public void addPartitionsToTable(String tableName, List partitionsToAdd) { + if (partitionsToAdd.isEmpty()) { + LOG.info("No partitions to add for " + tableName); + return; + } + LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName); + List sqls = constructAddPartitions(tableName, partitionsToAdd); + sqls.stream().forEach(sql -> runSQL(sql)); + } + + @Override + public void updatePartitionsToTable(String tableName, List changedPartitions) { + if (changedPartitions.isEmpty()) { + LOG.info("No partitions to change for " + tableName); + return; + } + LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName); + List sqls = constructChangePartitions(tableName, changedPartitions); + for (String sql : sqls) { + runSQL(sql); + } + } + + private List constructAddPartitions(String tableName, List partitions) { + if (config.batchSyncNum <= 0) { + throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter"); + } + List result = new ArrayList<>(); + int batchSyncPartitionNum = config.batchSyncNum; + StringBuilder alterSQL = getAlterTablePrefix(tableName); + for (int i = 0; i < partitions.size(); i++) { + String partitionClause = getPartitionClause(partitions.get(i)); + String fullPartitionPath = FSUtils.getPartitionPath(config.basePath, partitions.get(i)).toString(); + alterSQL.append(" PARTITION 
(").append(partitionClause).append(") LOCATION '").append(fullPartitionPath) + .append("' "); + if ((i + 1) % batchSyncPartitionNum == 0) { + result.add(alterSQL.toString()); + alterSQL = getAlterTablePrefix(tableName); + } + } + // add left partitions to result + if (partitions.size() % batchSyncPartitionNum != 0) { + result.add(alterSQL.toString()); + } + return result; + } + + private StringBuilder getAlterTablePrefix(String tableName) { + StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); + alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName) + .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER) + .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS "); + return alterSQL; + } + + private String getPartitionClause(String partition) { + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(), + "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues + + ". Check partition strategy. 
"); + List partBuilder = new ArrayList<>(); + for (int i = 0; i < config.partitionFields.size(); i++) { + String partitionValue = partitionValues.get(i); + // decode the partition before sync to hive to prevent multiple escapes of HIVE + if (config.decodePartition) { + // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath + partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue); + } + partBuilder.add("`" + config.partitionFields.get(i) + "`='" + partitionValue + "'"); + } + return String.join(",", partBuilder); + } + + private List constructChangePartitions(String tableName, List partitions) { + List changePartitions = new ArrayList<>(); + // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first + String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.databaseName + HIVE_ESCAPE_CHARACTER; + changePartitions.add(useDatabase); + String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + for (String partition : partitions) { + String partitionClause = getPartitionClause(partition); + Path partitionPath = FSUtils.getPartitionPath(config.basePath, partition); + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) + ? 
FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString(); + String changePartition = + alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; + changePartitions.add(changePartition); + } + return changePartitions; + } +} + diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 7af54bb05..7cef6abf8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -22,6 +22,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.SchemaDifference; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.DecimalMetadata; @@ -39,6 +40,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Schema Utilities. @@ -143,30 +145,74 @@ public class HiveSchemaUtil { * @param messageType : Parquet Schema * @return : Hive Table schema read from parquet file MAP[String,String] */ - private static Map convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException { - Map schema = new LinkedHashMap<>(); + public static Map convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException { + return convertMapSchemaToHiveSchema(parquetSchemaToMapSchema(messageType, supportTimestamp, true)); + } + + /** + * Returns equivalent Hive table Field schema read from a parquet file. 
+ * + * @param messageType : Parquet Schema + * @return : Hive Table schema read from parquet file List[FieldSchema] without partitionField + */ + public static List convertParquetSchemaToHiveFieldSchema(MessageType messageType, HiveSyncConfig syncConfig) throws IOException { + return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.supportTimestamp, false), syncConfig); + } + + /** + * Returns schema in Map form read from a parquet file. + * + * @param messageType : parquet Schema + * @param supportTimestamp + * @param doFormat : This option controls whether schema will have spaces in the value part of the schema map. This is required because spaces in complex schema trips the HMS create table calls. + * This value will be false for HMS but true for QueryBasedDDLExecutors + * @return : Intermediate schema in the form of Map + */ + public static LinkedHashMap parquetSchemaToMapSchema(MessageType messageType, boolean supportTimestamp, boolean doFormat) throws IOException { + LinkedHashMap schema = new LinkedHashMap<>(); List parquetFields = messageType.getFields(); for (Type parquetType : parquetFields) { StringBuilder result = new StringBuilder(); String key = parquetType.getName(); if (parquetType.isRepetition(Type.Repetition.REPEATED)) { - result.append(createHiveArray(parquetType, "", supportTimestamp)); + result.append(createHiveArray(parquetType, "", supportTimestamp, doFormat)); } else { - result.append(convertField(parquetType, supportTimestamp)); + result.append(convertField(parquetType, supportTimestamp, doFormat)); } - schema.put(hiveCompatibleFieldName(key, false), result.toString()); + schema.put(key, result.toString()); } return schema; } + public static Map convertMapSchemaToHiveSchema(LinkedHashMap schema) throws IOException { + Map hiveSchema = new LinkedHashMap<>(); + for (Map.Entry entry: schema.entrySet()) { + hiveSchema.put(hiveCompatibleFieldName(entry.getKey(), false, true), entry.getValue()); + } + return 
hiveSchema; + } + + /** + * @param schema Intermediate schema in the form of Map + * @param syncConfig + * @return List of FieldSchema objects derived from schema without the partition fields as the HMS api expects them as different arguments for alter table commands. + * @throws IOException + */ + public static List convertMapSchemaToHiveFieldSchema(LinkedHashMap schema, HiveSyncConfig syncConfig) throws IOException { + return schema.keySet().stream() + .map(key -> new FieldSchema(key, schema.get(key).toLowerCase(), "")) + .filter(field -> !syncConfig.partitionFields.contains(field.getName())) + .collect(Collectors.toList()); + } + /** * Convert one field data type of parquet schema into an equivalent Hive schema. * - * @param parquetType : Single paruet field + * @param parquetType : Single parquet field * @return : Equivalent sHive schema */ - private static String convertField(final Type parquetType, boolean supportTimestamp) { + private static String convertField(final Type parquetType, boolean supportTimestamp, boolean doFormat) { StringBuilder field = new StringBuilder(); if (parquetType.isPrimitive()) { final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName = @@ -174,7 +220,7 @@ public class HiveSchemaUtil { final OriginalType originalType = parquetType.getOriginalType(); if (originalType == OriginalType.DECIMAL) { final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata(); - return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ") + return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(doFormat ? 
" , " : ",") .append(decimalMetadata.getScale()).append(")").toString(); } else if (originalType == OriginalType.DATE) { return field.append("DATE").toString(); @@ -241,7 +287,7 @@ public class HiveSchemaUtil { if (!elementType.isRepetition(Type.Repetition.REPEATED)) { throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); } - return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp); + return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp, doFormat); case MAP: if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); @@ -259,7 +305,7 @@ public class HiveSchemaUtil { throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType); } Type valueType = mapKeyValType.getType(1); - return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp)); + return createHiveMap(convertField(keyType, supportTimestamp, doFormat), convertField(valueType, supportTimestamp, doFormat), doFormat); case ENUM: case UTF8: return "string"; @@ -274,7 +320,7 @@ public class HiveSchemaUtil { } } else { // if no original type then it's a record - return createHiveStruct(parquetGroupType.getFields(), supportTimestamp); + return createHiveStruct(parquetGroupType.getFields(), supportTimestamp, doFormat); } } } @@ -285,16 +331,16 @@ public class HiveSchemaUtil { * @param parquetFields : list of parquet fields * @return : Equivalent 'struct' Hive schema */ - private static String createHiveStruct(List parquetFields, boolean supportTimestamp) { + private static String createHiveStruct(List parquetFields, boolean supportTimestamp, boolean doFormat) { StringBuilder struct = new StringBuilder(); - struct.append("STRUCT< "); + struct.append(doFormat ? 
"STRUCT< " : "STRUCT<"); for (Type field : parquetFields) { // TODO: struct field name is only translated to support special char($) // We will need to extend it to other collection type - struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : "); - struct.append(convertField(field, supportTimestamp)).append(", "); + struct.append(hiveCompatibleFieldName(field.getName(), true, doFormat)).append(doFormat ? " : " : ":"); + struct.append(convertField(field, supportTimestamp, doFormat)).append(doFormat ? ", " : ","); } - struct.delete(struct.length() - 2, struct.length()); // Remove the last + struct.delete(struct.length() - (doFormat ? 2 : 1), struct.length()); // Remove the last // ", " struct.append(">"); String finalStr = struct.toString(); @@ -305,12 +351,12 @@ public class HiveSchemaUtil { return finalStr; } - private static String hiveCompatibleFieldName(String fieldName, boolean isNested) { + private static String hiveCompatibleFieldName(String fieldName, boolean isNested, boolean doFormat) { String result = fieldName; if (isNested) { result = ColumnNameXLator.translateNestedColumn(fieldName); } - return tickSurround(result); + return doFormat ? tickSurround(result) : result; } private static String tickSurround(String result) { @@ -334,26 +380,26 @@ public class HiveSchemaUtil { /** * Create a 'Map' schema from Parquet map field. */ - private static String createHiveMap(String keyType, String valueType) { - return "MAP< " + keyType + ", " + valueType + ">"; + private static String createHiveMap(String keyType, String valueType, boolean doFormat) { + return (doFormat ? "MAP< " : "MAP<") + keyType + (doFormat ? ", " : ",") + valueType + ">"; } /** * Create an Array Hive schema from equivalent parquet list type. 
*/ - private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp) { + private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp, boolean doFormat) { StringBuilder array = new StringBuilder(); - array.append("ARRAY< "); + array.append(doFormat ? "ARRAY< " : "ARRAY<"); if (elementType.isPrimitive()) { - array.append(convertField(elementType, supportTimestamp)); + array.append(convertField(elementType, supportTimestamp, doFormat)); } else { final GroupType groupType = elementType.asGroupType(); final List groupFields = groupType.getFields(); if (groupFields.size() > 1 || (groupFields.size() == 1 && (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) { - array.append(convertField(elementType, supportTimestamp)); + array.append(convertField(elementType, supportTimestamp, doFormat)); } else { - array.append(convertField(groupType.getFields().get(0), supportTimestamp)); + array.append(convertField(groupType.getFields().get(0), supportTimestamp, doFormat)); } } array.append(">"); @@ -454,7 +500,7 @@ public class HiveSchemaUtil { return sb.toString(); } - private static String getPartitionKeyType(Map hiveSchema, String partitionKey) { + public static String getPartitionKeyType(Map hiveSchema, String partitionKey) { if (hiveSchema.containsKey(partitionKey)) { return hiveSchema.get(partitionKey); } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 3494e4451..af94c16b6 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -26,15 +26,22 @@ import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.testutils.HiveTestUtil; 
import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; import org.joda.time.DateTime; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -42,36 +49,65 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; + import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import static org.apache.hudi.hive.testutils.HiveTestUtil.ddlExecutor; +import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem; +import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncConfig; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHiveSyncTool { - private static Stream useJdbc() { - return Stream.of(false, true); + private static final List SYNC_MODES = Arrays.asList( + "hms", + "hiveql", + "jdbc"); + + private static 
Iterable syncMode() { + return SYNC_MODES; } - private static Iterable useJdbcAndSchemaFromCommitMetadata() { - return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}}); + private static Iterable syncModeAndSchemaFromCommitMetadata() { + List opts = new ArrayList<>(); + for (Object mode : SYNC_MODES) { + opts.add(new Object[] {true, mode}); + opts.add(new Object[] {false, mode}); + } + return opts; } - private static Iterable useJdbcAndSchemaFromCommitMetadataAndManagedTable() { - return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); + @AfterAll + public static void cleanUpClass() { + HiveTestUtil.shutdown(); + } + + private static Iterable syncModeAndSchemaFromCommitMetadataAndManagedTable() { + List opts = new ArrayList<>(); + for (Object mode : SYNC_MODES) { + opts.add(new Object[] {true, true, mode}); + opts.add(new Object[] {false, false, mode}); + } + return opts; } // (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource) private static Iterable syncDataSourceTableParams() { - return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}}); + List opts = new ArrayList<>(); + for (Object mode : SYNC_MODES) { + opts.add(new Object[] {true, true, mode}); + opts.add(new Object[] {false, false, mode}); + } + return opts; } @BeforeEach @@ -84,24 +120,130 @@ public class TestHiveSyncTool { HiveTestUtil.clear(); } - @AfterAll - public static void cleanUpClass() { - HiveTestUtil.shutdown(); + /** + * Testing converting array types to Hive field declaration strings. + *

+ * Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists + */ + @Test + public void testSchemaConvertArray() throws IOException { + // Testing the 3-level annotation structure + MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup() + .optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list") + .named("ArrayOfInts"); + + String schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`int_list` ARRAY< int>", schemaString); + + // A array of arrays + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup() + .as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element") + .named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString); + + // A list of integers + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32) + .named("element").named("int_list").named("ArrayOfInts"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`int_list` ARRAY< int>", schemaString); + + // A list of structs with two fields + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32) + .named("num").named("element").named("tuple_list").named("ArrayOfTuples"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString); + + // A list of structs with a single field + // For this case, since the inner group name is "array", we treat the + // element type as a one-element 
struct. + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list") + .named("ArrayOfOneTuples"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); + + // A list of structs with a single field + // For this case, since the inner group name ends with "_tuple", we also treat the + // element type as a one-element struct. + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple") + .named("one_tuple_list").named("ArrayOfOneTuples2"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); + + // A list of structs with a single field + // Unlike the above two cases, for this the element type is the type of the + // only field in the struct. 
+ schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list") + .named("ArrayOfOneTuples3"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< binary>", schemaString); + + // A list of maps + schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP) + .repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value") + .named("key_value").named("array").named("map_list").named("ArrayOfMaps"); + + schemaString = HiveSchemaUtil.generateSchemaString(schema); + assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString); + } + + @Test + public void testSchemaConvertTimestampMicros() throws IOException { + MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64) + .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp"); + String schemaString = HiveSchemaUtil.generateSchemaString(schema); + // verify backward compatibility - int64 converted to bigint type + assertEquals("`my_element` bigint", schemaString); + // verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled + schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true); + assertEquals("`my_element` TIMESTAMP", schemaString); + } + + @Test + public void testSchemaDiffForTimestampMicros() { + MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64) + .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp"); + // verify backward compatibility - int64 converted to bigint type + SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema, + 
Collections.emptyMap(), Collections.emptyList(), false); + assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`")); + schemaDifference = HiveSchemaUtil.getSchemaDifference(schema, + schemaDifference.getAddColumnTypes(), Collections.emptyList(), false); + assertTrue(schemaDifference.isEmpty()); + + // verify schema difference is calculated correctly when supportTimestamp is enabled + schemaDifference = HiveSchemaUtil.getSchemaDifference(schema, + Collections.emptyMap(), Collections.emptyList(), true); + assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`")); + schemaDifference = HiveSchemaUtil.getSchemaDifference(schema, + schemaDifference.getAddColumnTypes(), Collections.emptyList(), true); + assertTrue(schemaDifference.isEmpty()); } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) - public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) + public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata); HoodieHiveClient hiveClient = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), - "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); // we need renew the hiveclient after tool.syncHoodieTable(), because it will close hive // session, then lead to connection retry, we can see there is a exception at log. @@ -112,36 +254,36 @@ public class TestHiveSyncTool { assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), hiveClient.getDataSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field"); - assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote"); - assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), + assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); // Adding of new partitions List newPartition = Arrays.asList("2050/01/01"); - hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList()); - assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList()); + assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "No new partition should be added"); - hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition); - assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition); + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "New partition should be added"); // Update partitions - 
hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList()); - assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.updatePartitionsToTable(hiveSyncConfig.tableName, Arrays.asList()); + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "Partition count should remain the same"); - hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition); - assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.updatePartitionsToTable(hiveSyncConfig.tableName, newPartition); + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "Partition count should remain the same"); // Alter partitions // Manually change a hive partition location to check if the sync will detect // it and generate a partition update event for it. - hiveClient.updateHiveSQL("ALTER TABLE `" + HiveTestUtil.hiveSyncConfig.tableName + ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); - hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); - List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName); + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); //writtenPartitionsSince.add(newPartition.get(0)); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); @@ -149,22 +291,22 @@ public class TestHiveSyncTool { assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, "The one partition event must of type UPDATE"); - tool = new 
HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); // Sync should update the changed partition to correct path - List tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName); + List tablePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive"); - assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), + assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be 100"); } @ParameterizedTest @MethodSource({"syncDataSourceTableParams"}) - public void testSyncCOWTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata, - boolean syncAsDataSourceTable) throws Exception { + public void testSyncCOWTableWithProperties(boolean useSchemaFromCommitMetadata, + boolean syncAsDataSourceTable, + String syncMode) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { @@ -179,14 +321,15 @@ public class TestHiveSyncTool { put("tp_1", "p1"); } }; + + hiveSyncConfig.syncMode = syncMode; hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; - hiveSyncConfig.useJdbc = useJdbc; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata); - HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), 
HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); SessionState.start(HiveTestUtil.getHiveConf()); @@ -254,9 +397,9 @@ public class TestHiveSyncTool { @ParameterizedTest @MethodSource({"syncDataSourceTableParams"}) - public void testSyncMORTableWithProperties(boolean useJdbc, - boolean useSchemaFromCommitMetadata, - boolean syncAsDataSourceTable) throws Exception { + public void testSyncMORTableWithProperties(boolean useSchemaFromCommitMetadata, + boolean syncAsDataSourceTable, + String syncMode) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { @@ -272,7 +415,7 @@ public class TestHiveSyncTool { } }; hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable; - hiveSyncConfig.useJdbc = useJdbc; + hiveSyncConfig.syncMode = syncMode; hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); String instantTime = "100"; @@ -325,13 +468,13 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"}) - public void testSyncManagedTable(boolean useJdbc, - boolean useSchemaFromCommitMetadata, - boolean isManagedTable) throws Exception { + @MethodSource({"syncModeAndSchemaFromCommitMetadataAndManagedTable"}) + public void testSyncManagedTable(boolean useSchemaFromCommitMetadata, + boolean isManagedTable, + String syncMode) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; - hiveSyncConfig.useJdbc = useJdbc; + hiveSyncConfig.syncMode = syncMode; hiveSyncConfig.createManagedTable = isManagedTable; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata); @@ -356,20 +499,39 @@ public class TestHiveSyncTool { } @ParameterizedTest - 
@MethodSource("useJdbc") - public void testSyncIncremental(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testSyncWithSchema(String syncMode) throws Exception { + + hiveSyncConfig.syncMode = syncMode; + String commitTime = "100"; + HiveTestUtil.createCOWTableWithSchema(commitTime, "/complex.schema.avsc"); + + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + HoodieHiveClient hiveClient = + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(commitTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); + } + + @ParameterizedTest + @MethodSource("syncMode") + public void testSyncIncremental(String syncMode) throws Exception { + + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String commitTime1 = "100"; HiveTestUtil.createCOWTable(commitTime1, 5, true); HoodieHiveClient hiveClient = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote"); - assertEquals(commitTime1, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), + assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); // Now lets create more partitions and these are the only ones which needs to be synced @@ -378,37 +540,37 @@ public class TestHiveSyncTool { HiveTestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2); // Lets do the sync - hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); - List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName); + List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); - tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); // Sync should add the one partition - assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "The one partition we wrote should be added to hive"); - assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), + 
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be 101"); } @ParameterizedTest - @MethodSource("useJdbc") - public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testSyncIncrementalWithSchemaEvolution(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String commitTime1 = "100"; HiveTestUtil.createCOWTable(commitTime1, 5, true); HoodieHiveClient hiveClient = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - int fields = hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(); + int fields = hiveClient.getTableSchema(hiveSyncConfig.tableName).size(); // Now lets create more partitions and these are the only ones which needs to be synced DateTime dateTime = DateTime.now().plusDays(6); @@ -416,51 +578,51 @@ public class TestHiveSyncTool { HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2); // Lets do the sync - tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - assertEquals(fields + 3, hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), + assertEquals(fields + 3, hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), "Hive Schema has evolved and should not be 3 more field"); - 
assertEquals("BIGINT", hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("favorite_number"), + assertEquals("BIGINT", hiveClient.getTableSchema(hiveSyncConfig.tableName).get("favorite_number"), "Hive Schema has evolved - Field favorite_number has evolved from int to long"); - assertTrue(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"), + assertTrue(hiveClient.getTableSchema(hiveSyncConfig.tableName).containsKey("favorite_movie"), "Hive Schema has evolved - Field favorite_movie was added"); // Sync should add the one partition - assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), "The one partition we wrote should be added to hive"); - assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), + assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be 101"); } @ParameterizedTest - @MethodSource("useJdbcAndSchemaFromCommitMetadata") - public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncModeAndSchemaFromCommitMetadata") + public void testSyncMergeOnRead(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; String deltaCommitTime = "101"; HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata); - String roTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; - HoodieHiveClient hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); - 
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + String roTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertFalse(hiveClient.doesTableExist(roTableName), "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes"); if (useSchemaFromCommitMetadata) { assertEquals(hiveClient.getTableSchema(roTableName).size(), - SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. 
assertEquals(hiveClient.getTableSchema(roTableName).size(), - SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(), + SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size(), "Hive Schema should match the table schema + partition field"); } @@ -478,19 +640,19 @@ public class TestHiveSyncTool { HiveTestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2); // Lets do the sync - tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); if (useSchemaFromCommitMetadata) { assertEquals(hiveClient.getTableSchema(roTableName).size(), - SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the evolved table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. 
assertEquals(hiveClient.getTableSchema(roTableName).size(), - SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(), + SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size(), "Hive Schema should match the evolved table schema + partition field"); } // Sync should add the one partition @@ -501,38 +663,38 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbcAndSchemaFromCommitMetadata") - public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncModeAndSchemaFromCommitMetadata") + public void testSyncMergeOnReadRT(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; String deltaCommitTime = "101"; - String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; + String snapshotTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata); HoodieHiveClient hiveClientRT = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); assertFalse(hiveClientRT.doesTableExist(snapshotTableName), - "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should not exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); 
tool.syncHoodieTable(); assertTrue(hiveClientRT.doesTableExist(snapshotTableName), - "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should exist after sync completes"); if (useSchemaFromCommitMetadata) { assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(), + SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size(), "Hive Schema should match the table schema + partition field"); } @@ -549,19 +711,19 @@ public class TestHiveSyncTool { HiveTestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2); HiveTestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2); // Lets do the sync - tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClientRT = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); if (useSchemaFromCommitMetadata) { assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - 
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the evolved table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(), + SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size(), "Hive Schema should match the evolved table schema + partition field"); } // Sync should add the one partition @@ -572,9 +734,10 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testMultiPartitionKeySync(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testMultiPartitionKeySync(String syncMode) throws Exception { + + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, true); @@ -585,11 +748,11 @@ public class TestHiveSyncTool { hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day"); HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); - HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); @@ -607,7 +770,7 @@ public class TestHiveSyncTool { String commitTime2 = "101"; HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); - hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); @@ -615,7 +778,7 @@ public class TestHiveSyncTool { assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); - tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); // Sync should add the one partition @@ -629,9 +792,9 @@ public class TestHiveSyncTool { 
HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3); HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); - hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); - tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), @@ -647,9 +810,10 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testNonPartitionedSync(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testNonPartitionedSync(String syncMode) throws Exception { + + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, true); @@ -661,11 +825,11 @@ public class TestHiveSyncTool { hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day"); HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); - HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); @@ -677,29 +841,30 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testReadSchemaForMOR(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testReadSchemaForMOR(String syncMode) throws Exception { + + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String commitTime = "100"; - String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; + String snapshotTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; HiveTestUtil.createMORTable(commitTime, "", 5, false, true); HoodieHiveClient hiveClientRT = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); - assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should not 
exist initially"); // Lets do the sync - HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should exist after sync completes"); // Schema being read from compacted base files assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the table schema + partition field"); assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote"); @@ -711,13 +876,13 @@ public class TestHiveSyncTool { HiveTestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2); // Lets do the sync - tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); tool.syncHoodieTable(); - hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + hiveClientRT = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); // Schema being read from the log files assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), - SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + 
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), "Hive Schema should match the evolved table schema + partition field"); // Sync should add the one partition @@ -727,26 +892,27 @@ public class TestHiveSyncTool { } @Test - public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException { + public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException, HiveException, MetaException { + hiveSyncConfig.useJdbc = true; HiveTestUtil.hiveSyncConfig.useJdbc = true; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, false); HoodieHiveClient hiveClient = - new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), - "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync - HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig); + HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(hiveSyncConfig); syncToolConfig.ignoreExceptions = true; syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl .replace(String.valueOf(HiveTestUtil.hiveTestService.getHiveServerPort()), String.valueOf(NetworkTestUtils.nextFreePort())); HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); tool.syncHoodieTable(); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), - "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + 
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); } private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception { @@ -772,9 +938,9 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testPickingOlderParquetFileIfLatestIsEmptyCommit(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; final String commitTime = "100"; HiveTestUtil.createCOWTable(commitTime, 1, true); @@ -793,9 +959,9 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; final String commitTime = "100"; HiveTestUtil.createCOWTable(commitTime, 1, true); @@ -836,9 +1002,9 @@ public class TestHiveSyncTool { } @ParameterizedTest - @MethodSource("useJdbc") - public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; final String commitTime = "100"; HiveTestUtil.createCOWTable(commitTime, 1, true); @@ -883,9 +1049,9 @@ public class TestHiveSyncTool { } 
@ParameterizedTest - @MethodSource("useJdbc") - public void testTypeConverter(boolean useJdbc) throws Exception { - HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + @MethodSource("syncMode") + public void testTypeConverter(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; HiveTestUtil.createCOWTable("100", 5, true); HoodieHiveClient hiveClient = @@ -895,30 +1061,30 @@ public class TestHiveSyncTool { String dropTableSql = String.format("DROP TABLE IF EXISTS %s ", tableAbsoluteName); String createTableSqlPrefix = String.format("CREATE TABLE IF NOT EXISTS %s ", tableAbsoluteName); String errorMsg = "An error occurred in decimal type converting."; - hiveClient.updateHiveSQL(dropTableSql); + ddlExecutor.runSQL(dropTableSql); // test one column in DECIMAL String oneTargetColumnSql = createTableSqlPrefix + "(`decimal_col` DECIMAL(9,8), `bigint_col` BIGINT)"; - hiveClient.updateHiveSQL(oneTargetColumnSql); + ddlExecutor.runSQL(oneTargetColumnSql); System.out.println(hiveClient.getTableSchema(tableName)); assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)"), errorMsg); - hiveClient.updateHiveSQL(dropTableSql); + ddlExecutor.runSQL(dropTableSql); // test multiple columns in DECIMAL String multipleTargetColumnSql = createTableSqlPrefix + "(`decimal_col1` DECIMAL(9,8), `bigint_col` BIGINT, `decimal_col2` DECIMAL(7,4))"; - hiveClient.updateHiveSQL(multipleTargetColumnSql); + ddlExecutor.runSQL(multipleTargetColumnSql); System.out.println(hiveClient.getTableSchema(tableName)); assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)") && hiveClient.getTableSchema(tableName).containsValue("DECIMAL(7,4)"), errorMsg); - hiveClient.updateHiveSQL(dropTableSql); + ddlExecutor.runSQL(dropTableSql); // test no columns in DECIMAL String noTargetColumnsSql = createTableSqlPrefix + "(`bigint_col` BIGINT)"; - hiveClient.updateHiveSQL(noTargetColumnsSql); + 
ddlExecutor.runSQL(noTargetColumnsSql); System.out.println(hiveClient.getTableSchema(tableName)); assertTrue(hiveClient.getTableSchema(tableName).size() == 1 && hiveClient.getTableSchema(tableName) .containsValue("BIGINT"), errorMsg); - hiveClient.updateHiveSQL(dropTableSql); + ddlExecutor.runSQL(dropTableSql); } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java index a145dc024..b6adcb298 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java @@ -25,10 +25,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; +import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; @@ -98,20 +101,20 @@ public class HiveSyncFunctionalTestHarness { return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs()); } - public void dropTables(String database, String... tables) throws IOException { + public void dropTables(String database, String... 
tables) throws IOException, HiveException, MetaException { HiveSyncConfig hiveSyncConfig = hiveSyncConf(); hiveSyncConfig.databaseName = database; for (String table : tables) { hiveSyncConfig.tableName = table; - hiveClient(hiveSyncConfig).updateHiveSQL("drop table if exists " + table); + new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop table if exists " + table); } } - public void dropDatabases(String... databases) throws IOException { + public void dropDatabases(String... databases) throws IOException, HiveException, MetaException { HiveSyncConfig hiveSyncConfig = hiveSyncConf(); for (String database : databases) { hiveSyncConfig.databaseName = database; - hiveClient(hiveSyncConfig).updateHiveSQL("drop database if exists " + database); + new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop database if exists " + database); } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 46f95f616..7b9f1d127 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -42,7 +42,8 @@ import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; -import org.apache.hudi.hive.HoodieHiveClient; +import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; +import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; @@ -51,6 +52,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.service.server.HiveServer2; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; @@ -91,8 +94,9 @@ public class HiveTestUtil { private static DateTimeFormatter dtfOut; public static FileSystem fileSystem; private static Set createdTablesSet = new HashSet<>(); + public static QueryBasedDDLExecutor ddlExecutor; - public static void setUp() throws IOException, InterruptedException { + public static void setUp() throws IOException, InterruptedException, HiveException, MetaException { configuration = new Configuration(); if (zkServer == null) { zkService = new ZookeeperTestService(configuration); @@ -116,11 +120,12 @@ public class HiveTestUtil { hiveSyncConfig.partitionFields = Collections.singletonList("datestr"); dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd"); + ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf()); clear(); } - public static void clear() throws IOException { + public static void clear() throws IOException, HiveException, MetaException { fileSystem.delete(new Path(hiveSyncConfig.basePath), true); HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.COPY_ON_WRITE) @@ -128,13 +133,12 @@ public class HiveTestUtil { .setPayloadClass(HoodieAvroPayload.class) .initTable(configuration, hiveSyncConfig.basePath); - HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem); for (String tableName : createdTablesSet) { - client.updateHiveSQL("drop table if exists " + tableName); + ddlExecutor.runSQL("drop table if exists " + tableName); } createdTablesSet.clear(); - client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName); - client.updateHiveSQL("create database " + hiveSyncConfig.databaseName); + ddlExecutor.runSQL("drop database if exists " + 
hiveSyncConfig.databaseName); + ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName); } public static HiveConf getHiveConf() { @@ -172,6 +176,40 @@ public class HiveTestUtil { createCommitFile(commitMetadata, instantTime); } + public static void createCOWTableWithSchema(String instantTime, String schemaFileName) + throws IOException, URISyntaxException { + Path path = new Path(hiveSyncConfig.basePath); + FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); + + boolean result = fileSystem.mkdirs(path); + checkResult(result); + DateTime dateTime = DateTime.now().withTimeAtStartOfDay(); + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + String partitionPath = dtfOut.print(dateTime); + Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); + fileSystem.makeQualified(partPath); + fileSystem.mkdirs(partPath); + List writeStats = new ArrayList<>(); + String fileId = UUID.randomUUID().toString(); + Path filePath = new Path(partPath.toString() + "/" + FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); + Schema schema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, schemaFileName); + generateParquetDataWithSchema(filePath, schema); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + writeStat.setPath(filePath.toString()); + writeStats.add(writeStat); + writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); + commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString()); + createdTablesSet.add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); + createCommitFile(commitMetadata, instantTime); + } + public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, boolean createDeltaCommit, boolean useSchemaFromCommitMetadata) throws IOException, URISyntaxException, InterruptedException { @@ -330,6 +368,27 @@ public class HiveTestUtil { writer.close(); } + private static void generateParquetDataWithSchema(Path filePath, Schema schema) + throws IOException, URISyntaxException { + org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, + BloomFilterTypeCode.SIMPLE.name()); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter); + ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, + ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, + ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, fileSystem.getConf()); + + List testRecords = SchemaTestUtil.generateTestRecordsForSchema(schema); + testRecords.forEach(s -> { + try { + writer.write(s); + } catch (IOException e) { + fail("IOException while writing test records as parquet" + e.toString()); + } + }); + writer.close(); + } + private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple) throws IOException, InterruptedException, URISyntaxException { Schema schema = getTestDataSchema(isLogSchemaSimple); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 60f542813..8a67582c4 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -18,8 +18,6 @@ package org.apache.hudi.utilities.testutils; -import java.io.FileInputStream; - import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; @@ -34,7 +32,8 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncConfig; -import org.apache.hudi.hive.HoodieHiveClient; +import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor; import org.apache.hudi.hive.testutils.HiveTestService; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.sources.TestDataSource; @@ -70,6 +69,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import java.io.BufferedReader; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; @@ -201,10 +201,10 @@ public class UtilitiesTestBase { .setTableName(hiveSyncConfig.tableName) .initTable(dfs.getConf(), hiveSyncConfig.basePath); - HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, dfs); - client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName); - client.updateHiveSQL("create database " + hiveSyncConfig.databaseName); - client.close(); + QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig, dfs); + ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName); + ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName); + ddlExecutor.close(); } public static class Helpers {