From dc7b5163cd3ea5225529b018bf6c1d6c6f1df36f Mon Sep 17 00:00:00 2001 From: Arvid Ihrig <ihrig@fhi-berlin.mpg.de> Date: Tue, 12 Jun 2018 09:23:15 +0200 Subject: [PATCH] Integrated Pipeline: renamed message case classes --- .../nomad_lab/integrated_pipeline/Main.scala | 10 ++--- ...ingRequest.scala => FileParsingTask.scala} | 2 +- ...Summary.scala => FileTreeScanResult.scala} | 4 +- ...anRequest.scala => FileTreeScanTask.scala} | 2 +- .../CalculationParsingEngine.scala | 10 ++--- ...DirectoryTreeParsingRequestGenerator.scala | 10 ++--- .../ParsingRequestGenerator.scala | 10 ++--- .../stream_components/TreeParserGraph.scala | 20 ++++----- .../ZipTreeParsingRequestGenerator.scala | 10 ++--- .../CalculationParsingEngineSpec.scala | 4 +- .../TreeParserGraphSpec.scala | 44 +++++++++---------- .../TreeParsingRequestGeneratorSpec.scala | 4 +- .../integrated_pipeline_tests/package.scala | 10 ++--- 13 files changed, 70 insertions(+), 70 deletions(-) rename integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/{FileParsingRequest.scala => FileParsingTask.scala} (97%) rename integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/{FileTreeScanSummary.scala => FileTreeScanResult.scala} (84%) rename integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/{FileTreeScanRequest.scala => FileTreeScanTask.scala} (92%) diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala index 66974e70..27290615 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala @@ -26,7 +26,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.scalalogging.StrictLogging import eu.nomad_lab.QueueMessage.CalculationParserRequest import eu.nomad_lab.TreeType.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanSummary, FileTreeScanRequest } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanResult, FileTreeScanTask } import eu.nomad_lab.integrated_pipeline.stream_components.{ DirectoryTreeParserGraph, ZipTreeParserGraph } import eu.nomad_lab.meta.KnownMetaInfoEnvs import eu.nomad_lab.parsers.AllParsers @@ -107,9 +107,9 @@ class Main { def parseTreesGivenByCommandLine(params: Array[String], treeType: TreeType): Unit = { val source = createCommandLineSource(params.last, treeType) - val sinkRequests: Sink[FileParsingRequest, Future[Done]] = Sink.foreach(x => + val sinkRequests: Sink[FileParsingTask, Future[Done]] = Sink.foreach(x => println(s"received parsing request $x")) - val sinkSummaries: Sink[FileTreeScanSummary, Future[Done]] = Sink.ignore + val sinkSummaries: Sink[FileTreeScanResult, Future[Done]] = Sink.ignore val graph = RunnableGraph.fromGraph(GraphDSL.create(sinkRequests, sinkSummaries)((_, _)) { implicit builder => (sinkRequests, sinkSummaries) => import GraphDSL.Implicits._ @@ -133,7 +133,7 @@ class Main { private def createCommandLineSource( sourceName: String, treeType: TreeType - ): Source[FileTreeScanRequest, NotUsed] = { - Source.single(FileTreeScanRequest(Paths.get(sourceName).toAbsolutePath, treeType)) + ): Source[FileTreeScanTask, NotUsed] = { + Source.single(FileTreeScanTask(Paths.get(sourceName).toAbsolutePath, treeType)) } } diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingRequest.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala similarity index 97% rename from integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingRequest.scala rename to integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala index 9f64a68c..38b75b01 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingRequest.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingTask.scala @@ -11,7 +11,7 @@ import eu.nomad_lab.TreeType * @param relativePath path inside the given file tree * @param parserName name of the parser to use for processing the file */ -case class FileParsingRequest( +case class FileParsingTask( fileTreeBasePath: Path, fileTreeType: TreeType.TreeType, relativePath: Path, diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanSummary.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanResult.scala similarity index 84% rename from integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanSummary.scala rename to integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanResult.scala index c6555e28..5084362c 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanSummary.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanResult.scala @@ -7,8 +7,8 @@ package eu.nomad_lab.integrated_pipeline.messages * @param request the original scan request * @param numCandidates number of identified candidate calculations in the file tree */ -case class FileTreeScanSummary( - request: FileTreeScanRequest, +case class FileTreeScanResult( + request: FileTreeScanTask, numCandidates: Long ) diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanRequest.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanTask.scala similarity index 92% rename from integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanRequest.scala rename to integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanTask.scala index ea4fb775..d72ab830 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanRequest.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScanTask.scala @@ -10,7 +10,7 @@ import eu.nomad_lab.TreeType * @param treeType type of the file tree, e.g. zip or directory */ -case class FileTreeScanRequest( +case class FileTreeScanTask( treeBasePath: Path, treeType: TreeType.TreeType ) diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingEngine.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingEngine.scala index 17cd2bae..b0c6d97f 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingEngine.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingEngine.scala @@ -4,7 +4,7 @@ import java.nio.file.Paths import com.typesafe.scalalogging.StrictLogging import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileParsingResult, InMemoryResult } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileParsingResult, InMemoryResult } import eu.nomad_lab.meta.MetaInfoEnv import eu.nomad_lab.parsers._ import org.json4s.JsonAST.{ JArray, JObject, JString } @@ -22,7 +22,7 @@ import scala.collection.mutable.ListBuffer */ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: MetaInfoEnv) extends StrictLogging { - private def getParser(request: FileParsingRequest): Option[OptimizedParser] = { + private def getParser(request: FileParsingTask): Option[OptimizedParser] = { parsers.parsers.get(request.parserName).map(_.optimizedParser(Seq())) } @@ -52,7 +52,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met * @param request the original parsing request * @return the ParseEvents emitted during the parsing */ - def processRequest(request: FileParsingRequest): FileParsingResult = { + def processRequest(request: FileParsingTask): FileParsingResult = { val parser = getParser(request) parser match { case Some(parser) => @@ -64,7 +64,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met } } - private def failParseRequest(request: FileParsingRequest, reason: String): FileParsingResult = { + private def failParseRequest(request: FileParsingTask, reason: String): FileParsingResult = { val end = FinishedParsingSession( Some(ParseResult.ParseFailure), JArray(List(JString(reason))), @@ -82,7 +82,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met ) } - private def parseCalculationInDirectory(request: FileParsingRequest, parser: OptimizedParser): FileParsingResult = { + private def parseCalculationInDirectory(request: FileParsingTask, parser: OptimizedParser): FileParsingResult = { val pathToMainFile = Paths.get(".") val buffer = new BufferForBackend diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/DirectoryTreeParsingRequestGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/DirectoryTreeParsingRequestGenerator.scala index e5fd1fbb..147a29e5 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/DirectoryTreeParsingRequestGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/DirectoryTreeParsingRequestGenerator.scala @@ -6,13 +6,13 @@ import java.util.NoSuchElementException import eu.nomad_lab.LocalEnv.Settings import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanRequest } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } import scala.annotation.tailrec import scala.collection.JavaConverters._ -class DirectoryTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit +class DirectoryTreeParsingRequestGenerator(request: FileTreeScanTask)(implicit config: Settings, parserCollection: ParserCollection) extends ParsingRequestGenerator { @@ -20,13 +20,13 @@ class DirectoryTreeParsingRequestGenerator(request: FileTreeScanRequest)(implici private val basePath = request.treeBasePath private val fileIterator = Files.walk(basePath).iterator().asScala.filter(Files.isRegularFile(_)) private var numEntries = 0l - private var nextRequest: Option[FileParsingRequest] = findNextParsingCandidate() + private var nextRequest: Option[FileParsingTask] = findNextParsingCandidate() override def getProcessedRequestCount = numEntries override def hasNext = nextRequest.isDefined - override def next(): FileParsingRequest = { + override def next(): FileParsingTask = { val toReturn = nextRequest nextRequest = findNextParsingCandidate() if (toReturn.isDefined) { @@ -37,7 +37,7 @@ class DirectoryTreeParsingRequestGenerator(request: FileTreeScanRequest)(implici } } - @tailrec private def findNextParsingCandidate(): Option[FileParsingRequest] = { + @tailrec private def findNextParsingCandidate(): Option[FileParsingTask] = { if (fileIterator.hasNext) { val file = fileIterator.next() val internalFilePath = basePath.relativize(file) diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ParsingRequestGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ParsingRequestGenerator.scala index 07ec5de2..0541de0f 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ParsingRequestGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ParsingRequestGenerator.scala @@ -2,19 +2,19 @@ package eu.nomad_lab.integrated_pipeline.stream_components import java.nio.file.Path -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanRequest } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask } import eu.nomad_lab.parsers.CandidateParser -trait ParsingRequestGenerator extends Iterator[FileParsingRequest] { +trait ParsingRequestGenerator extends Iterator[FileParsingTask] { def getProcessedRequestCount: Long - def generateRequest(fileTreeRequest: FileTreeScanRequest, relativeFilePath: Path, - candidateParsers: Seq[CandidateParser]): Option[FileParsingRequest] = { + def generateRequest(fileTreeRequest: FileTreeScanTask, relativeFilePath: Path, + candidateParsers: Seq[CandidateParser]): Option[FileParsingTask] = { require(!relativeFilePath.isAbsolute, "internal file paths must be relative to file tree root") if (candidateParsers.nonEmpty) { - Some(FileParsingRequest( + Some(FileParsingTask( fileTreeBasePath = fileTreeRequest.treeBasePath, fileTreeType = fileTreeRequest.treeType, relativePath = relativeFilePath, diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/TreeParserGraph.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/TreeParserGraph.scala index 3e2e0620..ea7b66af 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/TreeParserGraph.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/TreeParserGraph.scala @@ -5,7 +5,7 @@ import java.io.InputStream import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import eu.nomad_lab.LocalEnv.Settings -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanRequest, FileTreeScanSummary } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask, FileTreeScanResult } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } /** @@ -30,19 +30,19 @@ object TreeParserGraph { * calculation parsing requests generated for the file-tree). This information can be used to * determine when all calculation parsing requests belonging to one file-tree have been processed. */ -trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanRequest, FileParsingRequest, FileTreeScanSummary]] { +trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanTask, FileParsingTask, FileTreeScanResult]] { - val in = Inlet[FileTreeScanRequest]("TreeParserGraph.in") - val outRequests = Outlet[FileParsingRequest]("TreeParserGraph.outRequests") - val outSummaries = Outlet[FileTreeScanSummary]("TreeParserGraph.outSummaries") + val in = Inlet[FileTreeScanTask]("TreeParserGraph.in") + val outRequests = Outlet[FileParsingTask]("TreeParserGraph.outRequests") + val outSummaries = Outlet[FileTreeScanResult]("TreeParserGraph.outSummaries") override val shape = new FanOutShape2(in, outRequests, outSummaries) - def createRequestGenerator(request: FileTreeScanRequest): ParsingRequestGenerator + def createRequestGenerator(request: FileTreeScanTask): ParsingRequestGenerator override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { private var generator: Option[ParsingRequestGenerator] = None - private var currentRequest: Option[FileTreeScanRequest] = None + private var currentRequest: Option[FileTreeScanTask] = None setHandler(outRequests, new OutHandler { override def onPull(): Unit = { @@ -89,7 +89,7 @@ trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanRequest, FileP def tryToPushSummary(): Unit = { if (generator.nonEmpty && isAvailable(outSummaries)) { if (!generator.get.hasNext) { - push(outSummaries, FileTreeScanSummary(currentRequest.get, generator.get.getProcessedRequestCount)) + push(outSummaries, FileTreeScanResult(currentRequest.get, generator.get.getProcessedRequestCount)) currentRequest = None generator = None if (!isClosed(in)) @@ -105,9 +105,9 @@ trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanRequest, FileP } class ZipTreeParserGraph(implicit config: Settings, parserCollection: ParserCollection) extends TreeParserGraph { - override def createRequestGenerator(request: FileTreeScanRequest) = new ZipTreeParsingRequestGenerator(request) + override def createRequestGenerator(request: FileTreeScanTask) = new ZipTreeParsingRequestGenerator(request) } class DirectoryTreeParserGraph(implicit config: Settings, parserCollection: ParserCollection) extends TreeParserGraph { - override def createRequestGenerator(request: FileTreeScanRequest) = new DirectoryTreeParsingRequestGenerator(request) + override def createRequestGenerator(request: FileTreeScanTask) = new DirectoryTreeParsingRequestGenerator(request) } \ No newline at end of file diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ZipTreeParsingRequestGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ZipTreeParsingRequestGenerator.scala index 85cd910b..1543dc33 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ZipTreeParsingRequestGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ZipTreeParsingRequestGenerator.scala @@ -5,13 +5,13 @@ import java.util.NoSuchElementException import eu.nomad_lab.LocalEnv.Settings import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanRequest } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile } import scala.annotation.tailrec -class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit +class ZipTreeParsingRequestGenerator(request: FileTreeScanTask)(implicit config: Settings, parserCollection: ParserCollection) extends ParsingRequestGenerator { @@ -20,7 +20,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit private val zipFile = new ZipFile(request.treeBasePath.toFile) private val zipEntries: java.util.Enumeration[ZipArchiveEntry] = zipFile.getEntries private var numEntries = 0l - private var nextRequest: Option[FileParsingRequest] = findNextParsingCandidate() + private var nextRequest: Option[FileParsingTask] = findNextParsingCandidate() override def getProcessedRequestCount: Long = numEntries @@ -30,7 +30,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit zipFile.close() } - override def next(): FileParsingRequest = { + override def next(): FileParsingTask = { val toReturn = nextRequest nextRequest = findNextParsingCandidate() if (toReturn.isDefined) { @@ -41,7 +41,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit } } - @tailrec private def findNextParsingCandidate(): Option[FileParsingRequest] = { + @tailrec private def findNextParsingCandidate(): Option[FileParsingTask] = { if (zipEntries.hasMoreElements) { val zipEntry: ZipArchiveEntry = zipEntries.nextElement() val internalFilePath = Paths.get(zipEntry.getName) diff --git a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingEngineSpec.scala b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingEngineSpec.scala index 538ba2e6..fad6ffea 100644 --- a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingEngineSpec.scala +++ b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingEngineSpec.scala @@ -3,7 +3,7 @@ package integrated_pipeline_tests import java.nio.file.Paths import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, InMemoryResult } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, InMemoryResult } import eu.nomad_lab.integrated_pipeline.stream_components.CalculationParsingEngine import eu.nomad_lab.meta.KnownMetaInfoEnvs import eu.nomad_lab.parsers.ParseResult.ParseResult @@ -32,7 +32,7 @@ class CalculationParsingEngineSpec extends WordSpec with MockitoSugar { implicit val metaInfo = KnownMetaInfoEnvs.all - val sampleParseRequest = FileParsingRequest( + val sampleParseRequest = FileParsingTask( fileTreeBasePath = Paths.get("s/foo"), fileTreeType = TreeType.Directory, relativePath = Paths.get("bar/gus.out"), diff --git a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParserGraphSpec.scala b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParserGraphSpec.scala index 6ea41a58..0ddb7d59 100644 --- a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParserGraphSpec.scala +++ b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParserGraphSpec.scala @@ -6,7 +6,7 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit.scaladsl._ import eu.nomad_lab.TreeType -import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingRequest, FileTreeScanRequest, FileTreeScanSummary } +import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask, FileTreeScanResult } import eu.nomad_lab.integrated_pipeline.stream_components.{ ParsingRequestGenerator, TreeParserGraph } import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -23,13 +23,13 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { */ private trait GraphWithDummy { - def getGeneratorDummy(request: FileTreeScanRequest): ParsingRequestGenerator = { + def getGeneratorDummy(request: FileTreeScanTask): ParsingRequestGenerator = { val generator = mock[ParsingRequestGenerator] var extracted = 0 val dummyIterator = Iterator.apply( - FileParsingRequest(request.treeBasePath, request.treeType, Paths.get("file1"), "fooParser"), - FileParsingRequest(request.treeBasePath, request.treeType, Paths.get("file2"), "fooParser"), - FileParsingRequest(request.treeBasePath, request.treeType, Paths.get("file3"), "fooParser") + FileParsingTask(request.treeBasePath, request.treeType, Paths.get("file1"), "fooParser"), + FileParsingTask(request.treeBasePath, request.treeType, Paths.get("file2"), "fooParser"), + FileParsingTask(request.treeBasePath, request.treeType, Paths.get("file3"), "fooParser") ) when(generator.hasNext).thenAnswer(new Answer[Boolean] { def answer(x: InvocationOnMock) = dummyIterator.hasNext @@ -37,21 +37,21 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { when(generator.getProcessedRequestCount).thenAnswer(new Answer[Long] { def answer(x: InvocationOnMock) = extracted }) - when(generator.next()).thenAnswer(new Answer[FileParsingRequest] { + when(generator.next()).thenAnswer(new Answer[FileParsingTask] { def answer(x: InvocationOnMock) = { extracted += 1; dummyIterator.next() } }) generator } - private val testInput = TestSource.probe[FileTreeScanRequest] - private val testRequests = TestSink.probe[FileParsingRequest] - private val testSummaries = TestSink.probe[FileTreeScanSummary] + private val testInput = TestSource.probe[FileTreeScanTask] + private val testRequests = TestSink.probe[FileParsingTask] + private val testSummaries = TestSink.probe[FileTreeScanResult] val testGraph = RunnableGraph.fromGraph( GraphDSL.create(testInput, testRequests, testSummaries)((_, _, _)) { implicit builder => (sourceInput, sinkRequests, sinkSummaries) => import GraphDSL.Implicits._ val treeParser = builder.add(new TreeParserGraph { - override def createRequestGenerator(request: FileTreeScanRequest): ParsingRequestGenerator = { + override def createRequestGenerator(request: FileTreeScanTask): ParsingRequestGenerator = { getGeneratorDummy(request) } }) @@ -63,7 +63,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { } ) - def sampleInput(baseName: String) = FileTreeScanRequest( + def sampleInput(baseName: String) = FileTreeScanTask( treeBasePath = Paths.get(s"/foo/$baseName"), treeType = TreeType.Zip ) @@ -78,9 +78,9 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { probeRequests.ensureSubscription().request(3) probeInput.sendNext(input) val expectedRequests = immutable.Seq( - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file1"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file2"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file3"), "fooParser") + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file1"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file2"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file3"), "fooParser") ) probeRequests.expectNextUnorderedN(expectedRequests) } @@ -91,7 +91,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { probeRequests.ensureSubscription().request(3) probeSummaries.ensureSubscription().request(1) probeInput.sendNext(input) - probeSummaries.expectNext(FileTreeScanSummary(input, 3l)) + probeSummaries.expectNext(FileTreeScanResult(input, 3l)) } "emit all remaining values after the upstream element completed" in new GraphWithDummy { @@ -129,12 +129,12 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { probeSummaries.ensureSubscription().request(2) probeInput.sendNext(input1).sendNext(input2) probeRequests.expectNextN(immutable.Seq( - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file1"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file2"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file3"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file1"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file2"), "fooParser"), - FileParsingRequest(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file3"), "fooParser") + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file1"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file2"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive1"), TreeType.Zip, Paths.get("file3"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file1"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file2"), "fooParser"), + FileParsingTask(Paths.get("/foo/archive2"), TreeType.Zip, Paths.get("file3"), "fooParser") )) } @@ -145,7 +145,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar { probeRequests.ensureSubscription().request(6) probeSummaries.ensureSubscription().request(2) probeInput.sendNext(input1).sendNext(input2) - probeSummaries.expectNext(FileTreeScanSummary(input1, 3l), FileTreeScanSummary(input2, 3l)) + probeSummaries.expectNext(FileTreeScanResult(input1, 3l), FileTreeScanResult(input2, 3l)) } "emit all remaining values after the upstream element completed" in new GraphWithDummy { diff --git a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParsingRequestGeneratorSpec.scala b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParsingRequestGeneratorSpec.scala index 55dac404..3b33c53d 100644 --- a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParsingRequestGeneratorSpec.scala +++ b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParsingRequestGeneratorSpec.scala @@ -2,7 +2,7 @@ package integrated_pipeline_tests import eu.nomad_lab.QueueMessage.TreeParserRequest import eu.nomad_lab.TreeType.TreeType -import eu.nomad_lab.integrated_pipeline.messages.FileTreeScanRequest +import eu.nomad_lab.integrated_pipeline.messages.FileTreeScanTask import eu.nomad_lab.integrated_pipeline.stream_components.{ DirectoryTreeParsingRequestGenerator, ParsingRequestGenerator, ZipTreeParsingRequestGenerator } import eu.nomad_lab.parsers.AllParsers import eu.nomad_lab.{ LocalEnv, TreeType } @@ -14,7 +14,7 @@ class TreeParsingRequestGeneratorSpec extends WordSpec { private implicit val parsers = AllParsers.defaultParserCollection case class TestGenerator( - generator: (FileTreeScanRequest) => ParsingRequestGenerator, treeType: TreeType + generator: (FileTreeScanTask) => ParsingRequestGenerator, treeType: TreeType ) "a ZipTreeParsingRequestGenerator" when { diff --git a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/package.scala b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/package.scala index c8f96dfe..e0060f5d 100644 --- a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/package.scala +++ b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/package.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import eu.nomad_lab.TreeType import eu.nomad_lab.TreeType.TreeType -import eu.nomad_lab.integrated_pipeline.messages.FileTreeScanRequest +import eu.nomad_lab.integrated_pipeline.messages.FileTreeScanTask import eu.nomad_lab.parsers.{ Cp2kParser, FhiAimsParser, VaspRunParser } package object integrated_pipeline_tests { @@ -46,19 +46,19 @@ package object integrated_pipeline_tests { "RQX1tgMCwKvq1nPT9wZeVwxi_oLSV/data/second_example/598004ae7a4aed90672fefaa/vasprun.xml" -> VaspRunParser.name )) - def createFileTreeScanRequest(archive: TestTreeData, mode: TreeType): FileTreeScanRequest = { + def createFileTreeScanRequest(archive: TestTreeData, mode: TreeType): FileTreeScanTask = { val baseDir = sys.props.get("user.dir").get val prefix = archive.baseName.substring(0, 3) mode match { - case TreeType.Zip => FileTreeScanRequest( + case TreeType.Zip => FileTreeScanTask( treeBasePath = Paths.get(s"$baseDir/src/test/resources/$prefix/${archive.baseName}.zip"), treeType = mode ) - case TreeType.Directory => FileTreeScanRequest( + case TreeType.Directory => FileTreeScanTask( treeBasePath = Paths.get(s"$baseDir/src/test/resources/$prefix/${archive.baseName}"), treeType = mode ) - case _ => FileTreeScanRequest( + case _ => FileTreeScanTask( treeBasePath = Paths.get(s"$baseDir/src/test/resources/$prefix/${archive.baseName}"), treeType = mode ) -- GitLab