diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
index 170f57425f358da960de48e4d4a19c06be9ed420..3aa9d6d5afde0627528168bf7f3f3123f01b8a65 100644
--- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
+++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
@@ -26,60 +26,42 @@ trait FileTreeTaskMatchers {
 }
 
 trait FileParsingTaskMatchers {
-  def treeTask(expectedValue: FileTreeScanTask) =
-    new HavePropertyMatcher[FileParsingTaskSignal, FileTreeScanTask] {
-      def apply(test: FileParsingTaskSignal) =
-        HavePropertyMatchResult(
-          test.treeTask == expectedValue,
-          "parent tree task",
-          expectedValue,
-          test.treeTask
-        )
-    }
+  def treeTask(expectedValue: FileTreeScanTask) = new HavePropertyMatcher[FileParsingTaskSignal, FileTreeScanTask] {
+    def apply(test: FileParsingTaskSignal) =
+      HavePropertyMatchResult(
+        test.treeTask == expectedValue,
+        "parent tree task",
+        expectedValue,
+        test.treeTask
+      )
+  }
 
-  def relativePath(expectedValue: Path) =
-    new HavePropertyMatcher[FileParsingTaskSignal, Path] {
-      def apply(test: FileParsingTaskSignal) = test match {
-        case x: FileParsingTask => HavePropertyMatchResult(
-          x.relativePath == expectedValue,
-          "relative file path",
-          expectedValue,
-          x.relativePath
-        )
-        case wrongClass => HavePropertyMatchResult(
-          matches = false,
-          s"relative file path (wrong class ${wrongClass.getClass})",
-          expectedValue,
-          null
-        )
-      }
-    }
+  def relativePath(expectedValue: Path) = new HavePropertyMatcher[FileParsingTask, Path] {
+    def apply(test: FileParsingTask) = HavePropertyMatchResult(
+      test.relativePath == expectedValue,
+      "relative file path",
+      expectedValue,
+      test.relativePath
+    )
+  }
 
-  def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingTaskSignal, Path] = {
+  def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingTask, Path] = {
     relativePath(Paths.get(expectedValue))
   }
 
-  def extractedPath(expectedValue: Option[Path]) =
-    new HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] {
-      def apply(test: FileParsingTaskSignal) = test match {
-        case x: FileParsingTask => HavePropertyMatchResult(
-          x.extractedPath == expectedValue,
-          "temporary extracted file path",
-          expectedValue,
-          x.extractedPath
-        )
-        case wrongClass => HavePropertyMatchResult(
-          matches = false,
-          s"relative file path (wrong class ${wrongClass.getClass})",
-          expectedValue,
-          null
-        )
-      }
-    }
+  def extractedPath(expectedValue: Option[Path]) = new HavePropertyMatcher[FileParsingTask, Option[Path]] {
+    def apply(test: FileParsingTask) = HavePropertyMatchResult(
+      test.extractedPath == expectedValue,
+      "temporary extracted file path",
+      expectedValue,
+      test.extractedPath
+    )
+  }
 
-  def extractedPathString(expectedValue: Option[String]): HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] = {
+  def extractedPathString(expectedValue: Option[String]): HavePropertyMatcher[FileParsingTask, Option[Path]] = {
     extractedPath(expectedValue.map(Paths.get(_)))
   }
+
 }
 
 trait FileParsingResultMatchers {
diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala
index 1e5cd1aff5590345724dac2fe8ec4f915e7919ca..869e936ad39a39e2db0569b329043a12a5c2761b 100644
--- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala
+++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala
@@ -20,12 +20,21 @@ trait StreamAssertions[T] {
   val defaultTimeOut: FiniteDuration = streamTimeOut
 
   private class FailureRecords {
-    private val failedTests = new ListBuffer[(T, MatchResult)]()
+    private val failedTests = new ListBuffer[(T, Either[String, MatchResult])]()
 
-    def addResult(element: T, result: MatchResult): Unit = failedTests.append((element, result))
+    def addResult(element: T, result: MatchResult): Unit = {
+      failedTests.append((element, Right(result)))
+    }
+
+    def addResult(element: T, result: String): Unit = {
+      failedTests.append((element, Left(result)))
+    }
 
     def printFailures(endOfTestReason: SearchResult): String = {
-      val details = failedTests.toList.map(x => s"- ${x._1} : ${x._2}")
+      val details = failedTests.toList.map {
+        case (elem, Right(result)) => s"- $elem : ${result.failureMessage}"
+        case (elem, Left(result)) => s"- $elem : $result"
+      }
       val reason = endOfTestReason match {
         case TimedOut(t) => s"no new event received before time-out ($t)"
         case StreamError(e) => s"stream failed with an exception $e"
@@ -74,25 +83,30 @@ trait StreamAssertions[T] {
 
   /**
    * consumes elements from the stream until an element satisfies the given matcher. Fails if the
-   * stream completes without match or times out.
+   * stream completes without match or times out. If the matcher targets a subset of
    * @param test match criteria for the elements produced by the stream
    * @param timeOut maximum wait time for each element
    * @return success if the stream produced a matching element
    */
-  def findFirstMatchingStreamElement(test: Matcher[T], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
+  def findFirstMatchingStreamElement[U <: T: Manifest](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
     sink.ensureSubscription().request(Int.MaxValue)
     val messages = new FailureRecords
     def findMatch(): SearchResult = {
       try {
         sink.expectEventWithTimeoutPF(timeOut, {
-          case OnNext(element: T) =>
-            val result = test(element)
-            if (result.matches) {
-              Found
-            } else {
-              messages.addResult(element, result)
+          case OnNext(element) => element match {
+            case x: U =>
+              val result = test(x)
+              if (result.matches) {
+                Found
+              } else {
+                messages.addResult(x, result)
+                findMatch()
+              }
+            case x =>
+              messages.addResult(x.asInstanceOf[T], s"wrong class, was ${x.getClass}")
               findMatch()
-            }
+          }
           case OnComplete => NotFound
           case OnError(e) => StreamError(e)
           case OnSubscribe(_) => findMatch()
diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/TreeParserFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/TreeParserFlowSpec.scala
index 5a0b48f2f2a796385a53aec69c17c0baeaf5a5dd..c7c86aebb7e62c434c6c69150b33f1214693d2da 100644
--- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/TreeParserFlowSpec.scala
+++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/TreeParserFlowSpec.scala
@@ -65,11 +65,6 @@ class TreeParserFlowSpec extends WordSpec with MockitoSugar with Matchers with T
     val input1 = aFileTreeScanTask().withBasePath(path1).build()
     val input2 = aFileTreeScanTask().withBasePath(path2).build()
 
-    def sampleInput(baseName: String) = FileTreeScanTask(
-      treeBasePath = Paths.get(s"/foo/$baseName"),
-      treeType = TreeType.Zip
-    )
-
     val (probeInput, probeOutput) = testGraph.run()
   }