#!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. # All rights reserved. # # The following code has been taken from # https://github.com/facebookresearch/DPR, which is CC-BY-NC 4.0 # licensed as of now. More details on the license can be found # at https://github.com/facebookresearch/DPR/blob/master/LICENSE """ Most of the tokenizers code here is copied from DrQA codebase to avoid adding extra dependency """ import copy import logging import regex import spacy logger = logging.getLogger(__name__) class Tokens(object): """A class to represent a list of tokenized text.""" TEXT = 0 TEXT_WS = 1 SPAN = 2 POS = 3 LEMMA = 4 NER = 5 def __init__(self, data, annotators, opts=None): self.data = data self.annotators = annotators self.opts = opts or {} def __len__(self): """The number of tokens.""" return len(self.data) def slice(self, i=None, j=None): """Return a view of the list of tokens from [i, j).""" new_tokens = copy.copy(self) new_tokens.data = self.data[i: j] return new_tokens def untokenize(self): """Returns the original text (with whitespace reinserted).""" return ''.join([t[self.TEXT_WS] for t in self.data]).strip() def words(self, uncased=False): """Returns a list of the text of each token Args: uncased: lower cases text """ if uncased: return [t[self.TEXT].lower() for t in self.data] else: return [t[self.TEXT] for t in self.data] def offsets(self): """Returns a list of [start, end) character offsets of each token.""" return [t[self.SPAN] for t in self.data] def pos(self): """Returns a list of part-of-speech tags of each token. Returns None if this annotation was not included. """ if 'pos' not in self.annotators: return None return [t[self.POS] for t in self.data] def lemmas(self): """Returns a list of the lemmatized text of each token. Returns None if this annotation was not included. """ if 'lemma' not in self.annotators: return None return [t[self.LEMMA] for t in self.data] def entities(self): """Returns a list of named-entity-recognition tags of each token. Returns None if this annotation was not included. """ if 'ner' not in self.annotators: return None return [t[self.NER] for t in self.data] def ngrams(self, n=1, uncased=False, filter_fn=None, as_strings=True): """Returns a list of all ngrams from length 1 to n. Args: n: upper limit of ngram length uncased: lower cases text filter_fn: user function that takes in an ngram list and returns True or False to keep or not keep the ngram as_string: return the ngram as a string vs list """ def _skip(gram): if not filter_fn: return False return filter_fn(gram) words = self.words(uncased) ngrams = [(s, e + 1) for s in range(len(words)) for e in range(s, min(s + n, len(words))) if not _skip(words[s:e + 1])] # Concatenate into strings if as_strings: ngrams = ['{}'.format(' '.join(words[s:e])) for (s, e) in ngrams] return ngrams def entity_groups(self): """Group consecutive entity tokens with the same NER tag.""" entities = self.entities() if not entities: return None non_ent = self.opts.get('non_ent', 'O') groups = [] idx = 0 while idx < len(entities): ner_tag = entities[idx] # Check for entity tag if ner_tag != non_ent: # Chomp the sequence start = idx while (idx < len(entities) and entities[idx] == ner_tag): idx += 1 groups.append((self.slice(start, idx).untokenize(), ner_tag)) else: idx += 1 return groups class Tokenizer(object): """Base tokenizer class. Tokenizers implement tokenize, which should return a Tokens class. """ def tokenize(self, text): raise NotImplementedError def shutdown(self): pass def __del__(self): self.shutdown() class SimpleTokenizer(Tokenizer): ALPHA_NUM = r'[\p{L}\p{N}\p{M}]+' NON_WS = r'[^\p{Z}\p{C}]' def __init__(self, **kwargs): """ Args: annotators: None or empty set (only tokenizes). """ self._regexp = regex.compile( '(%s)|(%s)' % (self.ALPHA_NUM, self.NON_WS), flags=regex.IGNORECASE + regex.UNICODE + regex.MULTILINE ) if len(kwargs.get('annotators', {})) > 0: logger.warning('%s only tokenizes! Skipping annotators: %s' % (type(self).__name__, kwargs.get('annotators'))) self.annotators = set() def tokenize(self, text): data = [] matches = [m for m in self._regexp.finditer(text)] for i in range(len(matches)): # Get text token = matches[i].group() # Get whitespace span = matches[i].span() start_ws = span[0] if i + 1 < len(matches): end_ws = matches[i + 1].span()[0] else: end_ws = span[1] # Format data data.append(( token, text[start_ws: end_ws], span, )) return Tokens(data, self.annotators) class SpacyTokenizer(Tokenizer): def __init__(self, **kwargs): """ Args: annotators: set that can include pos, lemma, and ner. model: spaCy model to use (either path, or keyword like 'en'). """ model = kwargs.get('model', 'en') self.annotators = copy.deepcopy(kwargs.get('annotators', set())) nlp_kwargs = {'parser': False} if not any([p in self.annotators for p in ['lemma', 'pos', 'ner']]): nlp_kwargs['tagger'] = False if 'ner' not in self.annotators: nlp_kwargs['entity'] = False self.nlp = spacy.load(model, **nlp_kwargs) def tokenize(self, text): # We don't treat new lines as tokens. clean_text = text.replace('\n', ' ') tokens = self.nlp.tokenizer(clean_text) if any([p in self.annotators for p in ['lemma', 'pos', 'ner']]): self.nlp.tagger(tokens) if 'ner' in self.annotators: self.nlp.entity(tokens) data = [] for i in range(len(tokens)): # Get whitespace start_ws = tokens[i].idx if i + 1 < len(tokens): end_ws = tokens[i + 1].idx else: end_ws = tokens[i].idx + len(tokens[i].text) data.append(( tokens[i].text, text[start_ws: end_ws], (tokens[i].idx, tokens[i].idx + len(tokens[i].text)), tokens[i].tag_, tokens[i].lemma_, tokens[i].ent_type_, )) # Set special option for non-entity tag: '' vs 'O' in spaCy return Tokens(data, self.annotators, opts={'non_ent': ''})