Commit 68e960a5 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: merged parsing task/result messages into a single file to...

Integrated Pipeline: merged parsing task/result messages into a single file to merge the start/end signals

-parsing and result signals now use the same start/end signal case classes to get rid of duplicate code
 and unnecessary transformations
parent 6d989f1d
package eu.nomad_lab.integrated_pipeline.messages
import eu.nomad_lab.parsers.ParseResult.ParseResult
import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsingSession }
sealed trait FileParsingResultSignal {
val treeTask: FileTreeScanTask
}
sealed trait FileParsingResult extends FileParsingResultSignal {
val result: ParseResult
val task: FileParsingTask
val treeTask: FileTreeScanTask = task.treeTask
}
case class InMemoryResult(
task: FileParsingTask,
result: ParseResult,
start: Option[StartedParsingSession],
events: Seq[ParseEvent],
end: Option[FinishedParsingSession]
) extends FileParsingResult
case class FileParsingResultStartTree(treeTask: FileTreeScanTask) extends FileParsingResultSignal
case class FileParsingResultEndTree(treeTask: FileTreeScanTask) extends FileParsingResultSignal
......@@ -2,12 +2,18 @@ package eu.nomad_lab.integrated_pipeline.messages
import java.nio.file.Path
import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsingSession }
import eu.nomad_lab.parsers.ParseResult.ParseResult
import eu.nomad_lab.{ CompactSha, TreeType }
sealed trait FileParsingTaskSignal {
val treeTask: FileTreeScanTask
}
sealed trait FileParsingResultSignal {
val treeTask: FileTreeScanTask
}
/**
* Request a detailed parsing of the given candidate calculation main file
* @param treeTask the file-tree scan request from which this parsing task was generated
......@@ -39,9 +45,24 @@ case class FileParsingTask(
}
case class FileParsingTaskStartTree(treeTask: FileTreeScanTask) extends FileParsingTaskSignal
case class FileParsingSignalStartTree(treeTask: FileTreeScanTask) extends FileParsingTaskSignal with FileParsingResultSignal
case class FileParsingTaskEndTree(
case class FileParsingSignalEndTree(
treeTask: FileTreeScanTask,
numParsingTasks: Long = 0
) extends FileParsingTaskSignal
) extends FileParsingTaskSignal with FileParsingResultSignal
sealed trait FileParsingResult extends FileParsingResultSignal {
val result: ParseResult
val task: FileParsingTask
val treeTask: FileTreeScanTask = task.treeTask
}
case class InMemoryResult(
task: FileParsingTask,
result: ParseResult,
start: Option[StartedParsingSession],
events: Seq[ParseEvent],
end: Option[FinishedParsingSession]
) extends FileParsingResult
......@@ -5,7 +5,7 @@ import java.nio.file.Path
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskEndTree, FileParsingTaskSignal, FileParsingTaskStartTree }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingSignalEndTree, FileParsingTaskSignal, FileParsingSignalStartTree }
import scala.collection.mutable
......@@ -39,13 +39,13 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
val path = input.treeTask.treeBasePath
input match {
case _: FileParsingTaskStartTree if activeTrees.contains(path) =>
case _: FileParsingSignalStartTree if activeTrees.contains(path) =>
fail("to be started file tree already registered")
case _: FileParsingTask if !activeTrees.contains(path) =>
fail("parsing task arrived before start tree signal")
case _: FileParsingTaskEndTree if !activeTrees.contains(path) =>
case _: FileParsingSignalEndTree if !activeTrees.contains(path) =>
fail("to be finished file tree was not registered")
case signal: FileParsingTaskStartTree =>
case signal: FileParsingSignalStartTree =>
activeTrees(path) = signal.treeTask.treeType match {
case TreeType.Directory => path
case TreeType.Zip => archiveHandler.extractZipArchive(path)
......@@ -58,7 +58,7 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
val tempPath = activeTrees(path).resolve(signal.relativePath)
push(out, signal.copy(extractedPath = Some(tempPath)))
}
case signal: FileParsingTaskEndTree =>
case signal: FileParsingSignalEndTree =>
activeTrees.remove(path)
push(out, signal)
}
......
......@@ -33,18 +33,18 @@ class CalculationParsingFlow(engines: Seq[CalculationParsingEngine], archiveHand
case task: FileParsingTask =>
require(activePaths.contains(task.treeTask), "file tree not yet started")
push(out, parseFile(task))
case signal: FileParsingTaskStartTree =>
case signal: FileParsingSignalStartTree =>
require(!activePaths.contains(signal.treeTask), "file tree already started")
activePaths += signal.treeTask
push(out, FileParsingResultStartTree(signal.treeTask))
case signal: FileParsingTaskEndTree =>
push(out, FileParsingSignalStartTree(signal.treeTask))
case signal: FileParsingSignalEndTree =>
require(activePaths.remove(signal.treeTask), "file tree not yet started")
signal.treeTask.treeType match {
case TreeType.Zip => archiveHandler.cleanUpExtractedArchive(signal.treeTask.treeBasePath)
case TreeType.Directory => ()
case _ => ()
}
push(out, FileParsingResultEndTree(signal.treeTask))
push(out, signal)
}
}
......
......@@ -37,14 +37,14 @@ trait ParsingResultsProcessorFlow extends GraphStage[FlowShape[FileParsingResult
require(activeTrees.contains(result.treeTask), "tree not registered!")
activeTrees(result.treeTask) += 1
processor.processFileParsingResult(result)
case startSignal: FileParsingResultStartTree =>
case startSignal: FileParsingSignalStartTree =>
require(
!activeTrees.contains(startSignal.treeTask),
"tree result processing was already started!"
)
activeTrees.put(startSignal.treeTask, 0)
processor.startProcessingTreeResults(startSignal.treeTask)
case endSignal: FileParsingResultEndTree =>
case endSignal: FileParsingSignalEndTree =>
require(
activeTrees.contains(endSignal.treeTask),
"tree result processing was not started before calling termination!"
......
......@@ -52,12 +52,12 @@ trait TreeParserFlow extends GraphStage[FlowShape[FileTreeScanTask, FileParsingT
override def next(): FileParsingTaskSignal = {
if (!emittedStart) {
emittedStart = true
FileParsingTaskStartTree(treeRequest)
FileParsingSignalStartTree(treeRequest)
} else if (generator.hasNext)
generator.next()
else if (!emittedEnd) {
emittedEnd = true
FileParsingTaskEndTree(treeRequest, generator.getProcessedRequestCount)
FileParsingSignalEndTree(treeRequest, generator.getProcessedRequestCount)
} else
throw new NoSuchElementException("No more elements in this iterator")
}
......
......@@ -7,7 +7,7 @@ import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType
import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskSignal, FileParsingTaskStartTree, FileTreeScanTask }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskSignal, FileParsingSignalStartTree, FileTreeScanTask }
import eu.nomad_lab.integrated_pipeline.stream_components.{ ArchiveHandler, ArchiveUnpackingFlow }
import org.scalatest.{ Matchers, WordSpec }
import org.mockito.Mockito._
......
......@@ -67,7 +67,7 @@ object Builders {
def withTreeTask(task: FileTreeScanTask) = copy(treeTask = task)
def build() = FileParsingTaskStartTree(treeTask = treeTask)
def build() = FileParsingSignalStartTree(treeTask = treeTask)
}
case class BuilderFileParsingTaskEndTree(
......@@ -76,7 +76,7 @@ object Builders {
def withTreeTask(task: FileTreeScanTask) = copy(treeTask = task)
def build() = FileParsingTaskEndTree(treeTask = treeTask)
def build() = FileParsingSignalEndTree(treeTask = treeTask)
}
case class BuilderInMemoryResult(
......@@ -108,7 +108,7 @@ object Builders {
def withTreeTask(task: FileTreeScanTask) = copy(treeTask = task)
def build() = FileParsingResultStartTree(treeTask = treeTask)
def build() = FileParsingSignalStartTree(treeTask = treeTask)
}
case class BuilderFileParsingResultEndTree(
......@@ -117,7 +117,7 @@ object Builders {
def withTreeTask(task: FileTreeScanTask) = copy(treeTask = task)
def build() = FileParsingResultEndTree(treeTask = treeTask)
def build() = FileParsingSignalEndTree(treeTask = treeTask)
}
}
......@@ -76,13 +76,13 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
"managing a single worker" should {
"forward a received file tree started signal" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
sink.ensureSubscription().request(1)
sink.expectNext(FileParsingResultStartTree(sampleTreeScan))
sink.expectNext(FileParsingSignalStartTree(sampleTreeScan))
}
"emit parsing results for every incoming parsing request in order of input" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(input1).sendNext(input2).sendNext(input3)
sink.ensureSubscription().request(4)
sink.expectNext()
......@@ -98,23 +98,23 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
}
"forward a file tree finished signal after processing it" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingTaskEndTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalEndTree(sampleTreeScan))
sink.ensureSubscription().request(2)
sink.expectNext()
sink.expectNext(FileParsingResultEndTree(sampleTreeScan))
sink.expectNext(FileParsingSignalEndTree(sampleTreeScan))
}
"reject parsing requests after receiving the tree finished signal" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingTaskEndTree(sampleTreeScan)).sendNext(input1)
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalEndTree(sampleTreeScan)).sendNext(input1)
sink.ensureSubscription().request(3)
sink.expectNextN(2)
sink.expectError()
}
"complete the stage if upstream signals completion before all elements are processed" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(input1).sendNext(input2).sendNext(input3).sendComplete()
sink.ensureSubscription().request(4)
sink.expectNextN(4)
......@@ -122,7 +122,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
}
"complete the stage if upstream signals completion after all elements are processed" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(input1).sendNext(input2).sendNext(input3)
sink.ensureSubscription().request(4)
sink.expectNextN(4)
......@@ -131,23 +131,23 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
}
"delete the extracted files from a zip archive when the end tree signal arrives" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
source.sendNext(FileParsingTaskEndTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
source.sendNext(FileParsingSignalEndTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
sink.ensureSubscription().request(2)
sink.expectNextN(2)
verify(dummyArchiveHandler).cleanUpExtractedArchive(sampleTreeScan.treeBasePath)
}
"not interact with the ArchiveHandler when processing directory file trees" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingTaskEndTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalEndTree(sampleTreeScan))
sink.ensureSubscription().request(2)
sink.expectNextN(2)
verifyZeroInteractions(dummyArchiveHandler)
}
"specify the location of the main file to the parsing engine when handling directories" in new Fixture(1) {
source.sendNext(FileParsingTaskStartTree(sampleTreeScan))
source.sendNext(FileParsingSignalStartTree(sampleTreeScan))
val task = aFileParsingTask().withTreeTask(sampleTreeScan).withRelativePath("magic").build()
source.sendNext(task)
findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess)))
......@@ -160,7 +160,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
val extractedPath = Paths.get("/tmp/extracted/magic")
val task = aFileParsingTask().withTreeTask(zipTreeTask).withRelativePath("magic").
withExtractedPath(Some(extractedPath)).build()
source.sendNext(FileParsingTaskStartTree(zipTreeTask))
source.sendNext(FileParsingSignalStartTree(zipTreeTask))
source.sendNext(task)
findFirstMatchingStreamElement(have(relativePath("magic"), status(ParseResult.ParseSuccess)))
verify(dummyWorkers.head).processRequest(task, extractedPath)
......@@ -171,7 +171,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
Seq(TreeType.Unknown, TreeType.File, TreeType.Tar).foreach { treeType =>
val anotherTreeScan = sampleTreeScan.copy(treeType = treeType)
val anotherRequest = input1.copy(treeTask = anotherTreeScan)
source.sendNext(FileParsingTaskStartTree(anotherTreeScan))
source.sendNext(FileParsingSignalStartTree(anotherTreeScan))
source.sendNext(anotherRequest)
sink.expectNext()
val result = sink.expectNext()
......
......@@ -63,8 +63,8 @@ trait FileParsingTaskMatchers {
}
def numParsingTasks(expectedValue: Long) =
new HavePropertyMatcher[FileParsingTaskEndTree, Long] {
def apply(test: FileParsingTaskEndTree) = HavePropertyMatchResult(
new HavePropertyMatcher[FileParsingSignalEndTree, Long] {
def apply(test: FileParsingSignalEndTree) = HavePropertyMatchResult(
test.numParsingTasks == expectedValue,
"number of identified candidate calculations",
expectedValue,
......
......@@ -69,9 +69,9 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"process every incoming file parsing result" in new GraphWithDummy {
val inputs = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
probeOutput.ensureSubscription().request(1)
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
inputs.foreach(probeInFiles.sendNext)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeOutput.expectNext()
inputs.foreach(x => verify(processingMock, times(1)).processFileParsingResult(x))
}
......@@ -86,10 +86,10 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"send a result once the end tree signal is received" in new GraphWithDummy {
val inputs = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
probeOutput.ensureSubscription().request(1)
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
inputs.foreach(probeInFiles.sendNext)
probeOutput.expectNoMsg(1.seconds)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeOutput.expectNext(FileTreeParsingResult(sampleTreeTask, 3, 0, Paths.get("."),
OutputType.Json))
}
......@@ -97,9 +97,9 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"not accept any parsing results after the end tree signal was received" in new GraphWithDummy {
val inputs = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
probeOutput.ensureSubscription().request(2)
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
inputs.tail.foreach(probeInFiles.sendNext)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeOutput.expectNext()
probeInFiles.sendNext(inputs.head)
probeOutput.expectError()
......@@ -107,9 +107,9 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"call the file tree pre- and postprocessing operations in the correct order" in new GraphWithDummy {
val inputs = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
inputs.foreach(probeInFiles.sendNext)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeOutput.ensureSubscription().request(1)
probeOutput.expectNext()
orderingTest.verify(processingMock, times(1)).startProcessingTreeResults(sampleTreeTask)
......@@ -122,9 +122,9 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"complete the stage once the input closed and all remaining elements are processed" in new GraphWithDummy {
val inputs = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
probeOutput.ensureSubscription().request(1)
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
inputs.foreach(probeInFiles.sendNext)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeInFiles.sendComplete()
probeOutput.expectNext()
probeOutput.expectComplete()
......@@ -135,12 +135,12 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
"process all file parsing results, regardless of their inbound ordering" in new GraphWithDummy {
val inputs1 = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
val inputs2 = (1 to 3).map(x => sampleInput(sampleTreeTask2, Paths.get(s"file$x")))
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingResultStartTree(sampleTreeTask2))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalStartTree(sampleTreeTask2))
val chaos = Seq(inputs1(0), inputs1(1), inputs2(0), inputs2(1), inputs1(2), inputs2(2))
chaos.foreach(probeInFiles.sendNext)
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingResultEndTree(sampleTreeTask2))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask))
probeInFiles.sendNext(FileParsingSignalEndTree(sampleTreeTask2))
probeOutput.ensureSubscription().request(2)
probeOutput.expectNextN(2)
chaos.foreach(x => verify(processingMock, times(1)).processFileParsingResult(x))
......@@ -150,14 +150,14 @@ class ParsingResultsProcessorFlowSpec extends WordSpec with MockitoSugar {
val inputs1 = (1 to 3).map(x => sampleInput(sampleTreeTask, Paths.get(s"file$x")))
val inputs2 = (1 to 3).map(x => sampleInput(sampleTreeTask2, Paths.get(s"file$x")))
val chaos = Seq(
FileParsingResultStartTree(sampleTreeTask),
FileParsingSignalStartTree(sampleTreeTask),
inputs1(0),
inputs1(1),
FileParsingResultStartTree(sampleTreeTask2),
FileParsingSignalStartTree(sampleTreeTask2),
inputs2(0), inputs2(1), inputs1(2),
FileParsingResultEndTree(sampleTreeTask),
FileParsingSignalEndTree(sampleTreeTask),
inputs2(2),
FileParsingResultEndTree(sampleTreeTask2)
FileParsingSignalEndTree(sampleTreeTask2)
)
chaos.foreach(probeInFiles.sendNext)
probeOutput.ensureSubscription().request(2)
......
......@@ -72,7 +72,7 @@ class TreeParserFlowSpec extends WordSpec with MockitoSugar with Matchers with T
"emit a start tree signal before sending any file parsing tasks" in new Fixture {
source.sendNext(input1)
val result = sink.ensureSubscription().requestNext()
result shouldBe a[FileParsingTaskStartTree]
result shouldBe a[FileParsingSignalStartTree]
result should have(treeTask(input1))
}
......@@ -93,7 +93,7 @@ class TreeParserFlowSpec extends WordSpec with MockitoSugar with Matchers with T
sink.requestNext()
source.sendComplete()
val result = sink.request(Int.MaxValue).receiveWithin(streamTimeOut).last
result shouldBe a[FileParsingTaskEndTree]
result shouldBe a[FileParsingSignalEndTree]
}
"completes once all elements are emitted if the upstream completes before the tree is finished" in new Fixture {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment