#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# The following code has been taken from
# https://github.com/facebookresearch/DPR, which is CC-BY-NC 4.0
# licensed as of now. More details on the license can be found
# at https://github.com/facebookresearch/DPR/blob/master/LICENSE

"""
Most of the tokenizer code here is copied from the DrQA codebase to avoid adding an extra dependency.
"""

import copy
import logging

import regex
import spacy

logger = logging.getLogger(__name__)
class Tokens(object):
    """A class to represent a list of tokenized text."""
    TEXT = 0
    TEXT_WS = 1
    SPAN = 2
    POS = 3
    LEMMA = 4
    NER = 5

    def __init__(self, data, annotators, opts=None):
        self.data = data
        self.annotators = annotators
        self.opts = opts or {}

    def __len__(self):
        """The number of tokens."""
        return len(self.data)

    def slice(self, i=None, j=None):
        """Return a view of the list of tokens from [i, j)."""
        new_tokens = copy.copy(self)
        new_tokens.data = self.data[i:j]
        return new_tokens

    def untokenize(self):
        """Returns the original text (with whitespace reinserted)."""
        return ''.join([t[self.TEXT_WS] for t in self.data]).strip()

    def words(self, uncased=False):
        """Returns a list of the text of each token.

        Args:
            uncased: lower cases text
        """
        if uncased:
            return [t[self.TEXT].lower() for t in self.data]
        else:
            return [t[self.TEXT] for t in self.data]

    def offsets(self):
        """Returns a list of [start, end) character offsets of each token."""
        return [t[self.SPAN] for t in self.data]

    def pos(self):
        """Returns a list of part-of-speech tags of each token.

        Returns None if this annotation was not included.
        """
        if 'pos' not in self.annotators:
            return None
        return [t[self.POS] for t in self.data]

    def lemmas(self):
        """Returns a list of the lemmatized text of each token.

        Returns None if this annotation was not included.
        """
        if 'lemma' not in self.annotators:
            return None
        return [t[self.LEMMA] for t in self.data]

    def entities(self):
        """Returns a list of named-entity-recognition tags of each token.

        Returns None if this annotation was not included.
        """
        if 'ner' not in self.annotators:
            return None
        return [t[self.NER] for t in self.data]
    def ngrams(self, n=1, uncased=False, filter_fn=None, as_strings=True):
        """Returns a list of all ngrams from length 1 to n.

        Args:
            n: upper limit of ngram length
            uncased: lower cases text
            filter_fn: user function that takes in an ngram list and returns
              True or False to keep or not keep the ngram
            as_strings: return the ngram as a string vs list
        """

        def _skip(gram):
            if not filter_fn:
                return False
            return filter_fn(gram)

        words = self.words(uncased)
        ngrams = [(s, e + 1)
                  for s in range(len(words))
                  for e in range(s, min(s + n, len(words)))
                  if not _skip(words[s:e + 1])]

        # Concatenate into strings
        if as_strings:
            ngrams = ['{}'.format(' '.join(words[s:e])) for (s, e) in ngrams]

        return ngrams

    def entity_groups(self):
        """Group consecutive entity tokens with the same NER tag."""
        entities = self.entities()
        if not entities:
            return None
        non_ent = self.opts.get('non_ent', 'O')
        groups = []
        idx = 0
        while idx < len(entities):
            ner_tag = entities[idx]
            # Check for entity tag
            if ner_tag != non_ent:
                # Chomp the sequence
                start = idx
                while idx < len(entities) and entities[idx] == ner_tag:
                    idx += 1
                groups.append((self.slice(start, idx).untokenize(), ner_tag))
            else:
                idx += 1
        return groups
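
# Illustrative example (not part of the original source): entity_groups() collapses
# runs of tokens that share a NER tag. With hand-built data in the
# (TEXT, TEXT_WS, SPAN, POS, LEMMA, NER) layout produced by SpacyTokenizer below:
#
#   data = [('Barack', 'Barack ', (0, 6), 'NNP', 'Barack', 'PERSON'),
#           ('Obama', 'Obama ', (7, 12), 'NNP', 'Obama', 'PERSON'),
#           ('spoke', 'spoke', (13, 18), 'VBD', 'speak', '')]
#   Tokens(data, {'ner'}, opts={'non_ent': ''}).entity_groups()
#   -> [('Barack Obama', 'PERSON')]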
class Tokenizer(object):
    """Base tokenizer class.

    Tokenizers implement tokenize, which should return a Tokens class.
    """

    def tokenize(self, text):
        raise NotImplementedError

    def shutdown(self):
        pass

    def __del__(self):
        self.shutdown()
class SimpleTokenizer(Tokenizer):
    ALPHA_NUM = r'[\p{L}\p{N}\p{M}]+'
    NON_WS = r'[^\p{Z}\p{C}]'

    def __init__(self, **kwargs):
        """
        Args:
            annotators: None or empty set (only tokenizes).
        """
        self._regexp = regex.compile(
            '(%s)|(%s)' % (self.ALPHA_NUM, self.NON_WS),
            flags=regex.IGNORECASE + regex.UNICODE + regex.MULTILINE
        )
        if len(kwargs.get('annotators', {})) > 0:
            logger.warning('%s only tokenizes! Skipping annotators: %s' %
                           (type(self).__name__, kwargs.get('annotators')))
        self.annotators = set()

    def tokenize(self, text):
        data = []
        matches = [m for m in self._regexp.finditer(text)]
        for i in range(len(matches)):
            # Get text
            token = matches[i].group()

            # Get whitespace
            span = matches[i].span()
            start_ws = span[0]
            if i + 1 < len(matches):
                end_ws = matches[i + 1].span()[0]
            else:
                end_ws = span[1]

            # Format data
            data.append((
                token,
                text[start_ws: end_ws],
                span,
            ))
        return Tokens(data, self.annotators)
class SpacyTokenizer(Tokenizer):

    def __init__(self, **kwargs):
        """
        Args:
            annotators: set that can include pos, lemma, and ner.
            model: spaCy model to use (either path, or keyword like 'en').
        """
        model = kwargs.get('model', 'en')
        self.annotators = copy.deepcopy(kwargs.get('annotators', set()))
        nlp_kwargs = {'parser': False}
        if not any([p in self.annotators for p in ['lemma', 'pos', 'ner']]):
            nlp_kwargs['tagger'] = False
        if 'ner' not in self.annotators:
            nlp_kwargs['entity'] = False
        self.nlp = spacy.load(model, **nlp_kwargs)

    def tokenize(self, text):
        # We don't treat new lines as tokens.
        clean_text = text.replace('\n', ' ')
        tokens = self.nlp.tokenizer(clean_text)
        if any([p in self.annotators for p in ['lemma', 'pos', 'ner']]):
            self.nlp.tagger(tokens)
        if 'ner' in self.annotators:
            self.nlp.entity(tokens)

        data = []
        for i in range(len(tokens)):
            # Get whitespace
            start_ws = tokens[i].idx
            if i + 1 < len(tokens):
                end_ws = tokens[i + 1].idx
            else:
                end_ws = tokens[i].idx + len(tokens[i].text)

            data.append((
                tokens[i].text,
                text[start_ws: end_ws],
                (tokens[i].idx, tokens[i].idx + len(tokens[i].text)),
                tokens[i].tag_,
                tokens[i].lemma_,
                tokens[i].ent_type_,
            ))

        # Set special option for non-entity tag: '' vs 'O' in spaCy
        return Tokens(data, self.annotators, opts={'non_ent': ''})
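
if __name__ == '__main__':
    # Minimal smoke test (illustrative only, not from the original source):
    # exercises SimpleTokenizer and the Tokens helpers above. Requires the
    # `regex` package; the sentence is arbitrary.
    tokenizer = SimpleTokenizer()
    tokens = tokenizer.tokenize('Barack Obama was born in Hawaii.')
    print(tokens.words())          # ['Barack', 'Obama', 'was', 'born', 'in', 'Hawaii', '.']
    print(tokens.offsets()[:2])    # [(0, 6), (7, 12)]
    print(tokens.ngrams(n=2)[:3])  # ['Barack', 'Barack Obama', 'Obama']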