ArchiveCleanUpFlowSpec.scala 4.99 KB
Newer Older
1
2
package eu.nomad_lab.integrated_pipeline_tests

3
4
import java.nio.file.Paths

5
6
7
8
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import eu.nomad_lab.TreeType
9
import eu.nomad_lab.integrated_pipeline.io_integrations.ArchiveHandler
10
import eu.nomad_lab.integrated_pipeline.messages.FileParsingSignal
11
import eu.nomad_lab.integrated_pipeline.stream_components.ArchiveCleanUpFlow
12
import eu.nomad_lab.integrated_pipeline_tests.matchers.StreamAssertions
13
import eu.nomad_lab.parsers.ParseResult
14
import org.mockito.Mockito._
15
16
17
18
19
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{ Matchers, WordSpec }

class ArchiveCleanUpFlowSpec extends WordSpec with MockitoSugar with TestDataBuilders with Matchers {

20
  abstract class Fixture extends StreamAssertions[FileParsingSignal] {
21
22
    val archiveHandler = mock[ArchiveHandler]

23
24
    private val testInput = TestSource.probe[FileParsingSignal]
    private val testOutput = TestSink.probe[FileParsingSignal]
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    private val testGraph = RunnableGraph.fromGraph(
      GraphDSL.create(testInput, testOutput)((_, _)) { implicit builder => (source, sink) =>
        import GraphDSL.Implicits._

        val worker = builder.add(new ArchiveCleanUpFlow(archiveHandler))

        source ~> worker ~> sink
        ClosedShape
      }
    )
    val (source, sink) = testGraph.run()
  }

  class DirectoryTreeFixture extends Fixture {
39
    val treeTask = aFileTree().withTreeType(TreeType.Directory).build()
40
41
42
  }

  class ZipArchiveTreeFixture extends Fixture {
43
    val treeTask = aFileTree().withTreeType(TreeType.Zip)
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  }

  "An ArchiveCleanUpFlow" when {
    "receiving signals from directory file trees" should {

      "forward file parsing results unchanged" in {
        val f = new DirectoryTreeFixture()
        val task = anInMemoryResult().withParseResult(ParseResult.ParseWithWarnings).build()
        f.source.sendNext(task)
        f.findFirstMatchingStreamElement(be(task))
        verifyZeroInteractions(f.archiveHandler)
      }

      "forward end file tree signals unchanged" in {
        val f = new DirectoryTreeFixture()
59
        val task = aFileParsingSignalEndTree().withFileTree(f.treeTask).build()
60
61
62
63
64
65
66
67
        f.source.sendNext(task)
        f.findFirstMatchingStreamElement(be(task))
        verifyZeroInteractions(f.archiveHandler)
      }

    }

  }
68
69
70
71
72
73
74
75
76
77
78
79
80

  "receiving signals from zip archive file trees" should {

    "forward file parsing results unchanged" in {
      val f = new ZipArchiveTreeFixture()
      val task = anInMemoryResult().withParseResult(ParseResult.ParseWithWarnings).build()
      f.source.sendNext(task)
      f.findFirstMatchingStreamElement(be(task))
      verifyZeroInteractions(f.archiveHandler)
    }

    "forward end file tree signals unchanged" in {
      val f = new ZipArchiveTreeFixture()
81
      val task = aFileParsingSignalEndTree().withFileTree(f.treeTask).withTaskCount(1).build()
82
83
84
85
86
      f.source.sendNext(task)
      f.findFirstMatchingStreamElement(be(task))
      verifyZeroInteractions(f.archiveHandler)
    }

87
88
89
    "clean up temporary extracted files once all parsing results for a tree have arrived (end signal arrives last)" in {
      val f = new ZipArchiveTreeFixture()
      val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
90
91
      (1 to 4).foreach(_ => f.source.sendNext(anInMemoryResult().withFileTree(treeTask)))
      f.source.sendNext(aFileParsingSignalEndTree().withFileTree(treeTask).withTaskCount(4))
92
93
94
95
96
97
98
99
      f.drainStream()
      verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
      verifyNoMoreInteractions(f.archiveHandler)
    }

    "clean up temporary extracted files once all parsing results for a tree have arrived (parsing result arrives last)" in {
      val f = new ZipArchiveTreeFixture()
      val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
100
101
102
      (1 to 3).foreach(_ => f.source.sendNext(anInMemoryResult().withFileTree(treeTask)))
      f.source.sendNext(aFileParsingSignalEndTree().withFileTree(treeTask).withTaskCount(4))
      f.source.sendNext(anInMemoryResult().withFileTree(treeTask))
103
104
105
106
107
      f.drainStream()
      verify(f.archiveHandler).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
      verifyNoMoreInteractions(f.archiveHandler)
    }

108
109
110
    "handle the same extracted path again after cleaning it up the first time" in {
      val f = new ZipArchiveTreeFixture()
      val treeTask = f.treeTask.withBasePath("/tmp/mighty/magic")
111
112
113
114
      (1 to 4).foreach(_ => f.source.sendNext(anInMemoryResult().withFileTree(treeTask)))
      f.source.sendNext(aFileParsingSignalEndTree().withFileTree(treeTask).withTaskCount(4))
      (1 to 4).foreach(_ => f.source.sendNext(anInMemoryResult().withFileTree(treeTask)))
      f.source.sendNext(aFileParsingSignalEndTree().withFileTree(treeTask).withTaskCount(4))
115
116
117
118
119
      f.drainStream()
      verify(f.archiveHandler, times(2)).cleanUpExtractedArchive(Paths.get("/tmp/mighty/magic").toAbsolutePath)
      verifyNoMoreInteractions(f.archiveHandler)
    }

120
121
  }

122
}