Commit fe1b4f3d authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: renamed some case classes to make separation between...

Integrated Pipeline: renamed some case classes to make separation between TreeScanSignals and FileParsingSignals more clear
parent 9db98327
...@@ -31,7 +31,7 @@ class CalculationParsingEngine( ...@@ -31,7 +31,7 @@ class CalculationParsingEngine(
val myId = eventListener.registerReporter(this, name) val myId = eventListener.registerReporter(this, name)
private def getParser(request: FileParsingTask): Option[OptimizedParser] = { private def getParser(request: CandidateFound): Option[OptimizedParser] = {
parsers.parsers.get(request.parserName).map(_.optimizedParser(Seq())) parsers.parsers.get(request.parserName).map(_.optimizedParser(Seq()))
} }
...@@ -63,10 +63,10 @@ class CalculationParsingEngine( ...@@ -63,10 +63,10 @@ class CalculationParsingEngine(
* @param task the original parsing task signal * @param task the original parsing task signal
* @return the generated parsing results signal * @return the generated parsing results signal
*/ */
def processSignal(task: FileParsingTaskSignal): FileParsingResultSignal = { def processSignal(task: TreeScanSignal): FileParsingSignal = {
task match { task match {
case signal: FileParsingSignalEndTree => signal case signal: TreeScanCompleted => signal
case task: FileParsingTask => case task: CandidateFound =>
eventListener.processEvent(myId, CalculationParserEventStart( eventListener.processEvent(myId, CalculationParserEventStart(
task.fileTree, task.relativePath, task.parserName task.fileTree, task.relativePath, task.parserName
)) ))
...@@ -92,11 +92,11 @@ class CalculationParsingEngine( ...@@ -92,11 +92,11 @@ class CalculationParsingEngine(
* @param reason explanation why the parsing has failed * @param reason explanation why the parsing has failed
* @return a FileParsingResult representing the failure * @return a FileParsingResult representing the failure
*/ */
private def failParseRequest(request: FileParsingTask, reason: String): FileParsingResult = { private def failParseRequest(request: CandidateFound, reason: String): FileParsingResult = {
eventListener.processEvent(myId, CalculationParserEventEnd( eventListener.processEvent(myId, CalculationParserEventEnd(
request.fileTree, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason) request.fileTree, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason)
)) ))
InMemoryResult( ParsingResultInMemory(
task = request, task = request,
result = ParseResult.ParseFailure, result = ParseResult.ParseFailure,
start = None, start = None,
...@@ -106,7 +106,7 @@ class CalculationParsingEngine( ...@@ -106,7 +106,7 @@ class CalculationParsingEngine(
) )
} }
private def parseCalculation(request: FileParsingTask, parser: OptimizedParser, private def parseCalculation(request: CandidateFound, parser: OptimizedParser,
pathToMainFile: Path): FileParsingResult = { pathToMainFile: Path): FileParsingResult = {
val buffer = new BufferForBackend val buffer = new BufferForBackend
val backend = new ParseEventsEmitter(metaInfo, buffer.handleParseEvents, val backend = new ParseEventsEmitter(metaInfo, buffer.handleParseEvents,
...@@ -125,7 +125,7 @@ class CalculationParsingEngine( ...@@ -125,7 +125,7 @@ class CalculationParsingEngine(
eventListener.processEvent(myId, CalculationParserEventEnd( eventListener.processEvent(myId, CalculationParserEventEnd(
request.fileTree, request.relativePath, request.parserName, result, error request.fileTree, request.relativePath, request.parserName, result, error
)) ))
InMemoryResult( ParsingResultInMemory(
task = request, task = request,
result = result, result = result,
start = buffer.startEvent, start = buffer.startEvent,
......
...@@ -27,7 +27,7 @@ import eu.nomad_lab.TreeType.TreeType ...@@ -27,7 +27,7 @@ import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.Main.PipelineSettings import eu.nomad_lab.integrated_pipeline.Main.PipelineSettings
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.io_integrations._ import eu.nomad_lab.integrated_pipeline.io_integrations._
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResultSignal, FileParsingTaskSignal, FileTreeParsingResult } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal, FileTreeParsingResult }
import eu.nomad_lab.integrated_pipeline.stream_components._ import eu.nomad_lab.integrated_pipeline.stream_components._
import eu.nomad_lab.meta.{ KnownMetaInfoEnvs, MetaInfoEnv } import eu.nomad_lab.meta.{ KnownMetaInfoEnvs, MetaInfoEnv }
import eu.nomad_lab.parsers.AllParsers import eu.nomad_lab.parsers.AllParsers
...@@ -124,7 +124,7 @@ class Main { ...@@ -124,7 +124,7 @@ class Main {
GraphDSL.create(sinkSummaries) { implicit builder => (sink) => GraphDSL.create(sinkSummaries) { implicit builder => (sink) =>
import GraphDSL.Implicits._ import GraphDSL.Implicits._
val treeParser = Flow.fromGraph(new MessageProcessorFlow[FileTree, FileParsingTaskSignal] { val treeParser = Flow.fromGraph(new MessageProcessorFlow[FileTree, TreeScanSignal] {
override val stageName = "TreeParser" override val stageName = "TreeParser"
override val processor = new TreeScanner { override val processor = new TreeScanner {
override val eventListener = eventProcessor override val eventListener = eventProcessor
...@@ -143,7 +143,7 @@ class Main { ...@@ -143,7 +143,7 @@ class Main {
new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d"))) new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d")))
) )
val processor = Flow.fromGraph(new MessageProcessorFlow[FileParsingResultSignal, FileTreeParsingResult] { val processor = Flow.fromGraph(new MessageProcessorFlow[FileParsingSignal, FileTreeParsingResult] {
override val stageName = "ResultWriter" override val stageName = "ResultWriter"
override val processor = new ParsingResultsProcessingManager { override val processor = new ParsingResultsProcessingManager {
override val eventListener = eventProcessor override val eventListener = eventProcessor
......
...@@ -7,7 +7,7 @@ import eu.nomad_lab.parsers.ParseResult ...@@ -7,7 +7,7 @@ import eu.nomad_lab.parsers.ParseResult
import scala.collection.mutable import scala.collection.mutable
trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResultSignal, FileTreeParsingResult] { trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingSignal, FileTreeParsingResult] {
protected[this] val processor: ParsingResultsProcessor protected[this] val processor: ParsingResultsProcessor
protected[this] val eventListener: EventListener protected[this] val eventListener: EventListener
...@@ -22,11 +22,11 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult ...@@ -22,11 +22,11 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
override def requiresMoreMessages: Boolean = processedCounts.nonEmpty override def requiresMoreMessages: Boolean = processedCounts.nonEmpty
override def processSignal(signal: FileParsingResultSignal): Unit = { override def processSignal(signal: FileParsingSignal): Unit = {
if (nextResult.nonEmpty) if (nextResult.nonEmpty)
throw new IllegalStateException("previously generated signal not fetched!") throw new IllegalStateException("previously generated signal not fetched!")
signal match { signal match {
case signal: FileParsingSignalEndTree => case signal: TreeScanCompleted =>
if (!processedCounts.contains(signal.fileTree.treeBasePath)) if (!processedCounts.contains(signal.fileTree.treeBasePath))
startFileTree(signal.fileTree) startFileTree(signal.fileTree)
expectedCounts(signal.fileTree.treeBasePath) = signal.numParsingTasks expectedCounts(signal.fileTree.treeBasePath) = signal.numParsingTasks
......
...@@ -4,18 +4,18 @@ import java.io.InputStream ...@@ -4,18 +4,18 @@ import java.io.InputStream
import java.nio.file.Path import java.nio.file.Path
import java.util.NoSuchElementException import java.util.NoSuchElementException
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, TreeParserEventScanError } import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeParserEventScanError }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
/** /**
* contains methods shared by the different TreeParserLogic Implementations. * contains methods shared by the different TreeParserLogic Implementations.
* These are streams-adapted versions of the code found in the original TreeParser class. * These are streams-adapted versions of the code found in the original TreeParser class.
*/ */
trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, FileParsingTask]] { trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, CandidateFound]] {
protected[this] def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, FileParsingTask]] protected[this] def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, CandidateFound]]
private var nextRequest: Option[Either[TreeParserEventScanError, FileParsingTask]] = None private var nextRequest: Option[Either[TreeParserEventScanError, CandidateFound]] = None
private var searchStarted: Boolean = false private var searchStarted: Boolean = false
def hasNext: Boolean = { def hasNext: Boolean = {
...@@ -26,7 +26,7 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil ...@@ -26,7 +26,7 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil
nextRequest.isDefined nextRequest.isDefined
} }
def next(): Either[TreeParserEventScanError, FileParsingTask] = { def next(): Either[TreeParserEventScanError, CandidateFound] = {
if (!searchStarted) { if (!searchStarted) {
nextRequest = findNextParsingCandidate() nextRequest = findNextParsingCandidate()
searchStarted = true searchStarted = true
...@@ -49,11 +49,11 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil ...@@ -49,11 +49,11 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil
} }
def generateRequest(fileTreeRequest: FileTree, relativeFilePath: Path, def generateRequest(fileTreeRequest: FileTree, relativeFilePath: Path,
candidateParsers: Seq[CandidateParser]): Option[FileParsingTask] = { candidateParsers: Seq[CandidateParser]): Option[CandidateFound] = {
require(!relativeFilePath.isAbsolute, "internal file paths must be relative to file tree root") require(!relativeFilePath.isAbsolute, "internal file paths must be relative to file tree root")
if (candidateParsers.nonEmpty) { if (candidateParsers.nonEmpty) {
Some(FileParsingTask( Some(CandidateFound(
fileTree = fileTreeRequest, fileTree = fileTreeRequest,
relativePath = relativeFilePath, relativePath = relativeFilePath,
parserName = candidateParsers.head.parserName parserName = candidateParsers.head.parserName
......
...@@ -9,7 +9,7 @@ import scala.annotation.tailrec ...@@ -9,7 +9,7 @@ import scala.annotation.tailrec
* zip-archive) for files that are potential calculations of interest for NOMAD and emits file * zip-archive) for files that are potential calculations of interest for NOMAD and emits file
* parsing requests for these candidate files. * parsing requests for these candidate files.
*/ */
trait TreeScanner extends MessageProcessor[FileTree, FileParsingTaskSignal] { trait TreeScanner extends MessageProcessor[FileTree, TreeScanSignal] {
protected[this] def createProcessor(fileTree: FileTree): ParsingTaskGenerator protected[this] def createProcessor(fileTree: FileTree): ParsingTaskGenerator
protected[this] val eventListener: EventListener protected[this] val eventListener: EventListener
...@@ -43,7 +43,7 @@ trait TreeScanner extends MessageProcessor[FileTree, FileParsingTaskSignal] { ...@@ -43,7 +43,7 @@ trait TreeScanner extends MessageProcessor[FileTree, FileParsingTaskSignal] {
final override def hasSignalToEmit(): Boolean = !readyForInbound final override def hasSignalToEmit(): Boolean = !readyForInbound
@tailrec final override def getNextSignalToEmit(): FileParsingTaskSignal = { @tailrec final override def getNextSignalToEmit(): TreeScanSignal = {
if (generator.exists(_.hasNext)) { if (generator.exists(_.hasNext)) {
generator.get.next() match { generator.get.next() match {
case Left(scanError) => case Left(scanError) =>
...@@ -60,7 +60,7 @@ trait TreeScanner extends MessageProcessor[FileTree, FileParsingTaskSignal] { ...@@ -60,7 +60,7 @@ trait TreeScanner extends MessageProcessor[FileTree, FileParsingTaskSignal] {
} else if (!readyForInbound) { } else if (!readyForInbound) {
readyForInbound = true readyForInbound = true
eventListener.processEvent(myId, TreeParserEventEnd(request.get, taskCount)) eventListener.processEvent(myId, TreeParserEventEnd(request.get, taskCount))
FileParsingSignalEndTree(request.get, taskCount) TreeScanCompleted(request.get, taskCount)
} else { } else {
throw new IllegalStateException("no message ready for fetching") throw new IllegalStateException("no message ready for fetching")
} }
......
...@@ -5,7 +5,7 @@ import java.nio.file.Files ...@@ -5,7 +5,7 @@ import java.nio.file.Files
import eu.nomad_lab.TreeType import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.{ FileTree, ParsingTaskGenerator } import eu.nomad_lab.integrated_pipeline.{ FileTree, ParsingTaskGenerator }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, TreeParserEventScanError } import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeParserEventScanError }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
import scala.annotation.tailrec import scala.annotation.tailrec
...@@ -19,7 +19,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTree, parserCollection: Par ...@@ -19,7 +19,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTree, parserCollection: Par
private val basePath = request.treeBasePath private val basePath = request.treeBasePath
private val fileIterator = Files.walk(basePath).iterator().asScala.filter(Files.isRegularFile(_)) private val fileIterator = Files.walk(basePath).iterator().asScala.filter(Files.isRegularFile(_))
@tailrec final protected[this] override def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, FileParsingTask]] = { @tailrec final protected[this] override def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, CandidateFound]] = {
if (fileIterator.hasNext) { if (fileIterator.hasNext) {
val file = fileIterator.next() val file = fileIterator.next()
val internalFilePath = basePath.relativize(file) val internalFilePath = basePath.relativize(file)
......
...@@ -4,7 +4,7 @@ import java.nio.file.{ Files, Path, Paths } ...@@ -4,7 +4,7 @@ import java.nio.file.{ Files, Path, Paths }
import eu.nomad_lab.{ H5Lib, TreeType } import eu.nomad_lab.{ H5Lib, TreeType }
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, InMemoryResult } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, ParsingResultInMemory }
import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor } import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor }
import eu.nomad_lab.meta.MetaInfoEnv import eu.nomad_lab.meta.MetaInfoEnv
import eu.nomad_lab.parsers.H5Backend.H5File import eu.nomad_lab.parsers.H5Backend.H5File
...@@ -47,7 +47,7 @@ class WriteToHDF5MergedResultsProcessor( ...@@ -47,7 +47,7 @@ class WriteToHDF5MergedResultsProcessor(
val backend = new ReindexBackend(h5Backend) val backend = new ReindexBackend(h5Backend)
try { try {
result match { result match {
case x: InMemoryResult => case x: ParsingResultInMemory =>
x.start.foreach(_.emitOnBackend(backend)) x.start.foreach(_.emitOnBackend(backend))
x.events.foreach(_.emitOnBackend(backend)) x.events.foreach(_.emitOnBackend(backend))
x.end.foreach(_.emitOnBackend(backend)) x.end.foreach(_.emitOnBackend(backend))
......
...@@ -3,7 +3,7 @@ package eu.nomad_lab.integrated_pipeline.io_integrations ...@@ -3,7 +3,7 @@ package eu.nomad_lab.integrated_pipeline.io_integrations
import java.nio.file.{ Files, Path, Paths } import java.nio.file.{ Files, Path, Paths }
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, InMemoryResult } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, ParsingResultInMemory }
import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor } import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor }
import eu.nomad_lab.meta.MetaInfoEnv import eu.nomad_lab.meta.MetaInfoEnv
import eu.nomad_lab.parsers.H5Backend.H5File import eu.nomad_lab.parsers.H5Backend.H5File
...@@ -21,7 +21,7 @@ class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e ...@@ -21,7 +21,7 @@ class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
val backend = new ReindexBackend(h5Backend) val backend = new ReindexBackend(h5Backend)
try { try {
result match { result match {
case x: InMemoryResult => case x: ParsingResultInMemory =>
x.start.foreach(_.emitOnBackend(backend)) x.start.foreach(_.emitOnBackend(backend))
x.events.foreach(_.emitOnBackend(backend)) x.events.foreach(_.emitOnBackend(backend))
x.end.foreach(_.emitOnBackend(backend)) x.end.foreach(_.emitOnBackend(backend))
......
...@@ -4,7 +4,7 @@ import java.io.FileWriter ...@@ -4,7 +4,7 @@ import java.io.FileWriter
import java.nio.file.{ Files, Path, Paths } import java.nio.file.{ Files, Path, Paths }
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, InMemoryResult } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, ParsingResultInMemory }
import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor } import eu.nomad_lab.integrated_pipeline.{ FileTree, OutputType, ParsingResultsProcessor }
import eu.nomad_lab.meta.MetaInfoEnv import eu.nomad_lab.meta.MetaInfoEnv
import eu.nomad_lab.parsers.JsonWriterBackend import eu.nomad_lab.parsers.JsonWriterBackend
...@@ -22,7 +22,7 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e ...@@ -22,7 +22,7 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
try { try {
val backend = new JsonWriterBackend(metaInfoEnv = metaInfo, outF = fileWriter) val backend = new JsonWriterBackend(metaInfoEnv = metaInfo, outF = fileWriter)
result match { result match {
case x: InMemoryResult => case x: ParsingResultInMemory =>
x.start.foreach(_.emitOnBackend(backend)) x.start.foreach(_.emitOnBackend(backend))
x.events.foreach(_.emitOnBackend(backend)) x.events.foreach(_.emitOnBackend(backend))
x.end.foreach(_.emitOnBackend(backend)) x.end.foreach(_.emitOnBackend(backend))
......
...@@ -4,7 +4,7 @@ import java.nio.file.Paths ...@@ -4,7 +4,7 @@ import java.nio.file.Paths
import eu.nomad_lab.TreeType import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.{ FileTree, ParsingTaskGenerator } import eu.nomad_lab.integrated_pipeline.{ FileTree, ParsingTaskGenerator }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, TreeParserEventScanError } import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeParserEventScanError }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection } import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile } import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile }
...@@ -23,7 +23,7 @@ class ZipTreeParsingTaskGenerator(request: FileTree, parserCollection: ParserCol ...@@ -23,7 +23,7 @@ class ZipTreeParsingTaskGenerator(request: FileTree, parserCollection: ParserCol
zipFile.close() zipFile.close()
} }
@tailrec protected[this] final override def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, FileParsingTask]] = { @tailrec protected[this] final override def findNextParsingCandidate(): Option[Either[TreeParserEventScanError, CandidateFound]] = {
if (zipEntries.hasMoreElements) { if (zipEntries.hasMoreElements) {
val zipEntry: ZipArchiveEntry = zipEntries.nextElement() val zipEntry: ZipArchiveEntry = zipEntries.nextElement()
val internalFilePath = Paths.get(zipEntry.getName) val internalFilePath = Paths.get(zipEntry.getName)
......
...@@ -7,28 +7,28 @@ import eu.nomad_lab.parsers.ParseResult.ParseResult ...@@ -7,28 +7,28 @@ import eu.nomad_lab.parsers.ParseResult.ParseResult
import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsingSession } import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsingSession }
import eu.nomad_lab.{ CompactSha, TreeType } import eu.nomad_lab.{ CompactSha, TreeType }
sealed trait FileParsingTaskSignal { sealed trait TreeScanSignal {
val fileTree: FileTree val fileTree: FileTree
} }
sealed trait FileParsingResultSignal { sealed trait FileParsingSignal {
val fileTree: FileTree val fileTree: FileTree
} }
/** /**
* Request a detailed parsing of the given candidate calculation main file * An candidate calculation which should be analyzed in detail by the given parser.
* @param fileTree the file-tree scan request from which this parsing task was generated * @param fileTree the file-tree scan request from which this parsing task was generated
* @param relativePath path inside the given file tree * @param relativePath path inside the given file tree
* @param parserName name of the parser to use for processing the file * @param parserName name of the parser to use for processing the file
* @param extractedPath the path to the temporarily extracted main file (if originating from an * @param extractedPath the path to the temporarily extracted main file (if originating from an
* archive) * archive)
*/ */
case class FileParsingTask( case class CandidateFound(
fileTree: FileTree, fileTree: FileTree,
relativePath: Path, relativePath: Path,
parserName: String, parserName: String,
extractedPath: Option[Path] = None extractedPath: Option[Path] = None
) extends FileParsingTaskSignal { ) extends TreeScanSignal {
def mainFileUri: String = { def mainFileUri: String = {
fileTree.treeType match { fileTree.treeType match {
...@@ -46,14 +46,14 @@ case class FileParsingTask( ...@@ -46,14 +46,14 @@ case class FileParsingTask(
} }
case class FileParsingSignalEndTree( case class TreeScanCompleted(
fileTree: FileTree, fileTree: FileTree,
numParsingTasks: Long = 0 numParsingTasks: Long = 0
) extends FileParsingTaskSignal with FileParsingResultSignal ) extends TreeScanSignal with FileParsingSignal
sealed trait FileParsingResult extends FileParsingResultSignal { sealed trait FileParsingResult extends FileParsingSignal {
val result: ParseResult val result: ParseResult
val task: FileParsingTask val task: CandidateFound
val start: Option[StartedParsingSession] val start: Option[StartedParsingSession]
val end: Option[FinishedParsingSession] val end: Option[FinishedParsingSession]
val error: Option[String] val error: Option[String]
...@@ -61,8 +61,8 @@ sealed trait FileParsingResult extends FileParsingResultSignal { ...@@ -61,8 +61,8 @@ sealed trait FileParsingResult extends FileParsingResultSignal {
val fileTree: FileTree = task.fileTree val fileTree: FileTree = task.fileTree
} }
case class InMemoryResult( case class ParsingResultInMemory(
task: FileParsingTask, task: CandidateFound,
result: ParseResult, result: ParseResult,
start: Option[StartedParsingSession], start: Option[StartedParsingSession],
events: Seq[ParseEvent], events: Seq[ParseEvent],
......
...@@ -10,10 +10,10 @@ import eu.nomad_lab.integrated_pipeline.messages._ ...@@ -10,10 +10,10 @@ import eu.nomad_lab.integrated_pipeline.messages._
import scala.collection.mutable import scala.collection.mutable
class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler) class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
extends GraphStage[FlowShape[FileParsingResultSignal, FileParsingResultSignal]] { extends GraphStage[FlowShape[FileParsingSignal, FileParsingSignal]] {
val in = Inlet[FileParsingResultSignal]("ArchiveCleanUpFlow.in") val in = Inlet[FileParsingSignal]("ArchiveCleanUpFlow.in")
val out = Outlet[FileParsingResultSignal]("ArchiveCleanUpFlow.out") val out = Outlet[FileParsingSignal]("ArchiveCleanUpFlow.out")
override val shape = new FlowShape(in, out) override val shape = new FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
...@@ -41,7 +41,7 @@ class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler) ...@@ -41,7 +41,7 @@ class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
input match { input match {
case x: FileParsingResult => case x: FileParsingResult =>
processed(x.fileTree) = processed.getOrElse(x.fileTree, 0l) + 1l processed(x.fileTree) = processed.getOrElse(x.fileTree, 0l) + 1l
case x: FileParsingSignalEndTree => case x: TreeScanCompleted =>
expected(x.fileTree) = x.numParsingTasks expected(x.fileTree) = x.numParsingTasks
} }
if (processed.getOrElse(input.fileTree, -1) == expected.getOrElse(input.fileTree, -2)) { if (processed.getOrElse(input.fileTree, -1) == expected.getOrElse(input.fileTree, -2)) {
......
...@@ -6,7 +6,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } ...@@ -6,7 +6,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler