
[HUDI-2268] Add upgrade and downgrade to and from 0.9.0 (#3470)

- Added upgrade and downgrade steps to and from 0.9.0. The upgrade adds a few table properties; the downgrade recreates direct marker files from timeline-server-based markers, if any exist.
Y Ethan Guo
2021-08-14 17:20:23 -07:00
committed by GitHub
parent 18e6b79947
commit 23dca6c237
27 changed files with 851 additions and 326 deletions
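For orientation, the upgrade/downgrade flow changed here is driven through engine-specific entry points. A rough sketch of invoking it from Spark writer code (the surrounding setup and the exact run(...) signature are assumed from context, not shown verbatim in this diff):

    // Hypothetical driver: walk a table from its current version up to TWO.
    // metaClient, writeConfig and context are assumed to be initialized elsewhere.
    SparkUpgradeDowngrade upgradeDowngrade = new SparkUpgradeDowngrade(metaClient, writeConfig, context);
    upgradeDowngrade.run(metaClient, HoodieTableVersion.TWO, writeConfig, context, null);

Each single-step handler runs in order, and the table properties they return are written to hoodie.properties once at the end.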

View File

@@ -28,15 +28,17 @@ import org.apache.spark.launcher.SparkLauncher;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
 /**
- * CLI command to assist in upgrading/downgrading Hoodie dataset to a different version.
+ * CLI command to assist in upgrading/downgrading Hoodie table to a different version.
  */
+@Component
 public class UpgradeOrDowngradeCommand implements CommandMarker {
-  @CliCommand(value = "upgrade hoodie dataset ", help = "Upgrades hoodie dataset")
-  public String upgradeHoodieDataset(
-      @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion,
+  @CliCommand(value = "upgrade table", help = "Upgrades a table")
+  public String upgradeHoodieTable(
+      @CliOption(key = {"toVersion"}, help = "To version of Hoodie table to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
@@ -52,14 +54,14 @@ public class UpgradeOrDowngradeCommand implements CommandMarker {
     int exitCode = process.waitFor();
     HoodieCLI.refreshTableMetadata();
     if (exitCode != 0) {
-      return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion);
+      return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
     }
-    return String.format("Hoodie dataset upgraded/downgraded to ", toVersion);
+    return String.format("Hoodie table upgraded/downgraded to ", toVersion);
   }
-  @CliCommand(value = "downgrade hoodie dataset ", help = "Upgrades hoodie dataset")
-  public String downgradeHoodieDataset(
-      @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion,
+  @CliCommand(value = "downgrade table", help = "Downgrades a table")
+  public String downgradeHoodieTable(
+      @CliOption(key = {"toVersion"}, help = "To version of Hoodie table to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
@@ -74,8 +76,8 @@ public class UpgradeOrDowngradeCommand implements CommandMarker {
     int exitCode = process.waitFor();
     HoodieCLI.refreshTableMetadata();
     if (exitCode != 0) {
-      return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion);
+      return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
     }
-    return String.format("Hoodie dataset upgraded/downgraded to ", toVersion);
+    return String.format("Hoodie table upgraded/downgraded to ", toVersion);
   }
 }
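With the commands renamed, a hypothetical Hudi CLI session would look roughly like this (prompt format and flag values are illustrative):

    hudi:trips->upgrade table --toVersion 2 --sparkMaster local[2]
    hudi:trips->downgrade table --toVersion 1 --sparkMaster local[2]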

View File

@@ -166,10 +166,23 @@ public class DirectWriteMarkers extends WriteMarkers {
     return markerFiles;
   }
+  /**
+   * Creates a marker file based on the full marker name excluding the base path and instant.
+   *
+   * @param markerName the full marker name, e.g., "2021/08/13/file1.marker.CREATE"
+   * @return path of the marker file
+   */
+  public Option<Path> create(String markerName) {
+    return create(new Path(markerDirPath, markerName), true);
+  }
   @Override
   protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) {
+    return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists);
+  }
+  private Option<Path> create(Path markerPath, boolean checkIfExists) {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
     Path dirPath = markerPath.getParent();
     try {
       if (!fs.exists(dirPath)) {
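The new public create(String markerName) overload is what the downgrade path added later in this commit uses to recreate direct markers from names read out of timeline-server-based marker files. A minimal usage sketch, reusing the javadoc's example name:

    // table and commitInstantTime come from the surrounding downgrade context (assumed here).
    DirectWriteMarkers directWriteMarkers = new DirectWriteMarkers(table, commitInstantTime);
    directWriteMarkers.create("2021/08/13/file1.marker.CREATE");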

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,6 +34,8 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 /**
@@ -112,15 +115,27 @@ public abstract class AbstractUpgradeDowngrade {
     // Perform the actual upgrade/downgrade; this has to be idempotent, for now.
     LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
+    Map<ConfigProperty, String> tableProps = new HashMap<>();
     if (fromVersion.versionCode() < toVersion.versionCode()) {
       // upgrade
-      upgrade(fromVersion, toVersion, instantTime);
+      while (fromVersion.versionCode() < toVersion.versionCode()) {
+        HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
+        tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
+        fromVersion = nextVersion;
+      }
     } else {
       // downgrade
-      downgrade(fromVersion, toVersion, instantTime);
+      while (fromVersion.versionCode() > toVersion.versionCode()) {
+        HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
+        tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
+        fromVersion = prevVersion;
+      }
     }
     // Write out the current version in hoodie.properties.updated file
+    for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
+      metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
+    }
     metaClient.getTableConfig().setTableVersion(toVersion);
     createUpdatedFile(metaClient.getTableConfig().getProps());
@@ -143,7 +158,7 @@ public abstract class AbstractUpgradeDowngrade {
     }
   }
-  protected abstract void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
-  protected abstract void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
+  protected abstract Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
+  protected abstract Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
 }
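Since the loop now moves one table version at a time, a multi-version jump composes the single-step handlers. For example, upgrading from ZERO to TWO effectively expands to (an illustrative unrolling of the loop above):

    // tableProps accumulates properties across steps and is persisted once at the end.
    tableProps.putAll(upgrade(HoodieTableVersion.ZERO, HoodieTableVersion.ONE, instantTime));
    tableProps.putAll(upgrade(HoodieTableVersion.ONE, HoodieTableVersion.TWO, instantTime));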

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseOneToTwoUpgradeHandler implements UpgradeHandler {
@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
tablePropsToAdd.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP, getPartitionColumns(config));
tablePropsToAdd.put(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD.key()));
tablePropsToAdd.put(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP, config.getString(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP));
return tablePropsToAdd;
}
abstract String getPartitionColumns(HoodieWriteConfig config);
}
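For a hypothetical table with record key uuid, partition field partitionpath, and Parquet base files, the map returned above would contain entries along these lines (values illustrative; the keys shown are the hoodie.properties names behind these ConfigProperty constants, as an assumption):

    // hoodie.table.partition.fields -> "partitionpath"
    // hoodie.table.recordkey.fields -> "uuid"
    // hoodie.table.base.file.format -> "PARQUET"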

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
// fetch pending commit info
HoodieTable table = getTable(config, context);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
for (HoodieInstant inflightInstant : commits) {
// delete existing markers
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, inflightInstant.getTimestamp());
writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
}
return Collections.EMPTY_MAP;
}
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
}

View File

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import com.esotericsoftware.minlog.Log;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
HoodieTable table = getTable(config, context);
HoodieTableMetaClient metaClient = table.getMetaClient();
// re-create marker files if any partial timeline server based markers are found
HoodieTimeline inflightTimeline = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
for (HoodieInstant inflightInstant : commits) {
// Converts the markers in new format to old format of direct markers
try {
convertToDirectMarkers(
inflightInstant.getTimestamp(), table, context, config.getMarkersDeleteParallelism());
} catch (IOException e) {
throw new HoodieException("Converting marker files to DIRECT style failed during downgrade", e);
}
}
return Collections.EMPTY_MAP;
}
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
/**
* Converts the markers in the new format (timeline-server-based) to the old format of direct markers,
* i.e., one marker file per data file, without MARKERS.type file.
* This needs to be idempotent.
* 1. read all markers from timeline server based marker files
* 2. create direct style markers
* 3. delete marker type file
* 4. delete timeline server based marker files
*
* @param commitInstantTime instant of interest for marker conversion.
* @param table instance of {@link HoodieTable} to use
* @param context instance of {@link HoodieEngineContext} to use
* @param parallelism parallelism to use
*/
private void convertToDirectMarkers(final String commitInstantTime,
HoodieTable table,
HoodieEngineContext context,
int parallelism) throws IOException {
String markerDir = table.getMetaClient().getMarkerFolderPath(commitInstantTime);
FileSystem fileSystem = FSUtils.getFs(markerDir, context.getHadoopConf().newCopy());
Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir);
if (markerTypeOption.isPresent()) {
switch (markerTypeOption.get()) {
case TIMELINE_SERVER_BASED:
// Reads all markers written by the timeline server
Map<String, Set<String>> markersMap =
MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerDir, fileSystem, context, parallelism);
DirectWriteMarkers directWriteMarkers = new DirectWriteMarkers(table, commitInstantTime);
// Recreates the markers in the direct format
markersMap.values().stream().flatMap(Collection::stream)
.forEach(directWriteMarkers::create);
// Deletes marker type file
MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
// Deletes timeline server based markers
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
break;
default:
throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
+ "\" is not supported for rollback.");
}
} else {
// In case of partial failures during downgrade, there is a chance that marker type file was deleted,
// but timeline server based marker files are left. So deletes them if any
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
}
}
private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
// Deletes timeline based marker files if any.
Path dirPath = new Path(markerDir);
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
Predicate<FileStatus> prefixFilter = fileStatus ->
fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
.filter(prefixFilter)
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
markerDirSubPaths.forEach(fileToDelete -> {
try {
fileSystem.delete(new Path(fileToDelete), false);
} catch (IOException e) {
Log.warn("Deleting Timeline based marker files failed ", e);
}
});
}
}
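To make the four idempotent steps concrete, a marker directory for a pending instant might change roughly as follows (paths and names illustrative; MARKERS0 stands for a timeline-server-based marker batch file):

    before: .hoodie/.temp/20210813090000/MARKERS.type
            .hoodie/.temp/20210813090000/MARKERS0
    after:  .hoodie/.temp/20210813090000/2021/08/13/file1.marker.CREATE
            .hoodie/.temp/20210813090000/2021/08/13/file2.marker.MERGE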

View File

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
// fetch pending commit info
//HoodieSparkTable table = HoodieSparkTable.create(config, context);
HoodieTable table = getTable(config, context);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (commits.size() > 0 && instantTime != null) {
// ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch
commits.remove(instantTime);
}
for (String commit : commits) {
// for every pending commit, delete old markers and re-create markers in new format
recreateMarkers(commit, table, context, config.getMarkersDeleteParallelism());
}
return Collections.EMPTY_MAP;
}
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
/**
* Recreate markers in new format.
* Step1: Delete existing markers
* Step2: Collect all rollback file info.
* Step3: recreate markers for all interested files.
*
* @param commitInstantTime instant of interest for which markers need to be recreated.
* @param table instance of {@link HoodieTable} to use
* @param context instance of {@link HoodieEngineContext} to use
* @throws HoodieRollbackException on any exception during upgrade.
*/
protected void recreateMarkers(final String commitInstantTime,
HoodieTable table,
HoodieEngineContext context,
int parallelism) throws HoodieRollbackException {
try {
// fetch hoodie instant
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
// delete existing markers
WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, commitInstantTime);
writeMarkers.quietDeleteMarkerDir(context, parallelism);
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}
List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
context, commitInstantOpt, rollbackRequests);
// recreate markers adhering to marker based rollback
for (HoodieRollbackStat rollbackStat : rollbackStats) {
for (String path : rollbackStat.getSuccessDeleteFiles()) {
String dataFileName = path.substring(path.lastIndexOf("/") + 1);
// not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files.
writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE);
}
for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) {
writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND);
}
}
}
} catch (Exception e) {
throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e);
}
}
abstract List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
List<ListingBasedRollbackRequest> rollbackRequests);
/**
* Curates file name for marker from existing log file path.
* log file format : partitionpath/.fileid_baseInstant.log.writetoken
* marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND
*
* @param logFilePath log file path for which marker file name needs to be generated.
* @return the marker file name thus curated.
*/
private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
String fileId = FSUtils.getFileIdFromLogPath(logPath);
String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension());
}
}
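A worked example of the name curation above, using hypothetical names that follow the documented formats:

    log file    : 2021/08/13/.f1-uuid_20210813090000.log.1-0-1
    marker name : 2021/08/13/f1-uuid_1-0-1_20210813090000.parquet.marker.APPEND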

View File

@@ -18,9 +18,12 @@
 package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
+import java.util.Map;
 /**
  * Interface to assist in downgrading Hoodie table.
  */
@@ -32,6 +35,7 @@ public interface DowngradeHandler {
    * @param config instance of {@link HoodieWriteConfig} to be used.
    * @param context instance of {@link HoodieEngineContext} to be used.
    * @param instantTime current instant time that should not be touched.
+   * @return Map of config properties and their values to be added to the table properties.
    */
-  void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
+  Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
 }

View File

@@ -18,9 +18,12 @@
 package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
+import java.util.Map;
 /**
  * Interface to assist in upgrading Hoodie table.
  */
@@ -32,6 +35,7 @@ public interface UpgradeHandler {
    * @param config instance of {@link HoodieWriteConfig} to be used.
    * @param context instance of {@link HoodieEngineContext} to be used.
    * @param instantTime current instant time that should not be touched.
+   * @return Map of config properties and their values to be added to the table properties.
    */
-  void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
+  Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
 }

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -25,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import java.io.IOException;
+import java.util.Map;
 public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade {
   public FlinkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
@@ -42,18 +44,22 @@ public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade {
   }
   @Override
-  protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
     if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
-      new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
+      return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
+    } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
+      return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
     } else {
       throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
     }
   }
   @Override
-  protected void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
     if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
-      new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
+      return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
+    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
+      return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
     } else {
       throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
     }

View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
@Override
String getPartitionColumns(HoodieWriteConfig config) {
return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD.key());
}
}

View File

@@ -20,31 +20,17 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
-import java.util.List;
-import java.util.stream.Collectors;
 /**
  * Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
  */
-public class OneToZeroDowngradeHandler implements DowngradeHandler {
+public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
   @Override
-  public void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    // fetch pending commit info
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-    List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
-    for (HoodieInstant commitInstant : commits) {
-      // delete existing markers
-      WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, commitInstant.getTimestamp());
-      writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    }
+  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }
 }

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
@Override
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
}

View File

@@ -21,120 +21,31 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
-import org.apache.hudi.table.action.rollback.RollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import java.util.List;
-import java.util.stream.Collectors;
 /**
  * Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
  */
-public class ZeroToOneUpgradeHandler implements UpgradeHandler {
+public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
   @Override
-  public void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    // fetch pending commit info
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-    List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-    if (commits.size() > 0 && instantTime != null) {
-      // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch
-      commits.remove(instantTime);
-    }
-    for (String commit : commits) {
-      // for every pending commit, delete old markers and re-create markers in new format
-      recreateMarkers(commit, table, context, config.getMarkersType(), config.getMarkersDeleteParallelism());
-    }
+  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }
-  /**
-   * Recreate markers in new format.
-   * Step1: Delete existing markers
-   * Step2: Collect all rollback file info.
-   * Step3: recreate markers for all interested files.
-   *
-   * @param commitInstantTime instant of interest for which markers need to be recreated.
-   * @param table instance of {@link HoodieFlinkTable} to use
-   * @param context instance of {@link HoodieEngineContext} to use
-   * @param markerType marker type to use
-   * @throws HoodieRollbackException on any exception during upgrade.
-   */
-  private static void recreateMarkers(final String commitInstantTime,
-                                      HoodieFlinkTable table,
-                                      HoodieEngineContext context,
-                                      MarkerType markerType,
-                                      int parallelism) throws HoodieRollbackException {
-    try {
-      // fetch hoodie instant
-      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
-          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
-          .findFirst());
-      if (commitInstantOpt.isPresent()) {
-        // delete existing markers
-        WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstantTime);
-        writeMarkers.quietDeleteMarkerDir(context, parallelism);
-        // generate rollback stats
-        List<ListingBasedRollbackRequest> rollbackRequests;
-        if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
-        } else {
-          rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
-        }
-        List<HoodieRollbackStat> rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig())
-            .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
-        // recreate markers adhering to marker based rollback
-        for (HoodieRollbackStat rollbackStat : rollbackStats) {
-          for (String path : rollbackStat.getSuccessDeleteFiles()) {
-            String dataFileName = path.substring(path.lastIndexOf("/") + 1);
-            // not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files.
-            writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE);
-          }
-          for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) {
-            writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND);
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e);
-    }
-  }
-  /**
-   * Curates file name for marker from existing log file path.
-   * log file format : partitionpath/.fileid_baseInstant.log.writetoken
-   * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND
-   *
-   * @param logFilePath log file path for which marker file name needs to be generated.
-   * @return the marker file name thus curated.
-   */
-  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
-    Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
-    String fileId = FSUtils.getFileIdFromLogPath(logPath);
-    String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
-    String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
-    return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension());
-  }
+  @Override
+  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
+                                                     List<ListingBasedRollbackRequest> rollbackRequests) {
+    return new ListingBasedRollbackHelper(metaClient, config)
+        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
+  }
 }

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.config.HoodieWriteConfig;
/**
* Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
*/
public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
@Override
String getPartitionColumns(HoodieWriteConfig config) {
return HoodieSparkUtils.getPartitionColumns(config.getProps());
}
}

View File

@@ -19,31 +19,17 @@
 package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
-import java.util.List;
-import java.util.stream.Collectors;
 /**
  * Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
  */
-public class OneToZeroDowngradeHandler implements DowngradeHandler {
+public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
   @Override
-  public void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
-    // fetch pending commit info
-    HoodieSparkTable table = HoodieSparkTable.create(config, context);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-    List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
-    for (HoodieInstant commitInstant : commits) {
-      // delete existing markers
-      WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, commitInstant.getTimestamp());
-      writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    }
+  HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+    return HoodieSparkTable.create(config, context);
   }
 }

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -25,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import java.io.IOException;
+import java.util.Map;
 public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade {
@@ -43,22 +45,25 @@ public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade {
     } catch (IOException e) {
       throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
     }
   }
   @Override
-  protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
     if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
-      new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
+      return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
+    } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
+      return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
     } else {
       throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
     }
   }
   @Override
-  protected void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
     if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
-      new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
+      return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
+    } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
+      return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
     } else {
       throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
     }

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
/**
* Downgrade handle to assist in downgrading hoodie table from version 2 to 1.
*/
public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
@Override
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
return HoodieSparkTable.create(config, context);
}
}

View File

@@ -20,120 +20,31 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* Upgrade handle to assist in upgrading hoodie table from version 0 to 1. * Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
*/ */
public class ZeroToOneUpgradeHandler implements UpgradeHandler { public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
@Override @Override
public void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
// fetch pending commit info return HoodieSparkTable.create(config, context);
HoodieSparkTable table = HoodieSparkTable.create(config, context);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (commits.size() > 0 && instantTime != null) {
// ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch
commits.remove(instantTime);
}
for (String commit : commits) {
// for every pending commit, delete old markers and re-create markers in new format
recreateMarkers(commit, table, context, config.getMarkersType(), config.getMarkersDeleteParallelism());
}
} }
/** @Override
* Recreate markers in new format. List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
* Step1: Delete existing markers List<ListingBasedRollbackRequest> rollbackRequests) {
* Step2: Collect all rollback file info. return new ListingBasedRollbackHelper(metaClient, config)
* Step3: recreate markers for all interested files. .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
*
* @param commitInstantTime instant of interest for which markers need to be recreated.
* @param table instance of {@link HoodieSparkTable} to use
* @param context instance of {@link HoodieEngineContext} to use
* @param markerType marker type to use
* @throws HoodieRollbackException on any exception during upgrade.
*/
private static void recreateMarkers(final String commitInstantTime,
HoodieSparkTable table,
HoodieEngineContext context,
MarkerType markerType,
int parallelism) throws HoodieRollbackException {
try {
// fetch hoodie instant
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
// delete existing markers
WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstantTime);
writeMarkers.quietDeleteMarkerDir(context, parallelism);
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}
List<HoodieRollbackStat> rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig())
.collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
// recreate markers adhering to marker based rollback
for (HoodieRollbackStat rollbackStat : rollbackStats) {
for (String path : rollbackStat.getSuccessDeleteFiles()) {
String dataFileName = path.substring(path.lastIndexOf("/") + 1);
// not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files.
writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE);
}
for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) {
writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND);
}
}
}
} catch (Exception e) {
throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e);
}
}
/**
* Curates file name for marker from existing log file path.
* log file format : partitionpath/.fileid_baseInstant.log.writetoken
* marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND
*
* @param logFilePath log file path for which marker file name needs to be generated.
* @return the marker file name thus curated.
*/
private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
String fileId = FSUtils.getFileIdFromLogPath(logPath);
String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension());
  }
}
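For readers tracing the rename logic above: a minimal, self-contained sketch of the log-file-to-marker-name translation, using plain string handling instead of the FSUtils helpers. The file-name pattern below is a simplification of the javadoc's description and the class/method names are hypothetical:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MarkerNameSketch {
  // Simplified log file name pattern: .<fileId>_<baseInstant>.log.<version>_<writeToken>
  private static final Pattern LOG_FILE = Pattern.compile("^\\.(.+)_(\\d+)\\.log\\.\\d+_(.+)$");

  // Translate a log file name into the base-file-style name used for an APPEND marker:
  // <fileId>_<writeToken>_<baseInstant><baseFileExtension>
  static String markerDataFileName(String logFileName, String baseFileExtension) {
    Matcher m = LOG_FILE.matcher(logFileName);
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected log file name: " + logFileName);
    }
    String fileId = m.group(1);
    String baseInstant = m.group(2);
    String writeToken = m.group(3);
    return fileId + "_" + writeToken + "_" + baseInstant + baseFileExtension;
  }

  public static void main(String[] args) {
    System.out.println(markerDataFileName(".abc123_001.log.1_0-1-2", ".parquet"));
    // prints: abc123_0-1-2_001.parquet
  }
}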

View File

@@ -18,11 +18,17 @@
 package org.apache.hudi
+import java.util.Properties
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
@@ -221,11 +227,41 @@ object HoodieSparkUtils extends SparkAdapterSupport {
         val leftExp = toAttribute(attribute, tableSchema)
         val rightExp = Literal.create(s"%$value%")
         sparkAdapter.createLike(leftExp, rightExp)
-      case _=> null
+      case _ => null
     }
   )
 }
+
+/**
+ * @param properties config properties
+ * @return partition columns
+ */
+def getPartitionColumns(properties: Properties): String = {
+  val props = new TypedProperties(properties)
+  val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+  getPartitionColumns(keyGenerator, props)
+}
+
+/**
+ * @param keyGen key generator
+ * @param typedProperties config properties
+ * @return partition columns
+ */
+def getPartitionColumns(keyGen: KeyGenerator, typedProperties: TypedProperties): String = {
+  keyGen match {
+    // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path field format
+    // is "field_name: field_type"; we extract the field_name from the partition path field.
+    case c: BaseKeyGenerator
+      if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
+      c.getPartitionPathFields.asScala.map(pathField =>
+        pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+          .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
+        .mkString(",")
+    case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
+    case _ => typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD.key())
+  }
+}
 private def toAttribute(columnName: String, tableSchema: StructType): AttributeReference = {
   val field = tableSchema.find(p => p.name == columnName)
   assert(field.isDefined, s"Cannot find column: $columnName, Table Columns are: " +
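As a rough illustration of the key-generator handling added above (a sketch under the assumption that each configured partition path field may carry a ":type" suffix; this is not a Hudi API, and the names are made up):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionColumnsSketch {
  // Keep only the field name from entries like "field_name:field_type",
  // mirroring how the custom key generators are unwrapped above.
  static String partitionColumns(List<String> partitionPathFields) {
    return partitionPathFields.stream()
        .map(f -> f.split(":")[0])   // drop the ":field_type" part if present
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    System.out.println(partitionColumns(Arrays.asList("region:simple", "ts:timestamp")));
    // prints: region,ts
  }
}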

View File

@@ -18,10 +18,6 @@
 package org.apache.hudi.table.upgrade;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -31,19 +27,31 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -52,20 +60,26 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_TYPE_PROP;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -76,11 +90,29 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestUpgradeDowngrade extends HoodieClientTestBase {
   private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}";
+  private static final String TEST_NAME_WITH_DOWNGRADE_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1} and "
+      + "From version = {2}";
   public static Stream<Arguments> configParams() {
     Object[][] data = new Object[][] {
-        {true, HoodieTableType.COPY_ON_WRITE}, {false, HoodieTableType.COPY_ON_WRITE},
-        {true, HoodieTableType.MERGE_ON_READ}, {false, HoodieTableType.MERGE_ON_READ}
+        {true, HoodieTableType.COPY_ON_WRITE},
+        {false, HoodieTableType.COPY_ON_WRITE},
+        {true, HoodieTableType.MERGE_ON_READ},
+        {false, HoodieTableType.MERGE_ON_READ}
     };
     return Stream.of(data).map(Arguments::of);
   }
+  public static Stream<Arguments> downGradeConfigParams() {
+    Object[][] data = new Object[][] {
+        {true, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.TWO},
+        {false, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.TWO},
+        {true, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.TWO},
+        {false, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.TWO},
+        {true, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.ONE},
+        {false, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.ONE},
+        {true, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.ONE},
+        {false, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.ONE}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
@@ -88,9 +120,9 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts();
-    initDFS();
+    initPath();
     initTestDataGenerator();
-    initDFSMetaClient();
+    initMetaClient();
   }
   @AfterEach
@@ -100,21 +132,21 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
   @Test
   public void testLeftOverUpdatedPropFileCleanup() throws IOException {
-    testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
+    testUpgradeZeroToOneInternal(true, true, HoodieTableType.MERGE_ON_READ);
   }
   @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
   @MethodSource("configParams")
-  public void testUpgrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
-    testUpgradeInternal(false, deletePartialMarkerFiles, tableType);
+  public void testUpgradeZeroToOne(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
+    testUpgradeZeroToOneInternal(false, deletePartialMarkerFiles, tableType);
   }
-  public void testUpgradeInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
+  public void testUpgradeZeroToOneInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
     // init config, table and client.
     Map<String, String> params = new HashMap<>();
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name());
-      metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, HoodieTableType.MERGE_ON_READ);
+      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     }
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
@@ -151,30 +183,116 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
     // verify hoodie.table.version got upgraded
+    metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
     assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ONE.versionCode());
     assertTableVersionFromPropertyFile(HoodieTableVersion.ONE);
     // trigger 3rd commit with marker based rollback enabled.
+    /* HUDI-2310
     List<HoodieRecord> thirdBatch = triggerCommit("003", tableType, true);
     // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback.
     assertRows(inputRecords.getKey(), thirdBatch);
     if (induceResiduesFromPrevUpgrade) {
       assertFalse(dfs.exists(new Path(metaClient.getMetaPath(), SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE)));
-    }
+    }*/
   }
-  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
-  @MethodSource("configParams")
-  public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testUpgradeOneToTwo(HoodieTableType tableType) throws IOException {
     // init config, table and client.
     Map<String, String> params = new HashMap<>();
+    addNewTableParamsToProps(params);
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name());
       metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     }
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // Write inserts
+    doInsert(client);
+    // downgrade table props
+    downgradeTableConfigsFromTwoToOne(cfg);
+    // perform upgrade
+    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.TWO, cfg, context, null);
+    // verify hoodie.table.version got upgraded
+    metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
+    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.TWO.versionCode());
+    assertTableVersionFromPropertyFile(HoodieTableVersion.TWO);
+    // verify table props
+    assertTableProps(cfg);
+  }
+  private void addNewTableParamsToProps(Map<String, String> params) {
+    params.put(KeyGeneratorOptions.RECORDKEY_FIELD.key(), "uuid");
+    params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD.key(), "partition_path");
+    params.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP.key(), metaClient.getTableConfig().getTableName());
+    params.put(HOODIE_BASE_FILE_FORMAT_PROP.key(), HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().name());
+  }
+  private void doInsert(SparkRDDWriteClient client) {
+    // Write 1 (only inserts)
+    String commit1 = "000";
+    client.startCommitWithTime(commit1);
+    List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.insert(writeRecords, commit1).collect();
+  }
+  private void downgradeTableConfigsFromTwoToOne(HoodieWriteConfig cfg) throws IOException {
+    Properties properties = new Properties(cfg.getProps());
+    properties.remove(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key());
+    properties.remove(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key());
+    properties.remove(HoodieTableConfig.HOODIE_TABLE_NAME_PROP.key());
+    properties.remove(HOODIE_BASE_FILE_FORMAT_PROP.key());
+    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP.key(), "1");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties);
+    // set hoodie.table.version to 1 in hoodie.properties file
+    metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
+  }
+  private void assertTableProps(HoodieWriteConfig cfg) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    Properties originalProps = cfg.getProps();
+    assertEquals(tableConfig.getPartitionFieldProp(), originalProps.getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD.key()));
+    assertEquals(tableConfig.getRecordKeyFieldProp(), originalProps.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD.key()));
+    assertEquals(tableConfig.getTableName(), cfg.getTableName());
+    assertEquals(tableConfig.getBaseFileFormat().name(), originalProps.getProperty(HOODIE_BASE_FILE_FORMAT_PROP.key()));
+  }
+  @ParameterizedTest(name = TEST_NAME_WITH_DOWNGRADE_PARAMS)
+  @MethodSource("downGradeConfigParams")
+  public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType, HoodieTableVersion fromVersion) throws IOException {
+    MarkerType markerType = fromVersion == HoodieTableVersion.TWO ? MarkerType.TIMELINE_SERVER_BASED : MarkerType.DIRECT;
+    // init config, table and client.
+    Map<String, String> params = new HashMap<>();
+    if (fromVersion == HoodieTableVersion.TWO) {
+      addNewTableParamsToProps(params);
+    }
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name());
+      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    }
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(true)
+        .withMarkersType(markerType.name()).withProps(params).build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    if (fromVersion == HoodieTableVersion.TWO) {
+      // set table configs
+      HoodieTableConfig tableConfig = metaClient.getTableConfig();
+      tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_NAME_PROP, cfg.getTableName());
+      tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP, cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD.key()));
+      tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, cfg.getString(KeyGeneratorOptions.RECORDKEY_FIELD.key()));
+      tableConfig.setValue(HOODIE_BASE_FILE_FORMAT_PROP, cfg.getString(HOODIE_BASE_FILE_FORMAT_PROP));
+    }
     // prepare data. Make 2 commits, in which 2nd is not committed.
     List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
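The downgradeTableConfigsFromTwoToOne helper above ends up rewriting hoodie.properties with an older version code. A minimal sketch of that store/load round trip using only java.util.Properties (the file name and comment string here are illustrative):

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;

public class PropsRoundTripSketch {
  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.setProperty("hoodie.table.version", "1"); // pretend we downgraded from 2
    try (FileOutputStream os = new FileOutputStream("hoodie.properties")) {
      props.store(os, "downgraded for test"); // same store(OutputStream, comments) call the test uses
    }
    Properties reloaded = new Properties();
    try (FileInputStream is = new FileInputStream("hoodie.properties")) {
      reloaded.load(is);
    }
    System.out.println(reloaded.getProperty("hoodie.table.version")); // prints: 1
  }
}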
@@ -185,7 +303,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get();
     // delete one of the marker files in 2nd commit if need be.
-    WriteMarkers writeMarkers = WriteMarkersFactory.get(getConfig().getMarkersType(), table, commitInstant.getTimestamp());
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstant.getTimestamp());
     List<String> markerPaths = new ArrayList<>(writeMarkers.allMarkerFilePaths());
     if (deletePartialMarkerFiles) {
       String toDeleteMarkerFile = markerPaths.get(0);
@@ -193,30 +311,55 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
       markerPaths.remove(toDeleteMarkerFile);
     }
-    // set hoodie.table.version to 1 in hoodie.properties file
-    prepForDowngrade();
+    // set hoodie.table.version to fromVersion in hoodie.properties file
+    HoodieTableVersion toVersion = HoodieTableVersion.ZERO;
+    if (fromVersion == HoodieTableVersion.TWO) {
+      prepForDowngradeFromTwoToOne();
+      toVersion = HoodieTableVersion.ONE;
+    } else {
+      prepForDowngradeFromOneToZero();
+    }
     // downgrade should be performed. all marker files should be deleted
-    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ZERO, cfg, context, null);
+    new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, toVersion, cfg, context, null);
     // assert marker files
-    assertMarkerFilesForDowngrade(table, commitInstant);
+    assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE);
     // verify hoodie.table.version got downgraded
-    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
-    assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO);
+    metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
+    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), toVersion.versionCode());
+    assertTableVersionFromPropertyFile(toVersion);
     // trigger 3rd commit with marker based rollback disabled.
+    /* HUDI-2310
     List<HoodieRecord> thirdBatch = triggerCommit("003", tableType, false);
     // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback.
     assertRows(inputRecords.getKey(), thirdBatch);
+    */
   }
-  private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant) throws IOException {
+  private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant, boolean assertExists) throws IOException {
     // Verify recreated marker files are as expected
     WriteMarkers writeMarkers = WriteMarkersFactory.get(getConfig().getMarkersType(), table, commitInstant.getTimestamp());
-    assertFalse(writeMarkers.doesMarkerDirExist());
+    if (assertExists) {
+      assertTrue(writeMarkers.doesMarkerDirExist());
+      assertEquals(0, getTimelineServerBasedMarkerFileCount(table.getMetaClient().getMarkerFolderPath(commitInstant.getTimestamp()),
+          table.getMetaClient().getFs()));
+    } else {
+      assertFalse(writeMarkers.doesMarkerDirExist());
+    }
+  }
+  private long getTimelineServerBasedMarkerFileCount(String markerDir, FileSystem fileSystem) throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(new Path(markerDir));
+    Predicate<String> prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX);
+    return Arrays.stream(fileStatuses)
+        .map(fileStatus -> fileStatus.getPath().toString())
+        .filter(prefixFilter)
+        .collect(Collectors.toList()).stream().count();
   }
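To make the branching in testDowngrade easier to follow, here is a stripped-down sketch of the one-step-at-a-time version dispatch it exercises (hypothetical enum and class names, not the Hudi types):

public class DowngradeDispatchSketch {
  enum Version { ZERO, ONE, TWO }

  // Downgrades move one step at a time: TWO -> ONE re-creates timeline-server
  // based markers as direct markers; ONE -> ZERO leaves marker directories deleted.
  static Version oneStepDown(Version from) {
    switch (from) {
      case TWO: return Version.ONE;
      case ONE: return Version.ZERO;
      default: throw new IllegalStateException("Already at the lowest version: " + from);
    }
  }

  public static void main(String[] args) {
    System.out.println(oneStepDown(Version.TWO)); // prints: ONE
  }
}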
private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List<FileSlice> firstPartitionCommit2FileSlices, private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List<FileSlice> firstPartitionCommit2FileSlices,
@@ -349,7 +492,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     //just generate two partitions
     dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     //1. prepare data
-    HoodieTestDataGenerator.writePartitionMetadata(dfs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, dfsBasePath);
+    HoodieTestDataGenerator.writePartitionMetadata(metaClient.getFs(), new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
     /**
      * Write 1 (only inserts)
      */
@@ -396,7 +539,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     return Pair.of(records, records2);
   }
-  private void prepForDowngrade() throws IOException {
+  private void prepForDowngradeFromOneToZero() throws IOException {
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
     Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) {
@@ -404,13 +547,21 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     }
   }
+  private void prepForDowngradeFromTwoToOne() throws IOException {
+    metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO);
+    Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+    try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) {
+      metaClient.getTableConfig().getProps().store(os, "");
+    }
+  }
   private void createResidualFile() throws IOException {
     Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
     Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
     // Step1: Copy hoodie.properties to hoodie.properties.orig
     FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile,
-        false, metaClient.getHadoopConf());
+        false, hadoopConf);
   }
   private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException {
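The downGradeConfigParams matrix above is the standard JUnit 5 Arguments pattern: each row becomes one invocation of the parameterized test. A generic, self-contained sketch (all names here are illustrative):

import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertTrue;

class MatrixSketchTest {
  static Stream<Arguments> params() {
    return Stream.of(
        Arguments.of(true, "COPY_ON_WRITE", 2),
        Arguments.of(false, "MERGE_ON_READ", 1));
  }

  @ParameterizedTest(name = "[{index}] delete={0}, type={1}, fromVersion={2}")
  @MethodSource("params")
  void runsOncePerRow(boolean deletePartialMarkers, String tableType, int fromVersion) {
    // a real test would exercise the upgrade/downgrade path here
    assertTrue(fromVersion > 0);
  }
}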

View File

@@ -30,7 +30,9 @@ public enum HoodieTableVersion {
   // < 0.6.0 versions
   ZERO(0),
   // 0.6.0 onwards
-  ONE(1);
+  ONE(1),
+  // 0.9.0 onwards
+  TWO(2);
   private final int versionCode;
@@ -43,10 +45,10 @@ public enum HoodieTableVersion {
   }
   public static HoodieTableVersion current() {
-    return ONE;
+    return TWO;
   }
-  static HoodieTableVersion versionFromCode(int versionCode) {
+  public static HoodieTableVersion versionFromCode(int versionCode) {
     return Arrays.stream(HoodieTableVersion.values())
         .filter(v -> v.versionCode == versionCode).findAny()
         .orElseThrow(() -> new HoodieException("Unknown versionCode:" + versionCode));
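The lookup style used by versionFromCode lifts cleanly into any enum; a self-contained sketch (hypothetical enum, with IllegalArgumentException standing in for HoodieException):

import java.util.Arrays;

enum TableVersionSketch {
  ZERO(0), ONE(1), TWO(2);

  private final int code;

  TableVersionSketch(int code) {
    this.code = code;
  }

  // Scan the enum values for a matching code, failing loudly on unknown input.
  static TableVersionSketch fromCode(int code) {
    return Arrays.stream(values())
        .filter(v -> v.code == code)
        .findAny()
        .orElseThrow(() -> new IllegalArgumentException("Unknown versionCode:" + code));
  }
}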

View File

@@ -179,12 +179,13 @@ public class MarkerUtils {
     try {
       if (fileSystem.exists(dirPath)) {
         FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
-        Predicate<String> prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX);
-        Predicate<String> markerTypeFilter =
-            pathStr -> !stripMarkerFolderPrefix(pathStr, markerDir).equals(MARKER_TYPE_FILENAME);
+        Predicate<FileStatus> prefixFilter = fileStatus ->
+            fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
+        Predicate<FileStatus> markerTypeFilter = fileStatus ->
+            !fileStatus.getPath().getName().equals(MARKER_TYPE_FILENAME);
         List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-            .map(fileStatus -> fileStatus.getPath().toString())
             .filter(prefixFilter.and(markerTypeFilter))
+            .map(fileStatus -> fileStatus.getPath().toString())
             .collect(Collectors.toList());
         if (markerDirSubPaths.size() > 0) {
@@ -216,4 +217,4 @@ public class MarkerUtils {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
   }
 }
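The predicate change above narrows matching from the whole path string to just the file name, then maps to strings only after filtering. A plain-string sketch of the same filter composition (the MARKERS/MARKERS.type sample names are assumptions for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class MarkerFilterSketch {
  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        "/base/.temp/001/MARKERS0",
        "/base/.temp/001/MARKERS.type",
        "/base/.temp/001/other");
    // Match on the file *name*, not anywhere in the full path.
    Predicate<String> prefixFilter = p -> fileName(p).startsWith("MARKERS");
    Predicate<String> markerTypeFilter = p -> !fileName(p).equals("MARKERS.type");
    List<String> markerFiles = paths.stream()
        .filter(prefixFilter.and(markerTypeFilter))
        .collect(Collectors.toList());
    System.out.println(markerFiles); // prints: [/base/.temp/001/MARKERS0]
  }

  private static String fileName(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }
}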

View File

@@ -18,6 +18,9 @@
 package org.apache.hudi
+import java.util
+import java.util.Properties
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
@@ -29,9 +32,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -51,8 +53,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.{SPARK_VERSION, SparkContext}
-import java.util
-import java.util.Properties
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
@@ -118,7 +118,7 @@ object HoodieSparkSqlWriter {
     } else {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
-      val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
+      val partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
       // Create the table if not present
       if (!tableExists) {
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)

View File

@@ -17,13 +17,13 @@
 package org.apache.hudi
-import java.util.Properties
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig.{METADATA_ENABLE_PROP, METADATA_VALIDATE_PROP}
 import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
+import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}
@@ -96,23 +96,7 @@ object HoodieWriterUtils {
   val props = new TypedProperties()
   props.putAll(parameters.asJava)
   val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-  getPartitionColumns(keyGen)
+  HoodieSparkUtils.getPartitionColumns(keyGen, props)
 }
-
-def getPartitionColumns(keyGen: KeyGenerator): String = {
-  keyGen match {
-    // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
-    // is: "field_name: field_type", we extract the field_name from the partition path field.
-    case c: BaseKeyGenerator
-      if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
-      c.getPartitionPathFields.asScala.map(pathField =>
-        pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
-          .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
-        .mkString(",")
-    case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
-    case _=> null
-  }
-}
 def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = {

View File

@@ -273,7 +273,6 @@ public class MarkerDirState implements Serializable {
   private void syncMarkersFromFileSystem() {
     Map<String, Set<String>> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
         markerDirPath, fileSystem, hoodieEngineContext, parallelism);
-
     for (String markersFilePathStr : fileMarkersSetMap.keySet()) {
       Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr);
       if (!fileMarkers.isEmpty()) {

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -50,7 +49,6 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
@@ -62,6 +60,7 @@ import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaSet;
@@ -249,7 +248,7 @@ public class DeltaSync implements Serializable {
       }
     } else {
       this.commitTimelineOpt = Option.empty();
-      String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
+      String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)
@@ -353,7 +352,7 @@ public class DeltaSync implements Serializable {
         }
       }
     } else {
-      String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
+      String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)