Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
nomad-lab
nomad-lab-base
Commits
e3f6e579
Commit
e3f6e579
authored
Jul 25, 2018
by
Ihrig, Arvid Conrad (ari)
Browse files
Integrated Pipeline: renamed FileTreeScanTask to FileTree to better reflect the actual usage
parent
270b03cc
Changes
24
Hide whitespace changes
Inline
Side-by-side
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
View file @
e3f6e579
...
...
@@ -27,7 +27,7 @@ import eu.nomad_lab.TreeType.TreeType
import
eu.nomad_lab.integrated_pipeline.Main.PipelineSettings
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.io_integrations._
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResultSignal
,
FileParsingTaskSignal
,
FileTreeParsingResult
,
FileTree
ScanTask
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResultSignal
,
FileParsingTaskSignal
,
FileTreeParsingResult
,
FileTree
}
import
eu.nomad_lab.integrated_pipeline.stream_components._
import
eu.nomad_lab.meta.
{
KnownMetaInfoEnvs
,
MetaInfoEnv
}
import
eu.nomad_lab.parsers.AllParsers
...
...
@@ -58,7 +58,7 @@ object Main extends StrictLogging {
}
case
class
PipelineSettings
(
task
:
Option
[
FileTree
ScanTask
],
task
:
Option
[
FileTree
],
mode
:
OutputType
,
numWorkers
:
Int
,
treeType
:
TreeType
,
...
...
@@ -94,7 +94,7 @@ object Main extends StrictLogging {
case
numWorkersRe
(
count
)
=>
settings
.
copy
(
numWorkers
=
count
.
toInt
)
case
"--noConfigDump"
=>
settings
//already consumed by the calling function
case
x
if
args
.
isEmpty
&&
operation
==
OperationMode
.
Console
=>
settings
.
copy
(
task
=
Some
(
FileTree
ScanTask
(
Paths
.
get
(
x
).
toAbsolutePath
,
treeType
)))
settings
.
copy
(
task
=
Some
(
FileTree
(
Paths
.
get
(
x
).
toAbsolutePath
,
treeType
)))
case
_
=>
???
//TODO: show usage instructions
}
}
...
...
@@ -124,11 +124,11 @@ class Main {
GraphDSL
.
create
(
sinkSummaries
)
{
implicit
builder
=>
(
sink
)
=>
import
GraphDSL.Implicits._
val
treeParser
=
Flow
.
fromGraph
(
new
MessageProcessorFlow
[
FileTree
ScanTask
,
FileParsingTaskSignal
]
{
val
treeParser
=
Flow
.
fromGraph
(
new
MessageProcessorFlow
[
FileTree
,
FileParsingTaskSignal
]
{
override
val
stageName
=
"TreeParser"
override
val
processor
=
new
TreeScanner
{
override
val
eventListener
=
eventProcessor
override
def
createProcessor
(
task
:
FileTree
ScanTask
)
=
params
.
treeType
match
{
override
def
createProcessor
(
task
:
FileTree
)
=
params
.
treeType
match
{
case
TreeType
.
Zip
=>
new
ZipTreeParsingTaskGenerator
(
task
,
Main
.
parsers
)
case
TreeType
.
Directory
=>
new
DirectoryTreeParsingTaskGenerator
(
task
,
Main
.
parsers
)
}
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingResultsProcessingManager.scala
View file @
e3f6e579
...
...
@@ -80,7 +80,7 @@ trait ParsingResultsProcessingManager extends MessageProcessor[FileParsingResult
}
}
private
def
startFileTree
(
task
:
FileTree
ScanTask
)
:
Unit
=
{
private
def
startFileTree
(
task
:
FileTree
)
:
Unit
=
{
processedCounts
(
task
.
treeBasePath
)
=
0
failedCounts
(
task
.
treeBasePath
)
=
0
eventListener
.
processEvent
(
myId
,
ResultWriterEventStart
(
task
))
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingResultsProcessor.scala
View file @
e3f6e579
...
...
@@ -3,7 +3,7 @@ package eu.nomad_lab.integrated_pipeline
import
java.nio.file.Path
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
ScanTask
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
}
trait
ParsingResultsProcessor
{
...
...
@@ -16,10 +16,10 @@ trait ParsingResultsProcessor {
* @param fileTree the processed file tree
* @return the folder where the generated results are located
*/
def
outputLocation
(
fileTree
:
FileTree
ScanTask
)
:
Path
def
outputLocation
(
fileTree
:
FileTree
)
:
Path
def
startProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
def
startProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
def
finishProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
def
finishProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
}
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/ParsingTaskGenerator.scala
View file @
e3f6e579
...
...
@@ -4,7 +4,7 @@ import java.io.InputStream
import
java.nio.file.Path
import
java.util.NoSuchElementException
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
ScanTask
,
TreeParserEventScanError
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
,
TreeParserEventScanError
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
/**
...
...
@@ -48,7 +48,7 @@ trait ParsingTaskGenerator extends Iterator[Either[TreeParserEventScanError, Fil
parsers
.
scanFile
(
internalFilePath
,
minBuf
).
sorted
}
def
generateRequest
(
fileTreeRequest
:
FileTree
ScanTask
,
relativeFilePath
:
Path
,
def
generateRequest
(
fileTreeRequest
:
FileTree
,
relativeFilePath
:
Path
,
candidateParsers
:
Seq
[
CandidateParser
])
:
Option
[
FileParsingTask
]
=
{
require
(!
relativeFilePath
.
isAbsolute
,
"internal file paths must be relative to file tree root"
)
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/TreeScanner.scala
View file @
e3f6e579
...
...
@@ -9,9 +9,9 @@ import scala.annotation.tailrec
* zip-archive) for files that are potential calculations of interest for NOMAD and emits file
* parsing requests for these candidate files.
*/
trait
TreeScanner
extends
MessageProcessor
[
FileTree
ScanTask
,
FileParsingTaskSignal
]
{
trait
TreeScanner
extends
MessageProcessor
[
FileTree
,
FileParsingTaskSignal
]
{
protected
[
this
]
def
createProcessor
(
task
:
FileTree
ScanTask
)
:
ParsingTaskGenerator
protected
[
this
]
def
createProcessor
(
task
:
FileTree
)
:
ParsingTaskGenerator
protected
[
this
]
val
eventListener
:
EventListener
protected
[
this
]
val
eventReporterName
:
Option
[
String
]
=
None
...
...
@@ -19,10 +19,10 @@ trait TreeScanner extends MessageProcessor[FileTreeScanTask, FileParsingTaskSign
private
var
generator
:
Option
[
ParsingTaskGenerator
]
=
None
private
var
readyForInbound
=
true
private
var
request
:
Option
[
FileTree
ScanTask
]
=
None
private
var
request
:
Option
[
FileTree
]
=
None
private
var
taskCount
=
0
l
final
override
def
processSignal
(
in
:
FileTree
ScanTask
)
:
Unit
=
{
final
override
def
processSignal
(
in
:
FileTree
)
:
Unit
=
{
if
(
readyForInbound
)
{
eventListener
.
processEvent
(
myId
,
TreeParserEventStart
(
in
))
if
(
in
.
treeBasePath
.
toFile
.
exists
())
{
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/DirectoryTreeParsingTaskGenerator.scala
View file @
e3f6e579
...
...
@@ -5,14 +5,14 @@ import java.nio.file.Files
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
ScanTask
,
TreeParserEventScanError
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
,
TreeParserEventScanError
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
import
scala.annotation.tailrec
import
scala.collection.JavaConverters._
import
scala.util.control.NonFatal
class
DirectoryTreeParsingTaskGenerator
(
request
:
FileTree
ScanTask
,
parserCollection
:
ParserCollection
)
class
DirectoryTreeParsingTaskGenerator
(
request
:
FileTree
,
parserCollection
:
ParserCollection
)
extends
ParsingTaskGenerator
{
require
(
request
.
treeType
==
TreeType
.
Directory
,
"file tree to process must be a directory"
)
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/WriteToHDF5MergedResultsProcessor.scala
View file @
e3f6e579
...
...
@@ -4,7 +4,7 @@ import java.nio.file.{ Files, Path, Paths }
import
eu.nomad_lab.
{
H5Lib
,
TreeType
}
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
ScanTask
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.
{
OutputType
,
ParsingResultsProcessor
}
import
eu.nomad_lab.meta.MetaInfoEnv
import
eu.nomad_lab.parsers.H5Backend.H5File
...
...
@@ -22,9 +22,9 @@ class WriteToHDF5MergedResultsProcessor(
metaInfo
:
MetaInfoEnv
)
extends
ParsingResultsProcessor
{
private
val
fileMap
:
mutable.Map
[
FileTree
ScanTask
,
H5File
]
=
mutable
.
Map
()
private
val
fileMap
:
mutable.Map
[
FileTree
,
H5File
]
=
mutable
.
Map
()
override
def
outputLocation
(
fileTree
:
FileTree
ScanTask
)
:
Path
=
{
override
def
outputLocation
(
fileTree
:
FileTree
)
:
Path
=
{
outputLocation
.
resolve
(
fileTree
.
prefixFolder
)
}
...
...
@@ -60,14 +60,14 @@ class WriteToHDF5MergedResultsProcessor(
override
val
outputType
:
OutputType
=
OutputType
.
HDF5
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
{
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
{
Files
.
createDirectories
(
outputLocation
(
treeTask
))
val
targetPath
=
outputLocation
(
treeTask
).
resolve
(
treeTask
.
fileName
)
val
h5file
=
H5File
.
create
(
targetPath
,
Paths
.
get
(
"/"
,
treeTask
.
archiveId
))
fileMap
(
treeTask
)
=
h5file
}
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
{
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
{
val
h5file
=
fileMap
.
remove
(
treeTask
)
//TODO: error handling
if
(
h5file
.
nonEmpty
)
{
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/WriteToHDF5ResultsProcessor.scala
View file @
e3f6e579
...
...
@@ -3,7 +3,7 @@ package eu.nomad_lab.integrated_pipeline.io_integrations
import
java.nio.file.
{
Files
,
Path
,
Paths
}
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
ScanTask
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.
{
OutputType
,
ParsingResultsProcessor
}
import
eu.nomad_lab.meta.MetaInfoEnv
import
eu.nomad_lab.parsers.H5Backend.H5File
...
...
@@ -34,11 +34,11 @@ class WriteToHDF5ResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
override
val
outputType
:
OutputType
=
OutputType
.
HDF5
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
()
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
()
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
()
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
()
override
def
outputLocation
(
fileTree
:
FileTree
ScanTask
)
:
Path
=
{
override
def
outputLocation
(
fileTree
:
FileTree
)
:
Path
=
{
outputLocation
.
resolve
(
fileTree
.
prefixFolder
).
resolve
(
fileTree
.
archiveId
)
}
}
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/WriteToJsonResultsProcessor.scala
View file @
e3f6e579
...
...
@@ -4,7 +4,7 @@ import java.io.FileWriter
import
java.nio.file.
{
Files
,
Path
,
Paths
}
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
ScanTask
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingResult
,
FileTree
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.
{
OutputType
,
ParsingResultsProcessor
}
import
eu.nomad_lab.meta.MetaInfoEnv
import
eu.nomad_lab.parsers.JsonWriterBackend
...
...
@@ -35,11 +35,11 @@ class WriteToJsonResultsProcessor(outputLocation: Path, metaInfo: MetaInfoEnv) e
override
val
outputType
:
OutputType
=
OutputType
.
Json
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
()
override
def
startProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
()
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
ScanTask
)
:
Unit
=
()
override
def
finishProcessingTreeResults
(
treeTask
:
FileTree
)
:
Unit
=
()
override
def
outputLocation
(
fileTree
:
FileTree
ScanTask
)
:
Path
=
{
override
def
outputLocation
(
fileTree
:
FileTree
)
:
Path
=
{
outputLocation
.
resolve
(
fileTree
.
prefixFolder
).
resolve
(
fileTree
.
archiveId
)
}
}
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/io_integrations/ZipTreeParsingTaskGenerator.scala
View file @
e3f6e579
...
...
@@ -4,14 +4,14 @@ import java.nio.file.Paths
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
ScanTask
,
TreeParserEventScanError
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsingTask
,
FileTree
,
TreeParserEventScanError
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
import
org.apache.commons.compress.archivers.zip.
{
ZipArchiveEntry
,
ZipFile
}
import
scala.annotation.tailrec
import
scala.util.control.NonFatal
class
ZipTreeParsingTaskGenerator
(
request
:
FileTree
ScanTask
,
parserCollection
:
ParserCollection
)
class
ZipTreeParsingTaskGenerator
(
request
:
FileTree
,
parserCollection
:
ParserCollection
)
extends
ParsingTaskGenerator
{
require
(
request
.
treeType
==
TreeType
.
Zip
,
"file tree to process must be a Zip archive"
)
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsingSignals.scala
View file @
e3f6e579
...
...
@@ -7,11 +7,11 @@ import eu.nomad_lab.parsers.{ FinishedParsingSession, ParseEvent, StartedParsing
import
eu.nomad_lab.
{
CompactSha
,
TreeType
}
sealed
trait
FileParsingTaskSignal
{
val
treeTask
:
FileTree
ScanTask
val
treeTask
:
FileTree
}
sealed
trait
FileParsingResultSignal
{
val
treeTask
:
FileTree
ScanTask
val
treeTask
:
FileTree
}
/**
...
...
@@ -23,7 +23,7 @@ sealed trait FileParsingResultSignal {
* archive)
*/
case
class
FileParsingTask
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
parserName
:
String
,
extractedPath
:
Option
[
Path
]
=
None
...
...
@@ -46,7 +46,7 @@ case class FileParsingTask(
}
case
class
FileParsingSignalEndTree
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
numParsingTasks
:
Long
=
0
)
extends
FileParsingTaskSignal
with
FileParsingResultSignal
...
...
@@ -57,7 +57,7 @@ sealed trait FileParsingResult extends FileParsingResultSignal {
val
end
:
Option
[
FinishedParsingSession
]
val
error
:
Option
[
String
]
val
treeTask
:
FileTree
ScanTask
=
task
.
treeTask
val
treeTask
:
FileTree
=
task
.
treeTask
}
case
class
InMemoryResult
(
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTree
ScanTask
.scala
→
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTree.scala
View file @
e3f6e579
...
...
@@ -6,17 +6,16 @@ import eu.nomad_lab.TreeType
import
scala.util.matching.Regex
object
FileTree
ScanTask
{
object
FileTree
{
val
rawDataArchive
:
Regex
=
"(R[A-z0-9_-]{28}).zip"
.
r
}
/**
*
Request a scan to identify all potential calculation files found under the given
file tree
root
* @param treeBasePath the root of the file-tree
*
abstract representation of a
file tree
* @param treeBasePath
path to
the root of the file-tree
* @param treeType type of the file tree, e.g. zip or directory
*/
case
class
FileTreeScanTask
(
case
class
FileTree
(
treeBasePath
:
Path
,
treeType
:
TreeType.TreeType
)
{
...
...
@@ -29,7 +28,7 @@ case class FileTreeScanTask(
def
archiveId
:
String
=
treeType
match
{
case
TreeType
.
Directory
=>
treeBasePath
.
getFileName
.
toString
case
TreeType
.
Zip
=>
treeBasePath
.
getFileName
.
toString
match
{
case
FileTree
ScanTask
.
rawDataArchive
(
id
)
=>
id
case
FileTree
.
rawDataArchive
(
id
)
=>
id
case
path
=>
throw
new
UnsupportedOperationException
(
s
"$path is not a valid rawdata archive name"
)
}
}
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeParsingResult.scala
View file @
e3f6e579
...
...
@@ -5,7 +5,7 @@ import java.nio.file.Path
import
eu.nomad_lab.integrated_pipeline.OutputType
case
class
FileTreeParsingResult
(
treeScanTask
:
FileTree
ScanTask
,
treeScanTask
:
FileTree
,
numCalculationsFound
:
Long
,
numParsingFailures
:
Long
,
outputLocation
:
Path
,
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/ProcessEvents.scala
View file @
e3f6e579
...
...
@@ -10,49 +10,49 @@ import eu.nomad_lab.parsers.ParseResult.ParseResult
*/
sealed
trait
TreeParserEvent
{
def
treeTask
:
FileTree
ScanTask
def
treeTask
:
FileTree
}
case
class
TreeParserEventStart
(
treeTask
:
FileTree
ScanTask
treeTask
:
FileTree
)
extends
TreeParserEvent
case
class
TreeParserEventCandidate
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
parser
:
String
)
extends
TreeParserEvent
case
class
TreeParserEventScanError
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
error
:
Throwable
)
extends
TreeParserEvent
case
class
TreeParserEventTreeFailure
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
error
:
Throwable
)
extends
TreeParserEvent
case
class
TreeParserEventEnd
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
numCandidates
:
Long
)
extends
TreeParserEvent
sealed
trait
CalculationParserEvent
{
def
treeTask
:
FileTree
ScanTask
def
treeTask
:
FileTree
def
relativePath
:
Path
def
parser
:
String
}
case
class
CalculationParserEventStart
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
parser
:
String
)
extends
CalculationParserEvent
case
class
CalculationParserEventEnd
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
parser
:
String
,
result
:
ParseResult
,
...
...
@@ -60,15 +60,15 @@ case class CalculationParserEventEnd(
)
extends
CalculationParserEvent
sealed
trait
ResultWriterEvent
{
def
treeTask
:
FileTree
ScanTask
def
treeTask
:
FileTree
}
case
class
ResultWriterEventStart
(
treeTask
:
FileTree
ScanTask
treeTask
:
FileTree
)
extends
ResultWriterEvent
case
class
ResultWriterEventResult
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
relativePath
:
Path
,
parser
:
String
,
result
:
ParseResult
,
...
...
@@ -76,7 +76,7 @@ case class ResultWriterEventResult(
)
extends
ResultWriterEvent
case
class
ResultWriterEventEnd
(
treeTask
:
FileTree
ScanTask
,
treeTask
:
FileTree
,
numCalculations
:
Long
,
numParsingFailures
:
Long
,
outputLocation
:
Path
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ArchiveCleanUpFlow.scala
View file @
e3f6e579
...
...
@@ -17,8 +17,8 @@ class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
override
def
createLogic
(
inheritedAttributes
:
Attributes
)
=
new
GraphStageLogic
(
shape
)
{
private
val
processed
:
mutable.Map
[
FileTree
ScanTask
,
Long
]
=
mutable
.
Map
()
private
val
expected
:
mutable.Map
[
FileTree
ScanTask
,
Long
]
=
mutable
.
Map
()
private
val
processed
:
mutable.Map
[
FileTree
,
Long
]
=
mutable
.
Map
()
private
val
expected
:
mutable.Map
[
FileTree
,
Long
]
=
mutable
.
Map
()
private
def
fail
(
msg
:
String
)
:
Unit
=
{
failStage
(
new
IllegalArgumentException
(
msg
))
...
...
integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_end_to_end_tests/package.scala
View file @
e3f6e579
...
...
@@ -7,7 +7,7 @@ import eu.nomad_lab.TreeType.TreeType
import
eu.nomad_lab.h5.CalculationH5
import
eu.nomad_lab.integrated_pipeline.OutputType
import
eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import
eu.nomad_lab.integrated_pipeline.messages.FileTree
ScanTask
import
eu.nomad_lab.integrated_pipeline.messages.FileTree
import
eu.nomad_lab.integrated_pipeline_tests._
import
eu.nomad_lab.meta.MetaInfoEnv
import
org.scalatest.Assertions.succeed
...
...
@@ -23,7 +23,7 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders {
val
outputType
:
OutputType
val
treeType
:
TreeType
val
sample
:
TestTreeData
lazy
val
treeTask
:
FileTree
ScanTask
=
createFileTreeScanRequest
(
sample
,
treeType
)
lazy
val
treeTask
:
FileTree
=
createFileTreeScanRequest
(
sample
,
treeType
)
lazy
val
dataRoot
:
String
=
treeTask
.
treeType
match
{
case
TreeType
.
Directory
=>
s
"src/test/resources/${sample.baseName.substring(0, 3)}/${sample.baseName}"
case
TreeType
.
Zip
=>
s
"src/test/resources/${sample.baseName.substring(0, 3)}/${sample.baseName}.zip"
...
...
@@ -73,7 +73,7 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders {
override
val
sample
:
TestTreeData
=
TestTreeData
(
"purely-imaginary"
,
Map
())
}
def
assertValidityOfGeneratedJsonFiles
(
sample
:
TestTreeData
,
treeTask
:
FileTree
ScanTask
,
def
assertValidityOfGeneratedJsonFiles
(
sample
:
TestTreeData
,
treeTask
:
FileTree
,
tmpResultsFolder
:
Path
,
metaInfo
:
MetaInfoEnv
)
:
Assertion
=
{
sample
.
candidateCalculationsWithParsers
.
foreach
{
entry
=>
val
task
=
aFileParsingTask
().
withTreeTask
(
treeTask
).
withRelativePath
(
entry
.
_1
).
build
()
...
...
@@ -94,7 +94,7 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders {
succeed
}
def
assertValidityOfGeneratedHDF5Files
(
sample
:
TestTreeData
,
treeTask
:
FileTree
ScanTask
,
def
assertValidityOfGeneratedHDF5Files
(
sample
:
TestTreeData
,
treeTask
:
FileTree
,
tmpResultsFolder
:
Path
,
metaInfo
:
MetaInfoEnv
)
:
Assertion
=
{
sample
.
candidateCalculationsWithParsers
.
foreach
{
entry
=>
val
task
=
aFileParsingTask
().
withTreeTask
(
treeTask
).
withRelativePath
(
entry
.
_1
).
build
()
...
...
@@ -113,7 +113,7 @@ package object integrated_pipeline_end_to_end_tests extends TestDataBuilders {
succeed
}
def
assertValidityOfGeneratedMergedHDF5File
(
sample
:
TestTreeData
,
treeTask
:
FileTree
ScanTask
,
def
assertValidityOfGeneratedMergedHDF5File
(
sample
:
TestTreeData
,
treeTask
:
FileTree
,
tmpResultsFolder
:
Path
,
metaInfo
:
MetaInfoEnv
)
:
Assertion
=
{
val
id
=
treeTask
.
archiveId
val
fileName
=
treeTask
.
fileName
...
...
integrated-pipeline/src/test/scala/eu/nomad_lab/integrated_pipeline_tests/Builders.scala
View file @
e3f6e579
...
...
@@ -33,7 +33,7 @@ trait TestDataBuilders {
def
aTreeParserEventScanError
()
=
BuilderTreeParserEventScanError
()
implicit
def
build
(
x
:
BuilderFileTreeScanTask
)
:
FileTree
ScanTask
=
x
.
build
()
implicit
def
build
(
x
:
BuilderFileTreeScanTask
)
:
FileTree
=
x
.
build
()
implicit
def
build
(
x
:
BuilderFileParsingTask
)
:
FileParsingTask
=
x
.
build
()
implicit
def
build
(
x
:
BuilderFileParsingSignalEndTree
)
:
FileParsingSignalEndTree
=
x
.
build
()
...
...
@@ -74,20 +74,20 @@ object MessageBuilders {
def
withBasePath
(
path
:
String
)
=
copy
(
treeBasePath
=
Paths
.
get
(
path
))
def
withTreeType
(
newType
:
TreeType
)
=
copy
(
treeType
=
newType
)
def
build
()
=
FileTree
ScanTask
(
def
build
()
=
FileTree
(
treeBasePath
=
treeBasePath
,
treeType
=
treeType
)
}
case
class
BuilderFileParsingTask
(
private
val
treeTask
:
FileTree
ScanTask
=
BuilderFileTreeScanTask
().
build
(),
private
val
treeTask
:
FileTree
=
BuilderFileTreeScanTask
().
build
(),
private
val
relativePath
:
Path
=
defaultPath
,
private
val
parserName
:
String
=
defaultParser
,
private
val
extractedPath
:
Option
[
Path
]
=
None
)
{
def
withTreeTask
(
task
:
FileTree
ScanTask
)
=
copy
(
treeTask
=
task
)
def
withTreeTask
(
task
:
FileTree
)
=
copy
(
treeTask
=
task
)
def
withRelativePath
(
path
:
Path
)
=
copy
(
relativePath
=
path
)
def
withRelativePath
(
path
:
String
)
=
copy
(
relativePath
=
Paths
.
get
(
path
))
def
withParserName
(
name
:
String
)
=
copy
(
parserName
=
name
)
...
...
@@ -102,11 +102,11 @@ object MessageBuilders {
}
case
class
BuilderFileParsingSignalEndTree
(
private
val
treeTask
:
FileTree
ScanTask
=
BuilderFileTreeScanTask
().
build
(),
private
val
treeTask
:
FileTree
=
BuilderFileTreeScanTask
().
build
(),
private
val
numTasks
:
Long
=
-
1
)
{
def
withTreeTask
(
task
:
FileTree
ScanTask
)
=
copy
(
treeTask
=
task
)
def
withTreeTask
(
task
:
FileTree
)
=
copy
(
treeTask
=
task
)
def
withBasePath
(
path
:
Path
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeBasePath
=
path
))
def
withBasePath
(
path
:
String
)
:
BuilderFileParsingSignalEndTree
=
withBasePath
(
Paths
.
get
(
path
))
def
withTreeType
(
newType
:
TreeType
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeType
=
newType
))
...
...
@@ -132,7 +132,7 @@ object MessageBuilders {
def
withStartEvent
(
event
:
Option
[
StartedParsingSession
])
=
copy
(
start
=
event
)
def
withEvents
(
newEvents
:
Seq
[
ParseEvent
])
=
copy
(
events
=
newEvents
)
def
withFinishEvent
(
event
:
Option
[
FinishedParsingSession
])
=
copy
(
end
=
event
)
def
withTreeTask
(
newTask
:
FileTree
ScanTask
)
=
copy
(
task
=
task
.
copy
(
treeTask
=
newTask
))
def
withTreeTask
(
newTask
:
FileTree
)
=
copy
(
task
=
task
.
copy
(
treeTask
=
newTask
))
def
withRelativePath
(
newPath
:
Path
)
=
copy
(
task
=
task
.
copy
(
relativePath
=
newPath
))
def
withRelativePath
(
newPath
:
String
)
=
copy
(
task
=
task
.
copy
(
relativePath
=
Paths
.
get
(
newPath
)))
def
withErrorMessage
(
message
:
Option
[
String
])
=
copy
(
error
=
message
)
...
...
@@ -153,10 +153,10 @@ object EventBuilders {
import
Defaults._
case
class
BuilderTreeParserEventStart
(
private
val
treeTask
:
FileTree
ScanTask
=
BuilderFileTreeScanTask
().
build
()
private
val
treeTask
:
FileTree
=
BuilderFileTreeScanTask
().
build
()
)
{
def
withTreeTask
(
task
:
FileTree
ScanTask
)
=
copy
(
treeTask
=
task
)
def
withTreeTask
(
task
:
FileTree
)
=
copy
(
treeTask
=
task
)
def
withBasePath
(
path
:
Path
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeBasePath
=
path
))
def
withBasePath
(
path
:
String
)
:
BuilderTreeParserEventStart
=
withBasePath
(
Paths
.
get
(
path
))
def
withTreeType
(
newType
:
TreeType
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeType
=
newType
))
...
...
@@ -167,12 +167,12 @@ object EventBuilders {
}
case
class
BuilderTreeParserEventCandidate
(
private
val
treeTask
:
FileTree
ScanTask
=
BuilderFileTreeScanTask
().
build
(),
private
val
treeTask
:
FileTree
=
BuilderFileTreeScanTask
().
build
(),
private
val
relativePath
:
Path
=
defaultPath
,
private
val
parser
:
String
=
defaultParser
)
{
def
withTreeTask
(
task
:
FileTree
ScanTask
)
=
copy
(
treeTask
=
task
)
def
withTreeTask
(
task
:
FileTree
)
=
copy
(
treeTask
=
task
)
def
withBasePath
(
path
:
Path
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeBasePath
=
path
))
def
withBasePath
(
path
:
String
)
:
BuilderTreeParserEventCandidate
=
withBasePath
(
Paths
.
get
(
path
))
def
withTreeType
(
newType
:
TreeType
)
=
copy
(
treeTask
=
treeTask
.
copy
(
treeType
=
newType
))
...
...
@@ -188,12 +188,12 @@ object EventBuilders {
}
case
class
BuilderTreeParserEventScanError
(
private
val
treeTask
:
FileTree
ScanTask
=
BuilderFileTreeScanTask
().
build
(),
private
val
treeTask
:
FileTree
=
BuilderFileTreeScanTask
().
build
(),
private
val
relativePath
:
Path
=
defaultPath
,
private
val
error
:
Throwable
=
defaultError
)
{