import os
import copy
from yggdrasil.communication.FileComm import FileComm
[docs]class DedicatedFileBase(FileComm):
r"""Base class for handling I/O via a dedicated library."""
_mode_as_bytes = False
_default_serializer = False
_deprecated_drivers = []
_stores_fd = False
_requires_refresh = False
def __init__(self, *args, **kwargs):
self._external_fd = None
kwargs['read_meth'] = 'read'
self._last_size = 0
self._last_file_size = 0
return super(DedicatedFileBase, self).__init__(*args, **kwargs)
@property
def concats_as_str(self):
r"""bool: True if concatenating file contents result in a
valid file."""
return False
@property
def is_open(self):
r"""bool: True if the connection is open."""
if self._stores_fd:
return super(DedicatedFileBase, self).is_open
return bool(self._external_fd)
@property
def requires_refresh(self):
r"""bool: True if a refresh is necessary."""
return (
self.is_open
and ((self._external_fd is None)
or (self.append
or (self._requires_refresh
and self._last_file_size < self.file_size))))
[docs] def serialize(self, obj, **kwargs):
r"""Don't serialize for dedicated comms since using a serializer
is inefficient."""
return obj
[docs] def deserialize(self, msg, **kwargs):
r"""Don't deserialize for dedicated comms since using a serializer
is inefficient."""
return msg, {}
# Methods related to position in the file/series
@property
def file_size(self):
r"""int: Current size of file."""
out = 0
if os.path.isfile(self.current_address):
out = os.stat(self.current_address).st_size
return out
[docs] def file_tell(self):
r"""int: Current position in the file."""
return self._last_size
[docs] def file_seek(self, pos, whence=os.SEEK_SET):
r"""Move in the file to the specified position.
Args:
pos (int): Position (in bytes) to move file to.
whence (int, optional): Flag indicating position that pos
is relative to. 0 for the beginning of the file, 1 for
from the current location, and 2 from the end of the
file.
"""
if self._stores_fd:
super(DedicatedFileBase, self).file_seek(pos, whence)
if whence == 0:
self._last_size = pos
elif whence == 1: # pragma: no cover
self._last_size = min(self.file_size, self._last_size + pos)
elif whence == 2:
self._last_size = self.file_size
[docs] def file_flush(self):
r"""Flush the file."""
if self._stores_fd:
super(DedicatedFileBase, self).file_flush()
if self._external_fd is not None:
self._external_fd.flush()
self._external_fd.sync()
def _file_open(self, address, mode, **kwargs):
self._last_size = 0
if ((((not os.path.isfile(address)) or (os.stat(address).st_size == 0))
and (mode == 'r') and self._stores_fd)):
# Cannot open an empty file for read
return super(DedicatedFileBase, self)._file_open(address, mode)
out = self._dedicated_open(address, mode, **kwargs)
self._last_file_size = self.file_size
return out
def _file_close(self, **kwargs):
if self._external_fd is not None:
self._dedicated_close(**kwargs)
super(DedicatedFileBase, self)._file_close()
def _file_refresh(self):
prev_pos = self.file_tell()
self._file_close()
self._fd = self._file_open(self.current_address,
self.open_mode)
self.file_seek(prev_pos)
def _file_send(self, msg):
self._dedicated_send(msg)
self.file_seek(self.file_size)
@property
def _file_size_recv(self):
return self.file_size
def _file_recv(self):
if self.requires_refresh:
self._file_refresh()
if self.file_size > self._last_size:
out = self._dedicated_recv()
self.file_seek(self._file_size_recv)
else:
out = self.empty_bytes_msg
return copy.deepcopy(out)
@property
def remaining_bytes(self):
r"""int: Remaining bytes in the file."""
if self.requires_refresh:
self._file_refresh()
return super(DedicatedFileBase, self).remaining_bytes
# Methods that must be overriden by child classes
def _dedicated_open(self, address, mode): # pragma: debug
raise NotImplementedError("Must be overriden by the base class.")
def _dedicated_close(self): # pragma: debug
raise NotImplementedError("Must be overriden by the base class.")
def _dedicated_send(self, msg): # pragma: debug
raise NotImplementedError("Must be overriden by the base class.")
def _dedicated_recv(self): # pragma: debug
raise NotImplementedError("Must be overriden by the base class.")
[docs] @classmethod
def get_testing_options(cls, **kwargs): # pragma: debug
r"""Method to return a dictionary of testing options for this
class.
Returns:
dict: Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file.
recv (list): List of objects that will be received from a
test file that was sent the messages in 'send'.
contents (bytes): Bytes contents of test file created by
sending the messages in 'send'.
"""
raise NotImplementedError("Must be overriden by the base class.")