Skip to content
Snippets Groups Projects
Commit d6e15e4b authored by Carl Poelking's avatar Carl Poelking
Browse files

Cross-references in backend topology.

parent 11db537f
No related branches found
No related tags found
No related merge requests found
...@@ -12,8 +12,8 @@ from contextlib import contextmanager ...@@ -12,8 +12,8 @@ from contextlib import contextmanager
from libDlPolyParser import * from libDlPolyParser import *
try: try:
from momo import osio, endl, flush from libMomo import osio, endl, flush
#osio.ConnectToFile('dl_poly.log') osio.ConnectToFile('parser.osio.log')
green = osio.mg green = osio.mg
except: except:
osio = endl = flush = None osio = endl = flush = None
...@@ -27,8 +27,9 @@ parser_info = { ...@@ -27,8 +27,9 @@ parser_info = {
} }
# LOGGING # LOGGING
def log(msg, highlight=osio.ww, enter=endl): def log(msg, highlight=None, enter=endl):
if osio: if osio:
if highlight==None: hightlight = osio.ww
osio << highlight << msg << enter osio << highlight << msg << enter
return return
...@@ -36,7 +37,7 @@ def log(msg, highlight=osio.ww, enter=endl): ...@@ -36,7 +37,7 @@ def log(msg, highlight=osio.ww, enter=endl):
@contextmanager @contextmanager
def open_section(p, name): def open_section(p, name):
gid = p.openSection(name) gid = p.openSection(name)
yield yield gid
p.closeSection(name, gid) p.closeSection(name, gid)
def push(jbe, terminal, key1, fct=lambda x: x.As(), key2=None): def push(jbe, terminal, key1, fct=lambda x: x.As(), key2=None):
...@@ -45,6 +46,10 @@ def push(jbe, terminal, key1, fct=lambda x: x.As(), key2=None): ...@@ -45,6 +46,10 @@ def push(jbe, terminal, key1, fct=lambda x: x.As(), key2=None):
jbe.addValue(key1, value) jbe.addValue(key1, value)
return value return value
def push_value(jbe, value, key):
jbe.addValue(key, value)
return value
def parse(output_file_name): def parse(output_file_name):
jbe = JsonParseEventsWriterBackend(meta_info_env) jbe = JsonParseEventsWriterBackend(meta_info_env)
jbe.startedParsingSession(output_file_name, parser_info) jbe.startedParsingSession(output_file_name, parser_info)
...@@ -54,7 +59,6 @@ def parse(output_file_name): ...@@ -54,7 +59,6 @@ def parse(output_file_name):
terminal_ctrls = DlPolyControls(osio) terminal_ctrls = DlPolyControls(osio)
terminal_ctrls.ParseControls(ctrl_file_name) terminal_ctrls.ParseControls(ctrl_file_name)
# PARSE OUTPUT / TOPOLOGY ... # PARSE OUTPUT / TOPOLOGY ...
output_file_name = 'OUTPUT'
terminal = DlPolyParser(osio) terminal = DlPolyParser(osio)
terminal.ParseOutput(output_file_name) terminal.ParseOutput(output_file_name)
# PARSE TRAJECTORY ... # PARSE TRAJECTORY ...
...@@ -72,7 +76,7 @@ def parse(output_file_name): ...@@ -72,7 +76,7 @@ def parse(output_file_name):
top = terminal.topology top = terminal.topology
trj = terminal_trj trj = terminal_trj
ofs = open('keys.log', 'w') ofs = open('parser.keys.log', 'w')
terminals = [ctr, out, top, trj] terminals = [ctr, out, top, trj]
for t in terminals: for t in terminals:
keys = sorted(t.data.keys()) keys = sorted(t.data.keys())
...@@ -82,35 +86,74 @@ def parse(output_file_name): ...@@ -82,35 +86,74 @@ def parse(output_file_name):
ofs.close() ofs.close()
# PUSH TO BACKEND # PUSH TO BACKEND
with open_section(jbe, 'section_run'): with open_section(jbe, 'section_run') as gid_run:
push(jbe, out, 'program_name') push(jbe, out, 'program_name')
push(jbe, out, 'program_version') push(jbe, out, 'program_version')
push(jbe, out, 'program_info', key2='program_version_date') push(jbe, out, 'program_info', key2='program_version_date')
with open_section(jbe, 'section_topology'): # TOPOLOGY SECTION
with open_section(jbe, 'section_topology') as gid_top:
# Cross-referencing is done on-the-fly (as gid's become available)
# a) <molecule_to_molecule_type> : shape=(number_of_topology_molecules, [<gid>])
# b) <atom_in_molecule_to_atom_type_ref> : shape=(number_of_atoms_in_molecule, [<gid>])
# c) <atom_to_molecule> : shape=(number_of_topology_atoms, [<molidx>, <atomidx>])
push(jbe, top, 'number_of_topology_molecules', lambda s: s.As(int)) push(jbe, top, 'number_of_topology_molecules', lambda s: s.As(int))
push(jbe, top, 'number_of_topology_atoms', lambda s: s.As(int)) push(jbe, top, 'number_of_topology_atoms', lambda s: s.As(int))
# Molecule types
for mol in top.molecules:
with open_section(jbe, 'section_molecule_type'):
push(jbe, mol, 'molecule_type_name')
push(jbe, mol, 'number_of_atoms_in_molecule', lambda s: s.As(int))
# TODO settings_atom_... is abstract type => set atom_in_molecule_charge via list,
# TODO same for atom_in_molecule_name
# TODO atom_to_molecule, molecule_to_molecule_type, atom_in_molecule_to_atom_type_ref
for atom in mol.atoms:
with open_section(jbe, 'settings_atom_in_molecule'):
push(jbe, atom, 'atom_in_molecule_charge', lambda s: s.As(float), 'atom_charge')
push(jbe, atom, 'atom_in_molecule_name', lambda s: s.As(), 'atom_name')
# Atom types # Atom types
mol_type_atom_type_id_to_atom_type_gid = {}
for mol in top.molecules: for mol in top.molecules:
mol_name = mol['molecule_type_name'].As()
mol_type_atom_type_id_to_atom_type_gid[mol_name] = {}
for atom in mol.atoms: for atom in mol.atoms:
with open_section(jbe, 'section_atom_type'): # Add type
with open_section(jbe, 'section_atom_type') as gid_atom:
atom_id = atom['atom_id'].As(int)
mol_type_atom_type_id_to_atom_type_gid[mol_name][atom_id] = gid_atom
push(jbe, atom, 'atom_type_name', lambda s: s.As(), 'atom_name') push(jbe, atom, 'atom_type_name', lambda s: s.As(), 'atom_name')
push(jbe, atom, 'atom_type_mass', lambda s: s.As(float), 'atom_mass') push(jbe, atom, 'atom_type_mass', lambda s: s.As(float), 'atom_mass')
push(jbe, atom, 'atom_type_charge', lambda s: s.As(float), 'atom_charge') push(jbe, atom, 'atom_type_charge', lambda s: s.As(float), 'atom_charge')
# Molecule types
molecule_type_name_to_type_gid = {}
for mol in top.molecules:
mol_name = mol['molecule_type_name'].As()
# Extract references of atoms to atom types
atom_type_id_to_atom_type_gid = mol_type_atom_type_id_to_atom_type_gid[mol_name]
atom_gid_list = []
for atom in mol.atoms:
atom_id = atom['atom_id'].As(int)
atom_gid_list.append(atom_type_id_to_atom_type_gid[atom_id])
# Add molecule
with open_section(jbe, 'section_molecule_type') as gid_mol:
molecule_type_name_to_type_gid[mol['molecule_type_name'].As()] = gid_mol
push(jbe, mol, 'molecule_type_name')
push(jbe, mol, 'number_of_atoms_in_molecule', lambda s: s.As(int))
push(jbe, mol, 'atom_in_molecule_name')
push(jbe, mol, 'atom_in_molecule_charge')
push_value(jbe, atom_gid_list, 'atom_in_molecule_to_atom_type_ref')
# Global molecule type map
molecule_to_molecule_type = []
for mol in top.molecules:
type_name_this_mol = mol['molecule_type_name'].As()
type_gid_this_mol = molecule_type_name_to_type_gid[type_name_this_mol]
n_this_mol = mol['number_of_molecules'].As(int)
for i in range(n_this_mol):
molecule_to_molecule_type.append(type_gid_this_mol)
push_value(jbe, molecule_to_molecule_type, 'molecule_to_molecule_type_map')
# Global atom map
atoms_to_molidx_atomidx = []
molidx = 0
for mol in top.molecules:
n_mol = mol['number_of_molecules'].As(int)
for i in range(n_mol):
atomidx = 0
for atom in mol.atoms:
molidx_atomidx = [ molidx, atomidx ]
atoms_to_molidx_atomidx.append(molidx_atomidx)
atomidx += 1
molidx += 1
push_value(jbe, atoms_to_molidx_atomidx, 'atom_to_molecule')
# SAMPLING-METHOD SECTION
with open_section(jbe, 'section_sampling_method'): with open_section(jbe, 'section_sampling_method'):
# Ensemble # Ensemble
ensemble = push(jbe, out, 'ensemble_type', lambda s: s.As().split()[0].upper()) ensemble = push(jbe, out, 'ensemble_type', lambda s: s.As().split()[0].upper())
...@@ -128,6 +171,7 @@ def parse(output_file_name): ...@@ -128,6 +171,7 @@ def parse(output_file_name):
push(jbe, out, 'barostat_tau', lambda s: s.As(float)) push(jbe, out, 'barostat_tau', lambda s: s.As(float))
pass pass
# FRAME-SEQUENCE SECTION
with open_section(jbe, 'section_frame_sequence'): with open_section(jbe, 'section_frame_sequence'):
pass pass
...@@ -150,9 +194,7 @@ if __name__ == '__main__': ...@@ -150,9 +194,7 @@ if __name__ == '__main__':
extraArgsHandling=InfoKindEl.ADD_EXTRA_ARGS, extraArgsHandling=InfoKindEl.ADD_EXTRA_ARGS,
uri=None) uri=None)
log("Parsing ...", green)
output_file_name = sys.argv[1] output_file_name = sys.argv[1]
parse(output_file_name) parse(output_file_name)
log("... Done.", green)
...@@ -404,7 +404,7 @@ class DlPolyControls(DlPolyParser): ...@@ -404,7 +404,7 @@ class DlPolyControls(DlPolyParser):
return return
def ParseControls(self, ctrl_file): def ParseControls(self, ctrl_file):
if self.log: if self.log:
self.log << self.log.endl << self.log.mg << "Start controls ..." << self.log.endl self.log << self.log.mg << "Start controls ..." << self.log.endl
ifs = FileStream(ctrl_file) ifs = FileStream(ctrl_file)
while not ifs.all_read(): while not ifs.all_read():
ln = ifs.ln() ln = ifs.ln()
...@@ -575,6 +575,11 @@ class DlPolyMolecule(DlPolyParser): ...@@ -575,6 +575,11 @@ class DlPolyMolecule(DlPolyParser):
break break
assert atom_count == n_atoms assert atom_count == n_atoms
atom_charges = [ atom['atom_charge'].As(float) for atom in self.atoms ]
atom_names = [ atom['atom_name'].As() for atom in self.atoms ]
self.Set('atom_in_molecule_charge', atom_charges)
self.Set('atom_in_molecule_name', atom_names)
# TODO Parse interactions # TODO Parse interactions
return return
...@@ -582,7 +587,7 @@ class DlPolyMolecule(DlPolyParser): ...@@ -582,7 +587,7 @@ class DlPolyMolecule(DlPolyParser):
class DlPolyAtom(DlPolyParser): class DlPolyAtom(DlPolyParser):
def __init__(self, atom_properties, atom_property_labels, parser): def __init__(self, atom_properties, atom_property_labels, parser):
super(DlPolyAtom, self).__init__(parser.log) super(DlPolyAtom, self).__init__(parser.log)
if not self.log.debug: self.log = None if self.log and not self.log.debug: self.log = None
self.logtag = 'atm' self.logtag = 'atm'
for value, label in zip(atom_properties, atom_property_labels): for value, label in zip(atom_properties, atom_property_labels):
self.Set(label, value) self.Set(label, value)
......
# See git https://github.com/capoe/momo.git
import os
import sys
import commands
import argparse
import time
import numpy as np
try:
from lxml import etree
except ImportError:
pass
boolean_dict = \
{'true' : True, '1' : True, 'yes' : True,
'false' : False, '0' : False, 'no' : False, 'none' : False }
# =============================================================================
# XML WRAPPERS
# =============================================================================
class ExtendableNamespace(argparse.Namespace):
def AddNamespace(self, **kwargs):
for name in kwargs:
att = getattr(self, name, None)
if att is None:
setattr(self, name, kwargs[name])
else:
setattr(self, name, kwargs[name].As(type(att)))
return
def Add(self, name, value):
att = getattr(self, name, None)
if att is None:
setattr(self, name, value)
else:
att.Add(name, value)
return value
def GenerateTreeDict(tree, element, path='', paths_rel_to=None):
if type(element) == etree._Comment: return [], {}
# Update path
if path == '':
if element.tag != paths_rel_to:
path += element.tag
else:
path += '/' + element.tag
# Containers for lower levels
tag_node = {}
nodes = []
# Construct Node
xmlnode = XmlNode(element, path) # tree.getpath(element))
nodes.append(xmlnode)
if len(element) == 0:
tag_node[path] = xmlnode
#else:
# print "len 0", xmlnode.path
# tag_node[path] = xmlnode
# Iterate over children
for child in element:
child_elements, childtag_element = GenerateTreeDict(tree, child, path)
nodes = nodes + child_elements
for key in childtag_element.keys():
if tag_node.has_key(key):
if type(tag_node[key]) != list:
tag_node[key] = [ tag_node[key], childtag_element[key] ]
else:
tag_node[key].append(childtag_element[key])
else:
tag_node[key] = childtag_element[key]
return nodes, tag_node
def NamespaceFromDict(tree_dict):
nspace = ExtendableNamespace()
for key in tree_dict.keys():
#print "NSPACE Path = %s" % key
sections = key.split('/')
values = [ None for s in sections ]
values[-1] = tree_dict[key]
add_to_nspace = nspace
for s,v in zip(sections, values):
if v == None:
#print "NSPACE Check for existing"
if getattr(add_to_nspace, s, None):
#print "NSPACE '%s' already exists" % s
add_to_nspace = getattr(add_to_nspace, s, None)
else:
#print "NSPACE '%s' created" % s
sub_nspace = ExtendableNamespace()
add_to_nspace = add_to_nspace.Add(s, sub_nspace)
else:
#print "NSPACE '%s' value added" % s
add_to_nspace.Add(s, v)
return nspace
class XmlTree(list):
def __init__(self, xmlfile, paths_rel_to=None):
self.xmlfile = xmlfile
self.xtree = etree.parse(xmlfile)
self.xroot = self.xtree.getroot()
self.nodes, self.tag_node = GenerateTreeDict(self.xtree, self.xroot, '', paths_rel_to)
self.xspace = NamespaceFromDict(self.tag_node)
def SelectByTag(self, tag):
selection = [ e for e in self.nodes if e.tag == tag ]
return selection
def __getitem__(self, key):
return self.tag_node[key]
def keys(self):
return self.tag_node.keys()
class XmlNode(object):
def __init__(self, element, path):
self.path = path
self.node = element
self.tag = element.tag
self.value = element.text
self.attributes = element.attrib
def As(self, typ):
if typ == np.array:
sps = self.value.split()
return typ([ float(sp) for sp in sps ])
elif typ == bool:
return boolean_dict.get(self.value.lower())
else:
return typ(self.value)
def AsArray(self, typ, sep=' ', rep='\t\n'):
for r in rep:
self.value = self.value.replace(r, sep)
sp = self.value.split(sep)
return [ typ(s) for s in sp if str(s) != '' ]
def SetNodeValue(self, new_value):
self.value = new_value
if self.node != None:
self.node.firstChild.nodeValue = new_value
return
def __getitem__(self, key):
return self.node.get(key)
# =============================================================================
# COMMAND LINE & XML INPUT INTERFACE
# =============================================================================
class CLIO_HelpFormatter(argparse.HelpFormatter):
def _format_usage(self, usage, action, group, prefix):
return "%s : Command Line Interface\n" % sys.argv[0]
class OptionsInterface(object):
def __init__(self):
# COMMAND-LINE ARGUMENTS
self.is_connected_to_cmd_ln = False
self.cmd_ln_args = None
self.cmd_ln_opts = None
self.cmd_ln_nicknames = [ '-h' ]
self.boolean_translator = boolean_dict
self.subtype = str
# XML OPTIONS FILE
self.is_connected_to_xml = False
self.xmlfile = None
self.tree = None
self.xdict = None
self.xspace = None
# JOINED OPTIONS
self.opts = ExtendableNamespace()
def Connect(self, xmlfile=None):
self.ConnectToCmdLn()
self.ConnectToOptionsFile(xmlfile)
def Parse(self, xkey='options'):
if self.is_connected_to_cmd_ln:
self.ParseCmdLn()
if self.is_connected_to_xml:
self.ParseOptionsFileXml(xkey)
if self.is_connected_to_cmd_ln and not self.is_connected_to_xml:
return self.cmd_ln_opts
elif self.is_connected_to_xml and not self.is_connected_to_cmd_ln:
return self.xspace
else:
return self.cmd_ln_opts, self.xspace
def ParseOptionsFile(self, xmlfile, xkey):
self.xmlfile = xmlfile
self.is_connected_to_xml = True
self.ParseOptionsFileXml(xkey)
return self.xspace
# COMMAND-LINE CONVENIENCE ================================================
def __call__(self):
return self.cmd_ln_opts
def ConnectToCmdLn(self, prog=sys.argv[0], descr=None):
self.cmd_ln_args = argparse.ArgumentParser(prog=sys.argv[0],
formatter_class=lambda prog: CLIO_HelpFormatter(prog,max_help_position=70))
self.is_connected_to_cmd_ln = True
return
def ParseCmdLn(self):
self.cmd_ln_opts = self.cmd_ln_args.parse_args()
def InterpretAsBoolean(self, expr):
try:
return self.boolean_translator.get(expr.lower())
except KeyError:
raise ValueError('CLIO does not know how to convert %s into a boolean.' % expr)
def InterpretAsNumpyArray(self, expr):
print "Interpret", expr
array = [ float(e) for e in expr ]
array = np.array(array)
return array
def InterpretAsList(self, expr):
array = [ self.subtype(e) for e in expr ]
return array
def AddArg(self, name, typ=str, nickname=None,
default=None, destination=None, help=None):
# Sort out <name> (e.g. --time) vs <destination> (e.g., time)
if '--' != name[0:2]:
dest = name
name = '--' + name
else:
dest = name[2:]
# Sort out <default> vs <required>
if default == None: required = True
else: required = False
# Construct default <help> it not given
if help == None: help = str(typ)
# Construct <nickname> if not given
if nickname == None:
nickname = '-'
for char in dest:
nickname += char
if not nickname in self.cmd_ln_nicknames:
break
if nickname in self.cmd_ln_nicknames:
raise ValueError('CLIO could not construct nickname from %s option'\
% name)
self.cmd_ln_nicknames.append(nickname)
# Process type
if typ in [int, float, str]:
nargs = None
elif typ == bool:
typ = self.InterpretAsBoolean
nargs = None
elif typ == np.array:
raise NotImplementedError
typ = float # self.InterpretAsNumpyArray
nargs = 3
elif typ == list:
typ = str
nargs = '*'
elif len(typ) == 2 and typ[0] == list:
typ = typ[1]
nargs = '*'
else:
raise NotImplementedError("CLIO does not know how to generate type '%s'"\
% typ)
self.cmd_ln_args.add_argument(nickname, name,
dest=dest,
action='store',
nargs=nargs,
required=required,
type=typ,
metavar=dest[0:1].upper(),
default=default,
help=help)
return
# COMMAND-LINE CONVENIENCE ================================================
def ConnectToOptionsFile(self, xmlfile):
if xmlfile == None or xmlfile == '': return
self.xmlfile = xmlfile
self.is_connected_to_xml = True
return
def ParseOptionsFileXml(self, xkey='options'):
if self.xmlfile == None: return
self.tree = XmlTree(self.xmlfile, paths_rel_to=xkey)
self.xdict = self.tree.tag_node
self.xspace = self.tree.xspace
return
def __getitem__(self, key):
try:
return self.xspace.__dict__[key]
except KeyError:
return self.cmd_ln_opts.__dict__[key]
except KeyError:
raise AttributeError('No such option registered: \'%s\'' % key)
return None
# =============================================================================
# OS SHELL INTERFACE
# =============================================================================
class ShellInterface(object):
def __init__(self):
# =====================================================================
# PRINTER ATTRIBUTES
self.color_dict = { \
'pp' : '\033[95m',
'mb' : '\033[34m',
'lb' : '\033[1;34m',
'my' : '\033[1;33m',
'mg' : '\033[92m',
'mr' : '\033[91m',
'ww' : '\033[0;1m',
'ok' : '\033[92m',
'xx' : '\033[91m',
'warning' : '\033[93m',
'error' : '\033[95m',
'endcolor' : '\033[0;1m' }
self.justify_dict = { \
'o' : ' o ',
'.' : '... ',
'r' : '\r',
'ro' : '\r o '}
self.pp = OS_COLOR('pp')
self.lb = OS_COLOR('lb')
self.mb = OS_COLOR('mb')
self.mg = OS_COLOR('mg')
self.my = OS_COLOR('my')
self.mr = OS_COLOR('mr')
self.ww = OS_COLOR('ww')
self.ok = OS_COLOR('ok')
self.xx = OS_COLOR('xx')
self.colors = [ OS_COLOR(c) for c in sorted(self.color_dict.keys()) ]
self.item = ' o '
self.iitem = ' - '
self.endl = OS_LINE_CHAR('\n')
self.flush = OS_LINE_CHAR('')
self.back = OS_LINE_CHAR('\r')
self.trail = ' '
# CURRENT STYLE SELECTION
self.sel_color = None
self.sel_justify = None
self.sel_header = False
self.sel_trim = "="
# EXE ATTRIBUTES
self.catch = OS_EXE_CATCH()
self.assert_zero = OS_EXE_ASSERT()
self.dev = OS_EXE_DEV('')
self.nodev = OS_EXE_DEV('')
self.devnull = OS_EXE_DEV()
self.devfile = OS_EXE_DEV('')
self.os_exe_get = False
self.os_exe_assert_zero = False
self.os_exe_dev = ''
self.os_exe_verbose = False
# LOGGING
self.logfile = None
# DIRECTORY HOPPING
self.paths_visited = [ os.getcwd() ]
self.exe_root_path = self.paths_visited[0]
self.N_store_paths_visited = 1+5
def __call__(self, mssg, c=None, j=None, h=False, t="="):
# c=color, j=justify, h=header, t=trim, u=upper-case
if j:
mssg = self.justify_dict[j] + mssg
if c != None:
mssg = self.color_dict[c] + mssg + self.color_dict['endcolor']
if h:
mssg = self.os_generate_header(mssg, t)
print mssg
# LOGFILE ADAPTOR =========================================================
def ConnectToFile(self, logfile):
self.logfile = logfile
sys.stdout = open(logfile, 'w')
self.devfile = OS_EXE_DEV(' >> {log} 2>> {log}'.format(log=logfile))
return
def DisconnectFromFile(self):
if self.logfile != None:
self.devfile = OS_EXE_DEV('')
self.logfile = None
sys.stdout = sys.__stdout__
else: pass
return
# PRINTER METHODS =========================================================
def __lshift__(self, mssg):
if type(mssg) == OS_LINE_CHAR:
# <LOG MESSAGE HERE>
sys.stdout.write(str(mssg))
sys.stdout.flush()
self.sel_color = None
return self
elif type(mssg) == OS_COLOR:
self.sel_color = str(mssg)
return self
mssg = str(mssg)
if self.sel_justify != None:
mssg = self.justify_dict[j] + mssg
mssg += self.trail
if self.sel_color != None:
mssg = self.color_dict[self.sel_color] \
+ mssg + self.color_dict['endcolor']
if self.sel_header:
mssg = self.os_generate_header(mssg, self.sel_trim)
# <LOG MESSAGE HERE>
sys.stdout.write(mssg)
return self
def os_print(self, mssg, c=None, j=None, h=False, t="="):
# c=color, j=justify, h=header, t=trim, u=upper-case
if j:
mssg = OS_JUSTIFY_DICT[j] + mssg
if c != None:
mssg = OS_COLOR_DICT[c] + mssg + OS_COLOR_DICT['endcolor']
if h:
mssg = os_generate_header(mssg, t)
print mssg
return
def os_print_config(self, c=None, j=None, h=False, t="=", tl=' '):
self.sel_color = c
self.sel_justify = j
self.sel_header = h
self.sel_trim = t
self.trail = tl
return
def os_print_reset(self):
self.sel_color = None
self.sel_justify = None
self.sel_header = False
self.sel_trim = "="
self.trail = ' '
return
def os_generate_header(title, trim="="):
try:
height, width = os.popen('stty size', 'r').read().split()
width = int(width)
leftright = int((width - len(title)-2)/2)
except ValueError:
leftright = 40
return trim*leftright + " " + title + " " + trim*leftright
# EXECUTE METHODS =========================================================
def __rshift__(self, cmmd):
if type(cmmd) == OS_EXE_CATCH:
self.os_exe_get = True
return self
elif type(cmmd) == OS_EXE_DEV:
self.dev = cmmd
return self
elif type(cmmd) == OS_EXE_ASSERT:
self.os_exe_assert_zero = True
return self
# Redirect command as requested (not applicable if output is asked for)
if not self.os_exe_get:
if str(self.dev) != '':
cmmd += str(self.dev)
self.dev = self.nodev
else:
cmmd += str(self.devfile)
# Execute
if self.debug: self << self.my << "exe:" << cmmd << endl
if self.os_exe_get:
output = commands.getoutput(cmmd)
self.os_exe_get = False
return output
else:
sign = os.system(cmmd)
if self.os_exe_assert_zero:
if str(sign) != '0':
raise RuntimeError("<OSIO> '%s' returned '%s'" % (cmmd, sign))
self.os_exe_assert_zero = False
return sign
# EXIT PROTOCOLS ==========================================================
def okquit(self, what=''):
if what != '': self << self.ok << what << self.endl
self.DisconnectFromFile()
sys.exit(0)
return
def xxquit(self, what=''):
if what != '': self << self.xx << "ERROR" << what << self.endl
self.DisconnectFromFile()
sys.exit(1)
return
# DIRECTORY HOPPING =======================================================
def cd(self, d):
# Current working directory, for archiving ... =>
cwd = os.getcwd()
if type(d) == int:
# Change to previously visited path
os.chdir(self.paths_visited[d])
elif type(d) == str:
# Change to path as specified explicitly
os.chdir(d)
else:
raise NotImplementedError
# <= ... previous path
self.paths_visited.append(cwd)
if len(self.paths_visited) > self.N_store_paths_visited:
self.paths_visited.pop(1) # 0 stores root
if self.debug: self << self.my << "cd: " << os.getcwd() << self.endl
return
def pwd(self):
return self.cwd()
def cwd(self):
return os.getcwd()
def root(self):
self.cd(self.exe_root_path)
return
def abspath(self, file):
if not os.path.exists(file):
raise IOError("<osio::abspath> No such item in local directory: '%s'" % file)
return os.path.join(self.cwd(), file)
def mkcd(self, directory):
self >> self.assert_zero >> 'mkdir -p %s' % directory
self.cd(directory)
return directory
class OS_EXE_DEV(object):
def __init__(self, dev=' > /dev/null 2> /dev/null'):
self.dev = dev
def __str__(self):
return self.dev
class OS_EXE_CATCH(object):
def __init__(self):
self.catch = True
class OS_EXE_ASSERT(object):
def __init__(self):
self.assert_0 = True
class OS_COLOR(object):
def __init__(self, colstr):
self.colstr = colstr
def __str__(self):
return self.colstr
class OS_LINE_CHAR(object):
def __init__(self, char):
self.char = char
def __str__(self):
return self.char
class OSIO(ShellInterface, OptionsInterface):
def __init__(self):
self.debug = False
ShellInterface.__init__(self)
OptionsInterface.__init__(self)
return
def sleep(self, dt):
time.sleep(dt)
return
osio = OSIO()
endl = OS_LINE_CHAR('\n')
flush = OS_LINE_CHAR('')
back = OS_LINE_CHAR('\r')
catch = OS_EXE_CATCH()
devnull = OS_EXE_DEV()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment