[HUDI-346] Set allowMultipleEmptyLines to false for EmptyLineSeparator rule (#1025)
This commit is contained in:
@@ -35,7 +35,6 @@ public class HoodieCLI {
|
|||||||
public static HoodieTableMetaClient tableMetadata;
|
public static HoodieTableMetaClient tableMetadata;
|
||||||
public static HoodieTableMetaClient syncTableMetadata;
|
public static HoodieTableMetaClient syncTableMetadata;
|
||||||
|
|
||||||
|
|
||||||
public enum CLIState {
|
public enum CLIState {
|
||||||
INIT, DATASET, SYNC
|
INIT, DATASET, SYNC
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,7 +70,6 @@ public class RepairsCommand implements CommandMarker {
|
|||||||
return "Deduplication failed ";
|
return "Deduplication failed ";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a dataset, if not present")
|
@CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a dataset, if not present")
|
||||||
public String addPartitionMeta(
|
public String addPartitionMeta(
|
||||||
@CliOption(key = {"dryrun"}, help = "Should we actually add or just print what would be done",
|
@CliOption(key = {"dryrun"}, help = "Should we actually add or just print what would be done",
|
||||||
|
|||||||
@@ -53,7 +53,6 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return HoodieCLI.tableMetadata != null;
|
return HoodieCLI.tableMetadata != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@CliAvailabilityIndicator({"savepoint create"})
|
@CliAvailabilityIndicator({"savepoint create"})
|
||||||
public boolean isCreateSavepointAvailable() {
|
public boolean isCreateSavepointAvailable() {
|
||||||
return HoodieCLI.tableMetadata != null;
|
return HoodieCLI.tableMetadata != null;
|
||||||
@@ -127,7 +126,6 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return "Savepoint " + commitTime + " rolled back";
|
return "Savepoint " + commitTime + " rolled back";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
|
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
|
||||||
public String refreshMetaClient() throws IOException {
|
public String refreshMetaClient() throws IOException {
|
||||||
HoodieCLI.refreshTableMetadata();
|
HoodieCLI.refreshTableMetadata();
|
||||||
@@ -140,5 +138,4 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return new HoodieWriteClient(jsc, config, false);
|
return new HoodieWriteClient(jsc, config, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,5 +57,4 @@ public class InputStreamConsumer extends Thread {
|
|||||||
stdout.start();
|
stdout.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -132,7 +132,6 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Builder withAutoClean(Boolean autoClean) {
|
public Builder withAutoClean(Boolean autoClean) {
|
||||||
props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
|
props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -74,7 +74,6 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Builder on(boolean metricsOn) {
|
public Builder on(boolean metricsOn) {
|
||||||
props.setProperty(METRICS_ON, String.valueOf(metricsOn));
|
props.setProperty(METRICS_ON, String.valueOf(metricsOn));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -147,7 +147,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
|
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int getWriteBufferLimitBytes() {
|
public int getWriteBufferLimitBytes() {
|
||||||
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
|
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.exception;
|
package org.apache.hudi.exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Exception thrown when dependent system is not available
|
* Exception thrown when dependent system is not available
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.spark.api.java.function.Function2;
|
import org.apache.spark.api.java.function.Function2;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map function that handles a sorted stream of HoodieRecords
|
* Map function that handles a sorted stream of HoodieRecords
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -52,7 +52,6 @@ public abstract class LazyIterableIterator<I, O> implements Iterable<O>, Iterato
|
|||||||
*/
|
*/
|
||||||
protected abstract O computeNext();
|
protected abstract O computeNext();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called once, after all elements are processed.
|
* Called once, after all elements are processed.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -47,7 +47,6 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
this.config = config;
|
this.config = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
|
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
|
||||||
JavaSparkContext jsc) throws HoodieIndexException {
|
JavaSparkContext jsc) throws HoodieIndexException {
|
||||||
switch (config.getIndexType()) {
|
switch (config.getIndexType()) {
|
||||||
@@ -108,7 +107,6 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
*/
|
*/
|
||||||
public abstract boolean canIndexLogFiles();
|
public abstract boolean canIndexLogFiles();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An index is "implicit" with respect to storage, if just writing new data to a file slice, updates the index as
|
* An index is "implicit" with respect to storage, if just writing new data to a file slice, updates the index as
|
||||||
* well. This is used by storage, to save memory footprint in certain cases.
|
* well. This is used by storage, to save memory footprint in certain cases.
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.apache.spark.api.java.function.Function;
|
import org.apache.spark.api.java.function.Function;
|
||||||
import org.apache.spark.api.java.function.Function2;
|
import org.apache.spark.api.java.function.Function2;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hoodie Index implementation backed by an in-memory Hash map.
|
* Hoodie Index implementation backed by an in-memory Hash map.
|
||||||
* <p>
|
* <p>
|
||||||
|
|||||||
@@ -268,7 +268,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean rollbackCommit(String commitTime) {
|
public boolean rollbackCommit(String commitTime) {
|
||||||
// Nope, don't need to do anything.
|
// Nope, don't need to do anything.
|
||||||
|
|||||||
@@ -170,7 +170,6 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
|
|||||||
return deletePaths;
|
return deletePaths;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Selects the versions for file for cleaning, such that it
|
* Selects the versions for file for cleaning, such that it
|
||||||
* <p>
|
* <p>
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
|
||||||
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
||||||
|
|
||||||
protected final String instantTime;
|
protected final String instantTime;
|
||||||
|
|||||||
@@ -86,7 +86,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
|
|||||||
dataFileToBeMerged);
|
dataFileToBeMerged);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static Schema createHoodieWriteSchema(Schema originalSchema) {
|
public static Schema createHoodieWriteSchema(Schema originalSchema) {
|
||||||
return HoodieAvroUtils.addMetadataFields(originalSchema);
|
return HoodieAvroUtils.addMetadataFields(originalSchema);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,6 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package org.apache.hudi.io.compact.strategy;
|
package org.apache.hudi.io.compact.strategy;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|||||||
@@ -51,7 +51,6 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
|
|||||||
private final String commitTime;
|
private final String commitTime;
|
||||||
private final Schema schema;
|
private final Schema schema;
|
||||||
|
|
||||||
|
|
||||||
public HoodieParquetWriter(String commitTime, Path file, HoodieParquetConfig parquetConfig, Schema schema)
|
public HoodieParquetWriter(String commitTime, Path file, HoodieParquetConfig parquetConfig, Schema schema)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
|
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
|
||||||
|
|||||||
@@ -80,7 +80,6 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.apache.spark.api.java.function.PairFlatMapFunction;
|
import org.apache.spark.api.java.function.PairFlatMapFunction;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of a very heavily read-optimized Hoodie Table where
|
* Implementation of a very heavily read-optimized Hoodie Table where
|
||||||
* <p>
|
* <p>
|
||||||
|
|||||||
@@ -212,7 +212,6 @@ public class RollbackExecutor implements Serializable {
|
|||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Map<HeaderMetadataType, String> generateHeader(String commit) {
|
private Map<HeaderMetadataType, String> generateHeader(String commit) {
|
||||||
// generate metadata
|
// generate metadata
|
||||||
Map<HeaderMetadataType, String> header = Maps.newHashMap();
|
Map<HeaderMetadataType, String> header = Maps.newHashMap();
|
||||||
|
|||||||
@@ -49,7 +49,6 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
|
|||||||
|
|
||||||
private final WorkloadStat globalStat;
|
private final WorkloadStat globalStat;
|
||||||
|
|
||||||
|
|
||||||
public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) {
|
public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) {
|
||||||
this.taggedRecords = taggedRecords;
|
this.taggedRecords = taggedRecords;
|
||||||
this.partitionPathStatMap = new HashMap<>();
|
this.partitionPathStatMap = new HashMap<>();
|
||||||
|
|||||||
@@ -67,7 +67,6 @@ public class HoodieClientExample {
|
|||||||
cli.run();
|
cli.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void run() throws Exception {
|
public void run() throws Exception {
|
||||||
|
|
||||||
SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
|
SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
|
||||||
|
|||||||
@@ -70,7 +70,6 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
|||||||
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(expected = TimeoutException.class)
|
@Test(expected = TimeoutException.class)
|
||||||
public void testCheckFailingAppears() throws Exception {
|
public void testCheckFailingAppears() throws Exception {
|
||||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||||
|
|||||||
@@ -92,7 +92,6 @@ public class HoodieClientTestUtils {
|
|||||||
new File(parentPath + "/" + commitTime + suffix).createNewFile();
|
new File(parentPath + "/" + commitTime + suffix).createNewFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void fakeCommitFile(String basePath, String commitTime) throws IOException {
|
public static void fakeCommitFile(String basePath, String commitTime) throws IOException {
|
||||||
fakeMetaFile(basePath, commitTime, HoodieTimeline.COMMIT_EXTENSION);
|
fakeMetaFile(basePath, commitTime, HoodieTimeline.COMMIT_EXTENSION);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,7 +79,6 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
|
|||||||
return partitionPath;
|
return partitionPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestRawTripPayload preCombine(TestRawTripPayload another) {
|
public TestRawTripPayload preCombine(TestRawTripPayload another) {
|
||||||
return another;
|
return another;
|
||||||
@@ -129,7 +128,6 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
|
|||||||
return baos.toByteArray();
|
return baos.toByteArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private String unCompressData(byte[] data) throws IOException {
|
private String unCompressData(byte[] data) throws IOException {
|
||||||
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
|
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
|
||||||
return FileIOUtils.readAsUTFString(iis, dataSize);
|
return FileIOUtils.readAsUTFString(iis, dataSize);
|
||||||
|
|||||||
@@ -291,7 +291,6 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTagLocation() throws Exception {
|
public void testTagLocation() throws Exception {
|
||||||
// We have some records to be tagged (two different partitions)
|
// We have some records to be tagged (two different partitions)
|
||||||
@@ -433,7 +432,6 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBloomFilterFalseError() throws IOException, InterruptedException {
|
public void testBloomFilterFalseError() throws IOException, InterruptedException {
|
||||||
// We have two hoodie records
|
// We have two hoodie records
|
||||||
|
|||||||
@@ -196,7 +196,6 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
assertEquals(new HashSet<>(Arrays.asList("f4", "f1")), new HashSet<>(recordKeyToFileComps.get("005")));
|
assertEquals(new HashSet<>(Arrays.asList("f4", "f1")), new HashSet<>(recordKeyToFileComps.get("005")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTagLocation() throws Exception {
|
public void testTagLocation() throws Exception {
|
||||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||||
|
|||||||
@@ -250,7 +250,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
assertEquals(4, writeStatus.getStat().getNumWrites());// 3 rewritten records + 1 new record
|
assertEquals(4, writeStatus.getStat().getNumWrites());// 3 rewritten records + 1 new record
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
|
private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
|
||||||
List<HoodieRecord> records = new ArrayList<>();
|
List<HoodieRecord> records = new ArrayList<>();
|
||||||
for (int i = 0; i < n; i++) {
|
for (int i = 0; i < n; i++) {
|
||||||
@@ -387,7 +386,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
assertEquals("If the number of records are more than 1150, then there should be a new file", 3, counts);
|
assertEquals("If the number of records are more than 1150, then there should be a new file", 3, counts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
|
private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
|
||||||
String testPartitionPath, boolean autoSplitInserts) throws Exception {
|
String testPartitionPath, boolean autoSplitInserts) throws Exception {
|
||||||
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
|
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
|
||||||
@@ -419,7 +417,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
return partitioner;
|
return partitioner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUpsertPartitioner() throws Exception {
|
public void testUpsertPartitioner() throws Exception {
|
||||||
final String testPartitionPath = "2016/09/26";
|
final String testPartitionPath = "2016/09/26";
|
||||||
@@ -429,7 +426,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
|
assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
|
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
|
||||||
final String testPartitionPath = "2016/09/26";
|
final String testPartitionPath = "2016/09/26";
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
|
|||||||
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
|
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
|
||||||
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
|
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
|
||||||
|
|
||||||
|
|
||||||
public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) {
|
public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) {
|
||||||
super(schema, avroSchema);
|
super(schema, avroSchema);
|
||||||
this.bloomFilter = bloomFilter;
|
this.bloomFilter = bloomFilter;
|
||||||
|
|||||||
@@ -80,7 +80,6 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
|
|||||||
return baos.toByteArray();
|
return baos.toByteArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private String unCompressData(byte[] data) throws IOException {
|
private String unCompressData(byte[] data) throws IOException {
|
||||||
InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
|
InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -51,7 +51,6 @@ public class HoodiePartitionMetadata {
|
|||||||
|
|
||||||
private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class);
|
private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct metadata from existing partition
|
* Construct metadata from existing partition
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -98,7 +98,6 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
|
|||||||
this.data = null;
|
this.data = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the current currentLocation of the record. This should happen exactly-once
|
* Sets the current currentLocation of the record. This should happen exactly-once
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -114,7 +114,6 @@ public class HoodieTableConfig implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read the table type from the table properties and if not found, return the default
|
* Read the table type from the table properties and if not found, return the default
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -267,7 +267,6 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return archivedTimeline;
|
return archivedTimeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to initialize a dataset, with given basePath, tableType, name, archiveFolder
|
* Helper method to initialize a dataset, with given basePath, tableType, name, archiveFolder
|
||||||
*/
|
*/
|
||||||
@@ -410,7 +409,6 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects
|
* Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -294,7 +294,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
return new HoodieLogFormatVersion(inputStream.readInt());
|
return new HoodieLogFormatVersion(inputStream.readInt());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private boolean readMagic() throws IOException {
|
private boolean readMagic() throws IOException {
|
||||||
try {
|
try {
|
||||||
boolean hasMagic = hasNextMagic();
|
boolean hasMagic = hasNextMagic();
|
||||||
|
|||||||
@@ -97,7 +97,6 @@ public interface HoodieLogFormat {
|
|||||||
public HoodieLogBlock prev() throws IOException;
|
public HoodieLogBlock prev() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builder class to construct the default log format writer
|
* Builder class to construct the default log format writer
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -45,7 +45,6 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
|
|||||||
this.keysToDelete = keysToDelete;
|
this.keysToDelete = keysToDelete;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
|
private HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
|
||||||
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
|
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
|
||||||
Map<HeaderMetadataType, String> footer) {
|
Map<HeaderMetadataType, String> footer) {
|
||||||
|
|||||||
@@ -171,7 +171,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
|
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get only the cleaner action (inflight and completed) in the active timeline
|
* Get only the cleaner action (inflight and completed) in the active timeline
|
||||||
*/
|
*/
|
||||||
@@ -364,7 +363,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
return inflight;
|
return inflight;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
|
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
|
||||||
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||||
Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
|
Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
|
||||||
|
|||||||
@@ -93,7 +93,6 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
|||||||
in.defaultReadObject();
|
in.defaultReadObject();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static Path getArchiveLogPath(String archiveFolder) {
|
public static Path getArchiveLogPath(String archiveFolder) {
|
||||||
return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
|
return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
public class InstantDTO {
|
public class InstantDTO {
|
||||||
|
|
||||||
|
|||||||
@@ -655,7 +655,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
|||||||
.map(Option::get);
|
.map(Option::get);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected Option<HoodieDataFile> getLatestDataFile(HoodieFileGroup fileGroup) {
|
protected Option<HoodieDataFile> getLatestDataFile(HoodieFileGroup fileGroup) {
|
||||||
return Option
|
return Option
|
||||||
.fromJavaOptional(fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst());
|
.fromJavaOptional(fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst());
|
||||||
|
|||||||
@@ -130,7 +130,6 @@ public class FileSystemViewManager {
|
|||||||
return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
|
return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an in-memory file System view for a dataset
|
* Create an in-memory file System view for a dataset
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -111,7 +111,6 @@ public class AvroUtils {
|
|||||||
return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class);
|
return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static Option<byte[]> serializeCleanerPlan(HoodieCleanerPlan cleanPlan) throws IOException {
|
public static Option<byte[]> serializeCleanerPlan(HoodieCleanerPlan cleanPlan) throws IOException {
|
||||||
return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class);
|
return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,7 +63,6 @@ public interface ConsistencyGuard {
|
|||||||
*/
|
*/
|
||||||
void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException;
|
void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait Till target visibility is reached
|
* Wait Till target visibility is reached
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -109,7 +109,6 @@ public class FSUtils {
|
|||||||
return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
|
return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
|
public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
|
||||||
return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
|
return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
|
||||||
}
|
}
|
||||||
@@ -150,7 +149,6 @@ public class FSUtils {
|
|||||||
return fullFileName.split("_")[0];
|
return fullFileName.split("_")[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
|
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -183,7 +183,6 @@ public class HoodieAvroUtils {
|
|||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
|
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
|
||||||
* schema
|
* schema
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ import java.lang.reflect.InvocationTargetException;
|
|||||||
import org.apache.hudi.exception.HoodieSerializationException;
|
import org.apache.hudi.exception.HoodieSerializationException;
|
||||||
import org.objenesis.instantiator.ObjectInstantiator;
|
import org.objenesis.instantiator.ObjectInstantiator;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / deserializing objects.
|
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / deserializing objects.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.collection.Pair;
|
|||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
|
||||||
public class TimelineDiffHelper {
|
public class TimelineDiffHelper {
|
||||||
|
|
||||||
protected static Logger log = LogManager.getLogger(TimelineDiffHelper.class);
|
protected static Logger log = LogManager.getLogger(TimelineDiffHelper.class);
|
||||||
|
|||||||
@@ -16,7 +16,6 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package org.apache.hudi.common.util.queue;
|
package org.apache.hudi.common.util.queue;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -143,7 +142,6 @@ public class BoundedInMemoryExecutor<I, O, E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public boolean isRemaining() {
|
public boolean isRemaining() {
|
||||||
return queue.iterator().hasNext();
|
return queue.iterator().hasNext();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ package org.apache.hudi.common.util.queue;
|
|||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Consume entries from queue and execute callback function
|
* Consume entries from queue and execute callback function
|
||||||
*/
|
*/
|
||||||
@@ -59,5 +58,4 @@ public abstract class BoundedInMemoryQueueConsumer<I, O> {
|
|||||||
*/
|
*/
|
||||||
protected abstract O getResult();
|
protected abstract O getResult();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -123,5 +123,4 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
|
|||||||
assertArrayEquals(new Text("data3").getBytes(), archivedTimeline.getInstantDetails(instant3).get());
|
assertArrayEquals(new Text("data3").getBytes(), archivedTimeline.getInstantDetails(instant3).get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -471,7 +471,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
|
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
|
||||||
Writer writer =
|
Writer writer =
|
||||||
@@ -556,7 +555,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAvroLogRecordReaderBasic() throws IOException, URISyntaxException, InterruptedException {
|
public void testAvroLogRecordReaderBasic() throws IOException, URISyntaxException, InterruptedException {
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
||||||
|
|||||||
@@ -287,8 +287,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
|||||||
* HELPER METHODS
|
* HELPER METHODS
|
||||||
*********************************************************************************************************
|
*********************************************************************************************************
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to run one or more rounds of cleaning, incrementally syncing the view and then validate
|
* Helper to run one or more rounds of cleaning, incrementally syncing the view and then validate
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ public class TestDFSPropertiesConfiguration {
|
|||||||
private static MiniDFSCluster dfsCluster;
|
private static MiniDFSCluster dfsCluster;
|
||||||
private static DistributedFileSystem dfs;
|
private static DistributedFileSystem dfs;
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
hdfsTestService = new HdfsTestService();
|
hdfsTestService = new HdfsTestService();
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ import org.codehaus.jackson.JsonNode;
|
|||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
public class TestHoodieAvroUtils {
|
public class TestHoodieAvroUtils {
|
||||||
|
|
||||||
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"testrec\"," + "\"fields\": [ "
|
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"testrec\"," + "\"fields\": [ "
|
||||||
|
|||||||
@@ -64,7 +64,6 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
|
|||||||
|
|
||||||
private transient FileSystem fs;
|
private transient FileSystem fs;
|
||||||
|
|
||||||
|
|
||||||
public HoodieROTablePathFilter() {
|
public HoodieROTablePathFilter() {
|
||||||
hoodiePathCache = new HashMap<>();
|
hoodiePathCache = new HashMap<>();
|
||||||
nonHoodiePathCache = new HashSet<>();
|
nonHoodiePathCache = new HashSet<>();
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ public class SafeParquetRecordReaderWrapper implements RecordReader<NullWritable
|
|||||||
// Number of fields in Value Schema
|
// Number of fields in Value Schema
|
||||||
private final int numValueFields;
|
private final int numValueFields;
|
||||||
|
|
||||||
|
|
||||||
public SafeParquetRecordReaderWrapper(RecordReader<NullWritable, ArrayWritable> parquetReader) {
|
public SafeParquetRecordReaderWrapper(RecordReader<NullWritable, ArrayWritable> parquetReader) {
|
||||||
this.parquetReader = parquetReader;
|
this.parquetReader = parquetReader;
|
||||||
ArrayWritable arrayWritable = parquetReader.createValue();
|
ArrayWritable arrayWritable = parquetReader.createValue();
|
||||||
|
|||||||
@@ -150,7 +150,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
return rtSplits.toArray(new InputSplit[rtSplits.size()]);
|
return rtSplits.toArray(new InputSplit[rtSplits.size()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileStatus[] listStatus(JobConf job) throws IOException {
|
public FileStatus[] listStatus(JobConf job) throws IOException {
|
||||||
// Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
|
// Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
|
||||||
|
|||||||
@@ -73,7 +73,6 @@ public class HoodieRealtimeFileSplit extends FileSplit {
|
|||||||
return new String(bytes, StandardCharsets.UTF_8);
|
return new String(bytes, StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
super.write(out);
|
super.write(out);
|
||||||
|
|||||||
@@ -113,7 +113,6 @@ public class InputFormatTestUtil {
|
|||||||
return partitionPath;
|
return partitionPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static File prepareSimpleParquetDataset(TemporaryFolder basePath, Schema schema, int numberOfFiles,
|
public static File prepareSimpleParquetDataset(TemporaryFolder basePath, Schema schema, int numberOfFiles,
|
||||||
int numberOfRecords, String commitNumber) throws Exception {
|
int numberOfRecords, String commitNumber) throws Exception {
|
||||||
basePath.create();
|
basePath.create();
|
||||||
|
|||||||
@@ -32,8 +32,8 @@ import org.junit.Before;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
|
public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,6 @@ public class TestRecordReaderValueIterator {
|
|||||||
this.entries = entries;
|
this.entries = entries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean next(IntWritable key, Text value) throws IOException {
|
public boolean next(IntWritable key, Text value) throws IOException {
|
||||||
if (currIndex >= entries.size()) {
|
if (currIndex >= entries.size()) {
|
||||||
|
|||||||
@@ -157,7 +157,6 @@ public class HiveSyncTool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the
|
* Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the
|
||||||
* partition path does not match, it updates the partition path)
|
* partition path does not match, it updates the partition path)
|
||||||
|
|||||||
@@ -234,7 +234,6 @@ public class HoodieHiveClient {
|
|||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scan table partitions
|
* Scan table partitions
|
||||||
*/
|
*/
|
||||||
@@ -531,8 +530,6 @@ public class HoodieHiveClient {
|
|||||||
return responses;
|
return responses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void createHiveConnection() {
|
private void createHiveConnection() {
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -132,7 +132,6 @@ public class SchemaUtil {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns equivalent Hive table schema read from a parquet file
|
* Returns equivalent Hive table schema read from a parquet file
|
||||||
*
|
*
|
||||||
@@ -296,7 +295,6 @@ public class SchemaUtil {
|
|||||||
return finalStr;
|
return finalStr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static String hiveCompatibleFieldName(String fieldName, boolean isNested) {
|
private static String hiveCompatibleFieldName(String fieldName, boolean isNested) {
|
||||||
String result = fieldName;
|
String result = fieldName;
|
||||||
if (isNested) {
|
if (isNested) {
|
||||||
|
|||||||
@@ -148,7 +148,6 @@ public class TestHiveSyncTool {
|
|||||||
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicSync() throws Exception {
|
public void testBasicSync() throws Exception {
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
||||||
|
|||||||
@@ -218,8 +218,6 @@ public class HiveTestService {
|
|||||||
|
|
||||||
// XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
|
// XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
|
||||||
// with changes to support binding to a specified IP address (not only 0.0.0.0)
|
// with changes to support binding to a specified IP address (not only 0.0.0.0)
|
||||||
|
|
||||||
|
|
||||||
private static final class ChainedTTransportFactory extends TTransportFactory {
|
private static final class ChainedTTransportFactory extends TTransportFactory {
|
||||||
|
|
||||||
private final TTransportFactory parentTransFactory;
|
private final TTransportFactory parentTransFactory;
|
||||||
@@ -236,7 +234,6 @@ public class HiveTestService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final class TServerSocketKeepAlive extends TServerSocket {
|
private static final class TServerSocketKeepAlive extends TServerSocket {
|
||||||
|
|
||||||
public TServerSocketKeepAlive(int port) throws TTransportException {
|
public TServerSocketKeepAlive(int port) throws TTransportException {
|
||||||
|
|||||||
@@ -160,7 +160,6 @@ public class DataSourceUtils {
|
|||||||
return new HoodieWriteClient<>(jssc, writeConfig, true);
|
return new HoodieWriteClient<>(jssc, writeConfig, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
|
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
|
||||||
String commitTime, String operation) {
|
String commitTime, String operation) {
|
||||||
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
|
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
|
||||||
|
|||||||
@@ -35,7 +35,6 @@ import org.apache.log4j.Logger;
|
|||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
|
|
||||||
public class HoodieCompactor {
|
public class HoodieCompactor {
|
||||||
|
|
||||||
private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
|
private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
|
||||||
|
|||||||
@@ -75,7 +75,6 @@ import org.apache.spark.sql.Row;
|
|||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import scala.collection.JavaConversions;
|
import scala.collection.JavaConversions;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sync's one batch of data to hoodie dataset
|
* Sync's one batch of data to hoodie dataset
|
||||||
*/
|
*/
|
||||||
@@ -155,7 +154,6 @@ public class DeltaSync implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private final HoodieTableType tableType;
|
private final HoodieTableType tableType;
|
||||||
|
|
||||||
|
|
||||||
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
|
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
|
||||||
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
|
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
|
||||||
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
|
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
|
||||||
|
|||||||
@@ -65,7 +65,6 @@ import org.apache.log4j.Logger;
|
|||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
|
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
|
||||||
* dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source
|
* dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source
|
||||||
@@ -267,11 +266,9 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
|
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
|
||||||
public String checkpoint = null;
|
public String checkpoint = null;
|
||||||
|
|
||||||
|
|
||||||
@Parameter(names = {"--help", "-h"}, help = true)
|
@Parameter(names = {"--help", "-h"}, help = true)
|
||||||
public Boolean help = false;
|
public Boolean help = false;
|
||||||
|
|
||||||
|
|
||||||
public boolean isAsyncCompactionEnabled() {
|
public boolean isAsyncCompactionEnabled() {
|
||||||
return continuousMode && !forceDisableCompaction
|
return continuousMode && !forceDisableCompaction
|
||||||
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
|
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
|
||||||
|
|||||||
@@ -57,7 +57,6 @@ public class SchedulerConfGenerator {
|
|||||||
compactionMinShare.toString());
|
compactionMinShare.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to set Spark Scheduling Configs dynamically
|
* Helper to set Spark Scheduling Configs dynamically
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -44,7 +44,6 @@ public final class SourceFormatAdapter {
|
|||||||
|
|
||||||
private final Source source;
|
private final Source source;
|
||||||
|
|
||||||
|
|
||||||
public SourceFormatAdapter(Source source) {
|
public SourceFormatAdapter(Source source) {
|
||||||
this.source = source;
|
this.source = source;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,6 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
|
|
||||||
private final String outputDateFormat;
|
private final String outputDateFormat;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Supported configs
|
* Supported configs
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -209,7 +209,6 @@ public class TimelineServerPerf implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class PerfStats implements Serializable {
|
private static class PerfStats implements Serializable {
|
||||||
|
|
||||||
private final String partition;
|
private final String partition;
|
||||||
|
|||||||
@@ -63,7 +63,6 @@ public class HiveIncrPullSource extends AvroSource {
|
|||||||
|
|
||||||
private final String incrPullRootPath;
|
private final String incrPullRootPath;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configs supported
|
* Configs supported
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -50,7 +50,6 @@ public class AvroConvertor implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private transient Injection<GenericRecord, byte[]> recordInjection;
|
private transient Injection<GenericRecord, byte[]> recordInjection;
|
||||||
|
|
||||||
|
|
||||||
public AvroConvertor(String schemaStr) {
|
public AvroConvertor(String schemaStr) {
|
||||||
this.schemaStr = schemaStr;
|
this.schemaStr = schemaStr;
|
||||||
}
|
}
|
||||||
@@ -79,7 +78,6 @@ public class AvroConvertor implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public GenericRecord fromJson(String json) throws IOException {
|
public GenericRecord fromJson(String json) throws IOException {
|
||||||
initSchema();
|
initSchema();
|
||||||
initJsonConvertor();
|
initJsonConvertor();
|
||||||
@@ -90,7 +88,6 @@ public class AvroConvertor implements Serializable {
|
|||||||
return new Schema.Parser().parse(schemaStr);
|
return new Schema.Parser().parse(schemaStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public GenericRecord fromAvroBinary(byte[] avroBinary) {
|
public GenericRecord fromAvroBinary(byte[] avroBinary) {
|
||||||
initSchema();
|
initSchema();
|
||||||
initInjection();
|
initInjection();
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ import scala.collection.mutable.ArrayBuffer;
|
|||||||
import scala.collection.mutable.StringBuilder;
|
import scala.collection.mutable.StringBuilder;
|
||||||
import scala.util.Either;
|
import scala.util.Either;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Source to read data from Kafka, incrementally
|
* Source to read data from Kafka, incrementally
|
||||||
*/
|
*/
|
||||||
@@ -250,7 +249,6 @@ public class KafkaOffsetGen {
|
|||||||
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public String getTopicName() {
|
public String getTopicName() {
|
||||||
return topicName;
|
return topicName;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,7 +61,6 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
private static MiniDFSCluster dfsCluster;
|
private static MiniDFSCluster dfsCluster;
|
||||||
private static DistributedFileSystem dfs;
|
private static DistributedFileSystem dfs;
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
hdfsTestService = new HdfsTestService();
|
hdfsTestService = new HdfsTestService();
|
||||||
|
|||||||
@@ -78,7 +78,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
|||||||
testUtils.teardown();
|
testUtils.teardown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJsonKafkaSource() throws IOException {
|
public void testJsonKafkaSource() throws IOException {
|
||||||
|
|
||||||
@@ -132,7 +131,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
|||||||
assertEquals(Option.empty(), fetch4AsRows.getBatch());
|
assertEquals(Option.empty(), fetch4AsRows.getBatch());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
|
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
|
||||||
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
|
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
|
||||||
for (int i = 0; i < partitions.length; i++) {
|
for (int i = 0; i < partitions.length; i++) {
|
||||||
|
|||||||
2
pom.xml
2
pom.xml
@@ -914,7 +914,7 @@
|
|||||||
</goals>
|
</goals>
|
||||||
</execution>
|
</execution>
|
||||||
</executions>
|
</executions>
|
||||||
</plugin>
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
</profile>
|
</profile>
|
||||||
|
|||||||
@@ -101,6 +101,9 @@
|
|||||||
<module name="ModifierOrder"/>
|
<module name="ModifierOrder"/>
|
||||||
<module name="EmptyLineSeparator">
|
<module name="EmptyLineSeparator">
|
||||||
<property name="allowNoEmptyLineBetweenFields" value="true"/>
|
<property name="allowNoEmptyLineBetweenFields" value="true"/>
|
||||||
|
<property name="allowMultipleEmptyLines" value="false"/>
|
||||||
|
<property name="tokens" value="PACKAGE_DEF, IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF,
|
||||||
|
STATIC_INIT, INSTANCE_INIT, METHOD_DEF, CTOR_DEF"/>
|
||||||
</module>
|
</module>
|
||||||
<module name="SeparatorWrap">
|
<module name="SeparatorWrap">
|
||||||
<property name="id" value="SeparatorWrapDot"/>
|
<property name="id" value="SeparatorWrapDot"/>
|
||||||
|
|||||||
Reference in New Issue
Block a user