
[HUDI-242] Support for RFC-12/Bootstrapping of external datasets to hudi (#1876)

- [HUDI-418] Bootstrap Index Implementation using HFile with unit-test
 - [HUDI-421] FileSystem View Changes to support Bootstrap with unit-tests
 - [HUDI-424] Implement Query Side Integration for querying tables containing bootstrap file slices
 - [HUDI-423] Implement upsert functionality for handling updates to these bootstrap file slices
 - [HUDI-421] Bootstrap Write Client with tests
 - [HUDI-425] Added HoodieDeltaStreamer support
 - [HUDI-899] Add a knob to change partition-path style while performing metadata bootstrap
 - [HUDI-900] Metadata Bootstrap Key Generator needs to handle complex keys correctly
 - [HUDI-424] Simplify Record reader implementation
 - [HUDI-420] Hoodie Demo working with hive and sparkSQL. Also, Hoodie CLI working with bootstrap tables

Co-authored-by: Mehrotra <uditme@amazon.com>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Co-authored-by: Balaji Varadarajan <varadarb@uber.com>
Authored by vinoth chandar on 2020-08-03 20:19:21 -07:00; committed by GitHub
parent 266bce12b3
commit 539621bd33
175 changed files with 7540 additions and 779 deletions

View File

@@ -149,6 +149,35 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
/**
* Main API to run bootstrap to hudi.
*/
public void bootstrap(Option<Map<String, String>> extraMetadata) {
if (rollbackPending) {
rollBackInflightBootstrap();
}
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
table.bootstrap(jsc, extraMetadata);
}
/**
* Main API to rollback pending bootstrap.
*/
protected void rollBackInflightBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
LOG.info("Found pending bootstrap instants. Rolling them back");
table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}
}
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
@@ -671,7 +700,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
for (String commit : commits) {
rollback(commit);
if (HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
rollBackInflightBootstrap();
break;
} else {
rollback(commit);
}
}
}
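The bootstrap() entry point added above is invoked on the write client before regular ingestion. A minimal, hedged sketch of a caller (assuming the existing HoodieWriteClient(JavaSparkContext, HoodieWriteConfig) constructor; writeConfig here is assumed to carry a HoodieBootstrapConfig, as sketched with the bootstrap config class further below):

    // Illustrative driver code, not part of this diff.
    HoodieWriteClient<HoodieRecordPayload> client = new HoodieWriteClient<>(jsc, writeConfig);
    // Runs the bootstrap chosen by the configured BootstrapModeSelector; when the client was created with
    // pending-rollback enabled, any inflight bootstrap instant is rolled back first.
    client.bootstrap(Option.empty());   // Option<Map<String, String>> of extra commit metadata
    client.close();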

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
/**
* Identifies different types of bootstrap.
*/
public enum BootstrapMode {
/**
* In this mode, record-level metadata is generated for each source record, and both the original record and its
* metadata are copied.
*/
FULL_RECORD,
/**
* In this mode, only record-level metadata is generated for each source record and stored in the new bootstrap location.
*/
METADATA_ONLY
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
public class BootstrapRecordPayload implements HoodieRecordPayload<BootstrapRecordPayload> {
private final GenericRecord record;
public BootstrapRecordPayload(GenericRecord record) {
this.record = record;
}
@Override
public BootstrapRecordPayload preCombine(BootstrapRecordPayload another) {
return this;
}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
return Option.ofNullable(record);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) {
return Option.ofNullable(record);
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import java.util.List;
/**
* Bootstrap Schema Provider. The schema provided in the write config is used; if not set, the schema is read from the source Parquet files.
*/
public class BootstrapSchemaProvider {
protected final HoodieWriteConfig writeConfig;
public BootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
}
/**
* Main API to select avro schema for bootstrapping.
* @param jsc Java Spark Context
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
return Schema.parse(writeConfig.getSchema());
}
return getBootstrapSourceSchema(jsc, partitions);
}
/**
* Selects an arbitrary source file and uses its Avro schema.
* Override this method for custom schema selection.
* @param jsc Java Spark Context
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream().flatMap(p -> p.getValue().stream())
.map(fs -> {
try {
Path filePath = FileStatusUtils.toPath(fs.getPath());
return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
} catch (Exception ex) {
return null;
}
}).filter(x -> x != null).findAny().get();
}
}
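Custom schema selection can be plugged in by overriding the protected hook above. A hedged sketch (the subclass name and the "first file of the first partition" policy are purely illustrative):

    public class FirstFileSchemaProvider extends BootstrapSchemaProvider {
      public FirstFileSchemaProvider(HoodieWriteConfig writeConfig) {
        super(writeConfig);
      }
      @Override
      protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
          List<Pair<String, List<HoodieFileStatus>>> partitions) {
        // Always read the Avro schema from the first file of the first partition.
        Path firstFile = FileStatusUtils.toPath(partitions.get(0).getValue().get(0).getPath());
        return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), firstFile);
      }
    }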

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.collection.Pair;
/**
* WriteStatus for Bootstrap.
*/
public class BootstrapWriteStatus extends WriteStatus {
private BootstrapFileMapping sourceFileMapping;
public BootstrapWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
super(trackSuccessRecords, failureFraction);
}
public BootstrapFileMapping getBootstrapSourceFileMapping() {
return sourceFileMapping;
}
public Pair<BootstrapFileMapping, HoodieWriteStat> getBootstrapSourceAndWriteStat() {
return Pair.of(getBootstrapSourceFileMapping(), getStat());
}
public void setBootstrapSourceFileMapping(BootstrapFileMapping sourceFileMapping) {
this.sourceFileMapping = sourceFileMapping;
}
}

View File

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import java.io.Serializable;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
/**
* Creates RDD of Hoodie Records with complete record data, given a list of partitions to be bootstrapped.
*/
public abstract class FullRecordBootstrapDataProvider implements Serializable {
protected static final Logger LOG = LogManager.getLogger(FullRecordBootstrapDataProvider.class);
protected final TypedProperties props;
protected final transient JavaSparkContext jsc;
public FullRecordBootstrapDataProvider(TypedProperties props, JavaSparkContext jsc) {
this.props = props;
this.jsc = jsc;
}
/**
* Given the list of input partitions and their files, returns an RDD representing the source records.
* @param tableName Hudi Table Name
* @param sourceBasePath Source Base Path
* @param partitionPaths Partition Paths
* @return JavaRDD of input records
*/
public abstract JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName,
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.selector;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* Pluggable Partition Selector for selecting partitions to perform full or metadata-only bootstrapping.
*/
public abstract class BootstrapModeSelector implements Serializable {
protected final HoodieWriteConfig writeConfig;
public BootstrapModeSelector(HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
}
/**
* Classify partitions for the purpose of bootstrapping. For a non-partitioned source, the input list will contain a single entry.
*
* @param partitions List of partitions with the files present in each partition
* @return partitions grouped by bootstrap mode
*/
public abstract Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions);
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.selector;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class BootstrapRegexModeSelector extends BootstrapModeSelector {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(BootstrapRegexModeSelector.class);
private final Pattern pattern;
private final BootstrapMode bootstrapModeOnMatch;
private final BootstrapMode defaultMode;
public BootstrapRegexModeSelector(HoodieWriteConfig writeConfig) {
super(writeConfig);
this.pattern = Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());
this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();
this.defaultMode = BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)
? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
LOG.info("Default Mode :" + defaultMode + ", on Match Mode :" + bootstrapModeOnMatch);
}
@Override
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream()
.map(p -> Pair.of(pattern.matcher(p.getKey()).matches() ? bootstrapModeOnMatch : defaultMode, p.getKey()))
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
}
}
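To make the grouping concrete, a hedged example (assuming a writeConfig built as in the HoodieBootstrapConfig sketch later in this diff, with selector regex "2020/06.*" and FULL_RECORD as the mode on match; the partition paths are made up):

    List<Pair<String, List<HoodieFileStatus>>> partitions = Arrays.asList(
        Pair.of("2020/06/01", Collections.<HoodieFileStatus>emptyList()),
        Pair.of("2020/07/01", Collections.<HoodieFileStatus>emptyList()));
    Map<BootstrapMode, List<String>> byMode = new BootstrapRegexModeSelector(writeConfig).select(partitions);
    // byMode => {FULL_RECORD=[2020/06/01], METADATA_ONLY=[2020/07/01]}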

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.selector;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.config.HoodieWriteConfig;
public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {
public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
super(bootstrapConfig, BootstrapMode.FULL_RECORD);
}
}

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.selector;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.config.HoodieWriteConfig;
public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {
public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
}
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.selector;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* A bootstrap selector which employs the same bootstrap mode for all partitions.
*/
public abstract class UniformBootstrapModeSelector extends BootstrapModeSelector {
private final BootstrapMode bootstrapMode;
public UniformBootstrapModeSelector(HoodieWriteConfig bootstrapConfig, BootstrapMode bootstrapMode) {
super(bootstrapConfig);
this.bootstrapMode = bootstrapMode;
}
@Override
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(x -> x.getValue().getKey(),
Collectors.toList())));
}
}

View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.translator;
import java.io.Serializable;
import org.apache.hudi.common.config.TypedProperties;
public abstract class BootstrapPartitionPathTranslator implements Serializable {
private final TypedProperties properties;
public BootstrapPartitionPathTranslator(TypedProperties properties) {
this.properties = properties;
}
/**
* Given a bootstrap partition path, return the translated partition path.
*
* @param bootStrapPartitionPath bootstrap Partition Path
* @return Translated Path
*/
public abstract String getBootstrapTranslatedPath(String bootStrapPartitionPath);
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap.translator;
import org.apache.hudi.common.config.TypedProperties;
/**
* Returns the bootstrap partition path unchanged.
*/
public class IdentityBootstrapPartitionPathTranslator extends BootstrapPartitionPathTranslator {
public IdentityBootstrapPartitionPathTranslator(TypedProperties properties) {
super(properties);
}
@Override
public String getBootstrapTranslatedPath(String bootStrapPartitionPath) {
return bootStrapPartitionPath;
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.utils;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
private final Iterator<T> leftIterator;
private final Iterator<T> rightIterator;
private final Function<Pair<T,T>, T> mergeFunction;
public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T,T>, T> mergeFunction) {
this.leftIterator = leftIterator;
this.rightIterator = rightIterator;
this.mergeFunction = mergeFunction;
}
@Override
public boolean hasNext() {
boolean leftHasNext = leftIterator.hasNext();
boolean rightHasNext = rightIterator.hasNext();
ValidationUtils.checkArgument(leftHasNext == rightHasNext);
return leftHasNext;
}
@Override
public T next() {
return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
}
}
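A hedged usage sketch (the two iterators and the stitch(...) merge function are placeholders; in the bootstrap read path the left side carries the skeleton/metadata columns and the right side the source data columns):

    MergingIterator<GenericRecord> merged = new MergingIterator<>(
        skeletonRecords.iterator(),   // key/metadata-only records
        sourceRecords.iterator(),     // matching source data records, in the same order
        pair -> stitch(pair.getKey(), pair.getValue()));   // placeholder merge function
    while (merged.hasNext()) {
      GenericRecord combined = merged.next();  // hasNext() fails fast if the two sides fall out of step
    }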

View File

@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
* Bootstrap specific configs.
*/
public class HoodieBootstrapConfig extends DefaultHoodieConfig {
public static final String BOOTSTRAP_BASE_PATH_PROP = "hoodie.bootstrap.base.path";
public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
public static final String FULL_BOOTSTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";
public static final String BOOTSTRAP_KEYGEN_CLASS = "hoodie.bootstrap.keygen.class";
public static final String BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS =
"hoodie.bootstrap.partitionpath.translator.class";
public static final String DEFAULT_BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS =
IdentityBootstrapPartitionPathTranslator.class.getName();
public static final String BOOTSTRAP_PARALLELISM = "hoodie.bootstrap.parallelism";
public static final String DEFAULT_BOOTSTRAP_PARALLELISM = "1500";
// Used By BootstrapRegexModeSelector class. When a partition path matches the regex, the corresponding
// mode will be used. Otherwise, the alternative mode will be used.
public static final String BOOTSTRAP_MODE_SELECTOR_REGEX = "hoodie.bootstrap.mode.selector.regex";
public static final String BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = "hoodie.bootstrap.mode.selector.regex.mode";
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*";
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name();
public HoodieBootstrapConfig(Properties props) {
super(props);
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
}
}
public Builder withBootstrapBasePath(String basePath) {
props.setProperty(BOOTSTRAP_BASE_PATH_PROP, basePath);
return this;
}
public Builder withBootstrapModeSelector(String partitionSelectorClass) {
props.setProperty(BOOTSTRAP_MODE_SELECTOR, partitionSelectorClass);
return this;
}
public Builder withFullBootstrapInputProvider(String partitionSelectorClass) {
props.setProperty(FULL_BOOTSTRAP_INPUT_PROVIDER, partitionSelectorClass);
return this;
}
public Builder withBootstrapKeyGenClass(String keyGenClass) {
props.setProperty(BOOTSTRAP_KEYGEN_CLASS, keyGenClass);
return this;
}
public Builder withBootstrapPartitionPathTranslatorClass(String partitionPathTranslatorClass) {
props.setProperty(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, partitionPathTranslatorClass);
return this;
}
public Builder withBootstrapParallelism(int parallelism) {
props.setProperty(BOOTSTRAP_PARALLELISM, String.valueOf(parallelism));
return this;
}
public Builder withBootstrapModeSelectorRegex(String regex) {
props.setProperty(BOOTSTRAP_MODE_SELECTOR_REGEX, regex);
return this;
}
public Builder withBootstrapModeForRegexMatch(BootstrapMode modeForRegexMatch) {
props.setProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, modeForRegexMatch.name());
return this;
}
public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}
public HoodieBootstrapConfig build() {
HoodieBootstrapConfig config = new HoodieBootstrapConfig(props);
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_PARALLELISM), BOOTSTRAP_PARALLELISM,
DEFAULT_BOOTSTRAP_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS),
BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, DEFAULT_BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS);
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR), BOOTSTRAP_MODE_SELECTOR,
MetadataOnlyBootstrapModeSelector.class.getCanonicalName());
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX), BOOTSTRAP_MODE_SELECTOR_REGEX,
DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX);
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE),
BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE);
BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
return config;
}
}
}
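A minimal sketch of wiring this into a write config, using only builder methods that appear in this diff (the paths are placeholders, and SimpleKeyGenerator is just an example key generator class):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/bootstrap_target")                        // placeholder Hudi base path
        .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
            .withBootstrapBasePath("/data/source/parquet_table")       // placeholder source base path
            .withBootstrapKeyGenClass(SimpleKeyGenerator.class.getName())
            .withBootstrapModeSelector(BootstrapRegexModeSelector.class.getName())
            .withBootstrapModeSelectorRegex("2020/06.*")
            .withBootstrapModeForRegexMatch(BootstrapMode.FULL_RECORD)
            .withBootstrapParallelism(500)
            .build())
        .build();

Such a writeConfig is what the HoodieWriteClient.bootstrap(...) call sketched earlier would consume.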

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.config;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -129,6 +130,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
"_.hoodie.allow.multi.write.on.same.instant";
public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation";
public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = "false";
private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -136,7 +140,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
private HoodieWriteConfig(Properties props) {
protected HoodieWriteConfig(Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
@@ -180,6 +184,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
}
public boolean shouldUseExternalSchemaTransformation() {
return Boolean.parseBoolean(props.getProperty(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION));
}
public Integer getTimelineLayoutVersion() {
return Integer.parseInt(props.getProperty(TIMELINE_LAYOUT_VERSION));
}
@@ -675,13 +683,46 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
}
public String getBootstrapSourceBasePath() {
return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP);
}
public String getBootstrapModeSelectorClass() {
return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR);
}
public String getFullBootstrapInputProvider() {
return props.getProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER);
}
public String getBootstrapKeyGeneratorClass() {
return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS);
}
public String getBootstrapModeSelectorRegex() {
return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX);
}
public BootstrapMode getBootstrapModeForRegexMatch() {
return BootstrapMode.valueOf(props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
}
public String getBootstrapPartitionPathTranslatorClass() {
return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS);
}
public int getBootstrapParallelism() {
return Integer.parseInt(props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM));
}
public static class Builder {
private final Properties props = new Properties();
protected final Properties props = new Properties();
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
@@ -805,6 +846,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withBootstrapConfig(HoodieBootstrapConfig bootstrapConfig) {
props.putAll(bootstrapConfig.getProps());
isBootstrapConfigSet = true;
return this;
}
public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
@@ -863,7 +910,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public HoodieWriteConfig build() {
public Builder withExternalSchemaTrasformation(boolean enabled) {
props.setProperty(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, String.valueOf(enabled));
return this;
}
public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
}
protected void setDefaults() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
@@ -916,6 +973,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isBootstrapConfigSet,
HoodieBootstrapConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMemoryConfigSet, HoodieMemoryConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isViewConfigSet,
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
@@ -924,15 +983,24 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !isCallbackConfigSet,
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
}
private void validate() {
String layoutVersion = props.getProperty(TIMELINE_LAYOUT_VERSION);
// Ensure Layout Version is good
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
Objects.requireNonNull(props.getProperty(BASE_PATH_PROP));
}
public HoodieWriteConfig build() {
setDefaults();
validate();
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
Objects.requireNonNull(config.getBasePath());
return config;
}
}

View File

@@ -165,7 +165,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
try {
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(originalSchema);
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(writerSchema);
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
@@ -212,7 +212,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
private void doAppend(Map<HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
recordList.clear();

View File

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
/**
* This class is essentially the same as the create handle, but it overrides two things:
* 1) Schema: metadata bootstrap writes only the metadata fields, so the writer schema is set up accordingly.
* 2) canWrite is overridden to always return true so that the skeleton file and the bootstrap file stay aligned and we
* don't end up writing more than one skeleton file for the same bootstrap file.
* @param <T> HoodieRecordPayload
*/
public class HoodieBootstrapHandle<T extends HoodieRecordPayload> extends HoodieCreateHandle<T> {
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), sparkTaskContextSupplier);
}
@Override
public boolean canWrite(HoodieRecord record) {
return true;
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -56,8 +58,16 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
sparkTaskContextSupplier);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, writerSchemaIncludingAndExcludingMetadataPair,
sparkTaskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -68,8 +78,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter =
HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -132,9 +141,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
while (recordIterator.hasNext()) {
HoodieRecord<T> record = recordIterator.next();
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchema));
write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
} else {
write(record, record.getData().getInsertValue(originalSchema));
write(record, record.getData().getInsertValue(writerSchema));
}
}
} catch (IOException io) {

View File

@@ -67,6 +67,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private long updatedRecordsWritten = 0;
private long insertRecordsWritten = 0;
private boolean useWriterSchema;
private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
@@ -88,6 +89,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
}
@Override
public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
}
public Schema getWriterSchema() {
return writerSchema;
}
@@ -95,12 +100,13 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
private void init(String fileId, String partitionPath, HoodieBaseFile dataFileToBeMerged) {
private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
this.baseFileToMerge = baseFileToMerge;
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
try {
String latestValidFilePath = dataFileToBeMerged.getFileName();
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
@@ -126,8 +132,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
createMarkerFile(partitionPath, newFileName);
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -145,7 +150,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -216,7 +221,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema);
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the the combined new
@@ -241,7 +246,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
fileWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
+ " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
+ " to file " + newFilePath + " with writerSchema " + writerSchemaWithMetafields.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
@@ -262,9 +267,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(originalSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
}
insertRecordsWritten++;
}
@@ -312,4 +317,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
public IOType getIOType() {
return IOType.MERGE;
}
public HoodieBaseFile baseFileForMerge() {
return baseFileToMerge;
}
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -51,8 +52,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
protected final Schema originalSchema;
protected final Schema writerSchema;
protected final Schema writerSchemaWithMetafields;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
protected final String partitionPath;
@@ -62,11 +63,18 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, partitionPath, fileId, hoodieTable,
getWriterSchemaIncludingAndExcludingMetadataPair(config), sparkTaskContextSupplier);
}
protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
HoodieTable<T> hoodieTable, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = HoodieAvroUtils.createHoodieWriteSchema(originalSchema);
this.writerSchema = writerSchemaIncludingAndExcludingMetadataPair.getKey();
this.writerSchemaWithMetafields = writerSchemaIncludingAndExcludingMetadataPair.getValue();
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
@@ -74,6 +82,19 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
this.writeToken = makeWriteToken();
}
/**
* Returns a pair of writer schemas:
* (a) the writer schema from the client config, and
* (b) the same schema with the hoodie metadata fields added.
* @param config Write Config
* @return pair of (schema without metadata fields, schema with metadata fields)
*/
protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config) {
Schema originalSchema = new Schema.Parser().parse(config.getSchema());
Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
return Pair.of(originalSchema, hoodieSchema);
}
/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
@@ -103,8 +124,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
markerFiles.create(partitionPath, dataFileName, getIOType());
}
public Schema getWriterSchema() {
return writerSchema;
public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
}
/**
@@ -142,7 +163,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchema);
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
}
public abstract WriteStatus close();

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import java.util.List;
import java.util.stream.Collectors;
/**
* Base class for all the built-in key generators. Contains methods structured for
* code reuse amongst them.
*/
public abstract class BuiltinKeyGenerator extends KeyGenerator {
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
/**
* Generate a record Key out of provided generic record.
*/
public abstract String getRecordKey(GenericRecord record);
/**
* Generate a partition path out of provided generic record.
*/
public abstract String getPartitionPath(GenericRecord record);
/**
* Generate a Hoodie Key out of provided generic record.
*/
public final HoodieKey getKey(GenericRecord record) {
if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
return new HoodieKey(getRecordKey(record), getPartitionPath(record));
}
/**
* Return the fields that constitute the record key. Used by metadata bootstrap.
* A base implementation is provided in order to avoid forcing custom KeyGenerator implementations
* to implement this method.
* @return list of record key fields
*/
public List<String> getRecordKeyFields() {
throw new IllegalStateException("This method is expected to be overridden by subclasses");
}
/**
* Return the fields that constitute the partition path. Used by metadata bootstrap.
* A base implementation is provided in order to avoid forcing custom KeyGenerator implementations
* to implement this method.
* @return list of partition path fields
*/
public List<String> getPartitionPathFields() {
throw new IllegalStateException("This method is expected to be overridden by subclasses");
}
@Override
public final List<String> getRecordKeyFieldNames() {
// For nested columns, pick top level column name
return getRecordKeyFields().stream().map(k -> {
int idx = k.indexOf('.');
return idx > 0 ? k.substring(0, idx) : k;
}).collect(Collectors.toList());
}
}
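As a hedged illustration of the contract, a minimal subclass might look like this (the class and field names are hypothetical; it leans on the public KeyGenUtils helpers added in this change, and assumes getRecordKey/getPartitionPath plus the two field-listing hooks are all a subclass needs to supply):

    // Hypothetical key generator: record key from "id", partition path from "dt".
    public class IdAndDateKeyGenerator extends BuiltinKeyGenerator {
      public IdAndDateKeyGenerator(TypedProperties props) {
        super(props);
      }
      @Override
      public String getRecordKey(GenericRecord record) {
        return KeyGenUtils.getRecordKey(record, "id");
      }
      @Override
      public String getPartitionPath(GenericRecord record) {
        return KeyGenUtils.getPartitionPath(record, "dt", false, false);
      }
      @Override
      public List<String> getRecordKeyFields() {
        return Collections.singletonList("id");   // consumed by metadata bootstrap
      }
      @Override
      public List<String> getPartitionPathFields() {
        return Collections.singletonList("dt");
      }
    }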

View File

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
public class KeyGenUtils {
protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
protected static final String DEFAULT_PARTITION_PATH = "default";
protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
boolean keyIsNullEmpty = true;
StringBuilder recordKey = new StringBuilder();
for (String recordKeyField : recordKeyFields) {
String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
if (recordKeyValue == null) {
recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
} else if (recordKeyValue.isEmpty()) {
recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + ",");
} else {
recordKey.append(recordKeyField + ":" + recordKeyValue + ",");
keyIsNullEmpty = false;
}
}
recordKey.deleteCharAt(recordKey.length() - 1);
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
+ recordKeyFields.toString() + " cannot be entirely null or empty.");
}
return recordKey.toString();
}
public static String getRecordPartitionPath(GenericRecord record, List<String> partitionPathFields,
boolean hiveStylePartitioning, boolean encodePartitionPath) {
StringBuilder partitionPath = new StringBuilder();
for (String partitionPathField : partitionPathFields) {
String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
if (fieldVal == null || fieldVal.isEmpty()) {
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
: DEFAULT_PARTITION_PATH);
} else {
if (encodePartitionPath) {
try {
fieldVal = URLEncoder.encode(fieldVal, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException uoe) {
throw new HoodieException(uoe.getMessage(), uoe);
}
}
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal);
}
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
public static String getRecordKey(GenericRecord record, String recordKeyField) {
String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
if (recordKey == null || recordKey.isEmpty()) {
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
}
return recordKey;
}
public static String getPartitionPath(GenericRecord record, String partitionPathField,
boolean hiveStylePartitioning, boolean encodePartitionPath) {
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = DEFAULT_PARTITION_PATH;
}
if (encodePartitionPath) {
try {
partitionPath = URLEncoder.encode(partitionPath, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException uoe) {
throw new HoodieException(uoe.getMessage(), uoe);
}
}
if (hiveStylePartitioning) {
partitionPath = partitionPathField + "=" + partitionPath;
}
return partitionPath;
}
}
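As a worked illustration of how these helpers compose record keys and partition paths (the schema and field names below are assumptions for the example, not part of this change):

// Hedged sketch: composite record key and partition path via KeyGenUtils.
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.KeyGenUtils;

public class KeyGenUtilsExample {
  public static void main(String[] args) {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Trip\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"region\",\"type\":\"string\"},"
            + "{\"name\":\"datestr\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "42");
    record.put("region", "us-west");
    record.put("datestr", "2020-08-01");

    // Composite key, rendered as "id:42,region:us-west".
    String recordKey = KeyGenUtils.getRecordKey(record, Arrays.asList("id", "region"));

    // Plain partition path "2020-08-01"; with hiveStylePartitioning=true it would
    // become "datestr=2020-08-01", and encodePartitionPath=true would URL-encode it.
    String partitionPath = KeyGenUtils.getPartitionPath(record, "datestr", false, false);

    System.out.println(recordKey + " -> " + partitionPath);
  }
}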

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
import java.util.List;
/**
* Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
*/
public abstract class KeyGenerator implements Serializable {
protected transient TypedProperties config;
protected KeyGenerator(TypedProperties config) {
this.config = config;
}
/**
* Generate a HoodieKey from the provided generic record.
*/
public abstract HoodieKey getKey(GenericRecord record);
/**
* Used during bootstrap to project out only the record key fields from the bootstrap source dataset.
*
* @return list of field names which, when concatenated, make up the record key.
*/
public List<String> getRecordKeyFieldNames() {
throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
+ "Please override this method in your custom key generator.");
}
}
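A custom generator built directly on this class only has to override getRecordKeyFieldNames() if it needs to participate in metadata bootstrap; a minimal hedged sketch (the "uuid" field name is an assumption, not part of this change):

// Hypothetical custom key generator, illustration only.
import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;

public class ExampleUuidKeyGenerator extends KeyGenerator {

  public ExampleUuidKeyGenerator(TypedProperties config) {
    super(config);
  }

  @Override
  public HoodieKey getKey(GenericRecord record) {
    // Single constant partition; the record key is taken from the "uuid" field.
    return new HoodieKey(String.valueOf(record.get("uuid")), "default");
  }

  @Override
  public List<String> getRecordKeyFieldNames() {
    // Lets the bootstrap path project only the key column out of the source files.
    return Collections.singletonList("uuid");
  }
}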

View File

@@ -18,9 +18,6 @@
package org.apache.hudi.table;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -33,30 +30,32 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.MergeHelper;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -97,7 +96,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new BulkInsertCommitActionExecutor<>(jsc, config,
return new BulkInsertCommitActionExecutor(jsc, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@@ -121,7 +120,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new BulkInsertPreppedCommitActionExecutor<>(jsc, config,
return new BulkInsertPreppedCommitActionExecutor(jsc, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@@ -135,6 +134,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@Override
public HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option<Map<String, String>> extraMetadata) {
return new BootstrapCommitActionExecutor(jsc, config, this, extraMetadata).execute();
}
@Override
public void rollbackBootstrap(JavaSparkContext jsc, String instantTime) {
new CopyOnWriteRestoreActionExecutor(jsc, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
@@ -148,25 +157,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
HoodieFileReader<IndexedRecord> storageReader =
HoodieFileReaderFactory.getFileReader(getHadoopConf(), upsertHandle.getOldFilePath());
try {
wrapper =
new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
upsertHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
MergeHelper.runMerge(this, upsertHandle);
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "

View File

@@ -28,10 +28,14 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.BootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
@@ -84,7 +88,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new BulkInsertDeltaCommitActionExecutor<>(jsc, config,
return new BulkInsertDeltaCommitActionExecutor(jsc, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@@ -108,7 +112,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config,
return new BulkInsertPreppedDeltaCommitActionExecutor(jsc, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@@ -125,6 +129,16 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
return compactionExecutor.execute();
}
@Override
public HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option<Map<String, String>> extraMetadata) {
return new BootstrapDeltaCommitActionExecutor(jsc, config, this, extraMetadata).execute();
}
@Override
public void rollbackBootstrap(JavaSparkContext jsc, String instantTime) {
new MergeOnReadRestoreActionExecutor(jsc, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
@Override
public HoodieRollbackMetadata rollback(JavaSparkContext jsc,
String rollbackInstantTime,

View File

@@ -60,6 +60,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -331,6 +332,20 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public abstract HoodieWriteMetadata compact(JavaSparkContext jsc,
String compactionInstantTime);
/**
* Perform metadata/full bootstrap of a Hudi table.
* @param jsc JavaSparkContext
* @param extraMetadata Additional Metadata for storing in commit file.
* @return HoodieBootstrapWriteMetadata
*/
public abstract HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option<Map<String, String>> extraMetadata);
/**
* Perform rollback of bootstrap of a Hudi table.
* @param jsc JavaSparkContext
*/
public abstract void rollbackBootstrap(JavaSparkContext jsc, String instantTime);
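A hedged sketch of driving these table-level APIs directly (normally the write client orchestrates this); `config`, `hadoopConf` and `jsc` are assumed to be set up by the caller:

// Illustration only: run bootstrap on a table and inspect the two-phase result.
HoodieTable<?> table = HoodieTable.create(config, hadoopConf);
HoodieBootstrapWriteMetadata bootstrapResult = table.bootstrap(jsc, Option.empty());
if (bootstrapResult.getMetadataBootstrapResult().isPresent()) {
  // Metadata-only bootstrap committed skeleton files pointing at the source data.
}
if (bootstrapResult.getFullBootstrapResult().isPresent()) {
  // Full-record bootstrap rewrote the selected partitions as Hudi files.
}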
/**
* Executes a new clean action.
*

View File

@@ -0,0 +1,356 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
import org.apache.hudi.client.bootstrap.BootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.action.commit.CommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseCommitActionExecutor<T, HoodieBootstrapWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(BootstrapCommitActionExecutor.class);
protected String bootstrapSchema = null;
private transient FileSystem bootstrapSourceFileSystem;
public BootstrapCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table,
Option<Map<String, String>> extraMetadata) {
super(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
.withBulkInsertParallelism(config.getBootstrapParallelism())
.build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
extraMetadata);
bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
}
private void validate() {
ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
"Ensure Bootstrap Source Path is set");
ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
ValidationUtils.checkArgument(config.getBootstrapKeyGeneratorClass() != null,
"Ensure bootstrap key generator class is set");
}
@Override
public HoodieBootstrapWriteMetadata execute() {
validate();
try {
HoodieTableMetaClient metaClient = table.getMetaClient();
Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
ValidationUtils.checkArgument(!completedInstant.isPresent(),
"Active Timeline is expected to be empty for bootstrap to be performed. "
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
// First run metadata bootstrap which will auto commit
Option<HoodieWriteMetadata> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
// If full-record bootstrap needs to be performed, run it as well
Option<HoodieWriteMetadata> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
protected String getSchemaToStoreInCommit() {
return bootstrapSchema;
}
/**
* Perform Metadata Bootstrap.
* @param partitionFilesList List of partitions and the files within those partitions
*/
protected Option<HoodieWriteMetadata> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
if (null == partitionFilesList || partitionFilesList.isEmpty()) {
return Option.empty();
}
HoodieTableMetaClient metaClient = table.getMetaClient();
metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
JavaRDD<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
HoodieWriteMetadata result = new HoodieWriteMetadata();
updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
return Option.of(result);
}
@Override
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
// Perform the bootstrap index write and then commit. Make sure both the record-key writes
// and the bootstrap index are produced in a single job DAG.
Map<String, List<Pair<BootstrapFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats =
result.getWriteStatuses().collect().stream()
.map(w -> {
BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
}).collect(Collectors.groupingBy(w -> w.getKey().getPartitionPath()));
HoodieTableMetaClient metaClient = table.getMetaClient();
try (BootstrapIndex.IndexWriter indexWriter = BootstrapIndex.getBootstrapIndex(metaClient)
.createWriter(metaClient.getTableConfig().getBootstrapBasePath().get())) {
LOG.info("Starting to write bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+ config.getBasePath());
indexWriter.begin();
bootstrapSourceAndStats.forEach((key, value) -> indexWriter.appendNextPartition(key,
value.stream().map(Pair::getKey).collect(Collectors.toList())));
indexWriter.finish();
LOG.info("Finished writing bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+ config.getBasePath());
}
super.commit(extraMetadata, result, bootstrapSourceAndStats.values().stream()
.flatMap(f -> f.stream().map(Pair::getValue)).collect(Collectors.toList()));
LOG.info("Committing metadata bootstrap !!");
}
/**
* Perform Full-Record Bootstrap.
* @param partitionFilesList List of partitions and the files within those partitions
*/
protected Option<HoodieWriteMetadata> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
if (null == partitionFilesList || partitionFilesList.isEmpty()) {
return Option.empty();
}
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
FullRecordBootstrapDataProvider inputProvider =
(FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
properties, jsc);
JavaRDD<HoodieRecord> inputRecordsRDD =
inputProvider.generateInputRecordRDD("bootstrap_source", config.getBootstrapSourceBasePath(),
partitionFilesList);
// Start Full Bootstrap
final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
table.getActiveTimeline().createNewInstant(requested);
// Setup correct schema and run bulk insert.
return Option.of(getBulkInsertActionExecutor(inputRecordsRDD).execute());
}
protected CommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
return new BulkInsertCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, extraMetadata);
}
private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, String partitionPath,
HoodieFileStatus srcFileStatus, KeyGenerator keyGenerator) {
Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getSparkTaskContextSupplier());
Schema avroSchema = null;
try {
ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), sourceFilePath,
ParquetMetadataConverter.NO_FILTER);
MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
avroSchema = new AvroSchemaConverter().convert(parquetSchema);
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
keyGenerator.getRecordKeyFieldNames());
LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
try (ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build()) {
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
});
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus();
BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());
writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
return writeStatus;
}
/**
* List source partitions, determine the bootstrap schema, and select a bootstrap mode for each partition.
* @return map from bootstrap mode to the partitions (and their files) selected for that mode
* @throws IOException when listing the source dataset fails
*/
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
List<Pair<String, List<HoodieFileStatus>>> folders =
BootstrapUtils.getAllLeafFoldersWithFiles(bootstrapSourceFileSystem,
config.getBootstrapSourceBasePath(), path -> {
// TODO: Needs to be abstracted out when supporting different formats
// TODO: Remove hoodieFilter
return path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension());
});
LOG.info("Fetching Bootstrap Schema !!");
BootstrapSchemaProvider sourceSchemaProvider = new BootstrapSchemaProvider(config);
bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(jsc, folders).toString();
LOG.info("Bootstrap Schema :" + bootstrapSchema);
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
Map<BootstrapMode, List<String>> result = selector.select(folders);
Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));
// Ensure all partitions are accounted for
ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
.map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
private JavaRDD<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (null == partitions || partitions.isEmpty()) {
return jsc.emptyRDD();
}
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(config.getBootstrapKeyGeneratorClass(),
properties);
BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
config.getBootstrapPartitionPathTranslatorClass(), properties);
List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
.flatMap(p -> {
String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
})
.collect(Collectors.toList());
return jsc.parallelize(bootstrapPaths, config.getBootstrapParallelism())
.map(partitionFsPair -> handleMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(),
partitionFsPair.getRight().getRight(), keyGenerator));
}
//TODO: Once we decouple commit protocol, we should change the class hierarchy to avoid doing this.
@Override
protected Partitioner getUpsertPartitioner(WorkloadProfile profile) {
throw new UnsupportedOperationException("Should not be called in the bootstrap code path");
}
@Override
protected Partitioner getInsertPartitioner(WorkloadProfile profile) {
throw new UnsupportedOperationException("Should not be called in the bootstrap code path");
}
@Override
protected Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
throw new UnsupportedOperationException("Should not be called in the bootstrap code path");
}
@Override
protected Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
throw new UnsupportedOperationException("Should not be called in the bootstrap code path");
}
}
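For context, the mode selector loaded above via ReflectionUtils takes the write config in its constructor and returns a mode-to-partitions map; below is a hedged sketch of a selector that marks every partition for metadata-only bootstrap. The exact base-class shape is assumed from the usage in listAndProcessSourcePartitions, so treat this as illustration only.

// Hypothetical selector, illustration only.
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

public class ExampleMetadataOnlySelector extends BootstrapModeSelector {

  public ExampleMetadataOnlySelector(HoodieWriteConfig writeConfig) {
    super(writeConfig);
  }

  @Override
  public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    // Every listed partition gets skeleton files only; no source data is rewritten.
    return Collections.singletonMap(BootstrapMode.METADATA_ONLY,
        partitions.stream().map(Pair::getKey).collect(Collectors.toList()));
  }
}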

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.CommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BootstrapDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BootstrapCommitActionExecutor<T> {
public BootstrapDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable<?> table,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, extraMetadata);
}
protected CommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
return new BulkInsertDeltaCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, extraMetadata);
}
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;
import java.io.IOException;
/**
* Consumer that dequeues records from the queue and writes them through the Bootstrap Handle.
*/
public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer<HoodieRecord, Void> {
private final HoodieBootstrapHandle bootstrapHandle;
public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
this.bootstrapHandle = bootstrapHandle;
}
@Override
protected void consumeOneRecord(HoodieRecord record) {
try {
bootstrapHandle.write(record, record.getData().getInsertValue(bootstrapHandle.getWriterSchemaWithMetafields()));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
@Override
protected void finish() {}
@Override
protected Void getResult() {
return null;
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.stream.Collectors;
public class BootstrapUtils {
/**
* Returns leaf folders with files under a path.
* @param fs File System
* @param basePathStr Base Path to look for leaf folders
* @param filePathFilter Filter used to select eligible data files
* @return list of partition paths with files under them.
* @throws IOException
*/
public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
PathFilter filePathFilter) throws IOException {
final Path basePath = new Path(basePathStr);
final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
final Map<String, List<HoodieFileStatus>> partitionToFiles = new HashMap<>();
FSUtils.processFiles(fs, basePathStr, (status) -> {
if (status.isFile() && filePathFilter.accept(status.getPath())) {
String relativePath = FSUtils.getRelativePartitionPath(basePath, status.getPath().getParent());
List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath);
if (null == statusList) {
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
List<String> dirs = levelToPartitions.get(level);
if (null == dirs) {
dirs = new ArrayList<>();
levelToPartitions.put(level, dirs);
}
dirs.add(relativePath);
statusList = new ArrayList<>();
partitionToFiles.put(relativePath, statusList);
}
statusList.add(FileStatusUtils.fromFileStatus(status));
}
return true;
}, true);
OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x -> x).max();
int maxLevel = maxLevelOpt.orElse(-1);
return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream()
.map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
}
}
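A short usage sketch of this helper, mirroring the call made in listAndProcessSourcePartitions above; the source path is an assumption for the example.

// Illustration: list leaf partitions of an external parquet dataset.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;

public class BootstrapUtilsExample {
  public static void main(String[] args) throws Exception {
    String sourceBasePath = "s3://example-bucket/source_table"; // assumed path
    FileSystem fs = FSUtils.getFs(sourceBasePath, new Configuration());
    List<Pair<String, List<HoodieFileStatus>>> partitions =
        BootstrapUtils.getAllLeafFoldersWithFiles(fs, sourceBasePath,
            path -> path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
    partitions.forEach(p ->
        System.out.println(p.getKey() + " -> " + p.getValue().size() + " files"));
  }
}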

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.HoodieWriteMetadata;
/**
* Write Result for a bootstrap operation.
*/
public class HoodieBootstrapWriteMetadata {
private final Option<HoodieWriteMetadata> metadataBootstrapResult;
private final Option<HoodieWriteMetadata> fullBootstrapResult;
public HoodieBootstrapWriteMetadata(Option<HoodieWriteMetadata> metadataBootstrapResult,
Option<HoodieWriteMetadata> fullBootstrapResult) {
this.metadataBootstrapResult = metadataBootstrapResult;
this.fullBootstrapResult = fullBootstrapResult;
}
public Option<HoodieWriteMetadata> getMetadataBootstrapResult() {
return metadataBootstrapResult;
}
public Option<HoodieWriteMetadata> getFullBootstrapResult() {
return fullBootstrapResult;
}
}

View File

@@ -18,6 +18,13 @@
package org.apache.hudi.table.action.commit;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
@@ -38,43 +45,31 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseActionExecutor<HoodieWriteMetadata> {
public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>, R>
extends BaseActionExecutor<R> {
private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
protected final Option<Map<String, String>> extraMetadata;
private final WriteOperationType operationType;
protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable table, String instantTime, WriteOperationType operationType) {
this(jsc, config, table, instantTime, operationType, null);
}
public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable table, String instantTime, WriteOperationType operationType,
JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime);
this.operationType = operationType;
this.extraMetadata = extraMetadata;
}
public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
@@ -173,13 +168,17 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
commit(Option.empty(), result);
commit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
}
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
String actionType = table.getMetaClient().getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
// Create a Hoodie table which encapsulated the commits and files visible
@@ -189,7 +188,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
result.setCommitted(true);
List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
result.setWriteStats(stats);
@@ -200,7 +198,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
metadata.setOperationType(operationType);
try {
@@ -229,6 +227,13 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
}
}
/**
* By default, return the writer schema in Write Config for storing in commit.
*/
protected String getSchemaToStoreInCommit() {
return config.getSchema();
}
protected boolean isWorkloadProfileNeeded() {
return true;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.commit;
import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -31,30 +32,34 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {
public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
public BulkInsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
public BulkInsertCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
this(jsc, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
}
public BulkInsertCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
this, true, userDefinedBulkInsertPartitioner);
this, true, bulkInsertPartitioner);
} catch (HoodieInsertException ie) {
throw ie;
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}

View File

@@ -39,11 +39,10 @@ import java.util.stream.IntStream;
public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(
JavaRDD<HoodieRecord<T>> inputRecords, String instantTime,
HoodieTable<T> table, HoodieWriteConfig config,
CommitActionExecutor<T> executor, boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords, String instantTime,
HoodieTable<T> table, HoodieWriteConfig config,
CommitActionExecutor<T> executor, boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// De-dupe/merge if needed

View File

@@ -35,12 +35,12 @@ public class BulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload
extends CommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;

View File

@@ -23,21 +23,15 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -50,14 +44,19 @@ import java.util.List;
import java.util.Map;
public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseCommitActionExecutor<T> {
extends BaseCommitActionExecutor<T, HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(CommitActionExecutor.class);
public CommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
super(jsc, config, table, instantTime, operationType);
public CommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
this(jsc, config, table, instantTime, operationType, Option.empty());
}
public CommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime, operationType, extraMetadata);
}
@Override
@@ -87,22 +86,7 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try {
HoodieFileReader<IndexedRecord> storageReader =
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
wrapper =
new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
new UpdateHandler(upsertHandle), x -> x);
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
upsertHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
MergeHelper.runMerge(table, upsertHandle);
}
// TODO(vc): This needs to be revisited
@@ -147,29 +131,4 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return getUpsertPartitioner(profile);
}
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
private final HoodieMergeHandle upsertHandle;
private UpdateHandler(HoodieMergeHandle upsertHandle) {
this.upsertHandle = upsertHandle;
}
@Override
protected void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}
@Override
protected void finish() {}
@Override
protected Void getResult() {
return null;
}
}
}

View File

@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.Iterator;
/**
* Helper to read records from the previous version of the base file and run the merge.
*/
public class MergeHelper {
/**
* Read records from the previous version of the base file and merge them with the incoming records.
* @param table Hoodie Table
* @param upsertHandle Merge Handle
* @param <T> payload type
* @throws IOException in case of error
*/
public static <T extends HoodieRecordPayload<T>> void runMerge(HoodieTable<T> table, HoodieMergeHandle<T> upsertHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieBaseFile baseFile = upsertHandle.baseFileForMerge();
final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, upsertHandle.getWriterSchemaWithMetafields());
} else {
gReader = null;
gWriter = null;
readSchema = upsertHandle.getWriterSchemaWithMetafields();
}
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<T, GenericRecord>getFileReader(cfgForHoodieFile, upsertHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, upsertHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator,
new UpdateHandler(upsertHandle), record -> {
if (!externalSchemaTransformation) {
return record;
}
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
});
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
if (reader != null) {
reader.close();
}
upsertHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
}
private static GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, GenericDatumWriter<GenericRecord> gWriter,
ThreadLocal<BinaryEncoder> encoderCache, ThreadLocal<BinaryDecoder> decoderCache,
GenericRecord gRec) {
ByteArrayOutputStream inStream = null;
try {
inStream = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(inStream, encoderCache.get());
encoderCache.set(encoder);
gWriter.write(gRec, encoder);
encoder.flush();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inStream.toByteArray(), decoderCache.get());
decoderCache.set(decoder);
GenericRecord transformedRec = gReader.read(null, decoder);
return transformedRec;
} catch (IOException e) {
throw new HoodieException(e);
} finally {
try {
inStream.close();
} catch (IOException ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
}
}
}
/**
* Create a record iterator that provides a stitched view of the records read from the skeleton and bootstrap files.
* The skeleton file is the table-internal representation of the bootstrap file, carrying just the bare-bones fields
* needed for indexing, writing and other functionality.
*
*/
private static <T extends HoodieRecordPayload<T>> Iterator<GenericRecord> getMergingIterator(HoodieTable<T> table, HoodieMergeHandle<T> mergeHandle,
HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
Schema readSchema, boolean externalSchemaTransformation) throws IOException {
Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<T, GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
Schema bootstrapReadSchema;
if (externalSchemaTransformation) {
bootstrapReadSchema = bootstrapReader.getSchema();
} else {
bootstrapReadSchema = mergeHandle.getWriterSchema();
}
return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetafields()));
}
/**
* Consumer that dequeues records from the queue and hands them to the merge handle.
*/
private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
private final HoodieMergeHandle upsertHandle;
private UpdateHandler(HoodieMergeHandle upsertHandle) {
this.upsertHandle = upsertHandle;
}
@Override
protected void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}
@Override
protected void finish() {}
@Override
protected Void getResult() {
return null;
}
}
}
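
The stitched read above pairs each record from the skeleton file with the corresponding record from the bootstrap source file and combines them into a single output record. A minimal sketch of that pairwise iterator, assuming both files hold the same rows in the same order (class and parameter names here are illustrative, not Hudi's MergingIterator):

import java.util.Iterator;
import java.util.function.BiFunction;

// Illustrative pairwise iterator: zips the skeleton and bootstrap readers and stitches each pair.
public class StitchingIteratorSketch<L, R, O> implements Iterator<O> {

  private final Iterator<L> skeletonIterator;
  private final Iterator<R> bootstrapIterator;
  private final BiFunction<L, R, O> stitchFunction;

  public StitchingIteratorSketch(Iterator<L> skeletonIterator, Iterator<R> bootstrapIterator,
      BiFunction<L, R, O> stitchFunction) {
    this.skeletonIterator = skeletonIterator;
    this.bootstrapIterator = bootstrapIterator;
    this.stitchFunction = stitchFunction;
  }

  @Override
  public boolean hasNext() {
    // Both files were produced row-by-row from the same source, so the two
    // iterators are expected to advance in lock-step.
    boolean skeletonHasNext = skeletonIterator.hasNext();
    boolean bootstrapHasNext = bootstrapIterator.hasNext();
    if (skeletonHasNext != bootstrapHasNext) {
      throw new IllegalStateException("Skeleton and bootstrap files are out of sync");
    }
    return skeletonHasNext;
  }

  @Override
  public O next() {
    return stitchFunction.apply(skeletonIterator.next(), bootstrapIterator.next());
  }
}

A lock-step zip is enough here because the skeleton file is generated row-by-row from the bootstrap source file, so no key-based join is required at read time.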

View File

@@ -56,22 +56,23 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
private HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
String deltaCommitsSinceTs = "0";
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
String lastCompactionTs = "0";
if (lastCompaction.isPresent()) {
deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
lastCompactionTs = lastCompaction.get().getTimestamp();
}
int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
.findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
+ " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ " delta commits was found since last compaction " + lastCompactionTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
}
LOG.info("Compacting merge on read table " + config.getBasePath());
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, table, config, instantTime,

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.deltacommit;
import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -36,13 +37,19 @@ public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
this(jsc, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
}
public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@@ -52,10 +59,9 @@ public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T
try {
return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
this, true, bulkInsertPartitioner);
} catch (HoodieInsertException ie) {
throw ie;
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}

View File

@@ -36,12 +36,12 @@ public class BulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPa
extends DeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;

View File

@@ -18,10 +18,12 @@
package org.apache.hudi.table.action.deltacommit;
import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
@@ -48,10 +50,15 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
// UpsertPartitioner for MergeOnRead table type
private UpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
public DeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
super(jsc, config, table, instantTime, operationType);
public DeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
this(jsc, config, table, instantTime, operationType, Option.empty());
}
public DeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime, operationType, extraMetadata);
}
@Override

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -224,4 +225,11 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}
}
protected void dropBootstrapIndexIfNeeded(HoodieInstant instantToRollback) {
if (HoodieTimeline.compareTimestamps(instantToRollback.getTimestamp(), HoodieTimeline.EQUALS, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)) {
LOG.info("Dropping bootstrap index as metadata bootstrap commit is getting rolled back !!");
BootstrapIndex.getBootstrapIndex(table.getMetaClient()).dropIndex();
}
}
}
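
dropBootstrapIndexIfNeeded recognizes the metadata bootstrap commit purely by its reserved instant timestamp, which is chosen so that it always sorts before any real commit time. A tiny sketch of that ordering, assuming illustrative values for the reserved timestamps (Hudi defines the actual constants on HoodieTimeline):

// Minimal illustration of why comparing against the reserved bootstrap timestamps works:
// bootstrap instants use fixed, zero-padded timestamps (the values below are assumptions),
// so they always sort lexicographically before real commit times like "20200803201921".
public class BootstrapInstantCheckSketch {
  static final String METADATA_BOOTSTRAP_INSTANT_TS = "00000000000001"; // assumed value
  static final String FULL_BOOTSTRAP_INSTANT_TS = "00000000000002";     // assumed value

  static boolean isBootstrapInstant(String instantTs) {
    // Mirrors the intent of compareTimestamps(instantTs, LESSER_THAN_OR_EQUALS, FULL_BOOTSTRAP_INSTANT_TS)
    return instantTs.compareTo(FULL_BOOTSTRAP_INSTANT_TS) <= 0;
  }

  public static void main(String[] args) {
    System.out.println(isBootstrapInstant("00000000000001")); // true  (metadata bootstrap)
    System.out.println(isBootstrapInstant("00000000000002")); // true  (full bootstrap)
    System.out.println(isBootstrapInstant("20200803201921")); // false (regular commit)
  }
}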

View File

@@ -82,6 +82,9 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
LOG.info("Clean out all base files generated for commit: " + resolvedInstant);
stats = getRollbackStrategy().execute(resolvedInstant);
}
dropBootstrapIndexIfNeeded(instantToRollback);
// Delete Inflight instant if enabled
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());

View File

@@ -98,6 +98,8 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
allRollbackStats = getRollbackStrategy().execute(resolvedInstant);
}
dropBootstrapIndexIfNeeded(resolvedInstant);
// Delete Inflight instants if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());

View File

@@ -477,8 +477,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withBloomIndexUpdatePartitionPath(true)
.withGlobalSimpleIndexUpdatePartitionPath(true)
.build()).withTimelineLayoutVersion(VERSION_0).build();
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);

View File

@@ -122,7 +122,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
for (GenericRecord rec : oldRecords) {

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestBootstrapRegexModeSelector {
private HoodieWriteConfig getConfig(String regex, BootstrapMode selectedMode) {
return HoodieWriteConfig.newBuilder().withPath("")
.withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
.withBootstrapModeSelectorRegex(regex)
.withBootstrapModeForRegexMatch(selectedMode).build())
.forTable("test-trip-table").build();
}
@Test
public void testModeSelector() {
List<String> partitionPaths = Arrays.asList("2020/05/01", "2020/05/02", "2020/05/10", "2020/05/11");
List<Pair<String, List<HoodieFileStatus>>> input = partitionPaths.stream()
.map(p -> Pair.<String, List<HoodieFileStatus>>of(p, new ArrayList<>())).collect(Collectors.toList());
String regex = "2020/05/1[0-9]";
BootstrapRegexModeSelector regexModeSelector = new BootstrapRegexModeSelector(getConfig(regex,
BootstrapMode.FULL_RECORD));
Map<BootstrapMode, List<String>> result = regexModeSelector.select(input);
assertTrue(result.get(BootstrapMode.METADATA_ONLY).contains("2020/05/01"));
assertTrue(result.get(BootstrapMode.METADATA_ONLY).contains("2020/05/02"));
assertTrue(result.get(BootstrapMode.FULL_RECORD).contains("2020/05/10"));
assertTrue(result.get(BootstrapMode.FULL_RECORD).contains("2020/05/11"));
assertEquals(2, result.get(BootstrapMode.METADATA_ONLY).size());
assertEquals(2, result.get(BootstrapMode.FULL_RECORD).size());
regexModeSelector = new BootstrapRegexModeSelector(getConfig(regex,
BootstrapMode.METADATA_ONLY));
result = regexModeSelector.select(input);
assertTrue(result.get(BootstrapMode.FULL_RECORD).contains("2020/05/01"));
assertTrue(result.get(BootstrapMode.FULL_RECORD).contains("2020/05/02"));
assertTrue(result.get(BootstrapMode.METADATA_ONLY).contains("2020/05/10"));
assertTrue(result.get(BootstrapMode.METADATA_ONLY).contains("2020/05/11"));
assertEquals(2, result.get(BootstrapMode.METADATA_ONLY).size());
assertEquals(2, result.get(BootstrapMode.FULL_RECORD).size());
}
}
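
The test pins down the selector contract: partitions whose path matches the configured regex are bootstrapped in the configured mode, and every other partition falls back to the opposite mode. A rough sketch of that selection logic, as a standalone illustration rather than Hudi's BootstrapRegexModeSelector implementation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Standalone illustration of regex-driven bootstrap mode selection.
public class RegexModeSelectorSketch {

  public enum Mode { METADATA_ONLY, FULL_RECORD }

  private final Pattern pattern;
  private final Mode modeOnMatch;
  private final Mode modeOnMiss;

  public RegexModeSelectorSketch(String regex, Mode modeOnMatch) {
    this.pattern = Pattern.compile(regex);
    this.modeOnMatch = modeOnMatch;
    this.modeOnMiss = (modeOnMatch == Mode.FULL_RECORD) ? Mode.METADATA_ONLY : Mode.FULL_RECORD;
  }

  // Bucket each partition path by whether it matches the configured regex.
  public Map<Mode, List<String>> select(List<String> partitionPaths) {
    Map<Mode, List<String>> result = new EnumMap<>(Mode.class);
    for (String partition : partitionPaths) {
      Mode mode = pattern.matcher(partition).matches() ? modeOnMatch : modeOnMiss;
      result.computeIfAbsent(mode, m -> new ArrayList<>()).add(partition);
    }
    return result;
  }

  public static void main(String[] args) {
    RegexModeSelectorSketch selector = new RegexModeSelectorSketch("2020/05/1[0-9]", Mode.FULL_RECORD);
    List<String> partitions = Arrays.asList("2020/05/01", "2020/05/02", "2020/05/10", "2020/05/11");
    // Prints: {METADATA_ONLY=[2020/05/01, 2020/05/02], FULL_RECORD=[2020/05/10, 2020/05/11]}
    System.out.println(selector.select(partitions));
  }
}

Swapping the mode configured for a regex match simply flips the two buckets, which is exactly what the second half of the test asserts.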

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.UniformBootstrapModeSelector;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestUniformBootstrapModeSelector {
@Test
public void testFullBootstrapModeSelector() {
FullRecordBootstrapModeSelector modeSelector = new FullRecordBootstrapModeSelector(
HoodieWriteConfig.newBuilder().withPath("").build());
testModeSelector(modeSelector, BootstrapMode.FULL_RECORD);
}
@Test
public void testMetadataOnlyBootstrapModeSelector() {
MetadataOnlyBootstrapModeSelector modeSelector = new MetadataOnlyBootstrapModeSelector(
HoodieWriteConfig.newBuilder().withPath("").build());
testModeSelector(modeSelector, BootstrapMode.METADATA_ONLY);
}
private void testModeSelector(UniformBootstrapModeSelector modeSelector, BootstrapMode mode) {
List<String> partitionPaths = Arrays.asList("2020/05/01", "2020/05/02", "2020/05/10", "2020/05/11");
List<Pair<String, List<HoodieFileStatus>>> input = partitionPaths.stream()
.map(p -> Pair.<String, List<HoodieFileStatus>>of(p, new ArrayList<>())).collect(Collectors.toList());
Map<BootstrapMode, List<String>> result = modeSelector.select(input);
assertTrue(result.get(mode).contains("2020/05/01"));
assertTrue(result.get(mode).contains("2020/05/02"));
assertTrue(result.get(mode).contains("2020/05/10"));
assertTrue(result.get(mode).contains("2020/05/11"));
assertEquals(4, result.get(mode).size());
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestBootstrapUtils extends HoodieClientTestBase {
@Test
public void testAllLeafFoldersWithFiles() throws IOException {
// All directories including marker dirs.
List<String> folders = Arrays.asList("2016/04/15", "2016/05/16", "2016/05/17");
folders.forEach(f -> {
try {
metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
} catch (IOException e) {
throw new HoodieException(e);
}
});
// Files inside partitions and marker directories
List<String> files = Arrays.asList(
"2016/04/15/1_1-0-1_20190528120000.parquet",
"2016/04/15/2_1-0-1_20190528120000.parquet",
"2016/05/16/3_1-0-1_20190528120000.parquet",
"2016/05/16/4_1-0-1_20190528120000.parquet",
"2016/04/17/5_1-0-1_20190528120000.parquet",
"2016/04/17/6_1-0-1_20190528120000.parquet");
files.forEach(f -> {
try {
metaClient.getFs().create(new Path(new Path(basePath), f));
} catch (IOException e) {
throw new HoodieException(e);
}
});
List<Pair<String, List<HoodieFileStatus>>> collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath, (status) -> {
return true;
});
assertEquals(3, collected.size());
collected.stream().forEach(k -> {
assertEquals(2, k.getRight().size());
});
// Simulate reading from un-partitioned dataset
collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath + "/" + folders.get(0), (status) -> {
return true;
});
assertEquals(1, collected.size());
collected.stream().forEach(k -> {
assertEquals(2, k.getRight().size());
});
}
}
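
getAllLeafFoldersWithFiles walks the source tree and, for every leaf directory that directly contains data files accepted by the filter, returns the relative partition path together with those files. A simplified recursive sketch of such a traversal over the Hadoop FileSystem API (the method shape and return type are illustrative, not the exact Hudi signature, and the real utility also skips the table's metadata folder):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LeafFolderScanSketch {

  // Collect (relative partition path -> data files) for every leaf folder holding files.
  public static Map<String, List<FileStatus>> leafFoldersWithFiles(
      FileSystem fs, Path basePath, PathFilter fileFilter) throws IOException {
    Map<String, List<FileStatus>> result = new TreeMap<>();
    scan(fs, basePath, basePath, fileFilter, result);
    return result;
  }

  private static void scan(FileSystem fs, Path basePath, Path dir, PathFilter fileFilter,
      Map<String, List<FileStatus>> result) throws IOException {
    List<FileStatus> files = new ArrayList<>();
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDirectory()) {
        scan(fs, basePath, status.getPath(), fileFilter, result); // recurse into sub-folders
      } else if (fileFilter.accept(status.getPath())) {
        files.add(status);                                        // data file in this folder
      }
    }
    if (!files.isEmpty()) {
      // Relative partition path, e.g. "2016/04/15"; "" when pointed at an un-partitioned base path.
      String relative = basePath.toUri().relativize(dir.toUri()).getPath();
      result.put(relative, files);
    }
  }
}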

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
@@ -259,7 +260,9 @@ public class TestHoodieCompactionStrategy {
operations.add(new HoodieCompactionOperation(df.getCommitTime(),
logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()), df.getPath(), df.getFileId(),
partitionPath,
config.getCompactionStrategy().captureMetrics(config, Option.of(df), partitionPath, logFiles)));
config.getCompactionStrategy().captureMetrics(config, Option.of(df), partitionPath, logFiles),
df.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null))
);
});
return operations;
}

View File

@@ -22,14 +22,17 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
@@ -48,22 +51,39 @@ import java.util.stream.Collectors;
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR).
*/
public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
String basePath) {
return getRecordsUsingInputFormat(conf, inputPaths, basePath, new JobConf(conf), true);
}
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths,
String basePath,
JobConf jobConf,
boolean realtime) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
realtime, jobConf);
String basePath, JobConf jobConf, boolean realtime) {
Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, schema,
HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
}
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema,
String rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf);
Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema);
String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes);
setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns);
final List<Field> fields;
if (projectCols) {
fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name()))
.collect(Collectors.toList());
} else {
fields = schema.getFields();
}
final Schema projectedSchema = Schema.createRecord(fields.stream()
.map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
.collect(Collectors.toList()));
Schema schema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
return inputPaths.stream().map(path -> {
setInputPath(jobConf, path);
List<GenericRecord> records = new ArrayList<>();
@@ -71,17 +91,18 @@ public class HoodieMergeOnReadTestUtils {
List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Void key = (Void) recordReader.createKey();
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
assert schema.getFields().size() <= values.length;
schema.getFields().forEach(field -> {
newRecord.set(field, values[field.pos()]);
});
schema.getFields().stream()
.filter(f -> !projectCols || projectedColumns.contains(f.name()))
.map(f -> Pair.of(projectedSchema.getFields().stream()
.filter(p -> f.name().equals(p.name())).findFirst().get(), f))
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
records.add(newRecord.build());
}
}
@@ -95,29 +116,40 @@ public class HoodieMergeOnReadTestUtils {
}).orElse(new ArrayList<>());
}
private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,
String basePath) {
private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
final List<String> projectedColNames;
if (!projectCols) {
projectedColNames = fields.stream().map(Field::name).collect(Collectors.toList());
} else {
projectedColNames = projectedCols;
}
String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
String names = fields.stream()
.filter(f -> projectedColNames.contains(f.name()))
.map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream()
.filter(f -> projectedColNames.contains(f.name()))
.map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
String hiveColumnNames = fields.stream()
.filter(field -> !field.name().equalsIgnoreCase("datestr"))
.map(Schema.Field::name).collect(Collectors.joining(","));
hiveColumnNames = hiveColumnNames + ",datestr";
String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
hiveColumnTypes = hiveColumnTypes + ",string";
Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
String hiveColumnTypesWithDatestr = hiveColumnTypes + ",string";
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypesWithDatestr);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypesWithDatestr);
conf.set(IOConstants.COLUMNS, hiveColumnNames);
conf.set(IOConstants.COLUMNS_TYPES, hiveColumnTypesWithDatestr);
// Hoodie Input formats are also configurable
Configurable configurable = (Configurable)inputFormat;

View File

@@ -17,6 +17,7 @@
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender

View File

@@ -18,6 +18,7 @@
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
# A1 is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender