[HUDI-2031] JVM occasionally crashes during compaction when spark speculative execution is enabled (#3093)
* unit tests added
This commit is contained in:
@@ -24,6 +24,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
|
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||||
|
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
@@ -38,6 +39,8 @@ import scala.Tuple2;
|
|||||||
import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
|
import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@@ -97,4 +100,51 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInterruptExecutor() {
|
||||||
|
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
|
||||||
|
|
||||||
|
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
|
||||||
|
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
|
||||||
|
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
|
||||||
|
new BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void finish() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Integer getResult() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
|
||||||
|
try {
|
||||||
|
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
|
||||||
|
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
|
||||||
|
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
|
||||||
|
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
|
||||||
|
assertThrows(HoodieException.class, () -> finalExecutor.execute());
|
||||||
|
assertTrue(Thread.interrupted());
|
||||||
|
} finally {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,6 +139,10 @@ public class BoundedInMemoryExecutor<I, O, E> {
|
|||||||
Future<E> future = startConsumer();
|
Future<E> future = startConsumer();
|
||||||
// Wait for consumer to be done
|
// Wait for consumer to be done
|
||||||
return future.get();
|
return future.get();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new HoodieException(ie);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user