Skip to content
Snippets Groups Projects
Commit f1206b2a authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: removed no longer used old Akka streams integration code

parent bfbc3fa6
Branches
Tags
No related merge requests found
package eu.nomad_lab.integrated_pipeline.stream_components
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.FileTree
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages._
import scala.collection.mutable
class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
extends GraphStage[FlowShape[FileParsingSignal, FileParsingSignal]] {
val in = Inlet[FileParsingSignal]("ArchiveCleanUpFlow.in")
val out = Outlet[FileParsingSignal]("ArchiveCleanUpFlow.out")
override val shape = new FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
private val processed: mutable.Map[FileTree, Long] = mutable.Map()
private val expected: mutable.Map[FileTree, Long] = mutable.Map()
private def fail(msg: String): Unit = {
failStage(new IllegalArgumentException(msg))
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val input = grab(in)
input.fileTree.treeType match {
case TreeType.Directory => push(out, input)
case TreeType.Zip =>
input match {
case x: FileParsingResult =>
processed(x.fileTree) = processed.getOrElse(x.fileTree, 0l) + 1l
case x: TreeScanCompleted =>
expected(x.fileTree) = x.numParsingTasks
}
if (processed.getOrElse(input.fileTree, -1) == expected.getOrElse(input.fileTree, -2)) {
archiveHandler.cleanUpExtractedArchive(input.fileTree.treeBasePath.toAbsolutePath)
processed.remove(input.fileTree)
expected.remove(input.fileTree)
}
push(out, input)
}
}
})
}
}
package eu.nomad_lab.integrated_pipeline.stream_components
import java.nio.file.Path
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages.{ TreeScanCompleted, CandidateFound, TreeScanSignal }
import scala.collection.mutable
/**
* Unpacks archives to a temporary location for any file tree that resides inside an archive when
* a file tree start signal is received.
*/
class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
extends GraphStage[FlowShape[TreeScanSignal, TreeScanSignal]] {
val in = Inlet[TreeScanSignal]("ArchiveUnpackingFlow.in")
val out = Outlet[TreeScanSignal]("ArchiveUnpackingFlow.out")
override val shape = new FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
private val activeTrees: mutable.Map[Path, Path] = mutable.Map()
private def fail(msg: String): Unit = {
failStage(new IllegalArgumentException(msg))
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val input = grab(in)
val path = input.fileTree.treeBasePath
input match {
case _: TreeScanCompleted if !activeTrees.contains(path) =>
fail("to be finished file tree was not registered")
case signal: CandidateFound =>
if (!activeTrees.contains(path)) {
activeTrees(path) = signal.fileTree.treeType match {
case TreeType.Directory => path
case TreeType.Zip => archiveHandler.extractZipArchive(path)
}
}
signal.fileTree.treeType match {
case TreeType.Directory => push(out, signal)
case TreeType.Zip =>
val tempPath = activeTrees(path).resolve(signal.relativePath)
push(out, signal.copy(extractedPath = Some(tempPath)))
}
case signal: TreeScanCompleted =>
activeTrees.remove(path)
push(out, signal)
}
}
})
}
}
package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages.FileParsingSignal
import eu.nomad_lab.integrated_pipeline.stream_components.ArchiveCleanUpFlow
import eu.nomad_lab.integrated_pipeline_tests.matchers.StreamAssertions
import eu.nomad_lab.parsers.ParseResult
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{ Matchers, WordSpec }
class ArchiveCleanUpFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers {
abstract class Fixture extends StreamAssertions[FileParsingSignal] {
val archiveHandler = mock[ArchiveHandler]
private val testInput = TestSource.probe[FileParsingSignal]
private val testOutput = TestSink.probe[FileParsingSignal]
private val testGraph = RunnableGraph.fromGraph(
GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) =>
import GraphDSL.Implicits._
val worker = builder.add(new ArchiveCleanUpFlow(archiveHandler))
source ~> worker ~> sink
ClosedShape
}
)
val (source, sink) = testGraph.run()
}
class DirectoryTreeFixture extends Fixture {
val treeTask = aFileTree().withTreeType(TreeType.Directory).build()
}
class ZipArchiveTreeFixture extends Fixture {
val treeTask = aFileTree().withTreeType(TreeType.Zip)
}
"An ArchiveCleanUpFlow" when {
"receiving signals from directory file trees" should {
"forward file parsing results unchanged" in {
val f = new DirectoryTreeFixture()
val task = aParsingResultInMemory().withParseResult(ParseResult.ParseWithWarnings).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
"forward end file tree signals unchanged" in {
val f = new DirectoryTreeFixture()
val task = aTreeScanCompleted().withFileTree(f.treeTask).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
}
}
"receiving signals from zip archive file trees" should {
"forward file parsing results unchanged" in {
val f = new ZipArchiveTreeFixture()
val task = aParsingResultInMemory().withParseResult(ParseResult.ParseWithWarnings).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
"forward end file tree signals unchanged" in {
val f = new ZipArchiveTreeFixture()
val task = aTreeScanCompleted().withFileTree(f.treeTask).withTaskCount(1).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
"clean up temporary extracted files once all parsing results for a tree have arrived (end signal arrives last)" in {
val f = new ZipArchiveTreeFixture()
val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
(1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask)))
f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4))
f.drainStream()
verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
verifyNoMoreInteractions(f.archiveHandler)
}
"clean up temporary extracted files once all parsing results for a tree have arrived (parsing result arrives last)" in {
val f = new ZipArchiveTreeFixture()
val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
(1 to 3).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask)))
f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4))
f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask))
f.drainStream()
verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
verifyNoMoreInteractions(f.archiveHandler)
}
"handle the same extracted path again after cleaning it up the first time" in {
val f = new ZipArchiveTreeFixture()
val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
(1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask)))
f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4))
(1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask)))
f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4))
f.drainStream()
verify(f.archiveHandler, times(2)).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
verifyNoMoreInteractions(f.archiveHandler)
}
}
}
package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeScanSignal }
import eu.nomad_lab.integrated_pipeline.stream_components.ArchiveUnpackingFlow
import eu.nomad_lab.integrated_pipeline_tests.matchers.{ StreamAssertions, TreeScanSignalMatchers }
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{ Matchers, WordSpec }
class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers {
abstract class Fixture extends StreamAssertions[TreeScanSignal] {
val archiveHandler = mock[ArchiveHandler]
private val testInput = TestSource.probe[TreeScanSignal]
private val testOutput = TestSink.probe[TreeScanSignal]
private val testGraph = RunnableGraph.fromGraph(
GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) =>
import GraphDSL.Implicits._
val worker = builder.add(new ArchiveUnpackingFlow(archiveHandler))
source ~> worker ~> sink
ClosedShape
}
)
val (source, sink) = testGraph.run()
}
class DirectoryTreeFixture extends Fixture {
val fileTree = aFileTree().withTreeType(TreeType.Directory).build()
}
class ZipArchiveTreeFixture extends Fixture {
val fileTree = aFileTree().withTreeType(TreeType.Zip).build()
when(archiveHandler.extractZipArchive(any())).thenReturn(Paths.get("/magic"))
}
"An ArchiveUnpackingFlow" when {
"receiving signals from directory file trees" should {
"forward file parsing tasks unchanged" in {
val f = new DirectoryTreeFixture
val task = aCandidateFound().withFileTree(f.fileTree).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
"forward end file tree signals unchanged" in {
val f = new DirectoryTreeFixture()
f.source.sendNext(aCandidateFound().withFileTree(f.fileTree))
val task = aTreeScanCompleted().withFileTree(f.fileTree).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verifyZeroInteractions(f.archiveHandler)
}
}
"receiving signals from zip file trees" should {
"temporarily unpack archive when receiving the first task from a file tree" in {
val f = new ZipArchiveTreeFixture()
val treeTask = aFileTree().withTreeType(TreeType.Zip).withBasePath("/foo")
val task = aCandidateFound().withFileTree(treeTask)
f.source.sendNext(task)
f.drainStream()
verify(f.archiveHandler).extractZipArchive(Paths.get("/foo"))
}
"add the temporary extracted file path to file parsing tasks" in {
import TreeScanSignalMatchers._
val f = new ZipArchiveTreeFixture()
val task = aCandidateFound().withFileTree(f.fileTree).withRelativePath("foo")
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(a[CandidateFound]) and
have(relativePath("foo"), extractedPath("/magic/foo")))
}
"forward end file tree signals unchanged" in {
val f = new ZipArchiveTreeFixture()
f.source.sendNext(aCandidateFound().withFileTree(f.fileTree))
val task = aTreeScanCompleted().withFileTree(f.fileTree).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
}
}
"receiving signals with an arbitrary file tree type" should {
"fail if file tree end signal arrive before the start tree signal" in {
val f = new DirectoryTreeFixture
f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build())
f.expectStreamFailure(be(an[IllegalArgumentException]))
}
"fail if file parsing task arrives after the end tree signal" in {
val f = new DirectoryTreeFixture
f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build())
f.source.sendNext(aCandidateFound().withFileTree(f.fileTree).build())
f.expectStreamFailure(be(an[IllegalArgumentException]))
}
"fail if a file tree end signal arrives more than once" in {
val f = new DirectoryTreeFixture
f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build())
f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build())
f.expectStreamFailure(be(an[IllegalArgumentException]))
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment