diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala index d38c556132e2113c4cd05b7c64bd91e17147bd10..acc8d4cf90bd63f1aa5d683f557b71ce875b1560 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala @@ -38,7 +38,7 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB ClosedShape } ) - val (probeInput, probe) = testGraph.run() + val (source, sink) = testGraph.run() val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).build() } @@ -47,52 +47,63 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB "forward start file tree signals unchanged" in { val f = new DirectoryTreeFixture() val task = aFileParsingTaskStartTree().withTreeTask(f.treeTask).build() - f.probeInput.sendNext(task) + f.source.sendNext(task) f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) } "forward file parsing tasks unchanged" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) val task = aFileParsingTask().withTreeTask(f.treeTask).build() - f.probeInput.sendNext(task) + f.source.sendNext(task) f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) + } + + "forward end file tree signals unchanged" in { + val f = new DirectoryTreeFixture() + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + val task = aFileParsingTaskEndTree().withTreeTask(f.treeTask).build() + f.source.sendNext(task) + f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) } } "receiving signals with an arbitrary file tree type" should { "fail if file parsing tasks arrive before the start tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if file tree end signal arrive before the start tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if file parsing task arrives after the end tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if a file tree start signal arrives more than once" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if a file tree end signal arrives more than once" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } } diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala index 9b9f10cfe48e543ee1c15b2b4328925979581504..4a9619f464ef75145921e9d3a4420c00b5ab0b32 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala @@ -16,7 +16,7 @@ trait StreamAssertions[T] { private case object NotFound extends SearchResult private case class StreamError(e: Throwable) extends SearchResult - val probe: TestSubscriber.Probe[T] + val sink: TestSubscriber.Probe[T] val defaultTimeOut: FiniteDuration = streamTimeOut private class FailureRecords { @@ -46,11 +46,11 @@ trait StreamAssertions[T] { } def findFirstMatchingStreamElement(test: Matcher[T], timeOut: FiniteDuration = defaultTimeOut): Assertion = { - probe.ensureSubscription().request(Int.MaxValue) + sink.ensureSubscription().request(Int.MaxValue) val messages = new FailureRecords def findMatch(): SearchResult = { try { - probe.expectEventWithTimeoutPF(timeOut, { + sink.expectEventWithTimeoutPF(timeOut, { case OnNext(element: T) => val result = test(element) if (result.matches) { @@ -74,10 +74,10 @@ trait StreamAssertions[T] { } def expectStreamFailure(test: Matcher[Throwable], timeOut: FiniteDuration = defaultTimeOut): Assertion = { - probe.ensureSubscription().request(Int.MaxValue) + sink.ensureSubscription().request(Int.MaxValue) def findMatch(): SearchResult = { try { - probe.expectEventWithTimeoutPF(timeOut, { + sink.expectEventWithTimeoutPF(timeOut, { case OnNext(_) => findMatch() case OnComplete => NotFound case OnError(e) => StreamError(e)