[HUDI-1399] support a independent clustering spark job to asynchronously clustering (#2379)
* [HUDI-1481] add structured streaming and delta streamer clustering unit test * [HUDI-1399] support a independent clustering spark job to asynchronously clustering * [HUDI-1399] support a independent clustering spark job to asynchronously clustering * [HUDI-1498] Read clustering plan from requested file for inflight instant (#2389) * [HUDI-1399] support a independent clustering spark job with schedule generate instant time Co-authored-by: satishkotha <satishkotha@uber.com>
This commit is contained in:
@@ -42,6 +42,7 @@ import org.apache.hudi.hive.HoodieHiveClient;
|
||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||
import org.apache.hudi.keygen.SimpleKeyGenerator;
|
||||
import org.apache.hudi.utilities.DummySchemaProvider;
|
||||
import org.apache.hudi.utilities.HoodieClusteringJob;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.CsvDFSSource;
|
||||
@@ -162,6 +163,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
|
||||
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties");
|
||||
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
|
||||
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
|
||||
|
||||
TypedProperties props = new TypedProperties();
|
||||
props.setProperty("include", "sql-transformer.properties");
|
||||
@@ -404,6 +406,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
});
|
||||
res.get(timeoutInSecs, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
|
||||
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
|
||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -629,8 +639,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
cfg.tableType = tableType.name();
|
||||
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||
|
||||
deltaStreamerTestRunner(cfg, (r) -> {
|
||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
|
||||
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
|
||||
@@ -643,8 +653,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
});
|
||||
}
|
||||
|
||||
private void deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
|
||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
|
||||
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
|
||||
try {
|
||||
ds.sync();
|
||||
@@ -653,7 +662,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
});
|
||||
|
||||
TestHelpers.waitTillCondition(condition, 180);
|
||||
TestHelpers.waitTillCondition(condition, 240);
|
||||
ds.shutdownGracefully();
|
||||
dsFuture.get();
|
||||
}
|
||||
@@ -672,14 +681,72 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
|
||||
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
|
||||
|
||||
deltaStreamerTestRunner(cfg, (r) -> {
|
||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
|
||||
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
|
||||
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
|
||||
LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
|
||||
return completeReplaceSize > 0;
|
||||
});
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
|
||||
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
|
||||
}
|
||||
|
||||
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
|
||||
String clusteringInstantTime, boolean runSchedule) {
|
||||
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
|
||||
config.basePath = basePath;
|
||||
config.clusteringInstantTime = clusteringInstantTime;
|
||||
config.runSchedule = runSchedule;
|
||||
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
|
||||
return config;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHoodieAsyncClusteringJob() throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/asyncClustering";
|
||||
// Keep it higher than batch-size to test continuous mode
|
||||
int totalRecords = 3000;
|
||||
|
||||
// Initial bulk insert
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
|
||||
cfg.continuousMode = true;
|
||||
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
|
||||
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||
cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
|
||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
|
||||
null, true);
|
||||
HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
|
||||
Option<String> scheduleClusteringInstantTime = Option.empty();
|
||||
try {
|
||||
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Schedule clustering failed", e);
|
||||
return false;
|
||||
}
|
||||
if (scheduleClusteringInstantTime.isPresent()) {
|
||||
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
|
||||
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
|
||||
scheduleClusteringInstantTime.get(), false);
|
||||
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
|
||||
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
|
||||
LOG.info("Cluster success");
|
||||
} else {
|
||||
LOG.warn("Schedule clustering failed");
|
||||
}
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
|
||||
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
|
||||
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
|
||||
System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
|
||||
return completeReplaceSize > 0;
|
||||
});
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
|
||||
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
###
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
###
|
||||
hoodie.clustering.inline.max.commits=2
|
||||
Reference in New Issue
Block a user