Commit bf16dade authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: ParsingResultsProcessingManager has correct output path...

Integrated Pipeline: ParsingResultsProcessingManager has correct output path and failure count in outgoing messages/events
parent 6d2809c3
......@@ -3,6 +3,7 @@ package eu.nomad_lab.integrated_pipeline
import java.nio.file.{ Path, Paths }
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.parsers.ParseResult
import scala.collection.mutable
......@@ -16,6 +17,7 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
private val expectedCounts: mutable.Map[Path, Long] = mutable.Map()
private val processedCounts: mutable.Map[Path, Long] = mutable.Map()
private val failedCounts: mutable.Map[Path, Long] = mutable.Map()
private var nextResult: Option[FileTreeParsingResult] = None
override def requiresMoreMessages: Boolean = processedCounts.nonEmpty
......@@ -40,23 +42,26 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
))
processor.processFileParsingResult(signal)
processedCounts(signal.treeTask.treeBasePath) += 1
if (signal.result == ParseResult.ParseFailure)
failedCounts(signal.treeTask.treeBasePath) += 1
}
val path = signal.treeTask.treeBasePath
if (expectedCounts.getOrElse(path, -1) == processedCounts.getOrElse(path, -2)) {
processor.finishProcessingTreeResults(signal.treeTask)
val numCalculations = expectedCounts.remove(path).get
processedCounts.remove(path)
val numFailures = failedCounts.remove(path).get
eventListener.processEvent(myId, ResultWriterEventEnd(
treeTask = signal.treeTask,
numCalculations = numCalculations,
numParsingFailures = 0, //FIXME: get a proper value here
outputLocation = Paths.get(".") //FIXME: should return some meaningful value
numParsingFailures = numFailures,
outputLocation = processor.outputLocation(signal.treeTask)
))
nextResult = Some(FileTreeParsingResult(
treeScanTask = signal.treeTask,
numCalculationsFound = numCalculations,
numParsingFailures = 0, //FIXME: get a proper value here
outputLocation = Paths.get("."), //FIXME: should return some meaningful value
numParsingFailures = numFailures,
outputLocation = processor.outputLocation(signal.treeTask),
outputFormat = processor.outputType
))
}
......@@ -77,6 +82,7 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
private def startFileTree(task: FileTreeScanTask): Unit = {
processedCounts(task.treeBasePath) = 0
failedCounts(task.treeBasePath) = 0
eventListener.processEvent(myId, ResultWriterEventStart(task))
processor.startProcessingTreeResults(task)
}
......
......@@ -171,7 +171,7 @@ object FileTreeParsingResultMatchers {
)
def outputPath(expectedValue: String): HavePropertyMatcher[FileTreeParsingResult, Path] =
basePath(Paths.get(expectedValue))
outputPath(Paths.get(expectedValue))
def outputType(expectedValue: OutputType): HavePropertyMatcher[FileTreeParsingResult, OutputType] =
MatcherHelpers.propertyMatcher(
......
package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.integrated_pipeline.{ EventListener, OutputType, ParsingResultsProcessingManager, ParsingResultsProcessor }
import eu.nomad_lab.integrated_pipeline_tests.ResultWriterEventMatchers.basePath
import eu.nomad_lab.parsers.ParseResult
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.{ ArgumentCaptor, Mockito }
......@@ -19,6 +21,7 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
val processingMock = mock[ParsingResultsProcessor]
val eventMock = mock[EventListener]
when(processingMock.outputType).thenReturn(OutputType.Json)
when(processingMock.outputLocation(any())).thenReturn(Paths.get(s"/imaginary/foo/Rxx/R${"x" * 28}"))
val orderingTest = Mockito.inOrder(processingMock)
val manager = new ParsingResultsProcessingManager {
override protected[this] val processor: ParsingResultsProcessor = processingMock
......@@ -32,21 +35,21 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
def twoResultsNoEndReceived(): Fixture = {
val f = new Fixture
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult().withParseResult(ParseResult.ParseFailure))
f.manager.processSignal(anInMemoryResult())
f
}
def oneResultEndWithTwoEntriesReceived(): Fixture = {
val f = new Fixture
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult().withParseResult(ParseResult.ParseFailure))
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
f
}
def twoResultsEndWithTwoEntriesReceived(): Fixture = {
val f = new Fixture
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult().withParseResult(ParseResult.ParseFailure))
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
f
......@@ -54,7 +57,7 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
def fullyProcessedATree(): Fixture = {
val f = new Fixture
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(anInMemoryResult().withParseResult(ParseResult.ParseFailure))
f.manager.processSignal(anInMemoryResult())
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
f.manager.getNextSignalToEmit()
......@@ -110,15 +113,6 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.manager.hasSignalToEmit() should be(true)
}
"send an end event to the EventListener when the associated end signal arrives" in {
val f = createFixture()
f.manager.processSignal(aFileParsingSignalEndTree().withTaskCount(2))
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
}
"having received the end signal and all but one parsing results" should {
......@@ -143,15 +137,6 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.manager.hasSignalToEmit() should be(true)
}
"send an end event to the EventListener when the last associated parsing result arrives" in {
val f = createFixture()
f.manager.processSignal(anInMemoryResult())
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(basePath(aFileTreeScanTask().build().treeBasePath))
}
}
"having received the end signal and all parsing results" should {
......@@ -172,6 +157,41 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
f.orderingTest.verify(f.processingMock).finishProcessingTreeResults(aFileTreeScanTask())
}
"emit an end signal with the correct output location and type" in {
import FileTreeParsingResultMatchers._
val f = createFixture()
val result = f.manager.getNextSignalToEmit()
result should have(outputPath(s"/imaginary/foo/Rxx/R${"x" * 28}"), outputType(OutputType.Json))
}
"emit an end signal with the correct number of total parsed and failed calculations" in {
import FileTreeParsingResultMatchers._
val f = createFixture()
val result = f.manager.getNextSignalToEmit()
result should have(numParsingTasks(2), numFailedTasks(1))
}
"have send an end event to the EventListener with the correct output location" in {
import ResultWriterEventMatchers._
val f = createFixture()
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(
basePath(aFileTreeScanTask().build().treeBasePath),
outputPath(s"/imaginary/foo/Rxx/R${"x" * 28}")
)
}
"have send an end event to the EventListener with the correct parsing count stats" in {
import ResultWriterEventMatchers._
val f = createFixture()
val args: ArgumentCaptor[ResultWriterEvent] = ArgumentCaptor.forClass(classOf[ResultWriterEvent])
verify(f.eventMock, atLeastOnce()).processEvent(any(), args.capture())
val events = args.getAllValues.asScala.collect { case x: ResultWriterEventEnd => x }
exactly(1, events) should have(numCalculations(2), numFailures(1))
}
}
"having fully processed a file tree and emitted the result" should {
......@@ -296,6 +316,7 @@ class ParsingResultsProcessingManagerSpec extends WordSpec with MockitoSugar wit
}
"send a start event to the EventListener when receiving a message from a new tree (end signal arrives first)" in {
import ResultWriterEventMatchers._
val f = createFixture()
val fileTree = aFileTreeScanTask().withBasePath("/tmp/foo/random")
f.manager.processSignal(aFileParsingSignalEndTree().withTreeTask(fileTree))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment