Source code for discodop.lexicon

# -*- coding: UTF-8 -*-
"""Add rules to handle unknown words and smooth lexical probabilities.

Rare words in the training set are replaced with word signatures, such that
unknown words can receive similar tags. Given a function to produce such
signatures from words, the flow is as follows:

- Simple lexical smoothing:

  #. getunknownwordmodel (get statistics)
  #. replaceraretrainwords (adjust trees)
  #. [ read off grammar ]
  #. simplesmoothlexicon (add extra lexical productions)

- Sophisticated smoothing (untested):

  #. getunknownwordmodel
  #. getlexmodel
  #. replaceraretrainwords
  #. [ read off grammar ]
  #. smoothlexicon

- During parsing:

  #. replaceraretestwords (only give known words and signatures to parser)
  #. restore original words in derivations

"""
# pylint: disable=abstract-class-instantiated
from __future__ import division, print_function, absolute_import, \
		unicode_literals
import os
import re
import logging
import tempfile
from operator import itemgetter
from subprocess import Popen, PIPE
from collections import defaultdict, Counter
try:
	from cyordereddict import OrderedDict
except ImportError:
	from collections import OrderedDict
from fractions import Fraction
from .treebanktransforms import YEARRE
from .tree import escape
from .util import which

# Prefix of every unknown word signature; on its own it is the catch-all
# terminal used when a signature was not observed in the training data.
UNK = '_UNK'
# A run of digits, optionally preceded by (possibly zero) digits and a single
# separator out of , . ' -- e.g. '12', '1,000', '3.14'.
NUMBERRE = re.compile('^(?:[0-9]*[,.\'])?[0-9]+$')
# Installation instructions, used as the message of the ValueError that is
# raised when the respective external tagger cannot be found.
# NB: the archive has to be unpacked from the downloaded local file; tar
# cannot read from an ftp:// URL.
TREETAGGERHELP = '''tree tagger not found. commands to install:
mkdir tree-tagger && cd tree-tagger
wget ftp://ftp.ims.uni-stuttgart.de/pub/corpora/tree-tagger-linux-3.2.tar.gz
tar -xzf tree-tagger-linux-3.2.tar.gz
wget ftp://ftp.ims.uni-stuttgart.de/pub/corpora/tagger-scripts.tar.gz
tar -xzf tagger-scripts.tar.gz
mkdir lib && cd lib && wget \
ftp://ftp.ims.uni-stuttgart.de/pub/corpora/german-par-linux-3.2-utf8.bin.gz
gunzip german-par-linux-3.2-utf8.bin.gz'''
STANFORDTAGGERHELP = '''Stanford tagger not found. Commands to install:
wget http://nlp.stanford.edu/software/stanford-postagger-full-2012-07-09.tgz
tar -xzf stanford-postagger-full-2012-07-09.tgz'''


[docs]def getunknownwordmodel(tagged_sents, unknownword, unknownthreshold, openclassthreshold): """Collect statistics for an unknown word model. :param tagged_sents: the sentences from the training set with the gold POS tags from the treebank. :param unknownword: a function that returns a signature for a given word; e.g., "eschewed" => "_UNK-L-d". :param unknownthreshold: words with frequency lower than or equal to this are replaced by their signature. :param openclassthreshold: tags that rewrite to at least this much word types are considered to be open class categories.""" wordsfortag = defaultdict(set) tags = Counter() wordtags = Counter() sigs = Counter() sigtag = Counter() words = Counter(word for sent in tagged_sents for word, tag in sent) lexicon = {word for word, freq in words.items() if freq > unknownthreshold} wordsig = {} for sent in tagged_sents: for n, (word, tag) in enumerate(sent): wordsfortag[tag].add(word) tags[tag] += 1 wordtags[word, tag] += 1 sig = unknownword(word, n, lexicon) wordsig[word] = sig # NB: sig may also depend on n and lexicon sigtag[sig, tag] += 1 if openclassthreshold: openclasstags = {tag: len({w.lower() for w in ws}) for tag, ws in wordsfortag.items() if len({w.lower() for w in ws}) >= openclassthreshold} closedclasstags = {tag: len({w.lower() for w in wordsfortag[tag]}) for tag in tags if tag not in openclasstags} closedclasswords = {word for tag in closedclasstags for word in wordsfortag[tag]} openclasswords = lexicon - closedclasswords # add rare closed-class words back to lexicon lexicon.update(closedclasswords) else: openclasstags = {} openclasswords = {} for sent in tagged_sents: for n, (word, _) in enumerate(sent): if word not in lexicon: sig = unknownword(word, n, lexicon) sigs[sig] += 1 msg = 'known words: %d, signature types seen: %d\n' % ( len(lexicon), len(sigs)) msg += 'open class tags: %s\n\n' % ' '.join(sorted( '%s:%d' % a for a in openclasstags.items())) msg += 'closed class tags: %s' % ' '.join(sorted( '%s:%d' % 
a for a in closedclasstags.items())) return (sigs, words, lexicon, wordsfortag, openclasstags, openclasswords, tags, wordtags, wordsig, sigtag), msg
def replaceraretrainwords(tagged_sents, unknownword, lexicon):
	"""Replace train set words not in lexicon w/signature from unknownword().

	Words matching ``YEARRE`` become ``'1970'`` and words matching
	``NUMBERRE`` become ``'000'``, regardless of the lexicon.

	:param tagged_sents: sequence of sentences of ``(word, tag)`` pairs.
	:param unknownword: function ``(word, position, lexicon)`` returning a
		signature.
	:param lexicon: collection of known words.
	:returns: a list with for each sentence a list of words / signatures;
		the tags are dropped."""
	result = []
	for sent in tagged_sents:
		replaced = []
		for n, (word, _) in enumerate(sent):
			if YEARRE.match(word):
				replaced.append('1970')
			elif NUMBERRE.match(word):
				replaced.append('000')
			elif word in lexicon:
				replaced.append(word)
			else:
				replaced.append(unknownword(word, n, lexicon))
		result.append(replaced)
	return result
def replaceraretestwords(sent, unknownword, lexicon, sigs):
	"""Replace test set words not in lexicon w/signature from unknownword().

	This is a generator yielding one token for each word of ``sent``:

	- ``'1970'`` / ``'000'`` for words matching ``YEARRE`` / ``NUMBERRE``;
	- the word itself if it is in ``lexicon``;
	- the lowercased word if only that version is in ``lexicon``;
	- otherwise the signature given by ``unknownword``, or the default
	  ``UNK`` when that signature is not in ``sigs``."""
	for n, word in enumerate(sent):
		if YEARRE.match(word):
			token = '1970'
		elif NUMBERRE.match(word):
			token = '000'
		elif word in lexicon:
			token = word
		else:
			lowered = word.lower()
			if lowered in lexicon:
				token = lowered
			else:
				sig = unknownword(word, n, lexicon)
				token = sig if sig in sigs else UNK
		yield token
def simplesmoothlexicon(lexmodel, epsilon=1. / 100):
	"""Collect new lexical productions.

	- rare words (not in the lexicon), with their observed frequency;
	- unobserved combinations of tags with known open class words;
	- unobserved signatures which are mapped to ``'_UNK'``.

	:param lexmodel: the tuple ``(lexicon, wordsfortag, openclasstags,
		openclasswords, tags, wordtags)``.
	:param epsilon: 'frequency' of productions for unseen tag, word pair.
	:returns: a dictionary of lexical rules, with pseudofrequencies as values.
	"""
	lexicon, wordsfortag, openclasstags = lexmodel[:3]
	openclasswords, tags, wordtags = lexmodel[3:]
	if len(lexmodel) != 6:
		# same failure mode as unpacking a tuple of the wrong length
		raise ValueError('expected a lexmodel with 6 elements')
	newrules = {}
	# rare words as signature AND as word; needs to be normalized later
	for (word, tag), freq in wordtags.items():
		if word in lexicon:
			continue
		newrules[(tag, 'Epsilon'), (escape(word), )] = freq
	# open class tag-word pairs
	for tag in openclasstags:
		unseen = openclasswords - wordsfortag[tag] - {UNK}
		newrules.update((((tag, 'Epsilon'), (escape(word), )), epsilon)
				for word in unseen)
	# catch all unknown signature
	newrules.update((((tag, 'Epsilon'), (UNK, )), epsilon) for tag in tags)
	return newrules
def getlexmodel(sigs, words, _lexicon, wordsfortag, openclasstags,
		openclasswords, tags, wordtags, wordsig, sigtag,
		openclassoffset=1, kappa=1):
	"""Compute a smoothed lexical model.

	The first ten parameters correspond to the model tuple returned by
	``getunknownwordmodel``. NB: the counters ``sigs``, ``words``, ``tags``,
	``wordtags`` and ``sigtag`` are updated in place with pseudo-counts for
	unseen open class (tag, word) pairs and for the signature ``'_UNK'``.

	:returns: a tuple ``(P_wordtag, msg)``; ``P_wordtag`` is a dictionary
		giving P(word_or_sig | tag), indexed as ``P_wordtag[word, tag]``,
		and ``msg`` reports the number of (word, tag) pairs in the model.
	:param openclassoffset: for words that only appear with open class tags,
		add unseen combinations of open class (tag, word) with this count.
	:param kappa: weight of the signature-based estimate P(tag|sig) in the
		smoothed P(tag|word); cf. Klein & Manning (2003), footnote 5.
		http://aclweb.org/anthology/P03-1054"""
	for tag in openclasstags:
		for word in openclasswords - wordsfortag[tag]:
			wordtags[word, tag] += openclassoffset
			words[word] += openclassoffset
			tags[tag] += openclassoffset
		# unseen signatures
		sigs[UNK] += 1
		sigtag[UNK, tag] += 1
	tagtotal = sum(tags.values())
	wordstotal = sum(words.values())
	P_tag = {tag: Fraction(cnt, tagtotal) for tag, cnt in tags.items()}
	P_word = defaultdict(int)
	for word, cnt in words.items():
		P_word[word] = Fraction(cnt, wordstotal)
	# Compute P(tag|sig) as relative frequency of the joint (sig, tag) counts.
	# This is estimated for every observed (sig, tag) pair; missing pairs
	# default to 0.
	sigtotal = Counter()
	for (sig, _), cnt in sigtag.items():
		sigtotal[sig] += cnt
	P_tagsig = defaultdict(Fraction)
	for (sig, tag), cnt in sigtag.items():
		if sigtotal[sig]:
			P_tagsig[tag, sig] = Fraction(cnt, sigtotal[sig])
	# Klein & Manning (2003) Accurate unlexicalized parsing
	# http://aclweb.org/anthology/P03-1054
	# P(tag|word) = [count(tag, word) + kappa * P(tag|sig)]
	# 		/ [count(word) + kappa]
	P_tagword = defaultdict(int)
	for (word, tag), cnt in wordtags.items():
		P_tagword[tag, word] = Fraction(
				cnt + kappa * P_tagsig[tag, wordsig[word]],
				words[word] + kappa)
	# invert with Bayes theorem to get P(word|tag)
	P_wordtag = defaultdict(int)
	for (tag, word), prob in P_tagword.items():
		P_wordtag[word, tag] += Fraction(prob * P_word[word], P_tag[tag])
	msg = "(word, tag) pairs in model: %d" % len(P_tagword)
	return P_wordtag, msg
def smoothlexicon(grammar, P_wordtag):
	"""Replace lexical probabilities using given unknown word model.

	Ignores lexical productions of known subtrees (tag contains '@')
	introduced by DOP, i.e., we only modify lexical depth 1 subtrees.

	:param grammar: sequence of ``((rule, yf), weight)`` items; a rule is
		lexical when its second element is ``'Epsilon'``, in which case the
		first element of ``yf`` is the word or signature.
	:param P_wordtag: mapping indexed by ``(word_or_sig, tag)``.
	:returns: a new list of ``((rule, yf), weight)`` tuples in the same
		order, with the weights of the affected lexical rules replaced."""
	def relex(rule, yf, weight):
		"""Return the item for a single rule, re-weighted if applicable."""
		if rule[1] != 'Epsilon' or '@' in rule[0]:
			return (rule, yf), weight
		tag, wordorsig = rule[0], yf[0]
		return ((tag, 'Epsilon'), (wordorsig, )), P_wordtag[wordorsig, tag]

	return [relex(rule, yf, weight) for (rule, yf), weight in grammar]
# === functions for unknown word signatures ============ HASDIGIT = re.compile(r"\d", re.UNICODE) HASNONDIGIT = re.compile(r"\D", re.UNICODE) # NB: includes '-', hyphen, non-breaking hyphen # does NOT include: figure-dash, em-dash, en-dash (these are punctuation, # not word-combining) u2012-u2015; nb: these are hex values. HASDASH = re.compile("[-\u2010\u2011]") # FIXME: exclude accented characters for model 6? HASLOWER = re.compile('[a-z\xe7\xe9\xe0\xec\xf9\xe2\xea\xee\xf4\xfb\xeb' '\xef\xfc\xff\u0153\xe6]') HASUPPER = re.compile('[A-Z\xc7\xc9\xc0\xcc\xd9\xc2\xca\xce\xd4\xdb\xcb' '\xcf\xdc\u0178\u0152\xc6]') HASLETTER = re.compile('[A-Za-z\xe7\xe9\xe0\xec\xf9\xe2\xea\xee\xf4\xfb' '\xeb\xef\xfc\xff\u0153\xe6\xc7\xc9\xc0\xcc\xd9\xc2\xca\xce\xd4' '\xdb\xcb\xcf\xdc\u0178\u0152\xc6]') # Cf. http://en.wikipedia.org/wiki/French_alphabet LOWER = ('abcdefghijklmnopqrstuvwxyz\xe7\xe9\xe0\xec\xf9\xe2\xea\xee\xf4\xfb' '\xeb\xef\xfc\xff\u0153\xe6') UPPER = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ\xc7\xc9\xc0\xcc\xd9\xc2\xca\xce\xd4\xdb' '\xcb\xcf\xdc\u0178\u0152\xc6') LOWERUPPER = LOWER + UPPER
def unknownword6(word, loc, lexicon):
	"""Model 6 of the Stanford parser (for WSJ treebank).

	:param word: the word to compute a signature for.
	:param loc: position of the word in its sentence; 0 is sentence-initial.
	:param lexicon: collection of known words, consulted for the lowercased
		version of a sentence-initial capitalized word.
	:returns: a signature consisting of ``UNK`` followed by markers for
		capitalization, digits, dashes, and a suffix, where applicable."""
	lowered = word.lower()
	numcaps = len(HASUPPER.findall(word))
	hasdigit = HASDIGIT.search(word)
	hasdash = HASDASH.search(word)
	parts = [UNK]
	# capitalization
	if numcaps > 1:
		parts.append("-CAPS")
	elif numcaps > 0:
		if loc == 0:
			parts.append("-INITC")
			if lowered in lexicon:
				parts.append("-KNOWNLC")
		else:
			parts.append("-CAP")
	elif HASLOWER.search(word):
		parts.append("-LC")
	if hasdigit:
		parts.append("-NUM")
	if hasdash:
		parts.append("-DASH")
	# suffixes
	if lowered.endswith('s') and len(word) >= 3:
		# skip words ending in -ss, -is, -us
		if lowered[-2] not in 'siu':
			parts.append('-s')
	elif len(word) >= 5 and not hasdash and not (hasdigit and numcaps > 0):
		# only the first matching suffix is used
		for suffix in ('ed', 'ing', 'ion', 'er', 'est', 'ly', 'ity', 'y', 'al'):
			if lowered.endswith(suffix):
				parts.append("-%s" % suffix)
				break
	return ''.join(parts)
def unknownword4(word, loc, _lexicon):
	"""Model 4 of the Stanford parser. Relatively language agnostic.

	:param word: the word to compute a signature for.
	:param loc: position of the word in its sentence; 0 is sentence-initial.
	:param _lexicon: not used by this model.
	:returns: a signature consisting of ``UNK`` followed by markers for
		letter case, digits, punctuation, and the lowercased last two
		characters of longer words ending in a letter."""
	parts = [UNK]
	# letters
	if word and word[0] in UPPER:
		if not HASLOWER.search(word):
			parts.append("-AC")
		elif loc == 0:
			parts.append("-SC")
		else:
			parts.append("-C")
	elif HASLOWER.search(word):
		parts.append("-L")
	elif HASLETTER.search(word):
		parts.append("-U")
	else:  # no letter
		parts.append("-S")
	# digits
	if HASDIGIT.search(word):
		parts.append("-n" if HASNONDIGIT.search(word) else "-N")
	# punctuation
	for char, marker in (("-", "-H"), (".", "-P"), (",", "-C")):
		if char in word:
			parts.append(marker)
	# suffix
	if len(word) > 3 and word[-1] in LOWERUPPER:
		parts.append("-%s" % word[-2:].lower())
	return ''.join(parts)
def unknownwordbase(word, _loc, _lexicon):
	"""BaseUnknownWordModel of the Stanford parser.

	Relatively language agnostic.

	:param word: the word to compute a signature for; an empty word is
		treated as not capitalized.
	:param _loc: not used by this model.
	:param _lexicon: not used by this model.
	:returns: a signature consisting of ``UNK`` followed by markers for
		capitalization of the first character, digits, punctuation, and the
		lowercased last two characters of longer words ending in a letter."""
	sig = UNK
	# letters; the emptiness check avoids an IndexError on word[0]
	if word and word[0] in UPPER:
		sig += "-C"
	else:
		sig += "-c"
	# digits
	if HASDIGIT.search(word):
		if HASNONDIGIT.search(word):
			sig += "-n"
		else:
			sig += "-N"
	# punctuation
	if "-" in word:
		sig += "-H"
	# NB: unlike the hyphen, these only apply when they are the whole word
	if word == ".":
		sig += "-P"
	if word == ",":
		sig += "-C"
	if len(word) > 3:
		if word[-1] in LOWERUPPER:
			sig += "-%s" % word[-2:].lower()
	return sig
# Patterns for the French unknown word model; all are applied with .search(),
# so the suffix patterns are anchored at the end of the word with '$'.
NOUNSUFFIX = re.compile("(ier|ière|ité|ion|ison|isme|ysme|iste|esse|eur|euse"
		"|ence|eau|erie|ng|ette|age|ade|ance|ude|ogue|aphe|ate|duc|anthe"
		"|archie|coque|érèse|ergie|ogie|lithe|mètre|métrie|odie|pathie|phie"
		"|phone|phore|onyme|thèque|scope|some|pole|ôme|chromie|pie)s?$")
ADJSUFFIX = re.compile("(iste|ième|uple|issime|aire|esque|atoire|ale|al|able"
		"|ible|atif|ique|if|ive|eux|aise|ent|ois|oise|ante|el|elle|ente|oire"
		"|ain|aine)s?$")
POSSIBLEPLURAL = re.compile("(s|ux)$")
VERBSUFFIX = re.compile("(ir|er|re|ez|ont|ent|ant|ais|ait|ra|era|eras"
		"|é|és|ées|isse|it)$")
ADVSUFFIX = re.compile("(iment|ement|emment|amment)$")
# Punctuation characters. The brackets and the backslash (u005B-u005D) are
# escaped explicitly: unescaped, the backslash followed by ']' is read by the
# regex parser as an escaped ']', such that a literal backslash is not part
# of the class.
# HASPUNC: the word contains punctuation anywhere.
HASPUNC = re.compile("([\u0021-\u002F\u003A-\u0040\\\u005B\\\u005C\\\u005D"
		"\u005E-\u0060\u007B-\u007E\u00A1-\u00BF\u2010-\u2027\u2030-\u205E"
		"\u20A0-\u20B5])+")
# ISPUNC: the word consists only of punctuation; anchored on both sides,
# because with .search() a '$' alone would accept any word that merely ends
# in punctuation.
ISPUNC = re.compile("^([\u0021-\u002F\u003A-\u0040\\\u005B\\\u005C\\\u005D"
		"\u005E-\u0060\u007B-\u007E\u00A1-\u00BF\u2010-\u2027\u2030-\u205E"
		"\u20A0-\u20B5])+$")
def unknownwordftb(word, loc, _lexicon):
	"""Model 2 for French of the Stanford parser.

	:param word: the word to compute a signature for.
	:param loc: position of the word in its sentence; 0 is sentence-initial.
	:param _lexicon: not used by this model.
	:returns: a signature consisting of ``UNK`` followed by, where
		applicable: one of ``-ADV``, ``-VB``, ``-NN`` (first matching
		suffix class), ``-ADJ``, ``-NUM``, ``-PL``, one of ``-ISPUNC`` /
		``-HASPUNC``, and ``-UP`` for capitalized words that are not
		sentence-initial."""
	sig = UNK
	if ADVSUFFIX.search(word):
		sig += "-ADV"
	elif VERBSUFFIX.search(word):
		sig += "-VB"
	elif NOUNSUFFIX.search(word):
		sig += "-NN"
	# adjective suffixes are marked independently of the classes above; they
	# get their own marker so that they are not conflated with adverbs
	if ADJSUFFIX.search(word):
		sig += "-ADJ"
	if HASDIGIT.search(word):
		sig += "-NUM"
	if POSSIBLEPLURAL.search(word):
		sig += "-PL"
	if ISPUNC.search(word):
		sig += "-ISPUNC"
	elif HASPUNC.search(word):
		sig += "-HASPUNC"
	if loc > 0 and len(word) > 0 and word[0] in UPPER:
		sig += "-UP"
	return sig
# Maps the name of an unknown word model to the function implementing it;
# each function takes (word, position in sentence, lexicon) and returns a
# signature string.
UNKNOWNWORDFUNC = {
		"4": unknownword4,
		"6": unknownword6,
		"base": unknownwordbase,
		"ftb": unknownwordftb,
		}
# === Performing POS tagging with external tools ============
def _writetaggerinput(sents, wordsep, sentend):
	"""Write the words of sents to a temporary UTF-8 file; return its name.

	Words are joined with ``wordsep`` and each sentence is followed by
	``sentend``. The caller is responsible for removing the file."""
	import io  # local import: encoding-aware open() on Python 2 and 3
	fileno, name = tempfile.mkstemp(text=True)
	try:
		with io.open(fileno, 'w', encoding='utf-8') as out:
			for tagsent in sents.values():
				out.write(wordsep.join(map(itemgetter(0), tagsent)) + sentend)
	except Exception:
		os.unlink(name)  # do not leave a partial file behind
		raise
	return name


