Commit eb5e0d4a authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: added class archiveUnpacker to integrate rawdata archive...

Integrated Pipeline: added class archiveUnpacker to integrate rawdata archive handling into stream abstraction layer
parent c51277ec
package eu.nomad_lab.integrated_pipeline
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeScanCompleted, TreeScanSignal }
class ArchiveUnpacker(zipHandler: RawDataArchiveHandler) extends MessageProcessor[TreeScanSignal, TreeScanSignal] {
private var toEmit: Option[TreeScanSignal] = None
override def processSignal(in: TreeScanSignal): Unit = {
if (toEmit.isEmpty) {
toEmit = in.fileTree.treeType match {
case TreeType.Directory => Some(in)
case TreeType.Zip =>
val extractedPath = zipHandler.processTreeScanSignal(in)
in match {
case signal: CandidateFound => Some(signal.copy(extractedPath = extractedPath))
case signal: TreeScanCompleted => Some(signal)
}
}
} else {
throw new IllegalStateException("must fetch all outbound signals before next inbound signal")
}
}
override def hasSignalToEmit(): Boolean = toEmit.nonEmpty
override def getNextSignalToEmit(): TreeScanSignal = {
if (toEmit.nonEmpty) {
val signal = toEmit.get
toEmit = None
signal
} else {
throw new IllegalStateException("no signal ready to be fetched")
}
}
override def requiresMoreMessages: Boolean = false
}
package eu.nomad_lab.integrated_pipeline
import java.nio.file.Path
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal }
trait RawDataArchiveHandler {
/**
* Process the given TreeScanSignal and temporarily extract any files necessary for the parsing
* procedure.
* @param signal the inbound signal
* @return the path to the extracted main file, if applicable
*/
def processTreeScanSignal(signal: TreeScanSignal): Option[Path]
/**
* Process the given parsing signal and invoke clean-up procedures as necessary.
* @param signal the inbound signal
*/
def processFileParsingSignal(signal: FileParsingSignal): Unit
}
package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeScanSignal }
import eu.nomad_lab.integrated_pipeline.{ ArchiveUnpacker, FileTree, RawDataArchiveHandler }
import eu.nomad_lab.integrated_pipeline_tests.helpers.CustomMatchers
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.WordSpec
import org.scalatest.mockito.MockitoSugar
import eu.nomad_lab.integrated_pipeline_tests.matchers.TreeScanSignalMatchers
class ArchiveUnpackerSpec extends WordSpec with MockitoSugar with TestDataBuilders with CustomMatchers
with MessageProcessorBehaviour[TreeScanSignal, TreeScanSignal] {
val directoryFileTree: FileTree = aFileTree().withBasePath("/test").withTreeType(TreeType.Directory).build()
val zipArchiveFileTree: FileTree = aFileTree().withBasePath("/test.zip").withTreeType(TreeType.Zip).build()
val inboundSignals = Seq(
aCandidateFound().withFileTree(directoryFileTree).withRelativePath("someFile").build(),
aCandidateFound().withFileTree(zipArchiveFileTree).withRelativePath("someFile").build(),
aTreeScanCompleted().withFileTree(directoryFileTree).build(),
aTreeScanCompleted().withFileTree(zipArchiveFileTree).build()
)
abstract class Fixture {
val zipHandler: RawDataArchiveHandler = mock[RawDataArchiveHandler]
val unpacker = new ArchiveUnpacker(zipHandler)
when(zipHandler.processTreeScanSignal(any())).thenReturn(Some(Paths.get("/magic/nomad")))
}
class NoMessagesReceived extends Fixture {}
class DirectoryTaskReceived extends Fixture {
val fileTree: FileTree = aFileTree().withTreeType(TreeType.Directory).build()
unpacker.processSignal(aCandidateFound().withFileTree(fileTree))
}
class DirectoryTaskProcessed extends DirectoryTaskReceived {
unpacker.getNextSignalToEmit()
}
class ZipArchiveTaskReceived extends Fixture {
val fileTree: FileTree = aFileTree().withTreeType(TreeType.Zip).build()
unpacker.processSignal(aCandidateFound().withFileTree(fileTree))
}
class ZipArchiveTaskProcessed extends ZipArchiveTaskReceived {
unpacker.getNextSignalToEmit()
}
"An ArchiveUnpackingFlow" when {
"having received no messages" should {
def createFixture = () => new NoMessagesReceived
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
"having received a parsing task message from a directory file tree" should {
def createFixture = () => new DirectoryTaskReceived
behave like processorWithOutboundSignalReady(
() => createFixture().unpacker,
inboundSignals,
be(a[CandidateFound])
)
}
"having processed a parsing task message from a directory file tree" should {
def createFixture = () => new DirectoryTaskProcessed
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
"having received a parsing task message from a zip file tree" should {
def createFixture = () => new ZipArchiveTaskReceived
behave like processorWithOutboundSignalReady(
() => createFixture().unpacker,
inboundSignals,
be(a[CandidateFound])
)
}
"having processed a parsing task message from a zip file tree" should {
def createFixture = () => new ZipArchiveTaskProcessed
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
}
def processDirectoryFileTreeMessages(fixture: () => Fixture): Unit = {
val directoryFileTree = aFileTree().withTreeType(TreeType.Directory).withBasePath("/foo/directory").build()
"forward parsing tasks from directory file trees unchanged" in {
val f = fixture()
val task = aCandidateFound().withFileTree(directoryFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, never).processTreeScanSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
"forward end tree messages from directory file trees unchanged" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(directoryFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, never).processTreeScanSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
}
def processZipArchiveFileTreeMessages(fixture: () => Fixture): Unit = {
val zipArchiveFileTree = aFileTree().withTreeType(TreeType.Zip).withBasePath("/foo/bar.zip").build()
"add extracted path information and then forward parsing tasks from zip file trees" in {
import TreeScanSignalMatchers._
val f = fixture()
val task = aCandidateFound().withFileTree(zipArchiveFileTree).withRelativePath("nomad").build()
f.unpacker.processSignal(task)
val response = f.unpacker.getNextSignalToEmit()
response should be(a[CandidateFound])
val signal = response.asInstanceOf[CandidateFound]
signal should have(relativePath("nomad"), extractedPath("/magic/nomad"), fileTree(zipArchiveFileTree))
}
"forward end tree messages from zip file trees unchanged" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(zipArchiveFileTree).build()
f.unpacker.processSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
"should inform the underlying zip-archive handler about each incoming candidate file signal" in {
val f = fixture()
val task = aCandidateFound().withFileTree(zipArchiveFileTree).withRelativePath("nomad").build()
f.unpacker.processSignal(task)
verify(f.zipHandler, times(1)).processTreeScanSignal(task)
}
"should inform the underlying zip-archive handler about each incoming end tree scan signal" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(zipArchiveFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, times(1)).processTreeScanSignal(task)
}
}
}
......@@ -82,7 +82,7 @@ class ArchiveUnpackingFlowSpec extends WordSpec with MockitoSugar with TestDataB
val task = aCandidateFound().withFileTree(f.fileTree).withRelativePath("foo")
f.source.sendNext(task)
f.findFirstMatchingStreamElement(be(a[CandidateFound]) and
have(relativePath("foo"), extractedPathString(Some("/magic/foo"))))
have(relativePath("foo"), extractedPath("/magic/foo")))
}
"forward end file tree signals unchanged" in {
......
......@@ -36,8 +36,8 @@ object TreeScanSignalMatchers {
test = (x: CandidateFound) => x.extractedPath
)
def extractedPathString(expectedValue: Option[String]): HavePropertyMatcher[CandidateFound, Option[Path]] = {
extractedPath(expectedValue.map(Paths.get(_)))
def extractedPath(expectedValue: String): HavePropertyMatcher[CandidateFound, Option[Path]] = {
extractedPath(Some(Paths.get(expectedValue)))
}
def numParsingTasks(expectedValue: Long): HavePropertyMatcher[TreeScanCompleted, Long] =
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment