diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 4983e4bba..63d0bffbe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -731,12 +731,30 @@ public abstract class AbstractHoodieWriteClient table) { + HoodieTimeline inflightTimelineWithReplaceCommit = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> { + if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + Option> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant); + return !instantPlan.isPresent(); + } else { + return true; + } + }); + return inflightTimelineExcludeClusteringCommit; + } + /** * Cleanup all pending commits. 
*/ private void rollbackPendingCommits() { HoodieTable table = createTable(config, hadoopConf); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + HoodieTimeline inflightTimeline = getInflightTimelineExcludeCompactionAndClustering(table); List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); for (String commit : commits) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 181edd3ca..932a455da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -152,7 +152,18 @@ public class TableSchemaResolver { * @throws Exception */ public Schema getTableAvroSchema() throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true); + return getTableAvroSchema(true); + } + + /** + * Gets schema for a hoodie table in Avro format, with the choice of whether to include metadata fields. + * + * @param includeMetadataFields whether to include metadata fields + * @return Avro schema for this table + * @throws Exception + */ + public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { + Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields); + return schemaFromCommitMetadata.isPresent() ?
schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 06fe9619d..ada964fbb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -109,7 +109,7 @@ public interface HoodieTimeline extends Serializable { /** * Filter this timeline to just include the in-flights excluding compaction instants. * - * @return New instance of HoodieTimeline with just in-flights excluding compaction inflights + * @return New instance of HoodieTimeline with just in-flights excluding compaction instants */ HoodieTimeline filterPendingExcludingCompaction(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index b07f00f61..fd768c669 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -204,9 +204,10 @@ class TestStructuredStreaming extends HoodieClientTestBase { // check have schedule clustering and clustering file group to one waitTillHasCompletedReplaceInstant(destPath, 120, 5) metaClient.reloadActiveTimeline() - assertEquals(1, getLatestFileGroupsFileId.size) + assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, checkClusteringResult) + structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } 
@Test @@ -219,21 +220,21 @@ class TestStructuredStreaming extends HoodieClientTestBase { override def execute(): Unit = { waitTillHasCompletedReplaceInstant(destPath, 120, 5) } - } - , "Should have replace commit completed") + }, msg) println(msg) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, checkClusteringResult) + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, - isInlineClustering: Boolean, checkClusteringResult: String => Unit): Unit = { + isInlineClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { // First insert of data - val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) // Second insert of data - val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100) @@ -252,7 +253,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { // check have more than one file group this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) - assertTrue(getLatestFileGroupsFileId().size > 1) + assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1) // check clustering result checkClusteringResult(destPath) @@ -265,10 +266,10 @@ class TestStructuredStreaming extends 
HoodieClientTestBase { Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) } - private def getLatestFileGroupsFileId():Array[String] = { + private def getLatestFileGroupsFileId(partition: String):Array[String] = { getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, HoodieTestTable.of(metaClient).listAllBaseFiles()) - tableView.getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + tableView.getLatestFileSlices(partition) .toArray().map(slice => slice.asInstanceOf[FileSlice].getFileGroupId.getFileId) } @@ -283,7 +284,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { this.metaClient.reloadActiveTimeline() val completeReplaceSize = this.metaClient.getActiveTimeline.getCompletedReplaceTimeline().getInstants.toArray.size println("completeReplaceSize:" + completeReplaceSize) - if(completeReplaceSize > 0) { + if (completeReplaceSize > 0) { success = true } } catch { @@ -293,7 +294,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { Thread.sleep(sleepSecsAfterEachRun * 1000) currTime = System.currentTimeMillis } - if (!success) throw new IllegalStateException("Timed-out waiting for " + " have completed replace instant appear in " + tablePath) + if (!success) throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + tablePath) } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java new file mode 100644 index 000000000..394771caf --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.jetbrains.annotations.TestOnly; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class HoodieClusteringJob { + + private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class); + private final Config cfg; + private transient FileSystem fs; + private TypedProperties props; + private final JavaSparkContext jsc; + + public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) { + this.cfg = cfg; + this.jsc = jsc; + this.props = cfg.propsFilePath == null + ? 
UtilHelpers.buildProperties(cfg.configs) + : readConfigFromFileSystem(jsc, cfg); + } + + private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { + final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); + + return UtilHelpers + .readConfig(fs, new Path(cfg.propsFilePath), cfg.configs) + .getConfig(); + } + + public static class Config implements Serializable { + @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true) + public String basePath = null; + @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true) + public String tableName = null; + @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when cluster. " + + "And schedule clustering can generate it.", required = false) + public String clusteringInstantTime = null; + @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false) + public int parallelism = 1; + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + public String sparkMaster = null; + @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true) + public String sparkMemory = null; + @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false) + public int retry = 0; + + @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering") + public Boolean runSchedule = false; + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + + "hoodie client for clustering") + public String propsFilePath = null; + + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--props\") can also be 
passed command line using this parameter. This can be repeated", + splitter = IdentitySplitter.class) + public List configs = new ArrayList<>(); + } + + public static void main(String[] args) { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0 || (!cfg.runSchedule && cfg.clusteringInstantTime == null)) { + cmd.usage(); + System.exit(1); + } + final JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory); + HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg); + int result = clusteringJob.cluster(cfg.retry); + String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runSchedule: %s", + cfg.basePath, cfg.tableName, cfg.runSchedule); + if (result == -1) { + LOG.error(resultMsg + " failed"); + } else { + LOG.info(resultMsg + " success"); + } + jsc.stop(); + } + + public int cluster(int retry) { + this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); + int ret = UtilHelpers.retry(retry, () -> { + if (cfg.runSchedule) { + LOG.info("Do schedule"); + Option instantTime = doSchedule(jsc); + int result = instantTime.isPresent() ? 
0 : -1; + if (result == 0) { + LOG.info("The schedule instant time is " + instantTime.get()); + } + return result; + } else { + LOG.info("Do cluster"); + return doCluster(jsc); + } + }, "Cluster failed"); + return ret; + } + + private String getSchemaFromLatestInstant() throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath, true); + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { + throw new HoodieException("Cannot run clustering without any completed commits"); + } + Schema schema = schemaUtil.getTableAvroSchema(false); + return schema.toString(); + } + + private int doCluster(JavaSparkContext jsc) throws Exception { + String schemaStr = getSchemaFromLatestInstant(); + SparkRDDWriteClient client = + UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props); + JavaRDD writeResponse = + (JavaRDD) client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses(); + return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse); + } + + @TestOnly + public Option doSchedule() throws Exception { + return this.doSchedule(jsc); + } + + private Option doSchedule(JavaSparkContext jsc) throws Exception { + String schemaStr = getSchemaFromLatestInstant(); + SparkRDDWriteClient client = + UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props); + return client.scheduleClustering(Option.empty()); + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 30d5445b3..6830f7f74 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -109,21 
+109,16 @@ public class HoodieCompactor { public int compact(int retry) { this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); - int ret = -1; - try { - do { - if (cfg.runSchedule) { - if (null == cfg.strategyClassName) { - throw new IllegalArgumentException("Missing Strategy class name for running compaction"); - } - ret = doSchedule(jsc); - } else { - ret = doCompact(jsc); + int ret = UtilHelpers.retry(retry, () -> { + if (cfg.runSchedule) { + if (null == cfg.strategyClassName) { + throw new IllegalArgumentException("Missing Strategy class name for running compaction"); } - } while (ret != 0 && retry-- > 0); - } catch (Throwable t) { - LOG.error(t); - } + return doSchedule(jsc); + } else { + return doCompact(jsc); + } + }, "Compact failed"); return ret; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 7d5c9a439..19de67f7b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -428,4 +428,22 @@ public class UtilHelpers { SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null); } + + @FunctionalInterface + public interface CheckedSupplier { + T get() throws Throwable; + } + + public static int retry(int maxRetryCount, CheckedSupplier supplier, String errorMessage) { + int ret = -1; + try { + do { + ret = supplier.get(); + } while (ret != 0 && maxRetryCount-- > 0); + } catch (Throwable t) { + LOG.error(errorMessage, t); + } + return ret; + } + } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 50488bc11..0ce9acaf4 100644 --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -296,7 +296,9 @@ public class DeltaSync implements Serializable { // Retrieve the previous round checkpoints, if any Option resumeCheckpointStr = Option.empty(); if (commitTimelineOpt.isPresent()) { - Option lastCommit = commitTimelineOpt.get().lastInstant(); + // TODO: now not support replace action HUDI-1500 + Option lastCommit = commitTimelineOpt.get() + .filter(instant -> !instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); if (lastCommit.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 6966e2cd6..eeef8ed79 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -42,6 +42,7 @@ import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.utilities.DummySchemaProvider; +import org.apache.hudi.utilities.HoodieClusteringJob; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.CsvDFSSource; @@ -162,6 +163,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + 
"/config/invalid_hive_sync_uber_config.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties"); TypedProperties props = new TypedProperties(); props.setProperty("include", "sql-transformer.properties"); @@ -404,6 +406,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { }); res.get(timeoutInSecs, TimeUnit.SECONDS); } + + static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath); + HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numDeltaCommits = (int) timeline.getInstants().count(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } } @Test @@ -629,8 +639,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); - - deltaStreamerTestRunner(cfg, (r) -> { + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs); TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs); @@ -643,8 +653,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { }); } - private void 
deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { - HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { try { ds.sync(); @@ -653,7 +662,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } }); - TestHelpers.waitTillCondition(condition, 180); + TestHelpers.waitTillCondition(condition, 240); ds.shutdownGracefully(); dsFuture.get(); } @@ -672,14 +681,72 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true")); cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2")); - - deltaStreamerTestRunner(cfg, (r) -> { + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); return completeReplaceSize > 0; }); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); + } + + private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, + String clusteringInstantTime, boolean runSchedule) { + 
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config(); + config.basePath = basePath; + config.clusteringInstantTime = clusteringInstantTime; + config.runSchedule = runSchedule; + config.propsFilePath = dfsBasePath + "/clusteringjob.properties"; + return config; + } + + @Test + public void testHoodieAsyncClusteringJob() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClustering"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); + cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY)); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, + null, true); + HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig); + Option scheduleClusteringInstantTime = Option.empty(); + try { + scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule(); + } catch (Exception e) { + LOG.warn("Schedule clustering failed", e); + return false; + } + if (scheduleClusteringInstantTime.isPresent()) { + LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get()); + HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, + scheduleClusteringInstantTime.get(), false); + HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, 
clusterClusteringConfig); + clusterClusteringJob.cluster(clusterClusteringConfig.retry); + LOG.info("Cluster success"); + } else { + LOG.warn("Schedule clustering failed"); + } + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; + int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; + System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); + return completeReplaceSize > 0; + }); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); } /** diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/clusteringjob.properties b/hudi-utilities/src/test/resources/delta-streamer-config/clusteringjob.properties new file mode 100644 index 000000000..9a089fe41 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/clusteringjob.properties @@ -0,0 +1,18 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +### +hoodie.clustering.inline.max.commits=2 \ No newline at end of file