Model training

Based on TensorBay Action, this example integrates dataset creation, model training, and model evaluation into a complete workflow, giving you a quick overview of the Graviti Data platform.

1. Prepare data

a. Enter the Open Datasets page, search for MNIST, and fork it to your own workspace.

2. Configure the AccessKey

a. Enter the Developer Tools page, then create and copy an AccessKey.

b. Enter the MNIST dataset you have forked.

c. Click Action Configuration and create a Secret on the Settings page.

d. Name the secret accesskey, and paste in the AccessKey value you copied in step a.
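
Inside a workflow run, each secret is exposed to task scripts as an environment variable. The snippet below is a minimal sketch of how the scripts in this example read the accesskey secret and authorize the TensorBay client:

import os

from tensorbay import GAS

# The secret named "accesskey" is available to task scripts
# as the environment variable "secret.accesskey".
ACCESS_KEY = os.environ.get("secret.accesskey")
gas = GAS(ACCESS_KEY)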

3. Create a training and evaluation workflow

a. Click Create Workflow on the Action page.

b. Use the following YAML file to create the workflow.

tasks:
  # Create a dataset to save the trained model.
  createModelDataset:
    script:
      # The Docker image this task runs on (images from both public and private repositories are available)
      image: hub.graviti.cn/algorithm/mnist:1.0

      # Use python3 to execute the script
      command: [python3]

      source: |
        import logging
        import os
        from tensorbay import GAS
        logging.basicConfig(level=logging.INFO)
        dataset_name = "MNIST_MODEL"
        ACCESS_KEY = os.environ.get("secret.accesskey")
        gas = GAS(ACCESS_KEY)
        try:
            gas.create_dataset(dataset_name)
            logging.info(f"Created dataset {dataset_name} Successfully")
        except:
            logging.info(f"{dataset_name} aleady exists.")
  # Train an MNIST model and use the output model file for prediction.
  training:
    dependencies:
      - createModelDataset
    script:
      image: hub.graviti.cn/algorithm/mnist:1.0
      command: [python3]
      source: |
        import logging
        import os

        import torch
        from PIL import Image
        from tensorbay import GAS
        from tensorbay.dataset import Dataset as TensorBayDataset
        from tensorbay.dataset.data import Data
        from torch import nn
        from torch.utils.data import DataLoader, Dataset
        from torchvision import transforms
        from tensorbay.client import config
        
        config.timeout = 150
        config.max_retries = 4

        logging.basicConfig(level=logging.INFO)


        # Build the network architecture.
        class NeuralNetwork(nn.Module):
            def __init__(self):
                super(NeuralNetwork, self).__init__()
                self.flatten = nn.Flatten()
                self.linear_relu_stack = nn.Sequential(
                    nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)
                )

            def forward(self, x):
                x = self.flatten(x)
                logits = self.linear_relu_stack(x)
                return logits


        # Read the dataset from the Graviti platform.
        class MNISTSegment(Dataset):
            """Class for wrapping a MNIST segment."""

            def __init__(self, dataset, segment_name, transform):
                super().__init__()
                self.dataset = dataset
                self.segment = self.dataset[segment_name]
                self.category_to_index = self.dataset.catalog.classification.get_category_to_index()
                self.transform = transform

            def __len__(self):
                return len(self.segment)

            def __getitem__(self, idx):
                data = self.segment[idx]
                with data.open() as fp:
                    image_tensor = self.transform(Image.open(fp))

                return image_tensor, self.category_to_index[data.label.classification.category]


        def train(dataloader, model, loss_fn, optimizer):
            size = len(dataloader.dataset)
            model.train()
            for batch, (X, y) in enumerate(dataloader):
                X, y = X.to(device), y.to(device)

                # Compute prediction error
                pred = model(X)
                loss = loss_fn(pred, y)

                # Backpropagation
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                if batch % 100 == 0:
                    loss, current = loss.item(), batch * len(X)
                    logging.info(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


        def test(dataloader, model, loss_fn):
            size = len(dataloader.dataset)
            num_batches = len(dataloader)
            model.eval()
            test_loss, correct = 0, 0
            with torch.no_grad():
                for X, y in dataloader:
                    X, y = X.to(device), y.to(device)
                    pred = model(X)
                    test_loss += loss_fn(pred, y).item()
                    correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            test_loss /= num_batches
            correct /= size
            logging.info(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

        if __name__ == "__main__":
            BATCH_SIZE = 64
            EPOCHS = 3
            ACCESS_KEY = os.environ.get("secret.accesskey")
            gas = GAS(ACCESS_KEY)
            mnist_dataset = TensorBayDataset("MNIST", gas)
            mnist_dataset.enable_cache()
            to_tensor = transforms.ToTensor()
            normalization = transforms.Normalize(mean=[0.485], std=[0.229])
            my_transforms = transforms.Compose([to_tensor, normalization])

            train_segment = MNISTSegment(mnist_dataset, segment_name="train", transform=my_transforms)
            test_segment = MNISTSegment(mnist_dataset, segment_name="test", transform=my_transforms)
            train_dataloader = DataLoader(train_segment, batch_size=BATCH_SIZE, num_workers=10)
            test_dataloader = DataLoader(test_segment, batch_size=BATCH_SIZE, num_workers=10)

            device = "cuda" if torch.cuda.is_available() else "cpu"
            logging.info(f"Using {device} device")

            model = NeuralNetwork().to(device)
            logging.info(model)
            loss_fn = nn.CrossEntropyLoss()
            optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

            for epoch in range(EPOCHS):
                logging.info(f"Epoch {epoch+1}\n-------------------------------")
                train(train_dataloader, model, loss_fn, optimizer)
                test(test_dataloader, model, loss_fn)
            logging.info("Done!")

            torch.save(model.state_dict(), "model.pth")
            logging.info("Saved PyTorch Model State to model.pth")

            # Upload model file.
            model_dataset = TensorBayDataset("MNIST_MODEL")
            segment = model_dataset.create_segment("model")
            segment.append(Data("./model.pth"))
            dataset_client = gas.upload_dataset(model_dataset)
            dataset_client.commit("upload mnist model file")
            logging.info("Uploaded model!")
  evaluate:
    dependencies:
      - training
    script:
      image: hub.graviti.cn/algorithm/mnist:1.0
      command: [python3]
      source: |
        import logging
        import os
        from concurrent.futures import ThreadPoolExecutor

        import torch
        from PIL import Image
        from tensorbay import GAS
        from tensorbay.dataset import Dataset as TensorBayDataset
        from tensorbay.dataset.data import Data
        from tensorbay.label import Classification
        from torch import nn
        from torchvision import transforms
        from tensorbay.client import config
        
        config.timeout = 150
        config.max_retries = 4



        logging.basicConfig(level=logging.INFO)


        # Build the network architecture.
        class NeuralNetwork(nn.Module):
            def __init__(self):
                super(NeuralNetwork, self).__init__()
                self.flatten = nn.Flatten()
                self.linear_relu_stack = nn.Sequential(
                    nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)
                )

            def forward(self, x):
                x = self.flatten(x)
                logits = self.linear_relu_stack(x)
                return logits



        def upload_label(model, segment_client, data):
            """Predict the category of a single image and upload it as a label."""
            with data.open() as fp:
                image_tensor = my_transforms(Image.open(fp))
            with torch.no_grad():  # Inference only, no gradients needed
                pred = model(image_tensor)
            pred_data = Data(data.path)
            pred_data.label.classification = Classification(str(int(pred[0].argmax(0))))
            segment_client.upload_label(pred_data)


        if __name__ == "__main__":
            ACCESS_KEY = os.environ.get("secret.accesskey")
            gas = GAS(ACCESS_KEY)
            to_tensor = transforms.ToTensor()
            normalization = transforms.Normalize(mean=[0.485], std=[0.229])
            my_transforms = transforms.Compose([to_tensor, normalization])
            model_dataset = TensorBayDataset("MNIST_MODEL", gas)
            data = model_dataset[0][0]  # The model file in the first segment
            with open("./model.pth", "wb") as fp:  # Save the model file locally
                fp.write(data.open().read())
            model = NeuralNetwork()
            model.load_state_dict(torch.load("model.pth", map_location=torch.device("cpu")))
            model.eval()  # Switch to inference mode
            logging.info(model)

            mnist_dataset = TensorBayDataset("MNIST", gas)
            mnist_dataset.enable_cache()
            mnist_dataset_client = gas.get_dataset("MNIST")
            mnist_dataset_client.create_branch("training")
            mnist_dataset_client.create_draft("update label")

            for segment in mnist_dataset:
                segment_client = mnist_dataset_client.get_segment(segment.name)
                with ThreadPoolExecutor(10) as executor:
                    for data in segment:
                        executor.submit(upload_label, model, segment_client, data)
            mnist_dataset_client.commit("update label")
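
When the workflow runs, the three tasks execute in dependency order: createModelDataset first, then training, and finally evaluate. The evaluate task writes its predictions to a new training branch of the forked MNIST dataset, so the default branch is left untouched.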

4. Start training

a. Choose the target workflow and click Run on the Action page.

5. View the result

a. View the details of the run on the Workflow Detail page.

b. View the model prediction results, diff visualizations, and distribution changes on the training branch.

c. Check the trained model saved in the MNIST_MODEL dataset.
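
You can also verify the result programmatically. The following is a minimal sketch using the same TensorBay SDK calls as the workflow above; it downloads the committed model file from the MNIST_MODEL dataset and loads it. When running outside the Action environment, paste the AccessKey from step 2 directly instead of reading it from an environment variable:

import torch
from tensorbay import GAS
from tensorbay.dataset import Dataset

gas = GAS("<YOUR_ACCESS_KEY>")  # AccessKey created in step 2

# Read the committed MNIST_MODEL dataset and fetch the model file
# from its "model" segment.
model_dataset = Dataset("MNIST_MODEL", gas)
data = model_dataset["model"][0]
with open("./model.pth", "wb") as fp:
    fp.write(data.open().read())

state_dict = torch.load("model.pth", map_location=torch.device("cpu"))
print(list(state_dict))  # Layer parameter names of the saved NeuralNetwork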
