Welcome to mirror list, hosted at ThFree Co, Russian Federation.

processor.py « pipeline « stanza - github.com/stanfordnlp/stanza.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: b38c20668e20ad3810d25e9029d33344d30484f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Base classes for processors
"""

from abc import ABC, abstractmethod

from stanza.pipeline.registry import NAME_TO_PROCESSOR_CLASS, PIPELINE_NAMES, PROCESSOR_VARIANTS

class ProcessorRequirementsException(Exception):
    """
    Exception indicating a processor's requirements will not be met.

    Constructing the exception asks the offending processor to release its resources
    (through `mark_inactive`, when the processor defines it) and pre-renders the
    report that `str()` returns.

    Args:
        processors_list: iterable of processor names for the pipeline; joined with ',' in the report.
        err_processor: the processor whose requirements are not met; must expose `requires`.
        provided_reqs: set of requirements provided by the processors loaded so far.
    """

    def __init__(self, processors_list, err_processor, provided_reqs):
        self._err_processor = err_processor
        self._processors_list = processors_list
        self._provided_reqs = provided_reqs
        # mark the broken processor as inactive, drop resources.
        # Not every processor defines mark_inactive; a missing hook must not
        # replace this requirements error with an AttributeError.
        mark_inactive = getattr(err_processor, 'mark_inactive', None)
        if callable(mark_inactive):
            mark_inactive()
        self.build_message()

    @property
    def err_processor(self):
        """ The processor that raised the exception """
        return self._err_processor

    @property
    def processor_type(self):
        """ Class name of the processor that raised the exception """
        return type(self.err_processor).__name__

    @property
    def processors_list(self):
        """ The processor names that were passed in for the pipeline """
        return self._processors_list

    @property
    def provided_reqs(self):
        """ The requirements that were already provided when the check failed """
        return self._provided_reqs

    def build_message(self):
        """ Render the report into `self.message`: processor, pipeline list, fulfilled and missing requirements. """
        required = self.err_processor.requires
        fulfilled = required.intersection(self.provided_reqs)
        missing = required - self.provided_reqs
        self.message = (f"---\nPipeline Requirements Error!\n"
                        f"\tProcessor: {self.processor_type}\n"
                        f"\tPipeline processors list: {','.join(self.processors_list)}\n"
                        f"\tProcessor Requirements: {required}\n"
                        f"\t\t- fulfilled: {fulfilled}\n"
                        f"\t\t- missing: {missing}\n"
                        f"\nThe processors list provided for this pipeline is invalid.  Please make sure all "
                        f"prerequisites are met for every processor.\n\n")

    def __str__(self):
        return self.message


class Processor(ABC):
    """
    Base class for all processors.

    The default set-up hooks read the class attributes PROVIDES_DEFAULT and
    REQUIRES_DEFAULT, so concrete subclasses are expected to define both, and
    must implement `process`.
    """

    def __init__(self, config, pipeline, use_gpu):
        """
        Args:
            config: overall config for the processor; read with `.get(...)` when looking for variants.
            pipeline: pipeline building this processor; must expose `loaded_processors` and `load_list`.
            use_gpu: forwarded to `_set_up_variants`.

        Raises:
            ProcessorRequirementsException: if the already loaded processors do not provide
                everything this processor requires.
        """
        # overall config for the processor
        self._config = config
        # pipeline building this processor (presently processors are only meant to exist in one pipeline)
        self._pipeline = pipeline
        self._set_up_variants(config, use_gpu)
        # run set up process
        # set up what annotations are required based on config
        self._set_up_requires()
        # set up what annotations are provided based on config
        self._set_up_provides()
        # given pipeline constructing this processor, check if requirements are met, throw exception if not
        self._check_requirements()

        # an overriding variant replaces process() on this instance, so calls go straight to the variant
        if hasattr(self, '_variant') and self._variant.OVERRIDE:
            self.process = self._variant.process

    @abstractmethod
    def process(self, doc):
        """ Process a Document.  This is the main method of a processor. """
        pass

    def bulk_process(self, docs):
        """ Process a list of Documents. This should be replaced with a more efficient implementation if possible. """

        return [self.process(doc) for doc in docs]

    def mark_inactive(self):
        """
        Drop memory intensive resources if keeping this processor around for reasons other than running it.

        The base processor holds nothing to release, so this is a no-op.  It exists so that
        any processor can be marked inactive (ProcessorRequirementsException does so for the
        processor that failed its check); subclasses holding models should override it.
        """

    def _set_up_provides(self):
        """ Set up what processor requirements this processor fulfills.  Default is to use a class defined list. """
        self._provides = self.__class__.PROVIDES_DEFAULT

    def _set_up_requires(self):
        """ Set up requirements for this processor.  Default is to use a class defined list. """
        self._requires = self.__class__.REQUIRES_DEFAULT

    def _set_up_variants(self, config, use_gpu):
        """
        Instantiate the first registered variant enabled through a truthy `with_<variant>` config entry.

        When a variant is selected, `_trainer` is set to None and the variant instance is stored
        in `_variant`; otherwise neither attribute is touched.  `use_gpu` is currently unused here.
        """
        processor_name = list(self.__class__.PROVIDES_DEFAULT)[0]
        variants = PROCESSOR_VARIANTS[processor_name]
        # single pass: first variant whose with_<name> flag is truthy, or None if no flag is set
        variant_name = next((name for name in variants if config.get(f'with_{name}', False)), None)
        if variant_name is not None:
            self._trainer = None
            self._variant = variants[variant_name](config)

    @property
    def config(self):
        """ Configurations for the processor """
        return self._config

    @property
    def pipeline(self):
        """ The pipeline that this processor belongs to """
        return self._pipeline

    @property
    def provides(self):
        """ The requirements this processor fulfills """
        return self._provides

    @property
    def requires(self):
        """ The requirements this processor needs fulfilled by previously loaded processors """
        return self._requires

    def _check_requirements(self):
        """
        Given a list of fulfilled requirements, check if all of this processor's requirements are met or not.

        Raises:
            ProcessorRequirementsException: if some requirement is not provided by the
                processors the pipeline has loaded so far.
        """
        # starting from an empty set keeps this valid when no processor has been loaded yet
        provided_reqs = set().union(*(processor.provides for processor in self.pipeline.loaded_processors))
        if self.requires - provided_reqs:
            load_names = [item[0] for item in self.pipeline.load_list]
            raise ProcessorRequirementsException(load_names, self, provided_reqs)


class ProcessorVariant(ABC):
    """ Common parent of every processor variant """

    # When True, the variant takes over all of the processing normally done by the processor
    OVERRIDE = False

    @abstractmethod
    def process(self, doc):
        """
        Entry point of a processor variant: handle one document, which the
        owning processor may already have preprocessed.

        A variant that sets `OVERRIDE` to True skips the processor's own preprocessing
        entirely and acts as a drop-in replacement for it, so it must understand every
        config option the replaced processor would normally handle.
        """
        pass

    def bulk_process(self, docs):
        """ Run `process` over each Document in turn; override when a batched implementation is available. """
        results = []
        for document in docs:
            results.append(self.process(document))
        return results

class UDProcessor(Processor):
    """ Shared parent of the neural UD Processors (tokenize,mwt,pos,lemma,depparse,sentiment) """

    def __init__(self, config, pipeline, use_gpu):
        super().__init__(config, pipeline, use_gpu)

        # model resources start out empty; each processor fills them in its own _set_up_model
        self._pretrain = None
        self._trainer = None
        self._vocab = None
        # a processor running through a variant does not load a model of its own
        has_variant = hasattr(self, '_variant')
        if not has_variant:
            self._set_up_model(config, use_gpu)

        # merge model args and user config into the config exposed by the processor
        self._set_up_final_config(config)

    @abstractmethod
    def _set_up_model(self, config, gpu):
        pass

    def _set_up_final_config(self, config):
        """ Build the final config: args stored with the loaded model (if any), overridden by `config`. """
        trainer = self._trainer
        if trainer is None:
            # no model was loaded, so only the passed in config applies
            final_config = {}
        else:
            model_args = trainer.args
            self._vocab = trainer.vocab
            # keep only the model args that are relevant to a processor
            final_config = {}
            for key, value in model_args.items():
                if UDProcessor.filter_out_option(key):
                    continue
                final_config[key] = value
        final_config.update(config)
        self._config = final_config

    def mark_inactive(self):
        """ Release the trainer and vocab so an unused processor does not keep them in memory. """
        self._trainer = None
        self._vocab = None

    @property
    def pretrain(self):
        return self._pretrain

    @property
    def trainer(self):
        return self._trainer

    @property
    def vocab(self):
        return self._vocab

    @staticmethod
    def filter_out_option(option):
        """ Return True for options that are not processor configurations and should be dropped """
        excluded_options = ('cpu', 'cuda', 'dev_conll_gold', 'epochs', 'lang', 'mode', 'save_name', 'shorthand')
        # anything naming a file or directory, plus a fixed list of other non-processor options
        return option.endswith(('_file', '_dir')) or option in excluded_options

class ProcessorRegisterException(Exception):
    """ Raised when a processor or processor variant cannot be registered """

    def __init__(self, processor_class, expected_parent):
        # remember both classes, then render the message once up front
        self._processor_class, self._expected_parent = processor_class, expected_parent
        self.build_message()

    def build_message(self):
        """ Store the text returned by `str()` in `self.message`. """
        rejected = self._processor_class
        required_base = self._expected_parent
        self.message = f"Failed to register '{rejected}'. It must be a subclass of '{required_base}'."

    def __str__(self):
        return self.message

def register_processor(name):
    """
    Class decorator factory: register the decorated Processor subclass under `name`.

    The class is stored in NAME_TO_PROCESSOR_CLASS and `name` is appended to PIPELINE_NAMES.
    Raises ProcessorRegisterException if the decorated class is not a subclass of Processor.
    """
    def wrapper(Cls):
        if issubclass(Cls, Processor):
            NAME_TO_PROCESSOR_CLASS[name] = Cls
            PIPELINE_NAMES.append(name)
            # hand the class back unchanged so the decorator is transparent
            return Cls
        raise ProcessorRegisterException(Cls, Processor)
    return wrapper

def register_processor_variant(name, variant):
    """
    Class decorator factory: register the decorated ProcessorVariant subclass as
    variant `variant` of the processor called `name` in PROCESSOR_VARIANTS.

    Raises ProcessorRegisterException if the decorated class is not a subclass of ProcessorVariant.
    """
    def wrapper(Cls):
        if issubclass(Cls, ProcessorVariant):
            PROCESSOR_VARIANTS[name][variant] = Cls
            # hand the class back unchanged so the decorator is transparent
            return Cls
        raise ProcessorRegisterException(Cls, ProcessorVariant)
    return wrapper