Commit eef450a3 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: ParsingResultsProcessingManager sends end events to the EventListener

parent 475e2f51
......@@ -44,11 +44,17 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
val path = signal.treeTask.treeBasePath
if (expectedCounts.getOrElse(path, -1) == processedCounts.getOrElse(path, -2)) {
processor.finishProcessingTreeResults(signal.treeTask)
val numCalculations = expectedCounts.remove(path)
val numCalculations = expectedCounts.remove(path).get
processedCounts.remove(path)
eventListener.processEvent(myId, ResultWriterEventEnd(
treeTask = signal.treeTask,
numCalculations = numCalculations,
numParsingFailures = 0, //FIXME: get a proper value here
outputLocation = Paths.get(".") //FIXME: should return some meaningful value
))
nextResult = Some(FileTreeParsingResult(
treeScanTask = signal.treeTask,
numCalculationsFound = numCalculations.get,
numCalculationsFound = numCalculations,
numParsingFailures = 0, //FIXME: get a proper value here
outputLocation = Paths.get("."), //FIXME: should return some meaningful value
outputFormat = processor.outputType
......
......@@ -110,6 +110,15 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.manager.hasSignalToEmit() should be(true)
}
"send an end event to the EventListener when the associated end signal arrives" in {
val f = createFixture()
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
}
"having received the end signal and all but one parsing results" should {
......@@ -126,7 +135,7 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
behave like startProcessingANewTree(createFixture)
behave like notRegisteringAgain(createFixture)
"finish processing a file tree when the last associated parsing result arrived" in {
"finish processing a file tree when the last associated parsing result arrives" in {
val f = createFixture()
f.manager.hasSignalToEmit() should be(false)
f.manager.processSignal(anInMemoryResult())
......@@ -134,6 +143,15 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.manager.hasSignalToEmit() should be(true)
}
"send an end event to the EventListener when the last associated parsing result arrives" in {
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
}
"having received the end signal and all parsing results" should {
......@@ -185,6 +203,15 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
"have send a single end event to the EventListener" in {
import ResultWriterEventMatchers._
val f = createFixture()
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
"be able to process the same file tree again after finishing it once" in {
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment