Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
nomad-lab
parser-lib-atoms
Commits
12e80786
Commit
12e80786
authored
Jul 14, 2016
by
Carl Poelking
Browse files
Crude framework.
parent
6f066b21
Changes
4
Hide whitespace changes
Inline
Side-by-side
parser/parser-lib-atoms/libAtomsParser.py
View file @
12e80786
from
builtins
import
range
import
os
import
sys
import
re
import
json
#import logging
import
setup_paths
import
numpy
as
np
from
nomadcore.local_meta_info
import
loadJsonFile
,
InfoKindEl
from
nomadcore.parser_backend
import
JsonParseEventsWriterBackend
from
contextlib
import
contextmanager
from
libLibAtomsParser
import
*
try
:
from
libMomo
import
osio
,
endl
,
flush
osio
.
ConnectToFile
(
'parser.osio.log'
)
green
=
osio
.
mg
except
:
osio
=
endl
=
flush
=
None
green
=
None
parser_info
=
{
"name"
:
"parser-lib-atoms"
,
"version"
:
"0.0"
,
"json"
:
"../../../../nomad-meta-info/meta_info/nomad_meta_info/lib_atoms.nomadmetainfo.json"
}
# LOGGING
def
log
(
msg
,
highlight
=
None
,
enter
=
endl
):
if
osio
:
if
highlight
==
None
:
hightlight
=
osio
.
ww
osio
<<
highlight
<<
msg
<<
enter
return
# CONTEXT GUARD
@
contextmanager
def
open_section
(
p
,
name
):
gid
=
p
.
openSection
(
name
)
yield
gid
p
.
closeSection
(
name
,
gid
)
def
push
(
jbe
,
terminal
,
key1
,
fct
=
lambda
x
:
x
.
As
(),
key2
=
None
):
if
key2
==
None
:
key2
=
key1
value
=
fct
(
terminal
[
key2
])
jbe
.
addValue
(
key1
,
value
)
return
value
def
push_array
(
jbe
,
terminal
,
key1
,
fct
=
lambda
x
:
x
.
As
(),
key2
=
None
):
if
key2
==
None
:
key2
=
key1
value
=
np
.
asarray
(
fct
(
terminal
[
key2
]))
jbe
.
addArrayValues
(
key1
,
value
)
return
value
def
push_value
(
jbe
,
value
,
key
):
jbe
.
addValue
(
key
,
value
)
return
value
def
push_array_values
(
jbe
,
value
,
key
):
jbe
.
addArrayValues
(
key
,
value
)
return
value
def
parse
(
output_file_name
):
jbe
=
JsonParseEventsWriterBackend
(
meta_info_env
)
jbe
.
startedParsingSession
(
output_file_name
,
parser_info
)
base_dir
=
os
.
path
.
dirname
(
os
.
path
.
abspath
(
output_file_name
))
terminal
=
LibAtomsParser
(
osio
)
terminal_trj
=
LibAtomsTrajectory
(
osio
)
terminal_trj
.
ParseOutput
(
output_file_name
)
out
=
terminal
trj
=
terminal_trj
log
(
"Start parsing ..."
)
log
(
"Base directory = '%s'"
%
base_dir
)
with
open_section
(
jbe
,
'section_run'
)
as
gid_run
:
push
(
jbe
,
trj
,
'program_name'
)
push
(
jbe
,
trj
,
'program_version'
)
jbe
.
finishedParsingSession
(
"ParseSuccess"
,
None
)
return
if
__name__
==
'__main__'
:
# CALCULATE PATH TO META-INFO FILE
this_py_file
=
os
.
path
.
abspath
(
__file__
)
this_py_dirname
=
os
.
path
.
dirname
(
this_py_file
)
json_supp_file
=
parser_info
[
"json"
]
meta_info_path
=
os
.
path
.
normpath
(
os
.
path
.
join
(
this_py_dirname
,
json_supp_file
))
# LOAD META-INFO FILE
log
(
"Meta-info from '%s'"
%
meta_info_path
)
meta_info_env
,
warns
=
loadJsonFile
(
filePath
=
meta_info_path
,
dependencyLoader
=
None
,
extraArgsHandling
=
InfoKindEl
.
ADD_EXTRA_ARGS
,
uri
=
None
)
output_file_name
=
sys
.
argv
[
1
]
parse
(
output_file_name
)
parser/parser-lib-atoms/libLibAtomsParser.py
View file @
12e80786
from
__future__
import
print_function
from
builtins
import
zip
from
builtins
import
str
from
builtins
import
map
from
builtins
import
range
from
builtins
import
object
import
os
import
sys
import
re
import
numpy
as
np
try
:
import
ase
import
ase.io
HAVE_ASE
=
True
except
ImportError
:
HAVE_ASE
=
False
pass
class
LibAtomsParser
(
object
):
def
__init__
(
self
,
log
=
None
):
self
.
log
=
log
self
.
data
=
{}
self
.
logtag
=
'main'
# KEY DEFAULT DICTIONARIES
self
.
missing_keys_lh
=
[]
# Transform keys that were not found in output
self
.
missing_keys_rh
=
[]
self
.
ignored_keys
=
[]
# Raw keys that did not have a transform
self
.
keys_not_found
=
[]
# Searches that failed
return
def
__getitem__
(
self
,
key
):
self
.
selected_data_item
=
self
.
data
[
key
]
return
self
def
As
(
self
,
typ
=
None
):
if
typ
==
None
:
typ
=
type
(
self
.
selected_data_item
)
return
typ
(
self
.
selected_data_item
)
def
SummarizeKeyDefaults
(
self
):
if
not
self
.
log
:
return
if
len
(
self
.
missing_keys_lh
):
self
.
log
<<
self
.
log
.
my
\
<<
"[%s] Keys from transformation maps that went unused (=> set to 'None'):"
\
%
self
.
logtag
<<
self
.
log
.
endl
for
lh
,
rh
in
zip
(
self
.
missing_keys_lh
,
self
.
missing_keys_rh
):
self
.
log
<<
self
.
log
.
item
<<
"Key = %-25s <> %25s"
%
(
rh
,
lh
)
<<
self
.
log
.
endl
if
len
(
self
.
ignored_keys
):
self
.
log
<<
self
.
log
.
mb
\
<<
"[%s] Keys from XY mapping that were not transformed (=> not stored):"
\
%
self
.
logtag
<<
self
.
log
.
endl
for
key
in
self
.
ignored_keys
:
self
.
log
<<
self
.
log
.
item
<<
"Key ="
<<
key
<<
self
.
log
.
endl
if
len
(
self
.
keys_not_found
):
self
.
log
<<
self
.
log
.
mr
\
<<
"[%s] Keys from searches that failed (=> set to 'None'):"
\
%
self
.
logtag
<<
self
.
log
.
endl
for
key
in
self
.
keys_not_found
:
self
.
log
<<
self
.
log
.
item
<<
"Key ="
<<
key
<<
self
.
log
.
endl
return
def
Set
(
self
,
key
,
value
):
if
self
.
log
:
self
.
log
<<
"Set [%s] %-40s = %s"
%
(
self
.
logtag
,
key
,
str
(
value
))
<<
self
.
log
.
endl
if
key
not
in
self
.
data
:
self
.
data
[
key
]
=
value
else
:
raise
KeyError
(
"Key already exists: '%s'"
%
key
)
return
def
SearchMapKeys
(
self
,
expr
,
ln
,
keys
):
s
=
re
.
search
(
expr
,
ln
)
try
:
for
i
in
range
(
len
(
keys
)):
self
.
Set
(
keys
[
i
],
s
.
group
(
i
+
1
).
strip
())
except
AttributeError
:
for
i
in
range
(
len
(
keys
)):
self
.
Set
(
keys
[
i
],
None
)
self
.
keys_not_found
.
append
(
keys
[
i
])
return
def
ReadBlockXy
(
self
,
block
):
lns
=
block
.
lns
block_data
=
{}
for
ln
in
lns
:
ln
=
ln
.
replace
(
'
\n
'
,
''
)
if
ln
==
''
:
continue
if
':'
in
ln
:
sp
=
ln
.
split
(
':'
)
x
=
sp
[
0
].
strip
().
split
()
y
=
sp
[
1
].
strip
()
elif
'='
in
ln
:
sp
=
ln
.
split
(
'='
)
x
=
sp
[
0
].
strip
().
split
()
y
=
sp
[
1
].
strip
()
else
:
sp
=
ln
.
split
()
x
=
sp
[:
-
1
]
y
=
sp
[
-
1
]
key
=
''
for
i
in
range
(
len
(
x
)
-
1
):
xi
=
x
[
i
].
replace
(
'('
,
''
).
replace
(
')'
,
''
).
lower
()
key
+=
'%s_'
%
xi
key
+=
'%s'
%
x
[
-
1
].
replace
(
'('
,
''
).
replace
(
')'
,
''
).
lower
()
value
=
y
block_data
[
key
]
=
value
return
block_data
def
ApplyBlockXyData
(
self
,
block_data
,
key_map
):
for
key_in
in
key_map
:
key_out
=
key_map
[
key_in
]
if
key_in
not
in
block_data
:
# Missing key in output
self
.
missing_keys_lh
.
append
(
key_in
)
self
.
missing_keys_rh
.
append
(
key_out
)
value
=
None
else
:
value
=
block_data
[
key_in
]
if
key_out
==
None
:
key_out
=
key_in
self
.
Set
(
key_out
,
value
)
for
key
in
block_data
:
if
key
not
in
key_map
:
# Missing key in transform map
self
.
ignored_keys
.
append
(
key
)
return
def
ParseOutput
(
self
,
output_file
):
if
self
.
log
:
self
.
log
<<
self
.
log
.
mg
<<
"libAtomsParser::ParseOutput ..."
<<
self
.
log
.
endl
if
HAVE_ASE
:
read_fct
=
ase
.
io
.
read
read_fct_args
=
{
'index'
:
':'
}
else
:
raise
NotImplementedError
(
"None-ASE read function requested, but not yet available."
)
read_fct
=
None
read_fct_args
=
None
# PARSE CONFIGURATIONS
self
.
ase_configs
=
read_fct
(
output_file
,
**
read_fct_args
)
for
config
in
ase_configs
:
print
(
config
)
self
.
Set
(
'program_name'
,
'libAtoms'
)
self
.
Set
(
'program_version'
,
'n/a'
)
return
class
LibAtomsTrajectory
(
LibAtomsParser
):
def
__init__
(
self
,
log
=
None
):
super
(
LibAtomsTrajectory
,
self
).
__init__
(
log
)
self
.
ase_configs
=
None
self
.
frames
=
[]
def
ParseOutput
(
self
,
output_file
):
if
self
.
log
:
self
.
log
<<
self
.
log
.
mg
<<
"libAtomsParser::ParseOutput ..."
<<
self
.
log
.
endl
if
HAVE_ASE
:
read_fct
=
ase
.
io
.
read
read_fct_args
=
{
'index'
:
':'
}
else
:
raise
NotImplementedError
(
"None-ASE read function requested, but not yet available."
)
read_fct
=
None
read_fct_args
=
None
# PARSE CONFIGURATIONS
self
.
ase_configs
=
read_fct
(
output_file
,
**
read_fct_args
)
self
.
LoadAseConfigs
(
self
.
ase_configs
)
self
.
Set
(
'program_name'
,
'libAtoms'
)
self
.
Set
(
'program_version'
,
'n/a'
)
return
def
LoadAseConfigs
(
self
,
ase_configs
):
for
config
in
ase_configs
:
frame
=
LibAtomsFrame
(
self
.
log
)
frame
.
LoadAseConfig
(
config
)
self
.
frames
.
append
(
frame
)
if
self
.
log
:
log
<<
"Loaded %d configurations"
%
len
(
self
.
frames
)
<<
log
.
endl
return
class
LibAtomsFrame
(
LibAtomsParser
):
def
__init__
(
self
,
log
=
None
):
super
(
LibAtomsFrame
,
self
).
__init__
(
log
)
self
.
ase_config
=
None
def
LoadAseConfig
(
self
,
ase_config
):
self
.
ase_atoms
=
ase_config
return
# ===================
# FILE & BLOCK STREAM
# ===================
class
FileStream
(
object
):
def
__init__
(
self
,
filename
=
None
):
if
filename
:
self
.
ifs
=
open
(
filename
,
'r'
)
else
:
self
.
ifs
=
None
return
def
SkipTo
(
self
,
expr
):
while
True
:
ln
=
self
.
ifs
.
readline
()
if
expr
in
ln
:
break
if
self
.
all_read
():
break
return
ln
def
SkipToMatch
(
self
,
expr
):
while
True
:
ln
=
self
.
ifs
.
readline
()
m
=
re
.
search
(
expr
,
ln
)
if
m
:
return
ln
if
self
.
all_read
():
break
return
None
def
GetBlock
(
self
,
expr1
,
expr2
):
inside
=
False
outside
=
False
block
=
''
block_stream
=
BlockStream
()
while
True
:
last_pos
=
self
.
ifs
.
tell
()
ln
=
self
.
ifs
.
readline
()
if
expr1
in
ln
:
inside
=
True
if
expr2
in
ln
:
outside
=
True
if
inside
and
not
outside
:
# Inside the block
block
+=
ln
block_stream
.
append
(
ln
)
elif
inside
and
outside
:
self
.
ifs
.
seek
(
last_pos
)
# Block finished
break
else
:
# Block not started yet
pass
if
self
.
all_read
():
break
return
block_stream
def
GetBlockSequence
(
self
,
expr_start
,
expr_new
,
expr_end
,
remove_eol
=
True
,
skip_empty
=
True
):
inside
=
False
outside
=
False
# Setup dictionary to collect blocks
blocks
=
{
expr_start
:
[]
}
for
e
in
expr_new
:
blocks
[
e
]
=
[]
# Assume structure like this (i <> inside, o <> outside)
# Lines with 'i' get "eaten"
# """
# o ...
# i <expr_start>
# i ...
# i <expr_new[1]>
# i ...
# i <expr_new[0]>
# i ...
# o <expr_end>
# o ...
# """
key
=
None
while
True
:
# Log line position
last_pos
=
self
.
ifs
.
tell
()
ln
=
self
.
ifs
.
readline
()
# Figure out where we are
if
not
inside
and
expr_start
in
ln
:
#print "Enter", expr_start
inside
=
True
key
=
expr_start
new_block
=
BlockStream
(
key
)
blocks
[
key
].
append
(
new_block
)
for
expr
in
expr_new
:
if
inside
and
expr
in
ln
:
#print "Enter", expr
key
=
expr
new_block
=
BlockStream
(
key
)
blocks
[
key
].
append
(
new_block
)
if
inside
and
expr_end
!=
None
and
expr_end
in
ln
:
outside
=
True
if
inside
and
not
outside
:
# Inside a block
if
remove_eol
:
ln
=
ln
.
replace
(
'
\n
'
,
''
)
if
skip_empty
and
ln
==
''
:
pass
else
:
blocks
[
key
][
-
1
].
append
(
ln
)
elif
inside
and
outside
:
# All blocks finished
self
.
ifs
.
seek
(
last_pos
)
break
else
:
# No blocks started yet
pass
if
self
.
all_read
():
break
return
blocks
def
all_read
(
self
):
return
self
.
ifs
.
tell
()
==
os
.
fstat
(
self
.
ifs
.
fileno
()).
st_size
def
readline
(
self
):
return
ifs
.
readline
()
def
close
(
self
):
self
.
ifs
.
close
()
def
nextline
(
self
):
while
True
:
ln
=
self
.
ifs
.
readline
()
if
ln
.
strip
()
!=
''
:
return
ln
else
:
pass
if
self
.
all_read
():
break
return
ln
def
ln
(
self
):
return
self
.
nextline
()
def
sp
(
self
):
return
self
.
ln
().
split
()
def
skip
(
self
,
n
):
for
i
in
range
(
n
):
self
.
ln
()
return
class
BlockStream
(
FileStream
):
def
__init__
(
self
,
label
=
None
):
super
(
BlockStream
,
self
).
__init__
(
None
)
self
.
ifs
=
self
self
.
lns
=
[]
self
.
idx
=
0
self
.
label
=
label
def
append
(
self
,
ln
):
self
.
lns
.
append
(
ln
)
def
readline
(
self
):
if
self
.
all_read
():
return
''
ln
=
self
.
lns
[
self
.
idx
]
self
.
idx
+=
1
return
ln
def
all_read
(
self
):
return
self
.
idx
>
len
(
self
.
lns
)
-
1
def
tell
(
self
):
return
self
.
idx
def
cat
(
self
,
remove_eol
=
True
,
add_eol
=
False
):
cat
=
''
for
ln
in
self
.
lns
:
if
remove_eol
:
cat
+=
ln
.
replace
(
'
\n
'
,
''
)
elif
add_eol
:
cat
+=
ln
+
'
\n
'
else
:
cat
+=
ln
return
cat
parser/parser-lib-atoms/libMomo.py
View file @
12e80786
from
__future__
import
print_function
from
__future__
import
division
from
future
import
standard_library
standard_library
.
install_aliases
()
from
builtins
import
zip
from
builtins
import
str
from
past.utils
import
old_div
from
builtins
import
object
# See git https://github.com/capoe/momo.git
import
os
import
sys
import
subprocess
import
argparse
import
time
import
numpy
as
np
try
:
from
lxml
import
etree
except
ImportError
:
pass
boolean_dict
=
\
{
'true'
:
True
,
'1'
:
True
,
'yes'
:
True
,
'false'
:
False
,
'0'
:
False
,
'no'
:
False
,
'none'
:
False
}
# =============================================================================
# XML WRAPPERS
# =============================================================================
class
ExtendableNamespace
(
argparse
.
Namespace
):
def
AddNamespace
(
self
,
**
kwargs
):
for
name
in
kwargs
:
att
=
getattr
(
self
,
name
,
None
)
if
att
is
None
:
setattr
(
self
,
name
,
kwargs
[
name
])
else
:
setattr
(
self
,
name
,
kwargs
[
name
].
As
(
type
(
att
)))
return
def
Add
(
self
,
name
,
value
):
att
=
getattr
(
self
,
name
,
None
)
if
att
is
None
:
setattr
(
self
,
name
,
value
)
else
:
att
.
Add
(
name
,
value
)
return
value
def
GenerateTreeDict
(
tree
,
element
,
path
=
''
,
paths_rel_to
=
None
):
if
type
(
element
)
==
etree
.
_Comment
:
return
[],
{}
# Update path
if
path
==
''
:
if
element
.
tag
!=
paths_rel_to
:
path
+=
element
.
tag
else
:
path
+=
'/'
+
element
.
tag
# Containers for lower levels
tag_node
=
{}
nodes
=
[]
# Construct Node
xmlnode
=
XmlNode
(
element
,
path
)
# tree.getpath(element))
nodes
.
append
(
xmlnode
)
if
len
(
element
)
==
0
:
tag_node
[
path
]
=
xmlnode
#else:
# print "len 0", xmlnode.path
# tag_node[path] = xmlnode
# Iterate over children
for
child
in
element
:
child_elements
,
childtag_element
=
GenerateTreeDict
(
tree
,
child
,
path
)
nodes
=
nodes
+
child_elements
for
key
in
childtag_element
.
keys
():
if
key
in
tag_node
:
if
type
(
tag_node
[
key
])
!=
list
:
tag_node
[
key
]
=
[
tag_node
[
key
],
childtag_element
[
key
]
]
else
:
tag_node
[
key
].
append
(
childtag_element
[
key
])
else
:
tag_node
[
key
]
=
childtag_element
[
key
]
return
nodes
,
tag_node
def
NamespaceFromDict
(
tree_dict
):
nspace
=
ExtendableNamespace
()
for
key
in
tree_dict
.
keys
():
#print "NSPACE Path = %s" % key
sections
=
key
.
split
(
'/'
)
values
=
[
None
for
s
in
sections
]
values
[
-
1
]
=
tree_dict
[
key
]
add_to_nspace
=
nspace
for
s
,
v
in
zip
(
sections
,
values
):
if
v
==
None
:
#print "NSPACE Check for existing"
if
getattr
(
add_to_nspace
,
s
,
None
):
#print "NSPACE '%s' already exists" % s
add_to_nspace
=
getattr
(
add_to_nspace
,
s
,
None
)
else
:
#print "NSPACE '%s' created" % s
sub_nspace
=
ExtendableNamespace
()
add_to_nspace
=
add_to_nspace
.
Add
(
s
,
sub_nspace
)
else
:
#print "NSPACE '%s' value added" % s
add_to_nspace
.
Add
(
s
,
v
)
return
nspace
class
XmlTree
(
list
):
def
__init__
(
self
,
xmlfile
,
paths_rel_to
=
None
):
self
.
xmlfile
=
xmlfile
self
.
xtree
=
etree
.
parse
(
xmlfile
)
self
.
xroot
=
self
.
xtree
.
getroot
()
self
.
nodes
,
self
.
tag_node
=
GenerateTreeDict
(
self
.
xtree
,
self
.
xroot
,
''
,
paths_rel_to
)
self
.
xspace
=
NamespaceFromDict
(
self
.
tag_node
)
def
SelectByTag
(
self
,
tag
):
selection
=
[
e
for
e
in
self
.
nodes
if
e
.
tag
==
tag
]
return
selection
def
__getitem__
(
self
,
key
):
return
self
.
tag_node
[
key
]
def
keys
(
self
):
return
list
(
self
.
tag_node
.
keys
())
class
XmlNode
(
object
):
def
__init__
(
self
,
element
,
path
):
self
.
path
=
path
self
.
node
=
element
self
.
tag
=
element
.
tag
self
.
value
=
element
.
text
self
.
attributes
=
element
.
attrib
def
As
(
self
,
typ
):
if
typ
==
np
.
array
:
sps
=
self
.
value
.
split
()
return
typ
([
float
(
sp
)
for
sp
in
sps
])
elif
typ
==
bool
:
return
boolean_dict
.
get
(
self
.
value
.
lower
())
else
:
return
typ
(
self
.
value
)
def
AsArray
(
self
,
typ
,
sep
=
' '
,
rep
=
'
\t\n
'
):
for
r
in
rep
:
self
.
value
=
self
.
value
.
replace
(
r
,
sep
)
sp
=
self
.
value
.
split
(
sep
)
return
[
typ
(
s
)
for
s
in
sp
if
str
(
s
)
!=
''
]
def
SetNodeValue
(
self
,
new_value
):
self
.
value
=
new_value
if
self
.
node
!=
None
:
self
.
node
.
firstChild
.
nodeValue
=
new_value
return
def
__getitem__
(
self
,
key
):
return
self
.
node
.
get