From aebe99f35fdcda74955d7b3e17af54ec0e3188af Mon Sep 17 00:00:00 2001 From: Arvid Ihrig <ihrig@fhi-berlin.mpg.de> Date: Thu, 28 Jun 2018 16:36:22 +0200 Subject: [PATCH] Integrated Pipeline: tuned directory file tree tests for ArchiveUnpackingFlow --- .../ArchiveUnpackingFlowSpec.scala | 39 ++++++++++++------- .../StreamAssertions.scala | 10 ++--- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala index d38c5561..acc8d4cf 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala @@ -38,7 +38,7 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB ClosedShape } ) - val (probeInput, probe) = testGraph.run() + val (source, sink) = testGraph.run() val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).build() } @@ -47,52 +47,63 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB "forward start file tree signals unchanged" in { val f = new DirectoryTreeFixture() val task = aFileParsingTaskStartTree().withTreeTask(f.treeTask).build() - f.probeInput.sendNext(task) + f.source.sendNext(task) f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) } "forward file parsing tasks unchanged" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) val task = aFileParsingTask().withTreeTask(f.treeTask).build() - f.probeInput.sendNext(task) + f.source.sendNext(task) f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) + } + + "forward end file tree signals unchanged" in { + val f = new DirectoryTreeFixture() + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + val task = aFileParsingTaskEndTree().withTreeTask(f.treeTask).build() + f.source.sendNext(task) + f.findFirstMatchingStreamElement(be(task)) + verifyZeroInteractions(f.archiveHandler) } } "receiving signals with an arbitrary file tree type" should { "fail if file parsing tasks arrive before the start tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if file tree end signal arrive before the start tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if file parsing task arrives after the end tree signal" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTask().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if a file tree start signal arrives more than once" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } "fail if a file tree end signal arrives more than once" in { val f = new DirectoryTreeFixture - f.probeInput.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) - f.probeInput.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) + f.source.sendNext(aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()) f.expectStreamFailure(be(an[IllegalArgumentException])) } } diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala index 9b9f10cf..4a9619f4 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala @@ -16,7 +16,7 @@ trait StreamAssertions[T] { private case object NotFound extends SearchResult private case class StreamError(e: Throwable) extends SearchResult - val probe: TestSubscriber.Probe[T] + val sink: TestSubscriber.Probe[T] val defaultTimeOut: FiniteDuration = streamTimeOut private class FailureRecords { @@ -46,11 +46,11 @@ trait StreamAssertions[T] { } def findFirstMatchingStreamElement(test: Matcher[T], timeOut: FiniteDuration = defaultTimeOut): Assertion = { - probe.ensureSubscription().request(Int.MaxValue) + sink.ensureSubscription().request(Int.MaxValue) val messages = new FailureRecords def findMatch(): SearchResult = { try { - probe.expectEventWithTimeoutPF(timeOut, { + sink.expectEventWithTimeoutPF(timeOut, { case OnNext(element: T) => val result = test(element) if (result.matches) { @@ -74,10 +74,10 @@ trait StreamAssertions[T] { } def expectStreamFailure(test: Matcher[Throwable], timeOut: FiniteDuration = defaultTimeOut): Assertion = { - probe.ensureSubscription().request(Int.MaxValue) + sink.ensureSubscription().request(Int.MaxValue) def findMatch(): SearchResult = { try { - probe.expectEventWithTimeoutPF(timeOut, { + sink.expectEventWithTimeoutPF(timeOut, { case OnNext(_) => findMatch() case OnComplete => NotFound case OnError(e) => StreamError(e) -- GitLab