// ArchiveUnpackingFlow.scala
package eu.nomad_lab.integrated_pipeline.stream_components

import java.nio.file.Path

import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
import eu.nomad_lab.integrated_pipeline.messages.{ TreeScanCompleted, CandidateFound, TreeScanSignal }

import scala.collection.mutable

/**
 * Flow stage that unpacks archived file trees and forwards the tree scan signals downstream.
 *
 * The stage keeps one entry per active file tree, keyed by the tree's base path. The entry is
 * created lazily when the first [[CandidateFound]] signal of a tree arrives (no dedicated start
 * signal is handled by this stage):
 *  - for a directory tree the base path itself is registered and candidates pass through
 *    unchanged;
 *  - for a zip tree the archive is extracted once through the given [[ArchiveHandler]] and every
 *    candidate of that tree is forwarded with `extractedPath` set to its relative path resolved
 *    against the location returned by the extraction.
 *
 * A [[TreeScanCompleted]] signal removes the tree's entry and is forwarded unchanged. The stage
 * fails with an `IllegalArgumentException` if a tree is completed without having been registered
 * or if a candidate belongs to a tree type other than directory or zip.
 *
 * NOTE(review): this stage never deletes extracted files; cleanup is presumably done elsewhere —
 * confirm.
 *
 * @param archiveHandler used to extract zip archives; presumably extracts to a temporary
 *                       location (TODO confirm against the handler implementation)
 */
class ArchiveUnpackingFlow(archiveHandler: ArchiveHandler)
    extends GraphStage[FlowShape[TreeScanSignal, TreeScanSignal]] {

  val in: Inlet[TreeScanSignal] = Inlet[TreeScanSignal]("ArchiveUnpackingFlow.in")
  val out: Outlet[TreeScanSignal] = Outlet[TreeScanSignal]("ArchiveUnpackingFlow.out")
  override val shape: FlowShape[TreeScanSignal, TreeScanSignal] = new FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Maps the base path of every active tree to the location its files can be read from:
      // the base path itself for directories, the extraction location for zip archives.
      private val activeTrees: mutable.Map[Path, Path] = mutable.Map()

      /** Fails the whole stage with an `IllegalArgumentException` carrying the given message. */
      private def failWithMessage(msg: String): Unit =
        failStage(new IllegalArgumentException(msg))

      // One-to-one stage: every downstream demand is translated into one upstream pull.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = grab(in) match {
          case candidate: CandidateFound =>
            val basePath = candidate.fileTree.treeBasePath
            candidate.fileTree.treeType match {
              case TreeType.Directory =>
                // Nothing to unpack; only register the tree so its completion is accepted.
                activeTrees.getOrElseUpdate(basePath, basePath)
                push(out, candidate)
              case TreeType.Zip =>
                // The archive is extracted only for the first candidate of the tree; later
                // candidates reuse the registered location.
                val unpackedRoot =
                  activeTrees.getOrElseUpdate(basePath, archiveHandler.extractZipArchive(basePath))
                val unpackedFile = unpackedRoot.resolve(candidate.relativePath)
                push(out, candidate.copy(extractedPath = Some(unpackedFile)))
              case other =>
                // Explicit, descriptive failure instead of a bare MatchError; nothing is
                // registered and nothing is pushed for tree types this stage cannot handle.
                failWithMessage(s"unsupported file tree type '$other' for file tree '$basePath'")
            }

          case completed: TreeScanCompleted =>
            // remove() reports whether the tree was registered, so a single lookup suffices.
            if (activeTrees.remove(completed.fileTree.treeBasePath).isDefined) {
              push(out, completed)
            } else {
              failWithMessage("to be finished file tree was not registered")
            }
        }
      })
    }

}