StreamAssertions.scala 9.38 KB
Newer Older
1
package eu.nomad_lab.integrated_pipeline_tests
2
3
4
5
6
7

import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.TestSubscriber.{ OnComplete, OnError, OnNext, OnSubscribe }
import org.scalatest.matchers.{ MatchResult, Matcher }
import org.scalatest.{ Assertion, Assertions }

8
import scala.collection.mutable
9
10
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.FiniteDuration
11
import scala.reflect.ClassTag
12
13
14
15
16
17
18
19
20

trait StreamAssertions[T] {

  private trait SearchResult
  private case object Found extends SearchResult
  private case class TimedOut(t: FiniteDuration) extends SearchResult
  private case object NotFound extends SearchResult
  private case class StreamError(e: Throwable) extends SearchResult

21
  val sink: TestSubscriber.Probe[T]
22
23
  val defaultTimeOut: FiniteDuration = streamTimeOut

24
25
26
27
28
  private case class FailureReport(
    element: T,
    index: Long,
    reason: Either[String, MatchResult]
  )
29

30
31
32
33
34
35
  private class FailureRecords[U <: T](private val tests: Seq[Matcher[U]]) {
    private val failedTests = mutable.Map[Matcher[U], ListBuffer[FailureReport]]()
    tests.foreach { x => failedTests(x) = new ListBuffer() }

    def addResult(test: Matcher[U], element: T, index: Long, result: MatchResult): Unit = {
      failedTests.get(test).foreach(_.append(FailureReport(element, index, Right(result))))
36
37
    }

38
39
    def addResult(test: Matcher[U], element: T, index: Long, result: String): Unit = {
      failedTests.get(test).foreach(_.append(FailureReport(element, index, Left(result))))
40
    }
41

42
43
    def succeedTest(test: Matcher[U]): Unit = failedTests.remove(test)

44
    def printFailures(endOfTestReason: SearchResult): String = {
45
46
47
48
49
50
51
52
53
54
55
      val details = tests.filter(failedTests.contains).map { test =>
        val entry = failedTests(test)
        val title = s"unsatisfied match: ${test.toString()}"
        val failures = entry.toList.map {
          case FailureReport(elem, i, Right(result)) => s"  [$i] $elem : ${result.failureMessage}"
          case FailureReport(elem, i, Left(result)) => s"  [$i] $elem : $result"
        }
        failures.size match {
          case 0 => s"$title\n  <no stream elements received>"
          case _ => s"$title\n${failures.mkString("\n")}"
        }
56
      }
57
      val reason = endOfTestReason match {
58
59
60
61
        case TimedOut(t) => s"no new event received before time-out ($t)"
        case StreamError(e) => s"stream failed with an exception $e"
        case NotFound => "stream completed normally, but no match was found"
      }
62
      s"$reason\n${details.mkString("\n")}"
63
64
65
    }
  }

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  /**
   * drain elements from the stream until it completes or times out. This function performs no
   * tests on the drained element and is intended to be used when testing side-effects of the
   * stream processing.
   * @param timeOut maximum wait time for each element
   * @return success if the stream completes or times out
   */
  def drainStream(timeOut: FiniteDuration = defaultTimeOut): Assertion = {
    sink.ensureSubscription().request(Int.MaxValue)
    def findMatch(): SearchResult = {
      try {
        sink.expectEventWithTimeoutPF(timeOut, {
          case OnNext(_) => findMatch()
          case OnComplete => NotFound
          case OnError(e) => StreamError(e)
          case OnSubscribe(_) => findMatch()
        })
      } catch {
        case _: AssertionError => TimedOut(timeOut)
      }
    }
    findMatch() match {
      case NotFound | TimedOut(_) => Assertions.succeed
      case StreamError(e) => Assertions.fail(s"encountered exception while draining stream: $e")
    }
  }

  /**
   * consumes elements from the stream until an element satisfies the given matcher. Fails if the
95
96
   * stream completes without match or times out.
   * @param test match criteria for the elements produced by the stream (can target subclasses)
97
98
99
   * @param timeOut maximum wait time for each element
   * @return success if the stream produced a matching element
   */
100
  def findFirstMatchingStreamElement[U <: T: ClassTag](test: Matcher[U], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
101
    sink.ensureSubscription().request(Int.MaxValue)
102
103
    val messages = new FailureRecords(Seq(test))
    def findMatch(index: Long): SearchResult = {
104
      try {
105
        sink.expectEventWithTimeoutPF(timeOut, {
106
107
108
109
110
111
          case OnNext(element) => element match {
            case x: U =>
              val result = test(x)
              if (result.matches) {
                Found
              } else {
112
113
                messages.addResult(test, x, index, result)
                findMatch(index + 1)
114
115
              }
            case x =>
116
117
              messages.addResult(test, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'")
              findMatch(index + 1)
118
          }
119
120
          case OnComplete => NotFound
          case OnError(e) => StreamError(e)
121
          case OnSubscribe(_) => findMatch(index)
122
123
124
125
126
        })
      } catch {
        case _: AssertionError => TimedOut(timeOut)
      }
    }
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
    findMatch(0) match {
      case Found => Assertions.succeed
      case failure => Assertions.fail(messages.printFailures(failure))
    }
  }

  /**
   * Consume elements from the stream until a matching element is found for each given matcher.
   * Each stream element will be used to satisfy at most one matcher. The stream must produce the
   * matching elements in the same order as the provided matcher list to satisfy the assertion.
   * Fails if the stream completes without satisfying all tests or times out.
   * @param tests match criteria for the elements produced by the stream (can target subclasses)
   * @param timeOut maximum wait time for each element
   * @return success if the stream produced a matching element
   */
  def findMatchingStreamElementsOrdered[U <: T: Manifest](tests: Seq[Matcher[U]], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
    sink.ensureSubscription().request(Int.MaxValue)
    val messages = new FailureRecords(tests)
    var pendingTests = tests
    def findMatch(index: Long): SearchResult = {
      try {
        sink.expectEventWithTimeoutPF(timeOut, {
          case OnNext(element) => element match {
            case elem: U =>
              val test = pendingTests.head
              val result = test(elem)
              if (result.matches) {
                pendingTests = pendingTests.tail
                messages.succeedTest(test)
              } else {
                messages.addResult(test, elem, index, result)
              }
              if (pendingTests.isEmpty) {
                Found
              } else {
                findMatch(index + 1)
              }
            case x =>
              messages.addResult(pendingTests.head, x.asInstanceOf[T], index, s"wrong class '${x.getClass}'")
              findMatch(index + 1)
          }
          case OnComplete => NotFound
          case OnError(e) => StreamError(e)
          case OnSubscribe(_) => findMatch(index)
        })
      } catch {
        case _: AssertionError => TimedOut(timeOut)
      }
    }
    findMatch(0) match {
177
178
179
180
181
      case Found => Assertions.succeed
      case failure => Assertions.fail(messages.printFailures(failure))
    }
  }

182
183
184
185
186
187
188
  /**
   * Drains elements from the stream until it produces an error which is then checked with the
   * given matcher. The test fails if the stream completes normally or times out.
   * @param test criteria for the expected exception
   * @param timeOut maximum wait time for each element
   * @return success if the stream fails with a matching exception
   */
189
  def expectStreamFailure(test: Matcher[Throwable], timeOut: FiniteDuration = defaultTimeOut): Assertion = {
190
    sink.ensureSubscription().request(Int.MaxValue)
191
192
    def findMatch(): SearchResult = {
      try {
193
        sink.expectEventWithTimeoutPF(timeOut, {
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
          case OnNext(_) => findMatch()
          case OnComplete => NotFound
          case OnError(e) => StreamError(e)
          case OnSubscribe(_) => findMatch()
        })
      } catch {
        case _: AssertionError => TimedOut(timeOut)
      }
    }
    findMatch() match {
      case StreamError(e) =>
        val result = test(e)
        if (result.matches) {
          Assertions.succeed
        } else {
          Assertions.fail(result.failureMessage)
        }
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
      case noFailure => Assertions.fail(noFailure match {
        case TimedOut(t) => s"no new event received before time-out ($t)"
        case NotFound => "expected stream failure, but stream completed normally"
      })
    }
  }

  /**
   * Drains elements from the stream until it completes normally. If a time-out or stream failure
   * is encountered, the assertion will fail.
   * @param timeOut maximum wait time for each element
   * @return success if the stream completes normally
   */
  def expectStreamCompletion(timeOut: FiniteDuration = defaultTimeOut): Assertion = {
    sink.ensureSubscription().request(Int.MaxValue)

    def findMatch(): SearchResult = {
      try {
        sink.expectEventWithTimeoutPF(timeOut, {
          case OnNext(_) => findMatch()
          case OnComplete => NotFound
          case OnError(e) => StreamError(e)
          case OnSubscribe(_) => findMatch()
        })
      } catch {
        case _: AssertionError => TimedOut(timeOut)
      }
    }

    findMatch() match {
      case NotFound => Assertions.succeed
      case noComplete => Assertions.fail(noComplete match {
        case TimedOut(t) => s"no new event received before time-out ($t)"
        case StreamError(e) => s"expected stream completion, but stream failed with exception $e"
      })
246
247
248
249
    }
  }

}