Skip to content
Snippets Groups Projects
Commit 2239775a authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: added stream assertion to test for elements in order

parent 16d06138
Branches
Tags
No related merge requests found
......@@ -77,18 +77,15 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
f.findFirstMatchingStreamElement(be(signal))
}
"emit parsing results for every incoming parsing request in order of input" in new Fixture(1) {
source.sendNext(input1).sendNext(input2).sendNext(input3)
sink.ensureSubscription().request(4)
val results = sink.expectNextN(3)
results.zipWithIndex.map { entry =>
assert(entry._1.isInstanceOf[InMemoryResult], "result should be in memory")
val inMemory = entry._1.asInstanceOf[InMemoryResult]
assert(
inMemory.task.relativePath == Paths.get(s"file${entry._2 + 1}"),
"files are out of order"
)
}
"emit parsing results for every incoming parsing request in order of input" in {
val f = new Fixture(1)
f.source.sendNext(f.input1).sendNext(f.input2).sendNext(f.input3)
f.findMatchingStreamElementsOrdered(Seq(
have(relativePath("file1")),
have(relativePath("file2")),
have(relativePath("file3"))
))
}
"forward file tree finished signals unchanged" in {
......
......@@ -97,4 +97,11 @@ trait FileParsingResultMatchers {
def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingResult, Path] =
relativePath(Paths.get(expectedValue))
def numParsingTasks(expectedValue: Long): HavePropertyMatcher[FileParsingSignalEndTree, Long] =
Helpers.propertyMatcher(
propertyName = "number of identified candidate calculations",
expected = expectedValue,
test = (x: FileParsingSignalEndTree) => x.numParsingTasks
)
}
\ No newline at end of file
......@@ -5,6 +5,7 @@ import akka.stream.testkit.TestSubscriber.{ OnComplete, OnError, OnNext, OnSubsc
import org.scalatest.matchers.{ MatchResult, Matcher }
import org.scalatest.{ Assertion, Assertions }
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
......@@ -20,32 +21,45 @@ trait StreamAssertions[T] {
val sink: TestSubscriber.Probe[T]
val defaultTimeOut: FiniteDuration = streamTimeOut
private class FailureRecords[U <: T: ClassTag] {
private val failedTests = new ListBuffer[(T, Either[String, MatchResult])]()
private case class FailureReport(
element: T,
index: Long,
reason: Either[String, MatchResult]
)
def addResult(element: T, result: MatchResult): Unit = {
failedTests.append((element, Right(result)))
private class FailureRecords[U <: T](private val tests: Seq[Matcher[U]]) {
private val failedTests = mutable.Map[Matcher[U], ListBuffer[FailureReport]]()
tests.foreach { x => failedTests(x) = new ListBuffer() }
def addResult(test: Matcher[U], element: T, index: Long, result: MatchResult): Unit = {
failedTests.get(test).foreach(_.append(FailureReport(element, index, Right(result))))
}
def addResult(element: T, result: String): Unit = {
failedTests.append((element, Left(result)))
def addResult(test: Matcher[U], element: T, index: Long, result: String): Unit = {
failedTests.get(test).foreach(_.append(FailureReport(element, index, Left(result))))
}
def succeedTest(test: Matcher[U]): Unit = failedTests.remove(test)
def printFailures(endOfTestReason: SearchResult): String = {
val details = failedTests.toList.zipWithIndex.map {
case ((elem, Right(result)), i) => s" [$i] $elem : ${result.failureMessage}"
case ((elem, Left(result)), i) => s" [$i] $elem : $result"
val details = tests.filter(failedTests.contains).map { test =>
val entry = failedTests(test)
val title = s"unsatisfied match: ${test.toString()}"
val failures = entry.toList.map {
case FailureReport(elem, i, Right(result)) => s" [$i] $elem : ${result.failureMessage}"
case FailureReport(elem, i, Left(result)) => s" [$i] $elem : $result"
}
failures.size match {
case 0 => s"$title\n <no stream elements received>"
case _ => s"$title\n${failures.mkString("\n")}"
}
}
val reason = (endOfTestReason match {
val reason = endOfTestReason match {
case TimedOut(t) => s"no new event received before time-out ($t)"
case StreamError(e) => s"stream failed with an exception $e"
case NotFound => "stream completed normally, but no match was found"
}) + s" [required class: '${implicitly[ClassTag[U]].runtimeClass}']"
details.size match {
case 0 => s"$reason\n<no stream elements received>"
case _ => s"$reason\nrecorded stream elements:\n${details.mkString("\n")}"
}
s"$reason\n${details.mkString("\n")}"
}
}
......@@ -78,15 +92,15 @@ trait StreamAssertions[T] {
/**
* consumes elements from the stream until an element satisfies the given matcher. Fails if the
* stream completes without match or times out. If the matcher targets a subset of
* @param test match criteria for the elements produced by the stream
* stream completes without match or times out.
* @param test match criteria for the elements produced by the stream (can target subclasses)
* @param timeOut maximum wait time for each element
* @return success if the stream produced a matching element
*/
def findFirstMatchingStreamElement[U <: T: Manifest](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
def findFirstMatchingStreamElement[U <: T: ClassTag](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
sink.ensureSubscription().request(Int.MaxValue)
val messages = new FailureRecords
def findMatch(): SearchResult = {
val messages = new FailureRecords(Seq(test))
def findMatch(index: Long): SearchResult = {
try {
sink.expectEventWithTimeoutPF(timeOut, {
case OnNext(element) => element match {
......@@ -95,22 +109,71 @@ trait StreamAssertions[T] {
if (result.matches) {
Found
} else {
messages.addResult(x, result)
findMatch()
messages.addResult(test, x, index, result)
findMatch(index + 1)
}
case x =>
messages.addResult(x.asInstanceOf[T], s"wrong class '${x.getClass}'")
findMatch()
messages.addResult(test, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'")
findMatch(index + 1)
}
case OnComplete => NotFound
case OnError(e) => StreamError(e)
case OnSubscribe(_) => findMatch()
case OnSubscribe(_) => findMatch(index)
})
} catch {
case _: AssertionError => TimedOut(timeOut)
}
}
findMatch() match {
findMatch(0) match {
case Found => Assertions.succeed
case failure => Assertions.fail(messages.printFailures(failure))
}
}
/**
* Consume elements from the stream until a matching element is found for each given matcher.
* Each stream element will be used to satisfy at most one matcher. The stream must produce the
* matching elements in the same order as the provided matcher list to satisfy the assertion.
* Fails if the stream completes without satisfying all tests or times out.
* @param tests match criteria for the elements produced by the stream (can target subclasses)
* @param timeOut maximum wait time for each element
* @return success if the stream produced a matching element
*/
def findMatchingStreamElementsOrdered[U <: T: Manifest](tests: Seq[Matcher[U]], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
sink.ensureSubscription().request(Int.MaxValue)
val messages = new FailureRecords(tests)
var pendingTests = tests
def findMatch(index: Long): SearchResult = {
try {
sink.expectEventWithTimeoutPF(timeOut, {
case OnNext(element) => element match {
case elem: U =>
val test = pendingTests.head
val result = test(elem)
if (result.matches) {
pendingTests = pendingTests.tail
messages.succeedTest(test)
} else {
messages.addResult(test, elem, index, result)
}
if (pendingTests.isEmpty) {
Found
} else {
findMatch(index + 1)
}
case x =>
messages.addResult(pendingTests.head, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'")
findMatch(index + 1)
}
case OnComplete => NotFound
case OnError(e) => StreamError(e)
case OnSubscribe(_) => findMatch(index)
})
} catch {
case _: AssertionError => TimedOut(timeOut)
}
}
findMatch(0) match {
case Found => Assertions.succeed
case failure => Assertions.fail(messages.printFailures(failure))
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment