diff --git a/build.sbt b/build.sbt
index 478260311c50487c152097001821e691967eb0c2..0279a27f4ae55963a70d07f6ecca4872c8bebd26 100644
--- a/build.sbt
+++ b/build.sbt
@@ -48,6 +48,12 @@ val fullTestLibs = {
 val json4sNativeLib = "org.json4s" %% "json4s-native" % "3.2.11"
 val json4sJacksonLib = "org.json4s" %% "json4s-jackson" % "3.2.11"
 
+// ## Hadoop libs
+val hadoopLib = "org.apache.hadoop" % "hadoop-client" % "2.7.3"
+
+// ## Parquet libs
+val parquetLib = Seq("org.apache.parquet" % "parquet-column" % "1.9.0", "org.apache.parquet" % "parquet" % "1.9.0", "org.apache.parquet" % "parquet-hadoop" % "1.9.0")
+
 // ## avro (binary serialization)
 val avroLib = "org.apache.avro" % "avro" % "1.8.1"
 
@@ -213,7 +219,9 @@ lazy val core = (project in file("core")).
       json4sJacksonLib +:
       pegdownLib +:
      tikaLib +:
-      avroLib +:
+      avroLib +:
+      parquetLib ++:
+      hadoopLib +:
       netcdf +:
       compressionLibs ++:
       loggingLibs),
diff --git a/core/src/main/scala/eu/nomad_lab/h5/EmitEventVisitor.scala b/core/src/main/scala/eu/nomad_lab/h5/EmitEventVisitor.scala
index 049301b45c2ff88f2c83e184ef475496cee804b9..68b2a01d85603e565b30f2c5e7e1cb954e06e443 100644
--- a/core/src/main/scala/eu/nomad_lab/h5/EmitEventVisitor.scala
+++ b/core/src/main/scala/eu/nomad_lab/h5/EmitEventVisitor.scala
@@ -1,9 +1,85 @@
 package eu.nomad_lab.h5
-import eu.nomad_lab.parsers.ParserBackendExternal
+import eu.nomad_lab.parsers.{ParquetWriterBackend, ParserBackendExternal}
+import eu.nomad_lab.resolve.{Archive, Calculation, ResolvedRef, Section}
+import org.json4s.JsonAST.{JNothing, JString}
 
 class EmitEventsVisitor(
-  backend: ParserBackendExternal
+  backend: ParquetWriterBackend
 ) extends H5Visitor {
+  var archiveIndex = 0L
+  var calcIndex = 0L
+
+  override def willVisitContext(context: ResolvedRef): Unit = {
+    println("In willVisitContext")
+    backend.startedParsingSession(None, JNothing, None)
+    context match {
+      case Archive(_, arch) => ()
+      case Calculation(_, calc) => shouldVisitArchive(calc.archive)
+      case Section(_, sec) =>
+        var sectionPath: List[SectionH5] = Nil
+        var sAtt: SectionH5 = sec
+        while (sAtt.parentSection match {
+          case Some(p) =>
+            sAtt = p
+            sectionPath = p :: sectionPath
+            true
+          case None =>
+            false
+        }) ()
+        val calc = sec.table.calculation
+        shouldVisitArchive(calc.archive)
+        shouldVisitCalculation(calc)
+        sectionPath.foreach(shouldVisitSection)
+      case _ => throw new Exception(s"Cannot accept context of type ${context}")
+    }
+    println("Done willVisitContext")
+  }
+
+  override def didVisitContext(context: ResolvedRef): Unit = {
+    context match {
+      case Archive(_, arch) => ()
+      case Calculation(_, calc) => didVisitArchive(calc.archive)
+      case Section(_, sec) =>
+        var sectionPath: List[SectionH5] = Nil
+        var sAtt: SectionH5 = sec
+        while (sAtt.parentSection match {
+          case Some(p) =>
+            sAtt = p
+            sectionPath = p :: sectionPath
+            true
+          case None =>
+            false
+        }) ()
+        val calc = sec.table.calculation
+        sectionPath.reverse.foreach(didVisitSection)
+        didVisitCalculation(calc)
+        didVisitArchive(calc.archive)
+      case _ => throw new Exception(s"Cannot accept context of type ${context}")
+    }
+    backend.finishedParsingSession(None, JNothing, None)
+  }
+  override def shouldVisitArchive(archive: ArchiveH5): Boolean = {
+    backend.openSectionWithGIndex("archive_context", archiveIndex)
+    backend.addValue("archive_gid", JString(archive.archiveGid))
+    true
+  }
+
+  override def didVisitArchive(archiveH5: ArchiveH5): Unit = {
+    backend.closeSection("archive_context", archiveIndex)
+    archiveIndex += 1
+  }
+
+  override def shouldVisitCalculation(calculationH5: CalculationH5): Boolean = {
+    backend.openSectionWithGIndex("calculation_context", calcIndex)
+    backend.addValue("calculation_gid", JString(calculationH5.calculationGid))
+    true
+  }
+
+  override def didVisitCalculation(calculationH5: CalculationH5): Unit = {
+    backend.closeSection("calculation_context", calcIndex)
+    calcIndex += 1
+  }
+
   override def shouldVisitSection(section: SectionH5): Boolean = {
     backend.openSectionWithGIndex(section.metaName, section.gIndex)
     true
diff --git a/core/src/main/scala/eu/nomad_lab/h5/H5EagerScanner.scala b/core/src/main/scala/eu/nomad_lab/h5/H5EagerScanner.scala
index e311ff51f20fe9123defee973d27bbb244e8a5ef..6952564fd8b154e47234c45ce10e37349b14b7fa 100644
--- a/core/src/main/scala/eu/nomad_lab/h5/H5EagerScanner.scala
+++ b/core/src/main/scala/eu/nomad_lab/h5/H5EagerScanner.scala
@@ -44,9 +44,9 @@ class H5EagerScanner() {
     if (visitor.shouldVisitSection(section)) {
       try {
         if (visitor.shouldVisitSectionSubValues(section)) {
-          for (subValueCollection <- section.valueCollections) {
+          for (subValueCollection <- section.valueCollections.toList.sortBy(_.table.metaInfo.name)) {
             if (visitor.shouldVisitValueCollection(subValueCollection)) {
-              for (subValue <- subValueCollection)
+              for (subValue <- subValueCollection.sortBy(_.table.metaInfo.name))
                 scanValue(subValue, visitor)
               visitor.didVisitValueCollection(subValueCollection)
             }
@@ -54,9 +54,9 @@ class H5EagerScanner() {
           visitor.didVisitSectionSubValues(section)
         }
         if (visitor.shouldVisitSectionSubSections(section)) {
-          for (subSectionCollection <- section.subSectionCollections) {
+          for (subSectionCollection <- section.subSectionCollections.toList.sortBy(_.table.metaInfo.name)) {
             if (visitor.shouldVisitSectionCollection(subSectionCollection)) {
-              for (subSection <- subSectionCollection)
+              for (subSection <- subSectionCollection.sortBy(_.table.metaInfo.name))
                 scanSection(subSection, visitor)
               visitor.didVisitSectionCollection(subSectionCollection)
             }
diff --git a/core/src/main/scala/eu/nomad_lab/parsers/ParquetWriterBackend.scala b/core/src/main/scala/eu/nomad_lab/parsers/ParquetWriterBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b780bcd4712d68d669d8da15b28345593b31d79f
--- /dev/null
+++ b/core/src/main/scala/eu/nomad_lab/parsers/ParquetWriterBackend.scala
@@ -0,0 +1,314 @@
+package eu.nomad_lab.parsers
+
+import collection.JavaConverters._
+import eu.nomad_lab.JsonUtils
+import eu.nomad_lab.meta.{ MetaInfoEnv, MetaInfoRecord }
+import eu.nomad_lab.parsers.GenericBackend.InvalidAssignementException
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.{ Binary, RecordConsumer }
+import org.apache.parquet.schema.MessageType
+import org.json4s.{ JNothing, JObject, JString, JValue }
+import org.json4s.JsonAST._
+import ucar.ma2.{ IndexIterator => NIndexIterator, Array => NArray }
+
+import scala.collection.mutable
+
+class ParquetWriterBackend(
+  val schema: MessageType,
+  val indexMap: Map[String, Int],
+  val metaInfoEnv: MetaInfoEnv
+) extends WriteSupport[ParseEvent] with ParserBackendExternal {
+
+  var recordConsumer: RecordConsumer = null
+  val _openSections = mutable.Set[(String, Long)]()
+  var lastOpenField: Option[(String, Int)] = None // Parquet allows only one field to be open at a time
+  val openFields: mutable.Stack[(String, Int)] = mutable.Stack()
+
+  def fieldManager(name: String, index: Int) = {
+    println(s"field: ${name}")
+    println(s"OpenFields: ${openFields}")
+    if (openFields.isEmpty) {
+      recordConsumer.startField(name, index)
+      openFields.push((name, index))
+    } else {
+      val metaInfo = metaInfoEnv.metaInfoRecordForName(name).get
+      val parentSection = if (name == "calculation_context") "archive_context" else metaInfoEnv.parentSectionName(name).getOrElse("calculation_context")
+      println(s"parentSection: $parentSection")
+      var handled = false
+      while (!handled && openFields.nonEmpty) {
+        if (name == openFields.top._1) {
+          handled = true
+        } else if (parentSection == openFields.top._1) {
+          openFields.push((name, index))
+          recordConsumer.startField(name, index)
+          handled = true
+        } else {
+          val top = openFields.pop()
+          recordConsumer.endField(top._1, top._2)
+        }
+      }
+      if (!handled) {
+        openFields.push((name, index))
+        recordConsumer.startField(name, index)
+      }
+    }
+    println(s"OpenFields: ${openFields}")
+  }
+
+  def endLastFields() = {
+    while (openFields.nonEmpty) {
+      val top = openFields.pop()
+      recordConsumer.endField(top._1, top._2)
+    }
+  }
+  override def init(configuration: Configuration): WriteContext = {
+    var extraMetaData = mutable.Map("nomad.schema" -> schema.toString)
+    return new WriteSupport.WriteContext(schema, extraMetaData.asJava)
+  }
+
+  override def write(ev: ParseEvent): Unit = {
+
+  }
+
+  override def prepareForWrite(rc: RecordConsumer): Unit = {
+    recordConsumer = rc
+  }
+
+  /**
+   * Started a parsing session
+   */
+  override def startedParsingSession(mainFileUri: Option[String], parserInfo: JValue, parserStatus: Option[ParseResult.Value], parserErrors: JValue): Unit = {
+    println("startedParsingSession")
+    recordConsumer.startMessage()
+//    recordConsumer.startField("type", index)
+//    recordConsumer.addBinary(Binary.fromString("""nomad_info_data_parquet_1_0"""))
+//    recordConsumer.endField("type", index)
+//    mainFileUri match {
+//      case Some(uri) =>
+//        recordConsumer.startField("mainFileUri", index)
+//        recordConsumer.addBinary(Binary.fromString(uri))
+//        recordConsumer.endField("mainFileUri", index)
+//      case None => ()
+//    }
+//    parserInfo match {
+//      case JNothing => ()
+//      case _ =>
+//        recordConsumer.startField("parserInfo", index)
+//        recordConsumer.addBinary(Binary.fromConstantByteArray(JsonUtils.normalizedUtf8(parserInfo)))
+//        recordConsumer.endField("parserInfo", index)
+//    }
+//    parserStatus match {
+//      case None => ()
+//      case status =>
+//        recordConsumer.startField("parseResult", index)
+//        recordConsumer.addBinary(Binary.fromString(status.toString))
+//        recordConsumer.endField("parseResult", index)
+//    }
+//
+//    recordConsumer.startField("parserErrors", index)
+//    recordConsumer.addBinary(Binary.fromConstantByteArray(JsonUtils.normalizedUtf8(parserErrors)))
+//    recordConsumer.endField("parserErrors", index)
+  }
+
+  /**
+   * finished a parsing session
+   */
+  override def finishedParsingSession(parserStatus: Option[ParseResult.Value], parserErrors: JValue, mainFileUri: Option[String], parserInfo: JValue, parsingStats: Map[String, Long]): Unit = {
+    endLastFields()
+    recordConsumer.endMessage()
+  }
+
+  def getIndex(metaInfo: String): Int = {
+    indexMap(metaInfo)
+  }
+
+  /**
+   * Adds a json value corresponding to metaName.
+   *
+   * The value is added to the section the meta info metaName is in.
+   * A gIndex of -1 means the latest section.
+   */
+  override def addValue(metaName: String, value: JValue, gIndex: Long): Unit = {
+    println(s"addValue: $metaName")
+    val metaInfo = metaInfoEnv.metaInfoRecordForName(metaName).get
+    val index = getIndex(metaName)
+    fieldManager(metaName, index)
+    value match {
+      case JBool(b) =>
+        recordConsumer.addBoolean(b)
+      case JInt(i) =>
+        if (metaInfo.dtypeStr.contains("i") || metaInfo.dtypeStr.contains("i32"))
+          recordConsumer.addInteger(i.intValue())
+        else
+          recordConsumer.addLong(i.longValue())
+      case JDecimal(d) =>
+        if (metaInfo.dtypeStr.contains("f") || metaInfo.dtypeStr.contains("f32"))
+          recordConsumer.addFloat(d.floatValue())
+        else
+          recordConsumer.addDouble(d.doubleValue())
+      case JDouble(d) =>
+        recordConsumer.addDouble(d)
+      case JString(s) =>
+        recordConsumer.addBinary(Binary.fromString(s))
+      case JObject(o) =>
+        recordConsumer.addBinary(Binary.fromConstantByteArray(JsonUtils.normalizedUtf8(value)))
+      case JNothing =>
+        None
+      case _ =>
+        throw new Exception //InvalidAssignementException(metaInfoEnv, s"invalid value $value when expecting integer")
+    }
+    //recordConsumer.endField(metaName, index)
+  }
+
+  /**
+   * Adds a floating point value corresponding to metaName.
+   *
+   * The value is added to the section the meta info metaName is in.
+   * A gIndex of -1 means the latest section.
+   */
+  override def addRealValue(metaName: String, value: Double, gIndex: Long): Unit = {
+    val index = getIndex(metaName)
+    fieldManager(metaName, index)
+    recordConsumer.addDouble(value)
+  }
+
+  /**
+   * Adds a new array of the given size corresponding to metaName.
+   *
+   * The value is added to the section the meta info metaName is in.
+   * A gIndex of -1 means the latest section.
+   * The array is uninitialized.
+   */
+  override def addArray(metaName: String, shape: Seq[Long], gIndex: Long): Unit = {
+    val index = getIndex(metaName)
+    fieldManager(metaName, index)
+    recordConsumer.startGroup()
+    val indexOfShape = getIndex("shape") // The shape is the first element in the group so its index will be 0.
+    recordConsumer.startField("shape", indexOfShape)
+    shape.foreach(recordConsumer.addLong)
+    recordConsumer.endField("shape", indexOfShape)
+  }
+
+  override def closeArray(metaName: String, shape: Seq[Long], gIndex: Long): Unit = {
+    val index = getIndex(metaName)
+    recordConsumer.endGroup()
+  }
+  /**
+   * Adds values to the last array added
+   */
+  override def setArrayValues(metaName: String, values: NArray, offset: Option[Seq[Long]], gIndex: Long): Unit = {
+    val indexOfValues = getIndex("values")
+    // What about values.getShape ??
+    val dtype = values.getDataType()
+    val it = values.getIndexIterator()
+    recordConsumer.startField("values", indexOfValues)
+
+    while (it.hasNext()) {
+      if (dtype.isFloatingPoint()) {
+        recordConsumer.addDouble(it.getDoubleNext())
+      } else if (dtype.isIntegral()) {
+        recordConsumer.addLong(it.getLongNext())
+      } else {
+        recordConsumer.addBinary(Binary.fromString(it.next().toString()))
+      }
+    }
+    recordConsumer.endField("values", indexOfValues)
+
+  }
+
+  /**
+   * Opens and adds an array value; alternative of addArray, setArrayValues and closeArray
+   */
+  override def addArrayValues(metaName: String, values: NArray, gIndex: Long): Unit = {
+    val index = getIndex(metaName)
+    fieldManager(metaName, index)
+    recordConsumer.startGroup()
+    val indexOfShape = getIndex("shape") // The shape is the first element in the group so its index will be 0.
+    recordConsumer.startField("shape", indexOfShape)
+    for (s <- values.getShape) {
+      recordConsumer.addLong(s)
+    }
+    recordConsumer.endField("shape", indexOfShape)
+
+    val indexOfValues = getIndex("values")
+    val dtype = values.getDataType()
+    val it = values.getIndexIterator()
+    recordConsumer.startField("values", indexOfValues)
+    while (it.hasNext()) {
+      if (dtype.isFloatingPoint()) {
+        recordConsumer.addDouble(it.getDoubleNext())
+      } else if (dtype.isIntegral()) {
+        recordConsumer.addLong(it.getLongNext())
+      } else {
+        recordConsumer.addBinary(Binary.fromString(it.next().toString()))
+      }
+    }
+    recordConsumer.endField("values", indexOfValues)
+    recordConsumer.endGroup()
+  }
+
+  /**
+   * returns the sections that are still open
+   *
+   * sections are identified by metaName and their gIndex
+   */
+  def openSections(): Iterator[(String, Long)] = _openSections.iterator
+
+  /**
+   * returns information on an open section (for debugging purposes)
+   */
+  def sectionInfo(metaName: String, gIndex: Long): String = {
+    if (_openSections.contains(metaName -> gIndex))
+      s"section $metaName, gIndex $gIndex is open"
+    else
+      s"section $metaName, gIndex $gIndex is closed"
+  }
+
+  /**
+   * closes a section
+   *
+   * after this no other value can be added to the section.
+   * metaName is the name of the meta info, gIndex the index of the section
+   */
+  def closeSection(metaName: String, gIndex: Long): Unit = {
+    println(s"CloseSection called: $metaName")
+    _openSections -= (metaName -> gIndex)
+    val index = getIndex(metaName)
+    recordConsumer.endGroup()
+//    recordConsumer.endField(metaName, index)
+  }
+
+  /**
+   * Informs the backend that a section with the given gIndex should be opened
+   *
+   * The index is assumed to be unused, it is an error to reopen an existing section.
+   */
+  def openSectionWithGIndex(metaName: String, gIndex: Long): Unit = {
+    println(s"OpenSection called: $metaName")
+    _openSections += (metaName -> gIndex)
+    val index = getIndex(metaName)
+    fieldManager(metaName, index)
+    recordConsumer.startGroup()
+    val in = getIndex("c_index") // The index of c_index should always be 0
+    recordConsumer.startField("c_index", in)
+    recordConsumer.addLong(gIndex)
+    recordConsumer.endField("c_index", in)
+  }
+
+  def backendInfo: JValue = {
+    JObject(
+      ("backendType" -> JString(getClass().getName())) :: Nil
+    )
+  }
+
+  def cleanup(): Unit = {}
+
+  /**
+   * sets info values of an open section.
+   *
+   * references should be references to gIndex of the root sections this section refers to.
+   */
+  override def setSectionInfo(metaName: String, gIndex: Long, references: Map[String, Long]): Unit = ()
+}
diff --git a/core/src/main/scala/eu/nomad_lab/parsers/ParseEventsWriteSupport.scala b/core/src/main/scala/eu/nomad_lab/parsers/ParseEventsWriteSupport.scala
new file mode 100644
index 0000000000000000000000000000000000000000..137c9fe3436b0625399b90d33a0107065f1260d0
--- /dev/null
+++ b/core/src/main/scala/eu/nomad_lab/parsers/ParseEventsWriteSupport.scala
@@ -0,0 +1,36 @@
+package eu.nomad_lab.parsers
+
+import eu.nomad_lab.{ JsonSupport, JsonUtils }
+
+import collection.JavaConverters._
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.RecordConsumer
+import org.json4s.JsonAST.JValue
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.io.api.Binary
+
+import scala.collection.mutable
+
+class ParseEventsWriteSupport(val rootSchema: MessageType) extends WriteSupport[JValue] {
+  var recordConsumer: RecordConsumer = null
+
+  override def init(configuration: Configuration): WriteContext = {
+    var extraMetaData = mutable.Map("nomad.schema" -> rootSchema.toString)
+    return new WriteSupport.WriteContext(rootSchema, extraMetaData.asJava)
+  }
+
+  override def write(ev: JValue): Unit = {
+    recordConsumer.startMessage()
+    recordConsumer.startField("event", 0)
+    recordConsumer.addBinary(Binary.fromConstantByteArray(JsonUtils.normalizedUtf8(ev)))
+    recordConsumer.endField("event", 0)
+    recordConsumer.endMessage()
+  }
+
+  override def prepareForWrite(rc: RecordConsumer): Unit = {
+    recordConsumer = rc
+  }
+
+}
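Usage sketch (not part of this patch): one possible way to wire the new ParquetWriterBackend into a ParquetWriter and drive it from the H5 scanner. The schema, indexMap, metaInfoEnv and resolvedRef values, the output path and the scanResolvedRef entry point are illustrative assumptions only; everything else uses APIs that exist in parquet-hadoop 1.9.0.

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import eu.nomad_lab.h5.{ EmitEventsVisitor, H5EagerScanner }
import eu.nomad_lab.parsers.{ ParquetWriterBackend, ParseEvent }

// schema, indexMap and metaInfoEnv are assumed to have been built elsewhere
val backend = new ParquetWriterBackend(schema, indexMap, metaInfoEnv)
// deprecated but still available two-argument ParquetWriter constructor in 1.9.0;
// it calls prepareForWrite, handing the backend the RecordConsumer it writes to
val writer = new ParquetWriter[ParseEvent](new Path("/tmp/calc.parquet"), backend)
try {
  val visitor = new EmitEventsVisitor(backend)
  // resolvedRef points at the archive/calculation/section to export; the scanner
  // method name is hypothetical, use whatever entry point H5EagerScanner exposes
  new H5EagerScanner().scanResolvedRef(resolvedRef, visitor)
  // depending on how ParquetWriter accounts for records, an explicit writer.write(...)
  // per record may still be needed before closing
} finally {
  writer.close()
}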