Jupyter for Azure Machine Learning in Five Days - Day 5: Understanding Azure Machine Learning Pipeline
Category: Azure Machine Learning ◆ Tags: #Azure #ArtificialIntelligence #MachineLearning #AutoML ◆ Published: 2023-06-11 21:56:49
 
                    
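The previous day gave a brief introduction to AutoML and tried it out in Jupyter. This chapter covers Azure Machine Learning Pipeline.
Azure offers several kinds of pipeline: Azure Pipelines, Azure Machine Learning Pipeline, and Azure Data Factory Pipeline. Each has its own goal: Azure Machine Learning Pipeline goes from data to model, Azure Data Factory Pipeline goes from data to data, and so on. An Azure Machine Learning pipeline is a workflow dedicated to Azure machine learning. The workflow consists of a number of sub-tasks, each encapsulating one of the elements needed for a complete machine learning process:
- Data processing, such as downloading, loading, and converting data.
- Training-related configuration, such as parameter settings and logging.
- Training or evaluating the model.
- Deploying the model, including deployment, evaluation, and scaling.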
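To make the idea of a workflow built from sub-tasks concrete, here is a minimal pure-Python sketch. It does not use the Azure SDK; the names `Step` and `run_pipeline` and the three toy steps are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Step:
    """One sub-task of a workflow: a name plus a function applied to the data."""
    name: str
    func: Callable[[Any], Any]


def run_pipeline(steps: List[Step], data: Any) -> Any:
    """Run each step in order, feeding the output of one step into the next."""
    for step in steps:
        data = step.func(data)
    return data


# Toy stand-ins for "prepare data", "train", and "evaluate".
toy_steps = [
    Step("prepare", lambda rows: [r for r in rows if r is not None]),
    Step("train", lambda rows: sum(rows) / len(rows)),  # the "model" is just the mean
    Step("evaluate", lambda mean: {"mean": mean}),
]

result = run_pipeline(toy_steps, [1, 2, None, 3])
print(result)
```

A real Azure Machine Learning pipeline adds what this toy leaves out: each step runs on remote compute, and inputs and outputs are passed through datastores rather than in memory.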
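Today we use a simple example to show how an Azure Machine Learning pipeline processes data, how it registers a model, and finally how it deploys the model. We then publish the pipeline and re-trigger it through the REST API, which is how a model can be retrained.
The sample code for this chapter can be found at: https://github.com/hylinux/azure-demo/blob/main/azure-machine-learning/jupyter-5-days/Day-5.ipynb
Preparing the environment
As before, we need a development environment. If you have not set one up yet, please follow the earlier articles carefully. Then activate a conda environment and run the command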
jupyter notebook
to start Jupyter Notebook.
Connecting to the workspace
Connect to the workspace with the following code:
from azureml.core import Workspace

ws = Workspace.from_config()
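`Workspace.from_config()` reads the workspace details from a `config.json` file, which it looks for in the current directory or in a `.azureml` subdirectory. The sketch below writes such a file using only the standard library. The three values are placeholders, and the helper name `write_workspace_config` is hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json with the three keys Workspace.from_config() reads."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(directory) / "config.json"
    path.write_text(json.dumps(config, indent=4), encoding="utf-8")
    return path


# Demo in a temporary directory; the values are placeholders, not real IDs.
demo_dir = tempfile.mkdtemp()
config_path = write_workspace_config(
    demo_dir, "<subscription-id>", "<resource-group>", "<workspace-name>"
)
print(config_path.read_text(encoding="utf-8"))
```

The same file can also be downloaded from the workspace overview page in the Azure portal, which avoids typing the values by hand.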
Creating the Datastore
Note that this example uses a public Azure Storage account hosted on Azure global, so if you have an Azure global subscription, it is recommended to use it.
from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name="images_datastore",
                      container_name="sampledata",
                      account_name="pipelinedata",
                      overwrite=True)

def_data_store = ws.get_default_datastore()
Creating Dataset objects to reference the data
Once the datastore exists, we reference the data in it through Dataset objects.
from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = OutputFileDatasetConfig(name="scores")
After creating the datasets, register them with the workspace so that they can be reused later:
input_images = input_images.register(workspace=ws, name="input_images")
label_ds = label_ds.register(workspace=ws, name="label_ds")
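Registering the model with the workspace
This example uses a pretrained model, so no training is needed. We only have to download the model and register it with the workspace.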
import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
    os.mkdir("models")

response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall("models")
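Before registering the model, it can help to confirm that the archive really contains the checkpoint file that the registration step expects (`inception_v3.ckpt`). The following is a minimal sketch using only `tarfile`. The helper name `archive_contains` is hypothetical, and the demo builds a tiny stand-in archive rather than downloading the real one.

```python
import io
import os
import tarfile
import tempfile


def archive_contains(archive_path, member_name):
    """Return True if the .tar.gz archive has a member with this base name."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return any(os.path.basename(name) == member_name for name in tar.getnames())


# Build a small stand-in archive so the check can run without a download.
demo_dir = tempfile.mkdtemp()
demo_archive = os.path.join(demo_dir, "model.tar.gz")
payload = b"not a real checkpoint"
with tarfile.open(demo_archive, "w:gz") as tar:
    info = tarfile.TarInfo(name="inception_v3.ckpt")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

has_checkpoint = archive_contains(demo_archive, "inception_v3.ckpt")
print(has_checkpoint)
```

Against the real download, the call would be `archive_contains("model.tar.gz", "inception_v3.ckpt")`.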
Register it with the workspace:
from azureml.core.model import Model

model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)
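Creating or attaching a compute target
An Azure Machine Learning pipeline cannot run locally. It must run on cloud or other remote compute resources, so a compute target is required. The code below reuses the cluster named cpu-cluster if it already exists in the workspace and creates it otherwise.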
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException

compute_name = "cpu-cluster"

# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="Standard_D2_v2",
                                                   vm_priority="lowpriority",
                                                   min_nodes=0,
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
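Writing the scoring script
Scoring requires a script. The scoring script takes the input images, applies the classification model, and writes the predictions to a file.
batch_scoring.py accepts the following two arguments:
- --model_name, the name of the model to use.
- --labels_dir, the directory containing labels.txt.
The script uses the ArgumentParser class to parse the arguments that the pipeline step passes to it.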
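The argument handling can be tried on its own, outside Azure. The sketch below mirrors the two arguments listed above and uses `parse_known_args`, which returns unrecognised arguments instead of failing on them. The function name `parse_scoring_args`, the path `/tmp/labels`, and the `--extra_flag` argument are hypothetical and only serve the demonstration.

```python
import argparse


def parse_scoring_args(argv):
    """Parse the two arguments the scoring script needs, ignoring any others."""
    parser = argparse.ArgumentParser(description="Batch scoring arguments")
    parser.add_argument("--model_name", dest="model_name", required=True)
    parser.add_argument("--labels_dir", dest="labels_dir", required=True)
    # parse_known_args returns (namespace, leftovers) rather than exiting
    # when it meets an argument it does not recognise.
    args, unknown = parser.parse_known_args(argv)
    return args, unknown


args, unknown = parse_scoring_args(
    ["--model_name", "inception", "--labels_dir", "/tmp/labels", "--extra_flag", "1"]
)
print(args.model_name, args.labels_dir, unknown)
```

Using `parse_known_args` rather than `parse_args` matters when the script is launched by a framework that may append arguments of its own to the command line.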
%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

slim = tf.contrib.slim

image_size = 299
num_channel = 3


def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image


def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])

    return result_list
Creating the pipeline
Before creating the pipeline, set up the environment first:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE
Creating the parallel run configuration
from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)
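With file inputs, `mini_batch_size` is the number of files handed to a single `run()` call. The following pure-Python sketch shows how a list of file paths would be grouped under that reading. `split_into_mini_batches` and the file names are hypothetical, and the real service decides how batches are assigned to worker processes.

```python
def split_into_mini_batches(file_paths, mini_batch_size):
    """Group file paths into consecutive chunks of at most mini_batch_size items."""
    if mini_batch_size < 1:
        raise ValueError("mini_batch_size must be at least 1")
    return [
        file_paths[start:start + mini_batch_size]
        for start in range(0, len(file_paths), mini_batch_size)
    ]


# 45 hypothetical image files, batch size 20 as in the configuration above.
files = [f"image_{i:03d}.jpg" for i in range(45)]
batches = split_into_mini_batches(files, 20)
print([len(batch) for batch in batches])  # [20, 20, 5]
```

Each inner list corresponds to one `mini_batch` argument received by `run()` in the scoring script.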
Creating the pipeline step
Each pipeline consists of one or more pipeline steps. This example has only one.
from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)
Submitting the pipeline to the workspace
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Tutorial-Batch-Scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)
Once the pipeline run has completed, download and inspect the results:
import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)
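Each line of the result file has the form `filename: label`, as produced by `run()` in the scoring script. Splitting on every colon can misalign the columns if a label itself contains a colon, so the sketch below splits only on the first `": "`. It uses only the standard library; `parse_prediction_lines` and the sample lines are hypothetical.

```python
def parse_prediction_lines(lines):
    """Turn 'filename: label' lines into (filename, label) tuples."""
    rows = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        filename, separator, prediction = line.partition(": ")
        if not separator:
            continue  # skip lines that do not match the expected format
        rows.append((filename, prediction))
    return rows


# Hypothetical sample lines, including a label that contains a colon.
sample = ["img_001.JPEG: tabby cat\n", "\n", "img_002.JPEG: ratio 1:2 chart\n"]
rows = parse_prediction_lines(sample)
print(rows)
```

The resulting list of tuples can be passed to `pd.DataFrame(rows, columns=["Filename", "Prediction"])` if a DataFrame is preferred.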
Publishing the pipeline and calling it through the REST API
First, publish the pipeline:
published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring",
    description="Batch scoring using Inception v3 model",
    version="1.0")

published_pipeline
Then call the pipeline through the REST API:
from azureml.core.authentication import InteractiveLoginAuthentication
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
import requests
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Tutorial-Batch-Scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]
Once you have the run ID, you can use the following code to check the status and results of the run.
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails
published_pipeline_run = PipelineRun(ws.experiments["Tutorial-Batch-Scoring"], run_id)
RunDetails(published_pipeline_run).show()
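The REST call above reads `response.json()["Id"]` directly, which raises a bare `KeyError` if the request failed. A small sketch of building the request body and extracting the run ID more defensively follows. The helper names `build_pipeline_request` and `extract_run_id` are hypothetical, the run ID in the demo is made up, and the body only uses the two fields that appear in the request above.

```python
def build_pipeline_request(experiment_name, parameters=None):
    """Build the JSON body for triggering a published pipeline."""
    body = {"ExperimentName": experiment_name}
    if parameters:
        body["ParameterAssignments"] = dict(parameters)
    return body


def extract_run_id(status_code, payload):
    """Return the run ID from a response payload, or raise a readable error."""
    if not 200 <= status_code < 300:
        raise RuntimeError(f"Pipeline submission failed with HTTP {status_code}: {payload}")
    try:
        return payload["Id"]
    except (KeyError, TypeError):
        raise RuntimeError(f"Response did not contain a run Id: {payload}") from None


body = build_pipeline_request("Tutorial-Batch-Scoring", {"process_count_per_node": 6})
run_id = extract_run_id(200, {"Id": "example-run-id"})  # made-up payload for the demo
print(body, run_id)
```

With `requests`, the second helper would be called as `extract_run_id(response.status_code, response.json())`.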
That concludes this chapter's introduction to pipelines.