import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import PreTrainedTokenizerFast, GPT2LMHeadModel
import re
from tqdm import tqdm
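# Special tokens used to mark the dialogue structure:
#   U_TKN / S_TKN mark the user turn and the system (bot) turn,
#   SENT terminates the question segment, MASK hides the question part
#   in the labels, and PAD fills each sequence up to max_len.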
U_TKN = '<usr>'
S_TKN = '<sys>'
BOS = '</s>'
EOS = '</s>'
MASK = '<unused0>'
SENT = '<unused1>'
PAD = '<pad>'
koGPT2_TOKENIZER = PreTrainedTokenizerFast.from_pretrained(
    'skt/kogpt2-base-v2',
    bos_token=BOS, eos_token=EOS, unk_token='<unk>',
    pad_token=PAD, mask_token=MASK,
)
model = GPT2LMHeadModel.from_pretrained('skt/kogpt2-base-v2')
epochs = 2
Sneg = -1e18  # large negative value used to mask out logits outside the answer span
learning_rate = 3e-5
criterion = torch.nn.CrossEntropyLoss(reduction='none')
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
df = pd.read_csv('../../ChatbotData.csv')
df.head()
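# The CSV is expected to contain at least the question/answer columns 'Q' and 'A';
# only these two columns are used below.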
class ChatbotDataset(Dataset):
    """Wraps the Q/A DataFrame and builds fixed-length training samples."""

    def __init__(self, chats, max_len=64):
        self._data = chats
        self.max_len = max_len
        self.q_token = U_TKN
        self.a_token = S_TKN
        self.sent_token = SENT
        self.eos = EOS
        self.pad = PAD
        self.mask = MASK
        self.tokenizer = koGPT2_TOKENIZER

    def __len__(self):
        return len(self._data)

    def __getitem__(self, idx):
        turn = self._data.iloc[idx]
        q = turn['Q']
        q = re.sub(r'([?.!,])', r' ', q)  # replace punctuation with spaces
        a = turn['A']
        a = re.sub(r'([?.!,])', r' ', a)
        q_toked = self.tokenizer.tokenize(self.q_token + q + self.sent_token)
        q_len = len(q_toked)
        a_toked = self.tokenizer.tokenize(self.a_token + a + self.eos)
        a_len = len(a_toked)
        # If the question alone exceeds max_len, keep only its tail and
        # shorten the answer to whatever space remains.
        if q_len > self.max_len:
            a_len = self.max_len - q_len
            if a_len <= 0:
                q_toked = q_toked[-(int(self.max_len / 2)):]
                q_len = len(q_toked)
                a_len = self.max_len - q_len
            a_toked = a_toked[:a_len]
            a_len = len(a_toked)
        # Same treatment when question + answer together overflow max_len.
        if q_len + a_len > self.max_len:
            a_len = self.max_len - q_len
            if a_len <= 0:
                q_toked = q_toked[-(int(self.max_len / 2)):]
                q_len = len(q_toked)
                a_len = self.max_len - q_len
            a_toked = a_toked[:a_len]
            a_len = len(a_toked)
        # Labels: the question span is filled with the mask token; only the
        # answer tokens (minus the leading system token) are learned.
        labels = [self.mask] * q_len + a_toked[1:]
        # Loss mask: 1 over the answer tokens, 0 over question and padding.
        mask = [0] * q_len + [1] * a_len + [0] * (self.max_len - q_len - a_len)
        labels_ids = self.tokenizer.convert_tokens_to_ids(labels)
        while len(labels_ids) < self.max_len:
            labels_ids += [self.tokenizer.pad_token_id]
        token_ids = self.tokenizer.convert_tokens_to_ids(q_toked + a_toked)
        while len(token_ids) < self.max_len:
            token_ids += [self.tokenizer.pad_token_id]
        return (token_ids, np.array(mask), labels_ids)
def collate_batch(batch):
    data = [item[0] for item in batch]
    mask = [item[1] for item in batch]
    label = [item[2] for item in batch]
    return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)
df = df[['Q', 'A']]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'GPU available: {torch.cuda.is_available()}')
train_set = ChatbotDataset(df, max_len=64)
train_dataloader = DataLoader(train_set, batch_size=32, num_workers=0, shuffle=True, collate_fn=collate_batch)
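# Each batch from the loader is (token_ids, mask, labels), all of shape (batch_size, max_len).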
model.to(device)
model.train()
print('Starting training')
for epoch in range(epochs):
    for batch_idx, samples in enumerate(tqdm(train_dataloader)):
        optimizer.zero_grad()
        token_ids, mask, label = samples
        # Move the batch to the same device as the model.
        token_ids = token_ids.to(device)
        mask = mask.to(device)
        label = label.to(device)
        out = model(token_ids)
        out = out.logits  # (batch, seq_len, vocab)
        # Replace logits outside the answer span with the large negative constant Sneg.
        mask_3d = mask.unsqueeze(dim=2).repeat_interleave(repeats=out.shape[2], dim=2)
        mask_out = torch.where(mask_3d == 1, out, Sneg * torch.ones_like(out))
        loss = criterion(mask_out.transpose(2, 1), label)
        # Normalise the summed loss by the number of answer tokens.
        avg_loss = loss.sum() / mask.sum()
        avg_loss.backward()
        optimizer.step()
print('Training finished')
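# Optionally persist the fine-tuned weights for later reuse; the output path below
# is only an example, not part of the original script.
# model.save_pretrained('./kogpt2-chatbot')
# koGPT2_TOKENIZER.save_pretrained('./kogpt2-chatbot')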
model.eval()
with torch.no_grad():
    print('The chatbot is running. Type "quit" to exit.')
    print(' ')
    while True:
        q = input('User > ').strip()
        if q == 'quit':
            break
        a = ''
        while True:
            input_ids = torch.LongTensor(
                koGPT2_TOKENIZER.encode(U_TKN + q + SENT + S_TKN + a)
            ).unsqueeze(dim=0).to(device)
            pred = model(input_ids)
            pred = pred.logits
            # Greedy decoding: take the most likely token at the last position.
            gen = koGPT2_TOKENIZER.convert_ids_to_tokens(
                torch.argmax(pred, dim=-1).squeeze().cpu().numpy().tolist()
            )[-1]
            if gen == EOS:
                break
            a += gen.replace('▁', ' ')
        print('Chatbot > {}'.format(a.strip()))