diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/Builders.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/Builders.scala index f52f0a609183e4fe5d747b86ff7a83873705f093..7d393aac92275e60cae608610ea836fecd216dca 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/Builders.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/Builders.scala @@ -54,7 +54,8 @@ object Builders { def build() = FileParsingTask( treeTask = treeTask, relativePath = relativePath, - parserName = parserName + parserName = parserName, + extractedPath = extractedPath ) } diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala index 4a9619f464ef75145921e9d3a4420c00b5ab0b32..1e5cd1aff5590345724dac2fe8ec4f915e7919ca 100644 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala +++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/StreamAssertions.scala @@ -45,6 +45,40 @@ trait StreamAssertions[T] { case NotFound => "expected stream failure, but stream completed normally" } + /** + * drain elements from the stream until it completes or times out. This function performs no + * tests on the drained element and is intended to be used when testing side-effects of the + * stream processing. + * @param timeOut maximum wait time for each element + * @return success if the stream completes or times out + */ + def drainStream(timeOut: FiniteDuration = defaultTimeOut): Assertion = { + sink.ensureSubscription().request(Int.MaxValue) + def findMatch(): SearchResult = { + try { + sink.expectEventWithTimeoutPF(timeOut, { + case OnNext(_) => findMatch() + case OnComplete => NotFound + case OnError(e) => StreamError(e) + case OnSubscribe(_) => findMatch() + }) + } catch { + case _: AssertionError => TimedOut(timeOut) + } + } + findMatch() match { + case NotFound | TimedOut(_) => Assertions.succeed + case StreamError(e) => Assertions.fail(s"encountered exception while draining stream: $e") + } + } + + /** + * consumes elements from the stream until an element satisfies the given matcher. Fails if the + * stream completes without match or times out. + * @param test match criteria for the elements produced by the stream + * @param timeOut maximum wait time for each element + * @return success if the stream produced a matching element + */ def findFirstMatchingStreamElement(test: Matcher[T], timeOut: FiniteDuration = defaultTimeOut): Assertion = { sink.ensureSubscription().request(Int.MaxValue) val messages = new FailureRecords @@ -73,6 +107,13 @@ trait StreamAssertions[T] { } } + /** + * Drains elements from the stream until it produces an error which is then checked with the + * given matcher. The test fails if the stream completes normally or times out. + * @param test criteria for the expected exception + * @param timeOut maximum wait time for each element + * @return success if the stream fails with a matching exception + */ def expectStreamFailure(test: Matcher[Throwable], timeOut: FiniteDuration = defaultTimeOut): Assertion = { sink.ensureSubscription().request(Int.MaxValue) def findMatch(): SearchResult = {