Commit 51286ce9 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: CalculationParsingEngine now sends events to an EventListener

parent 4b19ef29
......@@ -19,8 +19,17 @@ import scala.collection.mutable.ListBuffer
* usage remains bounded
* @param parsers the to be used parsers
* @param metaInfo the NOMAD meta information the parser can extract
* @param eventListener processor for event notifications (logging etc)
* @param name name of this object when registering to the eventListener
*/
class CalculationParsingEngine(parsers: ParserCollection, metaInfo: MetaInfoEnv) extends StrictLogging {
class CalculationParsingEngine(
private val parsers: ParserCollection,
private val metaInfo: MetaInfoEnv,
private val eventListener: EventListener,
private val name: Option[String]
) extends StrictLogging {
val myId = eventListener.registerReporter(this, name)
private def getParser(request: FileParsingTask): Option[OptimizedParser] = {
parsers.parsers.get(request.parserName).map(_.optimizedParser(Seq()))
......@@ -58,6 +67,9 @@ class CalculationParsingEngine(parsers: ParserCollection, metaInfo: MetaInfoEnv)
task match {
case signal: FileParsingSignalEndTree => signal
case task: FileParsingTask =>
eventListener.processEvent(myId, CalculationParserEventStart(
task.treeTask, task.relativePath, task.parserName
))
val pathToMainFile = task.treeTask.treeType match {
case TreeType.Directory => Right(task.treeTask.treeBasePath.resolve(task.relativePath))
case TreeType.Zip => task.extractedPath match {
......@@ -81,6 +93,9 @@ class CalculationParsingEngine(parsers: ParserCollection, metaInfo: MetaInfoEnv)
* @return a FileParsingResult representing the failure
*/
private def failParseRequest(request: FileParsingTask, reason: String): FileParsingResult = {
eventListener.processEvent(myId, CalculationParserEventEnd(
request.treeTask, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason)
))
InMemoryResult(
task = request,
result = ParseResult.ParseFailure,
......@@ -107,6 +122,9 @@ class CalculationParsingEngine(parsers: ParserCollection, metaInfo: MetaInfoEnv)
}
case _ => None
}
eventListener.processEvent(myId, CalculationParserEventEnd(
request.treeTask, request.relativePath, request.parserName, result, error
))
InMemoryResult(
task = request,
result = result,
......
......@@ -2,10 +2,11 @@ package eu.nomad_lab.integrated_pipeline
import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.integrated_pipeline.messages._
/** Write incoming events to a logfile with ScalaLogging and the underlying logging framework.
* This trait stacks and can thus be combined with other EventListener-traits to create a custom
* event logging class with multiple features.
*/
/**
* Write incoming events to a logfile with ScalaLogging and the underlying logging framework.
* This trait stacks and can thus be combined with other EventListener-traits to create a custom
* event logging class with multiple features.
*/
trait EventLogger extends EventListener with StrictLogging {
override def processEvent(reporter: TreeParserId, message: TreeParserEvent): Unit = {
......
......@@ -136,7 +136,8 @@ class Main {
val unpacker = Flow.fromGraph(new ArchiveUnpackingFlow(archiveHandler))
val cleanUp = Flow.fromGraph(new ArchiveCleanUpFlow(archiveHandler))
val parsing = CalculationParsingFlow.createParsingFlow(
(1 to params.numWorkers).map(_ => new CalculationParsingEngine(Main.parsers, metaInfo))
(1 to params.numWorkers).map(i =>
new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d")))
)
val processor = Flow.fromGraph(new MessageProcessorFlow[FileParsingResultSignal, FileTreeParsingResult] {
......
......@@ -4,6 +4,11 @@ import java.nio.file.Path
import eu.nomad_lab.parsers.ParseResult.ParseResult
/*These events are distinct from the messages passed between the different processing stage because
they model both input and output of each stage. Also, they do not contain any heavy payload (like
the in-memory results of a parsing task) and thus can be stored by EventListener implementations.
*/
sealed trait TreeParserEvent {
def treeTask: FileTreeScanTask
}
......
......@@ -3,12 +3,13 @@ package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.CalculationParsingEngine
import eu.nomad_lab.integrated_pipeline.messages.FileParsingResult
import eu.nomad_lab.integrated_pipeline.{ CalculationParsingEngine, EventListener }
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.meta.KnownMetaInfoEnvs
import eu.nomad_lab.parsers.ParseResult.ParseResult
import eu.nomad_lab.parsers._
import org.json4s.JsonAST.{ JInt, JObject, JString }
import org.mockito.{ ArgumentCaptor, Mockito }
import org.mockito.ArgumentMatchers.{ eq => raw, _ }
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
......@@ -16,12 +17,16 @@ import org.mockito.stubbing.Answer
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.JavaConverters._
class CalculationParsingEngineSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers {
val parserInfo: JObject = JObject(
"name" -> JString("dummyParser")
)
private def argsCaptor(): ArgumentCaptor[CalculationParserEvent] = ArgumentCaptor.forClass(classOf[CalculationParserEvent])
def sampleEvents = Seq(
OpenSectionWithGIndex("section_foo", -1),
AddRealValue("bar", 0.42, -1),
......@@ -37,13 +42,14 @@ class CalculationParsingEngineSpec extends WordSpec with MockitoSugar with TestD
val dummyParserCollection = mock[ParserCollection]
val dummyParserGenerator = mock[ParserGenerator]
val dummyParser = mock[OptimizedParser]
val dummyEvents = mock[EventListener]
when(dummyParserCollection.parsers).thenReturn(Map("dummyParser" -> dummyParserGenerator))
when(dummyParserGenerator.optimizedParser(any())).thenReturn(dummyParser)
when(dummyParser.parserGenerator).thenReturn(dummyParserGenerator)
when(dummyParserGenerator.parserInfo).thenReturn(parserInfo)
when(dummyParserGenerator.name).thenReturn("dummyParser")
val worker = new CalculationParsingEngine(dummyParserCollection, metaInfo)
val worker = new CalculationParsingEngine(dummyParserCollection, metaInfo, dummyEvents, None)
def prepareParserStandardInvocation(): Unit = {
when(dummyParser.parseExternal(any(), any(), any(), any())).thenAnswer(new Answer[ParseResult] {
......@@ -75,6 +81,14 @@ class CalculationParsingEngineSpec extends WordSpec with MockitoSugar with TestD
}
"a CalculationParsingEngine" when {
"being created" should {
"register itself with the provided event listener" in {
val f = new Fixture
verify(f.dummyEvents).registerReporter(raw(f.worker), any())
}
}
"handling parsing tasks" should {
"record starting and finishing parse events" in {
val f = new Fixture
......@@ -84,7 +98,42 @@ class CalculationParsingEngineSpec extends WordSpec with MockitoSugar with TestD
assert(result.end.nonEmpty, "expected a finish event to be emitted")
}
"record all emitted events in the correct order" in {
"send a start parsing event to the event listener" in {
import CalculationParserEventMatchers._
val f = new Fixture
f.prepareParserStandardInvocation()
f.worker.processSignal(sampleParseRequest.withRelativePath("blabla"))
val captor = argsCaptor()
verify(f.dummyEvents, Mockito.atLeast(1)).processEvent(any(), captor.capture())
val signals = captor.getAllValues.asScala.collect { case x: CalculationParserEventStart => x }
exactly(1, signals) should have(relativePath("blabla"), parser("dummyParser"))
}
"send a finished parsing event to the event listener (parsing terminated normally)" in {
import CalculationParserEventMatchers._
val f = new Fixture
f.prepareParserStandardInvocation()
f.worker.processSignal(sampleParseRequest.withRelativePath("blabla"))
val captor = argsCaptor()
verify(f.dummyEvents, Mockito.atLeast(1)).processEvent(any(), captor.capture())
val signals = captor.getAllValues.asScala.collect { case x: CalculationParserEventEnd => x }
exactly(1, signals) should have(relativePath("blabla"), parser("dummyParser"),
status(ParseResult.ParseSkipped))
}
"send a finished parsing event to the event listener (parsing fails)" in {
import CalculationParserEventMatchers._
val f = new Fixture
f.prepareParserFailureInvocation()
f.worker.processSignal(sampleParseRequest.withRelativePath("blabla"))
val captor = argsCaptor()
verify(f.dummyEvents, Mockito.atLeast(1)).processEvent(any(), captor.capture())
val signals = captor.getAllValues.asScala.collect { case x: CalculationParserEventEnd => x }
exactly(1, signals) should have(relativePath("blabla"), parser("dummyParser"),
status(ParseResult.ParseFailure), errorMessage("just", "crashed"))
}
"record all events emitted by the parser in the correct order" in {
import FileParsingResultMatchers._
val f = new Fixture
f.prepareParserStandardInvocation()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment