Skip to main content

Jupyter for Azure机器学习五天入门 - Day5 了解什么是Azure Machine Learning Pipeline

分类:  Azure机器学习 标签:  #Azure #人工智能 #机器学习 #AutoML 发布于: 2023-06-11 21:56:49

我们前一天简要的介绍了什么是AutoML,同时也是用jupyter体验了一下AutoML, 本章我们学习什么是Azure Machine Learning Pipeline

Azure有提供很多种pipeline, 一共有Azure pipelineAzure Machine Learning Pipeline以及Azure Data Factory Pipeline, 这三种pipeline各有自己的标准目标,例如Azure Machine Learning是目标是从数据到模型,Azure Data factory pipeline目标是数据到数据等等。那么Azure Machine Learning pipeline是指专门针对于Azure机器学习的一个工作流,工作流包括很多个子任务,每个子任务都封装了为了完整机器学习的各种要素:

  • 数据处理,例如下载,载入,转为等处理数据的基本流程
  • 训练相关的配置,例如参数设定,日志等等。
  • 训练模型或者模型评估。
  • 部署模型,包括部署,评估,缩放等等。

我们今天用一个简单的例子来说明Azure Machine Learning pipeline如何处理数据,如何组测模型,最后如何部署模型,然后发布pipeline, 然后通过rest api重新触发这些pipeline, 达到重新训练模型的目的。

本章的代码实例可以从如下的位置找到:https://github.com/hylinux/azure-demo/blob/main/azure-machine-learning/jupyter-5-days/Day-5.ipynb

环境的准备

还是和之前一样,我们需要准备开发环境,如果你目前还没有准备一个适合开发的环境,请仔细参考之前的文章,然后请激活一个conda环境,然后通过命令

jupyter notebook

启用jupyter notebook

链接workspace

通过如下的代码链接workspace:

from azureml.core import Workspace
ws = Workspace.from_config()

创建DataStore

注意我们这里的例子是使用Azure global的开放的Azure Storage account, 因此如果你有Azure global的订阅还是建议使用Azure global的订阅比较好。

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)

def_data_store = ws.get_default_datastore()

创建DataSet对象引用数据

有了datastore之后,如果我们想要引用数据,我们需要使用DataSet

from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = OutputFileDatasetConfig(name="scores")

创建该dataset之后,将该dataset注册到workspace以后还是继续使用该dataset

input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")

注册模型到workspace

我们这个例子中使用已经训练好的模型,因此我们无需再进行训练,因此我们只需要下载该模型,然后注册到workspace中。

import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
    os.mkdir("models")
    
response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall("models")

注册到workspace

from azureml.core.model import Model
 
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)

创建或者附着compute instance

Azure machine learning pipeline不能在本地运行的,必须要运行在云资源或者是远程的计算资源上,因此必须有一个compute instance

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "cpu-cluster"

# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="Standard_D2_v2",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

编写Scoring脚本

为了创建一个评分,需要创建一个脚本,Scoring脚本接受输入的图片,然后应用分类模型,最后输出预测结果到文件。

batch_scoring.py需要接受如下的两个参数:

  • --model_name, 需要使用的模型名称
  • --labels_dir, label.txt的目录。

pipeline使用类ArgumentParse类来解析参数传递到PipelineStep中去。

%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

slim = tf.contrib.slim

image_size = 299
num_channel = 3


def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image


def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list

创建pipeline

在开始创建pipeline之前先设置环境:

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

创建p配置

from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)

创建pipestep

每个pipeline有一个或者多个pipelinestep组成,本例中只有一个pipelinestep

from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

向workspace 提交pipeline

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Tutorial-Batch-Scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

等候pipeline 运行完成了之后,下载和查看结果:

import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)

发布pipeline并且从rest api来调用该pipeline

先发布pipeline

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")

published_pipeline
通过rest api调用pipeline
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Tutorial-Batch-Scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]


#  得到run id之后,可以通过如下的代码来查看run的状态和结果。
<p class="mume-header " id="得到run-id之后可以通过如下的代码来查看run的状态和结果"></p>


from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["Tutorial-Batch-Scoring"], run_id)
RunDetails(published_pipeline_run).show()

那么本章介绍的pipeline就到这里了。