HUDI-243 Rename HoodieInputFormat and HoodieRealtimeInputFormat to HoodieParquetInputFormat and HoodieParquetRealtimeInputFormat
This commit is contained in:
committed by
vinoth chandar
parent
d0b9b56b7d
commit
93bc5e2153
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.JobConf;
|
|||||||
import org.apache.hadoop.mapred.RecordReader;
|
import org.apache.hadoop.mapred.RecordReader;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.util.HoodieAvroUtils;
|
import org.apache.hudi.common.util.HoodieAvroUtils;
|
||||||
import org.apache.hudi.hadoop.realtime.HoodieRealtimeInputFormat;
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR)
|
* Utility methods to aid in testing MergeOnRead (workaround for HoodieReadClient for MOR)
|
||||||
@@ -46,7 +46,7 @@ public class HoodieMergeOnReadTestUtils {
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
JobConf jobConf = new JobConf();
|
JobConf jobConf = new JobConf();
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
|
Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
|
||||||
HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
|
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
|
||||||
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
|
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
|
||||||
return inputPaths.stream().map(path -> {
|
return inputPaths.stream().map(path -> {
|
||||||
setInputPath(jobConf, path);
|
setInputPath(jobConf, path);
|
||||||
@@ -76,8 +76,8 @@ public class HoodieMergeOnReadTestUtils {
|
|||||||
}).get();
|
}).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema,
|
private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
|
||||||
String basePath) {
|
Schema schema, String basePath) {
|
||||||
List<Schema.Field> fields = schema.getFields();
|
List<Schema.Field> fields = schema.getFields();
|
||||||
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
|
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
|
||||||
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
|
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
|
||||||
|
|||||||
@@ -18,9 +18,11 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.hadoop;
|
package com.uber.hoodie.hadoop;
|
||||||
|
|
||||||
|
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Temporary class to allow seamless migration of com.uber.hoodie to org.apache.hudi
|
* Temporary class to allow seamless migration of com.uber.hoodie to org.apache.hudi
|
||||||
*/
|
*/
|
||||||
public class HoodieInputFormat extends org.apache.hudi.hadoop.HoodieInputFormat {
|
public class HoodieInputFormat extends HoodieParquetInputFormat {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,9 +18,11 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.hadoop.realtime;
|
package com.uber.hoodie.hadoop.realtime;
|
||||||
|
|
||||||
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Temporary class to allow seamless migration of com.uber.hoodie to org.apache.hudi
|
* Temporary class to allow seamless migration of com.uber.hoodie to org.apache.hudi
|
||||||
*/
|
*/
|
||||||
public class HoodieRealtimeInputFormat extends org.apache.hudi.hadoop.realtime.HoodieRealtimeInputFormat {
|
public class HoodieRealtimeInputFormat extends HoodieParquetRealtimeInputFormat {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,9 +57,9 @@ import org.apache.log4j.Logger;
|
|||||||
* Hoodie/Non-Hoodie datasets
|
* Hoodie/Non-Hoodie datasets
|
||||||
*/
|
*/
|
||||||
@UseFileSplitsFromInputFormat
|
@UseFileSplitsFromInputFormat
|
||||||
public class HoodieInputFormat extends MapredParquetInputFormat implements Configurable {
|
public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
|
||||||
|
|
||||||
private static final transient Logger LOG = LogManager.getLogger(HoodieInputFormat.class);
|
private static final transient Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
|
||||||
|
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
|
|
||||||
@@ -70,8 +70,8 @@ import org.apache.hadoop.mapred.Reporter;
|
|||||||
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
|
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
|
||||||
import org.apache.hadoop.mapred.lib.CombineFileSplit;
|
import org.apache.hadoop.mapred.lib.CombineFileSplit;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hudi.hadoop.HoodieInputFormat;
|
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||||
import org.apache.hudi.hadoop.realtime.HoodieRealtimeInputFormat;
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -403,9 +403,9 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
|
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
|
||||||
LOG.info("Input Format => " + inputFormatClass.getName());
|
LOG.info("Input Format => " + inputFormatClass.getName());
|
||||||
// **MOD** Set the hoodie filter in the combine
|
// **MOD** Set the hoodie filter in the combine
|
||||||
if (inputFormatClass.getName().equals(HoodieInputFormat.class.getName())) {
|
if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
|
||||||
combine.setHoodieFilter(true);
|
combine.setHoodieFilter(true);
|
||||||
} else if (inputFormatClass.getName().equals(HoodieRealtimeInputFormat.class.getName())) {
|
} else if (inputFormatClass.getName().equals(HoodieParquetRealtimeInputFormat.class.getName())) {
|
||||||
LOG.info("Setting hoodie filter and realtime input format");
|
LOG.info("Setting hoodie filter and realtime input format");
|
||||||
combine.setHoodieFilter(true);
|
combine.setHoodieFilter(true);
|
||||||
combine.setRealTime(true);
|
combine.setRealTime(true);
|
||||||
@@ -857,13 +857,13 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
|
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
|
||||||
List<FileStatus> result;
|
List<FileStatus> result;
|
||||||
if (hoodieFilter) {
|
if (hoodieFilter) {
|
||||||
HoodieInputFormat input;
|
HoodieParquetInputFormat input;
|
||||||
if (isRealTime) {
|
if (isRealTime) {
|
||||||
LOG.info("Using HoodieRealtimeInputFormat");
|
LOG.info("Using HoodieRealtimeInputFormat");
|
||||||
input = new HoodieRealtimeInputFormat();
|
input = new HoodieParquetRealtimeInputFormat();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Using HoodieInputFormat");
|
LOG.info("Using HoodieInputFormat");
|
||||||
input = new HoodieInputFormat();
|
input = new HoodieParquetInputFormat();
|
||||||
}
|
}
|
||||||
input.setConf(job.getConfiguration());
|
input.setConf(job.getConfiguration());
|
||||||
result = new ArrayList<FileStatus>(
|
result = new ArrayList<FileStatus>(
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ import org.apache.hudi.common.util.FSUtils;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.hadoop.HoodieInputFormat;
|
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||||
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
|
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -61,9 +61,9 @@ import org.apache.log4j.Logger;
|
|||||||
* Input Format, that provides a real-time view of data in a Hoodie dataset
|
* Input Format, that provides a real-time view of data in a Hoodie dataset
|
||||||
*/
|
*/
|
||||||
@UseFileSplitsFromInputFormat
|
@UseFileSplitsFromInputFormat
|
||||||
public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Configurable {
|
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
|
||||||
|
|
||||||
private static final transient Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormat.class);
|
private static final transient Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
|
||||||
|
|
||||||
// These positions have to be deterministic across all tables
|
// These positions have to be deterministic across all tables
|
||||||
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
|
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
|
||||||
@@ -80,7 +80,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
|
|||||||
// TODO(VC): Right now, we assume all records in log, have a matching base record. (which
|
// TODO(VC): Right now, we assume all records in log, have a matching base record. (which
|
||||||
// would be true until we have a way to index logs too)
|
// would be true until we have a way to index logs too)
|
||||||
// return from delta records map if we have some match.
|
// return from delta records map if we have some match.
|
||||||
String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
|
String key = arrayWritable.get()[HoodieParquetRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
|
||||||
.toString();
|
.toString();
|
||||||
if (deltaRecordMap.containsKey(key)) {
|
if (deltaRecordMap.containsKey(key)) {
|
||||||
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
|
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
|
||||||
|
|||||||
@@ -27,8 +27,8 @@ public class AnnotationTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAnnotation() {
|
public void testAnnotation() {
|
||||||
assertTrue(HoodieInputFormat.class.isAnnotationPresent(UseFileSplitsFromInputFormat.class));
|
assertTrue(HoodieParquetInputFormat.class.isAnnotationPresent(UseFileSplitsFromInputFormat.class));
|
||||||
Annotation[] annotations = HoodieInputFormat.class.getAnnotations();
|
Annotation[] annotations = HoodieParquetInputFormat.class.getAnnotations();
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (Annotation annotation : annotations) {
|
for (Annotation annotation : annotations) {
|
||||||
if ("UseFileSplitsFromInputFormat".equals(annotation.annotationType().getSimpleName())) {
|
if ("UseFileSplitsFromInputFormat".equals(annotation.annotationType().getSimpleName())) {
|
||||||
|
|||||||
@@ -38,12 +38,12 @@ import org.junit.rules.TemporaryFolder;
|
|||||||
|
|
||||||
public class HoodieInputFormatTest {
|
public class HoodieInputFormatTest {
|
||||||
|
|
||||||
private HoodieInputFormat inputFormat;
|
private HoodieParquetInputFormat inputFormat;
|
||||||
private JobConf jobConf;
|
private JobConf jobConf;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
inputFormat = new HoodieInputFormat();
|
inputFormat = new HoodieParquetInputFormat();
|
||||||
jobConf = new JobConf();
|
jobConf = new JobConf();
|
||||||
inputFormat.setConf(jobConf);
|
inputFormat.setConf(jobConf);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,8 +31,8 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
|
|||||||
import org.apache.hudi.common.util.FSUtils;
|
import org.apache.hudi.common.util.FSUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.InvalidDatasetException;
|
import org.apache.hudi.exception.InvalidDatasetException;
|
||||||
import org.apache.hudi.hadoop.HoodieInputFormat;
|
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
|
||||||
import org.apache.hudi.hadoop.realtime.HoodieRealtimeInputFormat;
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
|
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
|
||||||
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
|
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
|
||||||
import org.apache.hudi.hive.util.SchemaUtil;
|
import org.apache.hudi.hive.util.SchemaUtil;
|
||||||
@@ -128,7 +128,7 @@ public class HiveSyncTool {
|
|||||||
// for now)
|
// for now)
|
||||||
String inputFormatClassName =
|
String inputFormatClassName =
|
||||||
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
|
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
|
||||||
: HoodieInputFormat.class.getName();
|
: HoodieParquetInputFormat.class.getName();
|
||||||
hoodieHiveClient.createTable(schema, inputFormatClassName,
|
hoodieHiveClient.createTable(schema, inputFormatClassName,
|
||||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||||
} else {
|
} else {
|
||||||
@@ -137,7 +137,7 @@ public class HiveSyncTool {
|
|||||||
// /ql/exec/DDLTask.java#L3488
|
// /ql/exec/DDLTask.java#L3488
|
||||||
String inputFormatClassName =
|
String inputFormatClassName =
|
||||||
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
|
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
|
||||||
: HoodieRealtimeInputFormat.class.getName();
|
: HoodieParquetRealtimeInputFormat.class.getName();
|
||||||
hoodieHiveClient.createTable(schema, inputFormatClassName,
|
hoodieHiveClient.createTable(schema, inputFormatClassName,
|
||||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user