Commit 056be441 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: renamed fileTree fields in processing signal classes

parent 8787d6c1
......@@ -68,10 +68,10 @@ class CalculationParsingEngine(
case signal: FileParsingSignalEndTree => signal
case task: FileParsingTask =>
eventListener.processEvent(myId, CalculationParserEventStart(
task.treeTask, task.relativePath, task.parserName
task.fileTree, task.relativePath, task.parserName
))
val pathToMainFile = task.treeTask.treeType match {
case TreeType.Directory => Right(task.treeTask.treeBasePath.resolve(task.relativePath))
val pathToMainFile = task.fileTree.treeType match {
case TreeType.Directory => Right(task.fileTree.treeBasePath.resolve(task.relativePath))
case TreeType.Zip => task.extractedPath match {
case Some(path) => Right(path)
case None => Left("extracted file path is missing in the request")
......@@ -94,7 +94,7 @@ class CalculationParsingEngine(
*/
private def failParseRequest(request: FileParsingTask, reason: String): FileParsingResult = {
eventListener.processEvent(myId, CalculationParserEventEnd(
request.treeTask, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason)
request.fileTree, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason)
))
InMemoryResult(
task = request,
......@@ -123,7 +123,7 @@ class CalculationParsingEngine(
case _ => None
}
eventListener.processEvent(myId, CalculationParserEventEnd(
request.treeTask, request.relativePath, request.parserName, result, error
request.fileTree, request.relativePath, request.parserName, result, error
))
InMemoryResult(
task = request,
......
......@@ -27,41 +27,41 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
throw new IllegalStateException("previously generated signal not fetched!")
signal match {
case signal: FileParsingSignalEndTree =>
if (!processedCounts.contains(signal.treeTask.treeBasePath))
startFileTree(signal.treeTask)
expectedCounts(signal.treeTask.treeBasePath) = signal.numParsingTasks
if (!processedCounts.contains(signal.fileTree.treeBasePath))
startFileTree(signal.fileTree)
expectedCounts(signal.fileTree.treeBasePath) = signal.numParsingTasks
case signal: FileParsingResult =>
if (!processedCounts.contains(signal.treeTask.treeBasePath))
startFileTree(signal.treeTask)
if (!processedCounts.contains(signal.fileTree.treeBasePath))
startFileTree(signal.fileTree)
eventListener.processEvent(myId, ResultWriterEventResult(
fileTree = signal.treeTask,
fileTree = signal.fileTree,
relativePath = signal.task.relativePath,
parser = signal.task.parserName,
result = signal.result,
error = signal.error
))
processor.processFileParsingResult(signal)
processedCounts(signal.treeTask.treeBasePath) += 1
processedCounts(signal.fileTree.treeBasePath) += 1
if (signal.result == ParseResult.ParseFailure)
failedCounts(signal.treeTask.treeBasePath) += 1
failedCounts(signal.fileTree.treeBasePath) += 1
}
val path = signal.treeTask.treeBasePath
val path = signal.fileTree.treeBasePath
if (expectedCounts.getOrElse(path, -1) == processedCounts.getOrElse(path, -2)) {
processor.finishProcessingTreeResults(signal.treeTask)
processor.finishProcessingTreeResults(signal.fileTree)
val numCalculations = expectedCounts.remove(path).get
processedCounts.remove(path)
val numFailures = failedCounts.remove(path).get
eventListener.processEvent(myId, ResultWriterEventEnd(
fileTree = signal.treeTask,
fileTree = signal.fileTree,
numCalculations = numCalculations,
numParsingFailures = numFailures,
outputLocation = processor.outputLocation(signal.treeTask)
outputLocation = processor.outputLocation(signal.fileTree)
))
nextResult = Some(FileTreeParsingResult(
treeScanTask = signal.treeTask,
treeScanTask = signal.fileTree,
numCalculationsFound = numCalculations,
numParsingFailures = numFailures,
outputLocation = processor.outputLocation(signal.treeTask),
outputLocation = processor.outputLocation(signal.fileTree),
outputFormat = processor.outputType
))
}
......
......@@ -54,7 +54,7 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil
if (candidateParsers.nonEmpty) {
Some(FileParsingTask(
treeTask = fileTreeRequest,
fileTree = fileTreeRequest,
relativePath = relativeFilePath,
parserName = candidateParsers.head.parserName
))
......
......@@ -30,7 +30,7 @@ class WriteToHDF5MergedResultsProcessor(
override def processFileParsingResult(result: FileParsingResult): Unit = {
val id = result.task.calculationGid
val archiveFile = fileMap(result.task.treeTask)
val archiveFile = fileMap(result.task.fileTree)
val archiveHandle = archiveFile.groupHandle
val calcHandle = H5Lib.groupGet(archiveHandle, id, create = true)
val storageLocation = new H5File(
......
......@@ -12,10 +12,10 @@ import eu.nomad_lab.parsers.{ H5Backend, ReindexBackend }
class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) extends ParsingResultsProcessor {
override def processFileParsingResult(result: FileParsingResult): Unit = {
val id = result.task.calculationGid
val archiveGid = result.task.treeTask.archiveId
val archiveGid = result.task.fileTree.archiveId
val fileName = Paths.get(id + ".h5")
val targetPath = outputLocation(result.treeTask).resolve(fileName)
Files.createDirectories(outputLocation(result.treeTask))
val targetPath = outputLocation(result.fileTree).resolve(fileName)
Files.createDirectories(outputLocation(result.fileTree))
val h5file = H5File.create(targetPath, Paths.get("/", archiveGid, id))
val h5Backend = H5Backend(metaEnv = metaInfo, h5File = h5file, closeFileOnFinishedParsing = false)
val backend = new ReindexBackend(h5Backend)
......
......@@ -16,8 +16,8 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
override def processFileParsingResult(result: FileParsingResult): Unit = {
val id = result.task.calculationGid
val fileName = Paths.get(id + ".json")
val targetPath = outputLocation(result.treeTask).resolve(fileName)
Files.createDirectories(outputLocation(result.treeTask))
val targetPath = outputLocation(result.fileTree).resolve(fileName)
Files.createDirectories(outputLocation(result.fileTree))
val fileWriter = new FileWriter(targetPath.toFile)
try {
val backend = new JsonWriterBackend(metaInfoEnv = metaInfo, outF = fileWriter)
......
......@@ -8,32 +8,32 @@ import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsing
import eu.nomad_lab.{ CompactSha, TreeType }
sealed trait FileParsingTaskSignal {
val treeTask: FileTree
val fileTree: FileTree
}
sealed trait FileParsingResultSignal {
val treeTask: FileTree
val fileTree: FileTree
}
/**
* Request a detailed parsing of the given candidate calculation main file
* @param treeTask the file-tree scan request from which this parsing task was generated
* @param fileTree the file-tree scan request from which this parsing task was generated
* @param relativePath path inside the given file tree
* @param parserName name of the parser to use for processing the file
* @param extractedPath the path to the temporarily extracted main file (if originating from an
* archive)
*/
case class FileParsingTask(
treeTask: FileTree,
fileTree: FileTree,
relativePath: Path,
parserName: String,
extractedPath: Option[Path] = None
) extends FileParsingTaskSignal {
def mainFileUri: String = {
treeTask.treeType match {
fileTree.treeType match {
case TreeType.Zip => "nmd://" + relativePath.toString
case TreeType.Directory => treeTask.treeBasePath.resolve(relativePath).toAbsolutePath.toUri.toString
case TreeType.Directory => fileTree.treeBasePath.resolve(relativePath).toAbsolutePath.toUri.toString
case _ => "unknown://${treeTask.treeBasePath.resolve(relativePath).toAbsolutePath.toString}"
}
}
......@@ -47,7 +47,7 @@ case class FileParsingTask(
}
case class FileParsingSignalEndTree(
treeTask: FileTree,
fileTree: FileTree,
numParsingTasks: Long = 0
) extends FileParsingTaskSignal with FileParsingResultSignal
......@@ -58,7 +58,7 @@ sealed trait FileParsingResult extends FileParsingResultSignal {
val end: Option[FinishedParsingSession]
val error: Option[String]
val treeTask: FileTree = task.treeTask
val fileTree: FileTree = task.fileTree
}
case class InMemoryResult(
......
......@@ -35,19 +35,19 @@ class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
override def onPush(): Unit = {
val input = grab(in)
input.treeTask.treeType match {
input.fileTree.treeType match {
case TreeType.Directory => push(out, input)
case TreeType.Zip =>
input match {
case x: FileParsingResult =>
processed(x.treeTask) = processed.getOrElse(x.treeTask, 0l) + 1l
processed(x.fileTree) = processed.getOrElse(x.fileTree, 0l) + 1l
case x: FileParsingSignalEndTree =>
expected(x.treeTask) = x.numParsingTasks
expected(x.fileTree) = x.numParsingTasks
}
if (processed.getOrElse(input.treeTask, -1) == expected.getOrElse(input.treeTask, -2)) {
archiveHandler.cleanUpExtractedArchive(input.treeTask.treeBasePath.toAbsolutePath)
processed.remove(input.treeTask)
expected.remove(input.treeTask)
if (processed.getOrElse(input.fileTree, -1) == expected.getOrElse(input.fileTree, -2)) {
archiveHandler.cleanUpExtractedArchive(input.fileTree.treeBasePath.toAbsolutePath)
processed.remove(input.fileTree)
expected.remove(input.fileTree)
}
push(out, input)
}
......
......@@ -37,19 +37,19 @@ class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
setHandler(in, new InHandler {
override def onPush(): Unit = {
val input = grab(in)
val path = input.treeTask.treeBasePath
val path = input.fileTree.treeBasePath
input match {
case _: FileParsingSignalEndTree if !activeTrees.contains(path) =>
fail("to be finished file tree was not registered")
case signal: FileParsingTask =>
if (!activeTrees.contains(path)) {
activeTrees(path) = signal.treeTask.treeType match {
activeTrees(path) = signal.fileTree.treeType match {
case TreeType.Directory => path
case TreeType.Zip => archiveHandler.extractZipArchive(path)
}
}
signal.treeTask.treeType match {
signal.fileTree.treeType match {
case TreeType.Directory => push(out, signal)
case TreeType.Zip =>
val tempPath = activeTrees(path).resolve(signal.relativePath)
......
......@@ -95,7 +95,7 @@ object MessageBuilders {
def withExtractedPath(newPath: Option[Path]) = copy(extractedPath = newPath)
def build() = FileParsingTask(
treeTask = fileTree,
fileTree = fileTree,
relativePath = relativePath,
parserName = parserName,
extractedPath = extractedPath
......@@ -114,7 +114,7 @@ object MessageBuilders {
def withTaskCount(count: Long) = copy(numTasks = count)
def build() = FileParsingSignalEndTree(
treeTask = fileTree,
fileTree = fileTree,
numParsingTasks = numTasks
)
}
......@@ -133,7 +133,7 @@ object MessageBuilders {
def withStartEvent(event: Option[StartedParsingSession]) = copy(start = event)
def withEvents(newEvents: Seq[ParseEvent]) = copy(events = newEvents)
def withFinishEvent(event: Option[FinishedParsingSession]) = copy(end = event)
def withFileTree(tree: FileTree) = copy(task = task.copy(treeTask = tree))
def withFileTree(tree: FileTree) = copy(task = task.copy(fileTree = tree))
def withRelativePath(newPath: Path) = copy(task = task.copy(relativePath = newPath))
def withRelativePath(newPath: String) = copy(task = task.copy(relativePath = Paths.get(newPath)))
def withErrorMessage(message: Option[String]) = copy(error = message)
......
......@@ -27,7 +27,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar with TestDat
)
def sampleInput(fileName: String) = FileParsingTask(
treeTask = sampleTreeScan,
fileTree = sampleTreeScan,
relativePath = Paths.get(fileName),
parserName = "dummyParser"
)
......
......@@ -49,7 +49,7 @@ class WriteToHDF5MergedResultsProcessorSpec extends WordSpec with Matchers with
val fileName = location.resolve(sampleTree.fileName)
assert(fileName.toFile.exists(), s"parsing results HDF5 file '$fileName' does not exist")
val expectedMainFileUris = inputs.map(x =>
x.treeTask.treeBasePath.resolve(x.relativePath).toUri.toString).to[mutable.Set]
x.fileTree.treeBasePath.resolve(x.relativePath).toUri.toString).to[mutable.Set]
validateHDF5(location, id, metaData, validateHDFContent(expectedMainFileUris))
}
......
......@@ -35,7 +35,7 @@ class WriteToHDF5ResultsProcessorSpec extends WordSpec with TestDataBuilders wit
val calcName = entry.calculationGid
val filePath = targetFolder.resolve(s"$calcName.h5")
assert(filePath.toFile.exists(), s"calculation output HDF5 '$filePath' does not exist")
val mainFileUri = entry.treeTask.treeBasePath.resolve(entry.relativePath).toUri.toString
val mainFileUri = entry.fileTree.treeBasePath.resolve(entry.relativePath).toUri.toString
validateHDF5(targetFolder, archiveId, metaData, validateHDFContent(mainFileUri),
Some(s"$calcName.h5"))
}
......
......@@ -24,7 +24,7 @@ object FileParsingTaskMatchers {
MatcherHelpers.propertyMatcher(
propertyName = "parent tree task",
expected = expectedValue,
test = (x: FileParsingTaskSignal) => x.treeTask
test = (x: FileParsingTaskSignal) => x.fileTree
)
def relativePath(expectedValue: Path): HavePropertyMatcher[FileParsingTask, Path] =
......@@ -64,7 +64,7 @@ object FileParsingResultMatchers {
MatcherHelpers.propertyMatcher(
propertyName = "parent tree task",
expected = expectedValue,
test = (x: FileParsingResultSignal) => x.treeTask
test = (x: FileParsingResultSignal) => x.fileTree
)
def status(expectedValue: ParseResult): HavePropertyMatcher[FileParsingResult, ParseResult] =
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment