Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
nomad-lab
nomad-lab-base
Commits
dc7b5163
Commit
dc7b5163
authored
Jun 12, 2018
by
Ihrig, Arvid Conrad (ari)
Browse files
Integrated Pipeline: renamed message case classes
parent
7e183445
Changes
13
Hide whitespace changes
Inline
Side-by-side
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
View file @
dc7b5163
...
...
@@ -26,7 +26,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import
com.typesafe.scalalogging.StrictLogging
import
eu.nomad_lab.QueueMessage.CalculationParserRequest
import
eu.nomad_lab.TreeType.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Summary
,
FileTreeScan
Request
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Result
,
FileTreeScan
Task
}
import
eu.nomad_lab.integrated_pipeline.stream_components.
{
DirectoryTreeParserGraph
,
ZipTreeParserGraph
}
import
eu.nomad_lab.meta.KnownMetaInfoEnvs
import
eu.nomad_lab.parsers.AllParsers
...
...
@@ -107,9 +107,9 @@ class Main {
def
parseTreesGivenByCommandLine
(
params
:
Array
[
String
],
treeType
:
TreeType
)
:
Unit
=
{
val
source
=
createCommandLineSource
(
params
.
last
,
treeType
)
val
sinkRequests
:
Sink
[
FileParsing
Request
,
Future
[
Done
]]
=
Sink
.
foreach
(
x
=>
val
sinkRequests
:
Sink
[
FileParsing
Task
,
Future
[
Done
]]
=
Sink
.
foreach
(
x
=>
println
(
s
"received parsing request $x"
))
val
sinkSummaries
:
Sink
[
FileTreeScan
Summary
,
Future
[
Done
]]
=
Sink
.
ignore
val
sinkSummaries
:
Sink
[
FileTreeScan
Result
,
Future
[
Done
]]
=
Sink
.
ignore
val
graph
=
RunnableGraph
.
fromGraph
(
GraphDSL
.
create
(
sinkRequests
,
sinkSummaries
)((
_
,
_
))
{
implicit
builder
=>
(
sinkRequests
,
sinkSummaries
)
=>
import
GraphDSL.Implicits._
...
...
@@ -133,7 +133,7 @@ class Main {
private
def
createCommandLineSource
(
sourceName
:
String
,
treeType
:
TreeType
)
:
Source
[
FileTreeScan
Request
,
NotUsed
]
=
{
Source
.
single
(
FileTreeScan
Request
(
Paths
.
get
(
sourceName
).
toAbsolutePath
,
treeType
))
)
:
Source
[
FileTreeScan
Task
,
NotUsed
]
=
{
Source
.
single
(
FileTreeScan
Task
(
Paths
.
get
(
sourceName
).
toAbsolutePath
,
treeType
))
}
}
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsing
Request
.scala
→
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileParsing
Task
.scala
View file @
dc7b5163
...
...
@@ -11,7 +11,7 @@ import eu.nomad_lab.TreeType
* @param relativePath path inside the given file tree
* @param parserName name of the parser to use for processing the file
*/
case
class
FileParsing
Request
(
case
class
FileParsing
Task
(
fileTreeBasePath
:
Path
,
fileTreeType
:
TreeType.TreeType
,
relativePath
:
Path
,
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScan
Summary
.scala
→
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScan
Result
.scala
View file @
dc7b5163
...
...
@@ -7,8 +7,8 @@ package eu.nomad_lab.integrated_pipeline.messages
* @param request the original scan request
* @param numCandidates number of identified candidate calculations in the file tree
*/
case
class
FileTreeScan
Summary
(
request
:
FileTreeScan
Request
,
case
class
FileTreeScan
Result
(
request
:
FileTreeScan
Task
,
numCandidates
:
Long
)
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScan
Request
.scala
→
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/messages/FileTreeScan
Task
.scala
View file @
dc7b5163
...
...
@@ -10,7 +10,7 @@ import eu.nomad_lab.TreeType
* @param treeType type of the file tree, e.g. zip or directory
*/
case
class
FileTreeScan
Request
(
case
class
FileTreeScan
Task
(
treeBasePath
:
Path
,
treeType
:
TreeType.TreeType
)
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingEngine.scala
View file @
dc7b5163
...
...
@@ -4,7 +4,7 @@ import java.nio.file.Paths
import
com.typesafe.scalalogging.StrictLogging
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileParsingResult
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileParsingResult
,
InMemoryResult
}
import
eu.nomad_lab.meta.MetaInfoEnv
import
eu.nomad_lab.parsers._
import
org.json4s.JsonAST.
{
JArray
,
JObject
,
JString
}
...
...
@@ -22,7 +22,7 @@ import scala.collection.mutable.ListBuffer
*/
class
CalculationParsingEngine
(
parsers
:
ParserCollection
)(
implicit
metaInfo
:
MetaInfoEnv
)
extends
StrictLogging
{
private
def
getParser
(
request
:
FileParsing
Request
)
:
Option
[
OptimizedParser
]
=
{
private
def
getParser
(
request
:
FileParsing
Task
)
:
Option
[
OptimizedParser
]
=
{
parsers
.
parsers
.
get
(
request
.
parserName
).
map
(
_
.
optimizedParser
(
Seq
()))
}
...
...
@@ -52,7 +52,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met
* @param request the original parsing request
* @return the ParseEvents emitted during the parsing
*/
def
processRequest
(
request
:
FileParsing
Request
)
:
FileParsingResult
=
{
def
processRequest
(
request
:
FileParsing
Task
)
:
FileParsingResult
=
{
val
parser
=
getParser
(
request
)
parser
match
{
case
Some
(
parser
)
=>
...
...
@@ -64,7 +64,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met
}
}
private
def
failParseRequest
(
request
:
FileParsing
Request
,
reason
:
String
)
:
FileParsingResult
=
{
private
def
failParseRequest
(
request
:
FileParsing
Task
,
reason
:
String
)
:
FileParsingResult
=
{
val
end
=
FinishedParsingSession
(
Some
(
ParseResult
.
ParseFailure
),
JArray
(
List
(
JString
(
reason
))),
...
...
@@ -82,7 +82,7 @@ class CalculationParsingEngine(parsers: ParserCollection)(implicit metaInfo: Met
)
}
private
def
parseCalculationInDirectory
(
request
:
FileParsing
Request
,
parser
:
OptimizedParser
)
:
FileParsingResult
=
{
private
def
parseCalculationInDirectory
(
request
:
FileParsing
Task
,
parser
:
OptimizedParser
)
:
FileParsingResult
=
{
val
pathToMainFile
=
Paths
.
get
(
"."
)
val
buffer
=
new
BufferForBackend
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/DirectoryTreeParsingRequestGenerator.scala
View file @
dc7b5163
...
...
@@ -6,13 +6,13 @@ import java.util.NoSuchElementException
import
eu.nomad_lab.LocalEnv.Settings
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Request
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Task
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
import
scala.annotation.tailrec
import
scala.collection.JavaConverters._
class
DirectoryTreeParsingRequestGenerator
(
request
:
FileTreeScan
Request
)(
implicit
class
DirectoryTreeParsingRequestGenerator
(
request
:
FileTreeScan
Task
)(
implicit
config
:
Settings
,
parserCollection
:
ParserCollection
)
extends
ParsingRequestGenerator
{
...
...
@@ -20,13 +20,13 @@ class DirectoryTreeParsingRequestGenerator(request: FileTreeScanRequest)(implici
private
val
basePath
=
request
.
treeBasePath
private
val
fileIterator
=
Files
.
walk
(
basePath
).
iterator
().
asScala
.
filter
(
Files
.
isRegularFile
(
_
))
private
var
numEntries
=
0
l
private
var
nextRequest
:
Option
[
FileParsing
Request
]
=
findNextParsingCandidate
()
private
var
nextRequest
:
Option
[
FileParsing
Task
]
=
findNextParsingCandidate
()
override
def
getProcessedRequestCount
=
numEntries
override
def
hasNext
=
nextRequest
.
isDefined
override
def
next
()
:
FileParsing
Request
=
{
override
def
next
()
:
FileParsing
Task
=
{
val
toReturn
=
nextRequest
nextRequest
=
findNextParsingCandidate
()
if
(
toReturn
.
isDefined
)
{
...
...
@@ -37,7 +37,7 @@ class DirectoryTreeParsingRequestGenerator(request: FileTreeScanRequest)(implici
}
}
@tailrec
private
def
findNextParsingCandidate
()
:
Option
[
FileParsing
Request
]
=
{
@tailrec
private
def
findNextParsingCandidate
()
:
Option
[
FileParsing
Task
]
=
{
if
(
fileIterator
.
hasNext
)
{
val
file
=
fileIterator
.
next
()
val
internalFilePath
=
basePath
.
relativize
(
file
)
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ParsingRequestGenerator.scala
View file @
dc7b5163
...
...
@@ -2,19 +2,19 @@ package eu.nomad_lab.integrated_pipeline.stream_components
import
java.nio.file.Path
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Request
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Task
}
import
eu.nomad_lab.parsers.CandidateParser
trait
ParsingRequestGenerator
extends
Iterator
[
FileParsing
Request
]
{
trait
ParsingRequestGenerator
extends
Iterator
[
FileParsing
Task
]
{
def
getProcessedRequestCount
:
Long
def
generateRequest
(
fileTreeRequest
:
FileTreeScan
Request
,
relativeFilePath
:
Path
,
candidateParsers
:
Seq
[
CandidateParser
])
:
Option
[
FileParsing
Request
]
=
{
def
generateRequest
(
fileTreeRequest
:
FileTreeScan
Task
,
relativeFilePath
:
Path
,
candidateParsers
:
Seq
[
CandidateParser
])
:
Option
[
FileParsing
Task
]
=
{
require
(!
relativeFilePath
.
isAbsolute
,
"internal file paths must be relative to file tree root"
)
if
(
candidateParsers
.
nonEmpty
)
{
Some
(
FileParsing
Request
(
Some
(
FileParsing
Task
(
fileTreeBasePath
=
fileTreeRequest
.
treeBasePath
,
fileTreeType
=
fileTreeRequest
.
treeType
,
relativePath
=
relativeFilePath
,
...
...
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/TreeParserGraph.scala
View file @
dc7b5163
...
...
@@ -5,7 +5,7 @@ import java.io.InputStream
import
akka.stream._
import
akka.stream.stage.
{
GraphStage
,
GraphStageLogic
,
InHandler
,
OutHandler
}
import
eu.nomad_lab.LocalEnv.Settings
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Request
,
FileTreeScan
Summary
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Task
,
FileTreeScan
Result
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
/**
...
...
@@ -30,19 +30,19 @@ object TreeParserGraph {
* calculation parsing requests generated for the file-tree). This information can be used to
* determine when all calculation parsing requests belonging to one file-tree have been processed.
*/
trait
TreeParserGraph
extends
GraphStage
[
FanOutShape2
[
FileTreeScan
Request
,
FileParsing
Request
,
FileTreeScan
Summary
]]
{
trait
TreeParserGraph
extends
GraphStage
[
FanOutShape2
[
FileTreeScan
Task
,
FileParsing
Task
,
FileTreeScan
Result
]]
{
val
in
=
Inlet
[
FileTreeScan
Request
](
"TreeParserGraph.in"
)
val
outRequests
=
Outlet
[
FileParsing
Request
](
"TreeParserGraph.outRequests"
)
val
outSummaries
=
Outlet
[
FileTreeScan
Summary
](
"TreeParserGraph.outSummaries"
)
val
in
=
Inlet
[
FileTreeScan
Task
](
"TreeParserGraph.in"
)
val
outRequests
=
Outlet
[
FileParsing
Task
](
"TreeParserGraph.outRequests"
)
val
outSummaries
=
Outlet
[
FileTreeScan
Result
](
"TreeParserGraph.outSummaries"
)
override
val
shape
=
new
FanOutShape2
(
in
,
outRequests
,
outSummaries
)
def
createRequestGenerator
(
request
:
FileTreeScan
Request
)
:
ParsingRequestGenerator
def
createRequestGenerator
(
request
:
FileTreeScan
Task
)
:
ParsingRequestGenerator
override
def
createLogic
(
inheritedAttributes
:
Attributes
)
=
new
GraphStageLogic
(
shape
)
{
private
var
generator
:
Option
[
ParsingRequestGenerator
]
=
None
private
var
currentRequest
:
Option
[
FileTreeScan
Request
]
=
None
private
var
currentRequest
:
Option
[
FileTreeScan
Task
]
=
None
setHandler
(
outRequests
,
new
OutHandler
{
override
def
onPull
()
:
Unit
=
{
...
...
@@ -89,7 +89,7 @@ trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanRequest, FileP
def
tryToPushSummary
()
:
Unit
=
{
if
(
generator
.
nonEmpty
&&
isAvailable
(
outSummaries
))
{
if
(!
generator
.
get
.
hasNext
)
{
push
(
outSummaries
,
FileTreeScan
Summary
(
currentRequest
.
get
,
generator
.
get
.
getProcessedRequestCount
))
push
(
outSummaries
,
FileTreeScan
Result
(
currentRequest
.
get
,
generator
.
get
.
getProcessedRequestCount
))
currentRequest
=
None
generator
=
None
if
(!
isClosed
(
in
))
...
...
@@ -105,9 +105,9 @@ trait TreeParserGraph extends GraphStage[FanOutShape2[FileTreeScanRequest, FileP
}
class
ZipTreeParserGraph
(
implicit
config
:
Settings
,
parserCollection
:
ParserCollection
)
extends
TreeParserGraph
{
override
def
createRequestGenerator
(
request
:
FileTreeScan
Request
)
=
new
ZipTreeParsingRequestGenerator
(
request
)
override
def
createRequestGenerator
(
request
:
FileTreeScan
Task
)
=
new
ZipTreeParsingRequestGenerator
(
request
)
}
class
DirectoryTreeParserGraph
(
implicit
config
:
Settings
,
parserCollection
:
ParserCollection
)
extends
TreeParserGraph
{
override
def
createRequestGenerator
(
request
:
FileTreeScan
Request
)
=
new
DirectoryTreeParsingRequestGenerator
(
request
)
override
def
createRequestGenerator
(
request
:
FileTreeScan
Task
)
=
new
DirectoryTreeParsingRequestGenerator
(
request
)
}
\ No newline at end of file
integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/ZipTreeParsingRequestGenerator.scala
View file @
dc7b5163
...
...
@@ -5,13 +5,13 @@ import java.util.NoSuchElementException
import
eu.nomad_lab.LocalEnv.Settings
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Request
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Task
}
import
eu.nomad_lab.parsers.
{
CandidateParser
,
ParserCollection
}
import
org.apache.commons.compress.archivers.zip.
{
ZipArchiveEntry
,
ZipFile
}
import
scala.annotation.tailrec
class
ZipTreeParsingRequestGenerator
(
request
:
FileTreeScan
Request
)(
implicit
class
ZipTreeParsingRequestGenerator
(
request
:
FileTreeScan
Task
)(
implicit
config
:
Settings
,
parserCollection
:
ParserCollection
)
extends
ParsingRequestGenerator
{
...
...
@@ -20,7 +20,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit
private
val
zipFile
=
new
ZipFile
(
request
.
treeBasePath
.
toFile
)
private
val
zipEntries
:
java.util.Enumeration
[
ZipArchiveEntry
]
=
zipFile
.
getEntries
private
var
numEntries
=
0
l
private
var
nextRequest
:
Option
[
FileParsing
Request
]
=
findNextParsingCandidate
()
private
var
nextRequest
:
Option
[
FileParsing
Task
]
=
findNextParsingCandidate
()
override
def
getProcessedRequestCount
:
Long
=
numEntries
...
...
@@ -30,7 +30,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit
zipFile
.
close
()
}
override
def
next
()
:
FileParsing
Request
=
{
override
def
next
()
:
FileParsing
Task
=
{
val
toReturn
=
nextRequest
nextRequest
=
findNextParsingCandidate
()
if
(
toReturn
.
isDefined
)
{
...
...
@@ -41,7 +41,7 @@ class ZipTreeParsingRequestGenerator(request: FileTreeScanRequest)(implicit
}
}
@tailrec
private
def
findNextParsingCandidate
()
:
Option
[
FileParsing
Request
]
=
{
@tailrec
private
def
findNextParsingCandidate
()
:
Option
[
FileParsing
Task
]
=
{
if
(
zipEntries
.
hasMoreElements
)
{
val
zipEntry
:
ZipArchiveEntry
=
zipEntries
.
nextElement
()
val
internalFilePath
=
Paths
.
get
(
zipEntry
.
getName
)
...
...
integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingEngineSpec.scala
View file @
dc7b5163
...
...
@@ -3,7 +3,7 @@ package integrated_pipeline_tests
import
java.nio.file.Paths
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
InMemoryResult
}
import
eu.nomad_lab.integrated_pipeline.stream_components.CalculationParsingEngine
import
eu.nomad_lab.meta.KnownMetaInfoEnvs
import
eu.nomad_lab.parsers.ParseResult.ParseResult
...
...
@@ -32,7 +32,7 @@ class CalculationParsingEngineSpec extends WordSpec with MockitoSugar {
implicit
val
metaInfo
=
KnownMetaInfoEnvs
.
all
val
sampleParseRequest
=
FileParsing
Request
(
val
sampleParseRequest
=
FileParsing
Task
(
fileTreeBasePath
=
Paths
.
get
(
"s/foo"
),
fileTreeType
=
TreeType
.
Directory
,
relativePath
=
Paths
.
get
(
"bar/gus.out"
),
...
...
integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParserGraphSpec.scala
View file @
dc7b5163
...
...
@@ -6,7 +6,7 @@ import akka.stream._
import
akka.stream.scaladsl._
import
akka.stream.testkit.scaladsl._
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Request
,
FileTreeScan
Request
,
FileTreeScan
Summary
}
import
eu.nomad_lab.integrated_pipeline.messages.
{
FileParsing
Task
,
FileTreeScan
Task
,
FileTreeScan
Result
}
import
eu.nomad_lab.integrated_pipeline.stream_components.
{
ParsingRequestGenerator
,
TreeParserGraph
}
import
org.mockito.Mockito._
import
org.mockito.invocation.InvocationOnMock
...
...
@@ -23,13 +23,13 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
*/
private
trait
GraphWithDummy
{
def
getGeneratorDummy
(
request
:
FileTreeScan
Request
)
:
ParsingRequestGenerator
=
{
def
getGeneratorDummy
(
request
:
FileTreeScan
Task
)
:
ParsingRequestGenerator
=
{
val
generator
=
mock
[
ParsingRequestGenerator
]
var
extracted
=
0
val
dummyIterator
=
Iterator
.
apply
(
FileParsing
Request
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Request
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Request
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file3"
),
"fooParser"
)
FileParsing
Task
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Task
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Task
(
request
.
treeBasePath
,
request
.
treeType
,
Paths
.
get
(
"file3"
),
"fooParser"
)
)
when
(
generator
.
hasNext
).
thenAnswer
(
new
Answer
[
Boolean
]
{
def
answer
(
x
:
InvocationOnMock
)
=
dummyIterator
.
hasNext
...
...
@@ -37,21 +37,21 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
when
(
generator
.
getProcessedRequestCount
).
thenAnswer
(
new
Answer
[
Long
]
{
def
answer
(
x
:
InvocationOnMock
)
=
extracted
})
when
(
generator
.
next
()).
thenAnswer
(
new
Answer
[
FileParsing
Request
]
{
when
(
generator
.
next
()).
thenAnswer
(
new
Answer
[
FileParsing
Task
]
{
def
answer
(
x
:
InvocationOnMock
)
=
{
extracted
+=
1
;
dummyIterator
.
next
()
}
})
generator
}
private
val
testInput
=
TestSource
.
probe
[
FileTreeScan
Request
]
private
val
testRequests
=
TestSink
.
probe
[
FileParsing
Request
]
private
val
testSummaries
=
TestSink
.
probe
[
FileTreeScan
Summary
]
private
val
testInput
=
TestSource
.
probe
[
FileTreeScan
Task
]
private
val
testRequests
=
TestSink
.
probe
[
FileParsing
Task
]
private
val
testSummaries
=
TestSink
.
probe
[
FileTreeScan
Result
]
val
testGraph
=
RunnableGraph
.
fromGraph
(
GraphDSL
.
create
(
testInput
,
testRequests
,
testSummaries
)((
_
,
_
,
_
))
{
implicit
builder
=>
(
sourceInput
,
sinkRequests
,
sinkSummaries
)
=>
import
GraphDSL.Implicits._
val
treeParser
=
builder
.
add
(
new
TreeParserGraph
{
override
def
createRequestGenerator
(
request
:
FileTreeScan
Request
)
:
ParsingRequestGenerator
=
{
override
def
createRequestGenerator
(
request
:
FileTreeScan
Task
)
:
ParsingRequestGenerator
=
{
getGeneratorDummy
(
request
)
}
})
...
...
@@ -63,7 +63,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
}
)
def
sampleInput
(
baseName
:
String
)
=
FileTreeScan
Request
(
def
sampleInput
(
baseName
:
String
)
=
FileTreeScan
Task
(
treeBasePath
=
Paths
.
get
(
s
"/foo/$baseName"
),
treeType
=
TreeType
.
Zip
)
...
...
@@ -78,9 +78,9 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
probeRequests
.
ensureSubscription
().
request
(
3
)
probeInput
.
sendNext
(
input
)
val
expectedRequests
=
immutable
.
Seq
(
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
)
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
)
)
probeRequests
.
expectNextUnorderedN
(
expectedRequests
)
}
...
...
@@ -91,7 +91,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
probeRequests
.
ensureSubscription
().
request
(
3
)
probeSummaries
.
ensureSubscription
().
request
(
1
)
probeInput
.
sendNext
(
input
)
probeSummaries
.
expectNext
(
FileTreeScan
Summary
(
input
,
3
l
))
probeSummaries
.
expectNext
(
FileTreeScan
Result
(
input
,
3
l
))
}
"emit all remaining values after the upstream element completed"
in
new
GraphWithDummy
{
...
...
@@ -129,12 +129,12 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
probeSummaries
.
ensureSubscription
().
request
(
2
)
probeInput
.
sendNext
(
input1
).
sendNext
(
input2
)
probeRequests
.
expectNextN
(
immutable
.
Seq
(
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Request
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
)
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive1"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file1"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file2"
),
"fooParser"
),
FileParsing
Task
(
Paths
.
get
(
"/foo/archive2"
),
TreeType
.
Zip
,
Paths
.
get
(
"file3"
),
"fooParser"
)
))
}
...
...
@@ -145,7 +145,7 @@ class TreeParserGraphSpec extends WordSpec with MockitoSugar {
probeRequests
.
ensureSubscription
().
request
(
6
)
probeSummaries
.
ensureSubscription
().
request
(
2
)
probeInput
.
sendNext
(
input1
).
sendNext
(
input2
)
probeSummaries
.
expectNext
(
FileTreeScan
Summary
(
input1
,
3
l
),
FileTreeScan
Summary
(
input2
,
3
l
))
probeSummaries
.
expectNext
(
FileTreeScan
Result
(
input1
,
3
l
),
FileTreeScan
Result
(
input2
,
3
l
))
}
"emit all remaining values after the upstream element completed"
in
new
GraphWithDummy
{
...
...
integrated-pipeline/src/test/scala/integrated_pipeline_tests/TreeParsingRequestGeneratorSpec.scala
View file @
dc7b5163
...
...
@@ -2,7 +2,7 @@ package integrated_pipeline_tests
import
eu.nomad_lab.QueueMessage.TreeParserRequest
import
eu.nomad_lab.TreeType.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.FileTreeScan
Request
import
eu.nomad_lab.integrated_pipeline.messages.FileTreeScan
Task
import
eu.nomad_lab.integrated_pipeline.stream_components.
{
DirectoryTreeParsingRequestGenerator
,
ParsingRequestGenerator
,
ZipTreeParsingRequestGenerator
}
import
eu.nomad_lab.parsers.AllParsers
import
eu.nomad_lab.
{
LocalEnv
,
TreeType
}
...
...
@@ -14,7 +14,7 @@ class TreeParsingRequestGeneratorSpec extends WordSpec {
private
implicit
val
parsers
=
AllParsers
.
defaultParserCollection
case
class
TestGenerator
(
generator
:
(
FileTreeScan
Request
)
=>
ParsingRequestGenerator
,
treeType
:
TreeType
generator
:
(
FileTreeScan
Task
)
=>
ParsingRequestGenerator
,
treeType
:
TreeType
)
"a ZipTreeParsingRequestGenerator"
when
{
...
...
integrated-pipeline/src/test/scala/integrated_pipeline_tests/package.scala
View file @
dc7b5163
...
...
@@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import
akka.stream.ActorMaterializer
import
eu.nomad_lab.TreeType
import
eu.nomad_lab.TreeType.TreeType
import
eu.nomad_lab.integrated_pipeline.messages.FileTreeScan
Request
import
eu.nomad_lab.integrated_pipeline.messages.FileTreeScan
Task
import
eu.nomad_lab.parsers.
{
Cp2kParser
,
FhiAimsParser
,
VaspRunParser
}
package
object
integrated_pipeline_tests
{
...
...
@@ -46,19 +46,19 @@ package object integrated_pipeline_tests {
"RQX1tgMCwKvq1nPT9wZeVwxi_oLSV/data/second_example/598004ae7a4aed90672fefaa/vasprun.xml"
->
VaspRunParser
.
name
))
def
createFileTreeScanRequest
(
archive
:
TestTreeData
,
mode
:
TreeType
)
:
FileTreeScan
Request
=
{
def
createFileTreeScanRequest
(
archive
:
TestTreeData
,
mode
:
TreeType
)
:
FileTreeScan
Task
=
{
val
baseDir
=
sys
.
props
.
get
(
"user.dir"
).
get
val
prefix
=
archive
.
baseName
.
substring
(
0
,
3
)
mode
match
{
case
TreeType
.
Zip
=>
FileTreeScan
Request
(
case
TreeType
.
Zip
=>
FileTreeScan
Task
(
treeBasePath
=
Paths
.
get
(
s
"$baseDir/src/test/resources/$prefix/${archive.baseName}.zip"
),
treeType
=
mode
)
case
TreeType
.
Directory
=>
FileTreeScan
Request
(
case
TreeType
.
Directory
=>
FileTreeScan
Task
(
treeBasePath
=
Paths
.
get
(
s
"$baseDir/src/test/resources/$prefix/${archive.baseName}"
),
treeType
=
mode
)
case
_
=>
FileTreeScan
Request
(
case
_
=>
FileTreeScan
Task
(
treeBasePath
=
Paths
.
get
(
s
"$baseDir/src/test/resources/$prefix/${archive.baseName}"
),
treeType
=
mode
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment