Skip to content
Snippets Groups Projects
Commit 15d04b5e authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: transformed ParsingTaskGenerator into a trait, changed...

Integrated Pipeline: transformed ParsingTaskGenerator into a trait, changed TreeParser trait implementing classes

-removed the two mini-classes implementing Directory/Zip sources and replaced them with an anonymous class in Main
parent 3825ccd2
No related branches found
No related tags found
No related merge requests found
......@@ -26,7 +26,7 @@ import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.Main.PipelineSettings
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.io_integrations.{ ArchiveHandler, DirectoryTreeParsingTaskGenerator, ZipTreeParsingTaskGenerator }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResultSignal, FileParsingTaskSignal, FileTreeParsingResult, FileTreeScanTask }
import eu.nomad_lab.integrated_pipeline.stream_components._
import eu.nomad_lab.meta.{ KnownMetaInfoEnvs, MetaInfoEnv }
......@@ -121,9 +121,11 @@ class Main {
val treeParser = Flow.fromGraph(new MessageProcessorFlow[FileTreeScanTask, FileParsingTaskSignal] {
override val stageName = "TreeParser"
override val processor: TreeParser = params.treeType match {
case TreeType.Zip => new ZipTreeParser(Main.parsers)
case TreeType.Directory => new DirectoryTreeParser(Main.parsers)
override val processor = new TreeParser {
override def createProcessor(task: FileTreeScanTask) = params.treeType match {
case TreeType.Zip => new ZipTreeParsingTaskGenerator(task, Main.parsers)
case TreeType.Directory => new DirectoryTreeParsingTaskGenerator(task, Main.parsers)
}
}
})
......
......@@ -10,7 +10,7 @@ import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
* contains methods shared by the different TreeParserLogic Implementations.
* These are streams-adapted versions of the code found in the original TreeParser class.
*/
object ParsingTaskGenerator {
trait ParsingTaskGenerator extends Iterator[FileParsingTask] {
def scanInputStream(parsers: ParserCollection, input: InputStream,
internalFilePath: String): Seq[CandidateParser] = {
......
......@@ -39,12 +39,4 @@ trait TreeParser extends MessageProcessor[FileTreeScanTask, FileParsingTaskSigna
}
final override def requiresMoreMessages: Boolean = false
}
class ZipTreeParser(parserCollection: ParserCollection) extends TreeParser {
override def createProcessor(request: FileTreeScanTask) = new ZipTreeParsingTaskGenerator(request, parserCollection)
}
class DirectoryTreeParser(parserCollection: ParserCollection) extends TreeParser {
override def createProcessor(request: FileTreeScanTask) = new DirectoryTreeParsingTaskGenerator(request, parserCollection)
}
\ No newline at end of file
......@@ -13,7 +13,7 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._
class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: ParserCollection)
extends Iterator[FileParsingTask] {
extends ParsingTaskGenerator {
require(request.treeType == TreeType.Directory, "file tree to process must be a directory")
private val basePath = request.treeBasePath
......@@ -40,7 +40,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect
scala.io.Source.fromFile(file.toFile)
val in = new FileInputStream(file.toFile)
try {
ParsingTaskGenerator.scanInputStream(parserCollection, in, internalFilePath.toString)
scanInputStream(parserCollection, in, internalFilePath.toString)
//TODO: handle errors
} finally {
in.close()
......@@ -48,7 +48,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect
} else {
Seq[CandidateParser]()
}
ParsingTaskGenerator.generateRequest(request, internalFilePath, candidateParsers) match {
generateRequest(request, internalFilePath, candidateParsers) match {
case Some(x) => Some(x)
case None => findNextParsingCandidate()
}
......
......@@ -12,7 +12,7 @@ import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile }
import scala.annotation.tailrec
class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: ParserCollection)
extends Iterator[FileParsingTask] {
extends ParsingTaskGenerator {
require(request.treeType == TreeType.Zip, "file tree to process must be a Zip archive")
......@@ -43,7 +43,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P
val candidateParsers = if (!zipEntry.isDirectory && !zipEntry.isUnixSymlink) {
val zIn = zipFile.getInputStream(zipEntry)
try {
ParsingTaskGenerator.scanInputStream(parserCollection, zIn, internalFilePath.toString)
scanInputStream(parserCollection, zIn, internalFilePath.toString)
//TODO: handle errors
} finally {
zIn.close()
......@@ -51,7 +51,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P
} else {
Seq[CandidateParser]()
}
ParsingTaskGenerator.generateRequest(request, internalFilePath, candidateParsers) match {
generateRequest(request, internalFilePath, candidateParsers) match {
case Some(x) => Some(x)
case None => findNextParsingCandidate()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment