diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala index 6e528397d824ce4e882bfa3c8c1049f00ad70974..1738ccdaf2da461e56ad7dbbf27ca7457f0861bf 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala @@ -26,7 +26,7 @@ import com.typesafe.scalalogging.StrictLogging import eu.nomad_lab.TreeType.TreeType import eu.nomad_lab.integrated_pipeline.Main.PipelineSettings import eu.nomad_lab.integrated_pipeline.OutputType.OutputType -import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler +import eu.nomad_lab.integrated_pipeline.io_integrations.{ ArchiveHandler, DirectoryTreeParsingTaskGenerator, ZipTreeParsingTaskGenerator } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResultSignal, FileParsingTaskSignal, FileTreeParsingResult, FileTreeScanTask } import eu.nomad_lab.integrated_pipeline.stream_components._ import eu.nomad_lab.meta.{ KnownMetaInfoEnvs, MetaInfoEnv } @@ -121,9 +121,11 @@ class Main { val treeParser = Flow.fromGraph(new MessageProcessorFlow[FileTreeScanTask, FileParsingTaskSignal] { override val stageName = "TreeParser" - override val processor: TreeParser = params.treeType match { - case TreeType.Zip => new ZipTreeParser(Main.parsers) - case TreeType.Directory => new DirectoryTreeParser(Main.parsers) + override val processor = new TreeParser { + override def createProcessor(task: FileTreeScanTask) = params.treeType match { + case TreeType.Zip => new ZipTreeParsingTaskGenerator(task, Main.parsers) + case TreeType.Directory => new DirectoryTreeParsingTaskGenerator(task, Main.parsers) + } } }) diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingTaskGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingTaskGenerator.scala index 6980ea514a99bdd919592e38b66e99667e55047b..f8fa18f2d48711a8ba91ccc92e8bf00a03c9ec8e 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingTaskGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingTaskGenerator.scala @@ -10,7 +10,7 @@ import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } * contains methods shared by the different TreeParserLogic Implementations. * These are streams-adapted versions of the code found in the original TreeParser class. */ -object ParsingTaskGenerator { +trait ParsingTaskGenerator extends Iterator[FileParsingTask] { def scanInputStream(parsers: ParserCollection, input: InputStream, internalFilePath: String): Seq[CandidateParser] = { diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/TreeParser.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/TreeParser.scala index dd7f28b46defaf3d0222327f0e3178c926e39953..8f3b60610b0a90768e3d71ffecb9918fc76c2e2c 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/TreeParser.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/TreeParser.scala @@ -39,12 +39,4 @@ trait TreeParser extends MessageProcessor[FileTreeScanTask, FileParsingTaskSigna } final override def requiresMoreMessages: Boolean = false -} - -class ZipTreeParser(parserCollection: ParserCollection) extends TreeParser { - override def createProcessor(request: FileTreeScanTask) = new ZipTreeParsingTaskGenerator(request, parserCollection) -} - -class DirectoryTreeParser(parserCollection: ParserCollection) extends TreeParser { - override def createProcessor(request: FileTreeScanTask) = new DirectoryTreeParsingTaskGenerator(request, parserCollection) } \ No newline at end of file diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/DirectoryTreeParsingTaskGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/DirectoryTreeParsingTaskGenerator.scala index f16a4260c7ff4d2412d1410423b6e84e711a4536..1bba70c4f0073034445bedb0d6e5a2d4bdd8f813 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/DirectoryTreeParsingTaskGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/DirectoryTreeParsingTaskGenerator.scala @@ -13,7 +13,7 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: ParserCollection) - extends Iterator[FileParsingTask] { + extends ParsingTaskGenerator { require(request.treeType == TreeType.Directory, "file tree to process must be a directory") private val basePath = request.treeBasePath @@ -40,7 +40,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect scala.io.Source.fromFile(file.toFile) val in = new FileInputStream(file.toFile) try { - ParsingTaskGenerator.scanInputStream(parserCollection, in, internalFilePath.toString) + scanInputStream(parserCollection, in, internalFilePath.toString) //TODO: handle errors } finally { in.close() @@ -48,7 +48,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect } else { Seq[CandidateParser]() } - ParsingTaskGenerator.generateRequest(request, internalFilePath, candidateParsers) match { + generateRequest(request, internalFilePath, candidateParsers) match { case Some(x) => Some(x) case None => findNextParsingCandidate() } diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/ZipTreeParsingTaskGenerator.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/ZipTreeParsingTaskGenerator.scala index c3faaaabdf99daa56533a020885a1ab554df79e2..931d6c08ab5ac52955dab7a0d07c4eebd2d2b5f7 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/ZipTreeParsingTaskGenerator.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/ZipTreeParsingTaskGenerator.scala @@ -12,7 +12,7 @@ import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile } import scala.annotation.tailrec class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: ParserCollection) - extends Iterator[FileParsingTask] { + extends ParsingTaskGenerator { require(request.treeType == TreeType.Zip, "file tree to process must be a Zip archive") @@ -43,7 +43,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P val candidateParsers = if (!zipEntry.isDirectory && !zipEntry.isUnixSymlink) { val zIn = zipFile.getInputStream(zipEntry) try { - ParsingTaskGenerator.scanInputStream(parserCollection, zIn, internalFilePath.toString) + scanInputStream(parserCollection, zIn, internalFilePath.toString) //TODO: handle errors } finally { zIn.close() @@ -51,7 +51,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P } else { Seq[CandidateParser]() } - ParsingTaskGenerator.generateRequest(request, internalFilePath, candidateParsers) match { + generateRequest(request, internalFilePath, candidateParsers) match { case Some(x) => Some(x) case None => findNextParsingCandidate() }