1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
"""
Base classes for processors
"""
from abc import ABC, abstractmethod
from stanza.pipeline.registry import NAME_TO_PROCESSOR_CLASS, PIPELINE_NAMES, PROCESSOR_VARIANTS
class ProcessorRequirementsException(Exception):
    """ Exception indicating a processor's requirements will not be met

    Constructing the exception also asks the offending processor to release its
    resources (via ``mark_inactive``, when the processor provides that method) and
    pre-builds the human readable message returned by ``str()``.
    """

    def __init__(self, processors_list, err_processor, provided_reqs):
        """
        :param processors_list: names of the processors in the pipeline; they are joined with ',' in the message
        :param err_processor: the processor whose requirements are not met; must expose a ``requires`` set
        :param provided_reqs: set of requirements that are fulfilled for ``err_processor``
        """
        self._err_processor = err_processor
        # mark the broken processor as inactive, drop resources.
        # Not every processor class is guaranteed to define mark_inactive; skipping the
        # call in that case keeps this error from being masked by an AttributeError.
        mark_inactive = getattr(err_processor, 'mark_inactive', None)
        if callable(mark_inactive):
            mark_inactive()
        self._processors_list = processors_list
        self._provided_reqs = provided_reqs
        self.build_message()

    @property
    def err_processor(self):
        """ The processor that raised the exception """
        return self._err_processor

    @property
    def processor_type(self):
        """ Class name of the processor that raised the exception """
        return type(self.err_processor).__name__

    @property
    def processors_list(self):
        """ The processor names passed in when the exception was created """
        return self._processors_list

    @property
    def provided_reqs(self):
        """ The requirements that were reported as fulfilled when the exception was created """
        return self._provided_reqs

    def build_message(self):
        """ Build the error report and store it in ``self.message`` """
        self.message = (f"---\nPipeline Requirements Error!\n"
                        f"\tProcessor: {self.processor_type}\n"
                        f"\tPipeline processors list: {','.join(self.processors_list)}\n"
                        f"\tProcessor Requirements: {self.err_processor.requires}\n"
                        f"\t\t- fulfilled: {self.err_processor.requires.intersection(self.provided_reqs)}\n"
                        f"\t\t- missing: {self.err_processor.requires - self.provided_reqs}\n"
                        f"\nThe processors list provided for this pipeline is invalid.  Please make sure all "
                        f"prerequisites are met for every processor.\n\n")

    def __str__(self):
        return self.message
class Processor(ABC):
    """ Base class for all processors

    Subclasses must define the class attributes ``PROVIDES_DEFAULT`` and
    ``REQUIRES_DEFAULT`` (sets of requirement names) and implement ``process``.
    """

    def __init__(self, config, pipeline, use_gpu):
        """
        :param config: dict-like configuration for this processor (read with ``.get``)
        :param pipeline: the pipeline building this processor; must expose ``loaded_processors`` and ``load_list``
        :param use_gpu: forwarded to ``_set_up_variants``
        :raises ProcessorRequirementsException: if the already loaded processors do not provide everything required
        """
        # overall config for the processor
        self._config = config
        # pipeline building this processor (presently processors are only meant to exist in one pipeline)
        self._pipeline = pipeline
        self._set_up_variants(config, use_gpu)
        # run set up process
        # set up what annotations are required based on config
        self._set_up_requires()
        # set up what annotations are provided based on config
        self._set_up_provides()
        # given pipeline constructing this processor, check if requirements are met, throw exception if not
        self._check_requirements()
        # a variant flagged with OVERRIDE replaces this processor's process() entirely
        if hasattr(self, '_variant') and self._variant.OVERRIDE:
            self.process = self._variant.process

    @abstractmethod
    def process(self, doc):
        """ Process a Document.  This is the main method of a processor. """
        pass

    def bulk_process(self, docs):
        """ Process a list of Documents. This should be replaced with a more efficient implementation if possible. """
        return [self.process(doc) for doc in docs]

    def mark_inactive(self):
        """ Hook for dropping memory intensive resources; the base processor holds none, so this does nothing. """
        pass

    def _set_up_provides(self):
        """ Set up what processor requirements this processor fulfills.  Default is to use a class defined list. """
        self._provides = self.__class__.PROVIDES_DEFAULT

    def _set_up_requires(self):
        """ Set up requirements for this processor.  Default is to use a class defined list. """
        self._requires = self.__class__.REQUIRES_DEFAULT

    def _set_up_variants(self, config, use_gpu):
        """ Instantiate the first registered variant enabled with a truthy ``with_<variant>`` config entry, if any.

        The variants are looked up under the first element of ``PROVIDES_DEFAULT``.
        When a variant is enabled, ``self._trainer`` is reset to None and the variant
        instance (built from ``config``) is stored in ``self._variant``.  ``use_gpu`` is
        accepted for interface compatibility and is not used here.
        """
        processor_name = list(self.__class__.PROVIDES_DEFAULT)[0]
        variants = PROCESSOR_VARIANTS[processor_name]
        # single pass over the registry, keeping registration order
        enabled = [variant for variant in variants if config.get(f'with_{variant}', False)]
        if enabled:
            self._trainer = None
            self._variant = variants[enabled[0]](config)

    @property
    def config(self):
        """ Configurations for the processor """
        return self._config

    @property
    def pipeline(self):
        """ The pipeline that this processor belongs to """
        return self._pipeline

    @property
    def provides(self):
        """ The requirements this processor fulfills """
        return self._provides

    @property
    def requires(self):
        """ The requirements this processor needs from the processors loaded before it """
        return self._requires

    def _check_requirements(self):
        """ Given a list of fulfilled requirements, check if all of this processor's requirements are met or not. """
        # set().union accepts any iterables (set, frozenset, list, ...) and also copes with
        # a pipeline that has no loaded processors yet
        provided_reqs = set().union(*[processor.provides for processor in self.pipeline.loaded_processors])
        if self.requires - provided_reqs:
            load_names = [item[0] for item in self.pipeline.load_list]
            raise ProcessorRequirementsException(load_names, self, provided_reqs)
class ProcessorVariant(ABC):
    """ Common parent of every processor variant """

    # Flip to True in a subclass to have the variant take over all processing from the processor
    OVERRIDE = False

    @abstractmethod
    def process(self, doc):
        """
        Handle one document, which the owning processor may already have preprocessed.

        Every variant has to implement this; it is the variant's main entry point.

        With `OVERRIDE` set to True the processor's own preprocessing is skipped and the
        variant stands in for the whole processor, so it must understand every config
        option the replaced processor would normally handle.
        """
        pass

    def bulk_process(self, docs):
        """ Run `process` on each Document in turn and return the results as a list; override for a faster batch path. """
        results = []
        for document in docs:
            results.append(self.process(document))
        return results
class UDProcessor(Processor):
    """ Parent class of the neural UD Processors (tokenize,mwt,pos,lemma,depparse,sentiment) """

    def __init__(self, config, pipeline, use_gpu):
        super().__init__(config, pipeline, use_gpu)

        # model resources start out empty; each concrete processor fills them in
        self._pretrain = None
        self._trainer = None
        self._vocab = None

        # a processor running through a variant does not load its own model
        variant_in_use = hasattr(self, '_variant')
        if not variant_in_use:
            self._set_up_model(config, use_gpu)

        # merge the caller's config with whatever the loaded model recorded
        self._set_up_final_config(config)

    @abstractmethod
    def _set_up_model(self, config, gpu):
        """ Processor specific model loading; invoked from __init__ when no variant is in use. """
        pass

    def _set_up_final_config(self, config):
        """ Produce the final processor config: the loaded model's args (if any), overridden by `config`. """
        final_config = {}
        trainer = self._trainer
        if trainer is not None:
            # pull the args and the vocab off the loaded model
            model_args = trainer.args
            self._vocab = trainer.vocab
            # keep only the args that matter to a processor
            final_config = {
                name: value
                for name, value in model_args.items()
                if not UDProcessor.filter_out_option(name)
            }
        # values passed in by the caller win over the ones stored with the model
        final_config.update(config)
        self._config = final_config

    def mark_inactive(self):
        """ Release the trainer and vocab so a processor that is kept around without being run stays light. """
        self._trainer = None
        self._vocab = None

    @property
    def pretrain(self):
        return self._pretrain

    @property
    def trainer(self):
        return self._trainer

    @property
    def vocab(self):
        return self._vocab

    @staticmethod
    def filter_out_option(option):
        """ Return True for options that are not processor configurations and should be dropped """
        ignored_options = ('cpu', 'cuda', 'dev_conll_gold', 'epochs', 'lang', 'mode', 'save_name', 'shorthand')
        # file and directory locations are never part of the processor config
        return option.endswith(('_file', '_dir')) or option in ignored_options
class ProcessorRegisterException(Exception):
    """ Raised when a class cannot be registered as a processor or a processor variant """

    def __init__(self, processor_class, expected_parent):
        # remember both classes so the message can name them
        self._expected_parent = expected_parent
        self._processor_class = processor_class
        self.build_message()

    def build_message(self):
        """ Compose the failure text and keep it in `self.message` """
        rejected = self._processor_class
        parent = self._expected_parent
        self.message = f"Failed to register '{rejected}'. It must be a subclass of '{parent}'."

    def __str__(self):
        return self.message
def register_processor(name):
    """ Return a class decorator that registers a Processor subclass under `name`.

    The decorated class is stored in NAME_TO_PROCESSOR_CLASS and `name` is added to
    PIPELINE_NAMES; the class itself is returned unchanged.

    :raises ProcessorRegisterException: if the decorated class is not a subclass of Processor
    """
    def wrapper(Cls):
        if not issubclass(Cls, Processor):
            raise ProcessorRegisterException(Cls, Processor)

        NAME_TO_PROCESSOR_CLASS[name] = Cls
        # registering a name again replaces its class, but the name must only be listed once
        if name not in PIPELINE_NAMES:
            PIPELINE_NAMES.append(name)
        return Cls
    return wrapper
def register_processor_variant(name, variant):
    """ Build a class decorator that files a ProcessorVariant subclass in PROCESSOR_VARIANTS[name][variant].

    The decorator hands the class back untouched, and raises ProcessorRegisterException
    for anything that is not a ProcessorVariant subclass.
    """
    def decorate(variant_cls):
        if issubclass(variant_cls, ProcessorVariant):
            PROCESSOR_VARIANTS[name][variant] = variant_cls
            return variant_cls
        raise ProcessorRegisterException(variant_cls, ProcessorVariant)
    return decorate
|