Commit cd982cf8 authored by Mohamed, Fawzi Roberto (fawzi)

Arvid's work in the integrated pipeline

parents 1bfe4616 e387aede
Pipeline #35379 failed in 50 minutes and 37 seconds
@@ -35,7 +35,8 @@ val loggingLibs = {
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.10.0",
"org.apache.logging.log4j" % "log4j-api" % "2.10.0",
"org.apache.logging.log4j" % "log4j-core" % "2.10.0",
"org.apache.logging.log4j" % "log4j-1.2-api" % "2.10.0")
"org.apache.logging.log4j" % "log4j-1.2-api" % "2.10.0",
"org.apache.logging.log4j" % "log4j-api-scala_2.11" % "11.0")
scalalogLib +: log4j2Libs
}
val commonsLoggingBridgeLib = "org.apache.logging.log4j" % "log4j-jcl" % "2.5"
@@ -44,8 +45,9 @@ val commonsLoggingBridgeLib = "org.apache.logging.log4j" % "log4j-jcl" % "2.
val specs2Lib = "org.specs2" %% "specs2-core" % "2.3.11"
val scalacheckLib = "org.scalacheck" %% "scalacheck" % "1.12.4"
val scalatestLib = "org.scalatest" % "scalatest_2.11" % "3.0.1"
val mockitoLib = "org.mockito" % "mockito-core" % "2.18.3"
val testLibs = {
Seq(specs2Lib % "test", scalacheckLib % "test", scalatestLib % "test")
Seq(specs2Lib % "test", scalacheckLib % "test", scalatestLib % "test", mockitoLib % Test)
}
val fullTestLibs = {
@@ -116,20 +118,32 @@ lazy val sprayLibs = {
"com.typesafe.akka" %% "akka-testkit" % akkaV % "test")
}
lazy val akkaV = "2.5.3"
lazy val akkaStreamLibs = {
Seq(
"com.typesafe.akka" %% "akka-stream" % akkaV
)
}
lazy val akkaStreamTestLibs = {
Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % akkaV % Test
)
}
lazy val akkaHttpLibs = {
val akkaV = "2.5.3"
val akkaHttpV = "10.0.9"
val akkaSwaggerV = "0.13.0"
Seq(
"com.typesafe.akka" %% "akka-http" % akkaHttpV,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV,
"com.typesafe.akka" %% "akka-http-xml" % akkaHttpV,
"com.typesafe.akka" %% "akka-stream" % akkaV,
"com.github.swagger-akka-http" %% "swagger-akka-http" % akkaSwaggerV//,
//"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpV % Test,
//"org.scalatest" %% "scalatest" % "3.0.1" % Test
)
) ++ akkaStreamLibs
}
val akkaRabbitmqResolver = "The New Motion Public Repo" at "http://nexus.thenewmotion.com/content/groups/public/"
@@ -1504,6 +1518,11 @@ lazy val integratedpipeline = (project in file("integrated-pipeline")).
libraryDependencies += akkaRabbitmq,
libraryDependencies += rabbitmqLib,
libraryDependencies ++= sprayLibs,
libraryDependencies ++= akkaStreamLibs,
libraryDependencies ++= akkaStreamTestLibs,
libraryDependencies --= testLibs,
libraryDependencies ++= Seq(scalatestLib % Test, mockitoLib % Test),
testOptions in Test += Tests.Argument("-oD"),
docker := { (docker dependsOn assembly).value },
imageNames in docker := Seq(
// Sets a name with a tag that contains the project version; namespace/repository:tag
......
@@ -161,6 +161,10 @@ nomad_lab {
pidEndpoint = "http://localhost:8111/repo/utility/pids?gid=\"${archiveGid}\""
}
integrated_pipeline {
//TODO: give meaningful defaults here
targetDirectory = "."
numWorkers = 1
inFile = ""
treeparserMinutesOfRuntime = -1
}
......
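As a quick reference (not part of the commit), a minimal sketch of reading these integrated_pipeline settings through Typesafe Config, mirroring the LocalEnv.defaultConfig usage in CalculationParserMaster below:

import com.typesafe.config.Config
import eu.nomad_lab.LocalEnv

// sketch: read the defaults defined in the config block above
val conf: Config = LocalEnv.defaultConfig
val targetDirectory: String = conf.getString("nomad_lab.integrated_pipeline.targetDirectory")
val numWorkers: Int = conf.getInt("nomad_lab.integrated_pipeline.numWorkers")
val inFile: String = conf.getString("nomad_lab.integrated_pipeline.inFile")
val treeparserMinutes: Int = conf.getInt("nomad_lab.integrated_pipeline.treeparserMinutesOfRuntime")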
@@ -240,6 +240,19 @@ object LocalEnv extends StrictLogging {
}
}
def defaultConfig(verbose: Boolean): Config = {
LocalEnv.synchronized {
privateConfig match {
case None =>
val newConf = loadConfiguration(verbose = verbose)
privateConfig = Some(newConf)
newConf
case Some(conf) =>
conf
}
}
}
def defaultConfig_=(newValue: Config): Unit = {
LocalEnv.synchronized {
privateConfig match {
......
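The defaultConfig getter in this hunk lazily loads the configuration once under the LocalEnv lock, while defaultConfig_= lets callers (e.g. tests) inject a configuration before first use. A hedged usage sketch (the override value is illustrative only):

import com.typesafe.config.ConfigFactory
import eu.nomad_lab.LocalEnv

// sketch: inject a test configuration before anything reads defaultConfig
LocalEnv.defaultConfig = ConfigFactory
  .parseString("nomad_lab.integrated_pipeline.numWorkers = 4")
  .withFallback(ConfigFactory.load())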
package eu.nomad_lab.integrated_pipeline
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal }
class ArchiveCleanUp(zipHandler: RawDataArchiveHandler) extends MessageProcessor[FileParsingSignal, FileParsingSignal] {
private var toEmit: Option[FileParsingSignal] = None
override def processSignal(in: FileParsingSignal): Unit = {
if (toEmit.isEmpty) {
in.fileTree.treeType match {
case TreeType.Directory => ()
case TreeType.Zip => zipHandler.processFileParsingSignal(in)
}
toEmit = Some(in)
} else {
throw new IllegalStateException("must fetch all outbound signals before next inbound signal")
}
}
override def hasSignalToEmit(): Boolean = toEmit.nonEmpty
override def getNextSignalToEmit(): FileParsingSignal = {
if (toEmit.nonEmpty) {
val signal = toEmit.get
toEmit = None
signal
} else {
throw new IllegalStateException("no signal ready to be fetched")
}
}
override def requiresMoreMessages: Boolean = false
}
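ArchiveCleanUp (and ArchiveUnpacker below) follow a strict one-in/drain-all-out MessageProcessor contract: each processSignal call must be followed by draining hasSignalToEmit/getNextSignalToEmit before the next input, otherwise an IllegalStateException is thrown. The MessageProcessor trait itself is not part of this commit, so the following caller-side drive loop is a sketch based only on the methods used here:

// sketch: generic drive loop honouring the MessageProcessor contract
def drive[I, O](processor: MessageProcessor[I, O], inputs: Iterator[I])(emit: O => Unit): Unit = {
  for (in <- inputs) {
    processor.processSignal(in)
    while (processor.hasSignalToEmit()) {
      emit(processor.getNextSignalToEmit())
    }
  }
}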
package eu.nomad_lab.integrated_pipeline
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages.{ CandidateFound, TreeScanCompleted, TreeScanSignal }
class ArchiveUnpacker(zipHandler: RawDataArchiveHandler) extends MessageProcessor[TreeScanSignal, TreeScanSignal] {
private var toEmit: Option[TreeScanSignal] = None
override def processSignal(in: TreeScanSignal): Unit = {
if (toEmit.isEmpty) {
toEmit = in.fileTree.treeType match {
case TreeType.Directory => Some(in)
case TreeType.Zip =>
val extractedPath = zipHandler.processTreeScanSignal(in)
in match {
case signal: CandidateFound => Some(signal.copy(extractedPath = extractedPath))
case signal: TreeScanCompleted => Some(signal)
}
}
} else {
throw new IllegalStateException("must fetch all outbound signals before next inbound signal")
}
}
override def hasSignalToEmit(): Boolean = toEmit.nonEmpty
override def getNextSignalToEmit(): TreeScanSignal = {
if (toEmit.nonEmpty) {
val signal = toEmit.get
toEmit = None
signal
} else {
throw new IllegalStateException("no signal ready to be fetched")
}
}
override def requiresMoreMessages: Boolean = false
}
/*
Copyright 2016-2017 The NOMAD Developers Group
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eu.nomad_lab.integrated_pipeline
import akka.actor._
import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.QueueMessage.CalculationParserRequest
import eu.nomad_lab.parsers
import eu.nomad_lab.JsonSupport
import eu.nomad_lab.parsing_queue.CalculationParser
import eu.nomad_lab.LocalEnv
import scala.collection.mutable
import scala.concurrent.duration._
import java.lang.Thread
/**
* actor that keeps all the parser threads
*/
class CalculationParserMaster() extends Actor with StrictLogging {
val maxWorker: Int = LocalEnv.defaultConfig.getInt("nomad_lab.parser_worker_rabbitmq.numberOfWorkers")
val statsCollector: ActorRef = context.parent
var lastRequestFinishedTime: Option[java.util.Date] = None
var lastRequest: Option[CalculationParserRequest] = None
val workersStatus: mutable.Map[String, TaskStarted] = mutable.Map()
var nrFilesParsed = 0
var activeWorkers = 0
def updateParsingStats(stats: CalculationParser.StatsValues): Unit = {
statsCollector ! stats
}
import context._
val workers = for (i <- 1 to maxWorker) yield {
val worker = new ParserWorker(i, PipelineManager.blockingQueue, updateParsingStats: (CalculationParser.StatsValues) => Unit, context.self)
val thread = new Thread(worker, "calculationParser-" + i.toString)
thread.start()
(worker -> thread)
}
activeWorkers = maxWorker
logger.info(s"Created ${workers.size} workers.")
def receive = {
case Die =>
//statsCollector ! TreeParserGroupProcessed
logger.info("Manager will die...")
for ((worker, thread) <- workers)
if (thread.isAlive())
logger.warn(s"Stopping manager while worker thread ${thread.getName} is still alive")
context.stop(self)
case TaskStarted(request, lastRequestReceivedTime) =>
workersStatus += sender().path.name -> TaskStarted(request, lastRequestReceivedTime)
lastRequest = Some(request)
logger.info(s"Received message from ParserActor: Parsing Started. ${JsonSupport.writePrettyStr(request)}, $lastRequestReceivedTime")
case TaskFinished(result) =>
workersStatus -= sender().path.name
lastRequestFinishedTime = Some(new java.util.Date)
nrFilesParsed += 1
statsCollector ! TaskFinished(result)
logger.info(s"Received message from ParserActor: Parsing Done. $result, $lastRequestFinishedTime")
// write normalization queue
case MasterStatusCheck =>
sender() ! MasterStatusReport(workersStatus.size, nrFilesParsed, lastRequestFinishedTime.getOrElse(new java.util.Date))
case WorkerStopped(w) =>
activeWorkers -= 1
logger.info(s"Manager detected worker stop, active workers: $activeWorkers")
if (activeWorkers == 0) {
logger.info(s"Manager detected that all workers stopped, perparing to kill")
context.system.scheduler.scheduleOnce(30.seconds, statsCollector, Die)
context.system.scheduler.scheduleOnce(20.seconds, self, Die)
}
}
}
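A hedged sketch of querying the master with Akka's ask pattern, using the MasterStatusCheck/MasterStatusReport messages handled in the receive block above (the actor system setup is illustrative; a top-level master sends its stats to the guardian):

import akka.actor.{ ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

val system = ActorSystem("integrated-pipeline")
val master = system.actorOf(Props(new CalculationParserMaster()), "calculationParserMaster")
implicit val timeout: Timeout = Timeout(5.seconds)
import system.dispatcher

// sketch: request a status report and print it when it arrives
(master ? MasterStatusCheck).mapTo[MasterStatusReport].foreach { report =>
  println(s"parser master status: $report")
}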
package eu.nomad_lab.integrated_pipeline
import java.nio.file.Path
import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.JsonSupport.formats
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.meta.MetaInfoEnv
import eu.nomad_lab.parsers._
import scala.collection.mutable.ListBuffer
/**
* Parsing engine that runs the actual parsing process for a single request. The information
* extracted by the parser is stored as a list of parsing events. These parsing events can then be
* fed into other backends, like an HDF5-writer.
* TODO: event streams exceeding a size limit should be stored on disk to ensure that the memory
* usage remains bounded
* @param parsers the parsers to be used
* @param metaInfo the NOMAD meta information the parser can extract
* @param eventListener processor for event notifications (logging etc)
* @param name name of this object when registering to the eventListener
*/
class CalculationParsingEngine(
private val parsers: ParserCollection,
private val metaInfo: MetaInfoEnv,
private val eventListener: EventListener,
private val name: Option[String]
) extends StrictLogging {
val myId = eventListener.registerReporter(this, name)
private def getParser(request: CandidateFound): Option[OptimizedParser] = {
parsers.parsers.get(request.parserName).map(_.optimizedParser(Seq()))
}
/**
* stores the events emitted by the real backend
*/
private class BufferForBackend {
val events = new ListBuffer[ParseEvent]
var startEvent: Option[StartedParsingSession] = None
var endEvent: Option[FinishedParsingSession] = None
def handleStartAndEndParsing(event: ParseEvent): Unit = {
event match {
case ev: StartedParsingSession if startEvent.isEmpty => startEvent = Some(ev)
case ev: FinishedParsingSession if endEvent.isEmpty => endEvent = Some(ev)
case x => logger.error(s"received unexpected event $x")
}
}
def handleParseEvents(event: ParseEvent): Unit = events.append(event)
}
/**
* Process the given parsing task signal. Parsing tasks will trigger a parsing run on the
* specified file and return the collected events. Other signals will be forwarded directly.
* Exceptions thrown during parsing will be handled gracefully and indicated in the parsing
* results.
* @param task the original parsing task signal
* @return the generated parsing results signal
*/
def processSignal(task: TreeScanSignal): FileParsingSignal = {
task match {
case signal: TreeScanCompleted => signal
case task: CandidateFound =>
eventListener.processEvent(myId, CalculationParserEventStart(
task.fileTree, task.relativePath, task.parserName
))
val pathToMainFile = task.fileTree.treeType match {
case TreeType.Directory => Right(task.fileTree.treeBasePath.resolve(task.relativePath))
case TreeType.Zip => task.extractedPath match {
case Some(path) => Right(path)
case None => Left("extracted file path is missing in the request")
}
case x => Left(s"requests with file tree type $x are not supported")
}
(pathToMainFile, getParser(task)) match {
case (Right(path), Some(parser)) => parseCalculation(task, parser, path)
case (Left(error), _) => failParseRequest(task, error)
case (_, None) => failParseRequest(task, s"parser ${task.parserName} is unknown")
}
}
}
/**
* Create a failed parsing result for the given task with the specified reason.
* @param request the original parsing request
* @param reason explanation why the parsing has failed
* @return a FileParsingResult representing the failure
*/
private def failParseRequest(request: CandidateFound, reason: String): FileParsingResult = {
eventListener.processEvent(myId, CalculationParserEventEnd(
request.fileTree, request.relativePath, request.parserName, ParseResult.ParseFailure, Some(reason)
))
ParsingResultInMemory(
task = request,
result = ParseResult.ParseFailure,
start = None,
events = Seq(),
end = None,
error = Some(reason)
)
}
private def parseCalculation(request: CandidateFound, parser: OptimizedParser,
pathToMainFile: Path): FileParsingResult = {
val buffer = new BufferForBackend
val backend = new ParseEventsEmitter(metaInfo, buffer.handleParseEvents,
buffer.handleStartAndEndParsing)
val result = SafeParsing.parse(parser, request.mainFileUri, pathToMainFile, backend, request.parserName)
val error = result match {
case ParseResult.ParseFailure => buffer.endEvent match {
case Some(end) => end.parserErrors.children.headOption match {
case Some(message) => Some(message.extract[String])
case None => Some("no specific error message provided by parser")
}
case None => Some("parsing did not terminate cleanly")
}
case _ => None
}
eventListener.processEvent(myId, CalculationParserEventEnd(
request.fileTree, request.relativePath, request.parserName, result, error
))
ParsingResultInMemory(
task = request,
result = result,
start = buffer.startEvent,
events = buffer.events.result(),
end = buffer.endEvent,
error = error
)
}
}
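A hedged wiring sketch for the engine: the ParserCollection and MetaInfoEnv instances are assumed to come from the surrounding NOMAD code base, and the no-op EventListener default stands in for a real logger:

// sketch: construct an engine and run one signal through it
val engine = new CalculationParsingEngine(
  parsers = parserCollection, // assumed: a ParserCollection from the project setup
  metaInfo = metaInfoEnv, // assumed: the NOMAD MetaInfoEnv
  eventListener = new EventListener {}, // trait with default no-op processEvent
  name = Some("parsing-engine-1")
)
// candidate: an assumed CandidateFound signal from a preceding tree scan
val result: FileParsingSignal = engine.processSignal(candidate)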
package eu.nomad_lab.integrated_pipeline
import eu.nomad_lab.integrated_pipeline.messages.{ CalculationParserEvent, ResultWriterEvent, TreeParserEvent }
import scala.collection.parallel.mutable
trait EventListener {
private final val registeredTreeParsers: mutable.ParMap[TreeScanner, TreeScannerId] = mutable.ParMap()
private final val registeredCalculationParsers: mutable.ParMap[CalculationParsingEngine, CalculationParserId] = mutable.ParMap()
private final val registeredResultWriters: mutable.ParMap[ParsingResultsProcessingManager, ResultWriterId] = mutable.ParMap()
def registerReporter(reference: TreeScanner, reporterId: Option[String]): TreeScannerId = {
if (registeredTreeParsers.contains(reference)) {
throw new IllegalArgumentException("reference already registered as reporter")
} else {
val id = TreeScannerId(reporterId.getOrElse(reference.toString), reference)
registeredTreeParsers.put(reference, id)
id
}
}
def registerReporter(reference: CalculationParsingEngine, reporterId: Option[String]): CalculationParserId = {
if (registeredCalculationParsers.contains(reference)) {
throw new IllegalArgumentException("reference already registered as reporter")
} else {
val id = CalculationParserId(reporterId.getOrElse(reference.toString), reference)
registeredCalculationParsers.put(reference, id)
id
}
}
def registerReporter(reference: ParsingResultsProcessingManager, reporterId: Option[String]): ResultWriterId = {
if (registeredResultWriters.contains(reference)) {
throw new IllegalArgumentException("reference already registered as reporter")
} else {
val id = ResultWriterId(reporterId.getOrElse(reference.toString), reference)
registeredResultWriters.put(reference, id)
id
}
}
def processEvent(reporter: TreeScannerId, message: TreeParserEvent): Unit = {}
def processEvent(reporter: CalculationParserId, message: CalculationParserEvent): Unit = {}
def processEvent(reporter: ResultWriterId, message: ResultWriterEvent): Unit = {}
}
package eu.nomad_lab.integrated_pipeline
import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.parsers.ParseResult
import org.apache.logging.log4j.scala.Logging
import org.apache.logging.log4j.{ Marker, MarkerManager }
object EventLogger {
val baseMarker: Marker = MarkerManager.getMarker("ParsingManager")
val parseResultMarker: Marker = MarkerManager.getMarker("ParsingResult").setParents(baseMarker)
val parseSuccessMarker: Marker = MarkerManager.getMarker("ParsingSuccess").setParents(parseResultMarker)
val parseSkippedMarker: Marker = MarkerManager.getMarker("ParsingSkipped").setParents(parseResultMarker)
val parseFailureMarker: Marker = MarkerManager.getMarker("ParsingFailure").setParents(parseResultMarker)
}
/**
* Write incoming events to a logfile via the Log4j Scala API and the underlying logging framework.
* This trait stacks and can thus be combined with other EventListener-traits to create a custom
* event logging class with multiple features.
*/
trait EventLogger extends EventListener with Logging {
import EventLogger._
override def processEvent(reporter: TreeScannerId, message: TreeParserEvent): Unit = {
message match {
case x: TreeParserEventStart => logger.info(
baseMarker,
s"[TreeParser ${reporter.name}] start processing file tree " +
s"'${x.fileTree.treeBasePath}' (tree type ${x.fileTree.treeType})"
)
case x: TreeParserEventEnd => logger.info(
baseMarker,
s"[TreeParser ${reporter.name}] finished processing file tree " +
s"'${x.fileTree.treeBasePath}', found ${x.numCandidates} candidate files " +
s"(tree type ${x.fileTree.treeType})"
)
case x: TreeParserEventCandidate => logger.debug(
baseMarker,
s"[TreeParser ${reporter.name}] found candidate in file tree " +
s"'${x.fileTree.treeBasePath}' at '${x.relativePath}' (parser '${x.parser}')"
)
case x: TreeParserEventScanError => logger.warn(
baseMarker,
s"[TreeParser ${reporter.name}] scanning failure in file tree " +
s"'${x.fileTree.treeBasePath}' at '${x.relativePath}' (${x.error.toString})"
)
case x: TreeParserEventTreeFailure => logger.error(
baseMarker,
s"[TreeParser ${reporter.name}] encountered failure while processing file tree " +
s"'${x.fileTree.treeBasePath}', aborting: ${x.error.toString}"
)
}
super.processEvent(reporter, message)
}
override def processEvent(reporter: CalculationParserId, message: CalculationParserEvent): Unit = {
message match {
case x: CalculationParserEventStart => logger.info(
baseMarker,
s"[CalculationParser ${reporter.name}] start parsing file " +
s"'${x.relativePath}' in file tree '${x.fileTree.treeBasePath}' (parser '${x.parser}')"
)
case x: CalculationParserEventEnd =>
x.result match {
case ParseResult.ParseSuccess => logger.info(
parseSuccessMarker,
s"[CalculationParser ${reporter.name}] finished parsing file '${x.relativePath}' in " +
s"file tree '${x.fileTree.treeBasePath}' (parser '${x.parser}')"
)
case ParseResult.ParseSkipped => logger.info(
parseSkippedMarker,
s"[CalculationParser ${reporter.name}] skipped parsing file '${x.relativePath}' in " +
s"file tree '${x.fileTree.treeBasePath}' (parser '${x.parser}')"
)
case ParseResult.ParseFailure => logger.warn(
parseFailureMarker,
s"[CalculationParser ${reporter.name}] failed parsing file '${x.relativePath}' in " +
s"file tree '${x.fileTree.treeBasePath}': ${x.error.get} (parser '${x.parser}')"
)
}
}
super.processEvent(reporter, message)
}
override def processEvent(reporter: ResultWriterId, message: ResultWriterEvent): Unit = {
message match {
case x: ResultWriterEventStart => logger.info(
baseMarker,
s"[ResultWriter ${reporter.name}] started writing results for file tree " +
s"'${x.fileTree.treeBasePath}'"
)
case x: ResultWriterEventResult => logger.debug(
baseMarker,
s"[ResultWriter ${reporter.name}] processes parsing results for file '${x.relativePath}' " +
s"from file tree '${x.fileTree.treeBasePath}'"
)
case x: ResultWriterEventEnd => logger.info(
baseMarker,
s"[ResultWriter ${reporter.name}] finished writing results for file tree " +
s"'${x.fileTree.treeBasePath}', output location: '${x.outputLocation}' " +
s"(${x.numCalculations} calculations, ${x.numParsingFailures} parsing failures)"
)
}
super.processEvent(reporter, message)
}
}
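Since EventLogger delegates to super.processEvent, it stacks with other EventListener traits; a sketch with a hypothetical counting trait (not part of this commit) shows the composition:

import java.util.concurrent.atomic.AtomicLong
import eu.nomad_lab.integrated_pipeline.messages.CalculationParserEvent

// hypothetical stackable trait: counts calculation-parser events, then delegates
trait EventCounter extends EventListener {
  val calculationEvents = new AtomicLong(0)
  override def processEvent(reporter: CalculationParserId, message: CalculationParserEvent): Unit = {
    calculationEvents.incrementAndGet()
    super.processEvent(reporter, message)
  }
}

// composition: logging plus counting in a single listener
object PipelineEvents extends EventLogger with EventCounter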