/*
   Copyright 2016-2017 The NOMAD Developers Group

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 */
package eu.nomad_lab.integrated_pipeline

import java.nio.file.{ Files, Path, Paths }

import akka.Done
import akka.actor._
import akka.stream.scaladsl.{ Flow, GraphDSL, RunnableGraph, Sink, Source }
import akka.stream.{ ActorMaterializer, ClosedShape }
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.Main.PipelineSettings
import eu.nomad_lab.integrated_pipeline.OutputType.OutputType
import eu.nomad_lab.integrated_pipeline.io_integrations._
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingSignal, TreeScanSignal, FileTreeParsingResult }
import eu.nomad_lab.integrated_pipeline.stream_components._
import eu.nomad_lab.meta.{ KnownMetaInfoEnvs, MetaInfoEnv }
import eu.nomad_lab.parsers.AllParsers
import eu.nomad_lab.{ LocalEnv, TreeType }

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object Main extends StrictLogging {

  /** Meta-info environment shared by all pipeline stages. */
  val metaInfo: MetaInfoEnv = KnownMetaInfoEnvs.all

  /** Every parser the pipeline can dispatch a calculation file to. */
  private val parsers = AllParsers.defaultParserCollection

  // Command-line option patterns. NOTE: --outputdir requires its value to be
  // wrapped in double quotes (that is what the regex demands).
  private val outputDirRe = """^--outputdir="(.+)"$""".r
  private val outputModeRe = "^--output=(.+)$".r
  private val numWorkersRe = "^--numWorkers=([0-9]+)$".r

  /** JVM entry point; delegates to [[mainWithFuture]] and discards the result. */
  def main(args: Array[String]): Unit = mainWithFuture(args)

  /**
   * Run the pipeline for the given command-line arguments.
   *
   * @param args raw command-line arguments; an empty array is a no-op
   *             (usage output is still a TODO)
   * @return a future that completes when the processing pipeline has finished
   */
  def mainWithFuture(args: Array[String]): Future[Done] = {
    if (args.isEmpty) {
      //TODO: show usage
      Future.successful(Done)
    } else {
      val main = new Main()
      // Renamed from the misleading `verbose`: this flag is the presence of
      // --noConfigDump, which presumably *suppresses* the config dump — the
      // opposite of verbosity. TODO(review): confirm LocalEnv.defaultConfig's
      // parameter semantics.
      val noConfigDump = args.contains("--noConfigDump")
      main.parseTreesGivenByCommandLine(getSettings(LocalEnv.defaultConfig(noConfigDump), args))
    }
  }

  /**
   * Aggregated runtime configuration of one pipeline run.
   *
   * @param task            the file tree to process (console mode requires it)
   * @param mode            output serialization format
   * @param numWorkers      number of parallel parsing workers
   * @param treeType        kind of input tree (directory or zip archive)
   * @param targetDirectory where parsing results are written
   */
  case class PipelineSettings(
    task: Option[FileTree],
    mode: OutputType,
    numWorkers: Int,
    treeType: TreeType,
    targetDirectory: Path
  )

  /**
   * Generate a settings object, the parameters are taken from the global configuration files and
   * can optionally be overwritten by the command-line configuration.
   *
   * Behavior notes (unchanged from the original): an unknown command throws a
   * MatchError, an unknown option hits `???` (NotImplementedError), and the
   * task path is only accepted as the *last* argument in console mode.
   *
   * @param config the NOMAD configuration to use as default values
   * @param commandLine the given command-line parameters
   * @return merged settings
   */
  def getSettings(config: Config, commandLine: Array[String]): PipelineSettings = {
    val (operation, treeType) = commandLine.head match {
      case "parseDirectory" => (OperationMode.Console, TreeType.Directory)
      case "parseRawDataArchive" => (OperationMode.Console, TreeType.Zip)
    }
    val defaults = PipelineSettings(
      task = None,
      mode = OutputType.HDF5merged,
      numWorkers = config.getInt("nomad_lab.integrated_pipeline.numWorkers"),
      treeType = treeType,
      targetDirectory = Paths.get(config.getString("nomad_lab.integrated_pipeline.targetDirectory"))
    )
    // Fold over the remaining arguments instead of the original var/while loop.
    // The index is needed to recognize the final argument, which (in console
    // mode) is interpreted as the file-tree path to process.
    val rest = commandLine.tail
    rest.zipWithIndex.foldLeft(defaults) {
      case (settings, (arg, idx)) => arg match {
        case outputModeRe(mode) => settings.copy(mode = OutputType.withName(mode))
        case outputDirRe(dir) => settings.copy(targetDirectory = Paths.get(dir))
        case numWorkersRe(count) => settings.copy(numWorkers = count.toInt)
        case "--noConfigDump" => settings //already consumed by the calling function
        case x if idx == rest.length - 1 && operation == OperationMode.Console =>
          settings.copy(task = Some(FileTree(Paths.get(x).toAbsolutePath, treeType)))
        case _ => ??? //TODO: show usage instructions
      }
    }
  }
}

