Commit 319e5f1e authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: FileParsingTaskEndTree emitted by TreeParserFlow now...

Integrated Pipeline: FileParsingTaskEndTree emitted by TreeParserFlow now includes the number of generated parsing tasks

-improved error messages given by findFirstMatchingStreamElement
parent 13b09f03
......@@ -41,4 +41,7 @@ case class FileParsingTask(
case class FileParsingTaskStartTree(treeTask: FileTreeScanTask) extends FileParsingTaskSignal
case class FileParsingTaskEndTree(treeTask: FileTreeScanTask) extends FileParsingTaskSignal
case class FileParsingTaskEndTree(
treeTask: FileTreeScanTask,
numParsingTasks: Long = 0
) extends FileParsingTaskSignal
......@@ -57,7 +57,7 @@ trait TreeParserFlow extends GraphStage[FlowShape[FileTreeScanTask, FileParsingT
generator.next()
else if (!emittedEnd) {
emittedEnd = true
FileParsingTaskEndTree(treeRequest)
FileParsingTaskEndTree(treeRequest, generator.getProcessedRequestCount)
} else
throw new NoSuchElementException("No more elements in this iterator")
}
......
......@@ -62,6 +62,15 @@ trait FileParsingTaskMatchers {
extractedPath(expectedValue.map(Paths.get(_)))
}
def numParsingTasks(expectedValue: Long) =
new HavePropertyMatcher[FileParsingTaskEndTree, Long] {
def apply(test: FileParsingTaskEndTree) = HavePropertyMatchResult(
test.numParsingTasks == expectedValue,
"number of identified candidate calculations",
expectedValue,
test.numParsingTasks
)
}
}
trait FileParsingResultMatchers {
......
......@@ -7,6 +7,7 @@ import org.scalatest.{ Assertion, Assertions }
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
trait StreamAssertions[T] {
......@@ -19,7 +20,7 @@ trait StreamAssertions[T] {
val sink: TestSubscriber.Probe[T]
val defaultTimeOut: FiniteDuration = streamTimeOut
private class FailureRecords {
private class FailureRecords[U <: T: ClassTag] {
private val failedTests = new ListBuffer[(T, Either[String, MatchResult])]()
def addResult(element: T, result: MatchResult): Unit = {
......@@ -31,18 +32,18 @@ trait StreamAssertions[T] {
}
def printFailures(endOfTestReason: SearchResult): String = {
val details = failedTests.toList.map {
case (elem, Right(result)) => s"- $elem : ${result.failureMessage}"
case (elem, Left(result)) => s"- $elem : $result"
val details = failedTests.toList.zipWithIndex.map {
case ((elem, Right(result)), i) => s" [$i] $elem : ${result.failureMessage}"
case ((elem, Left(result)), i) => s" [$i] $elem : $result"
}
val reason = endOfTestReason match {
val reason = (endOfTestReason match {
case TimedOut(t) => s"no new event received before time-out ($t)"
case StreamError(e) => s"stream failed with an exception $e"
case NotFound => "stream completed normally, but no match was found"
}
}) + s" [required class: '${implicitly[ClassTag[U]].runtimeClass}']"
details.size match {
case 0 => s"checked $reason\n<no stream elements received>"
case _ => s"checked $reason\n${details.mkString("\n")}"
case 0 => s"$reason\n<no stream elements received>"
case _ => s"$reason\nrecorded stream elements:\n${details.mkString("\n")}"
}
}
......@@ -104,7 +105,7 @@ trait StreamAssertions[T] {
findMatch()
}
case x =>
messages.addResult(x.asInstanceOf[T], s"wrong class, was ${x.getClass}")
messages.addResult(x.asInstanceOf[T], s"wrong class '${x.getClass}'")
findMatch()
}
case OnComplete => NotFound
......
......@@ -78,20 +78,14 @@ class TreeParserFlowSpec extends WordSpec with MockitoSugar with Matchers with T
"emit parse requests for all potential calculations in the file tree" in new Fixture {
source.sendNext(input1)
sink.request(Int.MaxValue)
val results = sink.receiveWithin(streamTimeOut).collect { case x: FileParsingTask => x }
results should have size 3
exactly(1, results) should have(relativePath("file1"))
exactly(1, results) should have(relativePath("file2"))
exactly(1, results) should have(relativePath("file3"))
findFirstMatchingStreamElement(have(relativePath("file1")))
findFirstMatchingStreamElement(have(relativePath("file2")))
findFirstMatchingStreamElement(have(relativePath("file3")))
}
"emit an end tree signal after sending all file parsing tasks" in new Fixture {
source.sendNext(input1)
sink.request(Int.MaxValue)
val result = sink.receiveWithin(streamTimeOut).last
result shouldBe a[FileParsingTaskEndTree]
result should have(treeTask(input1))
findFirstMatchingStreamElement(have(treeTask(input1), numParsingTasks(3)))
}
"emit all remaining values after the upstream element completed" in new Fixture {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment