Commit 210c70ae authored by Julien Breton

camembert classifier

parent 3260d268
%% Cell type:markdown id:8ced2e3ca31fb46c tags:
# Dataset
%% Cell type:code id:757a8bf026156e77 tags:
``` python
tag2id = {'action': 1, 'actor': 2, 'artifact': 3, 'condition': 4, 'location': 5, 'modality': 6, 'reference': 7, 'time': 8}
id2tag = {v:k for k, v in tag2id.items()}
```
%% Cell type:code id:be3a4c320f9d4a5 tags:
``` python
label2id = {
    'O': 0,
    **{f'B-{k}': 2*v - 1 for k, v in tag2id.items()},
    **{f'I-{k}': 2*v for k, v in tag2id.items()}
}
id2label = {v:k for k, v in label2id.items()}
```
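With eight span types, this BIO-style scheme gives 17 token-level labels: `O` plus a `B-` and an `I-` entry per tag. A quick sanity check (sketch):

``` python
# Sanity check on the label space: 'O' plus B-/I- for each of the 8 tags = 17 labels.
assert len(label2id) == 2 * len(tag2id) + 1 == 17
print(sorted(label2id, key=label2id.get))
# Expected order: ['O', 'B-action', 'I-action', 'B-actor', 'I-actor', ...]
```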
%% Cell type:code id:2aa2fefac95e7f04 tags:
``` python
from datasets import Dataset
train_ds = Dataset.from_json("data/annotations.train.jsonlines")
val_ds = Dataset.from_json("data/annotations.eval.jsonlines")
```
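Each line of these JSON Lines files is expected to carry an `id`, the raw `text`, and character-level `tags`, since that is what the tokenization function below consumes. The record here is purely illustrative, not taken from the actual dataset:

``` python
# Hypothetical record layout for annotations.train.jsonlines / annotations.eval.jsonlines,
# matching the fields used by tokenize_and_adjust_labels below (illustrative values only):
example_record = {
    "id": "train_0",
    "text": "Le responsable du traitement informe la CNIL dans les meilleurs délais.",
    "tags": [
        {"start": 0, "end": 28, "tag": "actor"},    # "Le responsable du traitement"
        {"start": 29, "end": 36, "tag": "action"},  # "informe"
    ],
}
```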
%% Cell type:code id:9e0a21356e7701a1 tags:
``` python
modelId = '../../../models/CamemBERT-large'
```
%% Cell type:markdown id:66e00d5a79a66753 tags:
# Tokenization
%% Cell type:code id:e6459259f5ab2d98 tags:
``` python
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(modelId)
```
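The labeling step below relies on `return_offsets_mapping`, which gives every sub-word token its character span in the original text. A quick look on a made-up sentence (the exact split depends on the CamemBERT-large vocabulary):

``` python
# Inspect how the fast tokenizer maps sub-word tokens back to character offsets.
# Special tokens such as <s> and </s> receive the offset pair (0, 0).
enc = tokenizer("Le responsable informe la CNIL.", return_offsets_mapping=True)
for tok, (start, end) in zip(tokenizer.convert_ids_to_tokens(enc["input_ids"]), enc["offset_mapping"]):
    print(tok, (start, end))
```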
%% Cell type:code id:8c96680645f077fb tags:
``` python
def get_token_role_in_span(token_start: int, token_end: int, span_start: int, span_end: int):
    """
    Check if the token is inside a span.
    Args:
      - token_start, token_end: Start and end offset of the token
      - span_start, span_end: Start and end of the span
    Returns:
      - "B" if beginning
      - "I" if inner
      - "O" if outer
      - "N" if not valid token (like <SEP>, <CLS>, <UNK>)
    """
    if token_end <= token_start:
        return "N"
    if token_start < span_start or token_end > span_end:
        return "O"
    if token_start > span_start:
        return "I"
    else:
        return "B"


MAX_LENGTH = 256

def tokenize_and_adjust_labels(sample):
    """
    Args:
      - sample (dict): {"id": "...", "text": "...", "tags": [{"start": ..., "end": ..., "tag": ...}, ...]}
    Returns:
      - The tokenized version of `sample` and the labels of each token.
    """
    # Tokenize the text, keep the start and end positions of tokens with the `return_offsets_mapping` option
    # Use max_length and truncation to adjust the text length
    tokenized = tokenizer(sample["text"],
                          return_offsets_mapping=True,
                          padding="max_length",
                          max_length=MAX_LENGTH,
                          truncation=True)

    # We are doing a multi-label classification task at each token: create a vector of size len(label2id) = 17,
    # one entry per label, for every token position
    labels = [[0 for _ in label2id.keys()] for _ in range(MAX_LENGTH)]

    # Scan all the tokens and spans, assign 1 to the corresponding label if the token lies at the beginning
    # or inside a span
    for (token_start, token_end), token_labels in zip(tokenized["offset_mapping"], labels):
        for span in sample["tags"]:
            role = get_token_role_in_span(token_start, token_end, span["start"], span["end"])
            if role == "B":
                token_labels[label2id[f"B-{span['tag']}"]] = 1
            elif role == "I":
                token_labels[label2id[f"I-{span['tag']}"]] = 1

    return {**tokenized, "labels": labels}
```
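Before mapping the whole dataset, the alignment can be spot-checked on a single sample; this sketch reuses the hypothetical `example_record` from above:

``` python
# Spot-check: tokenize one sample and print the tokens that received at least one positive label.
# `example_record` is the hypothetical sample shown earlier, not a real dataset entry.
checked = tokenize_and_adjust_labels(example_record)
tokens = tokenizer.convert_ids_to_tokens(checked["input_ids"])
for token, token_labels in zip(tokens, checked["labels"]):
    active = [id2label[i] for i, flag in enumerate(token_labels) if flag]
    if active:
        print(token, active)
```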
%% Cell type:code id:53310845f13e9d70 tags:
``` python
tokenized_train_ds = train_ds.map(tokenize_and_adjust_labels, remove_columns=train_ds.column_names)
tokenized_val_ds = val_ds.map(tokenize_and_adjust_labels, remove_columns=val_ds.column_names)
```
%% Cell type:code id:6990d89800dbb440 tags:
``` python
from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer, padding=True)
```
%% Cell type:markdown id:668dcf9750404d1c tags:
# Adapt the model
%% Cell type:code id:7bd0cddab7ddb448 tags:
``` python
import numpy as np
from sklearn.metrics import multilabel_confusion_matrix

n_labels = len(id2label)

def divide(a: int, b: int):
    return a / b if b > 0 else 0

def compute_metrics(p):
    """
    Customize the `compute_metrics` of `transformers`
    Args:
      - p (tuple): 2 numpy arrays: predictions and true_labels
    Returns:
      - metrics (dict): f1 score for each label (except "O") and the macro f1 over those labels
    """
    # (1)
    predictions, true_labels = p

    # (2)
    predicted_labels = np.where(predictions > 0, np.ones(predictions.shape), np.zeros(predictions.shape))
    metrics = {}

    # (3)
    cm = multilabel_confusion_matrix(true_labels.reshape(-1, n_labels), predicted_labels.reshape(-1, n_labels))

    # (4)
    for label_idx, matrix in enumerate(cm):
        if label_idx == 0:
            continue  # We don't care about the label "O"
        tp, fp, fn = matrix[1, 1], matrix[0, 1], matrix[1, 0]
        precision = divide(tp, tp + fp)
        recall = divide(tp, tp + fn)
        f1 = divide(2 * precision * recall, precision + recall)
        metrics[f"f1_{id2label[label_idx]}"] = f1

    # (5)
    macro_f1 = sum(list(metrics.values())) / (n_labels - 1)
    metrics["macro_f1"] = macro_f1
    return metrics
```
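Since the head is multi-label, predictions are thresholded at 0 on the logits, i.e. a sigmoid probability of 0.5. A synthetic call with random arrays shows the shape of the returned metrics dict (the scores themselves are meaningless here):

``` python
# Synthetic sanity check of compute_metrics: random logits and labels with the expected shapes
# (batch, seq_len, n_labels). Values are random, so the resulting scores carry no meaning.
rng = np.random.default_rng(0)
fake_logits = rng.normal(size=(2, MAX_LENGTH, n_labels))
fake_labels = rng.integers(0, 2, size=(2, MAX_LENGTH, n_labels))
print(compute_metrics((fake_logits, fake_labels)))
# -> {'f1_B-action': ..., 'f1_I-action': ..., ..., 'macro_f1': ...}
```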
%% Cell type:code id:ea5d16f59728e2b9 tags:
``` python
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import RobertaPreTrainedModel, RobertaModel
from transformers.utils import (
    add_code_sample_docstrings,
    add_start_docstrings,
    add_start_docstrings_to_model_forward,
    logging,
    replace_return_docstrings,
)
from transformers.models.roberta.modeling_roberta import (
    ROBERTA_INPUTS_DOCSTRING,
    ROBERTA_START_DOCSTRING,
    RobertaEmbeddings,
)
from typing import Optional, Union, Tuple
from transformers.modeling_outputs import TokenClassifierOutput
import torch
from torch import nn


class RobertaForSpanCategorization(RobertaPreTrainedModel):
    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    _keys_to_ignore_on_load_missing = [r"position_ids"]

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length, num_labels)`, *optional*):
            Multi-hot labels for computing the token-level multi-label classification loss. Each entry should be 0 or 1.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            loss_fct = nn.BCEWithLogitsLoss()
            loss = loss_fct(logits, labels.float())

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
```
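Because `BCEWithLogitsLoss` scores each of the 17 labels independently at every token, the head emits one logit per label instead of a softmax over classes. A dummy forward pass makes the shapes explicit (sketch; it assumes the local CamemBERT-large path from above is available, and the freshly added classification head is of course untrained):

``` python
# Dummy forward pass to check output shapes. Loads the local CamemBERT-large weights into the
# custom class, so the classifier head is randomly initialised and the logits are untrained.
_model = RobertaForSpanCategorization.from_pretrained(modelId, id2label=id2label, label2id=label2id)
_batch = tokenizer(["Le responsable informe la CNIL."], return_tensors="pt")
with torch.no_grad():
    _out = _model(input_ids=_batch["input_ids"], attention_mask=_batch["attention_mask"])
print(_out.logits.shape)  # (batch_size, sequence_length, 17)
```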
%% Cell type:markdown id:77f4fc68394aa754 tags:
# Fine-tuning
%% Cell type:code id:79161ed938cad895 tags:
``` python
training_args = TrainingArguments(
    output_dir="./models/fine_tune_bert_output_span_cat",
    evaluation_strategy="epoch",
    learning_rate=2.5e-4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=100,
    weight_decay=0.01,
    logging_steps=100,
    save_strategy='epoch',
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model='macro_f1',
    log_level='critical',
    seed=12345
)
```
%% Cell type:code id:931792b554582a9f tags:
``` python
def model_init():
    # For reproducibility
    return RobertaForSpanCategorization.from_pretrained(modelId, id2label=id2label, label2id=label2id)

trainer = Trainer(
    model_init=model_init,
    args=training_args,
    train_dataset=tokenized_train_ds,
    eval_dataset=tokenized_val_ds,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)
trainer.train()
```
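With `load_best_model_at_end=True`, the trainer finishes holding the checkpoint with the highest `macro_f1`; it can be re-evaluated explicitly:

``` python
# Re-evaluate the best checkpoint (selected on macro_f1) on the validation split.
metrics = trainer.evaluate()
print(metrics["eval_macro_f1"])
```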
%% Cell type:code id:89c997e6a944bc70 tags:
``` python
trainer.model.save_pretrained("../../../models/Fine-tuned_CamemBERT-large")
```
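To use the saved model, the multi-hot token predictions still have to be mapped back to character spans. The sketch below reloads the weights saved above, thresholds the logits at 0 (sigmoid 0.5) and returns one character range per positive token label; merging consecutive B-/I- tokens into full spans is left out for brevity:

``` python
# Minimal inference sketch: reload the fine-tuned model and turn token-level multi-label
# predictions back into character ranges. Each positive token label yields its own range.
ft_model = RobertaForSpanCategorization.from_pretrained("../../../models/Fine-tuned_CamemBERT-large")
ft_model.eval()

def predict_spans(text):
    enc = tokenizer(text, return_offsets_mapping=True, return_tensors="pt",
                    truncation=True, max_length=MAX_LENGTH)
    offsets = enc.pop("offset_mapping")[0].tolist()
    with torch.no_grad():
        logits = ft_model(**enc).logits[0]       # (seq_len, n_labels)
    spans = []
    for label_id in range(1, len(id2label)):     # skip 'O'
        tag = id2label[label_id]
        for token_idx in (logits[:, label_id] > 0).nonzero().flatten().tolist():
            start, end = offsets[token_idx]
            if end > start:                      # ignore special tokens
                spans.append({"start": start, "end": end, "tag": tag})
    return spans

# Example call on a hypothetical sentence:
print(predict_spans("Le responsable du traitement informe la CNIL dans les meilleurs délais."))
```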
``` python
import json
import jsonlines
import pandas as pd

def find_all_occurrences(text, phrase):
    start = 0
    while True:
        start = text.find(phrase, start)
        if start == -1:
            return
        yield start, start + len(phrase)
        start += len(phrase)  # move start past this occurrence to find the next one

with open('config.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

global_output = []
for index, (sentence, annot) in enumerate(data.items()):
    sentence_output = {
        'id': f"train_{index}",
        'text': sentence,
        'tags': []
    }
    for tag, spans in annot.items():
        for span in spans:
            occurrences = find_all_occurrences(sentence, span)
            for (start_index, end_index) in occurrences:
                sentence_output['tags'].append({
                    "end": end_index,
                    "start": start_index,
                    "tag": tag
                })
    global_output.append(sentence_output)

with jsonlines.open('../../data/annotations.eval.jsonlines', mode='w') as writer:
    for item in global_output:
        writer.write(item)
```
``` python
import json
import pandas as pd
import importlib.util

spec = importlib.util.spec_from_file_location("utils", "../llm/utils.py")
utils = importlib.util.module_from_spec(spec)
spec.loader.exec_module(utils)

data = pd.read_csv('../../data/dataQS.csv', delimiter=',', low_memory=False, encoding='utf-8')
output = {}

# Converting the original CSV and grouping by sentence
for index, row in data.iterrows():
    if type(row['content']) == float:
        print(f"The row {index} has a float value")
        continue
    if row['sentence'] not in output.keys():
        output[row['sentence']] = {}
    if row['concept'] not in output[row['sentence']]:
        output[row['sentence']][row['concept']] = []
    output[row['sentence']][row['concept']].append(row['content'])
    if row['content'] not in row['sentence']:
        print(f"The row {index + 2} is not a part of the sentence: {row['content']}")

# Create the fine-tuning dataset
filter = ["action", "actor", "artifact", "condition", "location", "modality", "reference", "time"]
input_for_finetuned = []
output_for_finetuned = []
for sentence in output:
    temp_output = {}
    for concept in output[sentence]:
        if concept in filter:
            temp_output[concept] = output[sentence][concept]
    input_for_finetuned.append(f'<s> [INST] {utils.get_pre_prompt_zero_shot()} [/INST]\nUser: {sentence}\nAssistant: ')
    output_for_finetuned.append(f'{json.dumps(temp_output, ensure_ascii=False)} </s>')

finetuned_dataset = {'input': input_for_finetuned, 'output': output_for_finetuned}
df = pd.DataFrame(finetuned_dataset)
df.to_csv('../../data/finetuned_dataset.csv', index=False)
```
``` diff
@@ -21,8 +21,8 @@ class MarkerEnhancerLegalRoberta:
             self.sentenceStorage[element].add(sentence['sentence'])

     def exec(self):
-        tokenizer = AutoTokenizer.from_pretrained("joelito/legal-french-roberta-large")
-        model = AutoModelForMaskedLM.from_pretrained("joelito/legal-french-roberta-large")
+        tokenizer = AutoTokenizer.from_pretrained("joelito/legal-french-camembert-large")
+        model = AutoModelForMaskedLM.from_pretrained("joelito/legal-french-camembert-large")
         for marker, sentences in self.sentenceStorage.items():
             for sentence in sentences:
```
%% Cell type:code id:initial_id tags:
``` python
```