Skip to content
Snippets Groups Projects
Commit 40579d40 authored by Julien B.'s avatar Julien B.
Browse files

fix(trainer): change trainer function

parent 03959833
No related branches found
No related tags found
No related merge requests found
......@@ -19,10 +19,6 @@ from transformers.utils import (
import trainer_pb2_grpc
is_busy = False
MAX_LENGTH = 256
global_tag2id = global_id2tag = global_label2id = global_id2label = global_tokenizer = global_n_labels = global_fondation_model_id = None
class TrainerServicer(trainer_pb2_grpc.TrainerServicer):
def StartTraining(self, request, context):
......@@ -56,27 +52,157 @@ def serve():
def training_process(training_data, fondation_model_id, finetuned_repo_name, huggingface_token):
global_fondation_model_id = fondation_model_id
global_tag2id = {'action': 1, 'actor': 2, 'artifact': 3, 'condition': 4, 'location': 5, 'modality': 6, 'reference': 7,
MAX_LENGTH = 256
tag2id = {'action': 1, 'actor': 2, 'artifact': 3, 'condition': 4, 'location': 5, 'modality': 6, 'reference': 7,
'time': 8}
global_id2tag = {v: k for k, v in global_tag2id.items()}
global_label2id = {
id2tag = {v: k for k, v in tag2id.items()}
label2id = {
'O': 0,
**{f'{k}': v for k, v in global_tag2id.items()}
**{f'{k}': v for k, v in tag2id.items()}
}
global_id2label = {v: k for k, v in global_label2id.items()}
id2label = {v: k for k, v in label2id.items()}
train_ds = Dataset.from_list(training_data)
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(global_fondation_model_id)
print("post load tokenizer")
tokenizer = AutoTokenizer.from_pretrained(fondation_model_id)
def get_token_role_in_span(token_start: int, token_end: int, span_start: int, span_end: int):
if token_end <= token_start:
return "N"
if token_start < span_start or token_end > span_end:
return "O"
else:
return "I"
def tokenize_and_adjust_labels(sample):
tokenized = tokenizer(sample["text"],
return_offsets_mapping=True,
padding="max_length",
max_length=MAX_LENGTH,
truncation=True)
labels = [[0 for _ in label2id.keys()] for _ in range(MAX_LENGTH)]
for (token_start, token_end), token_labels in zip(tokenized["offset_mapping"], labels):
for span in sample["tags"]:
role = get_token_role_in_span(token_start, token_end, span["start"], span["end"])
if role == "I":
token_labels[label2id[f"{span['tag']}"]] = 1
return {**tokenized, "labels": labels}
tokenized_train_ds = train_ds.map(tokenize_and_adjust_labels, remove_columns=train_ds.column_names)
from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer, padding=True)
n_labels = len(global_id2label)
n_labels = len(id2label)
def divide(a: int, b: int):
return a / b if b > 0 else 0
def compute_metrics(p):
predictions, true_labels = p
predicted_labels = np.where(predictions > 0, np.ones(predictions.shape), np.zeros(predictions.shape))
metrics = {}
cm = multilabel_confusion_matrix(true_labels.reshape(-1, n_labels), predicted_labels.reshape(-1, n_labels))
for label_idx, matrix in enumerate(cm):
if label_idx == 0:
continue # We don't care about the label "O"
tp, fp, fn = matrix[1, 1], matrix[0, 1], matrix[1, 0]
precision = divide(tp, tp + fp)
recall = divide(tp, tp + fn)
f1 = divide(2 * precision * recall, precision + recall)
metrics[f"recall_{id2label[label_idx]}"] = recall
metrics[f"precision_{id2label[label_idx]}"] = precision
metrics[f"f1_{id2label[label_idx]}"] = f1
f1_values = {k: v for k, v in metrics.items() if k.startswith('f1_')}
macro_f1 = sum(f1_values.values()) / len(f1_values)
metrics["macro_f1"] = macro_f1
return metrics
class RobertaForSpanCategorization(RobertaPreTrainedModel):
_keys_to_ignore_on_load_unexpected = [r"pooler"]
_keys_to_ignore_on_load_missing = [r"position_ids"]
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.roberta = RobertaModel(config, add_pooling_layer=False)
classifier_dropout = (
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
)
self.dropout = nn.Dropout(classifier_dropout)
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
# Initialize weights and apply final processing
self.post_init()
@add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.LongTensor] = None,
position_ids: Optional[torch.LongTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.roberta(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
sequence_output = outputs[0]
sequence_output = self.dropout(sequence_output)
logits = self.classifier(sequence_output)
loss = None
if labels is not None:
loss_fct = nn.BCEWithLogitsLoss()
loss = loss_fct(logits, labels.float())
if not return_dict:
output = (logits,) + outputs[2:]
return ((loss,) + output) if loss is not None else output
return TokenClassifierOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
class TrainingMetricsCallback(TrainerCallback):
def __init__(self):
self.macro_f1 = []
self.steps = []
self.counter = 0
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
if metrics is not None:
if 'eval_macro_f1' in metrics:
self.macro_f1.append(metrics['eval_macro_f1'])
self.counter += 1
self.steps.append(self.counter)
training_args = TrainingArguments(
output_dir="./models/fine_tune_bert_output_span_cat",
......@@ -97,6 +223,9 @@ def training_process(training_data, fondation_model_id, finetuned_repo_name, hug
metrics_callback = TrainingMetricsCallback()
def model_init():
return RobertaForSpanCategorization.from_pretrained(fondation_model_id, id2label=id2label, label2id=label2id)
trainer = Trainer(
model_init=model_init,
args=training_args,
......@@ -112,140 +241,5 @@ def training_process(training_data, fondation_model_id, finetuned_repo_name, hug
tokenizer.push_to_hub(finetuned_repo_name, use_auth_token=huggingface_token)
def model_init():
return RobertaForSpanCategorization.from_pretrained(global_fondation_model_id, id2label=global_id2label, label2id=global_label2id)
def get_token_role_in_span(token_start: int, token_end: int, span_start: int, span_end: int):
if token_end <= token_start:
return "N"
if token_start < span_start or token_end > span_end:
return "O"
else:
return "I"
def tokenize_and_adjust_labels(sample):
tokenized = global_tokenizer(sample["text"],
return_offsets_mapping=True,
padding="max_length",
max_length=MAX_LENGTH,
truncation=True)
labels = [[0 for _ in global_label2id.keys()] for _ in range(MAX_LENGTH)]
for (token_start, token_end), token_labels in zip(tokenized["offset_mapping"], labels):
for span in sample["tags"]:
role = get_token_role_in_span(token_start, token_end, span["start"], span["end"])
if role == "I":
token_labels[global_label2id[f"{span['tag']}"]] = 1
return {**tokenized, "labels": labels}
def divide(a: int, b: int):
return a / b if b > 0 else 0
def compute_metrics(p):
predictions, true_labels = p
predicted_labels = np.where(predictions > 0, np.ones(predictions.shape), np.zeros(predictions.shape))
metrics = {}
cm = multilabel_confusion_matrix(true_labels.reshape(-1, global_n_labels), predicted_labels.reshape(-1, global_n_labels))
for label_idx, matrix in enumerate(cm):
if label_idx == 0:
continue # We don't care about the label "O"
tp, fp, fn = matrix[1, 1], matrix[0, 1], matrix[1, 0]
precision = divide(tp, tp + fp)
recall = divide(tp, tp + fn)
f1 = divide(2 * precision * recall, precision + recall)
metrics[f"recall_{global_id2label[label_idx]}"] = recall
metrics[f"precision_{global_id2label[label_idx]}"] = precision
metrics[f"f1_{global_id2label[label_idx]}"] = f1
f1_values = {k: v for k, v in metrics.items() if k.startswith('f1_')}
macro_f1 = sum(f1_values.values()) / len(f1_values)
metrics["macro_f1"] = macro_f1
return metrics
class RobertaForSpanCategorization(RobertaPreTrainedModel):
_keys_to_ignore_on_load_unexpected = [r"pooler"]
_keys_to_ignore_on_load_missing = [r"position_ids"]
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.roberta = RobertaModel(config, add_pooling_layer=False)
classifier_dropout = (
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
)
self.dropout = nn.Dropout(classifier_dropout)
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
self.post_init()
@add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.LongTensor] = None,
position_ids: Optional[torch.LongTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.roberta(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
sequence_output = outputs[0]
sequence_output = self.dropout(sequence_output)
logits = self.classifier(sequence_output)
loss = None
if labels is not None:
loss_fct = nn.BCEWithLogitsLoss()
loss = loss_fct(logits, labels.float())
if not return_dict:
output = (logits,) + outputs[2:]
return ((loss,) + output) if loss is not None else output
return TokenClassifierOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
class TrainingMetricsCallback(TrainerCallback):
def __init__(self):
self.macro_f1 = []
self.steps = []
self.counter = 0
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
if metrics is not None:
if 'eval_macro_f1' in metrics:
self.macro_f1.append(metrics['eval_macro_f1'])
self.counter += 1
self.steps.append(self.counter)
if __name__ == '__main__':
serve()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment