Skip to content
Snippets Groups Projects
Commit 99f64d99 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: further refactored tests for ParsingResultsProcessingManager

parent 3c086d40
No related branches found
No related tags found
No related merge requests found
package eu.nomad_lab.integrated_pipeline_tests package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import eu.nomad_lab.integrated_pipeline.messages._ import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.integrated_pipeline.{ OutputType, ParsingResultsProcessingManager, ParsingResultsProcessor } import eu.nomad_lab.integrated_pipeline.{ OutputType, ParsingResultsProcessingManager, ParsingResultsProcessor }
import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.any
...@@ -51,48 +49,43 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit ...@@ -51,48 +49,43 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f f
} }
"a ParsingResultsProcessorFlow" when { def fullyProcessedATree(): Fixture = {
"not having received any events" should {
behave like processorWithNoSignalReady(
() => noResultsNoEndReceived().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree())
)
"start processing a file tree when the first signal associated with it arrives (result arrives first)" in {
val f = new Fixture val f = new Fixture
f.manager.processSignal(anInMemoryResult()) f.manager.processSignal(anInMemoryResult())
verify(f.processingMock).startProcessingTreeResults(any())
f.manager.processSignal(anInMemoryResult()) f.manager.processSignal(anInMemoryResult())
verify(f.processingMock).startProcessingTreeResults(any()) f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
f.manager.getNextSignalToEmit()
f
} }
"start processing a file tree when the first signal associated with it arrives (end signal arrives first)" in { "a ParsingResultsProcessorFlow" when {
val f = new Fixture
f.manager.processSignal(aFileParsingSignalEndTree())
verify(f.processingMock).startProcessingTreeResults(any())
f.manager.processSignal(anInMemoryResult())
verify(f.processingMock).startProcessingTreeResults(any())
}
"process every incoming file parsing result" in { "not having received any events" should {
val f = new Fixture val createFixture = () => noResultsNoEndReceived()
f.manager.processSignal(anInMemoryResult().withRelativePath("foo"))
f.manager.processSignal(anInMemoryResult().withRelativePath("magic"))
val argument: ArgumentCaptor[FileParsingResult] = ArgumentCaptor.forClass(classOf[FileParsingResult])
verify(f.processingMock, times(2)).processFileParsingResult(argument.capture())
val results = argument.getAllValues.asScala.map(x => x.task.relativePath)
results should contain allOf (Paths.get("foo"), Paths.get("magic"))
}
behave like processorWithNoSignalReady(
() => createFixture().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree())
)
behave like readyToAcceptParsingResults(createFixture)
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
} }
"having received all parsing tasks but no end signal" should { "having received all parsing tasks but no end signal" should {
val createFixture = () => twoResultsNoEndReceived()
behave like processorWithNoSignalReady( behave like processorWithNoSignalReady(
() => twoResultsNoEndReceived().manager, () => createFixture().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree()) Seq(anInMemoryResult(), aFileParsingSignalEndTree())
) )
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
"finish processing a file tree when the associated end signal arrives" in { "finish processing a file tree when the associated end signal arrives" in {
val f = twoResultsNoEndReceived() val f = twoResultsNoEndReceived()
f.manager.hasSignalToEmit() should be(false) f.manager.hasSignalToEmit() should be(false)
...@@ -104,11 +97,18 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit ...@@ -104,11 +97,18 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
} }
"having received the end signal and all but one parsing results" should { "having received the end signal and all but one parsing results" should {
val createFixture = () => oneResultEndWithTwoEntriesReceived()
behave like processorWithNoSignalReady( behave like processorWithNoSignalReady(
() => oneResultEndWithTwoEntriesReceived().manager, () => createFixture().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree()) Seq(anInMemoryResult(), aFileParsingSignalEndTree())
) )
behave like readyToAcceptParsingResults(createFixture)
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
"finish processing a file tree when the last associated parsing result arrived" in { "finish processing a file tree when the last associated parsing result arrived" in {
val f = oneResultEndWithTwoEntriesReceived() val f = oneResultEndWithTwoEntriesReceived()
f.manager.hasSignalToEmit() should be(false) f.manager.hasSignalToEmit() should be(false)
...@@ -135,47 +135,85 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit ...@@ -135,47 +135,85 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.orderingTest.verify(f.processingMock).finishProcessingTreeResults(aFileTreeScanTask()) f.orderingTest.verify(f.processingMock).finishProcessingTreeResults(aFileTreeScanTask())
} }
}
"having fully processed a file tree and emitted the result" should {
val createFixture = () => fullyProcessedATree()
behave like processorWithNoSignalReady(
() => createFixture().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree())
)
behave like readyToAcceptParsingResults(createFixture)
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
"be able to process the same file tree again after finishing it once" in { "be able to process the same file tree again after finishing it once" in {
val f = twoResultsEndWithTwoEntriesReceived() val f = fullyProcessedATree()
f.manager.getNextSignalToEmit()
f.manager.processSignal(anInMemoryResult()) f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult()) f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2)) f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
verify(f.processingMock, times(2)).finishProcessingTreeResults(aFileTreeScanTask()) verify(f.processingMock, times(2)).finishProcessingTreeResults(aFileTreeScanTask())
} }
} }
}
"handling multiple, possibly interleaved, tree results at the same time" should { def readyToAcceptParsingResults(createFixture: () => Fixture): Unit = {
"process all file parsing results, regardless of their inbound ordering" in { "process a file parsing result" in {
val f = new Fixture import FileParsingResultMatchers._
val treeTask1 = aFileTreeScanTask().withBasePath("/path1") val f = createFixture()
val treeTask2 = aFileTreeScanTask().withBasePath("/path2") f.manager.processSignal(anInMemoryResult().withRelativePath("foo"))
val inputs1 = (1 to 3).map(x => anInMemoryResult().withTreeTask(treeTask1).withRelativePath(s"foo$x").build()) val argument: ArgumentCaptor[FileParsingResult] = ArgumentCaptor.forClass(classOf[FileParsingResult])
val inputs2 = (1 to 3).map(x => anInMemoryResult().withTreeTask(treeTask2).withRelativePath(s"bar$x").build()) verify(f.processingMock, atLeastOnce()).processFileParsingResult(argument.capture())
val chaos = Seq(inputs1(0), inputs1(1), inputs2(0), inputs2(1), inputs1(2), inputs2(2)) val results = argument.getAllValues.asScala
chaos.foreach(f.manager.processSignal) exactly(1, results) should have(relativePath("foo"))
chaos.foreach(x => verify(f.processingMock, times(1)).processFileParsingResult(x)) }
} }
"send a result for each tree once all associated file parsing results are processed" in { def readyToAcceptParsingResultsFromAnotherTree(createFixture: () => Fixture): Unit = {
"process a file parsing result from another file tree" in {
import FileParsingResultMatchers._
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/universe/magic")
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
val argument: ArgumentCaptor[FileParsingResult] = ArgumentCaptor.forClass(classOf[FileParsingResult])
verify(f.processingMock, atLeastOnce()).processFileParsingResult(argument.capture())
val results = argument.getAllValues.asScala
exactly(1, results) should have(treeTask(fileTree))
}
}
def interleaveAnotherFileTree(createFixture: () => Fixture): Unit = {
"interleave requests from another file tree" in {
import FileTreeParsingResultMatchers._ import FileTreeParsingResultMatchers._
val f = new Fixture val f = createFixture()
val treeTask1 = aFileTreeScanTask().withBasePath("/path1") val fileTree = aFileTreeScanTask().withBasePath("/universe/magic")
val treeTask2 = aFileTreeScanTask().withBasePath("/path2") f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
val inputs1 = (1 to 3).map(x => anInMemoryResult().withTreeTask(treeTask1).withRelativePath(s"foo$x").build()) f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(fileTree).withTaskCount(1))
val inputs2 = (1 to 2).map(x => anInMemoryResult().withTreeTask(treeTask2).withRelativePath(s"bar$x").build())
val chaos = Seq(inputs1(0), inputs1(1), inputs2(0), inputs1(2), inputs2(1))
chaos.foreach(f.manager.processSignal)
chaos.foreach(x => verify(f.processingMock, times(1)).processFileParsingResult(x))
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(treeTask1).withTaskCount(3))
f.manager.hasSignalToEmit() should be(true)
f.manager.getNextSignalToEmit() should have(treeTask(treeTask1), numParsingTasks(3))
f.manager.hasSignalToEmit() should be(false)
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(treeTask2).withTaskCount(2))
f.manager.hasSignalToEmit() should be(true) f.manager.hasSignalToEmit() should be(true)
f.manager.getNextSignalToEmit() should have(treeTask(treeTask2), numParsingTasks(2)) f.manager.getNextSignalToEmit() should have(treeTask(fileTree), numParsingTasks(1))
f.manager.hasSignalToEmit() should be(false) }
} }
def startProcessingANewTree(createFixture: () => Fixture): Unit = {
"start processing a file tree when the first signal associated with it arrives (result arrives first)" in {
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
}
"start processing a file tree when the first signal associated with it arrives (end signal arrives first)" in {
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment