diff options
author | TharinduDR <rhtdranasinghe@gmail.com> | 2021-04-24 01:57:25 +0300 |
---|---|---|
committer | TharinduDR <rhtdranasinghe@gmail.com> | 2021-04-24 01:57:25 +0300 |
commit | ac8e402339606301bc8bd775044360ee7ff4a9e2 (patch) | |
tree | 33d70c13bc21240fe45a829fb4cbd81e9448a668 | |
parent | d9eac1cb3010009c0aa7f754f565470b4920b123 (diff) |
057: Code Refactoring - Siamese Architectures
3 files changed, 620 insertions, 578 deletions
diff --git a/transquest/algo/sentence_level/siamesetransquest/losses/cosine_similarity_loss.py b/transquest/algo/sentence_level/siamesetransquest/losses/cosine_similarity_loss.py index 60e8133..e1aef01 100644 --- a/transquest/algo/sentence_level/siamesetransquest/losses/cosine_similarity_loss.py +++ b/transquest/algo/sentence_level/siamesetransquest/losses/cosine_similarity_loss.py @@ -3,7 +3,7 @@ from typing import Iterable, Dict import torch from torch import nn, Tensor -from transquest.algo.sentence_level.siamesetransquest.run_model import SiameseTransQuestModel +from transquest.algo.sentence_level.siamesetransquest.models.siamese_transformer import SiameseTransformer class CosineSimilarityLoss(nn.Module): @@ -31,7 +31,7 @@ class CosineSimilarityLoss(nn.Module): """ - def __init__(self, model: SiameseTransQuestModel, loss_fct=nn.MSELoss(), cos_score_transformation=nn.Identity()): + def __init__(self, model: SiameseTransformer, loss_fct=nn.MSELoss(), cos_score_transformation=nn.Identity()): super(CosineSimilarityLoss, self).__init__() self.model = model self.loss_fct = loss_fct diff --git a/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py new file mode 100644 index 0000000..02c1c73 --- /dev/null +++ b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py @@ -0,0 +1,606 @@ +import json +import logging +import math +import os +import queue +from collections import OrderedDict +from typing import List, Dict, Tuple, Iterable, Type, Union, Callable + +import numpy as np +import torch +import torch.multiprocessing as mp +import transformers +from numpy import ndarray +from sklearn.metrics.pairwise import paired_cosine_distances +from torch import nn, Tensor, device +from torch.optim.optimizer import Optimizer +from torch.utils.data import DataLoader +from tqdm.autonotebook import trange + +from transquest.algo.sentence_level.siamesetransquest.evaluation.sentence_evaluator import SentenceEvaluator +from transquest.algo.sentence_level.siamesetransquest.models import Transformer, Pooling +from transquest.algo.sentence_level.siamesetransquest.util import batch_to_device + +logger = logging.getLogger(__name__) + + +class SiameseTransformer(nn.Sequential): + + def __init__(self, model_name: str = None, args=None, device: str = None): + + transformer_model = Transformer(model_name, max_seq_length=args.max_seq_length) + pooling_model = Pooling(transformer_model.get_word_embedding_dimension(), pooling_mode_mean_tokens=True, + pooling_mode_cls_token=False, + pooling_mode_max_tokens=False) + modules = [transformer_model, pooling_model] + + if modules is not None and not isinstance(modules, OrderedDict): + modules = OrderedDict([(str(idx), module) for idx, module in enumerate(modules)]) + + super().__init__(modules) + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + logger.info("Use pytorch device: {}".format(device)) + + self._target_device = torch.device(device) + + def encode(self, sentences: Union[str, List[str], List[int]], + batch_size: int = 32, + show_progress_bar: bool = None, + output_value: str = 'sentence_embedding', + convert_to_numpy: bool = True, + convert_to_tensor: bool = False, + device: str = None, + normalize_embeddings: bool = False) -> Union[List[Tensor], ndarray, Tensor]: + """ + Computes sentence embeddings + + :param sentences: the sentences to embed + :param batch_size: the batch size used for the computation + :param show_progress_bar: Output a progress bar when encode sentences + :param output_value: Default sentence_embedding, to get sentence embeddings. Can be set to token_embeddings to get wordpiece token embeddings. + :param convert_to_numpy: If true, the output is a list of numpy vectors. Else, it is a list of pytorch tensors. + :param convert_to_tensor: If true, you get one large tensor as return. Overwrites any setting from convert_to_numpy + :param device: Which torch.device to use for the computation + :param normalize_embeddings: If set to true, returned vectors will have length 1. In that case, the faster dot-product (util.dot_score) instead of cosine similarity can be used. + + :return: + By default, a list of tensors is returned. If convert_to_tensor, a stacked tensor is returned. If convert_to_numpy, a numpy matrix is returned. + """ + self.eval() + if show_progress_bar is None: + show_progress_bar = ( + logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG) + + if convert_to_tensor: + convert_to_numpy = False + + if output_value == 'token_embeddings': + convert_to_tensor = False + convert_to_numpy = False + + input_was_string = False + if isinstance(sentences, str) or not hasattr(sentences, + '__len__'): # Cast an individual sentence to a list with length 1 + sentences = [sentences] + input_was_string = True + + if device is None: + device = self._target_device + + self.to(device) + + all_embeddings = [] + length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences]) + sentences_sorted = [sentences[idx] for idx in length_sorted_idx] + + for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar): + sentences_batch = sentences_sorted[start_index:start_index + batch_size] + features = self.tokenize(sentences_batch) + features = batch_to_device(features, device) + + with torch.no_grad(): + out_features = self.forward(features) + + if output_value == 'token_embeddings': + embeddings = [] + for token_emb, attention in zip(out_features[output_value], out_features['attention_mask']): + last_mask_id = len(attention) - 1 + while last_mask_id > 0 and attention[last_mask_id].item() == 0: + last_mask_id -= 1 + + embeddings.append(token_emb[0:last_mask_id + 1]) + else: # Sentence embeddings + embeddings = out_features[output_value] + embeddings = embeddings.detach() + if normalize_embeddings: + embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) + + # fixes for #522 and #487 to avoid oom problems on gpu with large datasets + if convert_to_numpy: + embeddings = embeddings.cpu() + + all_embeddings.extend(embeddings) + + all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)] + + if convert_to_tensor: + all_embeddings = torch.stack(all_embeddings) + elif convert_to_numpy: + all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings]) + + if input_was_string: + all_embeddings = all_embeddings[0] + + return all_embeddings + + def predict(self, to_predict, verbose=True): + sentences1 = [] + sentences2 = [] + + for text_1, text_2 in to_predict: + sentences1.append(text_1) + sentences2.append(text_2) + + embeddings1 = self.encode(sentences1, show_progress_bar=verbose, convert_to_numpy=True) + embeddings2 = self.encode(sentences2, show_progress_bar=verbose, convert_to_numpy=True) + + cosine_scores = 1 - (paired_cosine_distances(embeddings1, embeddings2)) + + return cosine_scores + + def start_multi_process_pool(self, target_devices: List[str] = None): + """ + Starts multi process to process the encoding with several, independent processes. + This method is recommended if you want to encode on multiple GPUs. It is advised + to start only one process per GPU. This method works together with encode_multi_process + + :param target_devices: PyTorch target devices, e.g. cuda:0, cuda:1... If None, all available CUDA devices will be used + :return: Returns a dict with the target processes, an input queue and and output queue. + """ + if target_devices is None: + if torch.cuda.is_available(): + target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())] + else: + logger.info("CUDA is not available. Start 4 CPU worker") + target_devices = ['cpu'] * 4 + + logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices)))) + + ctx = mp.get_context('spawn') + input_queue = ctx.Queue() + output_queue = ctx.Queue() + processes = [] + + for cuda_id in target_devices: + p = ctx.Process(target=SiameseTransformer._encode_multi_process_worker, + args=(cuda_id, self, input_queue, output_queue), daemon=True) + p.start() + processes.append(p) + + return {'input': input_queue, 'output': output_queue, 'processes': processes} + + @staticmethod + def stop_multi_process_pool(pool): + """ + Stops all processes started with start_multi_process_pool + """ + for p in pool['processes']: + p.terminate() + + for p in pool['processes']: + p.join() + p.close() + + pool['input'].close() + pool['output'].close() + + def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], batch_size: int = 32, + chunk_size: int = None): + """ + This method allows to run encode() on multiple GPUs. The sentences are chunked into smaller packages + and sent to individual processes, which encode these on the different GPUs. This method is only suitable + for encoding large sets of sentences + + :param sentences: List of sentences + :param pool: A pool of workers started with SentenceTransformer.start_multi_process_pool + :param batch_size: Encode sentences with batch size + :param chunk_size: Sentences are chunked and sent to the individual processes. If none, it determine a sensible size. + :return: Numpy matrix with all embeddings + """ + + if chunk_size is None: + chunk_size = min(math.ceil(len(sentences) / len(pool["processes"]) / 10), 5000) + + logger.info("Chunk data into packages of size {}".format(chunk_size)) + + input_queue = pool['input'] + last_chunk_id = 0 + chunk = [] + + for sentence in sentences: + chunk.append(sentence) + if len(chunk) >= chunk_size: + input_queue.put([last_chunk_id, batch_size, chunk]) + last_chunk_id += 1 + chunk = [] + + if len(chunk) > 0: + input_queue.put([last_chunk_id, batch_size, chunk]) + last_chunk_id += 1 + + output_queue = pool['output'] + results_list = sorted([output_queue.get() for _ in range(last_chunk_id)], key=lambda x: x[0]) + embeddings = np.concatenate([result[1] for result in results_list]) + return embeddings + + @staticmethod + def _encode_multi_process_worker(target_device: str, model, input_queue, results_queue): + """ + Internal working process to encode sentences in multi-process setup + """ + while True: + try: + id, batch_size, sentences = input_queue.get() + embeddings = model.encode(sentences, device=target_device, show_progress_bar=False, + convert_to_numpy=True, batch_size=batch_size) + results_queue.put([id, embeddings]) + except queue.Empty: + break + + def get_max_seq_length(self): + """ + Returns the maximal sequence length for input the model accepts. Longer inputs will be truncated + """ + if hasattr(self._first_module(), 'max_seq_length'): + return self._first_module().max_seq_length + + return None + + def tokenize(self, text: str): + """ + Tokenizes the text + """ + return self._first_module().tokenize(text) + + def get_sentence_features(self, *features): + return self._first_module().get_sentence_features(*features) + + def get_sentence_embedding_dimension(self): + for mod in reversed(self._modules.values()): + sent_embedding_dim_method = getattr(mod, "get_sentence_embedding_dimension", None) + if callable(sent_embedding_dim_method): + return sent_embedding_dim_method() + return None + + def _first_module(self): + """Returns the first module of this sequential embedder""" + return self._modules[next(iter(self._modules))] + + def _last_module(self): + """Returns the last module of this sequential embedder""" + return self._modules[next(reversed(self._modules))] + + def save(self, path): + """ + Saves all elements for this seq. sentence embedder into different sub-folders + """ + if path is None: + return + + os.makedirs(path, exist_ok=True) + + logger.info("Save model to {}".format(path)) + contained_modules = [] + + for idx, name in enumerate(self._modules): + module = self._modules[name] + # model_path = os.path.join(path, str(idx)+"_"+type(module).__name__) + os.makedirs(path, exist_ok=True) + module.save(path) + contained_modules.append( + {'idx': idx, 'name': name, 'path': os.path.basename(path), 'type': type(module).__module__}) + + with open(os.path.join(path, 'modules.json'), 'w') as fOut: + json.dump(contained_modules, fOut, indent=2) + + self.save_model_args(path) + + def smart_batching_collate(self, batch): + """ + Transforms a batch from a SmartBatchingDataset to a batch of tensors for the model + Here, batch is a list of tuples: [(tokens, label), ...] + + :param batch: + a batch from a SmartBatchingDataset + :return: + a batch of tensors for the model + """ + num_texts = len(batch[0].texts) + texts = [[] for _ in range(num_texts)] + labels = [] + + for example in batch: + for idx, text in enumerate(example.texts): + texts[idx].append(text) + + labels.append(example.label) + + labels = torch.tensor(labels).to(self._target_device) + + sentence_features = [] + for idx in range(num_texts): + tokenized = self.tokenize(texts[idx]) + batch_to_device(tokenized, self._target_device) + sentence_features.append(tokenized) + + return sentence_features, labels + + def _text_length(self, text: Union[List[int], List[List[int]]]): + """ + Help function to get the length for the input text. Text can be either + a list of ints (which means a single text as input), or a tuple of list of ints + (representing several text inputs to the model). + """ + + if isinstance(text, dict): # {key: value} case + return len(next(iter(text.values()))) + elif not hasattr(text, '__len__'): # Object has no len() method + return 1 + elif len(text) == 0 or isinstance(text[0], int): # Empty string or list of ints + return len(text) + else: + return sum([len(t) for t in text]) # Sum of length of individual strings + + + def fit(self, + train_objectives: Iterable[Tuple[DataLoader, nn.Module]], + evaluator: SentenceEvaluator = None, + epochs: int = 1, + steps_per_epoch=None, + scheduler: str = 'WarmupLinear', + warmup_steps: int = 10000, + optimizer_class: Type[Optimizer] = transformers.AdamW, + optimizer_params: Dict[str, object] = {'lr': 2e-5}, + weight_decay: float = 0.01, + evaluation_steps: int = 0, + output_path: str = None, + save_best_model: bool = True, + max_grad_norm: float = 1, + use_amp: bool = False, + callback: Callable[[float, int, int], None] = None, + show_progress_bar: bool = True + ): + """ + Train the model with the given training objective + Each training objective is sampled in turn for one batch. + We sample only as many batches from each objective as there are in the smallest one + to make sure of equal training with each dataset. + + :param train_objectives: Tuples of (DataLoader, LossFunction). Pass more than one for multi-task learning + :param evaluator: An evaluator (sentence_transformers.evaluation) evaluates the model performance during training on held-out dev data. It is used to determine the best model that is saved to disc. + :param epochs: Number of epochs for training + :param steps_per_epoch: Number of training steps per epoch. If set to None (default), one epoch is equal the DataLoader size from train_objectives. + :param scheduler: Learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts + :param warmup_steps: Behavior depends on the scheduler. For WarmupLinear (default), the learning rate is increased from o up to the maximal learning rate. After these many training steps, the learning rate is decreased linearly back to zero. + :param optimizer_class: Optimizer + :param optimizer_params: Optimizer parameters + :param weight_decay: Weight decay for model parameters + :param evaluation_steps: If > 0, evaluate the model using evaluator after each number of training steps + :param output_path: Storage path for the model and evaluation files + :param save_best_model: If true, the best model (according to evaluator) is stored at output_path + :param max_grad_norm: Used for gradient normalization. + :param use_amp: Use Automatic Mixed Precision (AMP). Only for Pytorch >= 1.6.0 + :param callback: Callback function that is invoked after each evaluation. + It must accept the following three parameters in this order: + `score`, `epoch`, `steps` + :param show_progress_bar: If True, output a tqdm progress bar + """ + + if use_amp: + from torch.cuda.amp import autocast + scaler = torch.cuda.amp.GradScaler() + + self.to(self._target_device) + + if output_path is not None: + os.makedirs(output_path, exist_ok=True) + + dataloaders = [dataloader for dataloader, _ in train_objectives] + + # Use smart batching + for dataloader in dataloaders: + dataloader.collate_fn = self.smart_batching_collate + + loss_models = [loss for _, loss in train_objectives] + for loss_model in loss_models: + loss_model.to(self._target_device) + + self.best_score = -9999999 + + if steps_per_epoch is None or steps_per_epoch == 0: + steps_per_epoch = min([len(dataloader) for dataloader in dataloaders]) + + num_train_steps = int(steps_per_epoch * epochs) + + # Prepare optimizers + optimizers = [] + schedulers = [] + for loss_model in loss_models: + param_optimizer = list(loss_model.named_parameters()) + + no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight'] + optimizer_grouped_parameters = [ + {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], + 'weight_decay': weight_decay}, + {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0} + ] + + optimizer = optimizer_class(optimizer_grouped_parameters, **optimizer_params) + scheduler_obj = self._get_scheduler(optimizer, scheduler=scheduler, warmup_steps=warmup_steps, + t_total=num_train_steps) + + optimizers.append(optimizer) + schedulers.append(scheduler_obj) + + global_step = 0 + data_iterators = [iter(dataloader) for dataloader in dataloaders] + + num_train_objectives = len(train_objectives) + + skip_scheduler = False + for epoch in trange(epochs, desc="Epoch", disable=not show_progress_bar): + training_steps = 0 + + for loss_model in loss_models: + loss_model.zero_grad() + loss_model.train() + + for _ in trange(steps_per_epoch, desc="Iteration", smoothing=0.05, disable=not show_progress_bar): + for train_idx in range(num_train_objectives): + loss_model = loss_models[train_idx] + optimizer = optimizers[train_idx] + scheduler = schedulers[train_idx] + data_iterator = data_iterators[train_idx] + + try: + data = next(data_iterator) + except StopIteration: + data_iterator = iter(dataloaders[train_idx]) + data_iterators[train_idx] = data_iterator + data = next(data_iterator) + + features, labels = data + + if use_amp: + with autocast(): + loss_value = loss_model(features, labels) + + scale_before_step = scaler.get_scale() + scaler.scale(loss_value).backward() + scaler.unscale_(optimizer) + torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) + scaler.step(optimizer) + scaler.update() + + skip_scheduler = scaler.get_scale() != scale_before_step + else: + loss_value = loss_model(features, labels) + loss_value.backward() + torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) + optimizer.step() + + optimizer.zero_grad() + + if not skip_scheduler: + scheduler.step() + + training_steps += 1 + global_step += 1 + + if evaluation_steps > 0 and training_steps % evaluation_steps == 0: + self._eval_during_training(evaluator, output_path, save_best_model, epoch, + training_steps, callback) + for loss_model in loss_models: + loss_model.zero_grad() + loss_model.train() + + self._eval_during_training(evaluator, output_path, save_best_model, epoch, -1, callback) + + if evaluator is None and output_path is not None: # No evaluator, but output path: save final model version + self.save(output_path) + + def evaluate(self, evaluator: SentenceEvaluator, output_path: str = None, verbose: bool = True): + """ + Evaluate the model + + :param evaluator: + the evaluator + :param verbose: + print the results + :param output_path: + the evaluator can write the results to this path + """ + if output_path is not None: + os.makedirs(output_path, exist_ok=True) + return evaluator(self, output_path, verbose) + + def _eval_during_training(self, evaluator, output_path, save_best_model, epoch, steps, callback): + """Runs evaluation during the training""" + if evaluator is not None: + score = evaluator(self, output_path=output_path, epoch=epoch, steps=steps) + if callback is not None: + callback(score, epoch, steps) + if score > self.best_score: + self.best_score = score + if save_best_model: + self.save(output_path) + + @staticmethod + def _get_scheduler(optimizer, scheduler: str, warmup_steps: int, t_total: int): + """ + Returns the correct learning rate scheduler. Available scheduler: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts + """ + scheduler = scheduler.lower() + if scheduler == 'constantlr': + return transformers.get_constant_schedule(optimizer) + elif scheduler == 'warmupconstant': + return transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps) + elif scheduler == 'warmuplinear': + return transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps, + num_training_steps=t_total) + elif scheduler == 'warmupcosine': + return transformers.get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps, + num_training_steps=t_total) + elif scheduler == 'warmupcosinewithhardrestarts': + return transformers.get_cosine_with_hard_restarts_schedule_with_warmup(optimizer, + num_warmup_steps=warmup_steps, + num_training_steps=t_total) + else: + raise ValueError("Unknown scheduler {}".format(scheduler)) + + @property + def device(self) -> device: + """ + Get torch.device from module, assuming that the whole module has one device. + """ + try: + return next(self.parameters()).device + except StopIteration: + # For nn.DataParallel compatibility in PyTorch 1.5 + + def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]: + tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)] + return tuples + + gen = self._named_members(get_members_fn=find_tensor_attributes) + first_tuple = next(gen) + return first_tuple[1].device + + @property + def tokenizer(self): + """ + Property to get the tokenizer that is used by this model + """ + return self._first_module().tokenizer + + @tokenizer.setter + def tokenizer(self, value): + """ + Property to set the tokenizer that is should used by this model + """ + self._first_module().tokenizer = value + + @property + def max_seq_length(self): + """ + Property to get the maximal input sequence length for the model. Longer inputs will be truncated. + """ + return self._first_module().max_seq_length + + @max_seq_length.setter + def max_seq_length(self, value): + """ + Property to set the maximal input sequence length for the model. Longer inputs will be truncated. + """ + self._first_module().max_seq_length = value
\ No newline at end of file diff --git a/transquest/algo/sentence_level/siamesetransquest/run_model.py b/transquest/algo/sentence_level/siamesetransquest/run_model.py index 511a3b6..5d2024e 100644 --- a/transquest/algo/sentence_level/siamesetransquest/run_model.py +++ b/transquest/algo/sentence_level/siamesetransquest/run_model.py @@ -1,36 +1,29 @@ -import json import logging import math import os -import queue import random -from collections import OrderedDict -from typing import List, Dict, Tuple, Iterable, Type, Union, Callable + import numpy as np import torch -import torch.multiprocessing as mp -import transformers -from numpy import ndarray from sklearn.metrics.pairwise import paired_cosine_distances -from torch import nn, Tensor, device -from torch.optim.optimizer import Optimizer + + from torch.utils.data import DataLoader -from tqdm.autonotebook import trange + from transquest.algo.sentence_level.siamesetransquest.evaluation.embedding_similarity_evaluator import \ EmbeddingSimilarityEvaluator -from transquest.algo.sentence_level.siamesetransquest.evaluation.sentence_evaluator import SentenceEvaluator from transquest.algo.sentence_level.siamesetransquest.losses.cosine_similarity_loss import CosineSimilarityLoss from transquest.algo.sentence_level.siamesetransquest.model_args import SiameseTransQuestArgs -from transquest.algo.sentence_level.siamesetransquest.models import Transformer, Pooling +from transquest.algo.sentence_level.siamesetransquest.models.siamese_transformer import SiameseTransformer from transquest.algo.sentence_level.siamesetransquest.readers.input_example import InputExample -from transquest.algo.sentence_level.siamesetransquest.util import batch_to_device + logger = logging.getLogger(__name__) -class SiameseTransQuestModel(nn.Sequential): +class SiameseTransQuestModel: """ Loads or create a SentenceTransformer model, that can be used to map sentences / text to embeddings. @@ -57,111 +50,7 @@ class SiameseTransQuestModel(nn.Sequential): if self.args.n_gpu > 0: torch.cuda.manual_seed_all(self.args.manual_seed) - transformer_model = Transformer(model_name, max_seq_length=args.max_seq_length) - pooling_model = Pooling(transformer_model.get_word_embedding_dimension(), pooling_mode_mean_tokens=True, - pooling_mode_cls_token=False, - pooling_mode_max_tokens=False) - modules = [transformer_model, pooling_model] - - if modules is not None and not isinstance(modules, OrderedDict): - modules = OrderedDict([(str(idx), module) for idx, module in enumerate(modules)]) - - super().__init__(modules) - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - logger.info("Use pytorch device: {}".format(device)) - - self._target_device = torch.device(device) - - def encode(self, sentences: Union[str, List[str], List[int]], - batch_size: int = 32, - show_progress_bar: bool = None, - output_value: str = 'sentence_embedding', - convert_to_numpy: bool = True, - convert_to_tensor: bool = False, - device: str = None, - normalize_embeddings: bool = False) -> Union[List[Tensor], ndarray, Tensor]: - """ - Computes sentence embeddings - - :param sentences: the sentences to embed - :param batch_size: the batch size used for the computation - :param show_progress_bar: Output a progress bar when encode sentences - :param output_value: Default sentence_embedding, to get sentence embeddings. Can be set to token_embeddings to get wordpiece token embeddings. - :param convert_to_numpy: If true, the output is a list of numpy vectors. Else, it is a list of pytorch tensors. - :param convert_to_tensor: If true, you get one large tensor as return. Overwrites any setting from convert_to_numpy - :param device: Which torch.device to use for the computation - :param normalize_embeddings: If set to true, returned vectors will have length 1. In that case, the faster dot-product (util.dot_score) instead of cosine similarity can be used. - - :return: - By default, a list of tensors is returned. If convert_to_tensor, a stacked tensor is returned. If convert_to_numpy, a numpy matrix is returned. - """ - self.eval() - if show_progress_bar is None: - show_progress_bar = ( - logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG) - - if convert_to_tensor: - convert_to_numpy = False - - if output_value == 'token_embeddings': - convert_to_tensor = False - convert_to_numpy = False - - input_was_string = False - if isinstance(sentences, str) or not hasattr(sentences, - '__len__'): # Cast an individual sentence to a list with length 1 - sentences = [sentences] - input_was_string = True - - if device is None: - device = self._target_device - - self.to(device) - - all_embeddings = [] - length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences]) - sentences_sorted = [sentences[idx] for idx in length_sorted_idx] - - for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar): - sentences_batch = sentences_sorted[start_index:start_index + batch_size] - features = self.tokenize(sentences_batch) - features = batch_to_device(features, device) - - with torch.no_grad(): - out_features = self.forward(features) - - if output_value == 'token_embeddings': - embeddings = [] - for token_emb, attention in zip(out_features[output_value], out_features['attention_mask']): - last_mask_id = len(attention) - 1 - while last_mask_id > 0 and attention[last_mask_id].item() == 0: - last_mask_id -= 1 - - embeddings.append(token_emb[0:last_mask_id + 1]) - else: # Sentence embeddings - embeddings = out_features[output_value] - embeddings = embeddings.detach() - if normalize_embeddings: - embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) - - # fixes for #522 and #487 to avoid oom problems on gpu with large datasets - if convert_to_numpy: - embeddings = embeddings.cpu() - - all_embeddings.extend(embeddings) - - all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)] - - if convert_to_tensor: - all_embeddings = torch.stack(all_embeddings) - elif convert_to_numpy: - all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings]) - - if input_was_string: - all_embeddings = all_embeddings[0] - - return all_embeddings + self.model = SiameseTransformer(model_name, args=args) def predict(self, to_predict, verbose=True): sentences1 = [] @@ -171,216 +60,13 @@ class SiameseTransQuestModel(nn.Sequential): sentences1.append(text_1) sentences2.append(text_2) - embeddings1 = self.encode(sentences1, show_progress_bar=verbose, convert_to_numpy=True) - embeddings2 = self.encode(sentences2, show_progress_bar=verbose, convert_to_numpy=True) + embeddings1 = self.model.encode(sentences1, show_progress_bar=verbose, convert_to_numpy=True) + embeddings2 = self.model.encode(sentences2, show_progress_bar=verbose, convert_to_numpy=True) cosine_scores = 1 - (paired_cosine_distances(embeddings1, embeddings2)) return cosine_scores - def start_multi_process_pool(self, target_devices: List[str] = None): - """ - Starts multi process to process the encoding with several, independent processes. - This method is recommended if you want to encode on multiple GPUs. It is advised - to start only one process per GPU. This method works together with encode_multi_process - - :param target_devices: PyTorch target devices, e.g. cuda:0, cuda:1... If None, all available CUDA devices will be used - :return: Returns a dict with the target processes, an input queue and and output queue. - """ - if target_devices is None: - if torch.cuda.is_available(): - target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())] - else: - logger.info("CUDA is not available. Start 4 CPU worker") - target_devices = ['cpu'] * 4 - - logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices)))) - - ctx = mp.get_context('spawn') - input_queue = ctx.Queue() - output_queue = ctx.Queue() - processes = [] - - for cuda_id in target_devices: - p = ctx.Process(target=SiameseTransQuestModel._encode_multi_process_worker, - args=(cuda_id, self, input_queue, output_queue), daemon=True) - p.start() - processes.append(p) - - return {'input': input_queue, 'output': output_queue, 'processes': processes} - - @staticmethod - def stop_multi_process_pool(pool): - """ - Stops all processes started with start_multi_process_pool - """ - for p in pool['processes']: - p.terminate() - - for p in pool['processes']: - p.join() - p.close() - - pool['input'].close() - pool['output'].close() - - def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], batch_size: int = 32, - chunk_size: int = None): - """ - This method allows to run encode() on multiple GPUs. The sentences are chunked into smaller packages - and sent to individual processes, which encode these on the different GPUs. This method is only suitable - for encoding large sets of sentences - - :param sentences: List of sentences - :param pool: A pool of workers started with SentenceTransformer.start_multi_process_pool - :param batch_size: Encode sentences with batch size - :param chunk_size: Sentences are chunked and sent to the individual processes. If none, it determine a sensible size. - :return: Numpy matrix with all embeddings - """ - - if chunk_size is None: - chunk_size = min(math.ceil(len(sentences) / len(pool["processes"]) / 10), 5000) - - logger.info("Chunk data into packages of size {}".format(chunk_size)) - - input_queue = pool['input'] - last_chunk_id = 0 - chunk = [] - - for sentence in sentences: - chunk.append(sentence) - if len(chunk) >= chunk_size: - input_queue.put([last_chunk_id, batch_size, chunk]) - last_chunk_id += 1 - chunk = [] - - if len(chunk) > 0: - input_queue.put([last_chunk_id, batch_size, chunk]) - last_chunk_id += 1 - - output_queue = pool['output'] - results_list = sorted([output_queue.get() for _ in range(last_chunk_id)], key=lambda x: x[0]) - embeddings = np.concatenate([result[1] for result in results_list]) - return embeddings - - @staticmethod - def _encode_multi_process_worker(target_device: str, model, input_queue, results_queue): - """ - Internal working process to encode sentences in multi-process setup - """ - while True: - try: - id, batch_size, sentences = input_queue.get() - embeddings = model.encode(sentences, device=target_device, show_progress_bar=False, - convert_to_numpy=True, batch_size=batch_size) - results_queue.put([id, embeddings]) - except queue.Empty: - break - - def get_max_seq_length(self): - """ - Returns the maximal sequence length for input the model accepts. Longer inputs will be truncated - """ - if hasattr(self._first_module(), 'max_seq_length'): - return self._first_module().max_seq_length - - return None - - def tokenize(self, text: str): - """ - Tokenizes the text - """ - return self._first_module().tokenize(text) - - def get_sentence_features(self, *features): - return self._first_module().get_sentence_features(*features) - - def get_sentence_embedding_dimension(self): - for mod in reversed(self._modules.values()): - sent_embedding_dim_method = getattr(mod, "get_sentence_embedding_dimension", None) - if callable(sent_embedding_dim_method): - return sent_embedding_dim_method() - return None - - def _first_module(self): - """Returns the first module of this sequential embedder""" - return self._modules[next(iter(self._modules))] - - def _last_module(self): - """Returns the last module of this sequential embedder""" - return self._modules[next(reversed(self._modules))] - - def save(self, path): - """ - Saves all elements for this seq. sentence embedder into different sub-folders - """ - if path is None: - return - - os.makedirs(path, exist_ok=True) - - logger.info("Save model to {}".format(path)) - contained_modules = [] - - for idx, name in enumerate(self._modules): - module = self._modules[name] - # model_path = os.path.join(path, str(idx)+"_"+type(module).__name__) - os.makedirs(path, exist_ok=True) - module.save(path) - contained_modules.append( - {'idx': idx, 'name': name, 'path': os.path.basename(path), 'type': type(module).__module__}) - - with open(os.path.join(path, 'modules.json'), 'w') as fOut: - json.dump(contained_modules, fOut, indent=2) - - self.save_model_args(path) - - def smart_batching_collate(self, batch): - """ - Transforms a batch from a SmartBatchingDataset to a batch of tensors for the model - Here, batch is a list of tuples: [(tokens, label), ...] - - :param batch: - a batch from a SmartBatchingDataset - :return: - a batch of tensors for the model - """ - num_texts = len(batch[0].texts) - texts = [[] for _ in range(num_texts)] - labels = [] - - for example in batch: - for idx, text in enumerate(example.texts): - texts[idx].append(text) - - labels.append(example.label) - - labels = torch.tensor(labels).to(self._target_device) - - sentence_features = [] - for idx in range(num_texts): - tokenized = self.tokenize(texts[idx]) - batch_to_device(tokenized, self._target_device) - sentence_features.append(tokenized) - - return sentence_features, labels - - def _text_length(self, text: Union[List[int], List[List[int]]]): - """ - Help function to get the length for the input text. Text can be either - a list of ints (which means a single text as input), or a tuple of list of ints - (representing several text inputs to the model). - """ - - if isinstance(text, dict): # {key: value} case - return len(next(iter(text.values()))) - elif not hasattr(text, '__len__'): # Object has no len() method - return 1 - elif len(text) == 0 or isinstance(text[0], int): # Empty string or list of ints - return len(text) - else: - return sum([len(t) for t in text]) # Sum of length of individual strings - def train_model(self, train_df, eval_df, args=None, output_dir=None, verbose=True): train_samples = [] @@ -402,7 +88,7 @@ class SiameseTransQuestModel(nn.Sequential): evaluator = EmbeddingSimilarityEvaluator.from_input_examples(eval_samples, name='eval') warmup_steps = math.ceil(len(train_dataloader) * self.args.num_train_epochs * 0.1) - self.fit(train_objectives=[(train_dataloader, train_loss)], + self.model.fit(train_objectives=[(train_dataloader, train_loss)], evaluator=evaluator, epochs=self.args.num_train_epochs, evaluation_steps=self.args.evaluate_during_training_steps, @@ -414,230 +100,6 @@ class SiameseTransQuestModel(nn.Sequential): max_grad_norm=self.args.max_grad_norm, output_path=self.args.best_model_dir) - def fit(self, - train_objectives: Iterable[Tuple[DataLoader, nn.Module]], - evaluator: SentenceEvaluator = None, - epochs: int = 1, - steps_per_epoch=None, - scheduler: str = 'WarmupLinear', - warmup_steps: int = 10000, - optimizer_class: Type[Optimizer] = transformers.AdamW, - optimizer_params: Dict[str, object] = {'lr': 2e-5}, - weight_decay: float = 0.01, - evaluation_steps: int = 0, - output_path: str = None, - save_best_model: bool = True, - max_grad_norm: float = 1, - use_amp: bool = False, - callback: Callable[[float, int, int], None] = None, - show_progress_bar: bool = True - ): - """ - Train the model with the given training objective - Each training objective is sampled in turn for one batch. - We sample only as many batches from each objective as there are in the smallest one - to make sure of equal training with each dataset. - - :param train_objectives: Tuples of (DataLoader, LossFunction). Pass more than one for multi-task learning - :param evaluator: An evaluator (sentence_transformers.evaluation) evaluates the model performance during training on held-out dev data. It is used to determine the best model that is saved to disc. - :param epochs: Number of epochs for training - :param steps_per_epoch: Number of training steps per epoch. If set to None (default), one epoch is equal the DataLoader size from train_objectives. - :param scheduler: Learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts - :param warmup_steps: Behavior depends on the scheduler. For WarmupLinear (default), the learning rate is increased from o up to the maximal learning rate. After these many training steps, the learning rate is decreased linearly back to zero. - :param optimizer_class: Optimizer - :param optimizer_params: Optimizer parameters - :param weight_decay: Weight decay for model parameters - :param evaluation_steps: If > 0, evaluate the model using evaluator after each number of training steps - :param output_path: Storage path for the model and evaluation files - :param save_best_model: If true, the best model (according to evaluator) is stored at output_path - :param max_grad_norm: Used for gradient normalization. - :param use_amp: Use Automatic Mixed Precision (AMP). Only for Pytorch >= 1.6.0 - :param callback: Callback function that is invoked after each evaluation. - It must accept the following three parameters in this order: - `score`, `epoch`, `steps` - :param show_progress_bar: If True, output a tqdm progress bar - """ - - if use_amp: - from torch.cuda.amp import autocast - scaler = torch.cuda.amp.GradScaler() - - self.to(self._target_device) - - if output_path is not None: - os.makedirs(output_path, exist_ok=True) - - dataloaders = [dataloader for dataloader, _ in train_objectives] - - # Use smart batching - for dataloader in dataloaders: - dataloader.collate_fn = self.smart_batching_collate - - loss_models = [loss for _, loss in train_objectives] - for loss_model in loss_models: - loss_model.to(self._target_device) - - self.best_score = -9999999 - - if steps_per_epoch is None or steps_per_epoch == 0: - steps_per_epoch = min([len(dataloader) for dataloader in dataloaders]) - - num_train_steps = int(steps_per_epoch * epochs) - - # Prepare optimizers - optimizers = [] - schedulers = [] - for loss_model in loss_models: - param_optimizer = list(loss_model.named_parameters()) - - no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight'] - optimizer_grouped_parameters = [ - {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], - 'weight_decay': weight_decay}, - {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0} - ] - - optimizer = optimizer_class(optimizer_grouped_parameters, **optimizer_params) - scheduler_obj = self._get_scheduler(optimizer, scheduler=scheduler, warmup_steps=warmup_steps, - t_total=num_train_steps) - - optimizers.append(optimizer) - schedulers.append(scheduler_obj) - - global_step = 0 - data_iterators = [iter(dataloader) for dataloader in dataloaders] - - num_train_objectives = len(train_objectives) - - skip_scheduler = False - for epoch in trange(epochs, desc="Epoch", disable=not show_progress_bar): - training_steps = 0 - - for loss_model in loss_models: - loss_model.zero_grad() - loss_model.train() - - for _ in trange(steps_per_epoch, desc="Iteration", smoothing=0.05, disable=not show_progress_bar): - for train_idx in range(num_train_objectives): - loss_model = loss_models[train_idx] - optimizer = optimizers[train_idx] - scheduler = schedulers[train_idx] - data_iterator = data_iterators[train_idx] - - try: - data = next(data_iterator) - except StopIteration: - data_iterator = iter(dataloaders[train_idx]) - data_iterators[train_idx] = data_iterator - data = next(data_iterator) - - features, labels = data - - if use_amp: - with autocast(): - loss_value = loss_model(features, labels) - - scale_before_step = scaler.get_scale() - scaler.scale(loss_value).backward() - scaler.unscale_(optimizer) - torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) - scaler.step(optimizer) - scaler.update() - - skip_scheduler = scaler.get_scale() != scale_before_step - else: - loss_value = loss_model(features, labels) - loss_value.backward() - torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm) - optimizer.step() - - optimizer.zero_grad() - - if not skip_scheduler: - scheduler.step() - - training_steps += 1 - global_step += 1 - - if evaluation_steps > 0 and training_steps % evaluation_steps == 0: - self._eval_during_training(evaluator, output_path, save_best_model, epoch, - training_steps, callback) - for loss_model in loss_models: - loss_model.zero_grad() - loss_model.train() - - self._eval_during_training(evaluator, output_path, save_best_model, epoch, -1, callback) - - if evaluator is None and output_path is not None: # No evaluator, but output path: save final model version - self.save(output_path) - - def evaluate(self, evaluator: SentenceEvaluator, output_path: str = None, verbose: bool = True): - """ - Evaluate the model - - :param evaluator: - the evaluator - :param verbose: - print the results - :param output_path: - the evaluator can write the results to this path - """ - if output_path is not None: - os.makedirs(output_path, exist_ok=True) - return evaluator(self, output_path, verbose) - - def _eval_during_training(self, evaluator, output_path, save_best_model, epoch, steps, callback): - """Runs evaluation during the training""" - if evaluator is not None: - score = evaluator(self, output_path=output_path, epoch=epoch, steps=steps) - if callback is not None: - callback(score, epoch, steps) - if score > self.best_score: - self.best_score = score - if save_best_model: - self.save(output_path) - - @staticmethod - def _get_scheduler(optimizer, scheduler: str, warmup_steps: int, t_total: int): - """ - Returns the correct learning rate scheduler. Available scheduler: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts - """ - scheduler = scheduler.lower() - if scheduler == 'constantlr': - return transformers.get_constant_schedule(optimizer) - elif scheduler == 'warmupconstant': - return transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps) - elif scheduler == 'warmuplinear': - return transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps, - num_training_steps=t_total) - elif scheduler == 'warmupcosine': - return transformers.get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps, - num_training_steps=t_total) - elif scheduler == 'warmupcosinewithhardrestarts': - return transformers.get_cosine_with_hard_restarts_schedule_with_warmup(optimizer, - num_warmup_steps=warmup_steps, - num_training_steps=t_total) - else: - raise ValueError("Unknown scheduler {}".format(scheduler)) - - @property - def device(self) -> device: - """ - Get torch.device from module, assuming that the whole module has one device. - """ - try: - return next(self.parameters()).device - except StopIteration: - # For nn.DataParallel compatibility in PyTorch 1.5 - - def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]: - tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)] - return tuples - - gen = self._named_members(get_members_fn=find_tensor_attributes) - first_tuple = next(gen) - return first_tuple[1].device - def save_model_args(self, output_dir): os.makedirs(output_dir, exist_ok=True) self.args.save(output_dir) @@ -647,30 +109,4 @@ class SiameseTransQuestModel(nn.Sequential): args.load(input_dir) return args - @property - def tokenizer(self): - """ - Property to get the tokenizer that is used by this model - """ - return self._first_module().tokenizer - - @tokenizer.setter - def tokenizer(self, value): - """ - Property to set the tokenizer that is should used by this model - """ - self._first_module().tokenizer = value - - @property - def max_seq_length(self): - """ - Property to get the maximal input sequence length for the model. Longer inputs will be truncated. - """ - return self._first_module().max_seq_length - - @max_seq_length.setter - def max_seq_length(self, value): - """ - Property to set the maximal input sequence length for the model. Longer inputs will be truncated. - """ - self._first_module().max_seq_length = value + |