Skip to content
Snippets Groups Projects
Commit b5c05df9 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: refactored CalculationParsingFlow tests, added stream...

Integrated Pipeline: refactored CalculationParsingFlow tests, added stream assertion for normal completion
parent 10632b95
No related branches found
No related tags found
No related merge requests found
......@@ -9,7 +9,6 @@ import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.integrated_pipeline.stream_components._
import eu.nomad_lab.parsers.ParseResult
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
......@@ -71,10 +70,11 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
"a CalculationParsingFlow" when {
"managing a single worker" should {
"forward a received file tree started signal" in new Fixture(1) {
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
sink.ensureSubscription().request(1)
sink.expectNext(FileParsingSignalStartTree(sampleTreeScan))
"forward file tree started signals unchanged" in {
val f = new Fixture(1)
val signal = aFileParsingSignalStartTree().build()
f.source.sendNext(signal)
f.findFirstMatchingStreamElement(be(signal))
}
"emit parsing results for every incoming parsing request in order of input" in new Fixture(1) {
......@@ -91,55 +91,55 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
}
}
"forward file tree finished signals unchanged" in new Fixture(1) {
source.sendNext(FileParsingSignalEndTree(sampleTreeScan))
sink.ensureSubscription().request(1)
sink.expectNext(FileParsingSignalEndTree(sampleTreeScan))
"forward file tree finished signals unchanged" in {
val f = new Fixture(1)
val signal = aFileParsingSignalEndTree().build()
f.source.sendNext(signal)
f.findFirstMatchingStreamElement(be(signal))
}
"complete the stage if upstream signals completion before all elements are processed" in new Fixture(1) {
source.sendNext(input1).sendNext(input2).sendNext(input3).sendComplete()
sink.ensureSubscription().request(4)
sink.expectNextN(3)
sink.expectComplete()
"complete the stage if upstream signals completion before all elements are processed" in {
val f = new Fixture(1)
(1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask()))
f.source.sendComplete()
f.expectStreamCompletion()
}
"complete the stage if upstream signals completion after all elements are processed" in new Fixture(1) {
source.sendNext(input1).sendNext(input2).sendNext(input3)
sink.ensureSubscription().request(4)
sink.expectNextN(3)
source.sendComplete()
sink.expectComplete()
"complete the stage if upstream signals completion after all elements are processed" in {
val f = new Fixture(1)
(1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask()))
f.drainStream()
f.source.sendComplete()
f.expectStreamCompletion()
}
"specify the location of the main file to the parsing engine when handling directories" in new Fixture(1) {
val task = aFileParsingTask().withTreeTask(sampleTreeScan).withRelativePath("magic").build()
source.sendNext(task)
findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess)))
val expectedPath = sampleTreeScan.treeBasePath.resolve("magic")
verify(dummyWorkers.head).processRequest(any(), ArgumentMatchers.eq(expectedPath))
"specify the location of the main file to the parsing engine when handling directories" in {
val f = new Fixture(1)
val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).withBasePath("/dir/bla")
val task = aFileParsingTask().withTreeTask(treeTask).withRelativePath("magic").build()
f.source.sendNext(task)
f.drainStream()
val expectedPath = Paths.get("/dir/bla/magic")
verify(f.dummyWorkers.head).processRequest(task, expectedPath)
}
"specify the temporarily extracted main file to the parsing engine when handling zip archives" in new Fixture(1) {
val zipTreeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).build()
"specify the temporarily extracted main file to the parsing engine when handling zip archives" in {
val f = new Fixture(1)
val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip)
val extractedPath = Paths.get("/tmp/extracted/magic")
val task = aFileParsingTask().withTreeTask(zipTreeTask).withRelativePath("magic").
withExtractedPath(Some(extractedPath)).build()
source.sendNext(task)
findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess)))
verify(dummyWorkers.head).processRequest(task, extractedPath)
val task = aFileParsingTask().withTreeTask(treeTask).withExtractedPath(Some(extractedPath)).build()
f.source.sendNext(task)
f.drainStream()
verify(f.dummyWorkers.head).processRequest(task, extractedPath)
}
"gracefully fail parsing requests with unknown or not supported file tree types" in new Fixture(1) {
sink.ensureSubscription().request(6)
"gracefully fail parsing requests with unknown or not supported file tree types" in {
val f = new Fixture(1)
Seq(TreeType.Unknown, TreeType.File, TreeType.Tar).foreach { treeType =>
val anotherTreeScan = sampleTreeScan.copy(treeType = treeType)
val anotherRequest = input1.copy(treeTask = anotherTreeScan)
source.sendNext(anotherRequest)
val result = sink.expectNext()
assert(result.isInstanceOf[InMemoryResult], "results should have been kept in memory")
val inMemory = result.asInstanceOf[InMemoryResult]
assert(inMemory.result == ParseResult.ParseFailure, "should have a failed status")
val treeScan = aFileTreeScanTask().withTreeType(treeType)
val request = aFileParsingTask().withTreeTask(treeScan)
f.source.sendNext(request)
f.findFirstMatchingStreamElement(have(status(ParseResult.ParseFailure)))
}
}
......
......@@ -49,12 +49,6 @@ trait StreamAssertions[T] {
}
}
private def errorSearchTerminationReason(end: SearchResult): String = end match {
case TimedOut(t) => s"no new event received before time-out ($t)"
case Found => s"expected stream failure, but stream completed normally"
case NotFound => "expected stream failure, but stream completed normally"
}
/**
* drain elements from the stream until it completes or times out. This function performs no
* tests on the drained element and is intended to be used when testing side-effects of the
......@@ -151,7 +145,41 @@ trait StreamAssertions[T] {
} else {
Assertions.fail(result.failureMessage)
}
case noFailure => Assertions.fail(errorSearchTerminationReason(noFailure))
case noFailure => Assertions.fail(noFailure match {
case TimedOut(t) => s"no new event received before time-out ($t)"
case NotFound => "expected stream failure, but stream completed normally"
})
}
}
/**
* Drains elements from the stream until it completes normally. If a time-out or stream failure
* is encountered, the assertion will fail.
* @param timeOut maximum wait time for each element
* @return success if the stream completes normally
*/
def expectStreamCompletion(timeOut: FiniteDuration = defaultTimeOut): Assertion = {
sink.ensureSubscription().request(Int.MaxValue)
def findMatch(): SearchResult = {
try {
sink.expectEventWithTimeoutPF(timeOut, {
case OnNext(_) => findMatch()
case OnComplete => NotFound
case OnError(e) => StreamError(e)
case OnSubscribe(_) => findMatch()
})
} catch {
case _: AssertionError => TimedOut(timeOut)
}
}
findMatch() match {
case NotFound => Assertions.succeed
case noComplete => Assertions.fail(noComplete match {
case TimedOut(t) => s"no new event received before time-out ($t)"
case StreamError(e) => s"expected stream completion, but stream failed with exception $e"
})
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment