機械学習を触ってみる (1)

PyTorchで言語分類

機械学習に入門……というよりは基礎をすっ飛ばしていきなりライブラリーであるPyTorchを使って動かし方を表面的に試してみた。

題材は「プログラムのソースコードを読み込んで、何の言語か推測する」という物。Pirkaで使えないかな、と思ったわけだ。

自分の学習も目的の一つにしたいので、いきなり「正解」のモデルを見付けてそれを使う、というのではなく、自分である程度コードを書くようにした。

やり方を検索

というわけで、「機械学習 プログラミング言語 分類」とかそういう感じで検索してみたのだけど、これが意外と無い。Guesslangというパッケージは見付かったけど、それこそ「いきなり正解」なので今回はパス。

Hugging Faceという、機械学習、というかニューラルネットワーク用のモデル、データ、アプリケーションをホストして共有できるサービスがあるらしいことを知って、この中も検索してみたが、しかし意外と見付からない。(自然)言語処理一般の物はたくさんあるようだけど。

方針決定

そこで、検索して色々記事を読む中で見受けられた典型的な機械学習のやり方で、自然言語処理によるテキスト分類を選び、学習データとしてソースコードを使うことにした。 問題をこう定義してみると「Text classification with the torchtext library」というズバリなチュートリアル記事がPyTorchの公式にあったので、これに従うことにした。

チュートリアルではニュース記事を学習データにして、ニュースを分類するというタスクを学習させていたけど、このデータをRosetta Codeのデータセットにする。

色々読んでいると、RNNというニューラルネットワークでLSTMというのを使うのがいいのかな、という風に思ったのだけど上の記事ではそういうことはせずにシンプルに(?)入力と出力を繋げていた。(それともインプットがembeddingになっていたらRNNと呼ぶんだろうか?)

コード

code_classification_model.py

from torch import nn

class CodeClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super(CodeClassificationModel, self).__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False)
        # TODO: hidden layer using LSTM
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        # TODO: init LSTM
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        # TODO: LSTM
        return self.fc(embedded)

lang-classification.py

import sys
import time
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from datasets import load_dataset
from code_classification_model import CodeClassificationModel

device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.backends.cuda.is_available() else "cpu")
print(f'device: {device}', file=sys.stderr)

tokenizer = get_tokenizer(None)
ds = load_dataset("christopher/rosetta-code", split="train").with_format("torch", device=device)
labels = set([task["language_name"] for task in ds])
label_list = list(labels)
label_list.sort()
text_pipeline = lambda x: vocab(tokenizer(x))
label_pipeline = lambda x: label_list.index(x)

def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]
    for _task in batch:
        label_list.append(label_pipeline(_task["language_name"]))
        processed_text = torch.tensor(text_pipeline(_task["code"]), dtype=torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0))
    label_list = torch.tensor(label_list, dtype=torch.int64)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    text_list = torch.cat(text_list)
    return label_list.to(device), text_list.to(device), offsets.to(device)

def yield_tokens(data_iter):
    for task in data_iter:
        yield tokenizer(task["code"])

vocab = build_vocab_from_iterator(yield_tokens(ds), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
dataloader = DataLoader(
    ds, batch_size=8, shuffle=False, collate_fn=collate_batch
)

num_class = len(labels)
vocab_size = len(vocab)
emsize = 64
model = CodeClassificationModel(vocab_size, emsize, num_class).to(device)
print(model)
exit()

def train(dataloader):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 500
    start_time = time.time()

    for idx, (label, text, offsets) in enumerate(dataloader):
        optimizer.zero_grad()
        predicated_label = model(text, offsets)
        loss = criterion(predicated_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicated_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print(
                "| epoch: {:3d} | {:5d}|{:5d} batches "
                "| accuracy {:8.3f}".format(
                    epoch, idx, len(dataloader), total_acc / total_count
                )
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

def evaluate(dataloader):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (label, text, offsets) in enumerate(dataloader):
            predicted_label = model(text, offsets)
            loss = criterion(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
    return total_acc / total_count

EPOCHS = 10
LR = 5
BATCH_SIZE = 64

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
total_accu = None
num_split = int(len(ds) * 0.8)
train_dataset, test_dataset = random_split(ds, [num_split, len(ds) - num_split])
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = random_split(
    train_dataset, [num_train, len(train_dataset) - num_train]
)

train_dataloader = DataLoader(
    split_train_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch
)
valid_dataloader = DataLoader(
    split_valid_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch
)
test_dataloader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch
)

for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader)
    accu_val = evaluate(valid_dataloader)
    if total_accu is not None and total_accu > accu_val:
        scheduler.step()
    else:
        total_accu = accu_val
    print("-" * 59)
    print(
        "| end of epoch {:3d} | time: {:5.2f}s | "
        "valid accuracy {:8.3f} ".format(
            epoch, time.time() - epoch_start_time, accu_val
        )
    )
    print("-" * 59)

print("Cheking the results of test dataset.")
accu_test = evaluate(test_dataloader)
print("test accuracy {:8.3f}".format(accu_test))

model_path = "code-classification-model.pth"
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}', file=sys.stderr)

def predict(text, text_pipeline):
    with torch.no_grad():
        text = torch.tensor(text_pipeline(text))
        output = model(text, torch.tensor([0]))
        return output.argmax(1).item() + 1

source_code_str = "class SomeClass \
  def hello_world \
    puts 'Hello, world!' \
  end \
\
  private \
\
  def inner_method \
    do_something \
  end \
end \
"

model = model.to("cpu")

print("This is a %s language" % label_list[predict(source_code_str, text_pipeline)])

classify-lang.py

import sys
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from datasets import load_dataset
from code_classification_model import CodeClassificationModel

src = ""
with open(sys.argv[1], mode = "r") as f:
    src = f.read()

device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.backends.cuda.is_available() else "cpu")

tokenizer = get_tokenizer(None)
ds = load_dataset("christopher/rosetta-code", split="train").with_format("torch", device=device)
labels = set([task["language_name"] for task in ds])
label_list = list(labels)
label_list.sort()

def yield_tokens(data_iter):
    for task in data_iter:
        yield tokenizer(task["code"])

vocab = build_vocab_from_iterator(yield_tokens(ds), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
text_pipeline = lambda x: vocab(tokenizer(x))

text = torch.tensor(text_pipeline(src))

num_class = len(labels)
vocab_size = len(vocab)
emsize = 64
model = CodeClassificationModel(vocab_size, emsize, num_class).to(device)

model_path = "code-classification-model.pth"
model = CodeClassificationModel(vocab_size, emsize, num_class).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
with torch.no_grad():
    result = model(text, torch.tensor([0]))
    (values, indices) = torch.topk(torch.softmax(result, dim=1), 6, dim=1)
    for i in range(values.size(dim=1)):
        print(f'{label_list[indices[0][i]]}: {values[0][i]:.6f}', file=sys.stderr)

lang-classification.pyで学習させてclassify-lang.pyで分類を試してみると、間違いもするけどまあまあいい感じに分類できていると思う。

次は、コメントにも書いたけど、LSTMというのを挿入してみたいなと思う。また、ハイパーパラメーター(言ってみたかった)を調整して、精度が上がるのか下がるのか試してみたい。Optuna気になる。

そのうちTorch.rbを使ってRubyでも動かしたい。

気が付いたこと・気になる点

  • Pythonに慣れていなくて、コードの書き方の慣例に則っているのか不安
    • ファイル名とかディレクトリー構造とか
    • ファイルから内容を読み込む時の書き方もこれでいいのか不安。冗長じゃない?
  • 正直コードの意味を全部は理解していない
  • モデル定義よりも、データを読み込ませる準備のコードが多い
  • lang-classification.pyの最後でGPUからCPUにしているけど、何故? classify-lang.pyもCPUで動かした方がいいの?
  • classify-lang.pyで、分類に使う言語一覧を作るためにデータセットを読み込んでいるけど、正確にやるにはこれしか無いのかな? 遅くなっちゃう
    • 別途 christopher/rosetta-code を元に言語一覧だけ抜き出してファイルに書いちゃうのが速いんだとは思うが、更新漏れを心配してしまう
  • 使うデータってRosetta Codeでいいか知ら? もっと相応しい物がある?
  • get_tokenizer()None を与えて単純な空白区切りのトークナイザーを使ったけど、ソースコードの場合、どういうトークナイザーが適切だろう? 予め言語が分かっていればその言語のトークナイザーを使えばいいんだけど、今回の問題設定が、知らない状態で分類することだからな
  • TorchTextがdeprecationされているらしく、今後が不安
  • macOSのGPUであるMPSを使ったけど、一部非対応らしくて PYTORCH_ENABLE_MPS_FALLBACK=1 という環境変数を設定しないとエラーになってしまった。これって一部の処理をCPUでやるということだよね
  • チュートリアルではPyTorchのデータセットを iter() でくくってイテレーターを作っていたけど、Hugging Faceのdatasetsではそれが不要だった

環境

  • masOS 15.0.1
  • Apple M2
  • メモリー 24GiB
  • Python 3.12.7
  • torch 2.3.0
  • torchtext 0.18.0
  • datasets 3.0.1