// CalculationParsingFlowSpec.scala (4 KB)
1
package eu.nomad_lab.integrated_pipeline_tests
2

3
import java.nio.file.Paths
4
5
6
7
8

import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType
9
import eu.nomad_lab.integrated_pipeline.{ CalculationParsingEngine, FileTree }
10
import eu.nomad_lab.integrated_pipeline.messages._
11
import eu.nomad_lab.integrated_pipeline.stream_components._
12
import eu.nomad_lab.integrated_pipeline_tests.matchers.{ FileParsingResultMatchers, StreamAssertions }
13
import eu.nomad_lab.parsers.ParseResult
14
import org.mockito.ArgumentMatchers._
15
import org.mockito.Mockito
16
17
18
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
19
import org.scalatest.mockito.MockitoSugar
20
import org.scalatest.{ Matchers, WordSpec }
21

22
/**
 * Specification for the stream stage created by `CalculationParsingFlow.createParsingFlow`.
 *
 * The flow is driven with mocked [[CalculationParsingEngine]] workers, so these tests look at
 * the stream-level behaviour only (ordering of results, completion handling and distribution
 * of work among workers), not at the parsing itself.
 */
class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers {

  /** File tree that every sample input refers to: a directory tree based at `/foo/bar`. */
  val sampleTreeScan = FileTree(
    treeBasePath = Paths.get("/foo/bar"),
    treeType = TreeType.Directory
  )

  /**
   * Creates a parsing task inside [[sampleTreeScan]].
   *
   * @param fileName path of the file, used as the task's relative path
   * @return a task for the given file, assigned to the parser named "dummyParser"
   */
  def sampleInput(fileName: String) = FileParsingTask(
    treeTask = sampleTreeScan,
    relativePath = Paths.get(fileName),
    parserName = "dummyParser"
  )

  /**
   * Test harness that connects a test source probe and a test sink probe through a parsing
   * flow built from mocked workers. The graph is started as part of construction.
   *
   * The assertion helpers used by the tests (`drainStream`, `expectStreamCompletion`,
   * `findMatchingStreamElementsOrdered`) are not defined here; presumably they are inherited
   * from `StreamAssertions` and operate on `sink` -- confirm against that trait.
   *
   * @param numWorkers number of mocked parsing engines handed to the flow; must be >= 1,
   *                   otherwise `require` fails with an `IllegalArgumentException`
   */
  private class Fixture(numWorkers: Int) extends StreamAssertions[FileParsingResultSignal] {
    require(numWorkers >= 1, "need at least one dummy worker")

    /** One mocked engine per requested worker; each answers every signal with a success result. */
    val dummyWorkers = (1 to numWorkers).map { _ =>
      val worker = mock[CalculationParsingEngine]
      when(worker.processSignal(any())).thenAnswer(new Answer[FileParsingResult] {
        override def answer(invocation: InvocationOnMock): FileParsingResult = {
          Thread.sleep(3) // pretend that each dummy task consumes a little time
          // echo the received task back inside a successful in-memory result
          anInMemoryResult()
            .withTask(invocation.getArgument[FileParsingTask](0))
            .withParseResult(ParseResult.ParseSuccess)
            .build()
        }
      })
      worker
    }

    private val testInput = TestSource.probe[FileParsingTaskSignal]
    private val testRequests = TestSink.probe[FileParsingResultSignal]

    // source probe ~> flow under test ~> sink probe; both probes are kept as materialized values
    private val testGraph = RunnableGraph.fromGraph(
      GraphDSL.create(testInput, testRequests)((_, _)) { implicit builder => (source, sink) =>
        import GraphDSL.Implicits._

        val calculationParser = CalculationParsingFlow.createParsingFlow(dummyWorkers)

        source ~> calculationParser ~> sink
        ClosedShape
      }
    )

    /** Probes of the running graph: `source` feeds tasks in, `sink` receives the results. */
    val (source, sink) = testGraph.run()

    /** Three distinguishable sample tasks for the files "file1", "file2" and "file3". */
    val (input1, input2, input3) = (sampleInput("file1"), sampleInput("file2"), sampleInput("file3"))
  }

  "a CalculationParsingFlow" when {
    "managing a single worker" should {

      "emit parsing results for every incoming parsing request in order of input" in {
        import FileParsingResultMatchers._

        val f = new Fixture(1)
        f.source.sendNext(f.input1).sendNext(f.input2).sendNext(f.input3)
        f.findMatchingStreamElementsOrdered(Seq(
          have(relativePath("file1")),
          have(relativePath("file2")),
          have(relativePath("file3"))
        ))
      }

      "complete the stage if upstream signals completion before all elements are processed" in {
        val f = new Fixture(1)
        // completion is signalled right after the inputs, without draining the results first
        (1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask().build()))
        f.source.sendComplete()
        f.expectStreamCompletion()
      }

      "complete the stage if upstream signals completion after all elements are processed" in {
        val f = new Fixture(1)
        (1 to 3).foreach(_ => f.source.sendNext(aFileParsingTask().build()))
        // drain before completing upstream, in contrast to the test case above
        f.drainStream()
        f.source.sendComplete()
        f.expectStreamCompletion()
      }
    }

    "managing multiple workers" should {

      "distribute workload among all workers in a balanced way" in {
        val workerCount = 4
        val taskCount = 1000
        // a perfectly even split would be 250 calls per worker; the lower bound of 200
        // presumably leaves slack for scheduling effects
        val minCallsPerWorker = 200

        val f = new Fixture(workerCount)
        val dummy = aFileParsingTask().build()
        f.sink.request(Int.MaxValue)
        (1 to taskCount).foreach(_ => f.source.sendNext(dummy))
        f.drainStream()
        f.dummyWorkers.foreach { worker =>
          verify(worker, Mockito.atLeast(minCallsPerWorker)).processSignal(any())
        }
      }
    }
  }
}