def externaltagging(usetagger, model, sents, overridetag, tagmap):
	"""Use an external tool to tag a list of sentences.

	:param usetagger: one of ``'treetagger'``, ``'stanford'``, ``'frog'``.
	:param model: filename of the tagger model; when empty, a default German
		model is used for the Tree-tagger and Stanford tagger. Not used
		for frog.
	:param sents: an ordered mapping of sentence identifiers to lists of
		``(word, goldtag)`` pairs.
	:param overridetag: mapping of tags to collections of words that should
		always receive that tag; see ``tagmangle``.
	:param tagmap: mapping of tags produced by the tagger to the tags to
		return; see ``tagmangle``. NB: ``overridetag`` and ``tagmap`` are
		not applied to the output of frog.
	:returns: an ``OrderedDict`` mapping sentence identifiers to lists of
		``(word, tag)`` pairs.
	:raises ValueError: when the tagger is unknown or not installed, or when
		the number of sentences or tokens changed after tagging."""
	logging.info('Start tagging.')
	goldtags = [t for sent in sents.values() for _, t in sent]
	if usetagger == 'treetagger':  # Tree-tagger
		if not os.path.exists('tree-tagger/bin/tree-tagger'):
			raise ValueError(TREETAGGERHELP)
		# one word per line, sentences separated by an SGML tag
		inname = _writetaggerinput(sents, '\n', '\n<S>\n')
		try:
			filtertags = ''
			if not model:
				model = 'tree-tagger/lib/german-par-linux-3.2-utf8.bin'
				filtertags = '| tree-tagger/cmd/filter-german-tags'
			# NOTE(review): model is interpolated into a shell command
			# line; it should only come from trusted configuration.
			tagger = Popen('tree-tagger/bin/tree-tagger -token -sgml'
					' %s %s %s' % (model, inname, filtertags),
					stdout=PIPE, shell=True)
			# communicate() reads until EOF and waits for the process
			tagout = tagger.communicate()[0].decode('utf-8').split(
					'<S>')[:-1]
		finally:
			os.unlink(inname)
		taggedsents = OrderedDict((n, [tagmangle(a, None, overridetag, tagmap)
					for a in tags.splitlines() if a.strip()])
					for n, tags in zip(sents, tagout))
	elif usetagger == 'stanford':  # Stanford Tagger
		if not os.path.exists('stanford-postagger-full-2012-07-09'):
			raise ValueError(STANFORDTAGGERHELP)
		# one sentence per line, words separated by spaces
		inname = _writetaggerinput(sents, ' ', '\n')
		try:
			if not model:
				model = 'models/german-hgc.tagger'
			# an explicit argument list, so that filenames containing
			# whitespace are passed on intact
			tagger = Popen(args=[
					'/usr/bin/java', '-mx2G',
					'-classpath', 'stanford-postagger.jar',
					'edu.stanford.nlp.tagger.maxent.MaxentTagger',
					'-tokenize', 'false', '-encoding', 'utf-8',
					'-model', model, '-textFile', inname],
					cwd='stanford-postagger-full-2012-07-09',
					shell=False, stdout=PIPE)
			tagout = tagger.communicate()[0].decode('utf-8').splitlines()
		finally:
			os.unlink(inname)
		taggedsents = OrderedDict((n, [tagmangle(a, '_', overridetag, tagmap)
					for a in tags.split()])
					for n, tags in zip(sents, tagout))
	elif usetagger == 'frog':  # Dutch 'frog' tagger
		tagger = Popen(args=[which('frog')]
				+ '-n --skip=tacmnp -t /dev/stdin'.split(),
				shell=False, stdin=PIPE, stdout=PIPE)
		tagout, stderr = tagger.communicate(''.join(
				' '.join(w for w in map(itemgetter(0), tagsent)) + '\n'
				for tagsent in sents.values()).encode('utf8'))
		logging.info(stderr)
		# lines consist of: 'idx token lemma POS score'
		taggedsents = OrderedDict((n, [(line.split()[1],
				line.split()[3].replace('(', '[').replace(')', ']'))
				for line in lines.splitlines()]) for n, lines
				in zip(sents, tagout.decode('utf-8').split('\n\n')))
	else:
		raise ValueError('unknown tagger: %r' % usetagger)
	if len(taggedsents) != len(sents):
		raise ValueError('mismatch in number of sentences after tagging.')
	for n, tags in taggedsents.items():
		if len(sents[n]) != len(tags):
			raise ValueError('mismatch in number of tokens after tagging.\n'
				'before: %r\nafter: %r' % (sents[n], tags))
	newtags = [t for sent in taggedsents.values() for _, t in sent]
	logging.info('Tag accuracy: %5.2f\ngold - cand: %r\ncand - gold %r',
		(100 * accuracy(goldtags, newtags)),
		set(goldtags) - set(newtags), set(newtags) - set(goldtags))
	return taggedsents
def accuracy(gold, cand):
	"""Compute fraction of equivalent pairs in two sequences.

	Items are compared pairwise by position; the number of matches is
	divided by the length of ``gold``.

	:returns: a number between 0 and 1; 0.0 when ``gold`` is empty, since
		there is nothing to match."""
	total = len(gold)
	if total == 0:
		return 0.0
	return sum(a == b for a, b in zip(gold, cand)) / total
def tagmangle(a, splitchar, overridetag, tagmap):
	"""Function to filter tags after they are produced by the tagger.

	:param a: a string with a word and a tag, separated by ``splitchar``.
	:param splitchar: the separator; the string is split on its last
		occurrence. ``None`` means splitting on whitespace.
	:param overridetag: mapping of tags to collections of words; a word
		listed here receives that tag instead of the tagger's. When a word
		is listed under several tags, the last one in iteration order wins.
	:param tagmap: mapping applied to the resulting tag; tags that are not
		in it are left as is.
	:returns: a tuple ``(word, tag)``."""
	word, tag = a.rsplit(splitchar, 1)
	overrides = [newtag for newtag, words in overridetag.items()
			if word in words]
	if overrides:
		tag = overrides[-1]
	return word, tagmap.get(tag, tag)
# Names exported by ``from discodop.lexicon import *``; the helper
# ``accuracy`` and the module-level constants are not part of this list.
__all__ = ['getunknownwordmodel', 'replaceraretrainwords',
		'replaceraretestwords', 'simplesmoothlexicon', 'getlexmodel',
		'smoothlexicon', 'unknownword6', 'unknownword4', 'unknownwordbase',
		'unknownwordftb', 'externaltagging', 'tagmangle']