Skip to content
Snippets Groups Projects
Commit b4d553f9 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: ArchiveUnpackingFlow now unpacks archives and adds...

Integrated Pipeline: ArchiveUnpackingFlow now unpacks archives and adds extracted paths to parsing requests

-generalized some matchers
-FileParsingTask  now takes an optional argument for the extracted file path
parent aebe99f3
No related branches found
No related tags found
No related merge requests found
...@@ -13,11 +13,14 @@ sealed trait FileParsingTaskSignal { ...@@ -13,11 +13,14 @@ sealed trait FileParsingTaskSignal {
* @param treeTask the file-tree scan request from which this parsing task was generated * @param treeTask the file-tree scan request from which this parsing task was generated
* @param relativePath path inside the given file tree * @param relativePath path inside the given file tree
* @param parserName name of the parser to use for processing the file * @param parserName name of the parser to use for processing the file
* @param extractedPath the path to the temporarily extracted main file (if originating from an
* archive)
*/ */
case class FileParsingTask( case class FileParsingTask(
treeTask: FileTreeScanTask, treeTask: FileTreeScanTask,
relativePath: Path, relativePath: Path,
parserName: String parserName: String,
extractedPath: Option[Path] = None
) extends FileParsingTaskSignal { ) extends FileParsingTaskSignal {
def mainFileUri: String = { def mainFileUri: String = {
......
...@@ -4,6 +4,7 @@ import java.nio.file.Path ...@@ -4,6 +4,7 @@ import java.nio.file.Path
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskEndTree, FileParsingTaskSignal, FileParsingTaskStartTree } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskEndTree, FileParsingTaskSignal, FileParsingTaskStartTree }
import scala.collection.mutable import scala.collection.mutable
...@@ -21,7 +22,7 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler) ...@@ -21,7 +22,7 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
val treesInProgress: mutable.Map[Path, Path] = mutable.Map() private val activeTrees: mutable.Map[Path, Path] = mutable.Map()
private def fail(msg: String): Unit = { private def fail(msg: String): Unit = {
failStage(new IllegalArgumentException(msg)) failStage(new IllegalArgumentException(msg))
} }
...@@ -33,25 +34,35 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler) ...@@ -33,25 +34,35 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
}) })
setHandler(in, new InHandler { setHandler(in, new InHandler {
override def onPush(): Unit = override def onPush(): Unit = {
val input = grab(in)
val path = input.treeTask.treeBasePath
grab(in) match { input match {
case x: FileParsingTaskStartTree => case _: FileParsingTaskStartTree if activeTrees.contains(path) =>
treesInProgress.put(x.treeTask.treeBasePath, x.treeTask.treeBasePath) match { fail("to be started file tree already registered")
case Some(_) => fail("to be started file tree already registered") case _: FileParsingTask if !activeTrees.contains(path) =>
case None => push(out, x) fail("parsing task arrived before start tree signal")
case _: FileParsingTaskEndTree if !activeTrees.contains(path) =>
fail("to be finished file tree was not registered")
case signal: FileParsingTaskStartTree =>
activeTrees(path) = signal.treeTask.treeType match {
case TreeType.Directory => path
case TreeType.Zip => archiveHandler.extractZipArchive(path)
} }
case x: FileParsingTaskEndTree => push(out, signal)
treesInProgress.remove(x.treeTask.treeBasePath) match { case signal: FileParsingTask =>
case Some(_) => push(out, x) signal.treeTask.treeType match {
case None => fail("to be finished file tree was not registered") case TreeType.Directory => push(out, signal)
} case TreeType.Zip =>
case x: FileParsingTask => val tempPath = activeTrees(path).resolve(signal.relativePath)
treesInProgress.get(x.treeTask.treeBasePath) match { push(out, signal.copy(extractedPath = Some(tempPath)))
case Some(_) => push(out, x)
case None => fail("parsing task arrived before start tree signal")
} }
case signal: FileParsingTaskEndTree =>
activeTrees.remove(path)
push(out, signal)
} }
}
}) })
} }
......
...@@ -7,7 +7,7 @@ import akka.stream.scaladsl.{ GraphDSL, RunnableGraph } ...@@ -7,7 +7,7 @@ import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType import eu.nomad_lab.TreeType
import eu.nomad_lab.TreeType.TreeType import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTaskSignal, FileParsingTaskStartTree, FileTreeScanTask } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingTaskSignal, FileParsingTaskStartTree, FileTreeScanTask }
import eu.nomad_lab.integrated_pipeline.stream_components.{ ArchiveHandler, ArchiveUnpackingFlow } import eu.nomad_lab.integrated_pipeline.stream_components.{ ArchiveHandler, ArchiveUnpackingFlow }
import org.scalatest.{ Matchers, WordSpec } import org.scalatest.{ Matchers, WordSpec }
import org.mockito.Mockito._ import org.mockito.Mockito._
...@@ -23,12 +23,12 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB ...@@ -23,12 +23,12 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
treeType = treeType treeType = treeType
) )
class DirectoryTreeFixture extends StreamAssertions[FileParsingTaskSignal] { abstract class Fixture extends StreamAssertions[FileParsingTaskSignal] {
val archiveHandler = mock[ArchiveHandler] val archiveHandler = mock[ArchiveHandler]
private val testInput = TestSource.probe[FileParsingTaskSignal] private val testInput = TestSource.probe[FileParsingTaskSignal]
private val testOutput = TestSink.probe[FileParsingTaskSignal] private val testOutput = TestSink.probe[FileParsingTaskSignal]
val testGraph = RunnableGraph.fromGraph( private val testGraph = RunnableGraph.fromGraph(
GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) => GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) =>
import GraphDSL.Implicits._ import GraphDSL.Implicits._
...@@ -39,9 +39,17 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB ...@@ -39,9 +39,17 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
} }
) )
val (source, sink) = testGraph.run() val (source, sink) = testGraph.run()
}
class DirectoryTreeFixture extends Fixture {
val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).build() val treeTask = aFileTreeScanTask().withTreeType(TreeType.Directory).build()
} }
class ZipArchiveTreeFixture extends Fixture {
val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).build()
}
"An ArchiveUnpackingFlow" when { "An ArchiveUnpackingFlow" when {
"receiving signals from directory file trees" should { "receiving signals from directory file trees" should {
"forward start file tree signals unchanged" in { "forward start file tree signals unchanged" in {
...@@ -71,6 +79,35 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB ...@@ -71,6 +79,35 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
} }
} }
"receiving signals from zip file trees" should {
"temporarily unpack archive when receiving a start tree signal" in {
val f = new ZipArchiveTreeFixture()
val treeTask = aFileTreeScanTask().withTreeType(TreeType.Zip).withBasePath("/foo").build()
val task = aFileParsingTaskStartTree().withTreeTask(treeTask).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
verify(f.archiveHandler).extractZipArchive(Paths.get("/foo"))
}
"add the temporary extracted file path to file parsing tasks" in new FileParsingTaskMatchers {
val f = new ZipArchiveTreeFixture()
when(f.archiveHandler.extractZipArchive(any())).thenReturn(Paths.get("/magic"))
f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build())
val task = aFileParsingTask().withTreeTask(f.treeTask).withRelativePath("foo").build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(a[FileParsingTask]) and
have(relativePath("foo"), extractedPathString(Some("/magic/foo"))))
}
"forward end file tree signals unchanged" in {
val f = new ZipArchiveTreeFixture()
f.source.sendNext(aFileParsingTaskStartTree().withTreeTask(f.treeTask).build())
val task = aFileParsingTaskEndTree().withTreeTask(f.treeTask).build()
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(task))
}
}
"receiving signals with an arbitrary file tree type" should { "receiving signals with an arbitrary file tree type" should {
"fail if file parsing tasks arrive before the start tree signal" in { "fail if file parsing tasks arrive before the start tree signal" in {
val f = new DirectoryTreeFixture val f = new DirectoryTreeFixture
......
...@@ -38,26 +38,48 @@ trait FileParsingTaskMatchers { ...@@ -38,26 +38,48 @@ trait FileParsingTaskMatchers {
} }
def relativePath(expectedValue: Path) = def relativePath(expectedValue: Path) =
new HavePropertyMatcher[FileParsingTask, Path] { new HavePropertyMatcher[FileParsingTaskSignal, Path] {
def apply(test: FileParsingTask) = def apply(test: FileParsingTaskSignal) = test match {
HavePropertyMatchResult( case x: FileParsingTask => HavePropertyMatchResult(
test.relativePath == expectedValue, x.relativePath == expectedValue,
"relative file path", "relative file path",
expectedValue, expectedValue,
test.relativePath x.relativePath
)
case wrongClass => HavePropertyMatchResult(
matches = false,
s"relative file path (wrong class ${wrongClass.getClass})",
expectedValue,
null
) )
}
} }
def relativePath(expectedValue: String) = def relativePath(expectedValue: String): HavePropertyMatcher[FileParsingTaskSignal, Path] = {
new HavePropertyMatcher[FileParsingTask, Path] { relativePath(Paths.get(expectedValue))
def apply(test: FileParsingTask) = }
HavePropertyMatchResult(
test.relativePath == Paths.get(expectedValue), def extractedPath(expectedValue: Option[Path]) =
"relative file path", new HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] {
Paths.get(expectedValue), def apply(test: FileParsingTaskSignal) = test match {
test.relativePath case x: FileParsingTask => HavePropertyMatchResult(
x.extractedPath == expectedValue,
"temporary extracted file path",
expectedValue,
x.extractedPath
) )
case wrongClass => HavePropertyMatchResult(
matches = false,
s"relative file path (wrong class ${wrongClass.getClass})",
expectedValue,
null
)
}
} }
def extractedPathString(expectedValue: Option[String]): HavePropertyMatcher[FileParsingTaskSignal, Option[Path]] = {
extractedPath(expectedValue.map(Paths.get(_)))
}
} }
trait FileParsingResultMatchers { trait FileParsingResultMatchers {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment