diff --git a/microservices/trainer/trainer.py b/microservices/trainer/trainer.py
index 49d8941ddf46e89518f06a2075f61c548206b72c..00314706d9e38a866b7bbea1f7f1eee9611b92e2 100644
--- a/microservices/trainer/trainer.py
+++ b/microservices/trainer/trainer.py
@@ -19,10 +19,6 @@ from transformers.utils import (
 import trainer_pb2_grpc
 
 is_busy = False
-MAX_LENGTH = 256
-
-global_tag2id = global_id2tag = global_label2id = global_id2label = global_tokenizer = global_n_labels = global_fondation_model_id = None
-
 
 class TrainerServicer(trainer_pb2_grpc.TrainerServicer):
     def StartTraining(self, request, context):
@@ -56,27 +52,157 @@ def serve():
 
 
 def training_process(training_data, fondation_model_id, finetuned_repo_name, huggingface_token):
-    global_fondation_model_id = fondation_model_id
-    global_tag2id = {'action': 1, 'actor': 2, 'artifact': 3, 'condition': 4, 'location': 5, 'modality': 6, 'reference': 7,
+    MAX_LENGTH = 256
+    tag2id = {'action': 1, 'actor': 2, 'artifact': 3, 'condition': 4, 'location': 5, 'modality': 6, 'reference': 7,
               'time': 8}
-    global_id2tag = {v: k for k, v in global_tag2id.items()}
-    global_label2id = {
+    id2tag = {v: k for k, v in tag2id.items()}
+    label2id = {
         'O': 0,
-        **{f'{k}': v for k, v in global_tag2id.items()}
+        **{f'{k}': v for k, v in tag2id.items()}
     }
-    global_id2label = {v: k for k, v in global_label2id.items()}
+    id2label = {v: k for k, v in label2id.items()}
 
     train_ds = Dataset.from_list(training_data)
 
     from transformers import AutoTokenizer
-    tokenizer = AutoTokenizer.from_pretrained(global_fondation_model_id)
-    print("post load tokenizer")
+    tokenizer = AutoTokenizer.from_pretrained(fondation_model_id)
+
+    def get_token_role_in_span(token_start: int, token_end: int, span_start: int, span_end: int):
+        if token_end <= token_start:
+            return "N"
+        if token_start < span_start or token_end > span_end:
+            return "O"
+        else:
+            return "I"
+
+    def tokenize_and_adjust_labels(sample):
+        tokenized = tokenizer(sample["text"],
+                              return_offsets_mapping=True,
+                              padding="max_length",
+                              max_length=MAX_LENGTH,
+                              truncation=True)
+
+        labels = [[0 for _ in label2id.keys()] for _ in range(MAX_LENGTH)]
+
+        for (token_start, token_end), token_labels in zip(tokenized["offset_mapping"], labels):
+            for span in sample["tags"]:
+                role = get_token_role_in_span(token_start, token_end, span["start"], span["end"])
+                if role == "I":
+                    token_labels[label2id[f"{span['tag']}"]] = 1
+
+        return {**tokenized, "labels": labels}
+
     tokenized_train_ds = train_ds.map(tokenize_and_adjust_labels, remove_columns=train_ds.column_names)
 
     from transformers import DataCollatorWithPadding
     data_collator = DataCollatorWithPadding(tokenizer, padding=True)
 
-    n_labels = len(global_id2label)
+    n_labels = len(id2label)
+
+    def divide(a: int, b: int):
+        return a / b if b > 0 else 0
+
+    def compute_metrics(p):
+        predictions, true_labels = p
+
+        predicted_labels = np.where(predictions > 0, np.ones(predictions.shape), np.zeros(predictions.shape))
+        metrics = {}
+
+        cm = multilabel_confusion_matrix(true_labels.reshape(-1, n_labels), predicted_labels.reshape(-1, n_labels))
+
+        for label_idx, matrix in enumerate(cm):
+            if label_idx == 0:
+                continue  # We don't care about the label "O"
+            tp, fp, fn = matrix[1, 1], matrix[0, 1], matrix[1, 0]
+            precision = divide(tp, tp + fp)
+            recall = divide(tp, tp + fn)
+            f1 = divide(2 * precision * recall, precision + recall)
+            metrics[f"recall_{id2label[label_idx]}"] = recall
+            metrics[f"precision_{id2label[label_idx]}"] = precision
+            metrics[f"f1_{id2label[label_idx]}"] = f1
+
+        f1_values = {k: v for k, v in metrics.items() if k.startswith('f1_')}
+        macro_f1 = sum(f1_values.values()) / len(f1_values)
+        metrics["macro_f1"] = macro_f1
+
+        return metrics
+
+    class RobertaForSpanCategorization(RobertaPreTrainedModel):
+        _keys_to_ignore_on_load_unexpected = [r"pooler"]
+        _keys_to_ignore_on_load_missing = [r"position_ids"]
+
+        def __init__(self, config):
+            super().__init__(config)
+            self.num_labels = config.num_labels
+            self.roberta = RobertaModel(config, add_pooling_layer=False)
+            classifier_dropout = (
+                config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
+            )
+            self.dropout = nn.Dropout(classifier_dropout)
+            self.classifier = nn.Linear(config.hidden_size, config.num_labels)
+            # Initialize weights and apply final processing
+            self.post_init()
+
+        @add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
+        def forward(
+            self,
+            input_ids: Optional[torch.LongTensor] = None,
+            attention_mask: Optional[torch.FloatTensor] = None,
+            token_type_ids: Optional[torch.LongTensor] = None,
+            position_ids: Optional[torch.LongTensor] = None,
+            head_mask: Optional[torch.FloatTensor] = None,
+            inputs_embeds: Optional[torch.FloatTensor] = None,
+            labels: Optional[torch.LongTensor] = None,
+            output_attentions: Optional[bool] = None,
+            output_hidden_states: Optional[bool] = None,
+            return_dict: Optional[bool] = None,
+        ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
+            r"""
+            labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
+                Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
+            """
+            return_dict = return_dict if return_dict is not None else self.config.use_return_dict
+            outputs = self.roberta(
+                input_ids,
+                attention_mask=attention_mask,
+                token_type_ids=token_type_ids,
+                position_ids=position_ids,
+                head_mask=head_mask,
+                inputs_embeds=inputs_embeds,
+                output_attentions=output_attentions,
+                output_hidden_states=output_hidden_states,
+                return_dict=return_dict,
+            )
+            sequence_output = outputs[0]
+            sequence_output = self.dropout(sequence_output)
+            logits = self.classifier(sequence_output)
+
+            loss = None
+            if labels is not None:
+                loss_fct = nn.BCEWithLogitsLoss()
+                loss = loss_fct(logits, labels.float())
+            if not return_dict:
+                output = (logits,) + outputs[2:]
+                return ((loss,) + output) if loss is not None else output
+            return TokenClassifierOutput(
+                loss=loss,
+                logits=logits,
+                hidden_states=outputs.hidden_states,
+                attentions=outputs.attentions,
+            )
+
+    class TrainingMetricsCallback(TrainerCallback):
+        def __init__(self):
+            self.macro_f1 = []
+            self.steps = []
+            self.counter = 0
+
+        def on_evaluate(self, args, state, control, metrics=None, **kwargs):
+            if metrics is not None:
+                if 'eval_macro_f1' in metrics:
+                    self.macro_f1.append(metrics['eval_macro_f1'])
+                    self.counter += 1
+                    self.steps.append(self.counter)
 
     training_args = TrainingArguments(
         output_dir="./models/fine_tune_bert_output_span_cat",
@@ -97,6 +223,9 @@ def training_process(training_data, fondation_model_id, finetuned_repo_name, hug
 
     metrics_callback = TrainingMetricsCallback()
 
+    def model_init():
+        return RobertaForSpanCategorization.from_pretrained(fondation_model_id, id2label=id2label, label2id=label2id)
+
     trainer = Trainer(
         model_init=model_init,
         args=training_args,
@@ -112,140 +241,5 @@ def training_process(training_data, fondation_model_id, finetuned_repo_name, hug
     tokenizer.push_to_hub(finetuned_repo_name, use_auth_token=huggingface_token)
 
 
-def model_init():
-    return RobertaForSpanCategorization.from_pretrained(global_fondation_model_id, id2label=global_id2label, label2id=global_label2id)
-
-
-def get_token_role_in_span(token_start: int, token_end: int, span_start: int, span_end: int):
-    if token_end <= token_start:
-        return "N"
-    if token_start < span_start or token_end > span_end:
-        return "O"
-    else:
-        return "I"
-
-
-def tokenize_and_adjust_labels(sample):
-    tokenized = global_tokenizer(sample["text"],
-                                 return_offsets_mapping=True,
-                                 padding="max_length",
-                                 max_length=MAX_LENGTH,
-                                 truncation=True)
-
-    labels = [[0 for _ in global_label2id.keys()] for _ in range(MAX_LENGTH)]
-
-    for (token_start, token_end), token_labels in zip(tokenized["offset_mapping"], labels):
-        for span in sample["tags"]:
-            role = get_token_role_in_span(token_start, token_end, span["start"], span["end"])
-            if role == "I":
-                token_labels[global_label2id[f"{span['tag']}"]] = 1
-
-    return {**tokenized, "labels": labels}
-
-
-def divide(a: int, b: int):
-    return a / b if b > 0 else 0
-
-
-def compute_metrics(p):
-    predictions, true_labels = p
-
-    predicted_labels = np.where(predictions > 0, np.ones(predictions.shape), np.zeros(predictions.shape))
-    metrics = {}
-
-    cm = multilabel_confusion_matrix(true_labels.reshape(-1, global_n_labels), predicted_labels.reshape(-1, global_n_labels))
-
-    for label_idx, matrix in enumerate(cm):
-        if label_idx == 0:
-            continue  # We don't care about the label "O"
-        tp, fp, fn = matrix[1, 1], matrix[0, 1], matrix[1, 0]
-        precision = divide(tp, tp + fp)
-        recall = divide(tp, tp + fn)
-        f1 = divide(2 * precision * recall, precision + recall)
-        metrics[f"recall_{global_id2label[label_idx]}"] = recall
-        metrics[f"precision_{global_id2label[label_idx]}"] = precision
-        metrics[f"f1_{global_id2label[label_idx]}"] = f1
-
-    f1_values = {k: v for k, v in metrics.items() if k.startswith('f1_')}
-    macro_f1 = sum(f1_values.values()) / len(f1_values)
-    metrics["macro_f1"] = macro_f1
-
-    return metrics
-
-
-class RobertaForSpanCategorization(RobertaPreTrainedModel):
-    _keys_to_ignore_on_load_unexpected = [r"pooler"]
-    _keys_to_ignore_on_load_missing = [r"position_ids"]
-
-    def __init__(self, config):
-        super().__init__(config)
-        self.num_labels = config.num_labels
-        self.roberta = RobertaModel(config, add_pooling_layer=False)
-        classifier_dropout = (
-            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
-        )
-        self.dropout = nn.Dropout(classifier_dropout)
-        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
-        self.post_init()
-
-    @add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
-    def forward(
-        self,
-        input_ids: Optional[torch.LongTensor] = None,
-        attention_mask: Optional[torch.FloatTensor] = None,
-        token_type_ids: Optional[torch.LongTensor] = None,
-        position_ids: Optional[torch.LongTensor] = None,
-        head_mask: Optional[torch.FloatTensor] = None,
-        inputs_embeds: Optional[torch.FloatTensor] = None,
-        labels: Optional[torch.LongTensor] = None,
-        output_attentions: Optional[bool] = None,
-        output_hidden_states: Optional[bool] = None,
-        return_dict: Optional[bool] = None,
-    ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
-        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
-        outputs = self.roberta(
-            input_ids,
-            attention_mask=attention_mask,
-            token_type_ids=token_type_ids,
-            position_ids=position_ids,
-            head_mask=head_mask,
-            inputs_embeds=inputs_embeds,
-            output_attentions=output_attentions,
-            output_hidden_states=output_hidden_states,
-            return_dict=return_dict,
-        )
-        sequence_output = outputs[0]
-        sequence_output = self.dropout(sequence_output)
-        logits = self.classifier(sequence_output)
-
-        loss = None
-        if labels is not None:
-            loss_fct = nn.BCEWithLogitsLoss()
-            loss = loss_fct(logits, labels.float())
-        if not return_dict:
-            output = (logits,) + outputs[2:]
-            return ((loss,) + output) if loss is not None else output
-        return TokenClassifierOutput(
-            loss=loss,
-            logits=logits,
-            hidden_states=outputs.hidden_states,
-            attentions=outputs.attentions,
-        )
-
-
-class TrainingMetricsCallback(TrainerCallback):
-    def __init__(self):
-        self.macro_f1 = []
-        self.steps = []
-        self.counter = 0
-
-    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
-        if metrics is not None:
-            if 'eval_macro_f1' in metrics:
-                self.macro_f1.append(metrics['eval_macro_f1'])
-                self.counter += 1
-                self.steps.append(self.counter)
-
-
 if __name__ == '__main__':
     serve()