Timeline Service with Incremental View Syncing support

This commit is contained in:
Balaji Varadarajan
2019-02-12 21:29:14 -08:00
committed by vinoth chandar
parent 446f99aa0f
commit 64fec64097
117 changed files with 8943 additions and 873 deletions

View File

@@ -0,0 +1,100 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.uber.hoodie;
import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Abstract class that holds common member variables (FileSystem, SparkContext, HoodieConfigs).
* Also manages the embedded timeline server, if enabled.
*/
public abstract class AbstractHoodieClient implements Serializable {
private static final Logger logger = LogManager.getLogger(AbstractHoodieClient.class);
protected final transient FileSystem fs;
protected final transient JavaSparkContext jsc;
protected final HoodieWriteConfig config;
protected final String basePath;
/**
* Timeline Server has the same lifetime as that of Client.
* Any operations done on the same timeline service will be able to take advantage
* of the cached file-system view. New completed actions will be synced automatically
* in an incremental fashion.
*/
private transient EmbeddedTimelineService timelineServer;
protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
this.jsc = jsc;
this.basePath = clientConfig.getBasePath();
this.config = clientConfig;
startEmbeddedServerView();
}
/**
* Releases any resources used by the client.
*/
public void close() {
stopEmbeddedServerView(true);
}
private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
if (timelineServer != null) {
logger.info("Stopping Timeline service !!");
timelineServer.stop();
timelineServer = null;
// Reset Storage Config to Client specified config
if (resetViewStorageConfig) {
config.resetViewStorageConfig();
}
}
}
private synchronized void startEmbeddedServerView() {
if (config.isEmbeddedTimelineServerEnabled()) {
if (timelineServer == null) {
// Run Embedded Timeline Server
logger.info("Starting Timeline service !!");
timelineServer = new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(),
config.getClientSpecifiedViewStorageConfig());
try {
timelineServer.startServer();
// Allow executor to find this newly instantiated timeline service
config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig());
} catch (IOException e) {
logger.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e);
stopEmbeddedServerView(false);
}
} else {
logger.info("Timeline Server already running. Not restarting the service");
}
} else {
logger.info("Embedded Timeline Server is disabled. Not starting timeline service");
}
}
}
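
Not part of the commit: a minimal usage sketch of the lifecycle above. It assumes the standard two-argument HoodieWriteClient constructor (only the @VisibleForTesting constructor appears later in this diff); enabling hoodie.embed.timeline.server makes AbstractHoodieClient start the service, and close() stops it and restores the client-specified view storage config. The wrapper class name is illustrative only.

import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

public class EmbeddedTimelineUsageSketch {
  public static void run(JavaSparkContext jsc, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withEmbeddedTimelineServerEnabled(true) // hoodie.embed.timeline.server
        .build();
    // Assumption: public two-argument constructor; via AbstractHoodieClient it starts the
    // embedded timeline server and rewrites the view storage config so executors query
    // the driver-hosted, incrementally synced file-system view.
    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
    try {
      // ... upsert / insert / bulkInsert as usual ...
    } finally {
      client.close(); // stops the timeline server and resets the view storage config
    }
  }
}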

View File

@@ -36,6 +36,7 @@ import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.func.OperationResult;
@@ -56,16 +57,12 @@ import org.apache.spark.api.java.JavaSparkContext;
/**
* Client to perform admin operations related to compaction
*/
public class CompactionAdminClient implements Serializable {
public class CompactionAdminClient extends AbstractHoodieClient {
private static Logger log = LogManager.getLogger(CompactionAdminClient.class);
private final transient JavaSparkContext jsc;
private final String basePath;
public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
this.jsc = jsc;
this.basePath = basePath;
super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
}
/**
@@ -123,19 +120,17 @@ public class CompactionAdminClient implements Serializable {
// Only if all operations are successfully executed
if (!dryRun && allSuccess.isPresent() && allSuccess.get()) {
// Overwrite compaction request with empty compaction operations
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstant);
HoodieCompactionPlan newPlan =
HoodieCompactionPlan.newBuilder().setOperations(new ArrayList<>()).setExtraMetadata(plan.getExtraMetadata())
.build();
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant);
Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
if (metaClient.getFs().exists(inflightPath)) {
// We need to rollback data-files because of this inflight compaction before unscheduling
throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
}
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant),
AvroUtils.serializeCompactionPlan(newPlan));
// Leave the trace in aux folder but delete from metapath.
// TODO: Add a rollback instant but for compaction
HoodieInstant instant = new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant);
boolean deleted = metaClient.getFs().delete(new Path(metaClient.getMetaPath(), instant.getFileName()), false);
Preconditions.checkArgument(deleted, "Unable to delete compaction instant.");
}
return res;
}
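
Not part of the commit: since CompactionAdminClient now extends AbstractHoodieClient (building its HoodieWriteConfig from the base path, as shown above), it may start an embedded timeline server and should be closed when done. A minimal sketch, with an illustrative wrapper class:

import com.uber.hoodie.CompactionAdminClient;
import org.apache.spark.api.java.JavaSparkContext;

public class CompactionAdminSketch {
  public static void run(JavaSparkContext jsc, String basePath) {
    CompactionAdminClient admin = new CompactionAdminClient(jsc, basePath);
    try {
      // ... validate / unschedule / repair compactions ...
    } finally {
      admin.close(); // shuts down the embedded timeline server, if one was started
    }
  }
}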

View File

@@ -24,19 +24,16 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
@@ -52,13 +49,10 @@ import scala.Tuple2;
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static final Logger logger = LogManager.getLogger(HoodieReadClient.class);
private final transient JavaSparkContext jsc;
private final transient FileSystem fs;
/**
* TODO: We need to persist the index type into hoodie.properties and be able to access the index
* just with a simple basepath pointing to the dataset. Until then, just always assume a
@@ -94,9 +88,8 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
super(jsc, clientConfig);
final String basePath = clientConfig.getBasePath();
this.jsc = jsc;
this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
this.hoodieTable = HoodieTable
@@ -130,7 +123,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
* @return a dataframe
*/
public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) throws Exception {
assertSqlContext();
JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
.fetchRecordLocation(hoodieKeys, jsc, hoodieTable);

View File

@@ -62,7 +62,6 @@ import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner;
import com.uber.hoodie.table.WorkloadProfile;
import com.uber.hoodie.table.WorkloadStat;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Arrays;
@@ -73,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -92,12 +90,9 @@ import scala.Tuple2;
* Note that, at any given time, there can only be one Spark job performing these operations on a
* Hoodie dataset.
*/
public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {
public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static Logger logger = LogManager.getLogger(HoodieWriteClient.class);
private final transient FileSystem fs;
private final transient JavaSparkContext jsc;
private final HoodieWriteConfig config;
private final boolean rollbackInFlight;
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
@@ -126,9 +121,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
@VisibleForTesting
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
boolean rollbackInFlight, HoodieIndex index) {
this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
this.jsc = jsc;
this.config = clientConfig;
super(jsc, clientConfig);
this.index = index;
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackInFlight = rollbackInFlight;
@@ -160,7 +153,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(
@@ -189,7 +182,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, true);
} catch (Throwable e) {
@@ -213,7 +206,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(
@@ -241,7 +234,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, false);
} catch (Throwable e) {
@@ -290,7 +283,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(
@@ -324,7 +317,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx();
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
try {
return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
@@ -977,7 +970,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* Releases any resources used by the client.
*/
public void close() {
// UNDER CONSTRUCTION
// Stop timeline-server if running
super.close();
}
/**
@@ -1189,8 +1183,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
private HoodieTable getTableAndInitCtx() {
// Create a Hoodie table which encapsulates the commits and files visible
private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);

View File

@@ -0,0 +1,100 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.uber.hoodie.client.embedded;
import com.uber.hoodie.common.SerializableConfiguration;
import com.uber.hoodie.common.table.view.FileSystemViewManager;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.table.view.FileSystemViewStorageType;
import com.uber.hoodie.common.util.NetworkUtils;
import com.uber.hoodie.timeline.service.TimelineService;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
/**
* Timeline Service that runs as part of write client
*/
public class EmbeddedTimelineService {
private static Logger logger = LogManager.getLogger(EmbeddedTimelineService.class);
private int serverPort;
private String hostAddr;
private final SerializableConfiguration hadoopConf;
private final FileSystemViewStorageConfig config;
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
public EmbeddedTimelineService(Configuration hadoopConf, SparkConf sparkConf, FileSystemViewStorageConfig config) {
setHostAddrFromSparkConf(sparkConf);
if (hostAddr == null) {
this.hostAddr = NetworkUtils.getHostname();
}
this.config = config;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
this.viewManager = createViewManager();
}
private FileSystemViewManager createViewManager() {
// Using passed-in configs to build view storage configs
FileSystemViewStorageConfig.Builder builder =
FileSystemViewStorageConfig.newBuilder().fromProperties(config.getProps());
FileSystemViewStorageType storageType = builder.build().getStorageType();
if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY)
|| storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
// Reset to default if set to Remote
builder.withStorageType(FileSystemViewStorageType.MEMORY);
}
return FileSystemViewManager.createViewManager(hadoopConf, builder.build());
}
public void startServer() throws IOException {
server = new TimelineService(0, viewManager);
serverPort = server.startService();
logger.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
}
private void setHostAddrFromSparkConf(SparkConf sparkConf) {
String hostAddr = sparkConf.get("spark.driver.host", null);
if (hostAddr != null) {
logger.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
this.hostAddr = hostAddr;
} else {
logger.warn("Unable to find driver bind address from spark config");
}
}
/**
* Retrieves proper view storage configs for remote clients to access this service
*/
public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
.withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
}
public void stop() {
if (null != server) {
this.server.close();
this.server = null;
this.viewManager = null;
}
}
}
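
Not part of the commit: a sketch of the EmbeddedTimelineService lifecycle, assuming it is driven directly rather than through AbstractHoodieClient. The calls (startServer, getRemoteFileSystemViewConfig, stop) are from the class above; the wrapper class is illustrative only.

import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.table.view.FileSystemViewStorageType;
import java.io.IOException;
import org.apache.spark.api.java.JavaSparkContext;

public class TimelineServiceLifecycleSketch {
  public static void run(JavaSparkContext jsc) {
    FileSystemViewStorageConfig localViewConfig = FileSystemViewStorageConfig.newBuilder()
        .withStorageType(FileSystemViewStorageType.MEMORY).build();
    EmbeddedTimelineService service =
        new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(), localViewConfig);
    try {
      service.startServer(); // binds an ephemeral port on the driver host
      // Remote clients (executors) get a REMOTE_FIRST config pointing at this host:port
      FileSystemViewStorageConfig remoteViewConfig = service.getRemoteFileSystemViewConfig();
      // ... hand remoteViewConfig to the write config so executors query the driver ...
    } catch (IOException e) {
      // startup failed; callers fall back to local file-system views
    } finally {
      service.stop(); // closes the server and releases the cached view manager
    }
  }
}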

View File

@@ -1,51 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.config;
import java.io.Serializable;
import java.util.Properties;
/**
* Default Way to load Hoodie config through a java.util.Properties
*/
public class DefaultHoodieConfig implements Serializable {
protected final Properties props;
public DefaultHoodieConfig(Properties props) {
this.props = props;
}
public static void setDefaultOnCondition(Properties props, boolean condition, String propName,
String defaultValue) {
if (condition) {
props.setProperty(propName, defaultValue);
}
}
public static void setDefaultOnCondition(Properties props, boolean condition,
DefaultHoodieConfig config) {
if (condition) {
props.putAll(config.getProps());
}
}
public Properties getProps() {
return props;
}
}

View File

@@ -19,6 +19,7 @@ package com.uber.hoodie.config;
import com.google.common.base.Preconditions;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
@@ -71,9 +72,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
private HoodieWriteConfig(Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
}
public static HoodieWriteConfig.Builder newBuilder() {
@@ -157,6 +169,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
}
public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
/**
* compaction properties
**/
@@ -434,6 +450,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
}
public FileSystemViewStorageConfig getViewStorageConfig() {
return viewStorageConfig;
}
public void setViewStorageConfig(FileSystemViewStorageConfig viewStorageConfig) {
this.viewStorageConfig = viewStorageConfig;
}
public void resetViewStorageConfig() {
this.setViewStorageConfig(getClientSpecifiedViewStorageConfig());
}
public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
return clientSpecifiedViewStorageConfig;
}
public static class Builder {
private final Properties props = new Properties();
@@ -442,6 +474,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isCompactionConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
@@ -569,6 +602,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withFileSystemViewConfig(FileSystemViewStorageConfig viewStorageConfig) {
props.putAll(viewStorageConfig.getProps());
isViewConfigSet = true;
return this;
}
public Builder withFinalizeWriteParallelism(int parallelism) {
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
return this;
@@ -579,10 +618,13 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
return this;
}
public HoodieWriteConfig build() {
HoodieWriteConfig config = new HoodieWriteConfig(props);
// Check for mandatory properties
Preconditions.checkArgument(config.getBasePath() != null);
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM),
@@ -611,6 +653,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
@@ -623,6 +667,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HoodieMetricsConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMemoryConfigSet,
HoodieMemoryConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isViewConfigSet,
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
Preconditions.checkArgument(config.getBasePath() != null);
return config;
}
}
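
Not part of the commit: a sketch of the new view-storage plumbing in HoodieWriteConfig. The builder methods and getters are the ones added above; the wrapper class name is illustrative.

import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.table.view.FileSystemViewStorageType;
import com.uber.hoodie.config.HoodieWriteConfig;

public class ViewStorageConfigSketch {
  public static HoodieWriteConfig build(String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withEmbeddedTimelineServerEnabled(true)
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.MEMORY).build())
        .build();
    // Initially the effective view config is the client-specified one.
    assert config.getViewStorageConfig() == config.getClientSpecifiedViewStorageConfig();
    // Once the embedded server starts, AbstractHoodieClient swaps in the remote config:
    //   config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig());
    // and on shutdown restores the original:
    //   config.resetViewStorageConfig();
    return config;
  }
}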

View File

@@ -224,6 +224,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
@VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Tuple2<String, HoodieDataFile>> dataFilesList = jsc
.parallelize(partitions, Math.max(partitions.size(), 1)).flatMapToPair(partitionPath -> {
@@ -243,7 +244,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(), 1)).mapToPair(ft -> {
try {
String[] minMaxKeys = ParquetUtils
.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), ft._2().getFileStatus().getPath());
.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(ft._2().getPath()));
return new Tuple2<>(ft._1(),
new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {

View File

@@ -26,13 +26,14 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.TableFileSystemView.RealtimeView;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieAppendException;
import com.uber.hoodie.exception.HoodieUpsertException;
@@ -67,7 +68,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private List<IndexedRecord> recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
private List<HoodieKey> keysToDelete = new ArrayList<>();
private TableFileSystemView.RealtimeView fileSystemView;
private String partitionPath;
private Iterator<HoodieRecord<T>> recordItr;
// Total number of records written during an append
@@ -98,7 +99,6 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
super(config, commitTime, hoodieTable);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.fileSystemView = hoodieTable.getRTFileSystemView();
this.recordItr = recordItr;
}
@@ -110,15 +110,15 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
if (doInit) {
this.partitionPath = record.getPartitionPath();
// extract some information from the first record
Optional<FileSlice> fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst();
RealtimeView rtView = hoodieTable.getRTFileSystemView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
// Set the base commit time as the current commitTime for new inserts into log files
String baseInstantTime = commitTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
} else {
// This means there is no base data file, start appending to a new log file
fileSlice = Optional.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
logger.info("New InsertHandle for partition :" + partitionPath);
}
writeStatus.getStat().setPrevCommit(baseInstantTime);
@@ -242,6 +242,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
try {
// flush any remaining records to disk
doAppend(header);
long sizeInBytes = writer.getCurrentSize();
if (writer != null) {
writer.close();
}
@@ -251,6 +252,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
writeStatus.getStat().setNumInserts(insertRecordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
writeStatus.getStat().setFileSizeInBytes(sizeInBytes);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer());
@@ -266,7 +268,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
return writeStatus;
}
private Writer createLogWriter(Optional<FileSlice> fileSlice, String baseCommitTime)
private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException, InterruptedException {
return HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
@@ -305,4 +307,4 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
}
}
}

View File

@@ -25,9 +25,8 @@ import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.SyncableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
@@ -51,7 +50,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class);
private final TableFileSystemView fileSystemView;
private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
private HoodieTable<T> hoodieTable;
@@ -59,12 +58,13 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
public HoodieCleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
this.hoodieTable = hoodieTable;
this.fileSystemView = hoodieTable.getCompletedFileSystemView();
this.fileSystemView = hoodieTable.getHoodieView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
this.config = config;
this.fgIdToPendingCompactionOperations =
((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFgIdToPendingCompaction().entrySet()
.stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue()))
((SyncableFileSystemView)hoodieTable.getRTFileSystemView()).getPendingCompactionOperations()
.map(entry -> Pair.of(new HoodieFileGroupId(entry.getValue().getPartitionPath(),
entry.getValue().getFileId()), entry.getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
@@ -86,7 +86,14 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
Iterator<FileSlice> fileSliceIterator = fileGroup.getAllFileSlices().iterator();
// do not cleanup slice required for pending compaction
Iterator<FileSlice> fileSliceIterator = fileGroup.getAllFileSlices()
.filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
if (isFileGroupInPendingCompaction(fileGroup)) {
// We have already saved the last version of file-groups for pending compaction Id
keepVersions--;
}
while (fileSliceIterator.hasNext() && keepVersions > 0) {
// Skip this most recent version
FileSlice nextSlice = fileSliceIterator.next();
@@ -100,16 +107,14 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
if (!isFileSliceNeededForPendingCompaction(nextSlice)) {
if (nextSlice.getDataFile().isPresent()) {
HoodieDataFile dataFile = nextSlice.getDataFile().get();
deletePaths.add(dataFile.getFileStatus().getPath().toString());
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString())
.collect(Collectors.toList()));
}
if (nextSlice.getDataFile().isPresent()) {
HoodieDataFile dataFile = nextSlice.getDataFile().get();
deletePaths.add(dataFile.getPath());
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString())
.collect(Collectors.toList()));
}
}
}
@@ -180,7 +185,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
.compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime,
HoodieTimeline.GREATER)) {
// this is a commit, that should be cleaned.
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileStatus().getPath().toString()));
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getPath()));
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getPath().toString())
@@ -258,4 +263,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
}
return false;
}
private boolean isFileGroupInPendingCompaction(HoodieFileGroup fg) {
return fgIdToPendingCompactionOperations.containsKey(fg.getFileGroupId());
}
}

View File

@@ -155,6 +155,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
logger.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records "
+ recordsWritten);
try {
storageWriter.close();
HoodieWriteStat stat = new HoodieWriteStat();
@@ -165,7 +166,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
@@ -182,4 +185,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
// Use tempPath for storage writer if possible
return (this.tempPath == null) ? this.path : this.tempPath;
}
}
}

View File

@@ -20,7 +20,6 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
@@ -48,7 +47,6 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected final HoodieTable<T> hoodieTable;
protected final Schema originalSchema;
protected final Schema writerSchema;
protected HoodieTimeline hoodieTimeline;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
@@ -57,7 +55,6 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
this.config = config;
this.fs = hoodieTable.getMetaClient().getFs();
this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
@@ -159,4 +156,4 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
public abstract WriteStatus close();
public abstract WriteStatus getWriteStatus();
}
}

View File

@@ -24,7 +24,6 @@ import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.DefaultSizeEstimator;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
@@ -56,7 +55,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private Map<String, HoodieRecord<T>> keyToNewRecords;
private Set<String> writtenRecordKeys;
private HoodieStorageWriter<IndexedRecord> storageWriter;
private TableFileSystemView.ReadOptimizedView fileSystemView;
private Path newFilePath;
private Path oldFilePath;
private Path tempPath = null;
@@ -69,20 +67,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
String partitionPath = init(fileId, recordItr);
init(fileId, partitionPath,
fileSystemView.getLatestDataFiles(partitionPath)
.filter(dataFile -> dataFile.getFileId().equals(fileId)).findFirst());
hoodieTable.getROFileSystemView().getLatestDataFile(partitionPath, fileId).get());
}
/**
* Called by compactor code path
*/
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, Optional<HoodieDataFile> dataFileToBeMerged) {
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieDataFile dataFileToBeMerged) {
super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
@@ -92,12 +87,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
/**
* Extract old file path, initialize StorageWriter and WriteStatus
*/
private void init(String fileId, String partitionPath, Optional<HoodieDataFile> dataFileToBeMerged) {
private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) {
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
try {
//TODO: dataFileToBeMerged must be optional. Will be fixed by Nishith's changes to support insert to log-files
String latestValidFilePath = dataFileToBeMerged.get().getFileName();
String latestValidFilePath = dataFileToBeMerged.getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,
@@ -276,7 +270,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.close();
}
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes);
writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);

View File

@@ -37,6 +37,7 @@ import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
@@ -89,6 +90,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
List<CompactionOperation> operations = compactionPlan.getOperations().stream().map(
CompactionOperation::convertFromAvroRecordInstance).collect(toList());
log.info("Compactor compacting " + operations + " files");
return jsc.parallelize(operations, operations.size())
.map(s -> compact(table, metaClient, config, s, compactionInstantTime))
.flatMap(List::iterator);
@@ -113,7 +115,6 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs,
@@ -125,9 +126,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
return Lists.<WriteStatus>newArrayList();
}
Optional<HoodieDataFile> oldDataFileOpt = hoodieCopyOnWriteTable.getROFileSystemView()
.getLatestDataFilesOn(operation.getPartitionPath(), operation.getBaseInstantTime())
.filter(df -> df.getFileId().equals(operation.getFileId())).findFirst();
Option<HoodieDataFile> oldDataFileOpt = hoodieCopyOnWriteTable.getROFileSystemView()
.getDataFileOn(operation.getPartitionPath(), operation.getBaseInstantTime(), operation.getFileId());
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
@@ -135,7 +135,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// new base parquet file.
if (operation.getDataFilePath().isPresent()) {
result = hoodieCopyOnWriteTable
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(), oldDataFileOpt);
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable
.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator());
@@ -189,6 +189,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// In case no partitions could be picked, return no compaction plan
return null;
}
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<HoodieCompactionOperation> operations =

View File

@@ -59,8 +59,8 @@ public abstract class CompactionStrategy implements Serializable {
Map<String, Double> metrics = Maps.newHashMap();
Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(Optional::isPresent)
.map(Optional::get).reduce((size1, size2) -> size1 + size2).orElse(0L);
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
.reduce((size1, size2) -> size1 + size2).orElse(0L);
// Total read will be the base file + all the log files
Long totalIORead = FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L)
+ totalLogFileSize);

View File

@@ -46,7 +46,7 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize)
.filter(Optional::isPresent).map(Optional::get).reduce((size1, size2) -> size1 + size2)
.filter(size -> size >= 0).reduce((size1, size2) -> size1 + size2)
.orElse(0L);
// save the metrics needed during the order
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());

View File

@@ -58,7 +58,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
@@ -186,9 +185,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileOpt) throws IOException {
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieDataFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, dataFileOpt);
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, commitTime, fileId);
}
@@ -231,7 +230,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileToBeMerged) {
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieDataFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
}

View File

@@ -32,13 +32,13 @@ import com.uber.hoodie.common.model.HoodieRollingStat;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.SyncableFileSystemView;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException;
@@ -55,7 +55,6 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -155,8 +154,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, this, config, instantTime,
new HashSet<>(((HoodieTableFileSystemView)getRTFileSystemView())
.getFgIdToPendingCompaction().keySet()));
((SyncableFileSystemView)getRTFileSystemView()).getPendingCompactionOperations()
.map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
@@ -460,7 +461,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// TODO (NA) : Make this static part of utility
@VisibleForTesting
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize().get())
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize())
.filter(size -> size > 0)
.reduce((a, b) -> (a + b)).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with

View File

@@ -21,14 +21,17 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.SerializableConfiguration;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.SyncableFileSystemView;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.FileSystemViewManager;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -62,12 +65,26 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<T> index;
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
this.config = config;
this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
this.viewManager = FileSystemViewManager.createViewManager(
new SerializableConfiguration(jsc.hadoopConfiguration()), config.getViewStorageConfig());
this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
this.index = HoodieIndex.createIndex(config, jsc);
}
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration,
config.getViewStorageConfig());
}
return viewManager;
}
public static <T extends HoodieRecordPayload> HoodieTable<T> getHoodieTable(
HoodieTableMetaClient metaClient, HoodieWriteConfig config, JavaSparkContext jsc) {
switch (metaClient.getTableType()) {
@@ -118,22 +135,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Get the read optimized view of the file system for this table
*/
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
return getViewManager().getFileSystemView(metaClient.getBasePath());
}
/**
* Get the real time view of the file system for this table
*/
public TableFileSystemView.RealtimeView getRTFileSystemView() {
return new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
return getViewManager().getFileSystemView(metaClient.getBasePath());
}
/**
* Get the completed (commit + compaction) view of the file system for this table
* Get complete view of the file system for this table with ability to force sync
*/
public TableFileSystemView getCompletedFileSystemView() {
return new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline());
public SyncableFileSystemView getHoodieView() {
return getViewManager().getFileSystemView(metaClient.getBasePath());
}
/**