From b4d553f9fb7ddbfd76a4e6254024e44b37e66b37 Mon Sep 17 00:00:00 2001
From: Arvid Ihrig <ihrig@fhi-berlin.mpg.de>
Date: Thu, 28 Jun 2018 17:36:46 +0200
Subject: [PATCH] Integrated Pipeline: ArchiveUnpackingFlow now unpacks
 archives and adds extracted paths to parsing requests

-generalized some matchers
-FileParsingTask  now takes an optional argument for the extracted file path
---
 .../messages/FileParsingTask.scala            |  5 +-
 .../ArchiveUnpackingFlow.scala                | 43 ++++++++++-------
 .../ArchiveUnpackingFlowSpec.scala            | 43 +++++++++++++++--
 .../MessageMatchers.scala                     | 48 ++++++++++++++-----
 4 files changed, 106 insertions(+), 33 deletions(-)

diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala
index 39305cf0..d3f883a5 100644
--- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala
+++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala
@@ -13,11 +13,14 @@ sealed trait FileParsingTaskSignal {
  * @param treeTask the file-tree scan request from which this parsing task was generated
  * @param relativePath path inside the given file tree
  * @param parserName name of the parser to use for processing the file
+ * @param extractedPath the path to the temporarily extracted main file (if originating from an
+ *                      archive)
  */
 case class FileParsingTask(
     treeTask: FileTreeScanTask,
     relativePath: Path,
-    parserName: String
+    parserName: String,
+    extractedPath: Option[Path] = None
 ) extends FileParsingTaskSignal {
 
   def mainFileUri: String = {
diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala
index e329e5b3..a17186f5 100644
--- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala
+++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveUnpackingFlow.scala
@@ -4,6 +4,7 @@ import java.nio.file.Path
 
 import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
 import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import eu.nomad_lab.TreeType
 import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskEndTree, FileParsingTaskSignal, FileParsingTaskStartTree }
 
 import scala.collection.mutable
@@ -21,7 +22,7 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
 
   override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
 
-    val treesInProgress: mutable.Map[Path, Path] = mutable.Map()
+    private val activeTrees: mutable.Map[Path, Path] = mutable.Map()
     private def fail(msg: String): Unit = {
       failStage(new IllegalArgumentException(msg))
     }
@@ -33,25 +34,35 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
     })
 
     setHandler(in, new InHandler {
-      override def onPush(): Unit =
+      override def onPush(): Unit = {
+        val input = grab(in)
+        val path = input.treeTask.treeBasePath
 
-        grab(in) match {
-          case x: FileParsingTaskStartTree =>
-            treesInProgress.put(x.treeTask.treeBasePath, x.treeTask.treeBasePath) match {
-              case Some(_) => fail("to be started file tree already registered")
-              case None => push(out, x)
+        input match {
+          case _: FileParsingTaskStartTree if activeTrees.contains(path) =>
+            fail("to be started file tree already registered")
+          case _: FileParsingTask if !activeTrees.contains(path) =>
+            fail("parsing task arrived before start tree signal")
+          case _: FileParsingTaskEndTree if !activeTrees.contains(path) =>
+            fail("to be finished file tree was not registered")
+          case signal: FileParsingTaskStartTree =>
+            activeTrees(path) = signal.treeTask.treeType match {
+              case TreeType.Directory => path
+              case TreeType.Zip => archiveHandler.extractZipArchive(path)
             }
-          case x: FileParsingTaskEndTree =>
-            treesInProgress.remove(x.treeTask.treeBasePath) match {
-              case Some(_) => push(out, x)
-              case None => fail("to be finished file tree was not registered")
-            }
-          case x: FileParsingTask =>
-            treesInProgress.get(x.treeTask.treeBasePath) match {
-              case Some(_) => push(out, x)
-              case None => fail("parsing task arrived before start tree signal")
+            push(out, signal)
+          case signal: FileParsingTask =>
+            signal.treeTask.treeType match {
+              case TreeType.Directory => push(out, signal)
+              case TreeType.Zip =>
+                val tempPath = activeTrees(path).resolve(signal.relativePath)
+                push(out, signal.copy(extractedPath = Some(tempPath)))
             }
+          case signal: FileParsingTaskEndTree =>
+            activeTrees.remove(path)
+            push(out, signal)
         }
+      }
     })
   }
 
diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala
index acc8d4cf..d2fd3bae 100644
--- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala
+++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/ArchiveUnpackingFlowSpec.scala
@@ -7,7 +7,7 @@ import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
 import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
 import eu.nomad_lab.TreeType
 import eu.nomad_lab.TreeType.TreeType
-import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTaskSignal, FileParsingTaskStartTree, FileTreeScanTask }
+import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskSignal, FileParsingTaskStartTree, FileTreeScanTask }
 import eu.nomad_lab.integrated_pipeline.stream_components.{ ArchiveHandler, ArchiveUnpackingFlow }
 import org.scalatest.{ Matchers, WordSpec }
 import org.mockito.Mockito._
@@ -23,12 +23,12 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
     treeType = treeType
   )
 
-  class DirectoryTreeFixture extends StreamAssertions[FileParsingTaskSignal] {
+  abstract class Fixture extends StreamAssertions[FileParsingTaskSignal] {
     val archiveHandler = mock[ArchiveHandler]
 
     private val testInput = TestSource.probe[FileParsingTaskSignal]
     private val testOutput = TestSink.probe[FileParsingTaskSignal]
-    val testGraph = RunnableGraph.fromGraph(
+    private val testGraph = RunnableGraph.fromGraph(
       GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) =>
         import GraphDSL.Implicits._
 
@@ -39,9 +39,17 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
       }
     )
     val (source, sink) = testGraph.run()
+
+  }
+
+  class DirectoryTreeFixture extends Fixture {
     val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).build()
   }
 
+  class ZipArchiveTreeFixture extends Fixture {
+    val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).build()
+  }
+
   "An ArchiveUnpackingFlow" when {
     "receiving signals from directory file trees" should {
       "forward start file tree signals unchanged" in {
@@ -71,6 +79,35 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
       }
     }
 
+    "receiving signals from zip file trees" should {
+      "temporarily unpack archive when receiving a start tree signal" in {
+        val f = new ZipArchiveTreeFixture()
+        val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).withBasePath("/foo").build()
+        val task = aFileParsingTaskStartTree().withTreeTask(treeTask).build()
+        f.source.sendNext(task)
+        f.findFirstMatchingStreamElement(be(task))
+        verify(f.archiveHandler).extractZipArchive(Paths.get("/foo"))
+      }
+
+      "add the temporary extracted file path to file parsing tasks" in new FileParsingTaskMatchers {
+        val f = new ZipArchiveTreeFixture()
+        when(f.archiveHandler.extractZipArchive(any())).thenReturn(Paths.get("/magic"))
+        f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build())
+        val task = aFileParsingTask().withTreeTask(f.treeTask).withRelativePath("foo").build()
+        f.source.sendNext(task)
+        f.findFirstMatchingStreamElement(be(a[FileParsingTask]) and
+          have(relativePath("foo"), extractedPathString(Some("/magic/foo"))))
+      }
+
+      "forward end file tree signals unchanged" in {
+        val f = new ZipArchiveTreeFixture()
+        f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build())
+        val task = aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()
+        f.source.sendNext(task)
+        f.findFirstMatchingStreamElement(be(task))
+      }
+    }
+
     "receiving signals with an arbitrary file tree type" should {
       "fail if file parsing tasks arrive before the start tree signal" in {
         val f = new DirectoryTreeFixture
diff --git a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
index 7517c47c..170f5742 100644
--- a/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
+++ b/integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/MessageMatchers.scala
@@ -38,26 +38,48 @@ trait FileParsingTaskMatchers {
     }
 
   def relativePath(expectedValue: Path) =
-    new HavePropertyMatcher[FileParsingTask, Path] {
-      def apply(test: FileParsingTask) =
-        HavePropertyMatchResult(
-          test.relativePath == expectedValue,
+    new HavePropertyMatcher[FileParsingTaskSignal, Path] {
+      def apply(test: FileParsingTaskSignal) = test match {
+        case x: FileParsingTask => HavePropertyMatchResult(
+          x.relativePath == expectedValue,
           "relative file path",
           expectedValue,
-          test.relativePath
+          x.relativePath
+        )
+        case wrongClass => HavePropertyMatchResult(
+          matches = false,
+          s"relative file path (wrong class ${wrongClass.getClass})",
+          expectedValue,
+          null
         )
+      }
     }
 
-  def relativePath(expectedValue: String) =
-    new HavePropertyMatcher[FileParsingTask, Path] {
-      def apply(test: FileParsingTask) =
-        HavePropertyMatchResult(
-          test.relativePath == Paths.get(expectedValue),
-          "relative file path",
-          Paths.get(expectedValue),
-          test.relativePath
+  def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingTaskSignal, Path] = {
+    relativePath(Paths.get(expectedValue))
+  }
+
+  def extractedPath(expectedValue: Option[Path]) =
+    new HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] {
+      def apply(test: FileParsingTaskSignal) = test match {
+        case x: FileParsingTask => HavePropertyMatchResult(
+          x.extractedPath == expectedValue,
+          "temporary extracted file path",
+          expectedValue,
+          x.extractedPath
         )
+        case wrongClass => HavePropertyMatchResult(
+          matches = false,
+          s"relative file path (wrong class ${wrongClass.getClass})",
+          expectedValue,
+          null
+        )
+      }
     }
+
+  def extractedPathString(expectedValue: Option[String]): HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] = {
+    extractedPath(expectedValue.map(Paths.get(_)))
+  }
 }
 
 trait FileParsingResultMatchers {
-- 
GitLab