[HUDI-3593] Restore TypedProperties and flush checksum in table config (#5013)
Create new TypedProperties while performing clustering. Add OrderedProperties and minor refactoring. Add javadoc and remove getters from OrderedProperties.
This commit is contained in:
@@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.config;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
 * An extension of {@link java.util.Properties} that remembers the order in which keys were inserted.
 *
 * <p>The insertion order is exposed through {@link #keys()}, {@link #propertyNames()} and
 * {@link #stringPropertyNames()}. Re-putting an existing key through {@link #put(Object, Object)}
 * moves it to the end of the order, whereas {@link #putIfAbsent(Object, Object)} and
 * {@link #putAll(Properties)} leave the position of an already-present key untouched.
 *
 * <p>The implementation is not thread-safe.
 */
public class OrderedProperties extends Properties {

  // Insertion-ordered mirror of the keys stored in the underlying table. It is only updated
  // after the corresponding table operation succeeded, so both always describe the same key set.
  private final HashSet<Object> keys = new LinkedHashSet<>();

  /**
   * Creates an empty instance without defaults.
   */
  public OrderedProperties() {
    super(null);
  }

  /**
   * Creates an instance holding a copy of every string-keyed entry of {@code defaults}, in the
   * order reported by {@code defaults.stringPropertyNames()}.
   *
   * @param defaults properties to copy; {@code null} yields an empty instance
   */
  public OrderedProperties(Properties defaults) {
    if (Objects.nonNull(defaults)) {
      for (String key : defaults.stringPropertyNames()) {
        put(key, defaults.getProperty(key));
      }
    }
  }

  /**
   * @return the keys in insertion order
   */
  @Override
  public Enumeration<?> propertyNames() {
    return Collections.enumeration(keys);
  }

  /**
   * @return the keys in insertion order
   */
  @Override
  public synchronized Enumeration<Object> keys() {
    return Collections.enumeration(keys);
  }

  /**
   * @return a new, insertion-ordered set with every key that is a {@link String}
   */
  @Override
  public Set<String> stringPropertyNames() {
    Set<String> names = new LinkedHashSet<>();
    for (Object key : this.keys) {
      if (key instanceof String) {
        names.add((String) key);
      }
    }
    return names;
  }

  /**
   * Copies every entry of {@code t} into this instance. New keys are appended in the order
   * reported by {@code t.keys()} (so the order of an ordered source is preserved); keys that are
   * already present keep their position and only get their value replaced.
   *
   * @param t the properties to copy from
   */
  public synchronized void putAll(Properties t) {
    // Iterate over a snapshot of the keys rather than entrySet(): entrySet() follows the hash
    // layout of the source and would drop its insertion order.
    for (Object key : Collections.list(t.keys())) {
      super.put(key, t.get(key));
      // Set semantics: a no-op for a key that is already tracked, so its position is retained.
      keys.add(key);
    }
  }

  /**
   * Copies every mapping of {@code t} through {@link #put(Object, Object)} so that the key order
   * is tracked regardless of how the JDK implements the inherited bulk insert.
   *
   * @param t the mappings to copy from
   */
  @Override
  public synchronized void putAll(Map<?, ?> t) {
    for (Map.Entry<?, ?> entry : t.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Stores the mapping and moves {@code key} to the end of the insertion order.
   *
   * @return the previous value of {@code key}, or {@code null} if there was none
   * @throws NullPointerException if the key or value is {@code null}; the order is left untouched
   */
  @Override
  public synchronized Object put(Object key, Object value) {
    // Delegate first: if the table rejects the mapping, the tracked order must not change.
    Object previous = super.put(key, value);
    keys.remove(key);
    keys.add(key);
    return previous;
  }

  /**
   * Stores the mapping only when {@code key} is absent; a newly added key is appended to the
   * insertion order, an existing key keeps its position.
   *
   * @return the value already associated with {@code key}, or {@code null} if it was absent
   */
  @Override
  public synchronized Object putIfAbsent(Object key, Object value) {
    Object previous = super.putIfAbsent(key, value);
    keys.add(key);
    return previous;
  }

  /**
   * Removes the mapping for {@code key} and forgets its position.
   *
   * @return the removed value, or {@code null} if the key was absent
   */
  @Override
  public synchronized Object remove(Object key) {
    Object removed = super.remove(key);
    keys.remove(key);
    return removed;
  }

  /**
   * Removes every mapping together with the recorded insertion order.
   */
  @Override
  public synchronized void clear() {
    super.clear();
    keys.clear();
  }
}
|
||||
@@ -20,15 +20,9 @@ package org.apache.hudi.common.config;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@@ -36,8 +30,6 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class TypedProperties extends Properties implements Serializable {
|
||||
|
||||
private final HashSet<Object> keys = new LinkedHashSet<>();
|
||||
|
||||
public TypedProperties() {
|
||||
super(null);
|
||||
}
|
||||
@@ -50,56 +42,6 @@ public class TypedProperties extends Properties implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Enumeration propertyNames() {
|
||||
return Collections.enumeration(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Enumeration<Object> keys() {
|
||||
return Collections.enumeration(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> stringPropertyNames() {
|
||||
Set<String> set = new LinkedHashSet<>();
|
||||
for (Object key : this.keys) {
|
||||
if (key instanceof String) {
|
||||
set.add((String) key);
|
||||
}
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
public synchronized void putAll(Properties t) {
|
||||
for (Map.Entry<?, ?> e : t.entrySet()) {
|
||||
if (!containsKey(String.valueOf(e.getKey()))) {
|
||||
keys.add(e.getKey());
|
||||
}
|
||||
super.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Object put(Object key, Object value) {
|
||||
keys.remove(key);
|
||||
keys.add(key);
|
||||
return super.put(key, value);
|
||||
}
|
||||
|
||||
public synchronized Object putIfAbsent(Object key, Object value) {
|
||||
if (!containsKey(String.valueOf(key))) {
|
||||
keys.add(key);
|
||||
}
|
||||
return super.putIfAbsent(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object remove(Object key) {
|
||||
keys.remove(key);
|
||||
return super.remove(key);
|
||||
}
|
||||
|
||||
private void checkKey(String property) {
|
||||
if (!containsKey(property)) {
|
||||
throw new IllegalArgumentException("Property " + property + " not found");
|
||||
|
||||
@@ -18,19 +18,13 @@
|
||||
|
||||
package org.apache.hudi.common.table;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
||||
import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
|
||||
import org.apache.hudi.common.config.ConfigClassProperty;
|
||||
import org.apache.hudi.common.config.ConfigGroups;
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.config.HoodieConfig;
|
||||
import org.apache.hudi.common.config.OrderedProperties;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
@@ -47,12 +41,17 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
@@ -220,10 +219,7 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
|
||||
// FIXME(vc): wonder if this can be removed. Need to look into history.
|
||||
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
|
||||
if (!isValidChecksum()) {
|
||||
setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(props)));
|
||||
}
|
||||
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||
storeProperties(props, outputStream);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
@@ -233,6 +229,34 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
"hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry");
|
||||
}
|
||||
|
||||
private static Properties getOrderedPropertiesWithTableChecksum(Properties props) {
|
||||
Properties orderedProps = new OrderedProperties(props);
|
||||
orderedProps.put(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(props)));
|
||||
return orderedProps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the properties to the given output stream and return the table checksum.
|
||||
*
|
||||
* @param props - properties to be written
|
||||
* @param outputStream - output stream to which properties will be written
|
||||
* @return return the table checksum
|
||||
* @throws IOException
|
||||
*/
|
||||
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
|
||||
String checksum;
|
||||
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
|
||||
checksum = props.getProperty(TABLE_CHECKSUM.key());
|
||||
props.store(outputStream, "Updated at " + Instant.now());
|
||||
} else {
|
||||
Properties propsWithChecksum = getOrderedPropertiesWithTableChecksum(props);
|
||||
propsWithChecksum.store(outputStream, "Properties saved on " + Instant.now());
|
||||
checksum = propsWithChecksum.getProperty(TABLE_CHECKSUM.key());
|
||||
props.setProperty(TABLE_CHECKSUM.key(), checksum);
|
||||
}
|
||||
return checksum;
|
||||
}
|
||||
|
||||
private boolean isValidChecksum() {
|
||||
return contains(TABLE_CHECKSUM) && validateChecksum(props);
|
||||
}
|
||||
@@ -316,13 +340,7 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
Properties props = new TypedProperties();
|
||||
props.load(in);
|
||||
modifyFn.accept(props, modifyProps);
|
||||
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
|
||||
checksum = props.getProperty(TABLE_CHECKSUM.key());
|
||||
} else {
|
||||
checksum = String.valueOf(generateChecksum(props));
|
||||
props.setProperty(TABLE_CHECKSUM.key(), checksum);
|
||||
}
|
||||
props.store(out, "Updated at " + System.currentTimeMillis());
|
||||
checksum = storeProperties(props, out);
|
||||
}
|
||||
// 4. verify and remove backup.
|
||||
try (FSDataInputStream in = fs.open(cfgPath)) {
|
||||
@@ -385,12 +403,7 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
|
||||
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
|
||||
}
|
||||
if (hoodieConfig.contains(TABLE_CHECKSUM)) {
|
||||
hoodieConfig.setValue(TABLE_CHECKSUM, hoodieConfig.getString(TABLE_CHECKSUM));
|
||||
} else {
|
||||
hoodieConfig.setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(hoodieConfig.getProps())));
|
||||
}
|
||||
hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||
storeProperties(hoodieConfig.getProps(), outputStream);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.properties;
|
||||
|
||||
import org.apache.hudi.common.config.OrderedProperties;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestOrderedProperties {
|
||||
|
||||
@Test
|
||||
public void testPutPropertiesOrder() {
|
||||
Properties properties = new OrderedProperties();
|
||||
properties.put("key0", "true");
|
||||
properties.put("key1", "false");
|
||||
properties.put("key2", "true");
|
||||
properties.put("key3", "false");
|
||||
properties.put("key4", "true");
|
||||
properties.put("key5", "true");
|
||||
properties.put("key6", "false");
|
||||
properties.put("key7", "true");
|
||||
properties.put("key8", "false");
|
||||
properties.put("key9", "true");
|
||||
|
||||
OrderedProperties typedProperties = new OrderedProperties(properties);
|
||||
assertTypeProperties(typedProperties, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPutAllPropertiesOrder() {
|
||||
Properties firstProp = new OrderedProperties();
|
||||
firstProp.put("key0", "true");
|
||||
firstProp.put("key1", "false");
|
||||
firstProp.put("key2", "true");
|
||||
|
||||
OrderedProperties firstProperties = new OrderedProperties(firstProp);
|
||||
assertTypeProperties(firstProperties, 0);
|
||||
|
||||
OrderedProperties secondProperties = new OrderedProperties();
|
||||
secondProperties.put("key3", "true");
|
||||
secondProperties.put("key4", "false");
|
||||
secondProperties.put("key5", "true");
|
||||
assertTypeProperties(secondProperties, 3);
|
||||
|
||||
OrderedProperties thirdProperties = new OrderedProperties();
|
||||
thirdProperties.putAll(firstProp);
|
||||
thirdProperties.putAll(secondProperties);
|
||||
|
||||
assertEquals(3, firstProp.stringPropertyNames().size());
|
||||
assertEquals(3, secondProperties.stringPropertyNames().size());
|
||||
assertEquals(6, thirdProperties.stringPropertyNames().size());
|
||||
}
|
||||
|
||||
private void assertTypeProperties(OrderedProperties typedProperties, int start) {
|
||||
String[] props = typedProperties.stringPropertyNames().toArray(new String[0]);
|
||||
for (int i = start; i < props.length; i++) {
|
||||
assertEquals(String.format("key%d", i), props[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,10 +22,11 @@ import org.apache.hudi.common.config.TypedProperties;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestTypedProperties {
|
||||
@Test
|
||||
@@ -79,58 +80,8 @@ public class TestTypedProperties {
|
||||
properties.put("key1", "true");
|
||||
|
||||
TypedProperties typedProperties = new TypedProperties(properties);
|
||||
assertEquals(true, typedProperties.getBoolean("key1"));
|
||||
assertEquals(true, typedProperties.getBoolean("key1", false));
|
||||
assertEquals(false, typedProperties.getBoolean("key2", false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropertiesOrder() throws IOException {
|
||||
Properties properties = new TypedProperties();
|
||||
properties.put("key0", "true");
|
||||
properties.put("key1", "false");
|
||||
properties.put("key2", "true");
|
||||
properties.put("key3", "false");
|
||||
properties.put("key4", "true");
|
||||
properties.put("key5", "true");
|
||||
properties.put("key6", "false");
|
||||
properties.put("key7", "true");
|
||||
properties.put("key8", "false");
|
||||
properties.put("key9", "true");
|
||||
|
||||
TypedProperties typedProperties = new TypedProperties(properties);
|
||||
assertTypeProperties(typedProperties, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPutAllProperties() {
|
||||
Properties firstProp = new TypedProperties();
|
||||
firstProp.put("key0", "true");
|
||||
firstProp.put("key1", "false");
|
||||
firstProp.put("key2", "true");
|
||||
|
||||
TypedProperties firstProperties = new TypedProperties(firstProp);
|
||||
assertTypeProperties(firstProperties, 0);
|
||||
|
||||
TypedProperties secondProperties = new TypedProperties();
|
||||
secondProperties.put("key3", "true");
|
||||
secondProperties.put("key4", "false");
|
||||
secondProperties.put("key5", "true");
|
||||
assertTypeProperties(secondProperties, 3);
|
||||
|
||||
TypedProperties thirdProperties = new TypedProperties();
|
||||
thirdProperties.putAll(firstProp);
|
||||
thirdProperties.putAll(secondProperties);
|
||||
|
||||
assertEquals(3, firstProp.stringPropertyNames().size());
|
||||
assertEquals(3, secondProperties.stringPropertyNames().size());
|
||||
assertEquals(6, thirdProperties.stringPropertyNames().size());
|
||||
}
|
||||
|
||||
private void assertTypeProperties(TypedProperties typedProperties, int start) {
|
||||
String[] props = typedProperties.stringPropertyNames().toArray(new String[0]);
|
||||
for (int i = start; i < props.length; i++) {
|
||||
assertEquals(String.format("key%d", i), props[i]);
|
||||
}
|
||||
assertTrue(typedProperties.getBoolean("key1"));
|
||||
assertTrue(typedProperties.getBoolean("key1", false));
|
||||
assertFalse(typedProperties.getBoolean("key2", false));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user