diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala index f5a9799e82eecd8f1788a3553e5e8ad27520f6b9..cad339dfbd675fdedf7dcb589372c401121c001e 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/CalculationParsingFlowSpec.scala @@ -77,18 +77,15 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat f.findFirstMatchingStreamElement(be(signal)) } - "emit parsing results for every incoming parsing request in order of input" in new Fixture(1) { - source.sendNext(input1).sendNext(input2).sendNext(input3) - sink.ensureSubscription().request(4) - val results = sink.expectNextN(3) - results.zipWithIndex.map { entry => - assert(entry._1.isInstanceOf[InMemoryResult], "result should be in memory") - val inMemory = entry._1.asInstanceOf[InMemoryResult] - assert( - inMemory.task.relativePath == Paths.get(s"file${entry._2 + 1}"), - "files are out of order" - ) - } + "emit parsing results for every incoming parsing request in order of input" in { + val f = new Fixture(1) + f.source.sendNext(f.input1).sendNext(f.input2).sendNext(f.input3) + f.findMatchingStreamElementsOrdered(Seq( + have(relativePath("file1")), + have(relativePath("file2")), + have(relativePath("file3")) + )) + } "forward file tree finished signals unchanged" in { diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala index b485fee14518c80b64dd9a044c9384189109786e..fce7320274cbf8fe8a9e3bbeeda09b4b7ed7d5c4 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala @@ -97,4 +97,11 @@ trait FileParsingResultMatchers { def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingResult, Path] = relativePath(Paths.get(expectedValue)) + + def numParsingTasks(expectedValue: Long): HavePropertyMatcher[FileParsingSignalEndTree, Long] = + Helpers.propertyMatcher( + propertyName = "number of identified candidate calculations", + expected = expectedValue, + test = (x: FileParsingSignalEndTree) => x.numParsingTasks + ) } \ No newline at end of file diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala index 0a99f563342ed68206e53d1eba9df5305cc8885b..63d29efb7dfbff111ea83420e747b96f4540d47d 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala @@ -5,6 +5,7 @@ import akka.stream.testkit.TestSubscriber.{ OnComplete, OnError, OnNext, OnSubsc import org.scalatest.matchers.{ MatchResult, Matcher } import org.scalatest.{ Assertion, Assertions } +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -20,32 +21,45 @@ trait StreamAssertions[T] { val sink: TestSubscriber.Probe[T] val defaultTimeOut: FiniteDuration = streamTimeOut - private class FailureRecords[U <: T: ClassTag] { - private val failedTests = new ListBuffer[(T, Either[String, MatchResult])]() + private case class FailureReport( + element: T, + index: Long, + reason: Either[String, MatchResult] + ) - def addResult(element: T, result: MatchResult): Unit = { - failedTests.append((element, Right(result))) + private class FailureRecords[U <: T](private val tests: Seq[Matcher[U]]) { + private val failedTests = mutable.Map[Matcher[U], ListBuffer[FailureReport]]() + tests.foreach { x => failedTests(x) = new ListBuffer() } + + def addResult(test: Matcher[U], element: T, index: Long, result: MatchResult): Unit = { + failedTests.get(test).foreach(_.append(FailureReport(element, index, Right(result)))) } - def addResult(element: T, result: String): Unit = { - failedTests.append((element, Left(result))) + def addResult(test: Matcher[U], element: T, index: Long, result: String): Unit = { + failedTests.get(test).foreach(_.append(FailureReport(element, index, Left(result)))) } + def succeedTest(test: Matcher[U]): Unit = failedTests.remove(test) + def printFailures(endOfTestReason: SearchResult): String = { - val details = failedTests.toList.zipWithIndex.map { - case ((elem, Right(result)), i) => s" [$i] $elem : ${result.failureMessage}" - case ((elem, Left(result)), i) => s" [$i] $elem : $result" + val details = tests.filter(failedTests.contains).map { test => + val entry = failedTests(test) + val title = s"unsatisfied match: ${test.toString()}" + val failures = entry.toList.map { + case FailureReport(elem, i, Right(result)) => s" [$i] $elem : ${result.failureMessage}" + case FailureReport(elem, i, Left(result)) => s" [$i] $elem : $result" + } + failures.size match { + case 0 => s"$title\n <no stream elements received>" + case _ => s"$title\n${failures.mkString("\n")}" + } } - val reason = (endOfTestReason match { + val reason = endOfTestReason match { case TimedOut(t) => s"no new event received before time-out ($t)" case StreamError(e) => s"stream failed with an exception $e" case NotFound => "stream completed normally, but no match was found" - }) + s" [required class: '${implicitly[ClassTag[U]].runtimeClass}']" - details.size match { - case 0 => s"$reason\n<no stream elements received>" - case _ => s"$reason\nrecorded stream elements:\n${details.mkString("\n")}" } - + s"$reason\n${details.mkString("\n")}" } } @@ -78,15 +92,15 @@ trait StreamAssertions[T] { /** * consumes elements from the stream until an element satisfies the given matcher. Fails if the - * stream completes without match or times out. If the matcher targets a subset of - * @param test match criteria for the elements produced by the stream + * stream completes without match or times out. + * @param test match criteria for the elements produced by the stream (can target subclasses) * @param timeOut maximum wait time for each element * @return success if the stream produced a matching element */ - def findFirstMatchingStreamElement[U <: T: Manifest](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = { + def findFirstMatchingStreamElement[U <: T: ClassTag](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = { sink.ensureSubscription().request(Int.MaxValue) - val messages = new FailureRecords - def findMatch(): SearchResult = { + val messages = new FailureRecords(Seq(test)) + def findMatch(index: Long): SearchResult = { try { sink.expectEventWithTimeoutPF(timeOut, { case OnNext(element) => element match { @@ -95,22 +109,71 @@ trait StreamAssertions[T] { if (result.matches) { Found } else { - messages.addResult(x, result) - findMatch() + messages.addResult(test, x, index, result) + findMatch(index + 1) } case x => - messages.addResult(x.asInstanceOf[T], s"wrong class '${x.getClass}'") - findMatch() + messages.addResult(test, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'") + findMatch(index + 1) } case OnComplete => NotFound case OnError(e) => StreamError(e) - case OnSubscribe(_) => findMatch() + case OnSubscribe(_) => findMatch(index) }) } catch { case _: AssertionError => TimedOut(timeOut) } } - findMatch() match { + findMatch(0) match { + case Found => Assertions.succeed + case failure => Assertions.fail(messages.printFailures(failure)) + } + } + + /** + * Consume elements from the stream until a matching element is found for each given matcher. + * Each stream element will be used to satisfy at most one matcher. The stream must produce the + * matching elements in the same order as the provided matcher list to satisfy the assertion. + * Fails if the stream completes without satisfying all tests or times out. + * @param tests match criteria for the elements produced by the stream (can target subclasses) + * @param timeOut maximum wait time for each element + * @return success if the stream produced a matching element + */ + def findMatchingStreamElementsOrdered[U <: T: Manifest](tests: Seq[Matcher[U]], timeOut: FiniteDuration = defaultTimeOut): Assertion = { + sink.ensureSubscription().request(Int.MaxValue) + val messages = new FailureRecords(tests) + var pendingTests = tests + def findMatch(index: Long): SearchResult = { + try { + sink.expectEventWithTimeoutPF(timeOut, { + case OnNext(element) => element match { + case elem: U => + val test = pendingTests.head + val result = test(elem) + if (result.matches) { + pendingTests = pendingTests.tail + messages.succeedTest(test) + } else { + messages.addResult(test, elem, index, result) + } + if (pendingTests.isEmpty) { + Found + } else { + findMatch(index + 1) + } + case x => + messages.addResult(pendingTests.head, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'") + findMatch(index + 1) + } + case OnComplete => NotFound + case OnError(e) => StreamError(e) + case OnSubscribe(_) => findMatch(index) + }) + } catch { + case _: AssertionError => TimedOut(timeOut) + } + } + findMatch(0) match { case Found => Assertions.succeed case failure => Assertions.fail(messages.printFailures(failure)) }