Skip to content
Snippets Groups Projects
Commit b314247c authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: CalculationParsingFlow extracts to be processed rawdata...

Integrated Pipeline: CalculationParsingFlow extracts to-be-processed raw-data archives to a temporary directory

-intended to be used with a RAM-disk filesystem in production; thus the entire archive (at most 30 GB)
 is extracted once and shared by all workers, instead of extracting worker-private copies of the required
 file-trees
parent 87b24508
No related branches found
No related tags found
No related merge requests found
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
*/ */
package eu.nomad_lab.integrated_pipeline package eu.nomad_lab.integrated_pipeline
import java.nio.file.Paths import java.nio.file.{ Files, Paths }
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import akka.actor._ import akka.actor._
...@@ -111,6 +111,8 @@ class Main { ...@@ -111,6 +111,8 @@ class Main {
val mode = OutputType.withName(params.flatMap(Main.outputModeRe.findFirstMatchIn(_)).head.group(1)) val mode = OutputType.withName(params.flatMap(Main.outputModeRe.findFirstMatchIn(_)).head.group(1))
val outDir = Paths.get(params.flatMap(Main.outputDirRe.findFirstMatchIn(_)).head.group(1)) val outDir = Paths.get(params.flatMap(Main.outputDirRe.findFirstMatchIn(_)).head.group(1))
val sinkSummaries: Sink[FileTreeParsingResult, Future[Done]] = Sink.ignore val sinkSummaries: Sink[FileTreeParsingResult, Future[Done]] = Sink.ignore
val tempExtracted = Files.createTempDirectory("parsing-extracted-files")
tempExtracted.toFile.deleteOnExit()
val graph = RunnableGraph.fromGraph( val graph = RunnableGraph.fromGraph(
GraphDSL.create(sinkSummaries) { implicit builder => (sink) => GraphDSL.create(sinkSummaries) { implicit builder => (sink) =>
...@@ -121,7 +123,8 @@ class Main { ...@@ -121,7 +123,8 @@ class Main {
case TreeType.Directory => new DirectoryTreeParserFlow case TreeType.Directory => new DirectoryTreeParserFlow
}) })
val parsing = builder.add(new CalculationParsingFlow( val parsing = builder.add(new CalculationParsingFlow(
(1 to 1).map(_ => new CalculationParsingEngine(parsers)) (1 to 1).map(_ => new CalculationParsingEngine(parsers)),
new ArchiveHandler(tempExtracted)
)) ))
val processor = builder.add(mode match { val processor = builder.add(mode match {
case OutputType.Json => new WriteToJsonResultsProcessorFlow(outDir) case OutputType.Json => new WriteToJsonResultsProcessorFlow(outDir)
......
...@@ -9,7 +9,7 @@ import eu.nomad_lab.integrated_pipeline.messages._ ...@@ -9,7 +9,7 @@ import eu.nomad_lab.integrated_pipeline.messages._
import scala.collection.mutable import scala.collection.mutable
class CalculationParsingFlow(engines: Seq[CalculationParsingEngine]) class CalculationParsingFlow(engines: Seq[CalculationParsingEngine], archiveHandler: ArchiveHandler)
extends GraphStage[FlowShape[FileParsingTaskSignal, FileParsingResultSignal]] { extends GraphStage[FlowShape[FileParsingTaskSignal, FileParsingResultSignal]] {
require(engines.nonEmpty, "must have at least one parsing engine") require(engines.nonEmpty, "must have at least one parsing engine")
...@@ -38,6 +38,7 @@ class CalculationParsingFlow(engines: Seq[CalculationParsingEngine]) ...@@ -38,6 +38,7 @@ class CalculationParsingFlow(engines: Seq[CalculationParsingEngine])
case signal: FileParsingTaskStartTree => case signal: FileParsingTaskStartTree =>
require(!pathMap.contains(signal.treeTask), "file tree already started") require(!pathMap.contains(signal.treeTask), "file tree already started")
pathMap(signal.treeTask) = signal.treeTask.treeType match { pathMap(signal.treeTask) = signal.treeTask.treeType match {
case TreeType.Zip => archiveHandler.extractZipArchive(signal.treeTask.treeBasePath)
case TreeType.Directory => signal.treeTask.treeBasePath case TreeType.Directory => signal.treeTask.treeBasePath
} }
push(out, FileParsingResultStartTree(signal.treeTask)) push(out, FileParsingResultStartTree(signal.treeTask))
......
...@@ -47,6 +47,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar { ...@@ -47,6 +47,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
}) })
myMock myMock
} }
val dummyArchiveHandler = mock[ArchiveHandler]
private val testInput = TestSource.probe[FileParsingTaskSignal] private val testInput = TestSource.probe[FileParsingTaskSignal]
private val testRequests = TestSink.probe[FileParsingResultSignal] private val testRequests = TestSink.probe[FileParsingResultSignal]
...@@ -54,7 +55,9 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar { ...@@ -54,7 +55,9 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
GraphDSL.create(testInput, testRequests)((_, _)) { implicit builder => (source, sink) => GraphDSL.create(testInput, testRequests)((_, _)) { implicit builder => (source, sink) =>
import GraphDSL.Implicits._ import GraphDSL.Implicits._
val calculationParser = builder.add(new CalculationParsingFlow(dummyWorkers)) val calculationParser = builder.add(
new CalculationParsingFlow(dummyWorkers, dummyArchiveHandler)
)
source ~> calculationParser ~> sink source ~> calculationParser ~> sink
ClosedShape ClosedShape
...@@ -128,6 +131,13 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar { ...@@ -128,6 +131,13 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
probeRequests.expectComplete() probeRequests.expectComplete()
} }
"unpack a zip archive when the start tree signal arrives" in new GraphWithDummyWorkers(1) {
probeInput.sendNext(FileParsingTaskStartTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
probeRequests.ensureSubscription().request(1)
probeRequests.expectNext()
verify(dummyArchiveHandler).extractZipArchive(sampleTreeScan.treeBasePath)
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment