[MINOR] Code Cleanup, remove redundant code (#1337)
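The bulk of this change swaps Guava collection factories and varargs helpers for the equivalent JDK constructs where only a single element (or an empty collection) is involved, and tightens a few assertions and lambdas. A minimal sketch of the recurring before/after idioms (illustrative only; the variable names just mirror the tests below, they are not the exact Hudi code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CleanupPatterns {
  public static void main(String[] args) {
    // Before (Guava): Maps.newHashMap(), Lists.newArrayList(...), Sets.newHashSet("commit")

    // After: plain JDK equivalents
    Map<Long, List<Long>> sizesMap = new HashMap<>();        // Maps.newHashMap()
    sizesMap.put(110L, new ArrayList<>());                   // Lists.newArrayList()
    sizesMap.put(100L, Collections.singletonList(1L));       // single-element list
    Set<String> actions = Collections.singleton("commit");   // single-element set
    List<Long> sizes = Arrays.asList(60L, 10L, 80L);         // multi-element stays Arrays.asList

    // Assertion cleanups in the same spirit (JUnit 4):
    //   assertTrue(!opt.isPresent())  ->  assertFalse(opt.isPresent())
    //   assertTrue(a.equals(b))       ->  assertEquals(a, b)
    System.out.println(sizesMap + " " + actions + " " + sizes);
  }
}
```

Note that `Collections.singletonList` and `Collections.singleton` return immutable views, which is fine in these tests since the resulting collections are never mutated.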
@@ -52,6 +52,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -407,11 +408,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

// We create three parquet file, each having one record. (two different partitions)
String filename1 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true);
String filename2 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true);
String filename3 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true);

// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -431,7 +432,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
assertEquals(FSUtils.getFileId(filename2), record._2.get().getRight());
}
} else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
- assertTrue(!record._2.isPresent());
+ assertFalse(record._2.isPresent());
}
}
}
@@ -456,7 +457,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
BloomFilterTypeCode.SIMPLE.name());
filter.add(record2.getRecordKey());
String filename =
- HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, filter, true);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, filter, true);
assertTrue(filter.mightContain(record1.getRecordKey()));
assertTrue(filter.mightContain(record2.getRecordKey()));

@@ -472,7 +473,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
- assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename)));
+ assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename));
} else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertFalse(record.isCurrentLocationKnown());
}

@@ -62,7 +62,6 @@ import static org.junit.Assert.fail;

public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {

- private String schemaStr;
private Schema schema;

public TestHoodieGlobalBloomIndex() {
@@ -73,7 +72,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
initSparkContexts("TestHoodieGlobalBloomIndex");
initPath();
// We have some records to be tagged (two different partitions)
- schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
+ String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
initMetaClient();
}

@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

- import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
@@ -39,6 +38,7 @@ import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -376,7 +376,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
}

@Test
- public void checkArchiveCommitTimeline() throws IOException, InterruptedException {
+ public void checkArchiveCommitTimeline() throws IOException {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
@@ -403,12 +403,12 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
- assertEquals(new HashSet(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
+ assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
}

private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
- .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();
+ .getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterInflights();
assertEquals("Loaded inflight clean actions and the count should match", expectedTotalInstants,
timeline.countInstants());
}

@@ -33,14 +33,15 @@ import org.apache.hudi.io.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.io.compact.strategy.UnBoundedPartitionAwareCompactionStrategy;

import com.google.common.collect.ImmutableMap;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -57,11 +58,11 @@ public class TestHoodieCompactionStrategy {

@Test
public void testUnBounded() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
UnBoundedCompactionStrategy strategy = new UnBoundedCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
@@ -72,11 +73,11 @@ public class TestHoodieCompactionStrategy {

@Test
public void testBoundedIOSimple() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
@@ -95,11 +96,11 @@ public class TestHoodieCompactionStrategy {

@Test
public void testLogFileSizeCompactionSimple() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
@@ -119,11 +120,11 @@ public class TestHoodieCompactionStrategy {

@Test
public void testDayBasedCompactionSimple() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));

Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder().put(120 * MB, partitionPaths[2])
.put(110 * MB, partitionPaths[2]).put(100 * MB, partitionPaths[1]).put(90 * MB, partitionPaths[0]).build();
@@ -147,13 +148,13 @@ public class TestHoodieCompactionStrategy {

@Test
public void testBoundedPartitionAwareCompactionSimple() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(70 * MB, Lists.newArrayList(MB));
- sizesMap.put(80 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(70 * MB, Collections.singletonList(MB));
+ sizesMap.put(80 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));

SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
Date today = new Date();
@@ -189,13 +190,13 @@ public class TestHoodieCompactionStrategy {

@Test
public void testUnboundedPartitionAwareCompactionSimple() {
- Map<Long, List<Long>> sizesMap = Maps.newHashMap();
- sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
- sizesMap.put(110 * MB, Lists.newArrayList());
- sizesMap.put(100 * MB, Lists.newArrayList(MB));
- sizesMap.put(80 * MB, Lists.newArrayList(MB));
- sizesMap.put(70 * MB, Lists.newArrayList(MB));
- sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+ Map<Long, List<Long>> sizesMap = new HashMap<>();
+ sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, new ArrayList<>());
+ sizesMap.put(100 * MB, Collections.singletonList(MB));
+ sizesMap.put(80 * MB, Collections.singletonList(MB));
+ sizesMap.put(70 * MB, Collections.singletonList(MB));
+ sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));

SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
Date today = new Date();

@@ -24,7 +24,6 @@ import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;

- import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -220,7 +219,6 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
* HoodieLogFormat V1.
*/
@Deprecated
- @VisibleForTesting
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
super(new HashMap<>(), new HashMap<>(), Option.empty(), Option.empty(), null, false);
this.records = records;
@@ -264,7 +262,6 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
}

@Deprecated
- @VisibleForTesting
public byte[] getBytes(Schema schema) throws IOException {

GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);

@@ -25,7 +25,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

- import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FSDataInputStream;

import javax.annotation.Nonnull;
@@ -35,9 +34,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
+ import java.util.HashMap;
import java.util.Map;

/**
* Abstract class defining a block in HoodieLogFile.
*/
@@ -192,7 +191,7 @@ public abstract class HoodieLogBlock {
*/
public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis) throws IOException {

- Map<HeaderMetadataType, String> metadata = Maps.newHashMap();
+ Map<HeaderMetadataType, String> metadata = new HashMap<>();
// 1. Read the metadata written out
int metadataCount = dis.readInt();
try {
@@ -231,7 +230,7 @@ public abstract class HoodieLogBlock {
/**
* When lazyReading of blocks is turned on, inflate the content of a log block from disk.
*/
- protected void inflate() throws IOException {
+ protected void inflate() throws HoodieIOException {

try {
content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
@@ -239,13 +238,9 @@ public abstract class HoodieLogBlock {
inputStream.readFully(content.get(), 0, content.get().length);
safeSeek(inputStream, this.getBlockContentLocation().get().getBlockEndPos());
} catch (IOException e) {
- try {
- // TODO : fs.open() and return inputstream again, need to pass FS configuration
- // because the inputstream might close/timeout for large number of log blocks to be merged
- inflate();
- } catch (IOException io) {
- throw new HoodieIOException("unable to lazily read log block from disk", io);
- }
+ // TODO : fs.open() and return inputstream again, need to pass FS configuration
+ // because the inputstream might close/timeout for large number of log blocks to be merged
+ inflate();
}
}

@@ -262,7 +257,7 @@ public abstract class HoodieLogBlock {
*
* @param inputStream Input Stream
* @param pos Position to seek
- * @throws IOException
+ * @throws IOException -
*/
private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
try {

@@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
@@ -95,9 +96,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {

@Override
public HoodieTimeline filterPendingExcludingCompaction() {
- return new HoodieDefaultTimeline(instants.stream().filter(instant -> {
- return (!instant.isCompleted()) && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
- }), details);
+ return new HoodieDefaultTimeline(instants.stream().filter(instant -> (!instant.isCompleted())
+ && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))), details);
}

@Override
@@ -107,9 +107,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {

@Override
public HoodieTimeline filterCompletedAndCompactionInstants() {
- return new HoodieDefaultTimeline(instants.stream().filter(s -> {
- return !s.isInflight() || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION);
- }), details);
+ return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()
+ || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
}

@Override
@@ -127,8 +126,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
return new HoodieDefaultTimeline(
- instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)),
- details);
+ instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
}

@Override
@@ -163,7 +161,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
* Get only pure commits (inflight and completed) in the active timeline.
*/
public HoodieTimeline getCommitTimeline() {
- return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
+ return getTimelineOfActions(Collections.singleton(COMMIT_ACTION));
}

/**

@@ -188,7 +188,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
}

@Override
- /**
+ /*
* This is overridden to incrementally apply file-slices to rocks DB
*/
protected void applyDeltaFileSlicesToPartitionView(String partition, List<HoodieFileGroup> deltaFileGroups,

@@ -47,8 +47,6 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -351,7 +349,7 @@ public class HoodieTestUtils {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
.overBaseCommit(location.getInstantTime()).withFs(fs).build();

- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
logWriter.appendBlock(new HoodieAvroDataBlock(value.stream().map(r -> {
@@ -372,7 +370,7 @@ public class HoodieTestUtils {

public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
- List<FileStatus> returns = Lists.newArrayList();
+ List<FileStatus> returns = new ArrayList<>();
while (itr.hasNext()) {
LocatedFileStatus status = itr.next();
if (status.getPath().getName().contains(".parquet")) {

@@ -39,7 +39,6 @@ import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.exception.CorruptedLogFileException;

- import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -58,10 +57,11 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
+ import java.util.ArrayList;
import java.util.Arrays;
- import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -139,7 +139,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -157,7 +157,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -223,7 +223,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -241,7 +241,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -321,7 +321,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {

// Some data & append two times.
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -346,7 +346,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords = records.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -374,7 +374,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Schema schema = getSimpleSchema();
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -438,7 +438,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());

@@ -478,7 +478,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -564,7 +564,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());

- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -609,7 +609,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");

header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -670,7 +670,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -749,7 +749,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -834,7 +834,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");

@@ -901,7 +901,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -943,7 +943,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {

// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -983,7 +983,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1030,7 +1030,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {

// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -1131,7 +1131,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();

- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
@@ -1145,7 +1145,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();

- Map<HoodieLogBlock.HeaderMetadataType, String> header2 = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
@@ -1204,7 +1204,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
@@ -1271,7 +1271,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
Schema schema = getSimpleSchema();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -1331,7 +1331,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);

@@ -28,16 +28,17 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;

+ import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

- import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
+ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -177,11 +178,11 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
@Test
public void testTimelineGetOperations() {
List<HoodieInstant> allInstants = getAllInstants();
- Supplier<Stream<HoodieInstant>> sup = () -> allInstants.stream();
+ Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
timeline = new HoodieActiveTimeline(metaClient, true);
timeline.setInstants(allInstants);

- /**
+ /*
* Helper function to check HoodieTimeline only contains some type of Instant actions.
* @param timeline The HoodieTimeline to check
* @param actions The actions that should be present in the timeline being checked
@@ -197,13 +198,13 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(),
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION));
- checkTimeline.accept(timeline.getCommitTimeline(), Sets.newHashSet(HoodieTimeline.COMMIT_ACTION));
+ checkTimeline.accept(timeline.getCommitTimeline(), Collections.singleton(HoodieTimeline.COMMIT_ACTION));

- checkTimeline.accept(timeline.getDeltaCommitTimeline(), Sets.newHashSet(HoodieTimeline.DELTA_COMMIT_ACTION));
- checkTimeline.accept(timeline.getCleanerTimeline(), Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
- checkTimeline.accept(timeline.getRollbackTimeline(), Sets.newHashSet(HoodieTimeline.ROLLBACK_ACTION));
- checkTimeline.accept(timeline.getRestoreTimeline(), Sets.newHashSet(HoodieTimeline.RESTORE_ACTION));
- checkTimeline.accept(timeline.getSavePointTimeline(), Sets.newHashSet(HoodieTimeline.SAVEPOINT_ACTION));
+ checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION));
+ checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION));
+ checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION));
+ checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION));
+ checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION));
checkTimeline.accept(timeline.getAllCommitsTimeline(),
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
@@ -212,8 +213,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// Get some random Instants
Random rand = new Random();
Set<String> randomInstants = sup.get().filter(i -> rand.nextBoolean())
- .map(i -> i.getAction())
- .collect(Collectors.toSet());
+ .map(HoodieInstant::getAction).collect(Collectors.toSet());
checkTimeline.accept(timeline.getTimelineOfActions(randomInstants), randomInstants);
}

@@ -318,7 +318,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {

timeline.setInstants(allInstants);
timeline.createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "2"));
- allInstants.stream().map(i -> i.getTimestamp()).forEach(s -> assertTrue(timeline.containsOrBeforeTimelineStarts(s)));
+ allInstants.stream().map(HoodieInstant::getTimestamp).forEach(s -> assertTrue(timeline.containsOrBeforeTimelineStarts(s)));
assertTrue(timeline.containsOrBeforeTimelineStarts("0"));
assertFalse(timeline.containsOrBeforeTimelineStarts(String.valueOf(System.currentTimeMillis() + 1000)));
assertFalse(timeline.getTimelineHash().isEmpty());
@@ -356,7 +356,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
@Test
public void testFiltering() {
List<HoodieInstant> allInstants = getAllInstants();
- Supplier<Stream<HoodieInstant>> sup = () -> allInstants.stream();
+ Supplier<Stream<HoodieInstant>> sup = allInstants::stream;

timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(allInstants);
@@ -368,7 +368,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
Collections.reverse(v2);
assertEquals(v1, v2);

- /**
+ /*
* Helper function to check HoodieTimeline only contains some type of Instant states.
* @param timeline The HoodieTimeline to check
* @param states The states that should be present in the timeline being checked
@@ -378,8 +378,8 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
sup.get().filter(i -> !states.contains(i.getState())).forEach(i -> assertFalse(timeline.containsInstant(i)));
};

- checkFilter.accept(timeline.filter(i -> false), Sets.newHashSet());
- checkFilter.accept(timeline.filterInflights(), Sets.newHashSet(State.INFLIGHT));
+ checkFilter.accept(timeline.filter(i -> false), new HashSet<>());
+ checkFilter.accept(timeline.filterInflights(), Collections.singleton(State.INFLIGHT));
checkFilter.accept(timeline.filterInflightsAndRequested(),
Sets.newHashSet(State.INFLIGHT, State.REQUESTED));

@@ -387,7 +387,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// This cannot be done using checkFilter as it involves both states and actions
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
- final Set<String> actions = Sets.newHashSet(HoodieTimeline.COMPACTION_ACTION);
+ final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
.forEach(i -> assertTrue(t1.containsInstant(i)));
sup.get().filter(i -> !(states.contains(i.getState()) || actions.contains(i.getAction())))
@@ -395,9 +395,9 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {

// filterPendingCompactionTimeline
final HoodieTimeline t2 = timeline.filterPendingCompactionTimeline();
- sup.get().filter(i -> i.getAction() == HoodieTimeline.COMPACTION_ACTION)
+ sup.get().filter(i -> i.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.forEach(i -> assertTrue(t2.containsInstant(i)));
- sup.get().filter(i -> i.getAction() != HoodieTimeline.COMPACTION_ACTION)
+ sup.get().filter(i -> !i.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.forEach(i -> assertFalse(t2.containsInstant(i)));
}

@@ -407,7 +407,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
*/
private List<HoodieInstant> getAllInstants() {
timeline = new HoodieActiveTimeline(metaClient);
- List<HoodieInstant> allInstants = new ArrayList<HoodieInstant>();
+ List<HoodieInstant> allInstants = new ArrayList<>();
long commitTime = 1;
for (State state : State.values()) {
if (state == State.INVALID) {
@@ -417,19 +417,19 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// Following are not valid combinations of actions and state so we should
// not be generating them.
if (state == State.REQUESTED) {
- if (action == HoodieTimeline.SAVEPOINT_ACTION || action == HoodieTimeline.RESTORE_ACTION
- || action == HoodieTimeline.ROLLBACK_ACTION) {
+ if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)
+ || action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
continue;
}
}
- if (state == State.INFLIGHT && action == HoodieTimeline.ROLLBACK_ACTION) {
+ if (state == State.INFLIGHT && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
continue;
}
- if (state == State.COMPLETED && action == HoodieTimeline.ROLLBACK_ACTION) {
+ if (state == State.COMPLETED && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
continue;
}
// Compaction complete is called commit complete
- if (state == State.COMPLETED && action == HoodieTimeline.COMPACTION_ACTION) {
+ if (state == State.COMPLETED && action.equals(HoodieTimeline.COMPACTION_ACTION)) {
action = HoodieTimeline.COMMIT_ACTION;
}
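The TestHoodieActiveTimeline hunks above also replace reference comparisons on the action strings (`action == HoodieTimeline.ROLLBACK_ACTION`) with `equals`, which only happened to pass before because both sides were interned constants, and swap trivial lambdas for method references. A small self-contained illustration of why the `equals` form is the safer idiom (the `ROLLBACK_ACTION` constant here is a stand-in for the Hudi one, not the real class):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StringActionCheck {
  static final String ROLLBACK_ACTION = "rollback";

  public static void main(String[] args) {
    // A value built at runtime is equal to the constant but not the same object.
    String action = new String("rollback");

    System.out.println(action == ROLLBACK_ACTION);       // false: reference comparison
    System.out.println(action.equals(ROLLBACK_ACTION));  // true: value comparison

    // Method-reference form of the lambda cleanup in the same hunks:
    List<String> upper = Stream.of("commit", "clean")
        .map(String::toUpperCase)                        // instead of s -> s.toUpperCase()
        .collect(Collectors.toList());
    System.out.println(upper);
  }
}
```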

@@ -41,7 +41,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import com.google.common.collect.Lists;
- import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -234,7 +233,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
* @param expTotalFileSlices Total number of file-slices across file-groups in the partition path
* @param expTotalDataFiles Total number of data-files across file-groups in the partition path
* @param includeInvalidAndInflight Whether view includes inflight and invalid file-groups.
- * @throws Exception
+ * @throws Exception -
*/
protected void testViewForFileSlicesWithAsyncCompaction(boolean skipCreatingDataFile, boolean isCompactionInFlight,
int expTotalFileSlices, int expTotalDataFiles, boolean includeInvalidAndInflight) throws Exception {
@@ -652,7 +651,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
List<HoodieBaseFile> dataFileList =
roView.getLatestBaseFilesBeforeOrOn("2016/05/01", commitTime4).collect(Collectors.toList());
assertEquals(3, dataFileList.size());
- Set<String> filenames = Sets.newHashSet();
+ Set<String> filenames = new HashSet<>();
for (HoodieBaseFile status : dataFileList) {
filenames.add(status.getFileName());
}
@@ -660,7 +659,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));

- filenames = Sets.newHashSet();
+ filenames = new HashSet<>();
List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
.map(FileSlice::getLogFiles).flatMap(logFileList -> logFileList).collect(Collectors.toList());
assertEquals(logFilesList.size(), 4);
@@ -679,7 +678,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
// Reset the max commit time
List<HoodieBaseFile> dataFiles =
roView.getLatestBaseFilesBeforeOrOn("2016/05/01", commitTime3).collect(Collectors.toList());
- filenames = Sets.newHashSet();
+ filenames = new HashSet<>();
for (HoodieBaseFile status : dataFiles) {
filenames.add(status.getFileName());
}
@@ -739,7 +738,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {

for (HoodieFileGroup fileGroup : fileGroups) {
String fileId = fileGroup.getFileGroupId().getFileId();
- Set<String> filenames = Sets.newHashSet();
+ Set<String> filenames = new HashSet<>();
fileGroup.getAllBaseFiles().forEach(dataFile -> {
assertEquals("All same fileId should be grouped", fileId, dataFile.getFileId());
filenames.add(dataFile.getFileName());
@@ -817,7 +816,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
List<HoodieBaseFile> dataFiles =
roView.getLatestBaseFilesInRange(Lists.newArrayList(commitTime2, commitTime3)).collect(Collectors.toList());
assertEquals(isLatestFileSliceOnly ? 2 : 3, dataFiles.size());
- Set<String> filenames = Sets.newHashSet();
+ Set<String> filenames = new HashSet<>();
for (HoodieBaseFile status : dataFiles) {
filenames.add(status.getFileName());
}
@@ -888,7 +887,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
roView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime2).collect(Collectors.toList());
if (!isLatestFileSliceOnly) {
assertEquals(2, dataFiles.size());
- Set<String> filenames = Sets.newHashSet();
+ Set<String> filenames = new HashSet<>();
for (HoodieBaseFile status : dataFiles) {
filenames.add(status.getFileName());
}
@@ -983,7 +982,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {

List<HoodieBaseFile> statuses1 = roView.getLatestBaseFiles().collect(Collectors.toList());
assertEquals(3, statuses1.size());
- Set<String> filenames = Sets.newHashSet();
+ Set<String> filenames = new HashSet<>();
for (HoodieBaseFile status : statuses1) {
filenames.add(status.getFileName());
}
@@ -1100,7 +1099,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());

- /** Merge API Tests **/
+ // Merge API Tests
Arrays.asList(partitionPath1, partitionPath2, partitionPath3).forEach(partitionPath -> {
List<FileSlice> fileSliceList =
rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());

@@ -21,7 +21,6 @@ package org.apache.hudi.hadoop.hive;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

- import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -381,7 +380,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
}

// Use HiveInputFormat if any of the paths is not splittable
- Class inputFormatClass = part.getInputFileFormatClass();
+ Class<?> inputFormatClass = part.getInputFileFormatClass();
String inputFormatClassName = inputFormatClass.getName();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
LOG.info("Input Format => " + inputFormatClass.getName());
@@ -484,7 +483,6 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
/**
* Gets all the path indices that should not be combined.
*/
- @VisibleForTesting
public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
throws ExecutionException, InterruptedException {
LOG.info("Total number of paths: " + paths.length + ", launching " + numThreads
@@ -719,7 +717,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
CombineHiveInputSplit hsplit = (CombineHiveInputSplit) split;

String inputFormatClassName = null;
- Class inputFormatClass;
+ Class<?> inputFormatClass;
try {
inputFormatClassName = hsplit.inputFormatClassName();
inputFormatClass = job.getClassByName(inputFormatClassName);
@@ -865,7 +863,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
}
}

- return (CombineFileSplit[]) inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
+ return inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
}

@Override

@@ -78,8 +78,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
this.iterator = this.executor.getQueue().iterator();
this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
- Boolean
- .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+ Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();

@@ -37,7 +37,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.InputFormatTestUtil;

import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
@@ -69,6 +68,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -109,7 +109,7 @@ public class TestHoodieRealtimeRecordReader {
.overBaseCommit(baseCommit).withFs(fs).withLogVersion(logVersion).withLogWriteToken("1-0-1")
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
Map<HeaderMetadataType, String> header = Maps.newHashMap();
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
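The Maps.newHashMap() call replaced above is the Guava factory that predates Java 7's diamond operator; new HashMap<>() now infers the type arguments itself, so Guava is not needed for this. A minimal sketch of the substitution applied throughout this commit (the values are invented):

// Illustrative sketch only: Guava collection factories replaced by JDK
// constructors with the diamond operator, as done across this commit.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondOperatorSketch {
  public static void main(String[] args) {
    // Before (Guava): Map<String, String> header = Maps.newHashMap();
    //                 List<String> partitions = Lists.newArrayList();
    Map<String, String> header = new HashMap<>();
    List<String> partitions = new ArrayList<>();

    header.put("INSTANT_TIME", "001"); // invented values
    partitions.add("2016/01/31");
    System.out.println(header + " " + partitions);
  }
}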
@@ -130,7 +130,7 @@ public class TestHoodieRealtimeRecordReader {
records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
}
Schema writeSchema = records.get(0).getSchema();
Map<HeaderMetadataType, String> header = Maps.newHashMap();
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
@@ -144,7 +144,7 @@ public class TestHoodieRealtimeRecordReader {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit)
.withLogVersion(logVersion).withFs(fs).build();

Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit);

@@ -34,8 +34,6 @@ import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hive.util.SchemaUtil;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -71,7 +69,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@SuppressWarnings("ConstantConditions")
public class HoodieHiveClient {

private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
@@ -193,7 +190,7 @@ public class HoodieHiveClient {
}

private List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = Lists.newArrayList();
List<String> changePartitions = new ArrayList<>();
// Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + syncConfig.databaseName + HIVE_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
@@ -215,7 +212,7 @@ public class HoodieHiveClient {
 * Generate a list of PartitionEvent based on the changes required.
 */
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
Map<String, String> paths = Maps.newHashMap();
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
Collections.sort(hivePartitionValues);
@@ -224,7 +221,7 @@ public class HoodieHiveClient {
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
}

List<PartitionEvent> events = Lists.newArrayList();
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
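For context on the paths map built in the hunks above: the Hive partition values are sorted and joined with ", " so the same key can be recomputed later from the storage partitions. A self-contained sketch of that keying scheme (partition values and path are invented):

// Illustrative sketch only: building the sorted, comma-joined partition key
// used as the lookup key in getPartitionEvents above.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeySketch {
  public static void main(String[] args) {
    Map<String, String> paths = new HashMap<>();

    List<String> hivePartitionValues = new ArrayList<>();
    hivePartitionValues.add("2016");
    hivePartitionValues.add("01");
    hivePartitionValues.add("31");
    Collections.sort(hivePartitionValues);

    paths.put(String.join(", ", hivePartitionValues), "/tmp/base/2016/01/31");

    // The same sorted-join recovers the key regardless of input order.
    System.out.println(paths.containsKey("01, 2016, 31")); // true
  }
}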
@@ -287,7 +284,7 @@ public class HoodieHiveClient {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + " does not exist");
}
Map<String, String> schema = Maps.newHashMap();
Map<String, String> schema = new HashMap<>();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
@@ -417,7 +414,6 @@ public class HoodieHiveClient {
/**
 * Read schema from a data file from the last compaction commit done.
 */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {
HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new HoodieHiveSyncException(
"Could not read schema from last compaction, no compaction commits found on path " + syncConfig.basePath));
@@ -434,7 +430,6 @@ public class HoodieHiveClient {
/**
 * Read the schema from the log file on path.
 */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path)
throws IOException {
MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
@@ -626,7 +621,6 @@ public class HoodieHiveClient {
}
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);

@@ -21,10 +21,10 @@ package org.apache.hudi.hive;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.parquet.schema.MessageType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -85,9 +85,9 @@ public class SchemaDifference {
public Builder(MessageType storageSchema, Map<String, String> tableSchema) {
this.storageSchema = storageSchema;
this.tableSchema = tableSchema;
deleteColumns = Lists.newArrayList();
updateColumnTypes = Maps.newHashMap();
addColumnTypes = Maps.newHashMap();
deleteColumns = new ArrayList<>();
updateColumnTypes = new HashMap<>();
addColumnTypes = new HashMap<>();
}

public Builder deleteTableColumn(String column) {

@@ -18,20 +18,19 @@

package org.apache.hudi.hive.util;

import com.google.common.collect.Maps;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ColumnNameXLator {

private static Map<String, String> xformMap = Maps.newHashMap();
private static Map<String, String> xformMap = new HashMap<>();

public static String translateNestedColumn(String colName) {
Map.Entry entry;
for (Iterator ic = xformMap.entrySet().iterator(); ic.hasNext(); colName =
colName.replaceAll((String) entry.getKey(), (String) entry.getValue())) {
entry = (Map.Entry) ic.next();
Map.Entry<String,String> entry;
for (Iterator<Map.Entry<String, String>> ic = xformMap.entrySet().iterator(); ic.hasNext(); colName =
colName.replaceAll(entry.getKey(), entry.getValue())) {
entry = ic.next();
}

return colName;
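The ColumnNameXLator rewrite above keeps the original replace-all loop and only adds type parameters, dropping the Map.Entry casts. A short sketch of the same translation idea, written here as an enhanced for loop over a typed map and with an invented mapping rule:

// Illustrative sketch only: replaceAll-based column-name translation over a
// typed map, so no casts are needed (the rule below is made up).
import java.util.HashMap;
import java.util.Map;

public class ColumnNameTranslateSketch {
  private static final Map<String, String> XFORM_MAP = new HashMap<>();
  static {
    XFORM_MAP.put("\\$", "_dollar_"); // invented rule: rewrite '$' in nested names
  }

  static String translateNestedColumn(String colName) {
    for (Map.Entry<String, String> entry : XFORM_MAP.entrySet()) {
      colName = colName.replaceAll(entry.getKey(), entry.getValue());
    }
    return colName;
  }

  public static void main(String[] args) {
    System.out.println(translateNestedColumn("nested$field")); // nested_dollar_field
  }
}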