# Licensed under a 3-clause BSD style license - see LICENSE.rst
import contextlib
import pathlib
import re
import sys
from collections import OrderedDict
from operator import itemgetter
import numpy as np
# Names exported by ``from <this module> import *``.
__all__ = ['register_reader', 'register_writer', 'register_identifier',
'identify_format', 'get_reader', 'get_writer', 'read', 'write',
'get_formats', 'IORegistryError', 'delay_doc_updates']
# Objects whose docstring examples should not be run as doctests
# (presumably consumed by the project's doctest tooling -- confirm).
__doctest_skip__ = ['register_identifier']
# Registries keyed by ``(data_format, data_class)`` tuples; the values are the
# registered reader, writer and identifier callables respectively.
_readers = OrderedDict()
_writers = OrderedDict()
_identifiers = OrderedDict()
# Types of a first positional argument that ``read``/``write`` treat as a
# filesystem path.
PATH_TYPES = (str, pathlib.Path)
class IORegistryError(Exception):
    """Error raised by the I/O registry.

    Used in this module for clashing registrations, missing
    readers/writers/identifiers and formats that cannot be identified.
    """
# If multiple formats are added to one class, updating the docs each time is
# quite expensive. Classes for which the doc update is temporarily delayed are
# added to this set.
_delayed_docs_classes = set()
@contextlib.contextmanager
def delay_doc_updates(cls):
    """Contextmanager to disable documentation updates when registering
    reader and writer. The documentation is only built once when the
    contextmanager exits.

    .. versionadded:: 1.3

    Parameters
    ----------
    cls : class
        Class for which the documentation updates should be delayed.

    Notes
    -----
    Registering multiple readers and writers can cause significant overhead
    because the documentation of the corresponding ``read`` and ``write``
    methods is built every time.

    The class is removed from the set of delayed classes, and its ``read``
    and ``write`` documentation is rebuilt, even when the body of the
    ``with`` block raises. Otherwise a single failure would leave the class
    permanently excluded from documentation updates.

    .. warning::
        This contextmanager is experimental and may be replaced by a more
        general approach.

    Examples
    --------
    see for example the source code of ``astropy.table.__init__``.
    """
    _delayed_docs_classes.add(cls)
    try:
        yield
    finally:
        # Always restore the normal state; formats registered before a
        # failure are still in the registry, so the docs are rebuilt too.
        _delayed_docs_classes.discard(cls)
        _update__doc__(cls, 'read')
        _update__doc__(cls, 'write')
def _update__doc__(data_class, readwrite):
"""
Update the docstring to include all the available readers / writers for the
``data_class.read`` or ``data_class.write`` functions (respectively).
"""
FORMATS_TEXT = 'The available built-in formats are:'
# Get the existing read or write method and its docstring
class_readwrite_func = getattr(data_class, readwrite)
if not isinstance(class_readwrite_func.__doc__, str):
# No docstring--could just be test code, or possibly code compiled
# without docstrings
return
lines = class_readwrite_func.__doc__.splitlines()
# Find the location of the existing formats table if it exists
sep_indices = [ii for ii, line in enumerate(lines) if FORMATS_TEXT in line]
if sep_indices:
# Chop off the existing formats table, including the initial blank line
chop_index = sep_indices[0]
lines = lines[:chop_index]
# Find the minimum indent, skipping the first line because it might be odd
matches = [re.search(r'(\S)', line) for line in lines[1:]]
left_indent = ' ' * min(match.start() for match in matches if match)
# Get the available unified I/O formats for this class
# Include only formats that have a reader, and drop the 'Data class' column
format_table = get_formats(data_class, readwrite.capitalize())
format_table.remove_column('Data class')
# Get the available formats as a table, then munge the output of pformat()
# a bit and put it into the docstring.
new_lines = format_table.pformat(max_lines=-1, max_width=80)
table_rst_sep = re.sub('-', '=', new_lines[1])
new_lines[1] = table_rst_sep
new_lines.insert(0, table_rst_sep)
new_lines.append(table_rst_sep)
# Check for deprecated names and include a warning at the end.
if 'Deprecated' in format_table.colnames:
new_lines.extend(['',
'Deprecated format names like ``aastex`` will be '
'removed in a future version. Use the full ',
'name (e.g. ``ascii.aastex``) instead.'])
new_lines = [FORMATS_TEXT, ''] + new_lines
lines.extend([left_indent + line for line in new_lines])
# Depending on Python version and whether class_readwrite_func is
# an instancemethod or classmethod, one of the following will work.
try:
class_readwrite_func.__doc__ = '\n'.join(lines)
except AttributeError:
class_readwrite_func.__func__.__doc__ = '\n'.join(lines)
def register_reader(data_format, data_class, function, force=False):
    """
    Add a reader function to the registry.

    Parameters
    ----------
    data_format : str
        The data format identifier. This is the string that will be used to
        specify the data type when reading.
    data_class : classobj
        The class of the object that the reader produces.
    function : function
        The function to read in a data object.
    force : bool, optional
        Whether to override any existing function if already present.
        Default is ``False``.

    Raises
    ------
    IORegistryError
        If a reader is already registered for this format and class and
        ``force`` is false.
    """
    key = (data_format, data_class)

    # Refuse to silently replace an existing entry unless forced.
    if key in _readers and not force:
        raise IORegistryError("Reader for format '{0}' and class '{1}' is "
                              'already defined'
                              ''.format(data_format, data_class.__name__))

    _readers[key] = function

    # Rebuild the ``read`` docstring unless updates are currently delayed.
    if data_class not in _delayed_docs_classes:
        _update__doc__(data_class, 'read')
def unregister_reader(data_format, data_class):
    """
    Remove a reader function from the registry.

    Parameters
    ----------
    data_format : str
        The data format identifier.
    data_class : classobj
        The class of the object that the reader produces.

    Raises
    ------
    IORegistryError
        If no reader is registered for this format and class.
    """
    key = (data_format, data_class)

    if key not in _readers:
        raise IORegistryError("No reader defined for format '{0}' and class '{1}'"
                              ''.format(data_format, data_class.__name__))

    del _readers[key]

    # Rebuild the ``read`` docstring unless updates are currently delayed.
    if data_class not in _delayed_docs_classes:
        _update__doc__(data_class, 'read')
def register_writer(data_format, data_class, function, force=False):
    """
    Add a table writer function to the registry.

    Parameters
    ----------
    data_format : str
        The data format identifier. This is the string that will be used to
        specify the data type when writing.
    data_class : classobj
        The class of the object that can be written.
    function : function
        The function to write out a data object.
    force : bool, optional
        Whether to override any existing function if already present.
        Default is ``False``.

    Raises
    ------
    IORegistryError
        If a writer is already registered for this format and class and
        ``force`` is false.
    """
    key = (data_format, data_class)

    # Refuse to silently replace an existing entry unless forced.
    if key in _writers and not force:
        raise IORegistryError("Writer for format '{0}' and class '{1}' is "
                              'already defined'
                              ''.format(data_format, data_class.__name__))

    _writers[key] = function

    # Rebuild the ``write`` docstring unless updates are currently delayed.
    if data_class not in _delayed_docs_classes:
        _update__doc__(data_class, 'write')
def unregister_writer(data_format, data_class):
    """
    Remove a writer function from the registry.

    Parameters
    ----------
    data_format : str
        The data format identifier.
    data_class : classobj
        The class of the object that can be written.

    Raises
    ------
    IORegistryError
        If no writer is registered for this format and class.
    """
    key = (data_format, data_class)

    if key not in _writers:
        raise IORegistryError("No writer defined for format '{0}' and class '{1}'"
                              ''.format(data_format, data_class.__name__))

    del _writers[key]

    # Rebuild the ``write`` docstring unless updates are currently delayed.
    if data_class not in _delayed_docs_classes:
        _update__doc__(data_class, 'write')
def register_identifier(data_format, data_class, identifier, force=False):
    """
    Associate an identifier function with a specific data type.

    Parameters
    ----------
    data_format : str
        The data format identifier. This is the string that is used to
        specify the data type when reading/writing.
    data_class : classobj
        The class of the object that can be written.
    identifier : function
        A function that checks the argument specified to `read` or `write` to
        determine whether the input can be interpreted as a table of type
        ``data_format``. This function should take the following arguments:

           - ``origin``: A string ``"read"`` or ``"write"`` identifying whether
             the file is to be opened for reading or writing.
           - ``path``: The path to the file.
           - ``fileobj``: An open file object to read the file's contents, or
             `None` if the file could not be opened.
           - ``*args``: Positional arguments for the `read` or `write`
             function.
           - ``**kwargs``: Keyword arguments for the `read` or `write`
             function.

        One or both of ``path`` or ``fileobj`` may be `None`.  If they are
        both `None`, the identifier will need to work from ``args[0]``.

        The function should return True if the input can be identified
        as being of format ``data_format``, and False otherwise.
    force : bool, optional
        Whether to override any existing function if already present.
        Default is ``False``.

    Raises
    ------
    IORegistryError
        If an identifier is already registered for this format and class and
        ``force`` is false.

    Examples
    --------
    To set the identifier based on extensions, for formats that take a
    filename as a first argument, you can do for example::

        >>> def my_identifier(*args, **kwargs):
        ...     return isinstance(args[0], str) and args[0].endswith('.tbl')
        >>> register_identifier('ipac', Table, my_identifier)
    """
    key = (data_format, data_class)

    # Refuse to silently replace an existing entry unless forced.
    if key in _identifiers and not force:
        raise IORegistryError("Identifier for format '{0}' and class '{1}' is "
                              'already defined'.format(data_format,
                                                       data_class.__name__))

    _identifiers[key] = identifier
def unregister_identifier(data_format, data_class):
    """
    Remove an identifier function from the registry.

    Parameters
    ----------
    data_format : str
        The data format identifier.
    data_class : classobj
        The class of the object that can be read/written.

    Raises
    ------
    IORegistryError
        If no identifier is registered for this format and class.
    """
    key = (data_format, data_class)

    if key not in _identifiers:
        raise IORegistryError("No identifier defined for format '{0}' and class"
                              " '{1}'".format(data_format, data_class.__name__))

    del _identifiers[key]
def _get_format_table_str(data_class, readwrite):
    """
    Return the formats table for ``data_class`` as one newline-joined string.

    The table comes from ``get_formats`` (restricted via ``readwrite``) with
    its 'Data class' column removed, formatted without a line limit.
    """
    table = get_formats(data_class, readwrite=readwrite)
    table.remove_column('Data class')
    return '\n'.join(table.pformat(max_lines=-1))
def get_reader(data_format, data_class):
    """Look up the reader registered for ``data_format``.

    Parameters
    ----------
    data_format : str
        The data format identifier. This is the string that is used to
        specify the data type when reading/writing.
    data_class : classobj
        The class of the object that can be written.

    Returns
    -------
    reader : callable
        The registered reader function for this format and class.

    Raises
    ------
    IORegistryError
        If no registered reader for ``data_format`` is a best match for
        ``data_class``.
    """
    # Every registry key that was registered under the requested format.
    candidates = [key for key in _readers if key[0] == data_format]

    for key in candidates:
        if _is_best_match(data_class, key[1], candidates):
            return _readers[key]

    # No candidate matched: report the formats that are available instead.
    format_table_str = _get_format_table_str(data_class, 'Read')
    raise IORegistryError(
        "No reader defined for format '{0}' and class '{1}'.\nThe "
        "available formats are:\n{2}".format(
            data_format, data_class.__name__, format_table_str))
def get_writer(data_format, data_class):
    """Look up the writer registered for ``data_format``.

    Parameters
    ----------
    data_format : str
        The data format identifier. This is the string that is used to
        specify the data type when reading/writing.
    data_class : classobj
        The class of the object that can be written.

    Returns
    -------
    writer : callable
        The registered writer function for this format and class.

    Raises
    ------
    IORegistryError
        If no registered writer for ``data_format`` is a best match for
        ``data_class``.
    """
    # Every registry key that was registered under the requested format.
    candidates = [key for key in _writers if key[0] == data_format]

    for key in candidates:
        if _is_best_match(data_class, key[1], candidates):
            return _writers[key]

    # No candidate matched: report the formats that are available instead.
    format_table_str = _get_format_table_str(data_class, 'Write')
    raise IORegistryError(
        "No writer defined for format '{0}' and class '{1}'.\nThe "
        "available formats are:\n{2}".format(
            data_format, data_class.__name__, format_table_str))
def read(cls, *args, format=None, **kwargs):
    """
    Read in data.

    The arguments passed to this method depend on the format.

    Parameters
    ----------
    cls : class
        Class the result should be an instance of; also used to look up the
        reader.
    *args
        Positional arguments forwarded to the reader. When ``format`` is not
        given, the first one is inspected to identify the format.
    format : str, optional
        Format name. If `None`, the format is determined via
        ``_get_valid_format``.
    **kwargs
        Keyword arguments forwarded to the reader.

    Returns
    -------
    data : ``cls``
        The reader output, coerced with ``cls(data)`` if the reader returned
        something that is not an instance of ``cls``.

    Raises
    ------
    OSError
        If opening a path given as first argument fails with an `OSError`.
    TypeError
        If the reader output cannot be converted to ``cls``.
    """
    # Context manager of the file opened here (if any); closed in ``finally``.
    ctx = None
    try:
        if format is None:
            path = None
            fileobj = None
            if len(args):
                if isinstance(args[0], PATH_TYPES):
                    from astropy.utils.data import get_readable_fileobj
                    # path might be a pathlib.Path object
                    if isinstance(args[0], pathlib.Path):
                        args = (str(args[0]),) + args[1:]
                    path = args[0]
                    try:
                        # Enter the context manager by hand so the file stays
                        # open until the reader has finished.
                        ctx = get_readable_fileobj(args[0], encoding='binary')
                        fileobj = ctx.__enter__()
                    except OSError:
                        raise
                    except Exception:
                        # Any other failure: carry on without a file object
                        # and leave ``args`` untouched.
                        fileobj = None
                    else:
                        # Opened successfully: pass the file object on in
                        # place of the path.
                        args = [fileobj] + list(args[1:])
                elif hasattr(args[0], 'read'):
                    path = None
                    fileobj = args[0]

            format = _get_valid_format(
                'read', cls, path, fileobj, args, kwargs)

        reader = get_reader(format, cls)
        data = reader(*args, **kwargs)

        if not isinstance(data, cls):
            # User has read with a subclass where only the parent class is
            # registered.  This returns the parent class, so try coercing
            # to desired subclass.
            try:
                data = cls(data)
            except Exception:
                raise TypeError('could not convert reader output to {0} '
                                'class.'.format(cls.__name__))
    finally:
        # Leave the manually entered context manager, handing it whatever
        # ``sys.exc_info()`` reports at this point.
        if ctx is not None:
            ctx.__exit__(*sys.exc_info())

    return data
def write(data, *args, format=None, **kwargs):
    """
    Write out data.

    The arguments passed to this method depend on the format.

    When ``format`` is not given it is determined from the first positional
    argument: a path (``str`` or ``pathlib.Path``, the latter converted to
    ``str``) or an object that has a ``read`` attribute.
    """
    if format is None:
        path = None
        fileobj = None

        if args:
            target = args[0]
            if isinstance(target, PATH_TYPES):
                # path might be a pathlib.Path object
                if isinstance(target, pathlib.Path):
                    args = (str(target),) + args[1:]
                path = args[0]
            elif hasattr(target, 'read'):
                fileobj = target

        format = _get_valid_format(
            'write', data.__class__, path, fileobj, args, kwargs)

    writer = get_writer(format, data.__class__)
    writer(data, *args, **kwargs)
def _is_best_match(class1, class2, format_classes):
"""
Determine if class2 is the "best" match for class1 in the list
of classes. It is assumed that (class2 in classes) is True.
class2 is the the best match if:
- ``class1`` is a subclass of ``class2`` AND
- ``class2`` is the nearest ancestor of ``class1`` that is in classes
(which includes the case that ``class1 is class2``)
"""
if issubclass(class1, class2):
classes = {cls for fmt, cls in format_classes}
for parent in class1.__mro__:
if parent is class2: # class2 is closest registered ancestor
return True
if parent in classes: # class2 was superceded
return False
return False
def _get_valid_format(mode, cls, path, fileobj, args, kwargs):
    """
    Returns the first valid format that can be used to read/write the data in
    question.  Mode can be either 'read' or 'write'.

    Parameters
    ----------
    mode : str
        ``'read'`` or ``'write'``.
    cls : class
        Data class the format is looked up for.
    path, fileobj, args, kwargs
        Passed through unchanged to ``identify_format``.

    Returns
    -------
    format : str
        The single format reported by ``identify_format``.

    Raises
    ------
    IORegistryError
        If no format, or more than one format, is identified.
    """
    valid_formats = identify_format(mode, cls, path, fileobj, args, kwargs)

    if len(valid_formats) == 0:
        format_table_str = _get_format_table_str(cls, mode.capitalize())
        raise IORegistryError("Format could not be identified.\n"
                              "The available formats are:\n"
                              "{0}".format(format_table_str))
    elif len(valid_formats) > 1:
        # Sort on the full names: a key of ``itemgetter(0)`` would compare
        # only the first character of each format string.
        raise IORegistryError(
            "Format is ambiguous - options are: {0}".format(
                ', '.join(sorted(valid_formats))))

    return valid_formats[0]