Source code for cis_interface.communication.FileComm

import os
import tempfile
from cis_interface import backwards, platform
from cis_interface.communication import CommBase
from cis_interface.schema import register_component, inherit_schema
from cis_interface.serialize.DirectSerialize import DirectSerialize


[docs]@register_component class FileComm(CommBase.CommBase): r"""Class for handling I/O from/to a file on disk. >>> x = FileComm('test_send', address='test_file.txt', direction='send') >>> x.send('Test message') True >>> with open('test_file.txt', 'r') as fd: ... print(fd.read()) Test message >>> x = FileComm('test_recv', address='test_file.txt', direction='recv') >>> x.recv() (True, b'Test message') Args: name (str): The environment variable where communication address is stored. read_meth (str, optional): Method that should be used to read data from the file. Defaults to 'read'. Ignored if direction is 'send'. append (bool, optional): If True and writing, file is openned in append mode. Defaults to False. in_temp (bool, optional): If True, the path will be considered relative to the platform temporary directory. Defaults to False. open_as_binary (bool, optional): If True, the file is opened in binary mode. Defaults to True. newline (str, optional): String indicating a new line. Defaults to serialize._default_newline. is_series (bool, optional): If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The addressed is assumed to contain a format for the index of the file. Defaults to False. wait_for_creation (float, optional): Time (in seconds) that should be waited before opening for the file to be created if it dosn't exist. Defaults to 0 s and file will attempt to be opened immediately. **kwargs: Additional keywords arguments are passed to parent class. Attributes: fd (file): File that should be read/written. read_meth (str): Method that should be used to read data from the file. append (bool): If True and writing, file is openned in append mode. in_temp (bool): If True, the path will be considered relative to the platform temporary directory. open_as_binary (bool): If True, the file is opened in binary mode. newline (str): String indicating a new line. is_series (bool): If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. platform_newline (str): String indicating a newline on the current platform. Raises: ValueError: If the read_meth is not one of the supported values. """ _filetype = 'binary' _datatype = {'type': 'bytes'} _schema_type = 'file' _schema_required = ['name', 'filetype', 'working_dir'] _schema_properties = inherit_schema( CommBase.CommBase._schema_properties, {'working_dir': {'type': 'string'}, 'filetype': {'type': 'string', 'default': _filetype}, 'append': {'type': 'boolean', 'default': False}, 'in_temp': {'type': 'boolean', 'default': False}, 'is_series': {'type': 'boolean', 'default': False}, 'wait_for_creation': {'type': 'float', 'default': 0.0}}, remove_keys=['commtype', 'datatype'], **DirectSerialize._schema_properties) _default_serializer = DirectSerialize _attr_conv = ['newline', 'platform_newline'] _default_extension = '.txt' is_file = True _maxMsgSize = 0 def __init__(self, *args, **kwargs): kwargs.setdefault('close_on_eof_send', True) return super(FileComm, self).__init__(*args, **kwargs) def _init_before_open(self, read_meth='read', open_as_binary=True, **kwargs): r"""Get absolute path and set attributes.""" super(FileComm, self)._init_before_open(**kwargs) # Process file class keywords if not hasattr(self, '_fd'): self._fd = None if read_meth not in ['read', 'readline']: raise ValueError("read_meth '%s' not supported." % read_meth) self.read_meth = read_meth self.platform_newline = platform._newline if self.in_temp: self.address = os.path.join(tempfile.gettempdir(), self.address) self.address = os.path.abspath(self.address) self.open_as_binary = open_as_binary self._series_index = 0 # Put string attributes in the correct format if self.open_as_binary: func_conv = backwards.as_bytes else: func_conv = backwards.as_unicode for k in self._attr_conv: v = getattr(self, k) if v is not None: setattr(self, k, func_conv(v))
[docs] @classmethod def get_testing_options(cls, read_meth='read', open_as_binary=True, **kwargs): r"""Method to return a dictionary of testing options for this class. Returns: dict: Dictionary of variables to use for testing. Key/value pairs: kwargs (dict): Keyword arguments for comms tested with the provided content. send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test file that was sent the messages in 'send'. contents (bytes): Bytes contents of test file created by sending the messages in 'send'. """ out = super(FileComm, cls).get_testing_options(**kwargs) out['kwargs']['read_meth'] = read_meth out['kwargs']['open_as_binary'] = open_as_binary if (read_meth == 'read') and isinstance(out['recv'][0], backwards.bytes_type): out['recv'] = [b''.join(out['recv'])] if not open_as_binary: out['contents'] = out['contents'].replace( backwards.match_stype(out['contents'], '\n'), backwards.match_stype(out['contents'], platform._newline)) return out
[docs] @classmethod def is_installed(cls, language=None): r"""Determine if the necessary libraries are installed for this communication class. Args: language (str, optional): Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: bool: Is the comm installed. """ # Filesystem is implied return True
[docs] @classmethod def underlying_comm_class(self): r"""str: Name of underlying communication class.""" return 'FileComm'
[docs] @classmethod def close_registry_entry(cls, value): r"""Close a registry entry.""" out = False if not value.closed: # pragma: debug value.close() out = True return out
[docs] @classmethod def new_comm_kwargs(cls, *args, **kwargs): r"""Initialize communication with new queue.""" kwargs.setdefault('address', 'file.txt') return args, kwargs
@property def open_mode(self): r"""str: Mode that should be used to open the file.""" if self.direction == 'recv': io_mode = 'r' elif self.append == 'ow': io_mode = 'r+' elif self.append: io_mode = 'a' else: io_mode = 'w' if self.open_as_binary: io_mode += 'b' return io_mode
[docs] def opp_comm_kwargs(self): r"""Get keyword arguments to initialize communication with opposite comm object. Returns: dict: Keyword arguments for opposite comm object. """ kwargs = super(FileComm, self).opp_comm_kwargs() kwargs['newline'] = self.newline kwargs['open_as_binary'] = self.open_as_binary kwargs['is_series'] = self.is_series return kwargs
@property def registry_key(self): r"""str: String used to register the socket.""" # return self.address return '%s_%s_%s' % (self.address, self.direction, self.uuid)
[docs] def record_position(self): r"""Record the current position in the file/series.""" _rec_pos = self.fd.tell() _rec_ind = self._series_index return _rec_pos, _rec_ind
[docs] def change_position(self, file_pos, series_index=None): r"""Change the position in the file/series. Args: file_pos (int): Position that should be moved to in the file. series_index (int, optinal): Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index. """ if series_index is None: series_index = self._series_index self.advance_in_series(series_index) self.advance_in_file(file_pos)
[docs] def advance_in_file(self, file_pos): r"""Advance to a certain position in the current file. Args: file_pos (int): Position that should be moved to in the current. file. """ if self.is_open: try: self.fd.seek(file_pos) except (AttributeError, ValueError): # pragma: debug if self.is_open: raise
[docs] def advance_in_series(self, series_index=None): r"""Advance to a certain file in a series. Args: series_index (int, optional): Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series. Returns: bool: True if the file was advanced in the series, False otherwise. """ out = False if self.is_series: if series_index is None: series_index = self._series_index + 1 if self._series_index != series_index: if (((self.direction == 'send') or os.path.isfile(self.get_series_address(series_index)))): self._file_close() self._series_index = series_index self._open() out = True self.debug("Advanced to %d", series_index) return out
[docs] def get_series_address(self, index=None): r"""Get the address of a file in the series. Args: index (int, optional): Index in series to get address for. Defaults to None and the current index is used. Returns: str: Address for the file in the series. """ if index is None: index = self._series_index return self.address % index
@property def current_address(self): r"""str: Address of file currently being used.""" if self.is_series: address = self.get_series_address() else: address = self.address return address def _open(self): address = self.current_address if self.fd is None: if (not os.path.isfile(address)) and (self.wait_for_creation > 0): T = self.start_timeout(self.wait_for_creation) while (not T.is_out) and (not os.path.isfile(address)): self.sleep() self.stop_timeout() self._fd = open(address, self.open_mode) T = self.start_timeout() while (not T.is_out) and (not self.is_open): # pragma: debug self.sleep() self.stop_timeout() if self.append == 'ow': try: self.fd.seek(0, os.SEEK_END) except (AttributeError, ValueError): # pragma: debug if self.is_open: raise def _file_close(self): if self.is_open: try: self.fd.flush() os.fsync(self.fd.fileno()) except OSError: # pragma: debug pass try: self.fd.close() except (AttributeError, ValueError): # pragma: debug if self.is_open: raise self._fd = None
[docs] def open(self): r"""Open the file.""" super(FileComm, self).open() self._open() self.register_comm(self.registry_key, self.fd)
def _close(self, *args, **kwargs): r"""Close the file.""" self._file_close() self.unregister_comm(self.registry_key) super(FileComm, self)._close(*args, **kwargs)
[docs] def remove_file(self): r"""Remove the file.""" assert(self.is_closed) if self.is_series: i = 0 while True: address = self.get_series_address(i) if not os.path.isfile(address): break os.remove(address) i += 1 else: if os.path.isfile(self.address): os.remove(self.address)
@property def is_open(self): r"""bool: True if the connection is open.""" try: return (self.fd is not None) and (not self.fd.closed) except AttributeError: # pragma: debug if self.fd is not None: raise return False @property def fd(self): r"""Associated file identifier.""" return self._fd @property def remaining_bytes(self): r"""int: Remaining bytes in the file.""" if self.is_closed or self.direction == 'send': return 0 pos = self.record_position() try: curpos = self.fd.tell() self.fd.seek(0, os.SEEK_END) endpos = self.fd.tell() out = endpos - curpos except (ValueError, AttributeError): # pragma: debug if self.is_open: raise out = 0 if self.is_series: i = self._series_index + 1 while True: fname = self.get_series_address(i) if not os.path.isfile(fname): break out += os.path.getsize(fname) i += 1 self.change_position(*pos) return out @property def n_msg_recv(self): r"""int: The number of messages in the file.""" if self.is_closed: return 0 if self.read_meth == 'read': return int(self.remaining_bytes > 0) elif self.read_meth == 'readline': pos = self.record_position() try: out = 0 flag, msg = self._recv() while len(msg) != 0 and msg != self.eof_msg: out += 1 flag, msg = self._recv() except ValueError: # pragma: debug out = 0 self.change_position(*pos) else: # pragma: debug self.error('Unsupported read_meth: %s', self.read_meth) out = 0 return out
[docs] def on_send_eof(self): r"""Close file when EOF to be sent. Returns: bool: False so that message not sent. """ flag, msg_s = super(FileComm, self).on_send_eof() try: self.fd.flush() except (AttributeError, ValueError): # pragma: debug if self.is_open: raise # self.close() return flag, msg_s
def _send(self, msg): r"""Write message to a file. Args: msg (bytes, str): Data to write to the file. Returns: bool: Success or failure of writing to the file. """ try: if msg != self.eof_msg: if not self.open_as_binary: msg = backwards.as_unicode(msg) self.fd.write(msg) if self.append == 'ow': self.fd.truncate() self.fd.flush() except (AttributeError, ValueError): # pragma: debug if self.is_open: raise return False if msg != self.eof_msg and self.is_series: self.advance_in_series() self.debug("Advanced to %d", self._series_index) return True def _recv(self, timeout=0): r"""Reads message from a file. Args: timeout (float, optional): Time in seconds to wait for a message. Defaults to self.recv_timeout. Unused. Returns: tuple (bool, str): Success or failure of reading from the file and the read messages as bytes. """ flag = True try: if self.read_meth == 'read': out = self.fd.read() elif self.read_meth == 'readline': out = self.fd.readline() except BaseException: # pragma: debug # Use this to catch case where close called during receive. # In the future this should be handled via a lock. out = '' if len(out) == 0: if self.advance_in_series(): self.debug("Advanced to %d", self._series_index) flag, out = self._recv() else: out = self.eof_msg else: out = out.replace(self.platform_newline, self.newline) if not self.open_as_binary: out = backwards.as_bytes(out) return (flag, out)
[docs] def purge(self): r"""Purge all messages from the comm.""" if self.is_open and self.direction == 'recv': try: self.fd.seek(0, os.SEEK_END) except (AttributeError, ValueError): # pragma: debug if self.is_open: raise