Source code for discodop.util

"""Misc code to avoid cyclic imports."""
import io
import os
import sys
import gzip
import codecs
import traceback
from functools import wraps
from collections import Set, Iterable


[docs]def ishead(tree): """Test whether this node is the head of the parent constituent.""" return getattr(tree, 'head', False)
[docs]def which(program): """Return first match for program in search path.""" for path in os.environ.get('PATH', os.defpath).split(":"): if path and os.path.exists(os.path.join(path, program)): return os.path.join(path, program) raise ValueError('%r not found in path; please install it.' % program)
[docs]def workerfunc(func): """Wrap a multiprocessing worker function to produce a full traceback.""" @wraps(func) def wrapper(*args, **kwds): """Apply decorated function.""" try: import faulthandler faulthandler.enable() # Dump information on segfault. except (ImportError, io.UnsupportedOperation): pass # NB: only concurrent.futures on Python 3.3+ will exit gracefully. try: return func(*args, **kwds) except Exception: # pylint: disable=W0703 # Put traceback as string into an exception and raise that raise Exception('in worker process\n%s' % ''.join(traceback.format_exception(*sys.exc_info()))) return wrapper
[docs]def openread(filename, encoding='utf8'): """Open stdin/text file for reading; decompress .gz files on-the-fly.""" if filename == '-': return io.open(sys.stdin.fileno(), encoding=encoding) if not isinstance(filename, int) and filename.endswith('.gz'): return codecs.getreader(encoding)(gzip.open(filename)) else: return io.open(filename, encoding=encoding)
[docs]def slice_bounds(seq, slice_obj, allow_step=False): """Calculate the effective (start, stop) bounds of a slice. Takes into account ``None`` indices and negative indices. :returns: tuple ``(start, stop, 1)``, s.t. ``0 <= start <= stop <= len(seq)`` :raises ValueError: if slice_obj.step is not None. :param allow_step: If true, then the slice object may have a non-None step. If it does, then return a tuple (start, stop, step).""" start, stop = (slice_obj.start, slice_obj.stop) if allow_step: slice_obj.step = 1 if slice_obj.step is None else slice_obj.step # Use a recursive call without allow_step to find the slice # bounds. If step is negative, then the roles of start and # stop (in terms of default values, etc), are swapped. if slice_obj.step < 0: start, stop, _ = slice_bounds(seq, slice(stop, start)) else: start, stop, _ = slice_bounds(seq, slice(start, stop)) return start, stop, slice_obj.step elif slice_obj.step not in (None, 1): raise ValueError('slices with steps are not supported by %s' % seq.__class__.__name__) start = 0 if start is None else start stop = len(seq) if stop is None else stop start = max(0, len(seq) + start) if start < 0 else start stop = max(0, len(seq) + stop) if stop < 0 else stop if stop > 0: # Make sure stop doesn't go past the end of the list. try: # Avoid calculating len(seq), may be expensive for lazy sequences seq[stop - 1] except IndexError: stop = len(seq) start = min(start, stop) return start, stop, 1
[docs]class OrderedSet(Set): """A frozen, ordered set which maintains a regular list/tuple and set. The set is indexable. Equality is defined _without_ regard for order.""" def __init__(self, iterable=None): if iterable: self.seq = tuple(iterable) self.theset = frozenset(self.seq) else: self.seq = () self.theset = frozenset() def __hash__(self): return hash(self.theset) def __contains__(self, value): return value in self.theset def __len__(self): return len(self.theset) def __iter__(self): return iter(self.seq) def __getitem__(self, n): return self.seq[n] def __reversed__(self): return reversed(self.seq) def __repr__(self): if not self.seq: return '%s()' % self.__class__.__name__ return '%s(%r)' % (self.__class__.__name__, self.seq) def __eq__(self, other): """equality is defined _without_ regard for order.""" return self.theset == set(other) def __and__(self, other): """maintain the order of the left operand.""" if not isinstance(other, Iterable): return NotImplemented return self._from_iterable(value for value in self if value in other)
ANSICOLOR = { 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34, 'magenta': 35, 'cyan': 36, 'white': 37, } __all__ = ['ishead', 'which', 'workerfunc', 'openread', 'slice_bounds', 'OrderedSet', 'ANSICOLOR']