Commit 8e4f6d70 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: added EventListener registration and start tree event...

Integrated Pipeline: added EventListener registration and start tree event notifications to ParsingResultsProcessingManager
parent e7320b5b
......@@ -42,6 +42,23 @@ trait EventLogger extends EventListener with StrictLogging {
super.processEvent(reporter, message)
}
override def processEvent(reporter: ResultWriterId, message: ResultWriterEvent): Unit = ???
override def processEvent(reporter: ResultWriterId, message: ResultWriterEvent): Unit = {
message match {
case x: ResultWriterEventStart => logger.info(
s"[ResultWriter ${reporter.name}] started processing results for file tree " +
s"'${x.treeTask.treeBasePath}'"
)
case x: ResultWriterEventResult => logger.debug(
s"[ResultWriter ${reporter.name}] processes parsing results for file '${x.relativePath}' " +
s"from file tree '${x.treeTask.treeBasePath}'"
)
case x: ResultWriterEventEnd => logger.info(
s"[ResultWriter ${reporter.name}] finished writing results for file tree " +
s"'${x.treeTask.treeBasePath}', output location: '${x.outputLocation}' " +
s"(${x.numCalculations} calculations, ${x.numParsingFailures} parsing failures)"
)
}
super.processEvent(reporter, message)
}
}
......@@ -143,6 +143,7 @@ class Main {
val processor = Flow.fromGraph(new MessageProcessorFlow[FileParsingResultSignal, FileTreeParsingResult] {
override val stageName = "ResultWriter"
override val processor = new ParsingResultsProcessingManager {
override val eventListener = eventProcessor
override val processor: ParsingResultsProcessor = params.mode match {
case OutputType.Json => new WriteToJsonResultsProcessor(params.targetDirectory, metaInfo)
case OutputType.HDF5 => new WriteToHDF5ResultsProcessor(params.targetDirectory, metaInfo)
......
......@@ -8,7 +8,11 @@ import scala.collection.mutable
trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResultSignal, FileTreeParsingResult] {
val processor: ParsingResultsProcessor
protected[this] val processor: ParsingResultsProcessor
protected[this] val eventListener: EventListener
protected[this] val eventReporterName: Option[String] = None
private lazy val myId = eventListener.registerReporter(this, eventReporterName)
private val expectedCounts: mutable.Map[Path, Long] = mutable.Map()
private val processedCounts: mutable.Map[Path, Long] = mutable.Map()
......@@ -60,6 +64,7 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
private def startFileTree(task: FileTreeScanTask): Unit = {
processedCounts(task.treeBasePath) = 0
eventListener.processEvent(myId, ResultWriterEventStart(task))
processor.startProcessingTreeResults(task)
}
}
\ No newline at end of file
package eu.nomad_lab.integrated_pipeline_tests
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.integrated_pipeline.{ OutputType, ParsingResultsProcessingManager, ParsingResultsProcessor }
import eu.nomad_lab.integrated_pipeline.{ EventListener, OutputType, ParsingResultsProcessingManager, ParsingResultsProcessor }
import eu.nomad_lab.integrated_pipeline_tests.ResultWriterEventMatchers.basePath
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.{ ArgumentCaptor, Mockito }
......@@ -16,10 +17,12 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
class Fixture {
val processingMock = mock[ParsingResultsProcessor]
val eventMock = mock[EventListener]
when(processingMock.outputType).thenReturn(OutputType.Json)
val orderingTest = Mockito.inOrder(processingMock)
val manager = new ParsingResultsProcessingManager {
override val processor: ParsingResultsProcessor = processingMock
override protected[this] val processor: ParsingResultsProcessor = processingMock
override protected[this] val eventListener: EventListener = eventMock
}
}
......@@ -72,6 +75,18 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
"register itself with the EventListener upon processing the first message (parsing result)" in {
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
verify(f.eventMock).registerReporter(f.manager, None)
}
"register itself with the EventListener upon processing the first message (tree end signal)" in {
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
verify(f.eventMock).registerReporter(f.manager, None)
}
}
"having received all parsing tasks but no end signal" should {
......@@ -85,9 +100,10 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
behave like notRegisteringAgain(createFixture)
"finish processing a file tree when the associated end signal arrives" in {
val f = twoResultsNoEndReceived()
val f = createFixture()
f.manager.hasSignalToEmit() should be(false)
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
verify(f.processingMock).finishProcessingTreeResults(any())
......@@ -108,9 +124,10 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
behave like notRegisteringAgain(createFixture)
"finish processing a file tree when the last associated parsing result arrived" in {
val f = oneResultEndWithTwoEntriesReceived()
val f = createFixture()
f.manager.hasSignalToEmit() should be(false)
f.manager.processSignal(anInMemoryResult())
verify(f.processingMock).finishProcessingTreeResults(any())
......@@ -120,8 +137,10 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
}
"having received the end signal and all parsing results" should {
val createFixture = () => twoResultsEndWithTwoEntriesReceived()
behave like processorWithOutboundSignalReady(
() => twoResultsEndWithTwoEntriesReceived().manager,
() => createFixture().manager,
Seq(anInMemoryResult(), aFileParsingSignalEndTree()),
have(
FileTreeParsingResultMatchers.numParsingTasks(2),
......@@ -130,7 +149,7 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
)
"have called the file tree pre- and postprocessing operations in the correct order" in {
val f = twoResultsEndWithTwoEntriesReceived()
val f = createFixture()
f.orderingTest.verify(f.processingMock).startProcessingTreeResults(aFileTreeScanTask())
f.orderingTest.verify(f.processingMock).finishProcessingTreeResults(aFileTreeScanTask())
}
......@@ -149,9 +168,25 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
behave like readyToAcceptParsingResultsFromAnotherTree(createFixture)
behave like interleaveAnotherFileTree(createFixture)
behave like startProcessingANewTree(createFixture)
behave like notRegisteringAgain(createFixture)
"have called the file tree pre- and postprocessing operations exactly once" in {
val f = createFixture()
verify(f.processingMock, times(1)).startProcessingTreeResults(aFileTreeScanTask())
verify(f.processingMock, times(1)).finishProcessingTreeResults(aFileTreeScanTask())
}
"have send a single start event to the EventListener" in {
import ResultWriterEventMatchers._
val f = createFixture()
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventStart => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
"be able to process the same file tree again after finishing it once" in {
val f = fullyProcessedATree()
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
......@@ -203,8 +238,6 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
}
"start processing a file tree when the first signal associated with it arrives (end signal arrives first)" in {
......@@ -212,8 +245,34 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
}
"send a start event to the EventListener when receiving a message from a new tree (result arrives first)" in {
import ResultWriterEventMatchers._
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(anInMemoryResult().withTreeTask(fileTree))
verify(f.processingMock, times(1)).startProcessingTreeResults(fileTree)
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventStart => x }
exactly(1, events) should have(basePath("/tmp/foo/random"))
}
"send a start event to the EventListener when receiving a message from a new tree (end signal arrives first)" in {
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(fileTree))
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventStart => x }
exactly(1, events) should have(basePath("/tmp/foo/random"))
}
}
def notRegisteringAgain(createFixture: () => Fixture): Unit = {
"not register itself again with the EventListener after the first processed message" in {
val f = createFixture()
verify(f.eventMock, times(1)).registerReporter(f.manager, None)
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment