// ArchiveCleanUpFlow.scala
package eu.nomad_lab.integrated_pipeline.stream_components

import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import eu.nomad_lab.TreeType
6
import eu.nomad_lab.integrated_pipeline.FileTree
7
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
8
9
import eu.nomad_lab.integrated_pipeline.messages._

10
11
import scala.collection.mutable

12
13
14
15
16
17
18
19
20
/**
 * Akka Streams stage that forwards every [[FileParsingResultSignal]] unchanged and triggers the
 * clean-up of an extracted zip archive once all parsing results of that archive have been seen.
 *
 * For file trees of type `TreeType.Zip` the stage counts the `FileParsingResult` elements per
 * tree and remembers the number of parsing tasks announced by the tree's
 * `FileParsingSignalEndTree`. As soon as both numbers agree,
 * `archiveHandler.cleanUpExtractedArchive` is called with the absolute base path of the tree and
 * the bookkeeping for that tree is dropped. The end signal and the results may arrive in any
 * order. Elements belonging to `TreeType.Directory` trees are forwarded without bookkeeping.
 *
 * @param archiveHandler handler whose `cleanUpExtractedArchive` is invoked for finished zip trees
 */
class ArchiveCleanUpFlow(archiveHandler: ArchiveHandler)
    extends GraphStage[FlowShape[FileParsingResultSignal, FileParsingResultSignal]] {

  val in: Inlet[FileParsingResultSignal] = Inlet[FileParsingResultSignal]("ArchiveCleanUpFlow.in")
  val out: Outlet[FileParsingResultSignal] = Outlet[FileParsingResultSignal]("ArchiveCleanUpFlow.out")
  override val shape: FlowShape[FileParsingResultSignal, FileParsingResultSignal] =
    new FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      /** Number of parsing results seen so far, per zip file tree. */
      private val processed: mutable.Map[FileTree, Long] = mutable.Map()

      /** Number of parsing tasks announced by the end-of-tree signal, per zip file tree. */
      private val expected: mutable.Map[FileTree, Long] = mutable.Map()

      /** Fails the stage with an `IllegalArgumentException` carrying the given message. */
      private def fail(msg: String): Unit = {
        failStage(new IllegalArgumentException(msg))
      }

      /**
       * A tree is complete once its end-of-tree signal was received and the number of results
       * seen equals the announced number of parsing tasks. A tree without any result counts as
       * zero processed results, so that archives announcing zero parsing tasks are completed by
       * their end-of-tree signal alone.
       */
      private def isComplete(tree: FileTree): Boolean = {
        expected.get(tree).exists(_ == processed.getOrElse(tree, 0L))
      }

      /** Cleans up the extracted archive of a completed tree and releases its bookkeeping. */
      private def cleanUpIfComplete(tree: FileTree): Unit = {
        if (isComplete(tree)) {
          archiveHandler.cleanUpExtractedArchive(tree.treeBasePath.toAbsolutePath)
          processed.remove(tree)
          expected.remove(tree)
        }
      }

      /** Updates the bookkeeping for an element of a zip tree, then forwards the element. */
      private def handleZipSignal(signal: FileParsingResultSignal): Unit = {
        val tree: FileTree = signal.treeTask
        signal match {
          case _: FileParsingResult =>
            processed(tree) = processed.getOrElse(tree, 0L) + 1L
            cleanUpIfComplete(tree)
            push(out, signal)
          case end: FileParsingSignalEndTree =>
            expected(tree) = end.numParsingTasks
            cleanUpIfComplete(tree)
            push(out, signal)
          case other =>
            // Nothing may be pushed once the stage has been failed.
            fail(s"unexpected signal type in ArchiveCleanUpFlow: $other")
        }
      }

      setHandler(out, new OutHandler {
        // Demand from downstream is passed straight on to upstream.
        override def onPull(): Unit = {
          pull(in)
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val input = grab(in)

          input.treeTask.treeType match {
            case TreeType.Directory => push(out, input)
            case TreeType.Zip => handleZipSignal(input)
          }
        }
      })
    }
}