diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveCleanUpFlow.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveCleanUpFlow.scala deleted file mode 100644 index 84962ac822dcb163d0fe9d97db35e4be5dd851b6..0000000000000000000000000000000000000000 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveCleanUpFlow.scala +++ /dev/null @@ -1,57 +0,0 @@ -package eu.nomad_lab.integrated_pipeline.stream_components - -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } -import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.FileTree -import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler -import eu.nomad_lab.integrated_pipeline.messages._ - -import scala.collection.mutable - -class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler) - extends GraphStage[FlowShape[FileParsingSignal, FileParsingSignal]] { - - val in = Inlet[FileParsingSignal]("ArchiveCleanUpFlow.in") - val out = Outlet[FileParsingSignal]("ArchiveCleanUpFlow.out") - override val shape = new FlowShape(in, out) - - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { - - private val processed: mutable.Map[FileTree, Long] = mutable.Map() - private val expected: mutable.Map[FileTree, Long] = mutable.Map() - - private def fail(msg: String): Unit = { - failStage(new IllegalArgumentException(msg)) - } - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - pull(in) - } - }) - - setHandler(in, new InHandler { - override def onPush(): Unit = { - val input = grab(in) - - input.fileTree.treeType match { - case TreeType.Directory => push(out, input) - case TreeType.Zip => - input match { - case x: FileParsingResult => - processed(x.fileTree) = processed.getOrElse(x.fileTree, 0l) + 1l - case x: TreeScanCompleted => - expected(x.fileTree) = x.numParsingTasks - } - if (processed.getOrElse(input.fileTree, -1) == expected.getOrElse(input.fileTree, -2)) { - archiveHandler.cleanUpExtractedArchive(input.fileTree.treeBasePath.toAbsolutePath) - processed.remove(input.fileTree) - expected.remove(input.fileTree) - } - push(out, input) - } - } - }) - } -} diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala deleted file mode 100644 index 523178927f70ca291ba1bcfb47adc375dafdd596..0000000000000000000000000000000000000000 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala +++ /dev/null @@ -1,66 +0,0 @@ -package eu.nomad_lab.integrated_pipeline.stream_components - -import java.nio.file.Path - -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } -import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler -import eu.nomad_lab.integrated_pipeline.messages.{ TreeScanCompleted, CandidateFound, TreeScanSignal } - -import scala.collection.mutable - -/** - * Unpacks archives to a temporary location for any file tree that resides inside an archive when - * a file tree start signal is received. - */ -class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler) - extends GraphStage[FlowShape[TreeScanSignal, TreeScanSignal]] { - - val in = Inlet[TreeScanSignal]("ArchiveUnpackingFlow.in") - val out = Outlet[TreeScanSignal]("ArchiveUnpackingFlow.out") - override val shape = new FlowShape(in, out) - - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { - - private val activeTrees: mutable.Map[Path, Path] = mutable.Map() - private def fail(msg: String): Unit = { - failStage(new IllegalArgumentException(msg)) - } - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - pull(in) - } - }) - - setHandler(in, new InHandler { - override def onPush(): Unit = { - val input = grab(in) - val path = input.fileTree.treeBasePath - - input match { - case _: TreeScanCompleted if !activeTrees.contains(path) => - fail("to be finished file tree was not registered") - case signal: CandidateFound => - if (!activeTrees.contains(path)) { - activeTrees(path) = signal.fileTree.treeType match { - case TreeType.Directory => path - case TreeType.Zip => archiveHandler.extractZipArchive(path) - } - } - signal.fileTree.treeType match { - case TreeType.Directory => push(out, signal) - case TreeType.Zip => - val tempPath = activeTrees(path).resolve(signal.relativePath) - push(out, signal.copy(extractedPath = Some(tempPath))) - } - case signal: TreeScanCompleted => - activeTrees.remove(path) - push(out, signal) - } - } - }) - } - -} diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveCleanUpFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveCleanUpFlowSpec.scala deleted file mode 100644 index 43f29d3fb9e017245b3d84ed765d9d4e1786f7b8..0000000000000000000000000000000000000000 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveCleanUpFlowSpec.scala +++ /dev/null @@ -1,122 +0,0 @@ -package eu.nomad_lab.integrated_pipeline_tests - -import java.nio.file.Paths - -import akka.stream.ClosedShape -import akka.stream.scaladsl.{ GraphDSL, RunnableGraph } -import akka.stream.testkit.scaladsl.{ TestSink, TestSource } -import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler -import eu.nomad_lab.integrated_pipeline.messages.FileParsingSignal -import eu.nomad_lab.integrated_pipeline.stream_components.ArchiveCleanUpFlow -import eu.nomad_lab.integrated_pipeline_tests.matchers.StreamAssertions -import eu.nomad_lab.parsers.ParseResult -import org.mockito.Mockito._ -import org.scalatest.mockito.MockitoSugar -import org.scalatest.{ Matchers, WordSpec } - -class ArchiveCleanUpFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers { - - abstract class Fixture extends StreamAssertions[FileParsingSignal] { - val archiveHandler = mock[ArchiveHandler] - - private val testInput = TestSource.probe[FileParsingSignal] - private val testOutput = TestSink.probe[FileParsingSignal] - private val testGraph = RunnableGraph.fromGraph( - GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) => - import GraphDSL.Implicits._ - - val worker = builder.add(new ArchiveCleanUpFlow(archiveHandler)) - - source ~> worker ~> sink - ClosedShape - } - ) - val (source, sink) = testGraph.run() - } - - class DirectoryTreeFixture extends Fixture { - val treeTask = aFileTree().withTreeType(TreeType.Directory).build() - } - - class ZipArchiveTreeFixture extends Fixture { - val treeTask = aFileTree().withTreeType(TreeType.Zip) - } - - "An ArchiveCleanUpFlow" when { - "receiving signals from directory file trees" should { - - "forward file parsing results unchanged" in { - val f = new DirectoryTreeFixture() - val task = aParsingResultInMemory().withParseResult(ParseResult.ParseWithWarnings).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - - "forward end file tree signals unchanged" in { - val f = new DirectoryTreeFixture() - val task = aTreeScanCompleted().withFileTree(f.treeTask).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - - } - - } - - "receiving signals from zip archive file trees" should { - - "forward file parsing results unchanged" in { - val f = new ZipArchiveTreeFixture() - val task = aParsingResultInMemory().withParseResult(ParseResult.ParseWithWarnings).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - - "forward end file tree signals unchanged" in { - val f = new ZipArchiveTreeFixture() - val task = aTreeScanCompleted().withFileTree(f.treeTask).withTaskCount(1).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - - "clean up temporary extracted files once all parsing results for a tree have arrived (end signal arrives last)" in { - val f = new ZipArchiveTreeFixture() - val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic") - (1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask))) - f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4)) - f.drainStream() - verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath) - verifyNoMoreInteractions(f.archiveHandler) - } - - "clean up temporary extracted files once all parsing results for a tree have arrived (parsing result arrives last)" in { - val f = new ZipArchiveTreeFixture() - val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic") - (1 to 3).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask))) - f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4)) - f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask)) - f.drainStream() - verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath) - verifyNoMoreInteractions(f.archiveHandler) - } - - "handle the same extracted path again after cleaning it up the first time" in { - val f = new ZipArchiveTreeFixture() - val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic") - (1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask))) - f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4)) - (1 to 4).foreach(_ => f.source.sendNext(aParsingResultInMemory().withFileTree(treeTask))) - f.source.sendNext(aTreeScanCompleted().withFileTree(treeTask).withTaskCount(4)) - f.drainStream() - verify(f.archiveHandler, times(2)).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath) - verifyNoMoreInteractions(f.archiveHandler) - } - - } - -} diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala deleted file mode 100644 index 2f376d81e8a4d79125164af08a8df5919013bc89..0000000000000000000000000000000000000000 --- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala +++ /dev/null @@ -1,121 +0,0 @@ -package eu.nomad_lab.integrated_pipeline_tests - -import java.nio.file.Paths - -import akka.stream.ClosedShape -import akka.stream.scaladsl.{ GraphDSL, RunnableGraph } -import akka.stream.testkit.scaladsl.{ TestSink, TestSource } -import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler -import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeScanSignal } -import eu.nomad_lab.integrated_pipeline.stream_components.ArchiveUnpackingFlow -import eu.nomad_lab.integrated_pipeline_tests.matchers.{ StreamAssertions, TreeScanSignalMatchers } -import org.mockito.ArgumentMatchers._ -import org.mockito.Mockito._ -import org.scalatest.mockito.MockitoSugar -import org.scalatest.{ Matchers, WordSpec } - -class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers { - - abstract class Fixture extends StreamAssertions[TreeScanSignal] { - val archiveHandler = mock[ArchiveHandler] - - private val testInput = TestSource.probe[TreeScanSignal] - private val testOutput = TestSink.probe[TreeScanSignal] - private val testGraph = RunnableGraph.fromGraph( - GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) => - import GraphDSL.Implicits._ - - val worker = builder.add(new ArchiveUnpackingFlow(archiveHandler)) - - source ~> worker ~> sink - ClosedShape - } - ) - val (source, sink) = testGraph.run() - - } - - class DirectoryTreeFixture extends Fixture { - val fileTree = aFileTree().withTreeType(TreeType.Directory).build() - } - - class ZipArchiveTreeFixture extends Fixture { - val fileTree = aFileTree().withTreeType(TreeType.Zip).build() - when(archiveHandler.extractZipArchive(any())).thenReturn(Paths.get("/magic")) - } - - "An ArchiveUnpackingFlow" when { - "receiving signals from directory file trees" should { - - "forward file parsing tasks unchanged" in { - val f = new DirectoryTreeFixture - val task = aCandidateFound().withFileTree(f.fileTree).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - - "forward end file tree signals unchanged" in { - val f = new DirectoryTreeFixture() - f.source.sendNext(aCandidateFound().withFileTree(f.fileTree)) - val task = aTreeScanCompleted().withFileTree(f.fileTree).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - verifyZeroInteractions(f.archiveHandler) - } - } - - "receiving signals from zip file trees" should { - "temporarily unpack archive when receiving the first task from a file tree" in { - val f = new ZipArchiveTreeFixture() - val treeTask = aFileTree().withTreeType(TreeType.Zip).withBasePath("/foo") - val task = aCandidateFound().withFileTree(treeTask) - f.source.sendNext(task) - f.drainStream() - verify(f.archiveHandler).extractZipArchive(Paths.get("/foo")) - } - - "add the temporary extracted file path to file parsing tasks" in { - import TreeScanSignalMatchers._ - val f = new ZipArchiveTreeFixture() - val task = aCandidateFound().withFileTree(f.fileTree).withRelativePath("foo") - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(a[CandidateFound]) and - have(relativePath("foo"), extractedPath("/magic/foo"))) - } - - "forward end file tree signals unchanged" in { - val f = new ZipArchiveTreeFixture() - f.source.sendNext(aCandidateFound().withFileTree(f.fileTree)) - val task = aTreeScanCompleted().withFileTree(f.fileTree).build() - f.source.sendNext(task) - f.findFirstMatchingStreamElement(be(task)) - } - } - - "receiving signals with an arbitrary file tree type" should { - - "fail if file tree end signal arrive before the start tree signal" in { - val f = new DirectoryTreeFixture - f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build()) - f.expectStreamFailure(be(an[IllegalArgumentException])) - } - - "fail if file parsing task arrives after the end tree signal" in { - val f = new DirectoryTreeFixture - f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build()) - f.source.sendNext(aCandidateFound().withFileTree(f.fileTree).build()) - f.expectStreamFailure(be(an[IllegalArgumentException])) - } - - "fail if a file tree end signal arrives more than once" in { - val f = new DirectoryTreeFixture - f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build()) - f.source.sendNext(aTreeScanCompleted().withFileTree(f.fileTree).build()) - f.expectStreamFailure(be(an[IllegalArgumentException])) - } - } - } - -}