[HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (#2197)
1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job. 2. Added the --delete-old-input parameter to delete older batches of data that have already been ingested. This helps keep the number of redundant files low. 3. Added the --input-parallelism parameter to restrict the parallelism when generating input data. This helps keep the number of generated input files low. 4. Added an option start_offset to Dag Nodes. Without the ability to specify start offsets, data is generated into existing partitions. With a start offset, the DAG can control which partition the data is written to. 5. Fixed generation of records for the correct number of partitions - In the existing implementation, the partition is chosen as a random long. This does not guarantee that the exact number of requested partitions will be created. 6. Changed the variable blacklistedFields to be a Set, as that is faster than a List for membership checks. 7. Fixed integer division for Math.ceil. If two integers are divided, the result is not a double unless one of the integers is cast to double.
This commit is contained in:
@@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {
|
||||
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
|
||||
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
|
||||
new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
|
||||
schemaProvider.getSourceSchema().toString(), 10240L);
|
||||
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false);
|
||||
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
|
||||
.getDeltaWriterAdapter(dfsSinkConfig, 1);
|
||||
FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,
|
||||
|
||||
@@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
@@ -92,7 +94,8 @@ public class TestGenericRecordPayloadGenerator {
|
||||
insertRowKeys.add(record.get("_row_key").toString());
|
||||
insertTimeStamps.add((Long) record.get("timestamp"));
|
||||
});
|
||||
List<String> blacklistFields = Arrays.asList("_row_key");
|
||||
Set<String> blacklistFields = new HashSet<>();
|
||||
blacklistFields.add("_row_key");
|
||||
records.stream().forEach(a -> {
|
||||
// Generate 10 updated records
|
||||
GenericRecord record = payloadGenerator.getUpdatePayload(a, blacklistFields);
|
||||
|
||||
Reference in New Issue
Block a user