
HUDI-479: Eliminate or Minimize use of Guava if possible (#1159)

Suneel Marthi
2020-03-28 03:11:32 -04:00
committed by GitHub
parent 1713f686f8
commit 8c3001363d
46 changed files with 429 additions and 217 deletions

View File

@@ -28,9 +28,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
-import com.google.common.collect.ImmutableSet;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
@@ -123,7 +123,7 @@ public class RollbacksCommand implements CommandMarker {
  class RollbackTimeline extends HoodieActiveTimeline {
    public RollbackTimeline(HoodieTableMetaClient metaClient) {
-      super(metaClient, ImmutableSet.<String>builder().add(HoodieTimeline.ROLLBACK_EXTENSION).build());
+      super(metaClient, CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION));
    }
  }
}
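This first hunk shows the substitution that recurs throughout the commit: Guava's immutable-collection builders give way to the new org.apache.hudi.common.util.CollectionUtils helpers added later in this diff. A minimal sketch of the pattern, with a string literal standing in for the Hudi timeline constant:

import org.apache.hudi.common.util.CollectionUtils;
import java.util.Set;

class GuavaToHudiSketch {
  public static void main(String[] args) {
    // Before (Guava): ImmutableSet.<String>builder().add(".rollback").build()
    Set<String> extensions = CollectionUtils.createImmutableSet(".rollback");
    System.out.println(extensions); // [.rollback]
    // extensions.add(".commit");   // would throw UnsupportedOperationException
  }
}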

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.cli.common;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -30,12 +28,14 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -91,14 +91,14 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
 * Generate commitMetadata in path.
 */
public static HoodieCommitMetadata generateCommitMetadata(String basePath) throws IOException {
-  String file1P0C0 =
-      HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
-  String file1P1C0 =
-      HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000");
-  return generateCommitMetadata(new ImmutableMap.Builder()
-      .put(DEFAULT_FIRST_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P0C0).build())
-      .put(DEFAULT_SECOND_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P1C0).build())
-      .build());
+  String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
+  String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000");
+  return generateCommitMetadata(new HashMap<String, List<String>>() {
+    {
+      put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
+      put(DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
+    }
+  });
}
/**
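The replacement above uses "double-brace initialization" (a HashMap subclass with an instance initializer), which creates one anonymous class per call site. For a small fixed set of entries, the Pair-based factory that this commit adds to CollectionUtils is an alternative without the extra class; a hedged sketch, assuming Pair.of as the usual factory on Hudi's Pair:

import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import java.util.Map;

class PairMapSketch {
  static Map<String, Integer> twoEntries() {
    // Builds the same unmodifiable two-entry map without an anonymous HashMap subclass.
    return CollectionUtils.createImmutableMap(Pair.of("a", 1), Pair.of("b", 2));
  }
}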

View File

@@ -64,7 +64,6 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import com.codahale.metrics.Timer;
-import com.google.common.collect.ImmutableMap;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -77,6 +76,7 @@ import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -746,7 +746,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
// Start the timer
final Timer.Context context = startContext();
-Map<String, List<HoodieRollbackStat>> instantsToStats: ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
+Map<String, List<HoodieRollbackStat>> instantsToStats = new HashMap<>();
table.getActiveTimeline().createNewInstant(
    new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
instantsToRollback.forEach(instant -> {
@@ -773,7 +773,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
  }
});
try {
-  finishRestore(context, instantsToStats.build(),
+  finishRestore(context, Collections.unmodifiableMap(instantsToStats),
      instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
      startRollbackInstant, instantTime);
} catch (IOException io) {

View File

@@ -64,9 +64,8 @@ public class BloomIndexFileInfo implements Serializable {
 * Does the given key fall within the range (inclusive).
 */
public boolean isKeyInRange(String recordKey) {
-  assert minRecordKey != null;
-  assert maxRecordKey != null;
-  return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0;
+  return Objects.requireNonNull(minRecordKey).compareTo(recordKey) <= 0
+      && Objects.requireNonNull(maxRecordKey).compareTo(recordKey) >= 0;
}
@Override
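This is a behavioral tightening, not just a Guava removal: assert statements are no-ops unless the JVM runs with -ea, while Objects.requireNonNull always throws on null. A minimal sketch of the difference:

import java.util.Objects;

class NullCheckSketch {
  static int compareToKey(String minRecordKey, String recordKey) {
    // assert minRecordKey != null;  // silently skipped without the -ea JVM flag
    return Objects.requireNonNull(minRecordKey) // throws NullPointerException regardless of -ea
        .compareTo(recordKey);
  }
}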

View File

@@ -54,8 +54,8 @@ import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

View File

@@ -18,19 +18,19 @@
package org.apache.hudi.metrics;
-import com.google.common.base.Preconditions;
-import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import com.codahale.metrics.MetricRegistry;
import java.io.Closeable;
+import java.util.Objects;
import java.util.stream.IntStream;
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
/**
 * Implementation of Jmx reporter, which used to report jmx metric.
 */
@@ -92,7 +92,7 @@ public class JmxMetricsReporter extends MetricsReporter {
@Override
public void stop() {
-  Preconditions.checkNotNull(jmxReporterServer, "jmxReporterServer is not running.");
+  Objects.requireNonNull(jmxReporterServer, "jmxReporterServer is not running.");
  try {
    jmxReporterServer.stop();
  } catch (Exception e) {

View File

@@ -18,20 +18,23 @@
package org.apache.hudi.metrics;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.rmi.NoSuchObjectException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
+import java.util.Objects;
import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
/**
 * A reporter which publishes metric values to a JMX server.
@@ -78,10 +81,9 @@ public class JmxReporterServer {
}
public JmxReporterServer build() {
-  Preconditions.checkNotNull(registry, "registry cannot be null!");
-  Preconditions.checkNotNull(mBeanServer, "mBeanServer cannot be null!");
-  Preconditions
-      .checkArgument(!StringUtils.isNullOrEmpty(host), "host cannot be null or empty!");
+  Objects.requireNonNull(registry, "registry cannot be null!");
+  Objects.requireNonNull(mBeanServer, "mBeanServer cannot be null!");
+  ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(host), "host cannot be null or empty!");
  return new JmxReporterServer(registry, host, port, mBeanServer);
}
}
@@ -110,7 +112,7 @@ public class JmxReporterServer {
}
public void start() {
-  Preconditions.checkArgument(reporter != null && connector != null,
+  ValidationUtils.checkArgument(reporter != null && connector != null,
      "reporter or connector cannot be null!");
  try {
    connector.start();

View File

@@ -18,12 +18,12 @@
package org.apache.hudi.metrics;
+import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.io.Closeables;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -53,8 +53,7 @@ public class Metrics {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  try {
    reporter.report();
-    reporter.stop();
-    Closeables.close(reporter.getReporter(), true);
+    FileIOUtils.close(reporter.getReporter(), true);
  } catch (Exception e) {
    e.printStackTrace();
  }

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView.SliceView;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
@@ -43,7 +44,6 @@ import org.apache.hudi.table.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
-import com.google.common.collect.Sets;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -113,7 +113,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
String maxInstantTime = metaClient
-    .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
+    .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
        HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
    .filterCompletedInstants().lastInstant().get().getTimestamp();
LOG.info("MaxMemoryPerCompaction => " + SparkConfigUtils.getMaxMemoryPerCompaction(config.getProps()));

View File

@@ -214,7 +214,7 @@ public class RollbackHelper implements Serializable {
private Map<HeaderMetadataType, String> generateHeader(String commit) {
  // generate metadata
-  Map<HeaderMetadataType, String> header = new HashMap<>();
+  Map<HeaderMetadataType, String> header = new HashMap<>(3);
  header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
  header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
  header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,

View File

@@ -249,17 +249,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
String filename0 =
-    HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1),
-        schema, null, false);
+    HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), schema, null, false);
String filename1 =
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(),
-        schema, null, false);
+    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(), schema, null, false);
String filename2 =
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2),
-        schema, null, false);
+    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), schema, null, false);
String filename3 =
-    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4),
-        schema, null, false);
+    HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), schema, null, false);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
metaClient = HoodieTableMetaClient.reload(metaClient);

View File

@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
import org.apache.hudi.common.util.FSUtils;
@@ -55,9 +56,6 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
@@ -290,7 +288,7 @@ public class TestCleaner extends TestHoodieClientBase {
List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
for (int i = 0; i < dataFiles.size(); i++) {
  assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions,
-      Iterables.get(dataFiles, i).getCommitTime(),
+      (dataFiles.get(i)).getCommitTime(),
      commitedVersions.get(commitedVersions.size() - 1 - i));
}
}
@@ -740,12 +738,14 @@ public class TestCleaner extends TestHoodieClientBase {
String file1P1C0 =
    HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
-HoodieCommitMetadata commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
-    .put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P0C0).build())
-    .put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P1C0).build())
-    .build());
+HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+    Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+      {
+        put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
+        put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
+      }
+    })
+);
metaClient.getActiveTimeline().saveAsComplete(
    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"),
    Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -778,12 +778,12 @@ public class TestCleaner extends TestHoodieClientBase {
    .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
HoodieTestUtils
    .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
-commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
-    .put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).build())
-    .put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P1C0).add(file2P1C1).build())
-    .build());
+commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
+  {
+    put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+    put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+  }
+});
metaClient.getActiveTimeline().saveAsComplete(
    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
    Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -814,10 +814,9 @@ public class TestCleaner extends TestHoodieClientBase {
String file3P0C2 =
    HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
-commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
-    .put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).add(file3P0C2).build())
-    .build());
+commitMetadata = generateCommitMetadata(CollectionUtils
+    .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete(
    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"),
    Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -840,10 +839,8 @@ public class TestCleaner extends TestHoodieClientBase {
    .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
String file4P0C3 =
    HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
-commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
-    .put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file1P0C0).add(file2P0C1).add(file4P0C3).build())
-    .build());
+commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
+    HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete(
    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"),
    Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -871,10 +868,8 @@ public class TestCleaner extends TestHoodieClientBase {
// No cleaning on partially written file, with no commit.
HoodieTestUtils
    .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
-commitMetadata = generateCommitMetadata(new ImmutableMap.Builder()
-    .put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        new ImmutableList.Builder<>().add(file3P0C2).build())
-    .build());
+commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+    CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant(
    new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
metaClient.getActiveTimeline().transitionRequestedToInflight(

View File

@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
@@ -122,8 +121,14 @@ public class TestHoodieCompactionStrategy {
sizesMap.put(100 * MB, Collections.singletonList(MB));
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
-Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder().put(120 * MB, partitionPaths[2])
-    .put(110 * MB, partitionPaths[2]).put(100 * MB, partitionPaths[1]).put(90 * MB, partitionPaths[0]).build();
+Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+  {
+    put(120 * MB, partitionPaths[2]);
+    put(110 * MB, partitionPaths[2]);
+    put(100 * MB, partitionPaths[1]);
+    put(90 * MB, partitionPaths[0]);
+  }
+});
DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy();
HoodieWriteConfig writeConfig =
@@ -162,9 +167,16 @@ public class TestHoodieCompactionStrategy {
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
-Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder().put(120 * MB, currentDay)
-    .put(110 * MB, currentDayMinus1).put(100 * MB, currentDayMinus2).put(80 * MB, currentDayMinus3)
-    .put(90 * MB, currentDayPlus1).put(70 * MB, currentDayPlus5).build();
+Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+  {
+    put(120 * MB, currentDay);
+    put(110 * MB, currentDayMinus1);
+    put(100 * MB, currentDayMinus2);
+    put(80 * MB, currentDayMinus3);
+    put(90 * MB, currentDayPlus1);
+    put(70 * MB, currentDayPlus5);
+  }
+});
BoundedPartitionAwareCompactionStrategy strategy = new BoundedPartitionAwareCompactionStrategy();
HoodieWriteConfig writeConfig =
@@ -204,9 +216,16 @@ public class TestHoodieCompactionStrategy {
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
-Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder().put(120 * MB, currentDay)
-    .put(110 * MB, currentDayMinus1).put(100 * MB, currentDayMinus2).put(80 * MB, currentDayMinus3)
-    .put(90 * MB, currentDayPlus1).put(70 * MB, currentDayPlus5).build();
+Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+  {
+    put(120 * MB, currentDay);
+    put(110 * MB, currentDayMinus1);
+    put(100 * MB, currentDayMinus2);
+    put(80 * MB, currentDayMinus3);
+    put(90 * MB, currentDayPlus1);
+    put(70 * MB, currentDayPlus5);
+  }
+});
UnBoundedPartitionAwareCompactionStrategy strategy = new UnBoundedPartitionAwareCompactionStrategy();
HoodieWriteConfig writeConfig =

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
@@ -32,6 +31,7 @@ import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,13 +49,22 @@ public class MercifulJsonConverter {
 * Build type processor map for each avro type.
 */
private static Map<Schema.Type, JsonToAvroFieldProcessor> getFieldTypeProcessors() {
-  return new ImmutableMap.Builder<Type, JsonToAvroFieldProcessor>().put(Type.STRING, generateStringTypeHandler())
-      .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
-      .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
-      .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
-      .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
-      .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
-      .put(Type.FIXED, generateFixedTypeHandler()).build();
+  return Collections.unmodifiableMap(new HashMap<Schema.Type, JsonToAvroFieldProcessor>() {
+    {
+      put(Type.STRING, generateStringTypeHandler());
+      put(Type.BOOLEAN, generateBooleanTypeHandler());
+      put(Type.DOUBLE, generateDoubleTypeHandler());
+      put(Type.FLOAT, generateFloatTypeHandler());
+      put(Type.INT, generateIntTypeHandler());
+      put(Type.LONG, generateLongTypeHandler());
+      put(Type.ARRAY, generateArrayTypeHandler());
+      put(Type.RECORD, generateRecordTypeHandler());
+      put(Type.ENUM, generateEnumTypeHandler());
+      put(Type.MAP, generateMapTypeHandler());
+      put(Type.BYTES, generateBytesTypeHandler());
+      put(Type.FIXED, generateFixedTypeHandler());
+    }
+  });
}
/**

View File

@@ -18,10 +18,9 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
-import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
@@ -38,8 +37,8 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
public static String FILENAME_METADATA_FIELD = "_hoodie_file_name";
public static final List<String> HOODIE_META_COLUMNS =
-    new ImmutableList.Builder<String>().add(COMMIT_TIME_METADATA_FIELD).add(COMMIT_SEQNO_METADATA_FIELD)
-        .add(RECORD_KEY_METADATA_FIELD).add(PARTITION_PATH_METADATA_FIELD).add(FILENAME_METADATA_FIELD).build();
+    CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+        RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
/**
 * Identifies the record across the table.

View File

@@ -26,13 +26,13 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
-import org.apache.hudi.common.util.FailSafeConsistencyGuard;
import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.util.FailSafeConsistencyGuard;
import org.apache.hudi.common.util.NoOpConsistencyGuard;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -246,7 +246,7 @@ public class HoodieTableMetaClient implements Serializable {
/**
 * Return raw file-system.
 *
- * @return
+ * @return fs
 */
public FileSystem getRawFs() {
  return getFs().getFileSystem();

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
-import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -38,8 +37,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -106,13 +107,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
-  this(metaClient, new ImmutableSet.Builder<String>().addAll(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE).build());
+  this(metaClient, Collections.unmodifiableSet(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE));
}
public HoodieActiveTimeline(HoodieTableMetaClient metaClient, boolean applyLayoutFilter) {
-  this(metaClient,
-      new ImmutableSet.Builder<String>()
-          .addAll(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE).build(), applyLayoutFilter);
+  this(metaClient, Collections.unmodifiableSet(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE), applyLayoutFilter);
}
/**
@@ -166,7 +165,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
public void deleteCompactionRequested(HoodieInstant instant) {
  ValidationUtils.checkArgument(instant.isRequested());
-  ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+  ValidationUtils.checkArgument(Objects.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION));
  deleteInstantFile(instant);
}
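The last hunk also swaps instance equals for the null-safe Objects.equals: if getAction() ever returned null, the old form would throw instead of failing the check. A tiny illustration:

import java.util.Objects;

class NullSafeEqualsSketch {
  public static void main(String[] args) {
    String action = null;
    // action.equals("compaction")  -> NullPointerException
    System.out.println(Objects.equals(action, "compaction")); // false
  }
}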

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

View File

@@ -20,11 +20,11 @@ package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
-import com.google.common.collect.Sets;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -113,7 +113,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public HoodieTimeline getCommitsAndCompactionTimeline() {
-  Set<String> validActions = Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION);
+  Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION);
  return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
}
@@ -145,7 +145,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 * Get all instants (commits, delta commits) that produce new data, in the active timeline.
 */
public HoodieTimeline getCommitsTimeline() {
-  return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
+  return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
}
/**
@@ -153,8 +153,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 * timeline.
 */
public HoodieTimeline getAllCommitsTimeline() {
-  return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
-      SAVEPOINT_ACTION, ROLLBACK_ACTION));
+  return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
+      CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION));
}
/**

View File

@@ -19,9 +19,9 @@
package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FSUtils;
-import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FileStatus;
import java.io.Serializable;
@@ -41,8 +41,8 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
 * A COMPACTION action eventually becomes COMMIT when completed. So, when grouping instants
 * for state transitions, this needs to be taken into account
 */
-private static final Map<String, String> COMPARABLE_ACTIONS = new ImmutableMap.Builder<String, String>()
-    .put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION).build();
+private static final Map<String, String> COMPARABLE_ACTIONS =
+    CollectionUtils.createImmutableMap(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION);
public static final Comparator<HoodieInstant> ACTION_COMPARATOR =
    Comparator.comparing(instant -> getComparableAction(instant.getAction()));

View File

@@ -24,8 +24,8 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTimeline;

View File

@@ -26,8 +26,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
+import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;

View File

@@ -28,7 +28,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
-import com.google.common.collect.ImmutableMap;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
@@ -42,6 +41,7 @@ import org.apache.avro.specific.SpecificRecordBase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,18 +54,18 @@ public class AvroUtils {
public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option<Long> durationInMs,
    List<String> commits, Map<String, List<HoodieRollbackStat>> commitToStats) {
-  ImmutableMap.Builder<String, List<HoodieRollbackMetadata>> commitToStatBuilder = ImmutableMap.builder();
+  Map<String, List<HoodieRollbackMetadata>> commitToStatsMap = new HashMap<>();
  for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
-    commitToStatBuilder.put(commitToStat.getKey(),
+    commitToStatsMap.put(commitToStat.getKey(),
        Collections.singletonList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue())));
  }
  return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits,
-      commitToStatBuilder.build(), DEFAULT_VERSION);
+      Collections.unmodifiableMap(commitToStatsMap), DEFAULT_VERSION);
}
public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option<Long> durationInMs,
    List<String> commits, List<HoodieRollbackStat> rollbackStats) {
-  ImmutableMap.Builder<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = ImmutableMap.builder();
+  Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
  int totalDeleted = 0;
  for (HoodieRollbackStat stat : rollbackStats) {
    HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
@@ -75,18 +75,18 @@ public class AvroUtils {
  }
  return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L), totalDeleted, commits,
-      partitionMetadataBuilder.build(), DEFAULT_VERSION);
+      Collections.unmodifiableMap(partitionMetadataBuilder), DEFAULT_VERSION);
}
public static HoodieSavepointMetadata convertSavepointMetadata(String user, String comment,
    Map<String, List<String>> latestFiles) {
-  ImmutableMap.Builder<String, HoodieSavepointPartitionMetadata> partitionMetadataBuilder = ImmutableMap.builder();
+  Map<String, HoodieSavepointPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
  for (Map.Entry<String, List<String>> stat : latestFiles.entrySet()) {
    HoodieSavepointPartitionMetadata metadata = new HoodieSavepointPartitionMetadata(stat.getKey(), stat.getValue());
    partitionMetadataBuilder.put(stat.getKey(), metadata);
  }
-  return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, partitionMetadataBuilder.build(),
-      DEFAULT_VERSION);
+  return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment,
+      Collections.unmodifiableMap(partitionMetadataBuilder), DEFAULT_VERSION);
}
public static Option<byte[]> serializeCompactionPlan(HoodieCompactionPlan compactionWorkload) throws IOException {
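Each builder conversion above follows the same idiom: populate a plain HashMap, then publish it once through Collections.unmodifiableMap. A minimal sketch of the idiom in isolation:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class BuildThenWrapSketch {
  static Map<String, Integer> build() {
    Map<String, Integer> stats = new HashMap<>();
    stats.put("deleted", 3);                   // populate while still mutable
    return Collections.unmodifiableMap(stats); // expose a read-only view
  }
}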

View File

@@ -28,10 +28,10 @@ import org.apache.hudi.common.versioning.clean.CleanMetadataMigrator;
import org.apache.hudi.common.versioning.clean.CleanV1MigrationHandler;
import org.apache.hudi.common.versioning.clean.CleanV2MigrationHandler;
-import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class CleanerUtils {
public static final Integer CLEAN_METADATA_VERSION_1 = CleanV1MigrationHandler.VERSION;
@@ -40,14 +40,14 @@ public class CleanerUtils {
public static HoodieCleanMetadata convertCleanMetadata(HoodieTableMetaClient metaClient,
    String startCleanTime, Option<Long> durationInMs, List<HoodieCleanStat> cleanStats) {
-  ImmutableMap.Builder<String, HoodieCleanPartitionMetadata> partitionMetadataBuilder = ImmutableMap.builder();
+  Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>();
  int totalDeleted = 0;
  String earliestCommitToRetain = null;
  for (HoodieCleanStat stat : cleanStats) {
    HoodieCleanPartitionMetadata metadata =
        new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
            stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
-    partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
+    partitionMetadataMap.put(stat.getPartitionPath(), metadata);
    totalDeleted += stat.getSuccessDeleteFiles().size();
    if (earliestCommitToRetain == null) {
      // This will be the same for all partitions
@@ -56,8 +56,7 @@ public class CleanerUtils {
  }
  return new HoodieCleanMetadata(startCleanTime,
-      durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain,
-      partitionMetadataBuilder.build(), CLEAN_METADATA_VERSION_2);
+      durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2);
}
/**

View File

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import org.apache.hudi.common.util.collection.Pair;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CollectionUtils {
/**
* Determines whether two iterators contain equal elements in the same order. More specifically,
* this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same
* number of elements and every element of {@code iterator1} is equal to the corresponding element
* of {@code iterator2}.
*
* <p>Note that this will modify the supplied iterators, since they will have been advanced some
* number of elements forward.
*/
public static boolean elementsEqual(Iterator<?> iterator1, Iterator<?> iterator2) {
while (iterator1.hasNext()) {
if (!iterator2.hasNext()) {
return false;
}
Object o1 = iterator1.next();
Object o2 = iterator2.next();
if (!Objects.equals(o1, o2)) {
return false;
}
}
return !iterator2.hasNext();
}
@SafeVarargs
public static <T> Set<T> createSet(final T... elements) {
return Stream.of(elements).collect(Collectors.toSet());
}
public static <K,V> Map<K, V> createImmutableMap(final K key, final V value) {
return Collections.unmodifiableMap(Collections.singletonMap(key, value));
}
@SafeVarargs
public static <T> List<T> createImmutableList(final T... elements) {
return Collections.unmodifiableList(Stream.of(elements).collect(Collectors.toList()));
}
public static <K,V> Map<K,V> createImmutableMap(final Map<K,V> map) {
return Collections.unmodifiableMap(map);
}
@SafeVarargs
public static <K,V> Map<K,V> createImmutableMap(final Pair<K,V>... elements) {
Map<K,V> map = new HashMap<>();
for (Pair<K,V> pair: elements) {
map.put(pair.getLeft(), pair.getRight());
}
return Collections.unmodifiableMap(map);
}
@SafeVarargs
public static <T> Set<T> createImmutableSet(final T... elements) {
return Collections.unmodifiableSet(createSet(elements));
}
public static <T> Set<T> createImmutableSet(final Set<T> set) {
return Collections.unmodifiableSet(set);
}
public static <T> List<T> createImmutableList(final List<T> list) {
return Collections.unmodifiableList(list);
}
private static Object[] checkElementsNotNull(Object... array) {
return checkElementsNotNull(array, array.length);
}
private static Object[] checkElementsNotNull(Object[] array, int length) {
for (int i = 0; i < length; i++) {
checkElementNotNull(array[i], i);
}
return array;
}
private static Object checkElementNotNull(Object element, int index) {
return Objects.requireNonNull(element, "Element is null at index " + index);
}
}
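
A minimal, self-contained sketch of the new helpers in use (the demo class is hypothetical; the CollectionUtils and Pair APIs are the ones shown above):

import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CollectionUtilsDemo {
  public static void main(String[] args) {
    // Immutable set, replacing ImmutableSet.of(...) / Sets.newHashSet(...)
    Set<String> actions = CollectionUtils.createImmutableSet("commit", "deltacommit");
    // Immutable list, replacing ImmutableList.Builder / ImmutableList.of(...)
    List<String> cmds = CollectionUtils.createImmutableList("hdfs dfsadmin -safemode wait", "pwd");
    // Immutable map built from key/value pairs, replacing ImmutableMap.Builder
    Map<String, Integer> counts = CollectionUtils.createImmutableMap(Pair.of("a", 1), Pair.of("b", 2));
    // Ordered element-wise comparison of two iterators, replacing Iterators.elementsEqual
    boolean same = CollectionUtils.elementsEqual(
        Arrays.asList(1, 2, 3).iterator(), Arrays.asList(1, 2, 3).iterator());
    System.out.println(actions + " " + cmds + " " + counts + " " + same); // same == true
  }
}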

View File

@@ -46,8 +46,8 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@@ -213,7 +213,7 @@ public class FSUtils {
* @param basePathStr Base-Path * @param basePathStr Base-Path
* @param consumer Callback for processing * @param consumer Callback for processing
* @param excludeMetaFolder Exclude .hoodie folder * @param excludeMetaFolder Exclude .hoodie folder
* @throws IOException * @throws IOException -
*/ */
static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer, static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer,
boolean excludeMetaFolder) throws IOException { boolean excludeMetaFolder) throws IOException {

View File

@@ -18,7 +18,10 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@@ -91,4 +94,26 @@ public class FileIOUtils {
out.flush(); out.flush();
out.close(); out.close();
} }
/**
* Closes a {@link Closeable}, with control over whether an {@code IOException} may be thrown.
* @param closeable the {@code Closeable} object to be closed, or null,
* in which case this method does nothing.
* @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code close} methods.
*
* @throws IOException if {@code swallowIOException} is false and {@code close} throws an {@code IOException}.
*/
public static void close(@Nullable Closeable closeable, boolean swallowIOException)
throws IOException {
if (closeable == null) {
return;
}
try {
closeable.close();
} catch (IOException e) {
if (!swallowIOException) {
throw e;
}
}
}
} }
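
A short usage sketch for the close helper added above (the file path and stream variable are hypothetical):

import org.apache.hudi.common.util.FileIOUtils;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CloseDemo {
  public static void main(String[] args) throws IOException {
    InputStream in = null;
    try {
      in = new FileInputStream("/tmp/some-file"); // hypothetical path
      // ... read from the stream ...
    } finally {
      // true => swallow any IOException thrown by close(), mirroring Guava's Closeables.close
      FileIOUtils.close(in, true);
    }
  }
}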

View File

@@ -21,14 +21,21 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import com.google.common.reflect.ClassPath; import org.apache.log4j.LogManager;
import com.google.common.reflect.ClassPath.ClassInfo; import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@@ -36,6 +43,8 @@ import java.util.stream.Stream;
*/ */
public class ReflectionUtils { public class ReflectionUtils {
private static final Logger LOG = LogManager.getLogger(ReflectionUtils.class);
private static Map<String, Class<?>> clazzCache = new HashMap<>(); private static Map<String, Class<?>> clazzCache = new HashMap<>();
private static Class<?> getClass(String clazzName) { private static Class<?> getClass(String clazzName) {
@@ -90,16 +99,58 @@ public class ReflectionUtils {
} }
/** /**
* Return stream of top level class names in the same class path as passed-in class. * Scans all classes accessible from the context class loader
* * which belong to the given package and subpackages.
* @param clazz *
* @param clazz class
* @return Stream of Class names in package
*/ */
public static Stream<String> getTopLevelClassesInClasspath(Class clazz) { public static Stream<String> getTopLevelClassesInClasspath(Class<?> clazz) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
String packageName = clazz.getPackage().getName();
String path = packageName.replace('.', '/');
Enumeration<URL> resources = null;
try { try {
ClassPath classPath = ClassPath.from(clazz.getClassLoader()); resources = classLoader.getResources(path);
return classPath.getTopLevelClasses().stream().map(ClassInfo::getName);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Got exception while dumping top level classes", e); LOG.error("Unable to fetch Resources in package " + e.getMessage());
} }
List<File> directories = new ArrayList<>();
while (Objects.requireNonNull(resources).hasMoreElements()) {
URL resource = resources.nextElement();
try {
directories.add(new File(resource.toURI()));
} catch (URISyntaxException e) {
LOG.error("Unable to get " + e.getMessage());
}
}
List<String> classes = new ArrayList<>();
for (File directory : directories) {
classes.addAll(findClasses(directory, packageName));
}
return classes.stream();
}
/**
* Recursive method used to find all classes in a given directory and subdirs.
*
* @param directory The base directory
* @param packageName The package name for classes found inside the base directory
* @return classes in the package
*/
private static List<String> findClasses(File directory, String packageName) {
List<String> classes = new ArrayList<>();
if (!directory.exists()) {
return classes;
}
File[] files = directory.listFiles();
for (File file : Objects.requireNonNull(files)) {
if (file.isDirectory()) {
classes.addAll(findClasses(file, packageName + "." + file.getName()));
} else if (file.getName().endsWith(".class")) {
classes.add(packageName + '.' + file.getName().substring(0, file.getName().length() - 6));
}
}
return classes;
} }
} }
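
Note the behavioral nuance of this replacement: Guava's ClassPath.getTopLevelClasses() enumerated the entire classpath, whereas the new implementation starts from the package of the passed-in class and, since it resolves resources via new File(resource.toURI()), appears to cover directory-based classpath entries only. A minimal usage sketch:

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ReflectionUtils;

public class ScanDemo {
  public static void main(String[] args) {
    // Streams fully-qualified names of classes found under the package of
    // HoodieRecordPayload (and its subpackages) on the context class loader.
    ReflectionUtils.getTopLevelClassesInClasspath(HoodieRecordPayload.class)
        .forEach(System.out::println);
  }
}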

View File

@@ -20,20 +20,20 @@ package org.apache.hudi.common.minicluster;
import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.exception.HoodieIOException;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.nio.file.Files;
import java.util.Objects;
/** /**
* An HDFS minicluster service implementation. * An HDFS minicluster service implementation.
@@ -53,8 +53,8 @@ public class HdfsTestService {
*/ */
private MiniDFSCluster miniDfsCluster; private MiniDFSCluster miniDfsCluster;
public HdfsTestService() { public HdfsTestService() throws IOException {
workDir = Files.createTempDir().getAbsolutePath(); workDir = Files.createTempDirectory("temp").getName(0).toString();
} }
public Configuration getHadoopConf() { public Configuration getHadoopConf() {
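
For reference, a sketch of the java.nio replacement for Guava's Files.createTempDir(). Note that Path.getName(0) returns only the first element of the path (e.g. "tmp" for /tmp/temp123), so the toFile().getAbsolutePath() form used by the other test services below is the closer equivalent of the original Guava call:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirDemo {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("temp");
    // Absolute path of the new directory, equivalent to Guava's
    // Files.createTempDir().getAbsolutePath():
    System.out.println(dir.toFile().getAbsolutePath()); // e.g. /tmp/temp123
    // By contrast, getName(0) is just the first path element:
    System.out.println(dir.getName(0)); // e.g. "tmp"
  }
}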

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.minicluster; package org.apache.hudi.common.minicluster;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@@ -35,6 +34,7 @@ import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.file.Files;
import java.util.Objects; import java.util.Objects;
/** /**
@@ -75,8 +75,8 @@ public class ZookeeperTestService {
private ZooKeeperServer zooKeeperServer; private ZooKeeperServer zooKeeperServer;
private boolean started = false; private boolean started = false;
public ZookeeperTestService(Configuration config) { public ZookeeperTestService(Configuration config) throws IOException {
this.workDir = Files.createTempDir().getAbsolutePath(); this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
this.hadoopConf = config; this.hadoopConf = config;
} }

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.model; package org.apache.hudi.common.model;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -34,7 +35,6 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import com.google.common.collect.Sets;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@@ -195,9 +195,9 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// Test that various types of getXXX operations from HoodieActiveTimeline // Test that various types of getXXX operations from HoodieActiveTimeline
// return the correct set of Instant // return the correct set of Instant
checkTimeline.accept(timeline.getCommitsTimeline(), checkTimeline.accept(timeline.getCommitsTimeline(),
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)); CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(), checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(),
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION)); CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION));
checkTimeline.accept(timeline.getCommitTimeline(), Collections.singleton(HoodieTimeline.COMMIT_ACTION)); checkTimeline.accept(timeline.getCommitTimeline(), Collections.singleton(HoodieTimeline.COMMIT_ACTION));
checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION)); checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION));
checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION)); checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION));
@@ -205,7 +205,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION)); checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION));
checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION)); checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION));
checkTimeline.accept(timeline.getAllCommitsTimeline(), checkTimeline.accept(timeline.getAllCommitsTimeline(),
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION)); HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
@@ -380,12 +380,12 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
checkFilter.accept(timeline.filter(i -> false), new HashSet<>()); checkFilter.accept(timeline.filter(i -> false), new HashSet<>());
checkFilter.accept(timeline.filterInflights(), Collections.singleton(State.INFLIGHT)); checkFilter.accept(timeline.filterInflights(), Collections.singleton(State.INFLIGHT));
checkFilter.accept(timeline.filterInflightsAndRequested(), checkFilter.accept(timeline.filterInflightsAndRequested(),
Sets.newHashSet(State.INFLIGHT, State.REQUESTED)); CollectionUtils.createSet(State.INFLIGHT, State.REQUESTED));
// filterCompletedAndCompactionInstants // filterCompletedAndCompactionInstants
// This cannot be done using checkFilter as it involves both states and actions // This cannot be done using checkFilter as it involves both states and actions
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants(); final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED); final Set<State> states = CollectionUtils.createSet(State.REQUESTED, State.COMPLETED);
final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION); final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction())) sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
.forEach(i -> assertTrue(t1.containsInstant(i))); .forEach(i -> assertTrue(t1.containsInstant(i)));

View File

@@ -27,9 +27,9 @@ import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
@@ -41,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
@@ -48,8 +49,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -135,8 +134,11 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
// Clean first slice // Clean first slice
testCleans(view, Collections.singletonList("21"), testCleans(view, Collections.singletonList("21"),
new ImmutableMap.Builder<String, List<String>>().put("11", Arrays.asList("12", "13", "15")).build(), new HashMap<String, List<String>>() {
instantsToFiles, Collections.singletonList("11")); {
put("11", Arrays.asList("12", "13", "15"));
}
}, instantsToFiles, Collections.singletonList("11"));
// Add one more ingestion instant. This should be 2nd slice now // Add one more ingestion instant. This should be 2nd slice now
instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("22"), true, "19", 2)); instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("22"), true, "19", 2));
@@ -251,7 +253,11 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
* Case where a clean happened and then rounds of ingestion and compaction happened * Case where a clean happened and then rounds of ingestion and compaction happened
*/ */
testCleans(view2, Collections.singletonList("19"), testCleans(view2, Collections.singletonList("19"),
new ImmutableMap.Builder<String, List<String>>().put("11", Arrays.asList("12", "13", "14")).build(), new HashMap<String, List<String>>() {
{
put("11", Arrays.asList("12", "13", "14"));
}
},
instantsToFiles, Collections.singletonList("11")); instantsToFiles, Collections.singletonList("11"));
scheduleCompaction(view2, "20"); scheduleCompaction(view2, "20");
instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("21", "22"), true, "20", 2)); instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("21", "22"), true, "20", 2));
@@ -439,7 +445,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
List<HoodieRollbackMetadata> rollbackM = new ArrayList<>(); List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
rollbackM.add(rollbackMetadata); rollbackM.add(rollbackMetadata);
metadata.setHoodieRestoreMetadata(new ImmutableMap.Builder().put(rollbackInstant, rollbackM).build()); metadata.setHoodieRestoreMetadata(CollectionUtils.createImmutableMap(rollbackInstant, rollbackM));
List<String> rollbackInstants = new ArrayList<>(); List<String> rollbackInstants = new ArrayList<>();
rollbackInstants.add(rollbackInstant); rollbackInstants.add(rollbackInstant);
metadata.setInstantsToRollback(rollbackInstants); metadata.setInstantsToRollback(rollbackInstants);
@@ -646,7 +652,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
HoodieTimeline timeline1 = view1.getTimeline(); HoodieTimeline timeline1 = view1.getTimeline();
HoodieTimeline timeline2 = view2.getTimeline(); HoodieTimeline timeline2 = view2.getTimeline();
Assert.assertEquals(view1.getLastInstant(), view2.getLastInstant()); Assert.assertEquals(view1.getLastInstant(), view2.getLastInstant());
Iterators.elementsEqual(timeline1.getInstants().iterator(), timeline2.getInstants().iterator()); CollectionUtils.elementsEqual(timeline1.getInstants().iterator(), timeline2.getInstants().iterator());
// View Checks // View Checks
Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = partitions.stream().flatMap(view1::getAllFileGroups) Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = partitions.stream().flatMap(view1::getAllFileGroups)
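
The ImmutableMap.Builder call sites above are replaced with double-brace initialization: an anonymous HashMap subclass whose instance-initializer block populates the map. A minimal sketch of the pattern (note that, unlike Guava's ImmutableMap, the result stays mutable):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoubleBraceDemo {
  public static void main(String[] args) {
    // Outer braces: anonymous subclass of HashMap; inner braces: instance initializer.
    Map<String, List<String>> instantToFiles = new HashMap<String, List<String>>() {
      {
        put("11", Arrays.asList("12", "13", "15"));
      }
    };
    System.out.println(instantToFiles);
  }
}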

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
import junit.framework.TestCase;
import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
@@ -30,6 +29,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import junit.framework.TestCase;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@@ -45,10 +46,10 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class TestPriorityBasedFileSystemView extends TestCase { public class TestPriorityBasedFileSystemView extends TestCase {

View File

@@ -31,8 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Assert; import org.junit.Assert;
@@ -82,12 +80,17 @@ public class CompactionTestUtils {
createDeltaCommit(metaClient, "004"); createDeltaCommit(metaClient, "004");
createDeltaCommit(metaClient, "006"); createDeltaCommit(metaClient, "006");
Map<String, String> baseInstantsToCompaction = new ImmutableMap.Builder<String, String>().put("000", "001") Map<String, String> baseInstantsToCompaction = new HashMap<String, String>() {
.put("002", "003").put("004", "005").put("006", "007").build(); {
put("000", "001");
put("002", "003");
put("004", "005");
put("006", "007");
}
};
List<Integer> expectedNumEntries = List<Integer> expectedNumEntries =
Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4); Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4);
List<HoodieCompactionPlan> plans = List<HoodieCompactionPlan> plans = CollectionUtils.createImmutableList(plan1, plan2, plan3, plan4);
new ImmutableList.Builder<HoodieCompactionPlan>().add(plan1, plan2, plan3, plan4).build();
IntStream.range(0, 4).boxed().forEach(idx -> { IntStream.range(0, 4).boxed().forEach(idx -> {
if (expectedNumEntries.get(idx) > 0) { if (expectedNumEntries.get(idx) > 0) {
Assert.assertEquals("check if plan " + idx + " has exp entries", expectedNumEntries.get(idx).longValue(), Assert.assertEquals("check if plan " + idx + " has exp entries", expectedNumEntries.get(idx).longValue(),

View File

@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieBaseFile;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator; import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@@ -39,6 +38,7 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
@@ -59,8 +59,12 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
private static String TEST_WRITE_TOKEN = "1-0-1"; private static String TEST_WRITE_TOKEN = "1-0-1";
private static final Map<String, Double> METRICS = private static final Map<String, Double> METRICS = new HashMap<String, Double>() {
new ImmutableMap.Builder<String, Double>().put("key1", 1.0).put("key2", 3.0).build(); {
put("key1", 1.0);
put("key2", 3.0);
}
};
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> METRICS; private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> METRICS;
@Before @Before

View File

@@ -23,8 +23,8 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -33,7 +34,6 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@@ -135,7 +135,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// Get the maxCommit from the last delta or compaction or commit - when // Get the maxCommit from the last delta or compaction or commit - when
// bootstrapped from COW table // bootstrapped from COW table
String maxCommitTime = metaClient String maxCommitTime = metaClient
.getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.hive.util;
import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
@@ -51,6 +50,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.file.Files;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@@ -78,8 +78,8 @@ public class HiveTestService {
private TServer tServer; private TServer tServer;
private HiveServer2 hiveServer; private HiveServer2 hiveServer;
public HiveTestService(Configuration configuration) { public HiveTestService(Configuration configuration) throws IOException {
this.workDir = Files.createTempDir().getAbsolutePath(); this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
} }
public Configuration getHadoopConf() { public Configuration getHadoopConf() {
@@ -139,7 +139,8 @@ public class HiveTestService {
File derbyLogFile = new File(localHiveDir, "derby.log"); File derbyLogFile = new File(localHiveDir, "derby.log");
derbyLogFile.createNewFile(); derbyLogFile.createNewFile();
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath()); setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, Files.createTempDir().getAbsolutePath()); conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath());
conf.set("datanucleus.schema.autoCreateTables", "true"); conf.set("datanucleus.schema.autoCreateTables", "true");
conf.set("hive.metastore.schema.verification", "false"); conf.set("hive.metastore.schema.verification", "false");
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath()); setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());

View File

@@ -18,11 +18,12 @@
package org.apache.hudi.integ; package org.apache.hudi.integ;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import com.google.common.collect.ImmutableList;
import org.junit.Test; import org.junit.Test;
import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
@@ -97,16 +98,15 @@ public class ITTestHoodieDemo extends ITTestBase {
} }
private void setupDemo() throws Exception { private void setupDemo() throws Exception {
List<String> cmds = new ImmutableList.Builder<String>() List<String> cmds = CollectionUtils.createImmutableList("hdfs dfsadmin -safemode wait",
.add("hdfs dfsadmin -safemode wait") // handle NN going into safe mode at times "hdfs dfs -mkdir -p " + HDFS_DATA_DIR,
.add("hdfs dfs -mkdir -p " + HDFS_DATA_DIR) "hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1,
.add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1) "/bin/bash " + DEMO_CONTAINER_SCRIPT);
.add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build();
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
// create input dir in presto coordinator // create input dir in presto coordinator
cmds = new ImmutableList.Builder<String>() cmds = Collections.singletonList("mkdir -p " + HDFS_DATA_DIR);
.add("mkdir -p " + HDFS_DATA_DIR).build();
executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds); executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds);
// copy presto sql files to presto coordinator // copy presto sql files to presto coordinator
@@ -116,22 +116,21 @@ public class ITTestHoodieDemo extends ITTestBase {
} }
private void ingestFirstBatchAndHiveSync() throws Exception { private void ingestFirstBatchAndHiveSync() throws Exception {
List<String> cmds = new ImmutableList.Builder<String>() List<String> cmds = CollectionUtils.createImmutableList(
.add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type COPY_ON_WRITE " + " --table-type COPY_ON_WRITE "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties "
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)) + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME),
.add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ " + " --table-type MERGE_ON_READ "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties "
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)) + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)));
.build();
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
} }
@@ -168,23 +167,22 @@ public class ITTestHoodieDemo extends ITTestBase {
} }
private void ingestSecondBatchAndHiveSync() throws Exception { private void ingestSecondBatchAndHiveSync() throws Exception {
List<String> cmds = new ImmutableList.Builder<String>() List<String> cmds = CollectionUtils.createImmutableList(
.add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH2 + " " + HDFS_BATCH_PATH2) ("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH2 + " " + HDFS_BATCH_PATH2),
.add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type COPY_ON_WRITE " + " --table-type COPY_ON_WRITE "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties "
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)) + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)),
.add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE ("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ " + " --table-type MERGE_ON_READ "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts " + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties " + " --props /var/demo/config/dfs-source.properties "
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider " + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)) + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME)));
.build();
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
} }

View File

@@ -23,8 +23,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CollectionUtils;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import java.util.List; import java.util.List;
@@ -68,7 +68,8 @@ public class HoodieDataSourceHelpers {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return metaClient.getActiveTimeline().getTimelineOfActions( return metaClient.getActiveTimeline().getTimelineOfActions(
Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)); CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION));
} else { } else {
return metaClient.getCommitTimeline().filterCompletedInstants(); return metaClient.getCommitTimeline().filterCompletedInstants();
} }

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
@@ -38,7 +39,6 @@ import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException; import com.beust.jcommander.ParameterException;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@@ -83,7 +83,7 @@ public class HoodieSnapshotExporter {
public static class OutputFormatValidator implements IValueValidator<String> { public static class OutputFormatValidator implements IValueValidator<String> {
static final String HUDI = "hudi"; static final String HUDI = "hudi";
static final List<String> FORMATS = ImmutableList.of("json", "parquet", HUDI); static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", HUDI);
@Override @Override
public void validate(String name, String value) { public void validate(String name, String value) {

View File

@@ -135,7 +135,7 @@ public class HoodieIncrSource extends RowSource {
* instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
* = row.getString(3); List<Object> partitionVals = * = row.getString(3); List<Object> partitionVals =
* extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object) * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
* o).collect(Collectors.toList()); Preconditions.checkArgument(partitionVals.size() == partitionFields.size(), * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
* "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
* ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
* RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema)); * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
@@ -41,7 +42,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema; import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import com.google.common.collect.ImmutableList;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -146,7 +146,7 @@ public class UtilitiesTestBase {
hiveSyncConfig.basePath = basePath; hiveSyncConfig.basePath = basePath;
hiveSyncConfig.assumeDatePartitioning = false; hiveSyncConfig.assumeDatePartitioning = false;
hiveSyncConfig.usePreApacheInputFormat = false; hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = new ImmutableList.Builder<String>().add("datestr").build(); hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("datestr");
return hiveSyncConfig; return hiveSyncConfig;
} }

View File

@@ -534,13 +534,6 @@
<version>${joda.version}</version> <version>${joda.version}</version>
</dependency> </dependency>
<!-- we have to stay at <= 16.0, due to issues with HBase client -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency> <dependency>
<groupId>xerces</groupId> <groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId> <artifactId>xercesImpl</artifactId>

View File

@@ -264,8 +264,8 @@
</module> </module>
<module name="CommentsIndentation"/> <module name="CommentsIndentation"/>
<module name="IllegalImport"> <module name="IllegalImport">
<property name="illegalPkgs" value="org.apache.commons" /> <property name="illegalPkgs" value="org.apache.commons, com.google.common" />
<property name="illegalClasses" value="java.util.Optional, com.google.common.base.Optional" /> <property name="illegalClasses" value="java.util.Optional" />
</module> </module>
<module name="EmptyStatement" /> <module name="EmptyStatement" />