diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala index 8a3e926c635be467bbaaa20f808ebb7b4eed17c5..f5a9799e82eecd8f1788a3553e5e8ad27520f6b9 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala @@ -9,7 +9,6 @@ import eu.nomad_lab.TreeType import eu.nomad_lab.integrated_pipeline.messages._ import eu.nomad_lab.integrated_pipeline.stream_components._ import eu.nomad_lab.parsers.ParseResult -import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -71,10 +70,11 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat "a CalculationParsingFlow" when { "managing a single worker" should { - "forward a received file tree started signal" in new Fixture(1) { - source.sendNext(FileParsingSignalStartTree(sampleTreeScan)) - sink.ensureSubscription().request(1) - sink.expectNext(FileParsingSignalStartTree(sampleTreeScan)) + "forward file tree started signals unchanged" in { + val f = new Fixture(1) + val signal = aFileParsingSignalStartTree().build() + f.source.sendNext(signal) + f.findFirstMatchingStreamElement(be(signal)) } "emit parsing results for every incoming parsing request in order of input" in new Fixture(1) { @@ -91,55 +91,55 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat } } - "forward file tree finished signals unchanged" in new Fixture(1) { - source.sendNext(FileParsingSignalEndTree(sampleTreeScan)) - sink.ensureSubscription().request(1) - sink.expectNext(FileParsingSignalEndTree(sampleTreeScan)) + "forward file tree finished signals unchanged" in { + val f = new Fixture(1) + val signal = aFileParsingSignalEndTree().build() + f.source.sendNext(signal) + f.findFirstMatchingStreamElement(be(signal)) } - "complete the stage if upstream signals completion before all elements are processed" in new Fixture(1) { - source.sendNext(input1).sendNext(input2).sendNext(input3).sendComplete() - sink.ensureSubscription().request(4) - sink.expectNextN(3) - sink.expectComplete() + "complete the stage if upstream signals completion before all elements are processed" in { + val f = new Fixture(1) + (1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask())) + f.source.sendComplete() + f.expectStreamCompletion() } - "complete the stage if upstream signals completion after all elements are processed" in new Fixture(1) { - source.sendNext(input1).sendNext(input2).sendNext(input3) - sink.ensureSubscription().request(4) - sink.expectNextN(3) - source.sendComplete() - sink.expectComplete() + "complete the stage if upstream signals completion after all elements are processed" in { + val f = new Fixture(1) + (1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask())) + f.drainStream() + f.source.sendComplete() + f.expectStreamCompletion() } - "specify the location of the main file to the parsing engine when handling directories" in new Fixture(1) { - val task = aFileParsingTask().withTreeTask(sampleTreeScan).withRelativePath("magic").build() - source.sendNext(task) - findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess))) - val expectedPath = sampleTreeScan.treeBasePath.resolve("magic") - verify(dummyWorkers.head).processRequest(any(), ArgumentMatchers.eq(expectedPath)) + "specify the location of the main file to the parsing engine when handling directories" in { + val f = new Fixture(1) + val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).withBasePath("/dir/bla") + val task = aFileParsingTask().withTreeTask(treeTask).withRelativePath("magic").build() + f.source.sendNext(task) + f.drainStream() + val expectedPath = Paths.get("/dir/bla/magic") + verify(f.dummyWorkers.head).processRequest(task, expectedPath) } - "specify the temporarily extracted main file to the parsing engine when handling zip archives" in new Fixture(1) { - val zipTreeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).build() + "specify the temporarily extracted main file to the parsing engine when handling zip archives" in { + val f = new Fixture(1) + val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip) val extractedPath = Paths.get("/tmp/extracted/magic") - val task = aFileParsingTask().withTreeTask(zipTreeTask).withRelativePath("magic"). - withExtractedPath(Some(extractedPath)).build() - source.sendNext(task) - findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess))) - verify(dummyWorkers.head).processRequest(task, extractedPath) + val task = aFileParsingTask().withTreeTask(treeTask).withExtractedPath(Some(extractedPath)).build() + f.source.sendNext(task) + f.drainStream() + verify(f.dummyWorkers.head).processRequest(task, extractedPath) } - "gracefully fail parsing requests with unknown or not supported file tree types" in new Fixture(1) { - sink.ensureSubscription().request(6) + "gracefully fail parsing requests with unknown or not supported file tree types" in { + val f = new Fixture(1) Seq(TreeType.Unknown, TreeType.File, TreeType.Tar).foreach { treeType => - val anotherTreeScan = sampleTreeScan.copy(treeType = treeType) - val anotherRequest = input1.copy(treeTask = anotherTreeScan) - source.sendNext(anotherRequest) - val result = sink.expectNext() - assert(result.isInstanceOf[InMemoryResult], "results should have been kept in memory") - val inMemory = result.asInstanceOf[InMemoryResult] - assert(inMemory.result == ParseResult.ParseFailure, "should have a failed status") + val treeScan = aFileTreeScanTask().withTreeType(treeType) + val request = aFileParsingTask().withTreeTask(treeScan) + f.source.sendNext(request) + f.findFirstMatchingStreamElement(have(status(ParseResult.ParseFailure))) } } diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala index 1bba943732d4a57b0ee362a6161f1b98fcc39e30..0a99f563342ed68206e53d1eba9df5305cc8885b 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala @@ -49,12 +49,6 @@ trait StreamAssertions[T] { } } - private def errorSearchTerminationReason(end: SearchResult): String = end match { - case TimedOut(t) => s"no new event received before time-out ($t)" - case Found => s"expected stream failure, but stream completed normally" - case NotFound => "expected stream failure, but stream completed normally" - } - /** * drain elements from the stream until it completes or times out. This function performs no * tests on the drained element and is intended to be used when testing side-effects of the @@ -151,7 +145,41 @@ trait StreamAssertions[T] { } else { Assertions.fail(result.failureMessage) } - case noFailure => Assertions.fail(errorSearchTerminationReason(noFailure)) + case noFailure => Assertions.fail(noFailure match { + case TimedOut(t) => s"no new event received before time-out ($t)" + case NotFound => "expected stream failure, but stream completed normally" + }) + } + } + + /** + * Drains elements from the stream until it completes normally. If a time-out or stream failure + * is encountered, the assertion will fail. + * @param timeOut maximum wait time for each element + * @return success if the stream completes normally + */ + def expectStreamCompletion(timeOut: FiniteDuration = defaultTimeOut): Assertion = { + sink.ensureSubscription().request(Int.MaxValue) + + def findMatch(): SearchResult = { + try { + sink.expectEventWithTimeoutPF(timeOut, { + case OnNext(_) => findMatch() + case OnComplete => NotFound + case OnError(e) => StreamError(e) + case OnSubscribe(_) => findMatch() + }) + } catch { + case _: AssertionError => TimedOut(timeOut) + } + } + + findMatch() match { + case NotFound => Assertions.succeed + case noComplete => Assertions.fail(noComplete match { + case TimedOut(t) => s"no new event received before time-out ($t)" + case StreamError(e) => s"expected stream completion, but stream failed with exception $e" + }) } }