Commit 3fcbcfb2 authored by Ihrig, Arvid Conrad (ari)'s avatar Ihrig, Arvid Conrad (ari)
Browse files

Integrated Pipeline: prepared TreeParser implementations to be able to process...

Integrated Pipeline: prepared TreeParser implementations to be able to process errors properly, updated existing tests
parent a042b99c
......@@ -4,18 +4,27 @@ import java.io.InputStream
import java.nio.file.Path
import java.util.NoSuchElementException
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator.FileScanningError
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
object ParsingTaskGenerator {
case class FileScanningError(
fileTree: FileTreeScanTask,
relativePath: Path,
error: Throwable
)
}
/**
* contains methods shared by the different TreeParserLogic Implementations.
* These are streams-adapted versions of the code found in the original TreeParser class.
*/
trait ParsingTaskGenerator extends Iterator[FileParsingTask] {
trait ParsingTaskGenerator extends Iterator[Either[FileScanningError, FileParsingTask]] {
protected[this] def findNextParsingCandidate(): Option[FileParsingTask]
protected[this] def findNextParsingCandidate(): Option[Either[FileScanningError, FileParsingTask]]
private var nextRequest: Option[FileParsingTask] = None
private var nextRequest: Option[Either[FileScanningError, FileParsingTask]] = None
private var searchStarted: Boolean = false
def hasNext: Boolean = {
......@@ -26,7 +35,7 @@ trait ParsingTaskGenerator extends Iterator[FileParsingTask] {
nextRequest.isDefined
}
def next(): FileParsingTask = {
def next(): Either[FileScanningError, FileParsingTask] = {
if (!searchStarted) {
nextRequest = findNextParsingCandidate()
searchStarted = true
......
......@@ -4,13 +4,13 @@ import eu.nomad_lab.integrated_pipeline.messages._
trait TreeParser extends MessageProcessor[FileTreeScanTask, FileParsingTaskSignal] {
protected[this] def createProcessor(task: FileTreeScanTask): Iterator[FileParsingTask]
protected[this] def createProcessor(task: FileTreeScanTask): ParsingTaskGenerator
protected[this] val eventListener: EventListener
protected[this] val eventReporterName: Option[String] = None
private lazy val myId = eventListener.registerReporter(this, eventReporterName)
private var generator: Option[Iterator[FileParsingTask]] = None
private var generator: Option[ParsingTaskGenerator] = None
private var readyForInbound = true
private var request: Option[FileTreeScanTask] = None
private var taskCount = 0l
......@@ -32,12 +32,15 @@ trait TreeParser extends MessageProcessor[FileTreeScanTask, FileParsingTaskSigna
final override def getNextSignalToEmit(): FileParsingTaskSignal = {
if (generator.exists(_.hasNext)) {
taskCount += 1
val task = generator.get.next()
eventListener.processEvent(
myId,
TreeParserEventCandidate(request.get, task.relativePath, task.parserName)
)
task
generator.get.next() match {
case Left(_) => ???
case Right(task) =>
eventListener.processEvent(
myId,
TreeParserEventCandidate(request.get, task.relativePath, task.parserName)
)
task
}
} else if (!readyForInbound) {
readyForInbound = true
eventListener.processEvent(myId, TreeParserEventEnd(request.get, taskCount))
......
......@@ -2,10 +2,10 @@ package eu.nomad_lab.integrated_pipeline.io_integrations
import java.io.FileInputStream
import java.nio.file.Files
import java.util.NoSuchElementException
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator.FileScanningError
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
......@@ -19,7 +19,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect
private val basePath = request.treeBasePath
private val fileIterator = Files.walk(basePath).iterator().asScala.filter(Files.isRegularFile(_))
@tailrec final protected[this] override def findNextParsingCandidate(): Option[FileParsingTask] = {
@tailrec final protected[this] override def findNextParsingCandidate(): Option[Either[FileScanningError, FileParsingTask]] = {
if (fileIterator.hasNext) {
val file = fileIterator.next()
val internalFilePath = basePath.relativize(file)
......@@ -36,7 +36,7 @@ class DirectoryTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollect
Seq[CandidateParser]()
}
generateRequest(request, internalFilePath, candidateParsers) match {
case Some(x) => Some(x)
case Some(x) => Some(Right(x))
case None => findNextParsingCandidate()
}
} else {
......
package eu.nomad_lab.integrated_pipeline.io_integrations
import java.nio.file.Paths
import java.util.NoSuchElementException
import eu.nomad_lab.TreeType
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator.FileScanningError
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask }
import eu.nomad_lab.parsers.{ CandidateParser, ParserCollection }
import org.apache.commons.compress.archivers.zip.{ ZipArchiveEntry, ZipFile }
......@@ -23,7 +23,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P
zipFile.close()
}
@tailrec protected[this] final override def findNextParsingCandidate(): Option[FileParsingTask] = {
@tailrec protected[this] final override def findNextParsingCandidate(): Option[Either[FileScanningError, FileParsingTask]] = {
if (zipEntries.hasMoreElements) {
val zipEntry: ZipArchiveEntry = zipEntries.nextElement()
val internalFilePath = Paths.get(zipEntry.getName)
......@@ -39,7 +39,7 @@ class ZipTreeParsingTaskGenerator(request: FileTreeScanTask, parserCollection: P
Seq[CandidateParser]()
}
generateRequest(request, internalFilePath, candidateParsers) match {
case Some(x) => Some(x)
case Some(x) => Some(Right(x))
case None => findNextParsingCandidate()
}
} else {
......
package eu.nomad_lab.integrated_pipeline_tests
import eu.nomad_lab.integrated_pipeline.{ EventListener, TreeParser }
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator.FileScanningError
import eu.nomad_lab.integrated_pipeline.messages._
import eu.nomad_lab.integrated_pipeline.{ EventListener, ParsingTaskGenerator, TreeParser }
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ ArgumentCaptor, Mockito }
import org.scalatest.WordSpec
import org.scalatest.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers._
import scala.collection.JavaConverters._
......@@ -14,12 +17,24 @@ class TreeParserSpec extends WordSpec
with MessageProcessorBehaviour[FileTreeScanTask, FileParsingTaskSignal]
with TestDataBuilders with MockitoSugar {
def generatorMock(task: FileTreeScanTask): Iterator[FileParsingTask] = {
var count = 0
Iterator.fill(3) {
count += 1
aFileParsingTask().withTreeTask(task).withRelativePath(s"file$count").build()
def generatorMock(task: FileTreeScanTask): ParsingTaskGenerator = {
val dummy = mock[ParsingTaskGenerator]
val data: Iterator[Either[FileScanningError, FileParsingTask]] = (1 to 3).map { i =>
Right(aFileParsingTask().withTreeTask(task).withRelativePath(s"file$i").build())
}.iterator
when(dummy.hasNext).thenAnswer {
new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean = data.hasNext
}
}
when(dummy.next()).thenAnswer {
new Answer[Either[FileScanningError, FileParsingTask]] {
override def answer(invocation: InvocationOnMock): Either[FileScanningError, FileParsingTask] = {
data.next()
}
}
}
dummy
}
private def argsCaptor(): ArgumentCaptor[TreeParserEvent] = ArgumentCaptor.forClass(classOf[TreeParserEvent])
......
......@@ -2,17 +2,18 @@ package eu.nomad_lab.integrated_pipeline_tests
import eu.nomad_lab.TreeType
import eu.nomad_lab.TreeType.TreeType
import eu.nomad_lab.integrated_pipeline.ParsingTaskGenerator
import eu.nomad_lab.integrated_pipeline.io_integrations.{ DirectoryTreeParsingTaskGenerator, ZipTreeParsingTaskGenerator }
import eu.nomad_lab.integrated_pipeline.messages.{ FileParsingTask, FileTreeScanTask }
import eu.nomad_lab.parsers.AllParsers
import org.scalatest.WordSpec
import org.scalatest.{ Matchers, WordSpec }
class TreeParsingTaskGeneratorSpec extends WordSpec {
class TreeParsingTaskGeneratorSpec extends WordSpec with Matchers {
private val parsers = AllParsers.defaultParserCollection
case class TestGenerator(
generator: (FileTreeScanTask) => Iterator[FileParsingTask], treeType: TreeType
generator: (FileTreeScanTask) => ParsingTaskGenerator, treeType: TreeType
)
"a ZipTreeParsingRequestGenerator" when {
......@@ -37,7 +38,9 @@ class TreeParsingTaskGeneratorSpec extends WordSpec {
val inbound = createFileTreeScanRequest(sampleData, testCase.treeType)
val generator = testCase.generator(inbound)
val requests = generator.toSeq
val associations = requests.map(x => x.relativePath.toString -> x.parserName).toMap
all(requests) should be(a[Right[_, _]])
val tasks = requests.collect { case Right(x) => x }
val associations = tasks.map(x => x.relativePath.toString -> x.parserName).toMap
assert(
associations === sampleData.candidateCalculationsWithParsers,
"generated parsing requests don't match expectations"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment