
[HUDI-1089] Refactor hudi-client to support multi-engine (#1827)

- This change breaks `hudi-client` into `hudi-client-common` and `hudi-spark-client` modules
- Simple usages of Spark via jsc.parallelize() have been redone using EngineContext#map, EngineContext#flatMap etc (see the sketch after this list)
- The code changes in this PR break classes into `BaseXYZ` parent classes, with no Spark dependencies, living in `hudi-client-common`
- Classes in `hudi-spark-client` are named `SparkXYZ` and extend the parent classes, carrying all the Spark dependencies
- To simplify/clean up, HoodieIndex#fetchRecordLocation has been removed and its usages in tests replaced with alternatives
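
A minimal, hedged sketch of that pattern follows. The `map` signature used here is an assumption about the new `HoodieEngineContext` API; the Spark-backed context is expected to delegate to `jsc.parallelize(...)` internally.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.spark.api.java.JavaSparkContext;

public class EngineContextMapSketch {

  // Engine-agnostic helper: no JavaSparkContext in sight.
  static List<Integer> partitionDepths(HoodieEngineContext context, List<String> partitionPaths) {
    // Before the refactor: jsc.parallelize(partitionPaths, n).map(...).collect()
    // After: the engine context supplies the parallel map primitive.
    return context.map(partitionPaths, path -> path.split("/").length, partitionPaths.size());
  }

  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "engine-context-sketch");
    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
    System.out.println(partitionDepths(context, Arrays.asList("2020/10/01", "2020/10/02")));
    jsc.stop();
  }
}
```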

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Authored by: Mathieu, 2020-10-02 05:25:29 +08:00
Committed by: GitHub
Parent: 5aaaf8bff1
Commit: 1f7add9291
380 changed files with 6071 additions and 4128 deletions

View File

@@ -20,10 +20,10 @@ jdk:
- openjdk8
jobs:
include:
- name: "Unit tests except hudi-client"
env: MODE=unit MODULES='!hudi-client' HUDI_QUIETER_LOGGING=1
- name: "Unit tests for hudi-client"
env: MODE=unit MODULES=hudi-client HUDI_QUIETER_LOGGING=1
- name: "Unit tests except hudi-spark-client"
env: MODE=unit MODULES='!hudi-client/hudi-spark-client' HUDI_QUIETER_LOGGING=1
- name: "Unit tests for hudi-spark-client"
env: MODE=unit MODULES=hudi-client/hudi-spark-client HUDI_QUIETER_LOGGING=1
- name: "Functional tests"
env: MODE=functional HUDI_QUIETER_LOGGING=1
- name: "Integration tests"

View File

@@ -148,7 +148,14 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client</artifactId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>

View File

@@ -23,7 +23,8 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -162,10 +163,10 @@ public class SavepointsCommand implements CommandMarker {
return String.format("Savepoint \"%s\" deleted.", instantTime);
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
return new HoodieWriteClient(jsc, config, false);
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config, false);
}
}
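
For orientation, a hedged, self-contained sketch of the construction pattern this hunk migrates to (class names are taken from the diff; the config values are illustrative):

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.api.java.JavaSparkContext;

public class WriteClientSketch {

  // The JavaSparkContext is no longer handed to the client directly;
  // it is wrapped in a HoodieSparkEngineContext first.
  static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .build();
    // Before: new HoodieWriteClient(jsc, config, false)
    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config, false);
  }
}
```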

View File

@@ -22,12 +22,13 @@ import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.utils.ClientUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
@@ -35,8 +36,8 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.HDFSParquetImporter.Config;
import org.apache.hudi.utilities.HoodieCleaner;
@@ -343,7 +344,7 @@ public class SparkMain {
}
private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
if (client.rollback(instantTime)) {
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
return 0;
@@ -355,7 +356,7 @@ public class SparkMain {
private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user,
String comments, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
try {
client.savepoint(commitTime, user, comments);
LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
@@ -367,7 +368,7 @@ public class SparkMain {
}
private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
try {
client.restoreToSavepoint(savepointTime);
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
@@ -379,7 +380,7 @@ public class SparkMain {
}
private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
try {
client.deleteSavepoint(savepointTime);
LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
@@ -401,9 +402,10 @@ public class SparkMain {
*/
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
HoodieWriteConfig config = getWriteConfig(basePath);
HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, false);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), false,
config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
try {
UpgradeDowngrade.run(metaClient, HoodieTableVersion.valueOf(toVersion), config, jsc, null);
new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
return 0;
} catch (Exception e) {
@@ -412,9 +414,9 @@ public class SparkMain {
}
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
HoodieWriteConfig config = getWriteConfig(basePath);
return new HoodieWriteClient(jsc, config);
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
}
private static HoodieWriteConfig getWriteConfig(String basePath) {

View File

@@ -21,7 +21,7 @@ package org.apache.hudi.cli.utils;
import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -91,7 +91,7 @@ public class SparkUtil {
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
HoodieWriteClient.registerClasses(sparkConf);
SparkRDDWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.junit.jupiter.api.AfterEach;
@@ -92,8 +93,9 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
// archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
archiveLog.archiveIfRequired(jsc);
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
archiveLog.archiveIfRequired(context);
}
@AfterEach

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hadoop.fs.FileSystem;
@@ -179,8 +180,9 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
archiveLog.archiveIfRequired(jsc);
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
archiveLog.archiveIfRequired(context);
CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
assertTrue(cr.isSuccess());
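
Both CLI tests above switch to the same archiving sequence; a hedged standalone sketch follows (the meta-client constructor used here is an assumption, the rest mirrors the diff):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.spark.api.java.JavaSparkContext;

public class ArchiveSketch {

  // Archiving now takes a HoodieTable handle and an engine context,
  // not a raw Hadoop configuration plus a JavaSparkContext.
  static void archiveIfRequired(JavaSparkContext jsc, HoodieWriteConfig cfg) throws Exception {
    Configuration hadoopConf = jsc.hadoopConfiguration();
    HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    // Before: new HoodieTimelineArchiveLog(cfg, hadoopConf).archiveIfRequired(jsc)
    HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
    archiveLog.archiveIfRequired(context);
  }
}
```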

View File

@@ -24,7 +24,7 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -90,7 +90,7 @@ public class TestRollbacksCommand extends AbstractShellIntegrationTest {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
try (HoodieWriteClient client = getHoodieWriteClient(config)) {
try (AbstractHoodieWriteClient client = getHoodieWriteClient(config)) {
// Rollback inflight commit3 and commit2
client.rollback("102");
client.rollback("101");

View File

@@ -0,0 +1,264 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi-client</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.6.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-client-common</artifactId>
<version>${parent.version}</version>
<name>hudi-client-common</name>
<packaging>jar</packaging>
<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Dropwizard Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_dropwizard</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
</dependency>
<!-- Hoodie - Tests -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-suite-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/test/resources</directory>
</resource>
</resources>
</build>
</project>

View File

@@ -17,14 +17,15 @@
package org.apache.hudi.async;
import org.apache.hudi.client.Compactor;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.AbstractCompactor;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.common.EngineProperty;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
@@ -40,7 +41,7 @@ import java.util.stream.IntStream;
/**
* Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
*/
public class AsyncCompactService extends AbstractAsyncService {
public abstract class AsyncCompactService extends HoodieAsyncService {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
@@ -51,23 +52,25 @@ public class AsyncCompactService extends AbstractAsyncService {
public static final String COMPACT_POOL_NAME = "hoodiecompact";
private final int maxConcurrentCompaction;
private transient Compactor compactor;
private transient JavaSparkContext jssc;
private transient AbstractCompactor compactor;
private transient HoodieEngineContext context;
private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
private transient ReentrantLock queueLock = new ReentrantLock();
private transient Condition consumed = queueLock.newCondition();
public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
this(jssc, client, false);
public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
this(context, client, false);
}
public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client, boolean runInDaemonMode) {
public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client, boolean runInDaemonMode) {
super(runInDaemonMode);
this.jssc = jssc;
this.compactor = new Compactor(client);
this.context = context;
this.compactor = createCompactor(client);
this.maxConcurrentCompaction = 1;
}
protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client);
/**
* Enqueues new Pending compaction.
*/
@@ -127,8 +130,8 @@ public class AsyncCompactService extends AbstractAsyncService {
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);
LOG.info("Setting pool name for compaction to " + COMPACT_POOL_NAME);
context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();
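
A hedged sketch of what a concrete subclass now has to supply: the Spark-agnostic service only asks for an `AbstractCompactor` via the new factory hook (the no-op compactor body below is purely illustrative, not the actual Spark implementation):

```java
import java.io.IOException;

import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.client.AbstractCompactor;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class SparkAsyncCompactServiceSketch extends AsyncCompactService {

  public SparkAsyncCompactServiceSketch(HoodieEngineContext context, AbstractHoodieWriteClient client) {
    super(context, client);
  }

  @Override
  protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
    // A real Spark implementation would return its own compactor that calls
    // back into the Spark write client; this stub only shows the shape of the hook.
    return new AbstractCompactor(client) {
      @Override
      public void compact(HoodieInstant instant) throws IOException {
        // e.g. hand instant.getTimestamp() to the write client in the Spark engine
      }
    };
  }
}
```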

View File

@@ -34,9 +34,9 @@ import java.util.function.Function;
/**
* Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
*/
public abstract class AbstractAsyncService implements Serializable {
public abstract class HoodieAsyncService implements Serializable {
private static final Logger LOG = LogManager.getLogger(AbstractAsyncService.class);
private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class);
// Flag to track if the service is started.
private boolean started;
@@ -51,11 +51,11 @@ public abstract class AbstractAsyncService implements Serializable {
// Run in daemon mode
private final boolean runInDaemonMode;
protected AbstractAsyncService() {
protected HoodieAsyncService() {
this(false);
}
protected AbstractAsyncService(boolean runInDaemonMode) {
protected HoodieAsyncService(boolean runInDaemonMode) {
shutdownRequested = false;
this.runInDaemonMode = runInDaemonMode;
}

View File

@@ -20,13 +20,12 @@ package org.apache.hudi.callback.impl;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import static org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil.convertToJsonString;
/**
* A http implementation of {@link HoodieWriteCommitCallback}.
*/
@@ -43,7 +42,7 @@ public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback
@Override
public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
// convert to json
String callbackMsg = convertToJsonString(callbackMessage);
String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
LOG.info("Try to send callbackMsg, msg = " + callbackMsg);
client.send(callbackMsg);
}

View File

@@ -20,11 +20,10 @@ package org.apache.hudi.callback.util;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitCallbackException;
import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP;
/**
* Factory help to create {@link HoodieWriteCommitCallback}.
*/
@@ -40,7 +39,7 @@ public class HoodieCommitCallbackFactory {
return (HoodieWriteCommitCallback) instance;
} else {
throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or "
+ "empty", CALLBACK_CLASS_PROP));
+ "empty", HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP));
}
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
import java.io.Serializable;
/**
* Run one round of compaction.
*/
public abstract class AbstractCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
private static final long serialVersionUID = 1L;
protected transient AbstractHoodieWriteClient<T, I, K, O> compactionClient;
public AbstractCompactor(AbstractHoodieWriteClient<T, I, K, O> compactionClient) {
this.compactionClient = compactionClient;
}
public abstract void compact(HoodieInstant instant) throws IOException;
}

View File

@@ -19,17 +19,19 @@
package org.apache.hudi.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.EngineProperty;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.ClientUtils;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
@@ -43,7 +45,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);
protected final transient FileSystem fs;
protected final transient JavaSparkContext jsc;
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
protected final HoodieWriteConfig config;
protected final String basePath;
@@ -56,15 +58,15 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
private transient Option<EmbeddedTimelineService> timelineServer;
private final boolean shouldStopTimelineServer;
protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
this(jsc, clientConfig, Option.empty());
protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, Option.empty());
}
protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineServer) {
this.hadoopConf = jsc.hadoopConfiguration();
this.hadoopConf = context.getHadoopConf().get();
this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);
this.jsc = jsc;
this.context = context;
this.basePath = clientConfig.getBasePath();
this.config = clientConfig;
this.timelineServer = timelineServer;
@@ -99,7 +101,8 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
if (!timelineServer.isPresent()) {
// Run Embedded Timeline Server
LOG.info("Starting Timeline service !!");
timelineServer = Option.of(new EmbeddedTimelineService(hadoopConf, jsc.getConf(),
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
config.getClientSpecifiedViewStorageConfig()));
try {
timelineServer.get().startServer();
@@ -122,6 +125,8 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
}
}
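
As a hedged illustration of what the abstract client now obtains from the engine context instead of a `JavaSparkContext` (the calls mirror this hunk; the local-mode setup is only for the example):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.EngineProperty;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.spark.api.java.JavaSparkContext;

public class EngineContextAccessSketch {

  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "abstract-client-sketch");
    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);

    // Replaces jsc.hadoopConfiguration()
    Configuration hadoopConf = context.getHadoopConf().get();

    // Replaces reading the host off jsc.getConf() when starting the embedded timeline server
    Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);

    System.out.println("fs.defaultFS = " + hadoopConf.get("fs.defaultFS")
        + ", embedded server host = " + hostAddr.orElse("<unknown>"));
    jsc.stop();
  }
}
```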

View File

@@ -18,22 +18,27 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -45,109 +50,178 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS
* table [upsert()]
* <p>
* Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie table.
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends AbstractHoodieClient {
protected static final String LOOKUP_STR = "lookup";
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
private static final String LOOKUP_STR = "lookup";
private final boolean rollbackPending;
private final transient HoodieMetrics metrics;
private transient Timer.Context compactionTimer;
private transient AsyncCleanerService asyncCleanerService;
private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
protected final transient HoodieMetrics metrics;
private final transient HoodieIndex<T, I, K, O> index;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
protected final boolean rollbackPending;
protected transient AsyncCleanerService asyncCleanerService;
/**
* Create a write client, without cleaning up failed/inflight commits.
*
* @param jsc Java Spark Context
* @param context HoodieEngineContext
* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
this(jsc, clientConfig, false);
public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, false);
}
/**
* Create a write client, with new hudi index.
*
* @param jsc Java Spark Context
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
}
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
this(jsc, writeConfig, rollbackPending, index, Option.empty());
public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
this(context, writeConfig, rollbackPending, Option.empty());
}
/**
* Create a write client, allows to specify all parameters.
* Create a write client, allows to specify all parameters.
*
* @param jsc Java Spark Context
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
super(jsc, index, writeConfig, timelineService);
public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
this.index = createIndex(writeConfig);
}
protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
}
public WriteOperationType getOperationType() {
return this.operationType;
}
/**
* Register hudi classes for Kryo serialization.
*
* @param conf instance of SparkConf
* @return SparkConf
* Commit changes performed at the given instantTime marker.
*/
public static SparkConf registerClasses(SparkConf conf) {
conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
return conf;
public boolean commit(String instantTime, O writeStatuses) {
return commit(instantTime, writeStatuses, Option.empty());
}
/**
*
* Commit changes performed at the given instantTime marker.
*/
public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = createMetaClient(false);
String actionType = metaClient.getCommitActionType();
return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
}
public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
}
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
LOG.info("Committing " + instantTime + " action " + commitActionType);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
// Finalize write
finalizeWrite(table, instantTime, stats);
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(table, metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, commitActionType);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
// callback if needed.
if (config.writeCommitCallbackOn()) {
if (null == commitCallback) {
commitCallback = HoodieCommitCallbackFactory.create(config);
}
commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath()));
}
return true;
}
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
} catch (ParseException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ "Instant time is not of valid format", e);
}
}
/**
* Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
*
* @param hoodieRecords Input RDD of Hoodie records.
* @return A subset of hoodieRecords RDD, with existing records filtered out.
* @param hoodieRecords Input Hoodie records.
* @return A subset of hoodieRecords, with existing records filtered out.
*/
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
Timer.Context indexTimer = metrics.getIndexCtx();
JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
public abstract I filterExists(I hoodieRecords);
/**
* Main API to run bootstrap to hudi.
@@ -156,8 +230,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
if (rollbackPending) {
rollBackInflightBootstrap();
}
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
table.bootstrap(jsc, extraMetadata);
HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
table.bootstrap(context, extraMetadata);
}
/**
@@ -165,14 +239,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
protected void rollBackInflightBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
LOG.info("Found pending bootstrap instants. Rolling them back");
table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}
@@ -181,21 +255,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
* @param records JavaRDD of hoodieRecords to upsert
* @param records hoodieRecords to upsert
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
* @return WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(result, instantTime, table);
}
public abstract O upsert(I records, final String instantTime);
/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
@@ -206,14 +270,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
public abstract O upsertPreppedRecords(I preppedRecords, final String instantTime);
/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal writes.
@@ -225,14 +282,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
}
public abstract O insert(I records, final String instantTime);
/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
@@ -245,36 +295,27 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
public abstract O insertPreppedRecords(I preppedRecords, final String instantTime);
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
* table for the very first time (e.g: converting an existing table to Hoodie).
* <p>
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
* the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}
* the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
return bulkInsert(records, instantTime, Option.empty());
}
public abstract O bulkInsert(I records, final String instantTime);
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
* table for the very first time (e.g: converting an existing table to Hoodie).
* <p>
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
* the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally
* the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
* it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
* {@link BulkInsertPartitioner}.
*
@@ -284,15 +325,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
public abstract O bulkInsert(I records, final String instantTime,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
@@ -300,7 +335,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* duplicates if needed.
* <p>
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
* the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally
* the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
* it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
* {@link BulkInsertPartitioner}.
*
@@ -310,31 +345,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
/**
* Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table.
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -344,12 +356,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Commit time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
setOperationType(WriteOperationType.DELETE);
HoodieWriteMetadata result = table.delete(jsc,instantTime, keys);
return postWrite(result, instantTime, table);
}
public abstract O delete(K keys, final String instantTime);
/**
* Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
@@ -358,31 +365,21 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param hoodieTable Hoodie Table
* @return Write Status
*/
private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable<T> hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
result.getWriteStats().get().size());
}
protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
}
return result.getWriteStatuses();
}
@Override
protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
*
* @param table table to commit on
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
@@ -393,15 +390,15 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
archiveLog.archiveIfRequired(jsc);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit(instantTime);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
private void runAnyPendingCompactions(HoodieTable<?> table) {
protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight compaction at instant " + instant);
@@ -411,9 +408,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
/**
* Handle auto clean during commit.
*
* @param instantTime
*/
private void autoCleanOnCommit(String instantTime) {
protected void autoCleanOnCommit(String instantTime) {
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
if (config.isAsyncClean()) {
@@ -434,7 +432,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param comment - Comment for the savepoint
*/
public void savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -458,8 +456,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param comment - Comment for the savepoint
*/
public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
table.savepoint(jsc, instantTime, user, comment);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
table.savepoint(context, instantTime, user, comment);
}
/**
@@ -470,7 +468,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was deleted successfully
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.deleteSavepoint(table, savepointTime);
}
@@ -485,7 +483,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -500,16 +498,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context context = this.metrics.getRollbackCtx();
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
HoodieRollbackMetadata rollbackMetadata = table.rollback(jsc, rollbackInstantTime, commitInstantOpt.get(), true);
if (context != null) {
long durationInMs = metrics.getDurationInMs(context.stop());
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
@@ -531,12 +529,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
LOG.info("Begin restore to instant " + instantTime);
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context context = metrics.getRollbackCtx();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime);
if (context != null) {
final long durationInMs = metrics.getDurationInMs(context.stop());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
if (timerContext != null) {
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
.flatMap(Collection::stream)
.mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
@@ -549,16 +547,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
/**
* Releases any resources used by the client.
*/
@Override
public void close() {
AsyncCleanerService.forceShutdown(asyncCleanerService);
asyncCleanerService = null;
super.close();
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
@@ -566,10 +554,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
LOG.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
HoodieCleanMetadata metadata = HoodieTable.create(config, hadoopConf).clean(jsc, cleanInstantTime);
if (context != null && metadata != null) {
long durationMs = metrics.getDurationInMs(context.stop());
final Timer.Context timerContext = metrics.getCleanCtx();
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
@@ -634,7 +622,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, actionType,
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
@@ -656,8 +644,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
LOG.info("Scheduling compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf)
.scheduleCompaction(jsc, instantTime, extraMetadata);
Option<HoodieCompactionPlan> plan = createTable(config, hadoopConf)
.scheduleCompaction(context, instantTime, extraMetadata);
return plan.isPresent();
}
@@ -667,7 +655,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return collection of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> compact(String compactionInstantTime) {
public O compact(String compactionInstantTime) {
return compact(compactionInstantTime, config.shouldAutoCommit());
}
@@ -678,38 +666,15 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param writeStatuses collection of WriteStatus to inspect errors and counts
* @param extraMetadata Extra Metadata to be stored
*/
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
table, compactionInstantTime, writeStatuses, config.getSchema());
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
}
public abstract void commitCompaction(String compactionInstantTime, O writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException;
/**
* Commit Compaction and track metrics.
*/
protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, HoodieTable<T> table,
String compactionCommitTime) {
protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
HoodieTable<T, I, K, O> table, String compactionCommitTime);
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
@@ -717,8 +682,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
@@ -726,7 +691,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* Cleanup all pending commits.
*/
private void rollbackPendingCommits() {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
@@ -747,27 +712,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return collection of WriteStatus
*/
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
rollbackInflightCompaction(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
}
return statuses;
}
protected abstract O compact(String compactionInstantTime, boolean shouldComplete);
/**
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
// inline compaction should auto commit as the user is never given control
@@ -775,4 +725,82 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
});
return compactionInstantTimeOpt;
}
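For illustration, a sketch of a write config that enables inline compaction so that postCommit() takes the path above; the HoodieCompactionConfig builder method names and values are assumptions, not shown in this diff.
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class InlineCompactionConfigExample {

  static HoodieWriteConfig build(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)                 // isInlineCompaction() becomes true in postCommit()
            .withMaxNumDeltaCommitsBeforeCompaction(5)  // assumed knob: compact after every 5 delta commits
            .build())
        .build();
  }
}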
/**
* Finalize Write operation.
*
* @param table HoodieTable
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
*/
protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(context, instantTime, stats);
if (finalizeCtx != null) {
Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
durationInMs.ifPresent(duration -> {
LOG.info("Finalize write elapsed time (milliseconds): " + duration);
metrics.updateFinalizeWriteMetrics(duration, stats.size());
});
}
} catch (HoodieIOException ioe) {
throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
}
}
public HoodieMetrics getMetrics() {
return metrics;
}
public HoodieIndex<T, I, K, O> getIndex() {
return index;
}
/**
* Get HoodieTable and init {@link Timer.Context}.
*
* @param operationType write operation type
* @param instantTime current inflight instant time
* @return HoodieTable
*/
protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
try {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastInstant =
activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
}
} else {
throw new HoodieIOException("Deletes issued without any prior commits");
}
} catch (IOException e) {
throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
}
}
@Override
public void close() {
// release AsyncCleanerService
AsyncCleanerService.forceShutdown(asyncCleanerService);
asyncCleanerService = null;
// Stop timeline-server if running
super.close();
// Calling this releases any resources held by the index, so make sure any index-related operations
// have finished before this point
this.index.close();
}
}

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.client;
import org.apache.hudi.async.AbstractAsyncService;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@@ -31,15 +31,15 @@ import java.util.concurrent.Executors;
/**
* Clean service running concurrently with write operation.
*/
class AsyncCleanerService extends AbstractAsyncService {
class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
private final HoodieWriteClient<?> writeClient;
private final AbstractHoodieWriteClient writeClient;
private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
protected AsyncCleanerService(HoodieWriteClient<?> writeClient, String cleanInstantTime) {
protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
this.writeClient = writeClient;
this.cleanInstantTime = cleanInstantTime;
}
@@ -52,7 +52,7 @@ class AsyncCleanerService extends AbstractAsyncService {
}), executor);
}
public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient,
String instantTime) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
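For illustration, a sketch of the write config that makes startAsyncCleaningIfEnabled() actually start this service; the withAutoClean/withAsyncClean builder names are assumptions inferred from the isAutoClean()/isAsyncClean() accessors used above.
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class AsyncCleanConfigExample {

  static HoodieWriteConfig build(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withAutoClean(true)   // clean as part of every commit
            .withAsyncClean(true)  // ... but run it concurrently with the ongoing write
            .build())
        .build();
  }
}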

View File

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -45,7 +46,6 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -65,8 +65,8 @@ public class CompactionAdminClient extends AbstractHoodieClient {
private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class);
public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
public CompactionAdminClient(HoodieEngineContext context, String basePath) {
super(context, HoodieWriteConfig.newBuilder().withPath(basePath).build());
}
/**
@@ -85,14 +85,14 @@ public class CompactionAdminClient extends AbstractHoodieClient {
if (plan.getOperations() != null) {
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
jsc.setJobGroup(this.getClass().getSimpleName(), "Validate compaction operations");
return jsc.parallelize(ops, parallelism).map(op -> {
context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations");
return context.map(ops, op -> {
try {
return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).collect();
}, parallelism);
}
return new ArrayList<>();
}
@@ -351,8 +351,8 @@ public class CompactionAdminClient extends AbstractHoodieClient {
} else {
LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
if (!dryRun) {
jsc.setJobGroup(this.getClass().getSimpleName(), "Execute unschedule operations");
return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations");
return context.map(renameActions, lfPair -> {
try {
LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
@@ -363,7 +363,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
+ lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
return new RenameOpResult(lfPair, false, Option.of(e));
}
}).collect();
}, parallelism);
} else {
LOG.info("Dry-Run Mode activated for rename operations");
return renameActions.parallelStream().map(lfPair -> new RenameOpResult(lfPair, false, false, Option.empty()))
@@ -394,17 +394,17 @@ public class CompactionAdminClient extends AbstractHoodieClient {
"Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
jsc.setJobGroup(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
return jsc.parallelize(ops, parallelism).flatMap(op -> {
context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
return context.flatMap(ops, op -> {
try {
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
Option.of(fsView), skipValidation).iterator();
Option.of(fsView), skipValidation).stream();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} catch (CompactionValidationException ve) {
throw new HoodieException(ve);
}
}).collect();
}, parallelism);
}
LOG.warn("No operations for compaction instant : " + compactionInstant);
return new ArrayList<>();

View File

@@ -16,21 +16,23 @@
* limitations under the License.
*/
package org.apache.hudi.table;
package org.apache.hudi.client;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
@@ -63,17 +65,17 @@ public class ReplaceArchivalHelper implements Serializable {
/**
* Delete all files represented by FileSlices in parallel. Return true if all files are deleted successfully.
*/
public static boolean deleteReplacedFileGroups(JavaSparkContext jsc, HoodieTableMetaClient metaClient,
public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
TableFileSystemView fileSystemView,
HoodieInstant instant, List<String> replacedPartitions) {
JavaRDD<String> partitions = jsc.parallelize(replacedPartitions, replacedPartitions.size());
return partitions.map(partition -> {
List<Boolean> f = context.map(replacedPartitions, partition -> {
Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
.flatMap(g -> g.getAllRawFileSlices());
.flatMap(HoodieFileGroup::getAllRawFileSlices);
return fileSlices.allMatch(slice -> deleteFileSlice(slice, metaClient, instant));
}, replacedPartitions.size());
return fileSlices.map(slice -> deleteFileSlice(slice, metaClient, instant)).allMatch(x -> x);
}).reduce((x, y) -> x & y);
return f.stream().reduce((x, y) -> x & y).orElse(true);
}
private static boolean deleteFileSlice(FileSlice fileSlice, HoodieTableMetaClient metaClient, HoodieInstant instant) {
@@ -95,5 +97,4 @@ public class ReplaceArchivalHelper implements Serializable {
return false;
}
}
}

View File

@@ -20,30 +20,28 @@ package org.apache.hudi.client.bootstrap;
import java.io.Serializable;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
/**
* Creates RDD of Hoodie Records with complete record data, given a list of partitions to be bootstrapped.
* Creates Hoodie Records with complete record data, given a list of partitions to be bootstrapped.
*/
public abstract class FullRecordBootstrapDataProvider implements Serializable {
public abstract class FullRecordBootstrapDataProvider<I> implements Serializable {
protected static final Logger LOG = LogManager.getLogger(FullRecordBootstrapDataProvider.class);
protected final TypedProperties props;
protected final transient JavaSparkContext jsc;
protected final transient HoodieEngineContext context;
public FullRecordBootstrapDataProvider(TypedProperties props, JavaSparkContext jsc) {
public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContext context) {
this.props = props;
this.jsc = jsc;
this.context = context;
}
/**
@@ -51,8 +49,8 @@ public abstract class FullRecordBootstrapDataProvider implements Serializable {
* @param tableName Hudi Table Name
* @param sourceBasePath Source Base Path
* @param partitionPaths Partition Paths
* @return JavaRDD of input records
* @return input records
*/
public abstract JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName,
public abstract I generateInputRecords(String tableName,
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
}
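For illustration, a skeleton of a Spark-flavoured provider where I is JavaRDD<HoodieRecord>; the class name and the omitted row-to-record conversion are illustrative, not part of this commit.
import java.util.List;

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaRDD;

public class SparkFullRecordBootstrapDataProvider
    extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {

  public SparkFullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContext context) {
    super(props, context);
  }

  @Override
  public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
      List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
    // Read the listed source files (e.g. via Spark SQL) and turn every row into a HoodieRecord
    // carrying the full record data; the conversion is format specific and omitted here.
    throw new UnsupportedOperationException("Illustrative skeleton only");
  }
}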

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.bootstrap;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.avro.Schema;
import java.util.List;
/**
* Bootstrap schema provider. The schema provided in the write config is used if set; otherwise, the schema is derived from the bootstrap source files (e.g. Parquet).
*/
public abstract class HoodieBootstrapSchemaProvider {
protected final HoodieWriteConfig writeConfig;
public HoodieBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
}
/**
* Main API to select avro schema for bootstrapping.
* @param context HoodieEngineContext
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
public final Schema getBootstrapSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
Schema userSchema = Schema.parse(writeConfig.getSchema());
if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
return userSchema;
}
}
return getBootstrapSourceSchema(context, partitions);
}
/**
* Select a random file to be used to generate avro schema.
* Override this method to get custom schema selection.
* @param context HoodieEngineContext
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
protected abstract Schema getBootstrapSourceSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions);
}
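For illustration, a trivial concrete provider showing the contract: getBootstrapSchema() prefers the schema set in the write config and only consults getBootstrapSourceSchema() when none is usable. The class name and the fixed schema literal are illustrative, not part of this commit.
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

public class FixedSchemaBootstrapSchemaProvider extends HoodieBootstrapSchemaProvider {

  public FixedSchemaBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
    super(writeConfig);
  }

  @Override
  protected Schema getBootstrapSourceSchema(HoodieEngineContext context,
      List<Pair<String, List<HoodieFileStatus>>> partitions) {
    // Only reached when no schema (or only the null schema) was set on the write config.
    return new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"ExampleRecord\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
  }
}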

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common;
/**
* Properties specific to each engine, that can be set on or obtained from the engine at runtime.
*/
public enum EngineProperty {
// hostname to bind embedded timeline server to
EMBEDDED_SERVER_HOST,
// Pool/queue to use to run compaction.
COMPACTION_POOL_NAME,
// Amount of total memory available to each engine executor
TOTAL_MEMORY_AVAILABLE,
// Fraction of that memory, that is already in use by the engine
MEMORY_FRACTION_IN_USE,
}

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.client.common.function.SerializableConsumer;
import org.apache.hudi.client.common.function.SerializableFunction;
import org.apache.hudi.client.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* Base class contains the context information needed by the engine at runtime. It will be extended by different
* engine implementation if needed.
*/
public abstract class HoodieEngineContext {
/**
* A wrapped hadoop configuration which can be serialized.
*/
private SerializableConfiguration hadoopConf;
private TaskContextSupplier taskContextSupplier;
public HoodieEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {
this.hadoopConf = hadoopConf;
this.taskContextSupplier = taskContextSupplier;
}
public SerializableConfiguration getHadoopConf() {
return hadoopConf;
}
public TaskContextSupplier getTaskContextSupplier() {
return taskContextSupplier;
}
public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
public abstract <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism);
public abstract void setProperty(EngineProperty key, String value);
public abstract Option<String> getProperty(EngineProperty key);
public abstract void setJobStatus(String activeModule, String activityDescription);
}
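For orientation, a minimal sketch (not part of this commit) of a caller using this engine-neutral API in place of jsc.setJobGroup(...) followed by jsc.parallelize(...).map(...).collect(), as in the CompactionAdminClient changes above; the class, method and job names are illustrative.
import java.util.List;

import org.apache.hudi.client.common.HoodieEngineContext;

public class EngineContextExample {

  static List<String> upperCaseAll(HoodieEngineContext context, List<String> inputs, int parallelism) {
    context.setJobStatus("EngineContextExample", "Upper-casing " + inputs.size() + " strings");
    // map() hides whether the work is distributed (e.g. as a Spark RDD) or executed locally.
    return context.map(inputs, String::toUpperCase, parallelism);
  }
}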

View File

@@ -16,27 +16,23 @@
* limitations under the License.
*/
package org.apache.hudi.client;
package org.apache.hudi.client.common;
import org.apache.spark.TaskContext;
import org.apache.hudi.common.util.Option;
import java.io.Serializable;
import java.util.function.Supplier;
/**
* Spark task context supplier.
* Base task context supplier.
*/
public class SparkTaskContextSupplier implements Serializable {
public abstract class TaskContextSupplier implements Serializable {
public Supplier<Integer> getPartitionIdSupplier() {
return () -> TaskContext.getPartitionId();
}
public abstract Supplier<Integer> getPartitionIdSupplier();
public Supplier<Integer> getStageIdSupplier() {
return () -> TaskContext.get().stageId();
}
public abstract Supplier<Integer> getStageIdSupplier();
public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}
public abstract Supplier<Long> getAttemptIdSupplier();
public abstract Option<String> getProperty(EngineProperty prop);
}
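For illustration, a trivial non-Spark implementation of this supplier (e.g. for single-process tests); the class name and the constant ids are illustrative, not part of this commit.
import java.util.function.Supplier;

import org.apache.hudi.client.common.EngineProperty;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.util.Option;

public class LocalTaskContextSupplier extends TaskContextSupplier {

  @Override
  public Supplier<Integer> getPartitionIdSupplier() {
    return () -> 0; // everything runs in a single local "partition"
  }

  @Override
  public Supplier<Integer> getStageIdSupplier() {
    return () -> 0;
  }

  @Override
  public Supplier<Long> getAttemptIdSupplier() {
    return () -> 0L;
  }

  @Override
  public Option<String> getProperty(EngineProperty prop) {
    return Option.empty(); // no engine-specific properties in a local context
  }
}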

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common.function;
import java.io.Serializable;
/**
* A wrapped {@link java.util.function.Consumer} which can be serialized.
*
* @param <I> input type
*/
@FunctionalInterface
public interface SerializableConsumer<I> extends Serializable {
void accept(I t) throws Exception;
}

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common.function;
import java.io.Serializable;
/**
* A wrapped {@link java.util.function.Function} which can be serialized.
*
* @param <I> input data type
* @param <O> output data type
*/
@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
O apply(I v1) throws Exception;
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common.function;
import scala.Tuple2;
import java.io.Serializable;
/**
* A function that returns key-value pairs (Tuple2&lt;K, V&gt;).
*/
@FunctionalInterface
public interface SerializablePairFunction<I, K, V> extends Serializable {
Tuple2<K, V> call(I t) throws Exception;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.client.embedded;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -25,10 +26,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.NetworkUtils;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import java.io.IOException;
@@ -46,13 +45,10 @@ public class EmbeddedTimelineService {
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
public EmbeddedTimelineService(Configuration hadoopConf, SparkConf sparkConf, FileSystemViewStorageConfig config) {
setHostAddrFromSparkConf(sparkConf);
if (hostAddr == null) {
this.hostAddr = NetworkUtils.getHostname();
}
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, FileSystemViewStorageConfig config) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.config = config;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
}
@@ -75,13 +71,13 @@ public class EmbeddedTimelineService {
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
}
private void setHostAddrFromSparkConf(SparkConf sparkConf) {
String hostAddr = sparkConf.get("spark.driver.host", null);
if (hostAddr != null) {
LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
this.hostAddr = hostAddr;
private void setHostAddr(String embeddedTimelineServiceHostAddr) {
if (embeddedTimelineServiceHostAddr != null) {
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
this.hostAddr = embeddedTimelineServiceHostAddr;
} else {
LOG.warn("Unable to find driver bind address from spark config");
this.hostAddr = NetworkUtils.getHostname();
}
}

View File

@@ -118,6 +118,8 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
setDefaultOnCondition(props, !props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP), SPILLABLE_MAP_BASE_PATH_PROP,
DEFAULT_SPILLABLE_MAP_BASE_PATH);
setDefaultOnCondition(props, !props.containsKey(MAX_MEMORY_FOR_MERGE_PROP), MAX_MEMORY_FOR_MERGE_PROP,
String.valueOf(DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES));
setDefaultOnCondition(props, !props.containsKey(WRITESTATUS_FAILURE_FRACTION_PROP),
WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(DEFAULT_WRITESTATUS_FAILURE_FRACTION));
return config;

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.config;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
@@ -51,7 +50,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Class storing configs for the {@link HoodieWriteClient}.
* Class storing configs for the HoodieWriteClient.
*/
@Immutable
public class HoodieWriteConfig extends DefaultHoodieConfig {
@@ -672,7 +671,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public boolean getPushGatewayRandomJobNameSuffix() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
}
/**
* memory configs.
*/
@@ -755,6 +754,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM));
}
public Long getMaxMemoryPerPartitionMerge() {
return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
}
public static class Builder {
protected final Properties props = new Properties();

View File

@@ -18,13 +18,13 @@
package org.apache.hudi.execution;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
@@ -43,10 +43,10 @@ public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
private HoodieWriteConfig config;
private String instantTime;
private boolean areRecordsSorted;
private HoodieTable<T> hoodieTable;
private HoodieTable hoodieTable;
private String idPrefix;
private SparkTaskContextSupplier sparkTaskContextSupplier;
private WriteHandleFactory<T> writeHandleFactory;
private TaskContextSupplier taskContextSupplier;
private WriteHandleFactory writeHandleFactory;
private final List<WriteStatus> statuses = new ArrayList<>();
// Stores the open HoodieWriteHandle for each table partition path
@@ -55,15 +55,15 @@ public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
private Map<String, HoodieWriteHandle> handles = new HashMap<>();
public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
boolean areRecordsSorted, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier,
WriteHandleFactory<T> writeHandleFactory) {
boolean areRecordsSorted, HoodieTable hoodieTable, String idPrefix,
TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory) {
this.config = config;
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
this.taskContextSupplier = taskContextSupplier;
this.writeHandleFactory = writeHandleFactory;
}
@@ -81,7 +81,7 @@ public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
}
// Lazily initialize the handle, for the first time
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, sparkTaskContextSupplier);
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
@@ -90,7 +90,7 @@ public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
statuses.add(handle.close());
// Open new handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, sparkTaskContextSupplier);
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
handle.write(insertPayload, payload.insertValue, payload.exception);

View File

@@ -18,15 +18,13 @@
package org.apache.hudi.execution;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
@@ -41,41 +39,39 @@ import java.util.function.Function;
/**
* Lazy iterable that writes a stream of HoodieRecords, sorted by partitionPath, into new files.
*/
public class LazyInsertIterable<T extends HoodieRecordPayload>
public abstract class HoodieLazyInsertIterable<T extends HoodieRecordPayload>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
protected final String instantTime;
protected boolean areRecordsSorted;
protected final HoodieTable<T> hoodieTable;
protected final HoodieTable hoodieTable;
protected final String idPrefix;
protected SparkTaskContextSupplier sparkTaskContextSupplier;
protected WriteHandleFactory<T> writeHandleFactory;
protected TaskContextSupplier taskContextSupplier;
protected WriteHandleFactory writeHandleFactory;
public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
SparkTaskContextSupplier sparkTaskContextSupplier) {
this(sortedRecordItr, true, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier);
}
public LazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted,
HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String idPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier,
public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
boolean areRecordsSorted,
HoodieWriteConfig config,
String instantTime,
HoodieTable hoodieTable,
String idPrefix,
TaskContextSupplier taskContextSupplier) {
this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier,
new CreateHandleFactory<>());
}
public LazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted,
HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String idPrefix, SparkTaskContextSupplier sparkTaskContextSupplier,
WriteHandleFactory<T> writeHandleFactory) {
public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted,
HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
String idPrefix, TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory) {
super(recordItr);
this.areRecordsSorted = areRecordsSorted;
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
this.taskContextSupplier = taskContextSupplier;
this.writeHandleFactory = writeHandleFactory;
}
@@ -108,32 +104,11 @@ public class LazyInsertIterable<T extends HoodieRecordPayload>
@Override
protected void start() {}
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
} catch (Exception e) {
throw new HoodieException(e);
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
}
}
}
@Override
protected void end() {}
protected CopyOnWriteInsertHandler getInsertHandler() {
return new CopyOnWriteInsertHandler(hoodieConfig, instantTime, areRecordsSorted, hoodieTable, idPrefix,
sparkTaskContextSupplier, writeHandleFactory);
taskContextSupplier, writeHandleFactory);
}
}

View File

@@ -21,35 +21,26 @@ package org.apache.hudi.index;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
import org.apache.hudi.index.hbase.HBaseIndex;
import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
/**
* Base class for different types of indexes, used to determine the mapping from record key (uuid) to file id.
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Serializable {
public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implements Serializable {
protected final HoodieWriteConfig config;
@@ -57,49 +48,13 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
this.config = config;
}
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(
HoodieWriteConfig config) throws HoodieIndexException {
// first use index class config to create index.
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
if (!(instance instanceof HoodieIndex)) {
throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
}
return (HoodieIndex) instance;
}
switch (config.getIndexType()) {
case HBASE:
return new HBaseIndex<>(config);
case INMEMORY:
return new InMemoryHashIndex<>(config);
case BLOOM:
return new HoodieBloomIndex<>(config);
case GLOBAL_BLOOM:
return new HoodieGlobalBloomIndex<>(config);
case SIMPLE:
return new HoodieSimpleIndex<>(config);
case GLOBAL_SIMPLE:
return new HoodieGlobalSimpleIndex<>(config);
default:
throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
}
}
/**
* Checks if the given [Keys] exists in the hoodie table and returns [Key, Option[partitionPath, fileID]] If the
* optional is empty, then the key is not found.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc, HoodieTable<T> hoodieTable);
/**
* Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
* present).
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) throws HoodieIndexException;
public abstract I tagLocation(I records, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException;
/**
* Extracts the location of written records, and updates the index.
@@ -107,11 +62,11 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
* TODO(vc): We may need to propagate the record as well in a WriteStatus class
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) throws HoodieIndexException;
public abstract O updateLocation(O writeStatusRDD, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException;
/**
* Rollback the efffects of the commit made at instantTime.
* Rollback the effects of the commit made at instantTime.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract boolean rollbackCommit(String instantTime);
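For orientation, a sketch (not part of this commit) of the generic index contract after the refactor: tag incoming records before writing, then feed the written locations back into the index. With the Spark engine, I would be JavaRDD<HoodieRecord<T>> and O JavaRDD<WriteStatus>; the helper and its writeStep parameter are illustrative, the actual write path being engine specific.
import java.util.function.Function;

import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

public class IndexRoundTripExample {

  static <T extends HoodieRecordPayload, I, K, O> O tagWriteAndUpdate(
      HoodieIndex<T, I, K, O> index,
      HoodieEngineContext context,
      HoodieTable<T, I, K, O> table,
      I incomingRecords,
      Function<I, O> writeStep) {
    // 1. Look up existing file locations for the incoming records.
    I tagged = index.tagLocation(incomingRecords, context, table);
    // 2. Write the tagged records (supplied by the caller, e.g. an upsert on the table).
    O writeStatuses = writeStep.apply(tagged);
    // 3. Persist the new record locations back into the index.
    return index.updateLocation(writeStatuses, context, table);
  }
}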

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.index;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -26,8 +27,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
@@ -42,28 +41,26 @@ public class HoodieIndexUtils {
* Fetches pairs of partition path and {@link HoodieBaseFile} for the partitions of interest.
*
* @param partitions list of partitions of interest
* @param jsc instance of {@link JavaSparkContext} to use
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} of interest
* @return the list of Pairs of partition path and fileId
*/
public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
final JavaSparkContext jsc,
final HoodieEngineContext context,
final HoodieTable hoodieTable) {
jsc.setJobGroup(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMap(partitionPath -> {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.map(f -> Pair.of(partitionPath, f))
.collect(toList());
}
return filteredFiles.iterator();
})
.collect();
context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
return context.flatMap(partitions, partitionPath -> {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.map(f -> Pair.of(partitionPath, f))
.collect(toList());
}
return filteredFiles.stream();
}, Math.max(partitions.size(), 1));
}
/**

View File

@@ -18,17 +18,17 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
public class AppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
public class AppendHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
@Override
public HoodieAppendHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T> hoodieTable, final String partitionPath,
final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
public HoodieAppendHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, final TaskContextSupplier sparkTaskContextSupplier) {
return new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
getNextFileId(fileIdPrefix), sparkTaskContextSupplier);

View File

@@ -18,19 +18,19 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
@Override
public HoodieWriteHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T> hoodieTable, final String partitionPath,
final String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) {
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
getNextFileId(fileIdPrefix), taskContextSupplier);
}
}

View File

@@ -19,8 +19,8 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
@@ -40,7 +40,9 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -51,7 +53,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* IO Operation to append data onto an existing file.
*/
public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
@@ -101,16 +102,19 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Total number of new records inserted into the delta file
private long insertRecordsWritten = 0;
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
private SizeEstimator<HoodieRecord> sizeEstimator;
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
sizeEstimator = new DefaultSizeEstimator();
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
}
@@ -134,7 +138,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
averageRecordSize = SizeEstimator.estimate(record);
averageRecordSize = sizeEstimator.sizeEstimate(record);
try {
//save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
@@ -335,7 +339,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
LOG.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
averageRecordSize = (averageRecordSize + sizeEstimator.sizeEstimate(record)) / 2;
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
numberOfRecords = 0;
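
The Spark-specific SizeEstimator.estimate(record) call above is replaced by the engine-neutral
org.apache.hudi.common.util.SizeEstimator seen in the new imports. Below is a minimal sketch of a
custom implementation, assuming SizeEstimator is a single-method interface with long sizeEstimate(T)
as the calls above suggest (DefaultSizeEstimator remains the one actually wired into the handle):

import org.apache.hudi.common.util.SizeEstimator;

// Hypothetical estimator that returns a fixed per-record size; shown only to
// illustrate the sizeEstimate(record) contract used by HoodieAppendHandle above.
public class FixedSizeEstimator<R> implements SizeEstimator<R> {
  private final long bytesPerRecord;

  public FixedSizeEstimator(long bytesPerRecord) {
    this.bytesPerRecord = bytesPerRecord;
  }

  @Override
  public long sizeEstimate(R record) {
    return bytesPerRecord; // skip object-graph walking and assume a constant size
  }
}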

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
@@ -33,13 +33,13 @@ import org.apache.hudi.table.HoodieTable;
* writing more than 1 skeleton file for the same bootstrap file.
* @param <T> HoodieRecordPayload
*/
public class HoodieBootstrapHandle<T extends HoodieRecordPayload> extends HoodieCreateHandle<T> {
public class HoodieBootstrapHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), sparkTaskContextSupplier);
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), taskContextSupplier);
}
@Override

View File

@@ -19,8 +19,8 @@
package org.apache.hudi.io;
import org.apache.avro.Schema;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,7 +47,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
@@ -59,17 +59,17 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
sparkTaskContextSupplier);
taskContextSupplier);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
SparkTaskContextSupplier sparkTaskContextSupplier) {
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, writerSchemaIncludingAndExcludingMetadataPair,
sparkTaskContextSupplier);
taskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -80,7 +80,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -90,10 +90,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
/**
* Called by the compactor code path.
*/
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
this.recordMap = recordMap;
this.useWriterSchema = true;
}

View File

@@ -24,14 +24,14 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
public abstract class HoodieIOHandle<T extends HoodieRecordPayload, I, K, O> {
protected final String instantTime;
protected final HoodieWriteConfig config;
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
protected final HoodieTable<T, I, K, O> hoodieTable;
HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable) {
HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable) {
this.instantTime = instantTime;
this.config = config;
this.hoodieTable = hoodieTable;

View File

@@ -38,11 +38,11 @@ import scala.Tuple2;
*
* @param <T>
*/
public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
this.partitionPathBaseFilePair = partitionPathBaseFilePair;

View File

@@ -42,7 +42,7 @@ import java.util.Set;
/**
* Takes a bunch of keys and returns ones that are present in the file group.
*/
public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
@@ -54,7 +54,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, null, hoodieTable, partitionPathFilePair);
this.tableType = hoodieTable.getMetaClient().getTableType();

View File

@@ -18,9 +18,8 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -54,7 +53,7 @@ import java.util.Map;
import java.util.Set;
@SuppressWarnings("Duplicates")
public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
@@ -71,9 +70,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
protected boolean useWriterSchema;
private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
@@ -81,10 +81,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
/**
* Called by compactor code path.
*/
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
@@ -134,7 +134,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
createMarkerFile(partitionPath, newFileName);
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, sparkTaskContextSupplier);
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -149,7 +149,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));

View File

@@ -28,9 +28,9 @@ import java.io.IOException;
/**
* Extract range information for a given file slice.
*/
public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
public class HoodieRangeInfoHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, null, hoodieTable, partitionPathFilePair);
}

View File

@@ -34,11 +34,11 @@ import org.apache.hadoop.fs.Path;
/**
* Base class for read operations done logically on the file group.
*/
public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {
public abstract class HoodieReadHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
protected final Pair<String, String> partitionPathFilePair;
public HoodieReadHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
public HoodieReadHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, instantTime, hoodieTable);
this.partitionPathFilePair = partitionPathFilePair;

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -41,24 +41,24 @@ import java.util.Queue;
* The implementation performs a merge-sort by comparing the key of the record being written to the list of
* keys in newRecordKeys (sorted in-memory).
*/
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends HoodieMergeHandle<T> {
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
}
/**
* Called by compactor code path.
*/
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, keyToNewRecordsOrig, partitionPath, fileId, dataFileToBeMerged,
sparkTaskContextSupplier);
taskContextSupplier);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
}
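
A standalone sketch of the merge-sort described above (illustrative only; the real handle merges
full records, not plain strings): keys from the base file arrive already sorted, and new record
keys are drained from the priority queue whenever they sort before the current base-file key.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Illustrative merge-sort over record keys: base-file keys are already sorted, new keys
// are pulled from a PriorityQueue so the combined output stays globally sorted.
public class SortedMergeSketch {
  public static void main(String[] args) {
    List<String> baseFileKeysSorted = Arrays.asList("key01", "key04", "key07");
    Queue<String> newRecordKeysSorted = new PriorityQueue<>(Arrays.asList("key05", "key02", "key09"));

    List<String> written = new ArrayList<>();
    for (String baseKey : baseFileKeysSorted) {
      while (!newRecordKeysSorted.isEmpty() && newRecordKeysSorted.peek().compareTo(baseKey) < 0) {
        written.add(newRecordKeysSorted.poll()); // flush new records that sort before this base key
      }
      written.add(baseKey); // then write (or merge) the base-file record
    }
    while (!newRecordKeysSorted.isEmpty()) {
      written.add(newRecordKeysSorted.poll()); // drain any remaining new keys
    }
    System.out.println(written); // [key01, key02, key04, key05, key07, key09]
  }
}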

View File

@@ -19,8 +19,8 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -49,7 +49,7 @@ import java.io.IOException;
/**
* Base class for all write operations logically performed at the file group level.
*/
public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {
public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
@@ -60,17 +60,17 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
protected final SparkTaskContextSupplier sparkTaskContextSupplier;
protected final TaskContextSupplier taskContextSupplier;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, partitionPath, fileId, hoodieTable,
getWriterSchemaIncludingAndExcludingMetadataPair(config), sparkTaskContextSupplier);
getWriterSchemaIncludingAndExcludingMetadataPair(config), taskContextSupplier);
}
protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
HoodieTable<T> hoodieTable, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
SparkTaskContextSupplier sparkTaskContextSupplier) {
HoodieTable<T, I, K, O> hoodieTable, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
@@ -79,7 +79,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
}
@@ -179,19 +179,19 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
}
protected int getPartitionId() {
return sparkTaskContextSupplier.getPartitionIdSupplier().get();
return taskContextSupplier.getPartitionIdSupplier().get();
}
protected int getStageId() {
return sparkTaskContextSupplier.getStageIdSupplier().get();
return taskContextSupplier.getStageIdSupplier().get();
}
protected long getAttemptId() {
return sparkTaskContextSupplier.getAttemptIdSupplier().get();
return taskContextSupplier.getAttemptIdSupplier().get();
}
protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable,
HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
}
}

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.common.EngineProperty;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.util.Option;
import java.util.Properties;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE;
import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP;
import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP;
import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP;
import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP;
public class IOUtils {
/**
* Dynamic calculation of max memory to use for the spillable map:
*   user.available.memory      = executor.memory * (1 - memory.fraction)
*   spillable.available.memory = user.available.memory * hoodie.memory.fraction
* Any time the engine memory fraction or total memory changes, the memory available
* to the spillable map changes accordingly.
*/
public static long getMaxMemoryAllowedForMerge(TaskContextSupplier context, String maxMemoryFraction) {
Option<String> totalMemoryOpt = context.getProperty(EngineProperty.TOTAL_MEMORY_AVAILABLE);
Option<String> memoryFractionOpt = context.getProperty(EngineProperty.MEMORY_FRACTION_IN_USE);
if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent()) {
long executorMemoryInBytes = Long.parseLong(totalMemoryOpt.get());
double memoryFraction = Double.parseDouble(memoryFractionOpt.get());
double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
} else {
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
}
}
public static long getMaxMemoryPerPartitionMerge(TaskContextSupplier context, Properties properties) {
if (properties.containsKey(MAX_MEMORY_FOR_MERGE_PROP)) {
return Long.parseLong(properties.getProperty(MAX_MEMORY_FOR_MERGE_PROP));
}
String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE);
return getMaxMemoryAllowedForMerge(context, fraction);
}
public static long getMaxMemoryPerCompaction(TaskContextSupplier context, Properties properties) {
if (properties.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP)) {
return Long.parseLong(properties.getProperty(MAX_MEMORY_FOR_COMPACTION_PROP));
}
String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
return getMaxMemoryAllowedForMerge(context, fraction);
}
}
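
A rough worked example of the calculation above, with illustrative numbers rather than defaults
from this codebase: 4 GB of executor memory, an engine memory fraction of 0.6, and a merge
fraction of 0.6 leave roughly 0.96 GB for the spillable map.

// Standalone sketch of the arithmetic in getMaxMemoryAllowedForMerge; all figures
// below are assumptions for illustration, not Hudi defaults.
public class MergeMemorySketch {
  public static void main(String[] args) {
    long executorMemoryInBytes = 4L * 1024 * 1024 * 1024;    // 4 GB total (assumed)
    double memoryFraction = 0.6;                             // fraction reserved by the engine (assumed)
    double maxMemoryFractionForMerge = 0.6;                  // hoodie merge fraction (assumed)

    double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction); // ~1.6 GB
    long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);

    System.out.println(maxMemoryForMerge); // 1030792151 bytes, just under 1 GB
  }
}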

View File

@@ -18,16 +18,16 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
public abstract class WriteHandleFactory<T extends HoodieRecordPayload> {
public abstract class WriteHandleFactory<T extends HoodieRecordPayload, I, K, O> {
private int numFilesWritten = 0;
public abstract HoodieWriteHandle<T> create(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier);
public abstract HoodieWriteHandle<T, I, K, O> create(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier);
protected String getNextFileId(String idPfx) {
return String.format("%s-%d", idPfx, numFilesWritten++);
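
For reference, a standalone illustration of the getNextFileId naming above: each call appends an
incrementing counter to the same prefix, so a single factory instance hands out IDs such as
abc123-0, abc123-1, ... (the prefix value here is hypothetical).

// Self-contained sketch of the "%s-%d" file ID scheme used by WriteHandleFactory.
public class FileIdSketch {
  private int numFilesWritten = 0;

  String getNextFileId(String idPfx) {
    return String.format("%s-%d", idPfx, numFilesWritten++);
  }

  public static void main(String[] args) {
    FileIdSketch factory = new FileIdSketch();
    System.out.println(factory.getNextFileId("abc123")); // abc123-0
    System.out.println(factory.getNextFileId("abc123")); // abc123-1
  }
}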
