Commit cbbc7eb1 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: reimplementation of Zip-unpacking logic to work with...

Integrated Pipeline: reimplementation of Zip-unpacking logic to work with MessageProcessor infrastructure
parent 8f3c4614
......@@ -4,6 +4,15 @@ import java.nio.file.Path
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal }
/**
* Base trait for archive-handling IO modules. How exactly the archive is extracted (e.g. the whole
* archive at once, or a partial extraction for each task) is the choice of the implementing class.
*
* Note that classes implementing this trait will be used in two different MessageProcessor
* instances at the same time (unpacking and cleanup) to allow for sanity-checks of file deletion
* tasks. Any internal state that is accessed by both functions must therefore be properly
* synchronized.
*/
trait RawDataArchiveHandler {
/**
......
package eu.nomad_lab.integrated_pipeline.io_integrations
import java.nio.file.Path
import eu.nomad_lab.integrated_pipeline.RawDataArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages._
import scala.collection.{ mutable, parallel }
import scala.collection.parallel.mutable.ParMap
/**
 * This implementation unpacks the entire zip-archive once to a temporary location when the first
 * candidate file signal is received and then points all follow-up candidate signals to the already
 * extracted content.
 * Cleanup is performed once the end tree signal has been received as well as the matching
 * number of file-parsing results.
 *
 * The map of extracted archives is modified by both `processTreeScanSignal` and
 * `processFileParsingSignal`. As these may be driven by different MessageProcessor instances at the
 * same time, the map is a concurrent one. The two counter maps are only touched by
 * `processFileParsingSignal`.
 *
 * @param handler backend used to extract archives and to clean up the extracted content again
 */
class WholeZipArchiveHandler(handler: ArchiveHandler) extends RawDataArchiveHandler {

  // archive base path -> location of its extracted content; shared between both entry points
  private val pathMap: scala.collection.concurrent.TrieMap[Path, Path] =
    scala.collection.concurrent.TrieMap.empty[Path, Path]

  // archive base path -> number of file-parsing results received so far
  private val processed: mutable.Map[Path, Long] = mutable.Map()

  // archive base path -> number of parsing tasks announced by the end tree signal
  private val expected: mutable.Map[Path, Long] = mutable.Map()

  /**
   * Resolves a candidate file to its location inside the extracted archive, extracting the archive
   * first if it is not currently registered as extracted.
   *
   * @param signal the tree scan signal to handle
   * @return the extracted location of the candidate file, or None for a tree-scan-completed signal
   */
  override def processTreeScanSignal(signal: TreeScanSignal): Option[Path] = {
    signal match {
      case _: TreeScanCompleted => None
      case x: CandidateFound =>
        val archive = x.fileTree.treeBasePath
        // the extraction is only triggered when the archive has no entry in the map
        val location = pathMap.getOrElseUpdate(archive, handler.extractZipArchive(archive))
        Some(location.resolve(x.relativePath))
    }
  }

  /**
   * Book-keeps the announced task count and the received parsing results per archive. Once both
   * numbers are known and equal, the archive is forgotten and its extracted content is cleaned up.
   * An archive for which no parsing result has been counted is never cleaned up by this method.
   *
   * @param signal the file parsing signal to handle
   */
  override def processFileParsingSignal(signal: FileParsingSignal): Unit = {
    val key = signal.fileTree.treeBasePath
    signal match {
      case x: TreeScanCompleted => expected += key -> x.numParsingTasks
      case _: FileParsingResult => processed += key -> (processed.getOrElse(key, 0L) + 1)
    }
    // both counters must be present and agree; a missing counter never matches
    val allResultsArrived = expected.get(key).exists(total => processed.get(key).contains(total))
    if (allResultsArrived) {
      pathMap -= key
      expected.remove(key)
      processed.remove(key)
      handler.cleanUpExtractedArchive(key)
    }
  }
}
package eu.nomad_lab
import java.nio.file.Paths
import eu.nomad_lab.integrated_pipeline.io_integrations.{ ArchiveHandler, WholeZipArchiveHandler }
import eu.nomad_lab.integrated_pipeline_tests._
import eu.nomad_lab.integrated_pipeline_tests.helpers.CustomMatchers
import org.scalatest.WordSpec
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.mockito.ArgumentMatchers._
/**
 * Specification of [[WholeZipArchiveHandler]]: the archive is extracted once per tree, candidate
 * signals are mapped into the extracted location, and cleanup happens as soon as the announced
 * task count matches the number of received parsing results.
 *
 * The scope and test descriptions deliberately do not start with "when"/"should", because WordSpec
 * already inserts these words when composing the reported test names.
 */
class WholeZipArchiveHandlerSpec extends WordSpec with TestDataBuilders with CustomMatchers with MockitoSugar {

  /**
   * Common setup: a mocked ArchiveHandler that reports a fixed extraction directory, the handler
   * under test wrapping it, and a sample zip file tree used by the state-building subclasses.
   */
  abstract class Fixture {
    val underlying = mock[ArchiveHandler]
    val handler = new WholeZipArchiveHandler(underlying)
    val sample = aFileTree().withBasePath("/some/path.zip").withTreeType(TreeType.Zip).build()
    when(underlying.extractZipArchive(any())).thenReturn(Paths.get("/tmp/extracted/directory"))
  }

  /** Pristine handler. */
  class NoSignalsReceived extends Fixture {}

  /** One candidate of the sample tree has been seen, i.e. the sample archive is extracted. */
  class OneCandidateSignalReceived extends Fixture {
    handler.processTreeScanSignal(aCandidateFound().withFileTree(sample))
  }

  /** The sample tree is extracted and its end signal (one task) arrived, but no result yet. */
  class EndTreeScanSignalReceived extends Fixture {
    handler.processTreeScanSignal(aCandidateFound().withFileTree(sample))
    handler.processFileParsingSignal(aTreeScanCompleted().withFileTree(sample).withTaskCount(1))
  }

  /** The sample tree is extracted and one result arrived, but the end signal is still missing. */
  class FileParsingResultReceived extends Fixture {
    handler.processTreeScanSignal(aCandidateFound().withFileTree(sample))
    handler.processFileParsingSignal(aParsingResultInMemory().withFileTree(sample))
  }

  /** The sample tree went through the complete cycle: candidate, result and end signal. */
  class FullyProcessedATree extends Fixture {
    handler.processTreeScanSignal(aCandidateFound().withFileTree(sample))
    handler.processFileParsingSignal(aParsingResultInMemory().withFileTree(sample))
    handler.processFileParsingSignal(aTreeScanCompleted().withFileTree(sample).withTaskCount(1))
  }

  "A WholeZipArchiveHandler" when {

    "not having processed any signals" should {
      def createFixture = () => new NoSignalsReceived

      behave like startANewZipArchive(createFixture)
    }

    "having processed a candidate file signal" should {
      def createFixture = () => new OneCandidateSignalReceived

      behave like startANewZipArchive(createFixture)
    }

    "having received a parsing result" should {
      def createFixture = () => new FileParsingResultReceived

      behave like startANewZipArchive(createFixture)

      "clean up temporary extracted files when the end parsing signal arrives with the appropriate task count" in {
        val f = createFixture()
        f.handler.processFileParsingSignal(aTreeScanCompleted().withFileTree(f.sample).withTaskCount(1))
        verify(f.underlying).cleanUpExtractedArchive(f.sample.treeBasePath)
      }

      "not extract the same archive again when the next candidate signal arrives" in {
        val f = createFixture()
        // the fixture itself already triggered exactly one extraction
        verify(f.underlying, times(1)).extractZipArchive(f.sample.treeBasePath)
        f.handler.processTreeScanSignal(aCandidateFound().withFileTree(f.sample))
        verify(f.underlying, times(1)).extractZipArchive(f.sample.treeBasePath)
      }
    }

    "having received an end parsing signal" should {
      def createFixture = () => new EndTreeScanSignalReceived

      behave like startANewZipArchive(createFixture)

      "clean up temporary extracted files when the appropriate number of parsing results arrived" in {
        val f = createFixture()
        f.handler.processFileParsingSignal(aParsingResultInMemory().withFileTree(f.sample))
        verify(f.underlying).cleanUpExtractedArchive(f.sample.treeBasePath)
      }
    }

    "having finished processing a tree" should {
      def createFixture = () => new FullyProcessedATree

      behave like startANewZipArchive(createFixture)

      "restart an already fully processed archive if a new candidate task arrives" in {
        val f = createFixture()
        f.handler.processTreeScanSignal(aCandidateFound().withFileTree(f.sample))
        verify(f.underlying, times(2)).extractZipArchive(f.sample.treeBasePath)
      }
    }
  }

  /**
   * Shared behaviour: regardless of the signals handled so far for the fixture's sample tree, the
   * handler must treat a different, not yet seen zip archive like a fresh one.
   *
   * @param fixture factory creating a new fixture in the state under test
   */
  def startANewZipArchive(fixture: () => Fixture): Unit = {
    val zipArchiveFileTree = aFileTree().withBasePath("/foo/bar.zip").withTreeType(TreeType.Zip)

    "unpack the entire archive when the first candidate signal is processed" in {
      val f = fixture()
      f.handler.processTreeScanSignal(aCandidateFound().withFileTree(zipArchiveFileTree))
      verify(f.underlying).extractZipArchive(Paths.get("/foo/bar.zip"))
    }

    "assign the correct extracted file path to processed candidate found signals" in {
      val f = fixture()
      val signal = aCandidateFound().withFileTree(zipArchiveFileTree).withRelativePath("foo/relative.out")
      val extractedPath = f.handler.processTreeScanSignal(signal)
      extractedPath should be(Some(Paths.get("/tmp/extracted/directory/foo/relative.out")))
    }

    "do nothing when an end tree file scan signal is processed" in {
      val f = fixture()
      val result = f.handler.processTreeScanSignal(aTreeScanCompleted().withFileTree(zipArchiveFileTree))
      result should be(None)
      verify(f.underlying, never()).extractZipArchive(Paths.get("/foo/bar.zip"))
    }
  }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment