"""Read and write treebanks."""
from __future__ import generator_stop
import os
import re
import sys
import gzip
from glob import glob
from itertools import count, chain, islice
from collections import defaultdict
import xml.etree.ElementTree as ElementTree
from collections import OrderedDict
from .tree import (Tree, ParentedTree, brackettree, escape, unescape,
writebrackettree, writediscbrackettree, SUPERFLUOUSSPACERE, HEAD)
from .treetransforms import removeemptynodes
from .punctuation import applypunct
from .heads import applyheadrules, readheadrules, readmodifierrules
from .util import openread
# Column indices of the fields of a line in Negra export format; the same
# indices are used for the ``source`` attribute of tree nodes.
WORD, LEMMA, TAG, MORPH, FUNC, PARENT = FIELDS = tuple(range(6))
EXPORTHEADER = '%% word\tlemma\ttag\tmorph\tedge\tparent\tsecedge\n'
# Non-terminal identifiers in export format: '#500' up to '#999'.
EXPORTNONTERMINAL = re.compile(r'^#([5-9][0-9][0-9])$')
# A preterminal with its terminal in bracket notation, e.g., '(NN house)';
# the group captures the POS tag.
POSRE = re.compile(r'\(([^() ]+)\s+[^ ()]+\s*\)')
# Terminals in bracket notation.
# The leaf itself can be empty; a leaf ends with a closing paren or whitespace.
# Assumes there is no whitespace between open paren and non-terminal: "( NP "
LEAVESRE = re.compile(r' (?=\))| ([^ ()]+)\s*(?=[\s)])')
# A complete Alpino XML document inside a (bytes) stream of such documents.
ALPINOXML = re.compile(
        rb'<\?xml version="1.0" encoding="UTF-?8"\?>.*?</alpino_ds>\r?\n',
        flags=re.IGNORECASE | re.DOTALL)
# The sentence identifier in an Alpino XML document (bytes).
ALPINOSENTID = re.compile(b'<sentence sentid="(.*?)">')
class Item(object):
    """A treebank item: a tree with its sentence, comment and raw block."""

    __slots__ = ('tree', 'sent', 'comment', 'block')

    def __init__(self, tree, sent, comment, block):
        # tree: a ParentedTree; sent: a list of str
        self.tree, self.sent = tree, sent
        # comment: a string or None
        # block: a string with the tree in the original treebank format
        self.comment, self.block = comment, block
class CorpusReader(object):
    """Abstract corpus reader.

    Subclasses implement ``blocks()``, ``_read_blocks()`` and ``_parse()``;
    this class supplies option handling, caching, and the transformations
    applied to every parsed tree."""

    def __init__(self, path, encoding='utf8', ensureroot=None, punct=None,
            headrules=None, removeempty=False,
            functions=None, morphology=None, lemmas=None,
            modifierrules=None):
        """
        :param path: filename or pattern of corpus files; e.g., ``wsj*.mrg``.
            The special value ``'-'`` is passed on as-is, without globbing.
        :param encoding: encoding of the corpus files.
        :param ensureroot: add root node with given label if necessary.
        :param removeempty: remove empty nodes and any empty ancestors; a
            terminal is empty if it is equal to None, '', or '-NONE-'.
        :param headrules: if given, read rules for assigning heads and apply
            them by ordering constituents according to their heads.
        :param modifierrules: if given, read modifier rules that are passed
            on when applying the head rules.
        :param punct: one of ...

            :None, 'no': leave punctuation as is [default].
            :'move': move punctuation to appropriate constituents
                    using heuristics.
            :'moveall': same as 'move', but moves all preterminals under root,
                    instead of only recognized punctuation.
            :'prune': prune away leading & ending quotes & periods, then move.
            :'remove': eliminate punctuation.
            :'removeall': eliminate all preterminals directly under root.
            :'root': attach punctuation directly to root
                    (as in original Negra/Tiger treebanks).
        :param functions: one of ...

            :None, 'leave': leave syntactic labels as is [default].
            :'add': concatenate grammatical function to syntactic label,
                    separated by a hyphen: e.g., ``NP => NP-SBJ``.
            :'remove': strip away hyphen-separated grammatical function,
                    e.g., ``NP-SBJ => NP``.
            :'replace': replace syntactic label with grammatical function,
                    e.g., ``NP => SBJ``.
            :'between': insert a node labeled with the grammatical function
                    (prefixed by a hyphen) above each constituent.
        :param morphology: one of ...

            :None, 'no': use POS tags as preterminals [default].
            :'add': concatenate morphological information to POS tags,
                    e.g., ``DET/sg.def``.
            :'replace': use morphological information as preterminal label
            :'between': add node with morphological information between
                    POS tag and word, e.g., ``(DET (sg.def the))``.
        :param lemmas: one of ...

            :None, 'no': ignore lemmas [default].
            :'add': concatenate lemma to terminals, e.g., men/man.
            :'replace': use lemmas as terminals.
            :'between': insert lemma as node between POS tag and word.
        :raises ValueError: if an option has an unrecognized value or no
            file matches ``path``."""
        self.removeempty = removeempty
        self.ensureroot = ensureroot
        self.functions = functions
        self.punct = punct
        self.morphology = morphology
        self.lemmas = lemmas
        self.headrules = readheadrules(headrules) if headrules else {}
        self.modifierrules = (readmodifierrules(modifierrules)
                if modifierrules else None)
        self._encoding = encoding
        try:
            self._filenames = (sorted(glob(path), key=numbase)
                    if path != '-' else ['-'])
        except TypeError:
            # NOTE(review): presumably the keys produced by numbase for
            # different filenames were not mutually comparable; fall back
            # on plain string order.
            self._filenames = sorted(glob(path))
        for opt, opts in (
                (functions, (None, 'leave', 'add', 'replace', 'remove',
                    'between')),
                (morphology, (None, 'no', 'add', 'replace', 'between')),
                (punct, (None, 'no', 'move', 'moveall', 'remove', 'removeall',
                    'prune', 'root')),
                (lemmas, (None, 'no', 'add', 'replace', 'between'))):
            if opt not in opts:
                raise ValueError('Expected one of %r. Got: %r' % (opts, opt))
        if not self._filenames:
            raise ValueError("no files matched pattern '%s' in %s" % (
                    path, os.getcwd()))
        self._block_cache = None  # optionally, cache of blocks (e.g., etrees)
        self._trees_cache = None  # cache of parsed items, filled on demand

    def itertrees(self, start=None, end=None):
        """
        :param start, end: optional slice of the corpus to read, counted in
            blocks (as with ``itertools.islice``).
        :returns: an iterator returning tuples ``(key, item)``
            of sentences in corpus, where ``item`` is an :py:class:`Item`
            instance with ``tree``, ``sent``, and ``comment`` attributes.
            Useful when the dictionary of all trees in corpus would not fit in
            memory; results are not cached."""
        for n, a in islice(self._read_blocks(), start, end):
            yield n, self._parsetree(a)

    def _ensure_trees_cache(self):
        """Parse the whole corpus once and return the cache of items.

        Tests for ``None`` instead of truthiness, so that an empty corpus
        is not read again on every call."""
        if self._trees_cache is None:
            self._trees_cache = OrderedDict((n, self._parsetree(a))
                    for n, a in self._read_blocks())
        return self._trees_cache

    def trees(self):
        """
        :returns: an ordered dictionary of parse trees
            (``Tree`` objects with integer indices as leaves)."""
        return OrderedDict(
                (n, a.tree) for n, a in self._ensure_trees_cache().items())

    def sents(self):
        """
        :returns: an ordered dictionary of sentences,
            each sentence being a list of words."""
        return OrderedDict(
                (n, a.sent) for n, a in self._ensure_trees_cache().items())

    def tagged_sents(self):
        """
        :returns: an ordered dictionary of tagged sentences,
            each tagged sentence being a list of (word, tag) pairs."""
        return OrderedDict(
                (n, [(w, t) for w, (_, t) in zip(a.sent, sorted(a.tree.pos()))])
                for n, a in self._ensure_trees_cache().items())

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence keys to the raw
            representation of trees in the original treebank.
            Abstract: to be implemented by subclasses."""

    def _read_blocks(self):
        """Iterate over blocks in corpus file corresponding to parse trees.

        Abstract: subclasses yield tuples ``(key, block)``."""

    def _parse(self, block):
        """:returns: a parse tree given a block from the treebank file."""
        raise NotImplementedError('this is an abstract base class.')

    def _parsetree(self, block):
        """:returns: a transformed parse tree and sentence."""
        item = self._parse(block)
        if not item.sent:  # nothing to transform without any tokens
            return item
        if self.removeempty:
            removeemptynodes(item.tree, item.sent)
        if self.ensureroot:
            if item.tree.label == '':
                item.tree.label = self.ensureroot
            elif item.tree.label != self.ensureroot:
                item.tree = ParentedTree(self.ensureroot, [item.tree])
        if not isinstance(self, BracketCorpusReader):
            # roughly order constituents by order in sentence
            for a in reversed(list(item.tree.subtrees(lambda x: len(x) > 1))):
                a.children.sort(key=Tree.leaves)
        if self.punct:
            applypunct(self.punct, item.tree, item.sent)
        if self.headrules:
            applyheadrules(item.tree, self.headrules, self.modifierrules)
        return item

    def _word(self, block):
        """:returns: a list of words given a block from the treebank file."""
        if self.punct in {'remove', 'prune'}:
            # these options change the sentence, so apply the transformations
            return self._parsetree(block).sent
        return self._parse(block).sent
class BracketCorpusReader(CorpusReader):
    """Corpus reader for phrase-structures in bracket notation.

    For example::

        (S (NP John) (VP (VB is) (JJ rich)) (. .))"""

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence numbers to strings
            with the raw representation of trees in the original treebank."""
        return OrderedDict(self._read_blocks())

    def _read_blocks(self):
        """Yield tuples ``(n, line)`` with one tree per line.

        Sentences are numbered from 1, and the numbering continues across
        files, so that keys remain unique when the corpus consists of
        several files."""
        n = 0
        for filename in self._filenames:
            with openread(filename, encoding=self._encoding) as inp:
                for line in inp:
                    if line:
                        n += 1
                        yield n, line

    def _parse(self, block):
        """:returns: an Item with the tree and sentence in ``block``.

        Terminals are replaced by integer indices; function tags in labels
        are stored in the ``source`` attribute of each node, and are stripped
        from the labels when ``functions='remove'``.
        :raises ValueError: if the block cannot be parsed as a tree."""
        c = count()
        block = SUPERFLUOUSSPACERE.sub(')', block)
        try:
            tree = ParentedTree(LEAVESRE.sub(lambda _: ' %d' % next(c), block))
        except ValueError:
            # report offending block on stderr, as other readers do
            print(block, file=sys.stderr)
            raise
        for node in tree.subtrees():
            if node.source is None:
                node.source = ['--'] * len(FIELDS)
            for char in '-=':  # map NP-SUBJ and NP=2 to NP; don't touch -NONE-
                x = node.label.find(char)
                if x > 0:
                    if char == '-' and not node.label[x + 1:].isdigit():
                        node.source[FUNC] = node.label[x + 1:].rstrip(
                                '=0123456789')
                    if self.functions == 'remove':
                        node.label = node.label[:x]
        sent = [escape(token) for token in LEAVESRE.findall(block)]
        return Item(tree, sent, None, block)
class DiscBracketCorpusReader(BracketCorpusReader):
    """A corpus reader for discontinuous trees in bracket notation.

    Leaves consist of an index and a word, with the indices indicating
    the word order of the sentence. For example::

        (S (NP 1=John) (VP (VB 0=is) (JJ 2=rich)) (? 3=?))

    There is one tree per line. Optionally, the tree may be followed by a
    comment, separated by a TAB. Compared to Negra's export format, this format
    lacks morphology, lemmas and functional edges. On the other hand, it is
    close to the internal representation employed here, so it can be read
    efficiently."""

    def _parse(self, block):
        """:returns: an Item with tree, sentence and comment in ``block``.

        :raises ValueError: if a leaf index is outside of the sentence."""
        if '\t' in block:
            treestr, comment = block.rstrip('\n\r').split('\t', 1)
        else:
            treestr, comment = block, None
        tokens = {}

        def substleaf(leaf):
            """Collect token and return index."""
            idxstr, token = leaf.split('=', 1)
            idx = int(idxstr)
            tokens[idx] = unescape(token)
            return idx

        tree = ParentedTree.parse(treestr, parse_leaf=substleaf)
        # positions without a token are filled in with None
        sent = [tokens.get(n) for n in range(max(tokens) + 1)]
        for n in tree.leaves():
            if not 0 <= n < len(sent):
                raise ValueError('All leaves must be in the interval 0..n with '
                        'n=len(sent)\ntokens: %d indices: %r\nsent: %s' % (
                        len(sent), tree.leaves(), sent))
        for node in tree.subtrees():
            if node.source is None:
                node.source = ['--'] * len(FIELDS)
            for char in '-=':  # map NP-SUBJ and NP=2 to NP; don't touch -NONE-
                pos = node.label.find(char)
                if pos <= 0:
                    continue
                rest = node.label[pos + 1:]
                if char == '-' and not rest.isdigit():
                    node.source[FUNC] = rest.rstrip('=0123456789')
                if self.functions == 'remove':
                    node.label = node.label[:pos]
        return Item(tree, sent, comment, block)
class NegraCorpusReader(CorpusReader):
    """Read a corpus in the Negra export format."""

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence IDs to strings with
            the raw representation of trees in the original treebank."""
        if self._block_cache is None:
            self._block_cache = OrderedDict(self._read_blocks())
        result = OrderedDict()
        for sentid, lines in self._block_cache.items():
            result[sentid] = '\n'.join(lines) + '\n'
        return result

    def _read_blocks(self):
        """Read corpus and yield blocks corresponding to each sentence.

        :raises ValueError: on unbalanced or mismatched #BOS / #EOS markers
            and on duplicate sentence IDs."""
        # NB: A Negra "block" is a list of lines without line endings.
        seen = set()
        insentence = False
        current = []
        for filename in self._filenames:
            with openread(filename, encoding=self._encoding) as inp:
                for rawline in inp:
                    line = rawline.rstrip()
                    if line.startswith('#BOS '):
                        if insentence:
                            raise ValueError('beginning of sentence marker '
                                    'while previous one still open: %s' % line)
                        insentence = True
                        sentid = line.split(None, 2)[1]
                        current = [line]
                    elif line.startswith('#EOS '):
                        if not insentence:
                            raise ValueError('end of sentence marker while '
                                    'none started')
                        thissentid = line.split(None, 2)[1]
                        if sentid != thissentid:
                            raise ValueError('unexpected sentence id: '
                                    'start=%s, end=%s' % (sentid, thissentid))
                        insentence = False
                        if sentid in seen:
                            raise ValueError(
                                    'duplicate sentence ID: %s' % sentid)
                        seen.add(sentid)
                        current.append(line)
                        yield sentid, current
                    elif insentence:
                        current.append(line)
                    # other lines are ignored: #FORMAT x, %% comments, ...

    def _parse(self, block):
        """:returns: an Item given a block as a list of lines."""
        return exporttree(block, self.functions, self.morphology, self.lemmas)
class TigerXMLCorpusReader(CorpusReader):
    """Corpus reader for the Tiger XML format."""

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence IDs to the raw
            (serialized XML) representation of trees in the treebank."""
        if self._block_cache is None:
            self._block_cache = OrderedDict(self._read_blocks())
        result = OrderedDict()
        for sentid, (rawblock, _xmlblock) in self._block_cache.items():
            result[sentid] = rawblock
        return result

    def _read_blocks(self):
        """Yield ``(sentid, (rawblock, element))`` for each ``<s>`` element."""
        if self._encoding not in (None, 'utf8', 'utf-8'):
            raise ValueError('Encoding specified in XML files, '
                    'cannot be overriden.')
        for filename in self._filenames:
            with openread(filename, encoding=None) as inp:
                # iterator over elements in XML file
                context = ElementTree.iterparse(
                        inp, events=('start', 'end'))
                _, root = next(context)  # event == 'start' of root element
                for event, elem in context:
                    if event != 'end' or elem.tag != 's':
                        continue
                    yield elem.get('id'), (ElementTree.tostring(elem), elem)
                    # the sentence has been consumed; discard parsed elements
                    root.clear()

    def _parse(self, block):
        """Translate Tiger XML structure to the fields of export format."""
        rawblock, xmlblock = block
        graph = xmlblock.find('graph')
        root = graph.get('root')
        nodes = OrderedDict()
        for term in graph.find('terminals'):
            termid = term.get('id')
            fields = nodes.setdefault(termid, [None] * 6)
            fields[WORD] = term.get('word')
            fields[LEMMA] = term.get('lemma')
            fields[TAG] = term.get('pos')
            fields[MORPH] = term.get('morph')
            fields[PARENT] = '0' if termid == root else None
            fields[FUNC] = '--'
        for nt in graph.find('nonterminals'):
            if nt.get('id') == root:
                ntid = '0'  # the root node itself does not get a line
            else:
                fields = nodes.setdefault(nt.get('id'), [None] * 6)
                ntid = nt.get('id').split('_')[-1]
                fields[WORD] = '#' + ntid
                fields[TAG] = nt.get('cat')
                fields[LEMMA] = fields[MORPH] = fields[FUNC] = '--'
            for edge in nt:
                idref = edge.get('idref')
                child = nodes.setdefault(idref, [None] * 6)
                if edge.tag == 'edge':
                    if child[FUNC] not in (None, '--'):
                        raise ValueError('%s already has a parent: %r'
                                % (idref, nodes[idref]))
                    child[FUNC] = edge.get('label')
                    child[PARENT] = ntid
                elif edge.tag == 'secedge':
                    # secondary edges are appended as (label, parent) pairs
                    child.extend((edge.get('label'), ntid))
                else:
                    raise ValueError("expected 'edge' or 'secedge' tag.")
        for idref, fields in nodes.items():
            if fields[PARENT] is None:
                raise ValueError('%s does not have a parent: %r' % (
                        idref, nodes[idref]))
        sentid = xmlblock.get('id')
        lines = ['#BOS ' + sentid]
        lines.extend('\t'.join(fields) for fields in nodes.values())
        lines.append('#EOS ' + sentid)
        item = exporttree(lines, self.functions, self.morphology, self.lemmas)
        item.tree.label = root.split('_', 1)[1]
        item.block = rawblock
        return item
class AlpinoCorpusReader(CorpusReader):
    """Corpus reader for the Dutch Alpino treebank in XML format.

    Expects a corpus in directory format, where every sentence is in a single
    ``.xml`` file."""

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence keys to the raw
            (bytes) representation of trees in the treebank."""
        if self._block_cache is None:
            self._block_cache = OrderedDict(self._read_blocks())
        return OrderedDict((n, rawblock)
                for n, (rawblock, _xmlblock) in self._block_cache.items())

    def _read_blocks(self):
        """Read corpus and yield blocks corresponding to each sentence.

        Yields tuples ``(key, (rawblock, xmlblock))`` where the key is of the
        form ``dir/file``, without the ``.xml`` extension.
        :raises ValueError: if an encoding other than UTF-8 was requested.
        :raises ElementTree.ParseError: if a file is not well-formed XML;
            the offending file is printed on stderr first."""
        if self._encoding not in (None, 'utf8', 'utf-8'):
            raise ValueError('Encoding specified in XML files, '
                    'cannot be overriden.')
        for filename in self._filenames:
            with open(filename, 'rb') as inp:
                rawblock = inp.read()  # NB: store XML data as bytes
            # ../path/dir/file.xml => dir/file
            path, basename = os.path.split(filename)
            _, lastdir = os.path.split(path)
            n = os.path.join(lastdir, basename)[:-len('.xml')]
            try:
                xmlblock = ElementTree.fromstring(rawblock)
            except ElementTree.ParseError:
                print('Problem with %r:\n%s' % (
                        basename,
                        rawblock.decode('utf8', errors='replace')),
                        file=sys.stderr)
                # do not continue: xmlblock would be undefined, or still
                # refer to the tree of the previous file.
                raise
            yield n, (rawblock, xmlblock)

    def _parse(self, block):
        """:returns: a parse tree given a tuple ``(rawblock, xmlblock)``."""
        return alpinotree(
                block, self.functions, self.morphology, self.lemmas)
class AlpinoCompactCorpusReader(AlpinoCorpusReader):
    """Corpus reader for the Alpino compact treebank format (Indexed Corpus).

    Pass one or more .index or .data.dz files as filenames."""

    def _read_blocks(self):
        """Read corpus and yield blocks corresponding to each sentence.

        Yields tuples ``(key, (rawblock, xmlblock))`` where the key is of the
        form ``corpusname/sentid``.
        :raises ValueError: if an encoding other than UTF-8 was requested.
        :raises ElementTree.ParseError: if a document is not well-formed XML;
            the offending document is printed on stderr first."""
        if self._encoding not in (None, 'utf8', 'utf-8'):
            raise ValueError('Encoding specified in XML files, '
                    'cannot be overriden.')
        # NB: could implement proper streaming, random access using .index file
        # which lists offsets and sizes in base64 encoding.
        for filename in self._filenames:
            dzfile = re.sub(r'\.index$', '.data.dz', filename)
            dirname = os.path.basename(re.sub(r'\.index$', '', filename))
            with gzip.open(dzfile, 'rb') as inp:
                for block in ALPINOXML.findall(inp.read()):
                    sentid = ALPINOSENTID.search(block).group(1).decode('utf8')
                    n = '%s/%s' % (dirname, sentid)
                    try:
                        xmlblock = ElementTree.fromstring(block)
                    except ElementTree.ParseError:
                        print('Problem with %r:\n%s' % (
                                n, block.decode('utf8', errors='replace')),
                                file=sys.stderr)
                        # do not continue: xmlblock would be undefined, or
                        # still refer to the previous document.
                        raise
                    yield n, (block, xmlblock)
class FTBXMLCorpusReader(CorpusReader):
    """Corpus reader for the French treebank (FTB) in XML format."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # hack to ensure test/dev set as the first 1235 + 1235 sentences
        first = ('flmf7aa1ep.cat.xml', 'flmf7aa2ep.cat.xml',
                'flmf7ab2ep.xml', 'flmf7ae1ep.cat.xml',
                'flmf7af2ep.cat.xml', 'flmf7ag1exp.cat.xml')
        rank = {name: n for n, name in enumerate(first)}
        # stable sort: all other files keep their relative order
        self._filenames.sort(
                key=lambda name: rank.get(os.path.basename(name), 99))

    def blocks(self):
        """
        :returns: an ordered dictionary mapping sentence IDs to the raw
            (serialized XML) representation of trees in the treebank."""
        if self._block_cache is None:
            self._block_cache = OrderedDict(self._read_blocks())
        result = OrderedDict()
        for sentid, (rawblock, _xmlblock) in self._block_cache.items():
            result[sentid] = rawblock
        return result

    def _read_blocks(self):
        """Yield ``(sentid, (rawblock, element))`` for each ``<SENT>``;
        the sentence ID consists of the filename and the ``nb`` attribute."""
        if self._encoding not in (None, 'utf8', 'utf-8'):
            raise ValueError('Encoding specified in XML files, '
                    'cannot be overriden.')
        for filename in self._filenames:
            with openread(filename, encoding=None) as inp:
                # iterator over elements in XML file
                context = ElementTree.iterparse(inp,
                        events=('start', 'end'))
                _event, root = next(context)  # event: 'start' of root element
                prefix = os.path.splitext(os.path.basename(filename))[0]
                for event, elem in context:
                    if event != 'end' or elem.tag != 'SENT':
                        continue
                    sentid = '%s-%s' % (prefix, elem.get('nb'))
                    yield sentid, (ElementTree.tostring(elem), elem)
                    # the sentence has been consumed; discard parsed elements
                    root.clear()

    def _parse(self, block):
        """:returns: a parse tree given a tuple ``(rawblock, xmlblock)``."""
        return ftbtree(
                block, self.functions, self.morphology, self.lemmas)
def exporttree(block, functions=None, morphology=None, lemmas=None):
    """Get tree, sentence from tree in export format given as list of lines.

    :param block: a list of lines without line endings; the first and last
        line are the #BOS and #EOS lines.
    :returns: Item object, with tree, sent, comment, block fields."""
    def build(parentid):
        """Traverse tree in export format and create Tree objects for the
        children of the node with the given ID."""
        subtrees = []
        for idx, fields in children.get(parentid, []):
            # idx is the index in the block to record word indices
            match = EXPORTNONTERMINAL.match(fields[WORD])
            label = fields[TAG].replace('(', '[').replace(')', ']')
            if match:
                subtree = ParentedTree(label, build(match.group(1)))
            else:  # POS + terminal
                subtree = ParentedTree(label, [idx])
                handlemorphology(morphology, lemmas, subtree, fields, sent)
            subtree.source = tuple(fields)
            subtrees.append(subtree)
        return subtrees

    header = block[0]
    comment = header.split('%%')[1].strip() if '%%' in header else None
    table = [exportsplit(line) for line in block[1:-1]]
    sent = []
    children = {'0': []}
    # first pass: register non-terminal IDs and collect the words
    for fields in table:
        match = EXPORTNONTERMINAL.match(fields[WORD])
        if match:
            children[match.group(1)] = []
        else:
            sent.append(fields[WORD])
    # second pass: attach each line to its parent
    for idx, fields in enumerate(table):
        children[fields[PARENT]].append((idx, fields))
    tree = ParentedTree('ROOT', build('0'))
    handlefunctions(functions, tree, morphology=morphology)
    return Item(tree, sent, comment, '\n'.join(block) + '\n')
def exportsplit(line):
    """Take a line in export format and split into fields.

    Strip comments. Add dummy field for lemma if absent.

    :returns: a list with >= 6 elements; if > 6, length is even since
        secondary edges are defined by pairs of (label, parentid) fields.
    :raises ValueError: if the line has fewer than 5 fields.
    """
    # everything from the first '%%' onwards is a comment
    fields = line.partition('%%')[0].split()
    if len(fields) < 5:
        raise ValueError('expected at least 5 columns: %r' % fields)
    if len(fields) % 2:  # odd number of fields?
        fields.insert(LEMMA, '--')  # add empty lemma field
    return fields
def alpinotree(block, functions=None, morphology=None, lemmas=None):
    """Get tree, sent from tree in Alpino format.

    :param block: a tuple ``(rawblock, xmlblock)`` with the original XML
        data and the corresponding etree object.
    :returns: Item object, with tree, sent, comment, block fields.
    """
    def convert(node, parentid, morphology, lemmas):
        """Parse a subtree of an Alpino tree.

        :returns: a ParentedTree, or None for nodes without content
            (co-indexed placeholders, phrases without any children)."""
        attrs = node.keys()
        source = [''] * len(FIELDS)
        nodeid = int(node.get('id')) + 500
        source[WORD] = node.get('word') or ("#%s" % nodeid)
        source[LEMMA] = node.get('lemma') or node.get('root')
        source[MORPH] = node.get('postag') or node.get('frame')
        source[FUNC] = node.get('rel')
        if 'cat' in attrs:  # phrasal node
            source[TAG] = node.get('cat')
            if node.get('index'):
                coindexed[int(node.get('index')) + 500] = source
            result = ParentedTree(node.get('cat').upper(), [])
            for child in node:
                subtree = convert(child, nodeid, morphology, lemmas)
                childattrs = child.keys()
                if subtree and ('word' in childattrs or 'cat' in childattrs):
                    subtree.source[PARENT] = nodeid
                    result.append(subtree)
            if not result:
                return None
        elif 'word' in attrs:  # preterminal; may span several tokens
            source[TAG] = node.get('pt') or node.get('pos')
            if node.get('index'):
                coindexed[int(node.get('index')) + 500] = source
            begin, end = int(node.get('begin')), int(node.get('end'))
            result = ParentedTree(source[TAG], list(range(begin, end)))
            handlemorphology(morphology, lemmas, result, source, sent)
        elif 'index' in attrs:
            # reference to a co-indexed node: record as a secondary edge
            coindexation[int(node.get('index')) + 500].extend(
                    [node.get('rel'), parentid])
            return None
        # modify in-place, since coindexed may refer to this list
        source[:] = [a.replace(' ', '_') if a else a for a in source]
        result.source = source
        return result

    coindexed = {}
    coindexation = defaultdict(list)
    rawblock, xmlblock = block
    sent = xmlblock.find('sentence').text.split(' ')
    tree = convert(xmlblock.find('node'), 0, morphology, lemmas)
    # add the secondary edges to the source of the nodes referred to
    for index, secedges in coindexation.items():
        coindexed[index].extend(secedges)
    comment = xmlblock.find('comments/comment')  # NB: only use first comment
    if comment is not None:
        comment = comment.text
    handlefunctions(functions, tree, morphology=morphology)
    return Item(tree, sent, comment, rawblock)
def ftbtree(block, functions=None, morphology=None, lemmas=None):
    """Get tree, sent from tree in FTB format given as etree XML object.

    :param block: a tuple ``(rawblock, xmlblock)`` with the original XML
        data and the corresponding etree object.
    :returns: Item object, with tree, sent, comment, block fields."""
    def getsubtree(node):
        """Parse a subtree of an FTB tree.

        :returns: a ParentedTree, or None for a node without any content."""
        source = [''] * len(FIELDS)
        nodeid = next(nodeids)
        source[WORD] = node.text or ("#%s" % nodeid)
        source[LEMMA] = node.get('lemma') or ''
        source[MORPH] = node.get('ee') or ''
        source[FUNC] = node.get('fct') or ''
        # Could rely on compound="yes" attribute here, but there are a few
        # cases (annotation errors) where it is inconsistent with the
        # actual structure.
        if node.tag == 'w' and len(node) == 0:  # regular word token
            source[TAG] = node.get('cat') or node.get('catint')
            result = ParentedTree(source[TAG], [len(sent)])
            sent.append(re.sub(r'\s+', '', (node.text or '')))
            handlemorphology(morphology, lemmas, result, source, sent)
        else:
            if node.tag == 'w':
                # compound node, has <w> child nodes with actual terminals.
                label = 'MW' + node.get('cat')
            else:  # non-terminal node
                label = node.tag
            source[TAG] = label
            result = ParentedTree(label, [])
            for child in node:
                subtree = getsubtree(child)
                if subtree is None:
                    # child without content; skip it instead of failing
                    # on the attribute access below.
                    continue
                subtree.source[PARENT] = nodeid
                result.append(subtree)
            if not result:
                return None
        source[:] = [a.replace(' ', '_') if a else a for a in source]
        result.source = source
        return result

    rawblock, xmlblock = block
    sent = []
    nodeids = count(500)
    tree = getsubtree(xmlblock)
    comment = ' '.join('%s=%r' % (a, xmlblock.get(a))
            for a in ('nb', 'textid', 'argument', 'author', 'date'))
    handlefunctions(functions, tree, morphology=morphology)
    return Item(tree, sent, comment, rawblock)
def writetree(tree, sent, key, fmt, comment=None, morphology=None,
        sentid=False):
    """Convert a tree to a string representation in the given treebank format.

    :param tree: should have indices as terminals
    :param sent: contains the words corresponding to the indices in ``tree``
    :param key: an identifier for this tree; part of the output with some
        formats or when ``sentid`` is True.
    :param fmt: Formats are ``bracket``, ``discbracket``, Negra's ``export``
        format, and ``alpino`` XML format, as well unlabeled dependency
        conversion into ``mst`` or ``conll`` format (requires head rules).
        The formats ``tokens`` and ``wordpos`` are to strip away tree structure
        and leave only lines with space-separated tokens or ``token/POS``.
        When using ``bracket``, make sure tree is canonicalized.
    :param comment: optionally, a string that will go in the format's comment
        field (supported by ``export`` and ``alpino``), or at the end of the
        line preceded by a tab (``discbracket``); ignored by other formats.
        Should be a single line.
    :param morphology: passed on to the writer of the ``export`` format.
    :param sentid: for line-based formats, prefix output by ``key|``.
    :raises ValueError: if ``fmt`` is not a recognized format.

    Lemmas, functions, and morphology information will be empty unless nodes
    contain a 'source' attribute with such information."""
    lineformats = ('tokens', 'wordpos', 'bracket', 'discbracket')
    if fmt == 'bracket':
        output = writebrackettree(tree, sent)
    elif fmt == 'discbracket':
        output = writediscbrackettree(tree, sent)
        if comment:
            # the comment goes on the same line, separated by a tab
            output = '%s\t%s\n' % (output.rstrip('\n'), comment)
    elif fmt == 'tokens':
        output = '%s\n' % ' '.join(sent)
    elif fmt == 'wordpos':
        tagged = zip(sent, sorted(tree.pos()))
        output = '%s\n' % ' '.join(
                '%s/%s' % (word, pos) for word, (_, pos) in tagged)
    elif fmt == 'export':
        output = writeexporttree(tree, sent, key, comment, morphology)
    elif fmt == 'alpino':
        output = writealpinotree(tree, sent, key, comment)
    elif fmt in ('conll', 'mst'):
        output = writedependencies(tree, sent, fmt)
    else:
        raise ValueError('unrecognized format: %r' % fmt)
    if not sentid or fmt not in lineformats:
        return output
    return '%s|%s' % (key, output)
def writeexporttree(tree, sent, key, comment, morphology):
    """Return string with given tree in Negra's export format.

    :param tree: a tree with indices as terminals; its direct children are
        written with parent ID 0.
    :param sent: the words corresponding to the indices in ``tree``.
    :param key: sentence ID for the #BOS / #EOS lines; if None, these lines
        are left out.
    :param comment: optional comment to put on the #BOS line.
    :param morphology: if ``'replace'`` or ``'add'``, recover the morphology
        field from the POS tag when it is not available from the source.
    :raises ValueError: if the number of words and preterminals differ."""
    def collectsecedges(node):
        """Yield the fields (label, parent ID, ...) for the secondary edges
        of a node, translated to the node IDs used in the output."""
        if node.source:
            for rel, pid in zip(node.source[6::2], node.source[7::2]):
                try:
                    idx = nodeidindex.index(int(pid))
                except ValueError:
                    print('skipping secondary edge; %s' % key, file=sys.stderr)
                    continue
                yield rel
                yield str(500 + idx)

    result = []
    if key is not None:
        cmt = (' %% ' + comment) if comment else ''
        result.append('#BOS %s%s' % (key, cmt))
    # visit nodes in post-order traversal
    preterms, phrasalnodes = {}, []
    agenda = list(tree)
    while agenda:
        node = agenda.pop()
        if not node or isinstance(node[0], Tree):
            # NB: to get a proper post-order traversal, children need to be
            # reversed, but for the assignment of IDs this does not matter.
            agenda.extend(node)
            phrasalnodes.append(node)
        else:
            preterms[node[0]] = node
    phrasalnodes.reverse()
    if len(sent) != len(preterms):
        raise ValueError('sentence and terminals length mismatch: '
                'sentno: %s\ntree: %s\nsent (len=%d): %r\nleaves (len=%d): %r'
                % (key, tree, len(sent), sent, len(preterms), preterms))
    # position in idindex / nodeidindex + 500 is the node ID in the output
    idindex = [id(node) for node in phrasalnodes]
    # the original node IDs, to translate the parents of secondary edges
    nodeidindex = [int(node.source[WORD][1:])
            if node.source and node.source[WORD].startswith('#') else 0
            for node in phrasalnodes]
    for n, word in enumerate(sent):
        if not word:
            # raise ValueError('empty word in sentence: %r' % sent)
            word = '...'
        node = preterms[n]
        lemma = '--'
        postag = node.label.replace('$[', '$(') or '--'
        func = morphtag = '--'
        # reset for every node: a node without source has no secondary edges
        secedges = []
        if node.source:
            lemma = node.source[LEMMA] or '--'
            morphtag = node.source[MORPH] or '--'
            func = node.source[FUNC] or '--'
            secedges = list(collectsecedges(node))
        if morphtag == '--' and morphology == 'replace':
            morphtag = postag
        elif morphtag == '--' and morphology == 'add' and '/' in postag:
            postag, morphtag = postag.split('/', 1)
        parentid = '%d' % (0 if node.parent is tree
                else 500 + idindex.index(id(node.parent)))
        result.append("\t".join((word, lemma, postag, morphtag, func,
                parentid) + tuple(secedges)))
    for n, node in enumerate(phrasalnodes):
        nodeid = '#%d' % (500 + n)
        lemma = '--'
        label = node.label or '--'
        func = morphtag = '--'
        secedges = []
        if node.source:
            morphtag = node.source[MORPH] or '--'
            func = node.source[FUNC] or '--'
            secedges = list(collectsecedges(node))
        parentid = '%d' % (0 if node.parent is tree
                else 500 + idindex.index(id(node.parent)))
        result.append('\t'.join((nodeid, lemma, label, morphtag, func,
                parentid) + tuple(secedges)))
    if key is not None:
        result.append("#EOS %s" % key)
    return "%s\n" % "\n".join(result)
def writealpinotree(tree, sent, key, commentstr):
    """Return XML string with tree in AlpinoXML format.

    :param tree: a tree with indices as terminals.
    :param sent: the words corresponding to the indices in ``tree``.
    :param key: identifier of the tree; stored in the comment.
    :param commentstr: optional comment, stored after the key as
        ``key|comment``.

    The comment is written as ``<comments><comment>...</comment></comments>``,
    which is the location ``alpinotree()`` reads the comment from."""
    def addchildren(tree, sent, parent, cnt, depth=1, last=False):
        """Recursively add children of ``tree`` to XML object ``node``"""
        node = ElementTree.SubElement(parent, 'node')
        node.set('id', str(next(cnt)))
        node.set('begin', str(min(tree.leaves())))
        node.set('end', str(max(tree.leaves()) + 1))
        if tree.source:
            node.set('rel', tree.source[FUNC] or '--')
        if isinstance(tree[0], Tree):
            node.set('cat', tree.label.lower())
            node.text = '\n ' + ' ' * depth
        else:
            assert isinstance(tree[0], int)
            node.set('pos', tree.label.lower())
            node.set('word', sent[tree[0]])
            if tree.source:
                node.set('lemma', tree.source[LEMMA] or '--')
                node.set('postag', tree.source[MORPH] or '--')
                # FIXME: split features in multiple attributes
            else:
                node.set('lemma', '--')
                node.set('postag', '--')
        # the last child is followed by the closing tag of its parent,
        # which is indented one level less.
        node.tail = '\n' + ' ' * (depth - last)
        for x, child in enumerate(tree, 1):
            if isinstance(child, Tree):
                addchildren(child, sent, node, cnt, depth + 1, x == len(tree))

    result = ElementTree.Element('alpino_ds')
    result.set('version', '1.3')
    # FIXME: add coindexed nodes
    addchildren(tree, sent, result, count())
    sentence = ElementTree.SubElement(result, 'sentence')
    sentence.text = ' '.join(sent)
    # comments are wrapped in a <comments> element, cf. 'comments/comment'
    # in alpinotree().
    comments = ElementTree.SubElement(result, 'comments')
    comment = ElementTree.SubElement(comments, 'comment')
    comment.text = ('%s|%s' % (key, commentstr)) if commentstr else str(key)
    # whitespace for a readable layout
    result.text = sentence.tail = '\n '
    comments.text = '\n  '
    comment.tail = '\n '
    result.tail = comments.tail = '\n'
    return ElementTree.tostring(result).decode('utf8')  # hack
def writedependencies(tree, sent, fmt):
    """Convert tree to dependencies in `mst` or `conll` format.

    :returns: a string; for any other value of ``fmt``, None."""
    deps = dependencies(tree)
    if fmt == 'mst':  # MST parser can read this format
        # https://github.com/travisbrown/mstparser#3a-input-data-format
        tags = [tag for _, tag in sorted(tree.pos())]
        rels = [str(rel) for _n, rel, _head in deps]
        heads = [str(head) for _n, _rel, head in deps]
        # four lines: words, tags, labels, heads
        lines = ['\t'.join(row) for row in (sent, tags, rels, heads)]
        return '\n'.join(lines) + '\n\n'
    elif fmt == 'conll':
        # Cf. https://depparse.uvt.nl/DataFormat.html
        lines = []
        for word, (_, tag), (n, rel, head) in zip(
                sent, sorted(tree.pos()), deps):
            lines.append('%d\t%s\t_\t%s\t%s\t_\t%d\t%s\t_\t_' % (
                    n, word, tag, tag, head, rel))
        return '\n'.join(lines) + '\n\n'
def dependencies(root):
    """Lin (1995): A Dependency-based Method for Evaluating [...] Parsers.

    http://ijcai.org/Proceedings/95-2/Papers/052.pdf

    Each child of ``root`` is attached to the artificial head 0 with the
    label ``'root'``.

    :returns: sorted list of tuples of the form ``(depidx, label, headidx)``,
        with 1-based word indices."""
    collected = []
    for child in root:
        # NB: _makedep adds the dependencies within child to the list
        lexhead = _makedep(child, collected)
        collected.append((lexhead, 'root', 0))
    collected.sort()
    return collected
def _makedep(node, deps):
"""Traverse a head-marked tree and extract dependencies."""
if isinstance(node[0], int):
return node[0] + 1
headchild = next(iter(a for a in node if a.type == HEAD))
lexhead = _makedep(headchild, deps)
for child in node:
if child is headchild:
continue
lexheadofchild = _makedep(child, deps)
func = '-'
if (child.source
and child.source[FUNC] and child.source[FUNC] != '--'):
func = child.source[FUNC]
deps.append((lexheadofchild, func, lexhead))
return lexhead
def deplen(deps):
    """Compute dependency length from result of ``dependencies()``.

    Attachments to the artificial root are left out of both the total and
    the count; these have the label ``'root'`` in the output of
    ``dependencies()``; the label ``'ROOT'`` is recognized as well.

    :param deps: a sequence of tuples ``(depidx, label, headidx)``.
    :returns: tuple ``(totaldeplen, numdeps)``; ``numdeps`` is a float."""
    lengths = [abs(dep - head) for dep, label, head in deps
            if label not in ('ROOT', 'root')]
    return (sum(lengths), float(len(lengths)))
def handlefunctions(action, tree, pos=True, root=False, morphology=None):
    """Add function tags to phrasal labels e.g., 'VP' => 'VP-HD'.

    The tree is modified in-place.

    :param action: one of {None, 'leave', 'add', 'replace', 'remove',
        'between'}; with None or 'leave' the tree is left as is.
    :param pos: whether to add function tags to POS tags.
    :param root: whether to add function tags to the root node.
    :param morphology: if morphology='between', skip those nodes."""
    if action is None or action == 'leave':
        return
    for node in tree.subtrees():
        if action == 'remove':
            for char in '-=':  # map NP-SUBJ and NP=2 to NP; don't touch -NONE-
                idx = node.label.find(char)
                if idx > 0:
                    node.label = node.label[:idx]
            continue
        if morphology == 'between' and not isinstance(node[0], Tree):
            continue
        if node is tree and (action == 'between' or not root):  # skip root
            continue
        if not (pos or isinstance(node[0], Tree)):
            continue
        # test for non-empty function tag ('--' is considered empty)
        source = node.source
        if source and source[FUNC] and source[FUNC] != '--':
            func = source[FUNC]
        else:
            func = None
        if func and action == 'add':
            node.label += '-%s' % func
        elif action == 'replace':
            node.label = func or '--'
        elif action == 'between':
            # put a new node with the function as label above this node
            parent, idx = node.parent, node.parent_index
            newnode = ParentedTree('-' + (func or '--'), [parent.pop(idx)])
            parent.insert(idx, newnode)
def handlemorphology(action, lemmaaction, preterminal, source, sent=None):
    """Augment/replace preterminal label with morphological information.

    :param action: what to do with the morphology field of ``source``:
        ``None`` or ``'no'`` (nothing), ``'add'`` (label becomes
        ``label/morph``), ``'replace'`` (label becomes ``morph``) or
        ``'between'`` (a node labeled ``morph`` is inserted below the
        preterminal).
    :param lemmaaction: what to do with the lemma field of ``source``:
        ``None`` or ``'no'`` (nothing), ``'add'`` (append ``/lemma`` to the
        token in ``sent``), ``'replace'`` (the token in ``sent`` becomes the
        lemma) or ``'between'`` (a node labeled with the lemma is inserted
        below the preterminal).
    :param preterminal: the node to modify, in place.
    :param source: the fields of the original treebank line for this token;
        when empty, nothing is done.
    :param sent: list of tokens; required for the lemma actions ``'add'``
        and ``'replace'``.
    :returns: ``preterminal``, or ``None`` when ``source`` is empty.
    :raises ValueError: for an unrecognized action, or when ``sent`` is
        needed but not given."""
    if not source:
        return None

    def sanitize(field):
        """Replace parentheses and spaces, which clash with bracket
        notation of trees."""
        for old, new in (('(', '['), (')', ']'), (' ', '_')):
            field = field.replace(old, new)
        return field

    morph = sanitize(source[MORPH])
    lemma = sanitize(source[LEMMA]) or '--'

    # the lemma is handled first; the morphology action then operates on
    # the possibly already modified preterminal.
    if lemmaaction in ('add', 'replace'):
        if sent is None:
            raise ValueError('adding lemmas requires passing sent argument.')
        idx = preterminal[0]
        if lemmaaction == 'add':
            sent[idx] += '/' + lemma
        else:
            sent[idx] = lemma
    elif lemmaaction == 'between':
        preterminal[:] = [preterminal.__class__(lemma, preterminal)]
    elif lemmaaction not in (None, 'no'):
        raise ValueError('unrecognized action: %r' % lemmaaction)

    if action == 'add':
        preterminal.label = '%s/%s' % (preterminal.label, morph)
    elif action == 'replace':
        preterminal.label = morph
    elif action == 'between':
        preterminal[:] = [preterminal.__class__(morph, [preterminal.pop()])]
    elif action not in (None, 'no'):
        raise ValueError('unrecognized action: %r' % action)
    return preterminal
# Status value yielded by the segment* co-routines to signal that the line
# sent to them was consumed; ``not CONSUMED`` signals that it was not.
CONSUMED = True
# Matched against the leftover input in segmentbrackets() to decide whether a
# left bracket may start a tree.
# NOTE(review): every part of this pattern is optional, so ``match()`` can
# succeed with an empty match on any string and never returns None -- confirm
# whether ``fullmatch()`` was intended.
NEWLB = re.compile(r'(?:.*[\n\r])?\s*')
def incrementaltreereader(treeinput, morphology=None, functions=None,
        strict=False, robust=True, othertext=False):
    """Incremental corpus reader.

    Supports brackets, discbrackets, export and alpino-xml format.
    The format is autodetected: every line is offered in turn to the
    co-routines ``segmentexport()``, ``segmentalpino()`` and
    ``segmentbrackets()``.

    :param treeinput: an iterable of lines (line endings optional).
    :param morphology: passed on to the export and alpino readers.
    :param functions: passed on to the export and alpino readers.
    :param strict: passed on to the export and bracket readers, which raise
        ValueError on malformed data when it is True.
    :param robust: passed on to ``segmentbrackets()``; if True, a bracketing
        without nested brackets, e.g., (DT the), is not recognized as a tree.
    :param othertext: if True, yield non-tree data as ``(None, None, line)``.
        By default, text in lines without trees is ignored.
    :yields: tuples ``(tree, sent, comment)`` with a Tree object, a separate
        lists of terminals, and a string with any other data following the
        tree."""
    # Sentinel items are appended after the real input (hack); presumably
    # they make the readers flush a tree that is still pending.
    lines = chain(iter(treeinput), ('(', None, None))
    line = next(lines)
    # the readers are tried on each line in this order
    readers = [
        segmentexport(morphology, functions, strict),
        segmentalpino(morphology, functions),
        segmentbrackets(strict, robust),
    ]
    for reader in readers:
        reader.send(None)  # advance each co-routine to its first yield
    while True:
        # status not CONSUMED: line not consumed, not part of tree;
        # status CONSUMED: line consumed, waiting for end of tree.
        res, status = None, CONSUMED
        for reader in readers:
            while res is None:
                try:
                    res, status = reader.send(line)
                except StopIteration:
                    return
                if status != CONSUMED:
                    # there was no tree, or a complete tree was read
                    break
                try:
                    line = next(lines)
                except StopIteration:
                    line = None
                    break
            if res is None:
                continue  # offer the current line to the next reader
            for tree, sent, rest in res:
                cut = -1 if rest is None else rest.find('\n')
                if othertext and cut != -1:
                    # only the first line after a tree is its comment;
                    # the remainder is reported as other text.
                    yield tree, sent, rest[:cut]
                    yield None, None, rest[cut:]
                else:
                    yield tree, sent, rest
            break
        if res is not None:
            continue
        # none of the readers accepted this line
        if othertext and line is not None:
            yield None, None, line.rstrip()
        try:
            line = next(lines)
        except StopIteration:
            return
def segmentbrackets(strict=False, robust=True):
    """Co-routine that accepts one line at a time.

    Yields tuples ``(result, status)`` where ...

    - result is None or one or more S-expressions as a list of
      tuples (tree, sent, rest), where rest is the string outside of brackets
      between this S-expression and the next.
    - status is ``CONSUMED`` if the line was consumed,
      else ``not CONSUMED``.

    Sending ``None`` instead of a line makes the co-routine parse a
    complete S-expression that is still being held back.

    :param strict: if True, raise ValueError for improperly nested brackets.
    :param robust: if True, only return trees whose brackets are nested more
        than one level deep; e.g., (DT the) is not recognized as a tree.
    """
    def tryparse(result, rest):
        """Parse ``result`` and add the tree to the results list."""
        try:
            tree, sent = brackettree(result, detectdisc=True)
        except Exception as err:
            raise ValueError('%r\nwhile parsing:\n%r' % (
                err, dict(result=result, rest=rest, parens=parens,
                          depth=depth, prev=prev)))
        results.append((tree, sent, rest.rstrip()))

    def pendingstatus():
        """Status to report given the current state of the co-routine."""
        return CONSUMED if results or result or parens else not CONSUMED

    opening, closing = '()'
    parens = 0  # number of open parens
    depth = 0  # max. number of open parens
    prev = ''  # incomplete tree currently being read
    result = ''  # string of complete tree
    results = []  # trees found in current line
    rest = ''  # any non-tree data after a tree
    line = (yield None, CONSUMED)
    while True:
        begin = 0  # index where current tree starts
        # text carried over from previous lines has already been scanned
        offset = len(prev)
        lpos = line.find(opening, offset)
        rpos = line.find(closing, offset)
        # ignore first left bracket when not preceded by whitespace
        if parens == 0 and lpos > 0 and NEWLB.match(prev) is None:
            lpos = -1
        prev = line
        while lpos != -1 or rpos != -1:
            if lpos != -1 and (lpos < rpos or rpos == -1):  # left bracket
                # look ahead to see whether this will be a tree with depth > 1
                if parens == 0 and (rpos == -1 or not robust
                        or 0 <= line.find(opening, lpos + 1) < rpos):
                    rest, prev = line[begin:lpos], line[lpos:]
                    if result:
                        # a new tree starts, so the previous one is final
                        tryparse(result, rest)
                        result, begin = '', lpos
                parens += 1
                depth = max(depth, parens)
                lpos = line.find(opening, lpos + 1)
            elif rpos != -1 and (rpos < lpos or lpos == -1):  # right bracket
                parens -= 1
                if parens == 0 and (not robust or depth > 1):
                    result, prev = line[begin:rpos + 1], line[rpos + 1:]
                    begin = rpos + 1
                    depth = 0
                elif parens < 0:
                    if strict:
                        raise ValueError('unbalanced parentheses')
                    parens = 0
                rpos = line.find(closing, rpos + 1)
        line = (yield results or None, pendingstatus())
        if results:
            results = []
        if line is None:  # request to flush
            if result:
                tryparse(result, rest)
            yield results or None, pendingstatus()
            line = ''
            if results:
                results = []
        if parens or result:
            # keep unfinished material and continue scanning after it
            line = prev + line
        else:
            prev = ''
def segmentalpino(morphology, functions):
    """Co-routine that accepts one line at a time.

    Yields tuples ``(result, status)`` where ...

    - result is ``None`` or a 1-tuple with ``(tree, sent, comment)`` for the
      segment delimited by ``<alpino_ds>`` and ``</alpino_ds>`` that was
      just completed;
    - status is ``CONSUMED`` if the line was consumed,
      else ``not CONSUMED``.

    The co-routine finishes when ``None`` is sent instead of a line.

    :param morphology: passed on to ``alpinotree()``.
    :param functions: passed on to ``alpinotree()``."""
    pending = []  # lines of the block currently being read
    inblock = 0
    line = (yield None, CONSUMED)
    while line is not None:
        if line.startswith('<alpino_ds'):
            # start a new block, always with a fresh XML declaration
            pending = ['<?xml version="1.0" encoding="UTF-8"?>', line]
            inblock = 1
            reply = None, CONSUMED
        elif line.startswith('</alpino_ds>'):
            pending.append(line)
            rawblock = '\n'.join(pending).encode('utf8')
            item = alpinotree(
                (rawblock, ElementTree.fromstring(rawblock)),
                functions, morphology)
            reply = ((item.tree, item.sent, item.comment), ), CONSUMED
            inblock = 0
            pending = []
        elif line.strip():
            if inblock == 1:
                pending.append(line)
            # outside of a block only an XML declaration is consumed
            if inblock or line.lstrip().startswith('<?xml'):
                reply = None, CONSUMED
            else:
                reply = None, not CONSUMED
        else:
            reply = None, not CONSUMED
        line = (yield reply)
def segmentexport(morphology, functions, strict=False):
    """Co-routine that accepts one line at a time.

    Yields tuples ``(result, status)`` where ...

    - result is ``None`` or a 1-tuple with ``(tree, sent, comment)`` for the
      segment delimited by ``#BOS`` and ``#EOS`` that was just completed;
    - status is ``CONSUMED`` if the line was consumed,
      else ``not CONSUMED``.

    The co-routine finishes when ``None`` is sent instead of a line.

    :param morphology: passed on to ``exporttree()``.
    :param functions: passed on to ``exporttree()``.
    :param strict: if True, raise ValueError on a start tag inside a block
        or an end tag without start tag."""
    block = []  # lines of the block currently being read
    inblock = 0  # 0: outside of a block; 1: in #BOS block; 2: in #BOT block
    line = (yield None, CONSUMED)
    while line is not None:
        line = line.rstrip()
        if line.startswith(('#BOS ', '#BOT ')):
            if strict and inblock != 0:
                raise ValueError('nested #BOS or #BOT')
            block[:] = [line]
            inblock = 1 if line.startswith('#BOS ') else 2
            reply = None, CONSUMED
        elif line.startswith(('#EOS ', '#EOT ')):
            if strict and inblock == 0:
                raise ValueError('#EOS or #EOT without start tag')
            block.append(line)
            item = exporttree(block, functions, morphology)
            reply = ((item.tree, item.sent, item.comment), ), CONSUMED
            inblock = 0
            block = []
        elif line:
            if inblock == 1:
                block.append(line)
            else:
                line = line.lstrip()
            # outside of a block only comments and #FORMAT lines are consumed
            if inblock or line.startswith(('%%', '#FORMAT ')):
                reply = None, CONSUMED
            else:
                reply = None, not CONSUMED
        else:
            reply = None, not CONSUMED
        line = (yield reply)
def numbase(key):
    """Split file name in numeric and string components to use as sort key.

    The base name without its extension is split on any of the characters
    ``-.,_`` and space. Each component is tagged with its kind, so that keys
    stay comparable when one file name has a number where another has text
    (comparing ``int`` with ``str`` raises TypeError on Python 3):

    - ``(0, number)`` for a component consisting only of ASCII digits;
    - ``(1, text)`` for any other component.

    Numeric components are therefore ordered by value and sort before
    textual components in the same position.

    :param key: a file name, optionally with a directory part.
    :returns: a list with the directory part followed by the tagged
        components."""
    path, base = os.path.split(key)
    stem = os.path.splitext(base)[0]
    components = [
        (0, int(part)) if re.fullmatch(r'[0-9]+', part) else (1, part)
        for part in re.split(r'[-.,_ ]', stem)]
    return [path] + components
# Corpus reader classes keyed by the name of the treebank format they read;
# the OrderedDict preserves the order in which the formats are listed here.
READERS = OrderedDict((
('export', NegraCorpusReader),
('bracket', BracketCorpusReader),
('discbracket', DiscBracketCorpusReader),
('tiger', TigerXMLCorpusReader),
('alpino', AlpinoCorpusReader),
('alpinocompact', AlpinoCompactCorpusReader),
('ftb', FTBXMLCorpusReader)))
# Names of the output formats.
# NOTE(review): presumably the formats accepted by writetree() -- confirm.
WRITERS = ('export', 'bracket', 'discbracket',
'conll', 'mst', 'tokens', 'wordpos')
# Names exported by a star-import of this module.
__all__ = ['Item', 'CorpusReader', 'BracketCorpusReader',
'DiscBracketCorpusReader', 'NegraCorpusReader', 'TigerXMLCorpusReader',
'AlpinoCorpusReader', 'AlpinoCompactCorpusReader',
'FTBXMLCorpusReader',
'exporttree', 'exportsplit', 'alpinotree', 'ftbtree',
'writetree', 'writeexporttree', 'writealpinotree', 'writedependencies',
'dependencies', 'deplen', 'handlefunctions', 'handlemorphology',
'incrementaltreereader', 'segmentbrackets', 'segmentexport',
'segmentalpino', 'numbase']