105
/**
 * Runtime harness: owns the actor system / materializer and assembles the
 * Akka Streams graph that scans a file tree, unpacks archives, parses
 * calculations in parallel and writes the results.
 */
class Main {
  import Main.metaInfo

  implicit val system: ActorSystem = ActorSystem("Calculation-Actor-System")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Build and run the parsing pipeline for the single file tree given in
   * `params.task`.
   *
   * @param params merged pipeline settings; `task` must be non-empty
   * @return future completing when the whole tree has been processed
   * @throws IllegalArgumentException (via require) if no task is set
   */
  def parseTreesGivenByCommandLine(params: PipelineSettings): Future[Done] = {
    require(params.task.nonEmpty, "no task specified in console mode")
    val source = Source.single(params.task.get)
    // Per-tree summaries are currently discarded; the materialized Future[Done]
    // is what signals overall completion.
    val sinkSummaries: Sink[FileTreeParsingResult, Future[Done]] = Sink.ignore

    // Scratch space for archive extraction; best-effort cleanup on JVM exit.
    val tempExtracted = Files.createTempDirectory(
      "nomad-parsing-extracted-files",
      LocalEnv.directoryPermissionsAttributes
    )
    tempExtracted.toFile.deleteOnExit()

    val eventProcessor = new EventLogger {}

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(sinkSummaries) { implicit builder => sink =>
        import GraphDSL.Implicits._

        // Stage 1: scan the input tree and emit one task per calculation found.
        val treeParser = Flow.fromGraph(new MessageProcessorFlow[FileTree, TreeScanSignal] {
          override val stageName = "TreeParser"
          override val processor = new TreeScanner {
            override val eventListener = eventProcessor
            override def createProcessor(fileTree: FileTree) = params.treeType match {
              case TreeType.Zip => new ZipTreeParsingTaskGenerator(fileTree, Main.parsers)
              case TreeType.Directory => new DirectoryTreeParsingTaskGenerator(fileTree, Main.parsers)
            }
          }
        })

        // Stages 2 & 4: extract archive members before parsing, delete them after.
        val archiveHandler = new ArchiveHandler(tempExtracted)
        val unpacker = Flow.fromGraph(new ArchiveUnpackingFlow(archiveHandler))
        val cleanUp = Flow.fromGraph(new ArchiveCleanUpFlow(archiveHandler))

        // Stage 3: numWorkers parallel parsing engines.
        val parsing = CalculationParsingFlow.createParsingFlow(
          (1 to params.numWorkers).map(i =>
            new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d")))
        )

        // Stage 5: serialize results in the requested output format.
        val processor = Flow.fromGraph(new MessageProcessorFlow[FileParsingSignal, FileTreeParsingResult] {
          override val stageName = "ResultWriter"
          override val processor = new ParsingResultsProcessingManager {
            override val eventListener = eventProcessor
            override val processor: ParsingResultsProcessor = params.mode match {
              case OutputType.Json => new WriteToJsonResultsProcessor(params.targetDirectory, metaInfo)
              case OutputType.HDF5 => new WriteToHDF5ResultsProcessor(params.targetDirectory, metaInfo)
              case OutputType.HDF5merged => new WriteToHDF5MergedResultsProcessor(params.targetDirectory, metaInfo)
            }
          }
        })

        source ~> treeParser.async ~> unpacker.async ~> parsing.async ~> cleanUp.async ~> processor.async ~> sink

        ClosedShape
      }
    )

    val done = graph.run()
    // FIX: terminate the actor system on completion OR failure. The original
    // used `done.foreach(...)`, which only fires on success — a failed stream
    // left the ActorSystem running and kept the JVM alive indefinitely.
    done.onComplete(_ => system.terminate())
    done
  }
}