Diffstat (limited to 'transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py')
-rw-r--r--  transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py  606
1 file changed, 606 insertions, 0 deletions
diff --git a/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py
new file mode 100644
index 0000000..02c1c73
--- /dev/null
+++ b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py
@@ -0,0 +1,606 @@
+import json
+import logging
+import math
+import os
+import queue
+from collections import OrderedDict
+from typing import List, Dict, Tuple, Iterable, Type, Union, Callable
+
+import numpy as np
+import torch
+import torch.multiprocessing as mp
+import transformers
+from numpy import ndarray
+from sklearn.metrics.pairwise import paired_cosine_distances
+from torch import nn, Tensor, device
+from torch.optim.optimizer import Optimizer
+from torch.utils.data import DataLoader
+from tqdm.autonotebook import trange
+
+from transquest.algo.sentence_level.siamesetransquest.evaluation.sentence_evaluator import SentenceEvaluator
+from transquest.algo.sentence_level.siamesetransquest.models import Transformer, Pooling
+from transquest.algo.sentence_level.siamesetransquest.util import batch_to_device
+
+logger = logging.getLogger(__name__)
+
+
+class SiameseTransformer(nn.Sequential):
+
+    def __init__(self, model_name: str = None, args=None, device: str = None):
+
+        transformer_model = Transformer(model_name, max_seq_length=args.max_seq_length)
+        pooling_model = Pooling(transformer_model.get_word_embedding_dimension(),
+                                pooling_mode_mean_tokens=True,
+                                pooling_mode_cls_token=False,
+                                pooling_mode_max_tokens=False)
+        modules = [transformer_model, pooling_model]
+
+        if modules is not None and not isinstance(modules, OrderedDict):
+            modules = OrderedDict([(str(idx), module) for idx, module in enumerate(modules)])
+
+        super().__init__(modules)
+        if device is None:
+            device = "cuda" if torch.cuda.is_available() else "cpu"
+            logger.info("Use pytorch device: {}".format(device))
+
+        self._target_device = torch.device(device)
+
+    def encode(self, sentences: Union[str, List[str], List[int]],
+               batch_size: int = 32,
+               show_progress_bar: bool = None,
+               output_value: str = 'sentence_embedding',
+               convert_to_numpy: bool = True,
+               convert_to_tensor: bool = False,
+               device: str = None,
+               normalize_embeddings: bool = False) -> Union[List[Tensor], ndarray, Tensor]:
+        """
+        Computes sentence embeddings.
+
+        :param sentences: the sentences to embed
+        :param batch_size: the batch size used for the computation
+        :param show_progress_bar: output a progress bar while encoding sentences
+        :param output_value: default 'sentence_embedding', to get sentence embeddings. Can be set to 'token_embeddings' to get wordpiece token embeddings.
+        :param convert_to_numpy: if True, the output is a list of numpy vectors; otherwise, a list of pytorch tensors
+        :param convert_to_tensor: if True, one large tensor is returned. Overrides any setting of convert_to_numpy.
+        :param device: which torch.device to use for the computation
+        :param normalize_embeddings: if True, returned vectors have length 1. The faster dot-product (util.dot_score) can then be used instead of cosine similarity.
+
+        :return:
+            By default, a list of tensors is returned. If convert_to_tensor is set, a stacked tensor is returned. If convert_to_numpy is set, a numpy matrix is returned.
+        """
+        self.eval()
+        if show_progress_bar is None:
+            show_progress_bar = (
+                    logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG)
+
+        if convert_to_tensor:
+            convert_to_numpy = False
+
+        if output_value == 'token_embeddings':
+            convert_to_tensor = False
+            convert_to_numpy = False
+
+        input_was_string = False
+        if isinstance(sentences, str) or not hasattr(sentences, '__len__'):
+            # Cast an individual sentence to a list with length 1
+            sentences = [sentences]
+            input_was_string = True
+
+        if device is None:
+            device = self._target_device
+
+        self.to(device)
+
+        all_embeddings = []
+        # Sort inputs by decreasing length so each batch contains similarly sized sequences
+        length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences])
+        sentences_sorted = [sentences[idx] for idx in length_sorted_idx]
+
+        for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar):
+            sentences_batch = sentences_sorted[start_index:start_index + batch_size]
+            features = self.tokenize(sentences_batch)
+            features = batch_to_device(features, device)
+
+            with torch.no_grad():
+                out_features = self.forward(features)
+
+                if output_value == 'token_embeddings':
+                    embeddings = []
+                    for token_emb, attention in zip(out_features[output_value], out_features['attention_mask']):
+                        # Trim padding: walk back from the end of the attention mask
+                        last_mask_id = len(attention) - 1
+                        while last_mask_id > 0 and attention[last_mask_id].item() == 0:
+                            last_mask_id -= 1
+
+                        embeddings.append(token_emb[0:last_mask_id + 1])
+                else:  # Sentence embeddings
+                    embeddings = out_features[output_value]
+                    embeddings = embeddings.detach()
+                    if normalize_embeddings:
+                        embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
+
+                    # fixes for #522 and #487 to avoid oom problems on gpu with large datasets
+                    if convert_to_numpy:
+                        embeddings = embeddings.cpu()
+
+                all_embeddings.extend(embeddings)
+
+        # Restore the original input order
+        all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)]
+
+        if convert_to_tensor:
+            all_embeddings = torch.stack(all_embeddings)
+        elif convert_to_numpy:
+            all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings])
+
+        if input_was_string:
+            all_embeddings = all_embeddings[0]
+
+        return all_embeddings
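+    # A minimal usage sketch for encode(); the model name and the `args` object
+    # (which must carry `max_seq_length`) are illustrative assumptions, not fixed
+    # by this file:
+    #
+    #     model = SiameseTransformer("xlm-roberta-base", args=args)
+    #     embeddings = model.encode(["A test sentence.", "Another sentence."],
+    #                               batch_size=8, convert_to_numpy=True)
+    #     # embeddings: numpy matrix of shape (2, sentence_embedding_dimension)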
+ """ + self.eval() + if show_progress_bar is None: + show_progress_bar = ( + logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG) + + if convert_to_tensor: + convert_to_numpy = False + + if output_value == 'token_embeddings': + convert_to_tensor = False + convert_to_numpy = False + + input_was_string = False + if isinstance(sentences, str) or not hasattr(sentences, + '__len__'): # Cast an individual sentence to a list with length 1 + sentences = [sentences] + input_was_string = True + + if device is None: + device = self._target_device + + self.to(device) + + all_embeddings = [] + length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences]) + sentences_sorted = [sentences[idx] for idx in length_sorted_idx] + + for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar): + sentences_batch = sentences_sorted[start_index:start_index + batch_size] + features = self.tokenize(sentences_batch) + features = batch_to_device(features, device) + + with torch.no_grad(): + out_features = self.forward(features) + + if output_value == 'token_embeddings': + embeddings = [] + for token_emb, attention in zip(out_features[output_value], out_features['attention_mask']): + last_mask_id = len(attention) - 1 + while last_mask_id > 0 and attention[last_mask_id].item() == 0: + last_mask_id -= 1 + + embeddings.append(token_emb[0:last_mask_id + 1]) + else: # Sentence embeddings + embeddings = out_features[output_value] + embeddings = embeddings.detach() + if normalize_embeddings: + embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) + + # fixes for #522 and #487 to avoid oom problems on gpu with large datasets + if convert_to_numpy: + embeddings = embeddings.cpu() + + all_embeddings.extend(embeddings) + + all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)] + + if convert_to_tensor: + all_embeddings = torch.stack(all_embeddings) + elif convert_to_numpy: + all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings]) + + if input_was_string: + all_embeddings = all_embeddings[0] + + return all_embeddings + + def predict(self, to_predict, verbose=True): + sentences1 = [] + sentences2 = [] + + for text_1, text_2 in to_predict: + sentences1.append(text_1) + sentences2.append(text_2) + + embeddings1 = self.encode(sentences1, show_progress_bar=verbose, convert_to_numpy=True) + embeddings2 = self.encode(sentences2, show_progress_bar=verbose, convert_to_numpy=True) + + cosine_scores = 1 - (paired_cosine_distances(embeddings1, embeddings2)) + + return cosine_scores + + def start_multi_process_pool(self, target_devices: List[str] = None): + """ + Starts multi process to process the encoding with several, independent processes. + This method is recommended if you want to encode on multiple GPUs. It is advised + to start only one process per GPU. This method works together with encode_multi_process + + :param target_devices: PyTorch target devices, e.g. cuda:0, cuda:1... If None, all available CUDA devices will be used + :return: Returns a dict with the target processes, an input queue and and output queue. + """ + if target_devices is None: + if torch.cuda.is_available(): + target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())] + else: + logger.info("CUDA is not available. 
+    def start_multi_process_pool(self, target_devices: List[str] = None):
+        """
+        Starts a pool of several independent processes for encoding.
+        This method is recommended if you want to encode on multiple GPUs. It is advised
+        to start only one process per GPU. This method works together with encode_multi_process.
+
+        :param target_devices: PyTorch target devices, e.g. cuda:0, cuda:1... If None, all available CUDA devices will be used
+        :return: a dict with the target processes, an input queue and an output queue
+        """
+        if target_devices is None:
+            if torch.cuda.is_available():
+                target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())]
+            else:
+                logger.info("CUDA is not available. Starting 4 CPU workers")
+                target_devices = ['cpu'] * 4
+
+        logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices))))
+
+        ctx = mp.get_context('spawn')
+        input_queue = ctx.Queue()
+        output_queue = ctx.Queue()
+        processes = []
+
+        for cuda_id in target_devices:
+            p = ctx.Process(target=SiameseTransformer._encode_multi_process_worker,
+                            args=(cuda_id, self, input_queue, output_queue), daemon=True)
+            p.start()
+            processes.append(p)
+
+        return {'input': input_queue, 'output': output_queue, 'processes': processes}
+
+    @staticmethod
+    def stop_multi_process_pool(pool):
+        """
+        Stops all processes started with start_multi_process_pool.
+        """
+        for p in pool['processes']:
+            p.terminate()
+
+        for p in pool['processes']:
+            p.join()
+            p.close()
+
+        pool['input'].close()
+        pool['output'].close()
+
+    def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], batch_size: int = 32,
+                             chunk_size: int = None):
+        """
+        Runs encode() on multiple GPUs. The sentences are chunked into smaller packages
+        and sent to the individual processes, which encode them on the different GPUs.
+        This method is only suitable for encoding large sets of sentences.
+
+        :param sentences: list of sentences
+        :param pool: a pool of workers started with SiameseTransformer.start_multi_process_pool
+        :param batch_size: encode sentences with this batch size
+        :param chunk_size: sentences are chunked and sent to the individual processes. If None, a sensible size is determined.
+        :return: numpy matrix with all embeddings
+        """
+
+        if chunk_size is None:
+            chunk_size = min(math.ceil(len(sentences) / len(pool["processes"]) / 10), 5000)
+
+        logger.info("Chunk data into packages of size {}".format(chunk_size))
+
+        input_queue = pool['input']
+        last_chunk_id = 0
+        chunk = []
+
+        for sentence in sentences:
+            chunk.append(sentence)
+            if len(chunk) >= chunk_size:
+                input_queue.put([last_chunk_id, batch_size, chunk])
+                last_chunk_id += 1
+                chunk = []
+
+        if len(chunk) > 0:
+            input_queue.put([last_chunk_id, batch_size, chunk])
+            last_chunk_id += 1
+
+        output_queue = pool['output']
+        # Collect one result per submitted chunk and restore the submission order
+        results_list = sorted([output_queue.get() for _ in range(last_chunk_id)], key=lambda x: x[0])
+        embeddings = np.concatenate([result[1] for result in results_list])
+        return embeddings
+
+    @staticmethod
+    def _encode_multi_process_worker(target_device: str, model, input_queue, results_queue):
+        """
+        Internal worker process that encodes sentences in the multi-process setup.
+        """
+        while True:
+            try:
+                chunk_id, batch_size, sentences = input_queue.get()
+                embeddings = model.encode(sentences, device=target_device, show_progress_bar=False,
+                                          convert_to_numpy=True, batch_size=batch_size)
+                results_queue.put([chunk_id, embeddings])
+            except queue.Empty:
+                break
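+    # A minimal multi-GPU encoding sketch using the three pool methods above;
+    # the device ids and the `sentences` list are illustrative:
+    #
+    #     pool = model.start_multi_process_pool(["cuda:0", "cuda:1"])
+    #     embeddings = model.encode_multi_process(sentences, pool, batch_size=32)
+    #     model.stop_multi_process_pool(pool)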
+    def get_max_seq_length(self):
+        """
+        Returns the maximal sequence length the model accepts. Longer inputs will be truncated.
+        """
+        if hasattr(self._first_module(), 'max_seq_length'):
+            return self._first_module().max_seq_length
+
+        return None
+
+    def tokenize(self, text: str):
+        """
+        Tokenizes the text with the first module's tokenizer.
+        """
+        return self._first_module().tokenize(text)
+
+    def get_sentence_features(self, *features):
+        return self._first_module().get_sentence_features(*features)
+
+    def get_sentence_embedding_dimension(self):
+        # Walk the modules back to front and return the first reported dimension
+        for mod in reversed(self._modules.values()):
+            sent_embedding_dim_method = getattr(mod, "get_sentence_embedding_dimension", None)
+            if callable(sent_embedding_dim_method):
+                return sent_embedding_dim_method()
+        return None
+
+    def _first_module(self):
+        """Returns the first module of this sequential embedder"""
+        return self._modules[next(iter(self._modules))]
+
+    def _last_module(self):
+        """Returns the last module of this sequential embedder"""
+        return self._modules[next(reversed(self._modules))]
+
+    def save(self, path):
+        """
+        Saves all modules of this sequential sentence embedder to the given path.
+        """
+        if path is None:
+            return
+
+        os.makedirs(path, exist_ok=True)
+
+        logger.info("Save model to {}".format(path))
+        contained_modules = []
+
+        for idx, name in enumerate(self._modules):
+            module = self._modules[name]
+            # model_path = os.path.join(path, str(idx)+"_"+type(module).__name__)
+            os.makedirs(path, exist_ok=True)
+            module.save(path)
+            contained_modules.append(
+                {'idx': idx, 'name': name, 'path': os.path.basename(path), 'type': type(module).__module__})
+
+        with open(os.path.join(path, 'modules.json'), 'w') as fOut:
+            json.dump(contained_modules, fOut, indent=2)
+
+        self.save_model_args(path)
+
+    def smart_batching_collate(self, batch):
+        """
+        Transforms a batch from a SmartBatchingDataset to a batch of tensors for the model.
+        Here, batch is a list of tuples: [(tokens, label), ...]
+
+        :param batch:
+            a batch from a SmartBatchingDataset
+        :return:
+            a batch of tensors for the model
+        """
+        num_texts = len(batch[0].texts)
+        texts = [[] for _ in range(num_texts)]
+        labels = []
+
+        for example in batch:
+            for idx, text in enumerate(example.texts):
+                texts[idx].append(text)
+
+            labels.append(example.label)
+
+        labels = torch.tensor(labels).to(self._target_device)
+
+        sentence_features = []
+        for idx in range(num_texts):
+            tokenized = self.tokenize(texts[idx])
+            batch_to_device(tokenized, self._target_device)
+            sentence_features.append(tokenized)
+
+        return sentence_features, labels
+
+    def _text_length(self, text: Union[List[int], List[List[int]]]):
+        """
+        Helper function to get the length of the input text. Text can be either
+        a list of ints (a single tokenized text) or a list of lists of ints
+        (several text inputs to the model).
+        """
+
+        if isinstance(text, dict):  # {key: value} case
+            return len(next(iter(text.values())))
+        elif not hasattr(text, '__len__'):  # Object has no len() method
+            return 1
+        elif len(text) == 0 or isinstance(text[0], int):  # Empty string or list of ints
+            return len(text)
+        else:
+            return sum([len(t) for t in text])  # Sum of lengths of individual strings
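+    # A sketch of what smart_batching_collate() (above) returns for a batch of
+    # examples with two texts each; the `batch` variable is illustrative:
+    #
+    #     sentence_features, labels = model.smart_batching_collate(batch)
+    #     # sentence_features: [tokenized_first_texts, tokenized_second_texts],
+    #     #                    one feature dict per text column, on the target device
+    #     # labels: tensor of the examples' labels, on the target device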
+ """ + + if isinstance(text, dict): # {key: value} case + return len(next(iter(text.values()))) + elif not hasattr(text, '__len__'): # Object has no len() method + return 1 + elif len(text) == 0 or isinstance(text[0], int): # Empty string or list of ints + return len(text) + else: + return sum([len(t) for t in text]) # Sum of length of individual strings + + + def fit(self, + train_objectives: Iterable[Tuple[DataLoader, nn.Module]], + evaluator: SentenceEvaluator = None, + epochs: int = 1, + steps_per_epoch=None, + scheduler: str = 'WarmupLinear', + warmup_steps: int = 10000, + optimizer_class: Type[Optimizer] = transformers.AdamW, + optimizer_params: Dict[str, object] = {'lr': 2e-5}, + weight_decay: float = 0.01, + evaluation_steps: int = 0, + output_path: str = None, + save_best_model: bool = True, + max_grad_norm: float = 1, + use_amp: bool = False, + callback: Callable[[float, int, int], None] = None, + show_progress_bar: bool = True + ): + """ + Train the model with the given training objective + Each training objective is sampled in turn for one batch. + We sample only as many batches from each objective as there are in the smallest one + to make sure of equal training with each dataset. + + :param train_objectives: Tuples of (DataLoader, LossFunction). Pass more than one for multi-task learning + :param evaluator: An evaluator (sentence_transformers.evaluation) evaluates the model performance during training on held-out dev data. It is used to determine the best model that is saved to disc. + :param epochs: Number of epochs for training + :param steps_per_epoch: Number of training steps per epoch. If set to None (default), one epoch is equal the DataLoader size from train_objectives. + :param scheduler: Learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts + :param warmup_steps: Behavior depends on the scheduler. For WarmupLinear (default), the learning rate is increased from o up to the maximal learning rate. After these many training steps, the learning rate is decreased linearly back to zero. + :param optimizer_class: Optimizer + :param optimizer_params: Optimizer parameters + :param weight_decay: Weight decay for model parameters + :param evaluation_steps: If > 0, evaluate the model using evaluator after each number of training steps + :param output_path: Storage path for the model and evaluation files + :param save_best_model: If true, the best model (according to evaluator) is stored at output_path + :param max_grad_norm: Used for gradient normalization. + :param use_amp: Use Automatic Mixed Precision (AMP). Only for Pytorch >= 1.6.0 + :param callback: Callback function that is invoked after each evaluation. 
+ It must accept the following three parameters in this order: + `score`, `epoch`, `steps` + :param show_progress_bar: If True, output a tqdm progress bar + """ + + if use_amp: + from torch.cuda.amp import autocast + scaler = torch.cuda.amp.GradScaler() + + self.to(self._target_device) + + if output_path is not None: + os.makedirs(output_path, exist_ok=True) + + dataloaders = [dataloader for dataloader, _ in train_objectives] + + # Use smart batching + for dataloader in dataloaders: + dataloader.collate_fn = self.smart_batching_collate + + loss_models = [loss for _, loss in train_objectives] + for loss_model in loss_models: + loss_model.to(self._target_device) + + self.best_score = -9999999 + + if steps_per_epoch is None or steps_per_epoch == 0: + steps_per_epoch = min([len(dataloader) for dataloader in dataloaders]) + + num_train_steps = int(steps_per_epoch * epochs) + + # Prepare optimizers + optimizers = [] + schedulers = [] + for loss_model in loss_models: + param_optimizer = list(loss_model.named_parameters()) + + no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight'] + optimizer_grouped_parameters = [ + {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], + 'weight_decay': weight_decay}, + {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0} + ] + + optimizer = optimizer_class(optimizer_grouped_parameters, **optimizer_params) + scheduler_obj = self._get_scheduler(optimizer, scheduler=scheduler, warmup_steps=warmup_steps, + t_total=num_train_steps) + + optimizers.append(optimizer) + schedulers.append(scheduler_obj) + + global_step = 0 + data_iterators = [iter(dataloader) for dataloader in dataloaders] + + num_train_objectives = len(train_objectives) + + skip_scheduler = False + for epoch in trange(epochs, desc="Epoch", disable=not show_progress_bar): + training_steps = 0 + + for loss_model in loss_models: + loss_model.zero_grad() + loss_model.train() + + for _ in trange(steps_per_epoch, desc="Iteration", smoothing=0.05, disable=not show_progress_bar): + for train_idx in range(num_train_objectives): + loss_model = loss_models[train_idx] + optimizer = optimizers[train_idx] + scheduler = schedulers[train_idx] + data_iterator = data_iterators[train_idx] + + try: + data = next(data_iterator) + except StopIteration: + data_iterator = iter(dataloaders[train_idx]) + data_iterators[train_idx] = data_iterator + data = next(data_iterator) + + features, labels = data + + if use_amp: + with autocast(): + loss_value = loss_model(features, labels) + + scale_before_step = scaler.get_scale() + scaler.scale(loss_value).backward() + scaler.unscale_(optimizer) + torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) + scaler.step(optimizer) + scaler.update() + + skip_scheduler = scaler.get_scale() != scale_before_step + else: + loss_value = loss_model(features, labels) + loss_value.backward() + torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) + optimizer.step() + + optimizer.zero_grad() + + if not skip_scheduler: + scheduler.step() + + training_steps += 1 + global_step += 1 + + if evaluation_steps > 0 and training_steps % evaluation_steps == 0: + self._eval_during_training(evaluator, output_path, save_best_model, epoch, + training_steps, callback) + for loss_model in loss_models: + loss_model.zero_grad() + loss_model.train() + + self._eval_during_training(evaluator, output_path, save_best_model, epoch, -1, callback) + + if evaluator is None and output_path is not None: # No evaluator, but 
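+    # A minimal training sketch for fit(); the DataLoader, loss module and evaluator
+    # below are illustrative assumptions, not fixed by this file:
+    #
+    #     train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
+    #     model.fit(train_objectives=[(train_dataloader, loss_model)],
+    #               evaluator=dev_evaluator,
+    #               epochs=3,
+    #               warmup_steps=100,
+    #               output_path="outputs/siamese_model")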
+    def evaluate(self, evaluator: SentenceEvaluator, output_path: str = None, verbose: bool = True):
+        """
+        Evaluate the model.
+
+        :param evaluator:
+            the evaluator
+        :param output_path:
+            the evaluator can write the results to this path
+        :param verbose:
+            print the results
+        """
+        if output_path is not None:
+            os.makedirs(output_path, exist_ok=True)
+        return evaluator(self, output_path, verbose)
+
+    def _eval_during_training(self, evaluator, output_path, save_best_model, epoch, steps, callback):
+        """Runs evaluation during training"""
+        if evaluator is not None:
+            score = evaluator(self, output_path=output_path, epoch=epoch, steps=steps)
+            if callback is not None:
+                callback(score, epoch, steps)
+            if score > self.best_score:
+                self.best_score = score
+                if save_best_model:
+                    self.save(output_path)
+
+    @staticmethod
+    def _get_scheduler(optimizer, scheduler: str, warmup_steps: int, t_total: int):
+        """
+        Returns the requested learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts
+        """
+        scheduler = scheduler.lower()
+        if scheduler == 'constantlr':
+            return transformers.get_constant_schedule(optimizer)
+        elif scheduler == 'warmupconstant':
+            return transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps)
+        elif scheduler == 'warmuplinear':
+            return transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
+                                                                num_training_steps=t_total)
+        elif scheduler == 'warmupcosine':
+            return transformers.get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
+                                                                num_training_steps=t_total)
+        elif scheduler == 'warmupcosinewithhardrestarts':
+            return transformers.get_cosine_with_hard_restarts_schedule_with_warmup(optimizer,
+                                                                                   num_warmup_steps=warmup_steps,
+                                                                                   num_training_steps=t_total)
+        else:
+            raise ValueError("Unknown scheduler {}".format(scheduler))
+
+    @property
+    def device(self) -> device:
+        """
+        Get torch.device from module, assuming that the whole module has one device.
+        """
+        try:
+            return next(self.parameters()).device
+        except StopIteration:
+            # For nn.DataParallel compatibility in PyTorch 1.5
+
+            def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]:
+                tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)]
+                return tuples
+
+            gen = self._named_members(get_members_fn=find_tensor_attributes)
+            first_tuple = next(gen)
+            return first_tuple[1].device
+
+    @property
+    def tokenizer(self):
+        """
+        Property to get the tokenizer used by this model
+        """
+        return self._first_module().tokenizer
+
+    @tokenizer.setter
+    def tokenizer(self, value):
+        """
+        Property to set the tokenizer that should be used by this model
+        """
+        self._first_module().tokenizer = value
+
+    @property
+    def max_seq_length(self):
+        """
+        Property to get the maximal input sequence length for the model. Longer inputs will be truncated.
+        """
+        return self._first_module().max_seq_length
+
+    @max_seq_length.setter
+    def max_seq_length(self, value):
+        """
+        Property to set the maximal input sequence length for the model. Longer inputs will be truncated.
+        """
+        self._first_module().max_seq_length = value
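+    # A property usage sketch (illustrative): both attributes are proxied to the
+    # first (Transformer) module:
+    #
+    #     print(model.max_seq_length)  # current input limit
+    #     model.max_seq_length = 128   # longer inputs will be truncated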
\ No newline at end of file