github.com/TharinduDR/TransQuest.git

Diffstat (limited to 'transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py')
-rw-r--r--  transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py  606
1 file changed, 606 insertions, 0 deletions
diff --git a/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py
new file mode 100644
index 0000000..02c1c73
--- /dev/null
+++ b/transquest/algo/sentence_level/siamesetransquest/models/siamese_transformer.py
@@ -0,0 +1,606 @@
+import json
+import logging
+import math
+import os
+import queue
+from collections import OrderedDict
+from typing import List, Dict, Tuple, Iterable, Type, Union, Callable
+
+import numpy as np
+import torch
+import torch.multiprocessing as mp
+import transformers
+from numpy import ndarray
+from sklearn.metrics.pairwise import paired_cosine_distances
+from torch import nn, Tensor, device
+from torch.optim.optimizer import Optimizer
+from torch.utils.data import DataLoader
+from tqdm.autonotebook import trange
+
+from transquest.algo.sentence_level.siamesetransquest.evaluation.sentence_evaluator import SentenceEvaluator
+from transquest.algo.sentence_level.siamesetransquest.models import Transformer, Pooling
+from transquest.algo.sentence_level.siamesetransquest.util import batch_to_device
+
+logger = logging.getLogger(__name__)
+
+
+class SiameseTransformer(nn.Sequential):
+
+ def __init__(self, model_name: str = None, args=None, device: str = None):
+
+ transformer_model = Transformer(model_name, max_seq_length=args.max_seq_length)
+ pooling_model = Pooling(transformer_model.get_word_embedding_dimension(), pooling_mode_mean_tokens=True,
+ pooling_mode_cls_token=False,
+ pooling_mode_max_tokens=False)
+ modules = [transformer_model, pooling_model]
+
+ if modules is not None and not isinstance(modules, OrderedDict):
+ modules = OrderedDict([(str(idx), module) for idx, module in enumerate(modules)])
+
+ super().__init__(modules)
+ if device is None:
+ device = "cuda" if torch.cuda.is_available() else "cpu"
+ logger.info("Use pytorch device: {}".format(device))
+
+ self._target_device = torch.device(device)
+
+ def encode(self, sentences: Union[str, List[str], List[int]],
+ batch_size: int = 32,
+ show_progress_bar: bool = None,
+ output_value: str = 'sentence_embedding',
+ convert_to_numpy: bool = True,
+ convert_to_tensor: bool = False,
+ device: str = None,
+ normalize_embeddings: bool = False) -> Union[List[Tensor], ndarray, Tensor]:
+ """
+ Computes sentence embeddings
+
+ :param sentences: the sentences to embed
+ :param batch_size: the batch size used for the computation
+        :param show_progress_bar: Output a progress bar when encoding sentences
+        :param output_value: Default is sentence_embedding, which returns sentence embeddings. Can be set to token_embeddings to get wordpiece token embeddings.
+ :param convert_to_numpy: If true, the output is a list of numpy vectors. Else, it is a list of pytorch tensors.
+ :param convert_to_tensor: If true, you get one large tensor as return. Overwrites any setting from convert_to_numpy
+ :param device: Which torch.device to use for the computation
+ :param normalize_embeddings: If set to true, returned vectors will have length 1. In that case, the faster dot-product (util.dot_score) instead of cosine similarity can be used.
+
+ :return:
+ By default, a list of tensors is returned. If convert_to_tensor, a stacked tensor is returned. If convert_to_numpy, a numpy matrix is returned.
+ """
+ self.eval()
+ if show_progress_bar is None:
+ show_progress_bar = (
+ logger.getEffectiveLevel() == logging.INFO or logger.getEffectiveLevel() == logging.DEBUG)
+
+ if convert_to_tensor:
+ convert_to_numpy = False
+
+ if output_value == 'token_embeddings':
+ convert_to_tensor = False
+ convert_to_numpy = False
+
+ input_was_string = False
+        # Cast an individual sentence to a list with length 1
+        if isinstance(sentences, str) or not hasattr(sentences, '__len__'):
+ sentences = [sentences]
+ input_was_string = True
+
+ if device is None:
+ device = self._target_device
+
+ self.to(device)
+
+ all_embeddings = []
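+        # Sort sentences by decreasing length so each batch contains similarly sized
+        # inputs, which minimises padding and speeds up encoding.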
+ length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences])
+ sentences_sorted = [sentences[idx] for idx in length_sorted_idx]
+
+ for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar):
+ sentences_batch = sentences_sorted[start_index:start_index + batch_size]
+ features = self.tokenize(sentences_batch)
+ features = batch_to_device(features, device)
+
+ with torch.no_grad():
+ out_features = self.forward(features)
+
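+                # For token embeddings, use the attention mask to strip trailing padding positions.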
+ if output_value == 'token_embeddings':
+ embeddings = []
+ for token_emb, attention in zip(out_features[output_value], out_features['attention_mask']):
+ last_mask_id = len(attention) - 1
+ while last_mask_id > 0 and attention[last_mask_id].item() == 0:
+ last_mask_id -= 1
+
+ embeddings.append(token_emb[0:last_mask_id + 1])
+ else: # Sentence embeddings
+ embeddings = out_features[output_value]
+ embeddings = embeddings.detach()
+ if normalize_embeddings:
+ embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
+
+ # fixes for #522 and #487 to avoid oom problems on gpu with large datasets
+ if convert_to_numpy:
+ embeddings = embeddings.cpu()
+
+ all_embeddings.extend(embeddings)
+
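+        # Restore the original input order after the length-based sorting.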
+ all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)]
+
+ if convert_to_tensor:
+ all_embeddings = torch.stack(all_embeddings)
+ elif convert_to_numpy:
+ all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings])
+
+ if input_was_string:
+ all_embeddings = all_embeddings[0]
+
+ return all_embeddings
+
+ def predict(self, to_predict, verbose=True):
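+        """
+        Predicts a similarity score for each (text_1, text_2) pair in to_predict.
+        Returns a numpy array with the cosine similarity between the embeddings of each pair.
+        """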
+ sentences1 = []
+ sentences2 = []
+
+ for text_1, text_2 in to_predict:
+ sentences1.append(text_1)
+ sentences2.append(text_2)
+
+ embeddings1 = self.encode(sentences1, show_progress_bar=verbose, convert_to_numpy=True)
+ embeddings2 = self.encode(sentences2, show_progress_bar=verbose, convert_to_numpy=True)
+
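+        # The predicted score for each pair is the cosine similarity between its two embeddings.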
+ cosine_scores = 1 - (paired_cosine_distances(embeddings1, embeddings2))
+
+ return cosine_scores
+
+ def start_multi_process_pool(self, target_devices: List[str] = None):
+ """
+        Starts a multi-process pool to run the encoding with several independent processes.
+ This method is recommended if you want to encode on multiple GPUs. It is advised
+ to start only one process per GPU. This method works together with encode_multi_process
+
+ :param target_devices: PyTorch target devices, e.g. cuda:0, cuda:1... If None, all available CUDA devices will be used
+        :return: Returns a dict with the target processes, an input queue and an output queue.
+ """
+ if target_devices is None:
+ if torch.cuda.is_available():
+ target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())]
+ else:
+                logger.info("CUDA is not available. Starting 4 CPU workers")
+ target_devices = ['cpu'] * 4
+
+ logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices))))
+
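+        # Use the 'spawn' start method so each worker process can initialise CUDA independently.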
+ ctx = mp.get_context('spawn')
+ input_queue = ctx.Queue()
+ output_queue = ctx.Queue()
+ processes = []
+
+ for cuda_id in target_devices:
+ p = ctx.Process(target=SiameseTransformer._encode_multi_process_worker,
+ args=(cuda_id, self, input_queue, output_queue), daemon=True)
+ p.start()
+ processes.append(p)
+
+ return {'input': input_queue, 'output': output_queue, 'processes': processes}
+
+ @staticmethod
+ def stop_multi_process_pool(pool):
+ """
+ Stops all processes started with start_multi_process_pool
+ """
+ for p in pool['processes']:
+ p.terminate()
+
+ for p in pool['processes']:
+ p.join()
+ p.close()
+
+ pool['input'].close()
+ pool['output'].close()
+
+ def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], batch_size: int = 32,
+ chunk_size: int = None):
+ """
+        This method allows you to run encode() on multiple GPUs. The sentences are chunked into smaller packages
+ and sent to individual processes, which encode these on the different GPUs. This method is only suitable
+ for encoding large sets of sentences
+
+ :param sentences: List of sentences
+ :param pool: A pool of workers started with SentenceTransformer.start_multi_process_pool
+ :param batch_size: Encode sentences with batch size
+        :param chunk_size: Sentences are chunked and sent to the individual processes. If None, a sensible size is determined automatically.
+ :return: Numpy matrix with all embeddings
+ """
+
+ if chunk_size is None:
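+            # Aim for roughly 10 chunks per worker process, capped at 5000 sentences per chunk.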
+ chunk_size = min(math.ceil(len(sentences) / len(pool["processes"]) / 10), 5000)
+
+ logger.info("Chunk data into packages of size {}".format(chunk_size))
+
+ input_queue = pool['input']
+ last_chunk_id = 0
+ chunk = []
+
+ for sentence in sentences:
+ chunk.append(sentence)
+ if len(chunk) >= chunk_size:
+ input_queue.put([last_chunk_id, batch_size, chunk])
+ last_chunk_id += 1
+ chunk = []
+
+ if len(chunk) > 0:
+ input_queue.put([last_chunk_id, batch_size, chunk])
+ last_chunk_id += 1
+
+ output_queue = pool['output']
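+        # Collect one result per submitted chunk and restore the original chunk order.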
+ results_list = sorted([output_queue.get() for _ in range(last_chunk_id)], key=lambda x: x[0])
+ embeddings = np.concatenate([result[1] for result in results_list])
+ return embeddings
+
+ @staticmethod
+ def _encode_multi_process_worker(target_device: str, model, input_queue, results_queue):
+ """
+        Internal worker process that encodes sentences in the multi-process setup
+ """
+ while True:
+ try:
+                chunk_id, batch_size, sentences = input_queue.get()
+                embeddings = model.encode(sentences, device=target_device, show_progress_bar=False,
+                                          convert_to_numpy=True, batch_size=batch_size)
+                results_queue.put([chunk_id, embeddings])
+ except queue.Empty:
+ break
+
+ def get_max_seq_length(self):
+ """
+        Returns the maximal input sequence length that the model accepts. Longer inputs will be truncated
+ """
+ if hasattr(self._first_module(), 'max_seq_length'):
+ return self._first_module().max_seq_length
+
+ return None
+
+    def tokenize(self, text: Union[str, List[str]]):
+        """
+        Tokenizes the text (a single string or a list of strings)
+ """
+ return self._first_module().tokenize(text)
+
+ def get_sentence_features(self, *features):
+ return self._first_module().get_sentence_features(*features)
+
+ def get_sentence_embedding_dimension(self):
+ for mod in reversed(self._modules.values()):
+ sent_embedding_dim_method = getattr(mod, "get_sentence_embedding_dimension", None)
+ if callable(sent_embedding_dim_method):
+ return sent_embedding_dim_method()
+ return None
+
+ def _first_module(self):
+ """Returns the first module of this sequential embedder"""
+ return self._modules[next(iter(self._modules))]
+
+ def _last_module(self):
+ """Returns the last module of this sequential embedder"""
+ return self._modules[next(reversed(self._modules))]
+
+ def save(self, path):
+ """
+        Saves all modules of this sequential sentence embedder to the given path
+ """
+ if path is None:
+ return
+
+ os.makedirs(path, exist_ok=True)
+
+ logger.info("Save model to {}".format(path))
+ contained_modules = []
+
+ for idx, name in enumerate(self._modules):
+ module = self._modules[name]
+ # model_path = os.path.join(path, str(idx)+"_"+type(module).__name__)
+ os.makedirs(path, exist_ok=True)
+ module.save(path)
+ contained_modules.append(
+ {'idx': idx, 'name': name, 'path': os.path.basename(path), 'type': type(module).__module__})
+
+ with open(os.path.join(path, 'modules.json'), 'w') as fOut:
+ json.dump(contained_modules, fOut, indent=2)
+
+ self.save_model_args(path)
+
+ def smart_batching_collate(self, batch):
+ """
+ Transforms a batch from a SmartBatchingDataset to a batch of tensors for the model
+        Here, batch is a list of examples, each with a list of texts and a label
+
+ :param batch:
+ a batch from a SmartBatchingDataset
+ :return:
+ a batch of tensors for the model
+ """
+ num_texts = len(batch[0].texts)
+ texts = [[] for _ in range(num_texts)]
+ labels = []
+
+ for example in batch:
+ for idx, text in enumerate(example.texts):
+ texts[idx].append(text)
+
+ labels.append(example.label)
+
+ labels = torch.tensor(labels).to(self._target_device)
+
+ sentence_features = []
+ for idx in range(num_texts):
+ tokenized = self.tokenize(texts[idx])
+ batch_to_device(tokenized, self._target_device)
+ sentence_features.append(tokenized)
+
+ return sentence_features, labels
+
+ def _text_length(self, text: Union[List[int], List[List[int]]]):
+ """
+        Helper function to get the length of the input text. Text can be either
+        a string, a list of ints (a single tokenized text), or a list of lists of ints
+        (representing several text inputs to the model).
+ """
+
+ if isinstance(text, dict): # {key: value} case
+ return len(next(iter(text.values())))
+ elif not hasattr(text, '__len__'): # Object has no len() method
+ return 1
+ elif len(text) == 0 or isinstance(text[0], int): # Empty string or list of ints
+ return len(text)
+ else:
+ return sum([len(t) for t in text]) # Sum of length of individual strings
+
+
+ def fit(self,
+ train_objectives: Iterable[Tuple[DataLoader, nn.Module]],
+ evaluator: SentenceEvaluator = None,
+ epochs: int = 1,
+ steps_per_epoch=None,
+ scheduler: str = 'WarmupLinear',
+ warmup_steps: int = 10000,
+ optimizer_class: Type[Optimizer] = transformers.AdamW,
+ optimizer_params: Dict[str, object] = {'lr': 2e-5},
+ weight_decay: float = 0.01,
+ evaluation_steps: int = 0,
+ output_path: str = None,
+ save_best_model: bool = True,
+ max_grad_norm: float = 1,
+ use_amp: bool = False,
+ callback: Callable[[float, int, int], None] = None,
+ show_progress_bar: bool = True
+ ):
+ """
+ Train the model with the given training objective
+ Each training objective is sampled in turn for one batch.
+ We sample only as many batches from each objective as there are in the smallest one
+        to ensure equal training on each dataset.
+
+ :param train_objectives: Tuples of (DataLoader, LossFunction). Pass more than one for multi-task learning
+ :param evaluator: An evaluator (sentence_transformers.evaluation) evaluates the model performance during training on held-out dev data. It is used to determine the best model that is saved to disc.
+ :param epochs: Number of epochs for training
+        :param steps_per_epoch: Number of training steps per epoch. If set to None (default), one epoch equals the size of the smallest DataLoader in train_objectives.
+ :param scheduler: Learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts
+        :param warmup_steps: Behavior depends on the scheduler. For WarmupLinear (default), the learning rate is increased from 0 up to the maximal learning rate. After that many training steps, the learning rate is decreased linearly back to zero.
+ :param optimizer_class: Optimizer
+ :param optimizer_params: Optimizer parameters
+ :param weight_decay: Weight decay for model parameters
+        :param evaluation_steps: If > 0, evaluate the model using the evaluator after this many training steps
+ :param output_path: Storage path for the model and evaluation files
+ :param save_best_model: If true, the best model (according to evaluator) is stored at output_path
+        :param max_grad_norm: Used for gradient clipping (maximum gradient norm).
+ :param use_amp: Use Automatic Mixed Precision (AMP). Only for Pytorch >= 1.6.0
+ :param callback: Callback function that is invoked after each evaluation.
+ It must accept the following three parameters in this order:
+ `score`, `epoch`, `steps`
+ :param show_progress_bar: If True, output a tqdm progress bar
+ """
+
+ if use_amp:
+ from torch.cuda.amp import autocast
+ scaler = torch.cuda.amp.GradScaler()
+
+ self.to(self._target_device)
+
+ if output_path is not None:
+ os.makedirs(output_path, exist_ok=True)
+
+ dataloaders = [dataloader for dataloader, _ in train_objectives]
+
+ # Use smart batching
+ for dataloader in dataloaders:
+ dataloader.collate_fn = self.smart_batching_collate
+
+ loss_models = [loss for _, loss in train_objectives]
+ for loss_model in loss_models:
+ loss_model.to(self._target_device)
+
+ self.best_score = -9999999
+
+ if steps_per_epoch is None or steps_per_epoch == 0:
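+            # Default to the size of the smallest dataloader so every objective is sampled equally per epoch.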
+ steps_per_epoch = min([len(dataloader) for dataloader in dataloaders])
+
+ num_train_steps = int(steps_per_epoch * epochs)
+
+ # Prepare optimizers
+ optimizers = []
+ schedulers = []
+ for loss_model in loss_models:
+ param_optimizer = list(loss_model.named_parameters())
+
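+            # Apply weight decay to all parameters except biases and LayerNorm weights.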
+ no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
+ optimizer_grouped_parameters = [
+ {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
+ 'weight_decay': weight_decay},
+ {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
+ ]
+
+ optimizer = optimizer_class(optimizer_grouped_parameters, **optimizer_params)
+ scheduler_obj = self._get_scheduler(optimizer, scheduler=scheduler, warmup_steps=warmup_steps,
+ t_total=num_train_steps)
+
+ optimizers.append(optimizer)
+ schedulers.append(scheduler_obj)
+
+ global_step = 0
+ data_iterators = [iter(dataloader) for dataloader in dataloaders]
+
+ num_train_objectives = len(train_objectives)
+
+ skip_scheduler = False
+ for epoch in trange(epochs, desc="Epoch", disable=not show_progress_bar):
+ training_steps = 0
+
+ for loss_model in loss_models:
+ loss_model.zero_grad()
+ loss_model.train()
+
+ for _ in trange(steps_per_epoch, desc="Iteration", smoothing=0.05, disable=not show_progress_bar):
+ for train_idx in range(num_train_objectives):
+ loss_model = loss_models[train_idx]
+ optimizer = optimizers[train_idx]
+ scheduler = schedulers[train_idx]
+ data_iterator = data_iterators[train_idx]
+
+ try:
+ data = next(data_iterator)
+ except StopIteration:
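+                        # This dataloader is exhausted; restart it so the remaining steps still get batches.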
+ data_iterator = iter(dataloaders[train_idx])
+ data_iterators[train_idx] = data_iterator
+ data = next(data_iterator)
+
+ features, labels = data
+
+ if use_amp:
+ with autocast():
+ loss_value = loss_model(features, labels)
+
+ scale_before_step = scaler.get_scale()
+ scaler.scale(loss_value).backward()
+ scaler.unscale_(optimizer)
+ torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm)
+ scaler.step(optimizer)
+ scaler.update()
+
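+                        # If the scaler lowered its scale, the optimizer step was skipped (inf/NaN gradients),
+                        # so skip the scheduler step for this batch as well.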
+ skip_scheduler = scaler.get_scale() != scale_before_step
+ else:
+ loss_value = loss_model(features, labels)
+ loss_value.backward()
+ torch.nn.utils.clip_grad_norm_(loss_model.parameters(), max_grad_norm)
+ optimizer.step()
+
+ optimizer.zero_grad()
+
+ if not skip_scheduler:
+ scheduler.step()
+
+ training_steps += 1
+ global_step += 1
+
+ if evaluation_steps > 0 and training_steps % evaluation_steps == 0:
+ self._eval_during_training(evaluator, output_path, save_best_model, epoch,
+ training_steps, callback)
+ for loss_model in loss_models:
+ loss_model.zero_grad()
+ loss_model.train()
+
+ self._eval_during_training(evaluator, output_path, save_best_model, epoch, -1, callback)
+
+ if evaluator is None and output_path is not None: # No evaluator, but output path: save final model version
+ self.save(output_path)
+
+ def evaluate(self, evaluator: SentenceEvaluator, output_path: str = None, verbose: bool = True):
+ """
+ Evaluate the model
+
+ :param evaluator:
+ the evaluator
+ :param verbose:
+ print the results
+ :param output_path:
+ the evaluator can write the results to this path
+ """
+ if output_path is not None:
+ os.makedirs(output_path, exist_ok=True)
+ return evaluator(self, output_path, verbose)
+
+ def _eval_during_training(self, evaluator, output_path, save_best_model, epoch, steps, callback):
+        """Runs evaluation during training"""
+ if evaluator is not None:
+ score = evaluator(self, output_path=output_path, epoch=epoch, steps=steps)
+ if callback is not None:
+ callback(score, epoch, steps)
+ if score > self.best_score:
+ self.best_score = score
+ if save_best_model:
+ self.save(output_path)
+
+ @staticmethod
+ def _get_scheduler(optimizer, scheduler: str, warmup_steps: int, t_total: int):
+ """
+        Returns the correct learning rate scheduler. Available schedulers: constantlr, warmupconstant, warmuplinear, warmupcosine, warmupcosinewithhardrestarts
+ """
+ scheduler = scheduler.lower()
+ if scheduler == 'constantlr':
+ return transformers.get_constant_schedule(optimizer)
+ elif scheduler == 'warmupconstant':
+ return transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps)
+ elif scheduler == 'warmuplinear':
+ return transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
+ num_training_steps=t_total)
+ elif scheduler == 'warmupcosine':
+ return transformers.get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
+ num_training_steps=t_total)
+ elif scheduler == 'warmupcosinewithhardrestarts':
+ return transformers.get_cosine_with_hard_restarts_schedule_with_warmup(optimizer,
+ num_warmup_steps=warmup_steps,
+ num_training_steps=t_total)
+ else:
+ raise ValueError("Unknown scheduler {}".format(scheduler))
+
+ @property
+ def device(self) -> device:
+ """
+ Get torch.device from module, assuming that the whole module has one device.
+ """
+ try:
+ return next(self.parameters()).device
+ except StopIteration:
+ # For nn.DataParallel compatibility in PyTorch 1.5
+
+ def find_tensor_attributes(module: nn.Module) -> List[Tuple[str, Tensor]]:
+ tuples = [(k, v) for k, v in module.__dict__.items() if torch.is_tensor(v)]
+ return tuples
+
+ gen = self._named_members(get_members_fn=find_tensor_attributes)
+ first_tuple = next(gen)
+ return first_tuple[1].device
+
+ @property
+ def tokenizer(self):
+ """
+ Property to get the tokenizer that is used by this model
+ """
+ return self._first_module().tokenizer
+
+ @tokenizer.setter
+ def tokenizer(self, value):
+ """
+        Property to set the tokenizer that should be used by this model
+ """
+ self._first_module().tokenizer = value
+
+ @property
+ def max_seq_length(self):
+ """
+ Property to get the maximal input sequence length for the model. Longer inputs will be truncated.
+ """
+ return self._first_module().max_seq_length
+
+ @max_seq_length.setter
+ def max_seq_length(self, value):
+ """
+ Property to set the maximal input sequence length for the model. Longer inputs will be truncated.
+ """
+        self._first_module().max_seq_length = value
\ No newline at end of file