Commit 8f3c4614 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: added class ArchiveCleanUp to integrate rawdata archive...

Integrated Pipeline: added class ArchiveCleanUp to integrate rawdata archive handling into stream abstraction layer
parent eb5e0d4a
package eu.nomad_lab.integrated_pipeline
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal }
class ArchiveCleanUp(zipHandler: RawDataArchiveHandler) extends MessageProcessor[FileParsingSignal, FileParsingSignal] {
private var toEmit: Option[FileParsingSignal] = None
override def processSignal(in: FileParsingSignal): Unit = {
if (toEmit.isEmpty) {
in.fileTree.treeType match {
case TreeType.Directory => ()
case TreeType.Zip => zipHandler.processFileParsingSignal(in)
}
toEmit = Some(in)
} else {
throw new IllegalStateException("must fetch all outbound signals before next inbound signal")
}
}
override def hasSignalToEmit(): Boolean = toEmit.nonEmpty
override def getNextSignalToEmit(): FileParsingSignal = {
if (toEmit.nonEmpty) {
val signal = toEmit.get
toEmit = None
signal
} else {
throw new IllegalStateException("no signal ready to be fetched")
}
}
override def requiresMoreMessages: Boolean = false
}
......@@ -30,8 +30,7 @@ trait MessageProcessor[-In, +Out] {
def getNextSignalToEmit(): Out
/**
* Check whether this processor has unfinished internal messages in its internal state which
* require the input of additional signals.
* Check whether this processor expects more inbound messages before it can terminate cleanly.
* @return true if this processor expects more messages at present
*/
def requiresMoreMessages: Boolean
......
package eu.nomad_lab.integrated_pipeline_tests
import java.nio.file.Paths
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, FileParsingSignal }
import eu.nomad_lab.integrated_pipeline.{ ArchiveCleanUp, FileTree, RawDataArchiveHandler }
import eu.nomad_lab.integrated_pipeline_tests.helpers.CustomMatchers
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.WordSpec
import org.scalatest.mockito.MockitoSugar
class ArchiveCleanUpSpec extends WordSpec with MockitoSugar with TestDataBuilders with CustomMatchers
with MessageProcessorBehaviour[FileParsingSignal, FileParsingSignal] {
val sampleDirectoryFileTree: FileTree = aFileTree().withBasePath("/test").withTreeType(TreeType.Directory).build()
val sampleZipArchiveFileTree: FileTree = aFileTree().withBasePath("/test.zip").withTreeType(TreeType.Zip).build()
val inboundSignals = Seq(
aParsingResultInMemory().withFileTree(sampleDirectoryFileTree).withRelativePath("someFile").build(),
aParsingResultInMemory().withFileTree(sampleZipArchiveFileTree).withRelativePath("someFile").build(),
aTreeScanCompleted().withFileTree(sampleDirectoryFileTree).build(),
aTreeScanCompleted().withFileTree(sampleZipArchiveFileTree).build()
)
abstract class Fixture {
val zipHandler: RawDataArchiveHandler = mock[RawDataArchiveHandler]
val unpacker = new ArchiveCleanUp(zipHandler)
when(zipHandler.processTreeScanSignal(any())).thenReturn(Some(Paths.get("/magic/nomad")))
}
class NoMessagesReceived extends Fixture {}
class DirectoryParsingResultReceived extends Fixture {
val fileTree: FileTree = aFileTree().withTreeType(TreeType.Directory).build()
unpacker.processSignal(aParsingResultInMemory().withFileTree(fileTree))
}
class DirectoryParsingResultProcessed extends DirectoryParsingResultReceived {
unpacker.getNextSignalToEmit()
}
class ZipArchiveParsingResultReceived extends Fixture {
val fileTree: FileTree = aFileTree().withTreeType(TreeType.Zip).build()
unpacker.processSignal(aParsingResultInMemory().withFileTree(fileTree))
}
class ZipArchiveParsingResultProcessed extends ZipArchiveParsingResultReceived {
unpacker.getNextSignalToEmit()
}
"An ArchiveCleanUp" when {
"having received no messages" should {
def createFixture = () => new NoMessagesReceived
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
"having received a parsing task message from a directory file tree" should {
def createFixture = () => new DirectoryParsingResultReceived
behave like processorWithOutboundSignalReady(
() => createFixture().unpacker,
inboundSignals,
be(a[FileParsingResult])
)
}
"having processed a parsing task message from a directory file tree" should {
def createFixture = () => new DirectoryParsingResultProcessed
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
"having received a parsing task message from a zip file tree" should {
def createFixture = () => new ZipArchiveParsingResultReceived
behave like processorWithOutboundSignalReady(
() => createFixture().unpacker,
inboundSignals,
be(a[FileParsingResult])
)
}
"having processed a parsing task message from a zip file tree" should {
def createFixture = () => new ZipArchiveParsingResultProcessed
behave like processorWithNoSignalReady(() => createFixture().unpacker, inboundSignals)
behave like processDirectoryFileTreeMessages(createFixture)
behave like processZipArchiveFileTreeMessages(createFixture)
}
}
def processDirectoryFileTreeMessages(fixture: () => Fixture): Unit = {
val directoryFileTree = aFileTree().withTreeType(TreeType.Directory).withBasePath("/foo/directory").build()
"forward parsing tasks from directory file trees unchanged" in {
val f = fixture()
val task = aParsingResultInMemory().withFileTree(directoryFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, never).processFileParsingSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
"forward end tree messages from directory file trees unchanged" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(directoryFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, never).processFileParsingSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
}
def processZipArchiveFileTreeMessages(fixture: () => Fixture): Unit = {
val zipArchiveFileTree = aFileTree().withTreeType(TreeType.Zip).withBasePath("/foo/bar.zip").build()
"forward parsing tasks from zip file trees unchanged" in {
val f = fixture()
val task = aParsingResultInMemory().withFileTree(zipArchiveFileTree).build()
f.unpacker.processSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
"forward end tree messages from zip file trees unchanged" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(zipArchiveFileTree).build()
f.unpacker.processSignal(task)
f.unpacker.getNextSignalToEmit() should be(task)
}
"should inform the underlying zip-archive handler about each incoming parsing result signal" in {
val f = fixture()
val task = aParsingResultInMemory().withFileTree(zipArchiveFileTree).withRelativePath("nomad").build()
f.unpacker.processSignal(task)
verify(f.zipHandler, times(1)).processFileParsingSignal(task)
}
"should inform the underlying zip-archive handler about each incoming end tree scan signal" in {
val f = fixture()
val task = aTreeScanCompleted().withFileTree(zipArchiveFileTree).build()
f.unpacker.processSignal(task)
verify(f.zipHandler, times(1)).processFileParsingSignal(task)
}
}
}
......@@ -51,7 +51,7 @@ class ArchiveUnpackerSpec extends WordSpec with MockitoSugar with TestDataBuilde
unpacker.getNextSignalToEmit()
}
"An ArchiveUnpackingFlow" when {
"An ArchiveUnpacker" when {
"having received no messages" should {
def createFixture = () => new NoMessagesReceived
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment