[HUDI-855] Run Cleaner async with writing (#1577)
- Cleaner can now run concurrently with write operation - Configs to turn on/off Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
committed by
GitHub
parent
31247e9b34
commit
8919be6a5d
@@ -26,7 +26,6 @@ import org.apache.hudi.cli.TableHeader;
|
|||||||
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
|
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
|
||||||
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
|
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
|
||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
@@ -36,6 +35,8 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
|||||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.testutils.HoodieTestDataGenerator;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.shell.core.CommandResult;
|
import org.springframework.shell.core.CommandResult;
|
||||||
@@ -43,11 +44,10 @@ import org.springframework.shell.core.CommandResult;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
@@ -77,6 +77,10 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
Configuration conf = HoodieCLI.conf;
|
Configuration conf = HoodieCLI.conf;
|
||||||
|
|
||||||
metaClient = HoodieCLI.getTableMetaClient();
|
metaClient = HoodieCLI.getTableMetaClient();
|
||||||
|
String fileId1 = UUID.randomUUID().toString();
|
||||||
|
String fileId2 = UUID.randomUUID().toString();
|
||||||
|
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);
|
||||||
|
|
||||||
// Create four commits
|
// Create four commits
|
||||||
for (int i = 100; i < 104; i++) {
|
for (int i = 100; i < 104; i++) {
|
||||||
String timestamp = String.valueOf(i);
|
String timestamp = String.valueOf(i);
|
||||||
@@ -86,7 +90,8 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
// Inflight Compaction
|
// Inflight Compaction
|
||||||
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
||||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
|
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
|
||||||
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
|
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2,
|
||||||
|
Option.empty(), Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
@@ -103,9 +108,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
assertNotNull(propsFilePath, "Not found properties file");
|
assertNotNull(propsFilePath, "Not found properties file");
|
||||||
|
|
||||||
// First, run clean
|
// First, run clean
|
||||||
Files.createFile(Paths.get(tablePath,
|
|
||||||
HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
|
||||||
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
|
|
||||||
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
|
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
|
||||||
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
|
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
|
||||||
"Loaded 1 clean and the count should match");
|
"Loaded 1 clean and the count should match");
|
||||||
@@ -125,7 +127,7 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
|
|
||||||
// EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2
|
// EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2
|
||||||
// Total Time Taken need read from metadata
|
// Total Time Taken need read from metadata
|
||||||
rows.add(new Comparable[] {clean.getTimestamp(), "102", "0", getLatestCleanTimeTakenInMillis().toString()});
|
rows.add(new Comparable[] {clean.getTimestamp(), "102", "2", getLatestCleanTimeTakenInMillis().toString()});
|
||||||
|
|
||||||
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
||||||
expected = removeNonWordAndStripSpace(expected);
|
expected = removeNonWordAndStripSpace(expected);
|
||||||
@@ -142,12 +144,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
assertNotNull(propsFilePath, "Not found properties file");
|
assertNotNull(propsFilePath, "Not found properties file");
|
||||||
|
|
||||||
// First, run clean with two partition
|
// First, run clean with two partition
|
||||||
Files.createFile(Paths.get(tablePath,
|
|
||||||
HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
|
||||||
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
|
|
||||||
Files.createFile(Paths.get(tablePath,
|
|
||||||
HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
|
||||||
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
|
|
||||||
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
|
SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
|
||||||
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
|
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
|
||||||
"Loaded 1 clean and the count should match");
|
"Loaded 1 clean and the count should match");
|
||||||
@@ -165,9 +161,11 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
|
|||||||
// There should be two partition path
|
// There should be two partition path
|
||||||
List<Comparable[]> rows = new ArrayList<>();
|
List<Comparable[]> rows = new ArrayList<>();
|
||||||
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
||||||
|
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
|
||||||
|
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_THIRD_PARTITION_PATH,
|
||||||
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
|
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
|
||||||
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||||
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
|
HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
|
||||||
|
|
||||||
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
||||||
expected = removeNonWordAndStripSpace(expected);
|
expected = removeNonWordAndStripSpace(expected);
|
||||||
|
|||||||
@@ -1,106 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hudi.cli.integ;
|
|
||||||
|
|
||||||
import org.apache.hudi.cli.HoodieCLI;
|
|
||||||
import org.apache.hudi.cli.commands.TableCommand;
|
|
||||||
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
|
|
||||||
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
|
|
||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|
||||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.springframework.shell.core.CommandResult;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
|
|
||||||
public class ITTestCleansCommand extends AbstractShellIntegrationTest {
|
|
||||||
|
|
||||||
private String tablePath;
|
|
||||||
private URL propsFilePath;
|
|
||||||
|
|
||||||
@BeforeEach
|
|
||||||
public void init() throws IOException {
|
|
||||||
HoodieCLI.conf = jsc.hadoopConfiguration();
|
|
||||||
|
|
||||||
String tableName = "test_table";
|
|
||||||
tablePath = basePath + File.separator + tableName;
|
|
||||||
propsFilePath = this.getClass().getClassLoader().getResource("clean.properties");
|
|
||||||
|
|
||||||
// Create table and connect
|
|
||||||
new TableCommand().createTable(
|
|
||||||
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
|
||||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
|
||||||
|
|
||||||
Configuration conf = HoodieCLI.conf;
|
|
||||||
|
|
||||||
metaClient = HoodieCLI.getTableMetaClient();
|
|
||||||
// Create four commits
|
|
||||||
for (int i = 100; i < 104; i++) {
|
|
||||||
String timestamp = String.valueOf(i);
|
|
||||||
// Requested Compaction
|
|
||||||
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
|
||||||
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
|
|
||||||
// Inflight Compaction
|
|
||||||
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
|
||||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
|
|
||||||
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test case for cleans run.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testRunClean() throws IOException {
|
|
||||||
// First, there should none of clean instant.
|
|
||||||
assertEquals(0, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
|
|
||||||
|
|
||||||
// Check properties file exists.
|
|
||||||
assertNotNull(propsFilePath, "Not found properties file");
|
|
||||||
|
|
||||||
// Create partition metadata
|
|
||||||
Files.createFile(Paths.get(tablePath,
|
|
||||||
HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
|
||||||
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
|
|
||||||
Files.createFile(Paths.get(tablePath,
|
|
||||||
HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
|
||||||
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
|
|
||||||
|
|
||||||
CommandResult cr = getShell().executeCommand("cleans run --sparkMaster local --propsFilePath " + propsFilePath.toString());
|
|
||||||
assertTrue(cr.isSuccess());
|
|
||||||
|
|
||||||
// After run clean, there should have 1 clean instant
|
|
||||||
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
|
|
||||||
"Loaded 1 clean and the count should match");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.cli.testutils;
|
package org.apache.hudi.cli.testutils;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
@@ -67,6 +68,12 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
|
|||||||
|
|
||||||
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
||||||
Option<Integer> writes, Option<Integer> updates) {
|
Option<Integer> writes, Option<Integer> updates) {
|
||||||
|
createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
|
||||||
|
UUID.randomUUID().toString(), writes, updates);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
|
||||||
|
String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) {
|
||||||
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
|
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
|
||||||
HoodieTimeline.makeRequestedCommitFileName(commitTime))
|
HoodieTimeline.makeRequestedCommitFileName(commitTime))
|
||||||
.forEach(f -> {
|
.forEach(f -> {
|
||||||
@@ -77,7 +84,8 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
|
|||||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||||
os = fs.create(commitFile, true);
|
os = fs.create(commitFile, true);
|
||||||
// Generate commitMetadata
|
// Generate commitMetadata
|
||||||
HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates);
|
HoodieCommitMetadata commitMetadata =
|
||||||
|
generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
|
||||||
// Write empty commit metadata
|
// Write empty commit metadata
|
||||||
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
@@ -103,8 +111,14 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
|
|||||||
|
|
||||||
public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
|
public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
|
||||||
Option<Integer> writes, Option<Integer> updates) throws IOException {
|
Option<Integer> writes, Option<Integer> updates) throws IOException {
|
||||||
String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime);
|
return generateCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(), UUID.randomUUID().toString(),
|
||||||
String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime);
|
writes, updates);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
|
||||||
|
String fileId2, Option<Integer> writes, Option<Integer> updates) throws IOException {
|
||||||
|
String file1P0C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
|
||||||
|
String file1P1C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
|
||||||
return generateCommitMetadata(new HashMap<String, List<String>>() {
|
return generateCommitMetadata(new HashMap<String, List<String>>() {
|
||||||
{
|
{
|
||||||
put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
|
put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hudi.utilities.deltastreamer;
|
package org.apache.hudi.async;
|
||||||
|
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
|
* Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractDeltaStreamerService implements Serializable {
|
public abstract class AbstractAsyncService implements Serializable {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class);
|
private static final Logger LOG = LogManager.getLogger(AbstractAsyncService.class);
|
||||||
|
|
||||||
// Flag to track if the service is started.
|
// Flag to track if the service is started.
|
||||||
private boolean started;
|
private boolean started;
|
||||||
@@ -49,15 +49,15 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
|
|||||||
// Future tracking delta-sync/compaction
|
// Future tracking delta-sync/compaction
|
||||||
private transient CompletableFuture future;
|
private transient CompletableFuture future;
|
||||||
|
|
||||||
AbstractDeltaStreamerService() {
|
protected AbstractAsyncService() {
|
||||||
shutdownRequested = false;
|
shutdownRequested = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isShutdownRequested() {
|
protected boolean isShutdownRequested() {
|
||||||
return shutdownRequested;
|
return shutdownRequested;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isShutdown() {
|
protected boolean isShutdown() {
|
||||||
return shutdown;
|
return shutdown;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,7 +67,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
|
|||||||
* @throws ExecutionException
|
* @throws ExecutionException
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
void waitForShutdown() throws ExecutionException, InterruptedException {
|
public void waitForShutdown() throws ExecutionException, InterruptedException {
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
} catch (ExecutionException ex) {
|
} catch (ExecutionException ex) {
|
||||||
@@ -82,7 +82,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
|
|||||||
*
|
*
|
||||||
* @param force Forcefully shutdown
|
* @param force Forcefully shutdown
|
||||||
*/
|
*/
|
||||||
void shutdown(boolean force) {
|
public void shutdown(boolean force) {
|
||||||
if (!shutdownRequested || force) {
|
if (!shutdownRequested || force) {
|
||||||
shutdownRequested = true;
|
shutdownRequested = true;
|
||||||
if (executor != null) {
|
if (executor != null) {
|
||||||
@@ -145,7 +145,9 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
|
|||||||
} finally {
|
} finally {
|
||||||
// Mark as shutdown
|
// Mark as shutdown
|
||||||
shutdown = true;
|
shutdown = true;
|
||||||
onShutdownCallback.apply(error);
|
if (null != onShutdownCallback) {
|
||||||
|
onShutdownCallback.apply(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,85 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.client;
|
||||||
|
|
||||||
|
import org.apache.hudi.async.AbstractAsyncService;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean service running concurrently with write operation.
|
||||||
|
*/
|
||||||
|
class AsyncCleanerService extends AbstractAsyncService {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
|
||||||
|
|
||||||
|
private final HoodieWriteClient<?> writeClient;
|
||||||
|
private final String cleanInstantTime;
|
||||||
|
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
|
||||||
|
protected AsyncCleanerService(HoodieWriteClient<?> writeClient, String cleanInstantTime) {
|
||||||
|
this.writeClient = writeClient;
|
||||||
|
this.cleanInstantTime = cleanInstantTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Pair<CompletableFuture, ExecutorService> startService() {
|
||||||
|
return Pair.of(CompletableFuture.supplyAsync(() -> {
|
||||||
|
writeClient.clean(cleanInstantTime);
|
||||||
|
return true;
|
||||||
|
}), executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
|
||||||
|
String instantTime) {
|
||||||
|
AsyncCleanerService asyncCleanerService = null;
|
||||||
|
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
|
||||||
|
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
|
||||||
|
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
|
||||||
|
asyncCleanerService.start(null);
|
||||||
|
} else {
|
||||||
|
LOG.info("Auto cleaning is not enabled. Not running cleaner now");
|
||||||
|
}
|
||||||
|
return asyncCleanerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
|
||||||
|
if (asyncCleanerService != null) {
|
||||||
|
LOG.info("Waiting for async cleaner to finish");
|
||||||
|
try {
|
||||||
|
asyncCleanerService.waitForShutdown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new HoodieException("Error waiting for async cleaning to finish", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
|
||||||
|
if (asyncCleanerService != null) {
|
||||||
|
LOG.info("Shutting down async cleaner");
|
||||||
|
asyncCleanerService.shutdown(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -80,6 +80,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
private final boolean rollbackPending;
|
private final boolean rollbackPending;
|
||||||
private final transient HoodieMetrics metrics;
|
private final transient HoodieMetrics metrics;
|
||||||
private transient Timer.Context compactionTimer;
|
private transient Timer.Context compactionTimer;
|
||||||
|
private transient AsyncCleanerService asyncCleanerService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a write client, without cleaning up failed/inflight commits.
|
* Create a write client, without cleaning up failed/inflight commits.
|
||||||
@@ -95,28 +96,28 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
* Create a write client, with new hudi index.
|
* Create a write client, with new hudi index.
|
||||||
*
|
*
|
||||||
* @param jsc Java Spark Context
|
* @param jsc Java Spark Context
|
||||||
* @param clientConfig instance of HoodieWriteConfig
|
* @param writeConfig instance of HoodieWriteConfig
|
||||||
* @param rollbackPending whether need to cleanup pending commits
|
* @param rollbackPending whether need to cleanup pending commits
|
||||||
*/
|
*/
|
||||||
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
|
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
|
||||||
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig));
|
this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
|
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
|
||||||
this(jsc, clientConfig, rollbackPending, index, Option.empty());
|
this(jsc, writeConfig, rollbackPending, index, Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a write client, allows to specify all parameters.
|
* Create a write client, allows to specify all parameters.
|
||||||
*
|
*
|
||||||
* @param jsc Java Spark Context
|
* @param jsc Java Spark Context
|
||||||
* @param clientConfig instance of HoodieWriteConfig
|
* @param writeConfig instance of HoodieWriteConfig
|
||||||
* @param rollbackPending whether need to cleanup pending commits
|
* @param rollbackPending whether need to cleanup pending commits
|
||||||
* @param timelineService Timeline Service that runs as part of write client.
|
* @param timelineService Timeline Service that runs as part of write client.
|
||||||
*/
|
*/
|
||||||
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending,
|
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
|
||||||
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
|
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
|
||||||
super(jsc, index, clientConfig, timelineService);
|
super(jsc, index, writeConfig, timelineService);
|
||||||
this.metrics = new HoodieMetrics(config, config.getTableName());
|
this.metrics = new HoodieMetrics(config, config.getTableName());
|
||||||
this.rollbackPending = rollbackPending;
|
this.rollbackPending = rollbackPending;
|
||||||
}
|
}
|
||||||
@@ -158,6 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
|
||||||
table.validateUpsertSchema();
|
table.validateUpsertSchema();
|
||||||
setOperationType(WriteOperationType.UPSERT);
|
setOperationType(WriteOperationType.UPSERT);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
|
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
|
||||||
if (result.getIndexLookupDuration().isPresent()) {
|
if (result.getIndexLookupDuration().isPresent()) {
|
||||||
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
|
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
|
||||||
@@ -178,6 +180,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
|
||||||
table.validateUpsertSchema();
|
table.validateUpsertSchema();
|
||||||
setOperationType(WriteOperationType.UPSERT_PREPPED);
|
setOperationType(WriteOperationType.UPSERT_PREPPED);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
|
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
|
||||||
return postWrite(result, instantTime, table);
|
return postWrite(result, instantTime, table);
|
||||||
}
|
}
|
||||||
@@ -196,6 +199,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
|
||||||
table.validateInsertSchema();
|
table.validateInsertSchema();
|
||||||
setOperationType(WriteOperationType.INSERT);
|
setOperationType(WriteOperationType.INSERT);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
|
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
|
||||||
return postWrite(result, instantTime, table);
|
return postWrite(result, instantTime, table);
|
||||||
}
|
}
|
||||||
@@ -215,6 +219,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
|
||||||
table.validateInsertSchema();
|
table.validateInsertSchema();
|
||||||
setOperationType(WriteOperationType.INSERT_PREPPED);
|
setOperationType(WriteOperationType.INSERT_PREPPED);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
|
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
|
||||||
return postWrite(result, instantTime, table);
|
return postWrite(result, instantTime, table);
|
||||||
}
|
}
|
||||||
@@ -254,6 +259,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
|
||||||
table.validateInsertSchema();
|
table.validateInsertSchema();
|
||||||
setOperationType(WriteOperationType.BULK_INSERT);
|
setOperationType(WriteOperationType.BULK_INSERT);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
|
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
|
||||||
return postWrite(result, instantTime, table);
|
return postWrite(result, instantTime, table);
|
||||||
}
|
}
|
||||||
@@ -279,6 +285,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
|
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
|
||||||
table.validateInsertSchema();
|
table.validateInsertSchema();
|
||||||
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
|
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
|
||||||
|
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||||
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
|
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
|
||||||
return postWrite(result, instantTime, table);
|
return postWrite(result, instantTime, table);
|
||||||
}
|
}
|
||||||
@@ -338,18 +345,30 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
// We cannot have unbounded commit files. Archive commits if we have to archive
|
// We cannot have unbounded commit files. Archive commits if we have to archive
|
||||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
|
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
|
||||||
archiveLog.archiveIfRequired(hadoopConf);
|
archiveLog.archiveIfRequired(hadoopConf);
|
||||||
if (config.isAutoClean()) {
|
autoCleanOnCommit(instantTime);
|
||||||
// Call clean to cleanup if there is anything to cleanup after the commit,
|
|
||||||
LOG.info("Auto cleaning is enabled. Running cleaner now");
|
|
||||||
clean(instantTime);
|
|
||||||
} else {
|
|
||||||
LOG.info("Auto cleaning is not enabled. Not running cleaner now");
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle auto clean during commit.
|
||||||
|
* @param instantTime
|
||||||
|
*/
|
||||||
|
private void autoCleanOnCommit(String instantTime) {
|
||||||
|
if (config.isAutoClean()) {
|
||||||
|
// Call clean to cleanup if there is anything to cleanup after the commit,
|
||||||
|
if (config.isAsyncClean()) {
|
||||||
|
LOG.info("Cleaner has been spawned already. Waiting for it to finish");
|
||||||
|
AsyncCleanerService.waitForCompletion(asyncCleanerService);
|
||||||
|
LOG.info("Cleaner has finished");
|
||||||
|
} else {
|
||||||
|
LOG.info("Auto cleaning is enabled. Running cleaner now");
|
||||||
|
clean(instantTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a savepoint based on the latest commit action on the timeline.
|
* Create a savepoint based on the latest commit action on the timeline.
|
||||||
*
|
*
|
||||||
@@ -477,7 +496,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
// Stop timeline-server if running
|
AsyncCleanerService.forceShutdown(asyncCleanerService);
|
||||||
|
asyncCleanerService = null;
|
||||||
super.close();
|
super.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
|
|
||||||
public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
|
public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
|
||||||
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
|
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
|
||||||
|
public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
|
||||||
|
|
||||||
// Turn on inline compaction - after fw delta commits a inline compaction will be run
|
// Turn on inline compaction - after fw delta commits a inline compaction will be run
|
||||||
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
|
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
|
||||||
// Run a compaction every N delta commits
|
// Run a compaction every N delta commits
|
||||||
@@ -101,6 +103,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
|
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
|
||||||
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
|
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
|
||||||
private static final String DEFAULT_AUTO_CLEAN = "true";
|
private static final String DEFAULT_AUTO_CLEAN = "true";
|
||||||
|
private static final String DEFAULT_ASYNC_CLEAN = "false";
|
||||||
private static final String DEFAULT_INLINE_COMPACT = "false";
|
private static final String DEFAULT_INLINE_COMPACT = "false";
|
||||||
private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
|
private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
|
||||||
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
|
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
|
||||||
@@ -143,6 +146,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withAsyncClean(Boolean asyncClean) {
|
||||||
|
props.setProperty(ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
|
public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
|
||||||
props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode));
|
props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode));
|
||||||
return this;
|
return this;
|
||||||
@@ -247,6 +255,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
public HoodieCompactionConfig build() {
|
public HoodieCompactionConfig build() {
|
||||||
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
|
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN_PROP), ASYNC_CLEAN_PROP,
|
||||||
|
DEFAULT_ASYNC_CLEAN);
|
||||||
setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
|
setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
|
||||||
DEFAULT_INCREMENTAL_CLEANER);
|
DEFAULT_INCREMENTAL_CLEANER);
|
||||||
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP,
|
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP,
|
||||||
|
|||||||
@@ -296,6 +296,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAsyncClean() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.ASYNC_CLEAN_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
public boolean incrementalCleanerModeEnabled() {
|
public boolean incrementalCleanerModeEnabled() {
|
||||||
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
|
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -185,8 +185,9 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
|
|||||||
Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
|
Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
|
||||||
final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
|
final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
|
||||||
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
|
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
|
||||||
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
|
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()
|
||||||
|
&& cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
|
||||||
|
// Only create cleaner plan which does some work
|
||||||
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
|
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
|
||||||
// Save to both aux and timeline folder
|
// Save to both aux and timeline folder
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||||
import org.apache.hudi.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.testutils.HoodieTestDataGenerator;
|
||||||
@@ -83,6 +84,7 @@ import scala.Tuple3;
|
|||||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -128,15 +130,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf);
|
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf);
|
||||||
|
|
||||||
assertFalse(table.getCompletedCommitsTimeline().empty());
|
assertFalse(table.getCompletedCommitsTimeline().empty());
|
||||||
if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
|
// We no longer write empty cleaner plans when there is nothing to be cleaned.
|
||||||
// We no longer write empty cleaner plans when there are not enough commits present
|
assertTrue(table.getCompletedCleanTimeline().empty());
|
||||||
assertTrue(table.getCompletedCleanTimeline().empty());
|
|
||||||
} else {
|
|
||||||
String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
|
|
||||||
assertFalse(table.getCompletedCleanTimeline().empty());
|
|
||||||
assertEquals(instantTime, table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
|
|
||||||
"The clean instant should be the same as the commit instant");
|
|
||||||
}
|
|
||||||
|
|
||||||
HoodieIndex index = HoodieIndex.createIndex(cfg);
|
HoodieIndex index = HoodieIndex.createIndex(cfg);
|
||||||
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
|
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
|
||||||
@@ -439,17 +434,32 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
|
|
||||||
if (simulateRetryFailure) {
|
if (simulateRetryFailure) {
|
||||||
HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
|
HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
|
||||||
|
HoodieCleanMetadata metadata = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
|
||||||
|
metadata.getPartitionMetadata().values().forEach(p -> {
|
||||||
|
String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
|
||||||
|
p.getSuccessDeleteFiles().forEach(p2 -> {
|
||||||
|
try {
|
||||||
|
metaClient.getFs().create(new Path(dirPath, p2), true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
|
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
|
||||||
HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant());
|
HoodieCleanMetadata newCleanMetadata = writeClient.clean(getNextInstant());
|
||||||
|
// No new clean metadata would be created. Only the previous one will be retried
|
||||||
|
assertNull(newCleanMetadata);
|
||||||
|
HoodieCleanMetadata cleanMetadata2 = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
|
||||||
assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain());
|
assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain());
|
||||||
assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
|
assertEquals(cleanMetadata1.getTotalFilesDeleted(), cleanMetadata2.getTotalFilesDeleted());
|
||||||
assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet());
|
assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet());
|
||||||
final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
|
final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
|
||||||
cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
|
cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
|
||||||
HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k);
|
HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k);
|
||||||
HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k);
|
HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k);
|
||||||
assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
|
assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
|
||||||
assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles());
|
assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
|
||||||
|
assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
|
||||||
assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
|
assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
|
||||||
assertEquals(k, p1.getPartitionPath());
|
assertEquals(k, p1.getPartitionPath());
|
||||||
});
|
});
|
||||||
@@ -487,12 +497,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
|
||||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
|
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
|
||||||
assertEquals(0,
|
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
|
||||||
getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertEquals(0,
|
|
||||||
getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||||
file1P0C0));
|
file1P0C0));
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
|
||||||
@@ -548,9 +553,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
// No cleaning on partially written file, with no commit.
|
// No cleaning on partially written file, with no commit.
|
||||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
|
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
|
||||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
||||||
assertEquals(0,
|
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
|
||||||
getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
|
||||||
file3P0C2));
|
file3P0C2));
|
||||||
}
|
}
|
||||||
@@ -819,11 +822,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||||
|
|
||||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
|
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
|
||||||
assertEquals(0,
|
assertEquals(0, hoodieCleanStatsThree.size(),
|
||||||
getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
|
|
||||||
.getSuccessDeleteFiles().size(),
|
|
||||||
"Must not clean any file. We have to keep 1 version before the latest commit time to keep");
|
"Must not clean any file. We have to keep 1 version before the latest commit time to keep");
|
||||||
|
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||||
file1P0C0));
|
file1P0C0));
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.utilities.deltastreamer;
|
package org.apache.hudi.utilities.deltastreamer;
|
||||||
|
|
||||||
import org.apache.hudi.client.HoodieWriteClient;
|
import org.apache.hudi.client.HoodieWriteClient;
|
||||||
|
import org.apache.hudi.async.AbstractAsyncService;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
@@ -326,7 +327,7 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* Syncs data either in single-run or in continuous mode.
|
* Syncs data either in single-run or in continuous mode.
|
||||||
*/
|
*/
|
||||||
public static class DeltaSyncService extends AbstractDeltaStreamerService {
|
public static class DeltaSyncService extends AbstractAsyncService {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
/**
|
/**
|
||||||
@@ -532,7 +533,7 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
|
* Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
|
||||||
*/
|
*/
|
||||||
public static class AsyncCompactService extends AbstractDeltaStreamerService {
|
public static class AsyncCompactService extends AbstractAsyncService {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
private final int maxConcurrentCompaction;
|
private final int maxConcurrentCompaction;
|
||||||
|
|||||||
Reference in New Issue
Block a user