Commit 68765c54 authored by Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: merged HDF5 writer can be queried for the result output...

Integrated Pipeline: merged HDF5 writer can be queried for the result output location for a file tree
parent 27e1dc8e
package eu.nomad_lab.integrated_pipeline package eu.nomad_lab.integrated_pipeline
import java.nio.file.Path
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, FileTreeScanTask } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, FileTreeScanTask }
...@@ -9,6 +11,13 @@ trait ParsingResultsProcessor { ...@@ -9,6 +11,13 @@ trait ParsingResultsProcessor {
val outputType: OutputType val outputType: OutputType
/**
* Obtain the folder into which the result file(s) for the given file tree will be written.
* Implementations decide the concrete layout below their configured base output folder.
* @param fileTree the file tree being processed
* @return the folder where the generated result file(s) for this tree are located
*/
def outputLocation(fileTree: FileTreeScanTask): Path
def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit
def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit
......
package eu.nomad_lab.integrated_pipeline.io_integrations package eu.nomad_lab.integrated_pipeline.io_integrations
import java.nio.file.{ Path, Paths } import java.nio.file.{ Files, Path, Paths }
import eu.nomad_lab.H5Lib import eu.nomad_lab.{ H5Lib, TreeType }
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, FileTreeScanTask, InMemoryResult } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingResult, FileTreeScanTask, InMemoryResult }
import eu.nomad_lab.integrated_pipeline.{ OutputType, ParsingResultsProcessor } import eu.nomad_lab.integrated_pipeline.{ OutputType, ParsingResultsProcessor }
...@@ -12,10 +12,22 @@ import eu.nomad_lab.parsers.{ H5Backend, ReindexBackend } ...@@ -12,10 +12,22 @@ import eu.nomad_lab.parsers.{ H5Backend, ReindexBackend }
import scala.collection.mutable import scala.collection.mutable
class WriteToHDF5MergedResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) extends ParsingResultsProcessor { /**
* Write parsing results to one HDF5 file per file tree.
* @param outputLocation target folder for generated HDF5s
* @param metaInfo meta-info to use when writing HDF5
*/
class WriteToHDF5MergedResultsProcessor(
outputLocation: Path,
metaInfo: MetaInfoEnv
) extends ParsingResultsProcessor {
private val fileMap: mutable.Map[FileTreeScanTask, H5File] = mutable.Map() private val fileMap: mutable.Map[FileTreeScanTask, H5File] = mutable.Map()
/**
* Resolve the per-tree result folder: the tree's prefix folder nested under the
* configured base output folder.
* NOTE: the zero-arg reference `outputLocation` below resolves to the constructor
* parameter (a Path), not to this method — the method overload requires an argument.
*/
override def outputLocation(fileTree: FileTreeScanTask): Path =
  outputLocation.resolve(fileTree.prefixFolder)
override def processFileParsingResult(result: FileParsingResult): Unit = { override def processFileParsingResult(result: FileParsingResult): Unit = {
val id = result.task.calculationGid val id = result.task.calculationGid
val archiveFile = fileMap(result.task.treeTask) val archiveFile = fileMap(result.task.treeTask)
...@@ -49,10 +61,9 @@ class WriteToHDF5MergedResultsProcessor(outputLocation: Path, metaInfo: MetaInfo ...@@ -49,10 +61,9 @@ class WriteToHDF5MergedResultsProcessor(outputLocation: Path, metaInfo: MetaInfo
override val outputType: OutputType = OutputType.HDF5 override val outputType: OutputType = OutputType.HDF5
override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = { override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = {
val h5file = H5File.create( Files.createDirectories(outputLocation(treeTask))
outputLocation.resolve(treeTask.fileName), val targetPath = outputLocation(treeTask).resolve(treeTask.fileName)
Paths.get("/", treeTask.archiveId) val h5file = H5File.create(targetPath, Paths.get("/", treeTask.archiveId))
)
fileMap(treeTask) = h5file fileMap(treeTask) = h5file
} }
......
...@@ -36,4 +36,6 @@ class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e ...@@ -36,4 +36,6 @@ class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = () override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = ()
override def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit = () override def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit = ()
override def outputLocation(fileTree: FileTreeScanTask): Path = ??? // outputLocation FIXME
} }
...@@ -37,4 +37,6 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e ...@@ -37,4 +37,6 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = () override def startProcessingTreeResults(treeTask: FileTreeScanTask): Unit = ()
override def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit = () override def finishProcessingTreeResults(treeTask: FileTreeScanTask): Unit = ()
override def outputLocation(fileTree: FileTreeScanTask): Path = ??? //FIXME
} }
...@@ -106,19 +106,13 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders { ...@@ -106,19 +106,13 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders {
def assertValidityOfGeneratedMergedHDF5File(sample: TestTreeData, treeTask: FileTreeScanTask, def assertValidityOfGeneratedMergedHDF5File(sample: TestTreeData, treeTask: FileTreeScanTask,
tmpResultsFolder: Path, metaInfo: MetaInfoEnv): Assertion = { tmpResultsFolder: Path, metaInfo: MetaInfoEnv): Assertion = {
val id = Paths.get(sample.baseName).getFileName.toString val id = treeTask.archiveId
val fileName = treeTask.treeType match { val fileName = treeTask.fileName
case TreeType.Directory => s"$id.h5" val targetFolder = tmpResultsFolder.resolve(treeTask.prefixFolder)
case TreeType.Zip => s"S${id.substring(1)}.h5" val location = targetFolder.resolve(fileName)
}
val archiveName = treeTask.treeType match {
case TreeType.Directory => None
case TreeType.Zip => Some(fileName)
}
val location = tmpResultsFolder.resolve(fileName)
assert(location.toFile.exists(), s"parsing results HDF5 file '$location' does not exist") assert(location.toFile.exists(), s"parsing results HDF5 file '$location' does not exist")
validateHDF5(tmpResultsFolder, id, metaInfo, validateHDF5(targetFolder, id, metaInfo,
checkMergedCalculationsHDFContent(sample, mutable.Set()), archiveName) checkMergedCalculationsHDFContent(sample, mutable.Set()), Some(fileName.toString))
succeed succeed
} }
......
...@@ -8,15 +8,13 @@ import eu.nomad_lab.integrated_pipeline.io_integrations.WriteToHDF5MergedResults ...@@ -8,15 +8,13 @@ import eu.nomad_lab.integrated_pipeline.io_integrations.WriteToHDF5MergedResults
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask } import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask }
import eu.nomad_lab.meta.KnownMetaInfoEnvs import eu.nomad_lab.meta.KnownMetaInfoEnvs
import eu.nomad_lab.{ H5Lib, JsonUtils, TreeType } import eu.nomad_lab.{ H5Lib, JsonUtils, TreeType }
import org.scalatest.WordSpec import org.scalatest.{ Matchers, WordSpec }
import scala.collection.mutable import scala.collection.mutable
class WriteToHDF5MergedResultsProcessorSpec extends WordSpec { class WriteToHDF5MergedResultsProcessorSpec extends WordSpec with Matchers with TestDataBuilders {
val sampleTree = FileTreeScanTask(Paths.get("/foo/bar"), TreeType.Directory)
val unusedPath = Paths.get("/tmp")
val sampleTree = aFileTreeScanTask().withBasePath(s"/foo/R${"x" * 28}.zip").withTreeType(TreeType.Directory).build()
val metaData = KnownMetaInfoEnvs.all val metaData = KnownMetaInfoEnvs.all
def validateHDFContent(expectedMainFileUris: mutable.Set[String])(calc: CalculationH5, mainFileUri: String): Unit = { def validateHDFContent(expectedMainFileUris: mutable.Set[String])(calc: CalculationH5, mainFileUri: String): Unit = {
...@@ -46,17 +44,31 @@ class WriteToHDF5MergedResultsProcessorSpec extends WordSpec { ...@@ -46,17 +44,31 @@ class WriteToHDF5MergedResultsProcessorSpec extends WordSpec {
inputs.foreach(x => writer.processFileParsingResult(createSuccessfulFileParsingResult(x))) inputs.foreach(x => writer.processFileParsingResult(createSuccessfulFileParsingResult(x)))
writer.finishProcessingTreeResults(sampleTree) writer.finishProcessingTreeResults(sampleTree)
val id = sampleTree.treeBasePath.getFileName.toString val id = sampleTree.archiveId
val fileName = s"$id.h5" val location = writer.outputLocation(sampleTree)
val location = tempDir.resolve(fileName) val fileName = location.resolve(sampleTree.fileName)
assert(location.toFile.exists(), s"parsing results HDF5 file '$location' does not exist") assert(fileName.toFile.exists(), s"parsing results HDF5 file '$fileName' does not exist")
val expectedMainFileUris = inputs.map(x => val expectedMainFileUris = inputs.map(x =>
x.treeTask.treeBasePath.resolve(x.relativePath).toUri.toString).to[mutable.Set] x.treeTask.treeBasePath.resolve(x.relativePath).toUri.toString).to[mutable.Set]
validateHDF5(tempDir, id, metaData, validateHDFContent(expectedMainFileUris)) validateHDF5(location, id, metaData, validateHDFContent(expectedMainFileUris))
} }
} }
}
"determining the output location" should {
"return the appropriate output location for a directory file tree" in {
val f = new WriteToHDF5MergedResultsProcessor(Paths.get("/non/existing/location"), metaData)
val fileTree = aFileTreeScanTask().withTreeType(TreeType.Directory).withBasePath("foo/bargus")
f.outputLocation(fileTree) should be(Paths.get("/non/existing/location/bar"))
}
"return the appropriate output location for a zip archive file tree" in {
val f = new WriteToHDF5MergedResultsProcessor(Paths.get("/non/existing/location"), metaData)
val fileTree = aFileTreeScanTask().withTreeType(TreeType.Zip).withBasePath(s"foo/R${"x" * 28}.zip")
f.outputLocation(fileTree) should be(Paths.get(s"/non/existing/location/Rxx"))
}
}
}
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment