Source code for quippy.cinoutput

# HQ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# HQ X
# HQ X   quippy: Python interface to QUIP atomistic simulation library
# HQ X
# HQ X   Copyright James Kermode 2010
# HQ X
# HQ X   These portions of the source code are released under the GNU General
# HQ X   Public License, version 2, http://www.gnu.org/copyleft/gpl.html
# HQ X
# HQ X   If you would like to license the source code under different terms,
# HQ X   please contact James Kermode, james.kermode@gmail.com
# HQ X
# HQ X   When using this software, please cite the following reference:
# HQ X
# HQ X   http://www.jrkermode.co.uk/quippy
# HQ X
# HQ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

from quippy import _cinoutput
from quippy._cinoutput import *

from quippy import netcdf_file, get_fortran_indexing
from quippy.system import INPUT, OUTPUT, INOUT
from quippy.atoms import Atoms
from quippy.io import AtomsReaders, AtomsWriters, atoms_reader
from quippy.farray import farray, padded_str_array
from quippy.extendable_str import Extendable_str
import numpy as np
import __builtin__
import itertools

__doc__ = _cinoutput.__doc__
__all__ = _cinoutput.__all__ + ['CInOutputReader', 'CInOutputWriter']

class CInOutput(_cinoutput.CInOutput):
    def __init__(self, filename=None, action=INPUT, append=False, netcdf4=False, no_compute_index=None,
                 frame=None, one_frame_per_file=None, mpi=None, zero=False, range=None, indices=None,
                 fpointer=None, finalise=True, string=False):

        self.string = None
        if string:
            self.string = filename
            filename = None
        _cinoutput.CInOutput.__init__(self, filename, action, append, netcdf4, no_compute_index,
                                      frame, one_frame_per_file, mpi, fpointer=fpointer, finalise=finalise)
        self.zero = zero
        self.range = range
        self.indices = indices

    __init__.__doc__ = _cinoutput.CInOutput.__init__.__doc__

    def __len__(self):
        if self.got_index:
            return int(self.n_frame)
        else:
            raise AttributeError('This CInOutput does not have an index')

    def __getitem__(self, index):
        return self.read(frame=index, zero=self.zero, range=self.range, indices=self.indices)

    def __setitem__(self, index, value):
        self.write(value, frame=index)

    def read(self, properties=None, properties_array=None, frame=None,
             zero=None, range=None, str=None, estr=None, indices=None):
        at = Atoms()
        if range == 0 or range == 'empty':
            range = [-1,-1]

        if self.string is not None and str is None:
            str = self.string

        try:
            _cinoutput.CInOutput.read(self, at, properties=properties,
                                      properties_array=properties_array, frame=frame,
                                      zero=zero, range=range, str=str, estr=estr,
                                      indices=indices)
        except RuntimeError, re:
            if 'kind IO EOF' in __builtin__.str(re):
                raise EOFError(__builtin__.str(re))
            else:
                raise

        return at

    read.__doc__ = _cinoutput.CInOutput.read.__doc__

    def write(self, at, properties=None, prefix=None, int_format=None, real_format=None, frame=None,
              shuffle=None, deflate=None, deflate_level=None, estr=None, update_index=None):

        if properties is not None and (not hasattr(properties, 'dtype') or properties.dtype != dtype('S1')):
            properties = padded_str_array(properties, max([len(x) for x in properties])).T

        if self.string and estr is None:
            estr = Extendable_str()

        _cinoutput.CInOutput.write(self, at, properties_array=properties, prefix=prefix, int_format=int_format,
                                   real_format=real_format, frame=frame, shuffle=shuffle, deflate=deflate,
                                   deflate_level=deflate_level, estr=estr, update_index=update_index)
        if estr is not None:
            return str(estr)

    write.__doc__ = _cinoutput.CInOutput.write.__doc__


from quippy import FortranDerivedTypes
FortranDerivedTypes['type(cinoutput)'] = CInOutput

[docs]class CInOutputReader(object): """Class to read atoms from a CInOutput. Supports generator and random access via indexing.""" def __init__(self, source, frame=None, range=None, start=0, stop=None, step=1, no_compute_index=False, zero=False, one_frame_per_file=False, indices=None, string=False, format=None): if isinstance(source, basestring): self.opened = True self.source = CInOutput(source, action=INPUT, append=False, zero=zero, range=range, no_compute_index=no_compute_index, one_frame_per_file=one_frame_per_file, indices=indices, string=string) self.netcdf_file = None if self.source.string is None and source.endswith('.nc'): try: self.netcdf_file = netcdf_file(source) except AssertionError: pass else: self.opened = False self.source = source self.netcdf_file = None if frame is not None: self.start = frame self.stop = frame+1 self.step = 1 else: self.start = start if stop is None: try: stop = len(self.source) except AttributeError: stop = None self.stop = stop self.step = step def __iter__(self): frames = itertools.islice(itertools.count(0), self.start,self.stop,self.step) for frame in frames: try: yield self.source[frame] except EOFError: break def close(self): if self.opened: self.source.close() if self.netcdf_file: self.netcdf_file.close() self.netcdf_file = None def __len__(self): return len(self.source) def __getitem__(self, idx): return self.source[idx] def __getattr__(self, name): if self.netcdf_file is not None: try: return self.netcdf_file.__getattr__(name) except AttributeError: try: a = self.netcdf_file.variables[name][:] if get_fortran_indexing(): a = farray(a) return a except KeyError: raise AttributeError('Attribute %s not found' % name) else: raise AttributeError('Attribute %s not found' % name)
AtomsReaders['extxyz'] = AtomsReaders['xyz'] = AtomsReaders['nc'] = \ AtomsReaders[CInOutput] = CInOutputReader @atoms_reader('stdin') def CInOutputStdinReader(source='stdin', format=None): assert source == 'stdin' source = CInOutput(source, action=INPUT) while True: try: yield source.read() except (RuntimeError, EOFError): break
[docs]class CInOutputWriter(object): """Class to write atoms sequentially to a CInOutput stream""" def __init__(self, dest, append=False, netcdf4=False, one_frame_per_file=False, string=False, **write_kwargs): self.opened = False self.write_kwargs = {} self.write_kwargs.update(write_kwargs) if isinstance(dest, basestring): self.opened = True self.dest = CInOutput(dest, action=OUTPUT, append=append, netcdf4=netcdf4, one_frame_per_file=one_frame_per_file, string=string) else: self.dest = dest def write(self, at, **kwargs): kwargs.update(self.write_kwargs) return self.dest.write(at, **kwargs) def close(self): self.dest.close()
AtomsWriters['stdout'] = AtomsWriters['extxyz'] = AtomsWriters['xyz'] \ = AtomsWriters['nc'] = AtomsWriters[CInOutput] = CInOutputWriter class CInOutputStringReader(CInOutputReader): def __init__(self, source, frame=None, range=None, start=0, stop=None, step=1, no_compute_index=False, zero=False, one_frame_per_file=False, indices=None, format=None): CInOutputReader.__init__(self, source=source, frame=frame, range=range, start=start, stop=stop, step=step, no_compute_index=no_compute_index, zero=zero, one_frame_per_file=one_frame_per_file, indices=indices, string=True) AtomsReaders['string'] = CInOutputStringReader class CInOutputStringWriter(CInOutputWriter): def __init__(self, dest, append=False, netcdf4=False, one_frame_per_file=False, format=None, **write_kwargs): assert dest == 'string' CInOutputWriter.__init__(self, dest=dest, append=append, netcdf4=netcdf4, one_frame_per_file=one_frame_per_file, string=True, **write_kwargs) AtomsWriters['string'] = CInOutputStringWriter @atoms_reader('out') def EvalOutputReader(source, format=None): """ Reader for QUIP `eval` output files Extracts XYZ configurations from lines beginnging with "AT " """ lines = open(source, 'r').readlines() lines = [line[3:] for line in lines if line.startswith('AT ')] reader = CInOutputStringReader(''.join(lines)) for at in reader: yield at