1
0

[HUDI-1143] Change timestamp field in HoodieTestDataGenerator from double to long

This commit is contained in:
shenh062326
2020-09-06 16:00:45 +08:00
committed by n3nash
parent 6c84ef20ac
commit 581d54097c
18 changed files with 176 additions and 47 deletions

View File

@@ -53,7 +53,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -111,7 +111,7 @@ import static org.apache.spark.sql.functions.callUDF;
*/
public class TestBootstrap extends HoodieClientTestBase {
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,"
+ "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@TempDir
@@ -155,7 +155,7 @@ public class TestBootstrap extends HoodieClientTestBase {
rtInputFormat.setConf(rtJobConf);
}
public Schema generateNewDataSetAndReturnSchema(double timestamp, int numRecords, List<String> partitionPaths,
public Schema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths,
String srcPath) throws Exception {
boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
Dataset<Row> df = generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext);
@@ -234,7 +234,7 @@ public class TestBootstrap extends HoodieClientTestBase {
break;
}
List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
double timestamp = new Double(Instant.now().toEpochMilli()).longValue();
long timestamp = Instant.now().toEpochMilli();
Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
HoodieWriteConfig config = getConfigBuilder(schema.toString())
.withAutoCommit(true)
@@ -282,7 +282,7 @@ public class TestBootstrap extends HoodieClientTestBase {
numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
// Upsert case
double updateTimestamp = new Double(Instant.now().toEpochMilli()).longValue();
long updateTimestamp = Instant.now().toEpochMilli();
String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
@@ -329,13 +329,13 @@ public class TestBootstrap extends HoodieClientTestBase {
}
private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
int expNumInstants, double expTimestamp, double expROTimestamp, boolean isDeltaCommit) throws Exception {
int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
}
private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
int expNumInstants, int numVersions, double expTimestamp, double expROTimestamp, boolean isDeltaCommit,
int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
List<String> instantsWithValidRecords) throws Exception {
metaClient.reloadActiveTimeline();
assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
@@ -378,7 +378,7 @@ public class TestBootstrap extends HoodieClientTestBase {
Set<String> seenKeys = new HashSet<>();
for (GenericRecord r : records) {
assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Record :" + r);
assertEquals(expROTimestamp, ((DoubleWritable)r.get("timestamp")).get(), 0.1, "Record :" + r);
assertEquals(expROTimestamp, ((LongWritable)r.get("timestamp")).get(), 0.1, "Record :" + r);
assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
seenKeys.add(r.get("_hoodie_record_key").toString());
}
@@ -395,7 +395,7 @@ public class TestBootstrap extends HoodieClientTestBase {
assertEquals(totalRecords, records.size());
for (GenericRecord r : records) {
assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Realtime Record :" + r);
assertEquals(expTimestamp, ((DoubleWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
assertEquals(expTimestamp, ((LongWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
seenKeys.add(r.get("_hoodie_record_key").toString());
}
@@ -550,7 +550,7 @@ public class TestBootstrap extends HoodieClientTestBase {
return builder;
}
public static Dataset<Row> generateTestRawTripDataset(double timestamp, int from, int to, List<String> partitionPaths,
public static Dataset<Row> generateTestRawTripDataset(long timestamp, int from, int to, List<String> partitionPaths,
JavaSparkContext jsc, SQLContext sqlContext) {
boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
final List<String> records = new ArrayList<>();