Automatic Model Tuning in an MLOps Pipeline

开课吧小一 2021-05-06 09:48


In this series of articles, we walk you through the process of applying CI/CD to an AI task. By the end, you will have a functional pipeline that meets the requirements of level 2 of Google's MLOps maturity model. We assume some familiarity with Python, deep learning, Docker, DevOps, and Flask.


We'll show a condensed version of the code; for the full version, see this repository. In this project we'll use a GCR Docker image (backed by TensorFlow) – but feel free to use a different one.

First, we'll discuss the code for running these solutions locally. Later, we'll see how to prepare them for cloud deployment.

The diagram below shows our project's file structure.

(Figure: project file structure)

data_utils.py

The data_utils.py file handles data loading, transformation, and saving the model to GCS. This file may vary from project to project; essentially, it performs all data processing tasks required before model training. Let's look at the code:

import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys

def dataset_transformation(path):
    images = []
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            if filename.endswith('.png'):
                image = cv2.imread(os.path.join(dirname, filename))
                image = cv2.resize(image, (128, 128))
                images.append(image)
    return images

def load_data(args):
    file_1 = '/root/AutomaticTraining-Dataset/COVID_RX/normal_images.zip'
    file_2 = '/root/AutomaticTraining-Dataset/COVID_RX/covid_images.zip'
    file_3 = '/root/AutomaticTraining-Dataset/COVID_RX/viral_images.zip'
    extract_to = '/root/AutomaticTraining-Dataset/COVID_RX/'

    with zipfile.ZipFile(file_1, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    with zipfile.ZipFile(file_2, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    with zipfile.ZipFile(file_3, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    normal = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/normal_images')
    covid = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/covid_images')
    viral = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/viral_images')

    # Train and test - dataset combination.
    # Note: the class order here must match the label order built below.
    X = normal + covid + viral
    # Transforming from list to numpy array.
    X = np.array(X)

    # Creating labels: 0 = normal, 1 = covid, 2 = viral.
    y = []
    for i in range(len(normal)):
        y.append(0)
    for i in range(len(covid)):
        y.append(1)
    for i in range(len(viral)):
        y.append(2)
    y = np.array(y)

    # Dataset splitting
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle=True)
    return X_train, X_test, y_train, y_test

def save_model(bucket_name, best_model):
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format('testing', best_model))
        blob1.upload_from_filename(best_model)
        return True, None
    except Exception as e:
        return False, e
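As an aside, the three label-building loops above can be condensed with NumPy. A minimal sketch, using hypothetical class counts rather than the real dataset:

```python
import numpy as np

# Hypothetical image counts per class (stand-ins for len(normal), len(covid), len(viral)).
normal_n, covid_n, viral_n = 3, 2, 4

# One label per image: 0 = normal, 1 = covid, 2 = viral.
y = np.concatenate([
    np.zeros(normal_n, dtype=int),   # normal -> 0
    np.ones(covid_n, dtype=int),     # covid  -> 1
    np.full(viral_n, 2, dtype=int),  # viral  -> 2
])
print(y.tolist())  # [0, 0, 0, 1, 1, 2, 2, 2, 2]
```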

model_assembly.py

The model_assembly.py file contains the code for model creation and automatic tuning. We want to start with a very basic model – train it and evaluate it. If the initial model doesn't reach the desired performance, we'll keep improving it until it does. Let's look at the code:

from tensorflow.keras.models import load_model
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np

def get_base_model():
    input_img = layers.Input(shape=(128, 128, 3))
    x = layers.Conv2D(64, (3, 3), activation='relu')(input_img)
    return input_img, x

def get_additional_layer(filters, x):
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(filters, (3, 3), activation='relu')(x)
    return x

def get_final_layers(neurons, x):
    x = layers.SpatialDropout2D(0.2)(x)
    x = layers.Flatten()(x)
    x = layers.Dense(neurons)(x)
    x = layers.Dense(3)(x)
    return x

These functions will be called in a loop. In the first iteration, we get the base_model and the final_layers and stack them to build a very simple model. If, after training, we find that the model doesn't perform well enough, we get the base_model again, add additional_layers, stack the final_layers, then train and evaluate once more. If we still can't reach good performance, this last process repeats in a loop, adding more additional_layers each time, until a predefined target metric is reached.
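The grow-and-retry control flow described above can be sketched without TensorFlow. In this toy version the train-and-evaluate step is replaced by a hypothetical evaluate() stub whose accuracy improves with depth, just to make the loop logic concrete:

```python
TARGET_ACC = 0.85   # accuracy the model must exceed
MAX_ATTEMPTS = 5    # give up after this many depth increases

def evaluate(extra_blocks):
    # Hypothetical stand-in for training and evaluating a model with
    # `extra_blocks` additional Conv/MaxPooling blocks: pretend each
    # block adds 0.2 accuracy on top of a 0.5 baseline.
    return min(round(0.5 + 0.2 * extra_blocks, 2), 0.99)

def tune():
    counter = 0
    while True:
        # counter == 0: base model + final layers only;
        # otherwise: base model + `counter` additional blocks + final layers.
        acc = evaluate(counter)
        if acc > TARGET_ACC:
            return ('ok', counter, acc)      # good enough: save and notify
        if counter >= MAX_ATTEMPTS:
            return ('failed', counter, acc)  # give up: notify the owner
        counter += 1

print(tune())  # ('ok', 2, 0.9)
```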

email_notifications.py

The email_notifications.py file is responsible for delivering emails to the product owner via an SMTP server. These emails tell the owner whether everything is fine and, if not, what went wrong.

import smtplib
import os

# Email variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'

def training_result(result, model_acc):
    if result == 'ok':
        message = 'The model reached ' + str(model_acc) + ', it has been saved to GCS.'
    if result == 'failed':
        message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.'
    message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
    try:
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email: ' + str(e), flush=True)
        return

def exception(e_message):
    try:
        message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email: ' + str(e), flush=True)
        return
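The raw 'Subject: ...\n\n...' string works for simple ASCII payloads, but the standard library's email.message.EmailMessage builds a properly formatted message with headers. A minimal sketch (the addresses and body are placeholders) that smtplib's send_message could send in place of sendmail:

```python
from email.message import EmailMessage

msg = EmailMessage()
msg['Subject'] = 'An automatic training job has ended recently'
msg['From'] = 'example@gmail.com'    # placeholder sender
msg['To'] = 'svirahonda@gmail.com'   # placeholder receiver
msg.set_content('The model reached 0.91, it has been saved to GCS.')

# server.send_message(msg) would replace server.sendmail(...) above.
print(msg['Subject'])
```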

task.py

The task.py file orchestrates the show. It initializes the GPU (if one is available), starts model training, and tunes the model when needed. It also receives the arguments passed to the application. Here's the code:

import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math

# general variables declaration
model_name = 'best_model.hdf5'

def initialize_gpu():
    if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
        tf.config.set_soft_device_placement(True)
        tf.debugging.set_log_device_placement(True)
    return

def start_training(args):
    # Loading split data
    X_train, X_test, y_train, y_test = data_utils.load_data(args)
    # Initializing GPU if available
    initialize_gpu()
    train_model(X_train, X_test, y_train, y_test, args)

def train_model(X_train, X_test, y_train, y_test, args):
    try:
        model_loss, model_acc = [0, 0]
        counter = 0
        while model_acc <= 0.85:
            input_img, x = model_assembly.get_base_model()
            if counter == 0:
                x = model_assembly.get_final_layers(64, x)
            else:
                for i in range(counter):
                    x = model_assembly.get_additional_layer(int(64 * (math.pow(2, counter))), x)
                x = model_assembly.get_final_layers(int(64 * (math.pow(2, counter))), x)
            cnn = Model(input_img, x, name="CNN_COVID_" + str(counter))
            cnn.summary()
            cnn.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
            checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1, save_best_only=True, mode='auto', save_freq="epoch")
            cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test), callbacks=[checkpoint])
            cnn = load_model(model_name)
            model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
            if model_acc > 0.85:
                saved_ok = data_utils.save_model(args.bucket_name, model_name)
                if saved_ok[0] == True:
                    email_notifications.training_result('ok', model_acc)
                    sys.exit(0)
                else:
                    email_notifications.exception(saved_ok[1])
                    sys.exit(1)
            if counter >= 5:
                email_notifications.training_result('failed', None)
                sys.exit(1)
            counter += 1
    except Exception as e:
        email_notifications.exception('An exception when training the model has occurred: ' + str(e))
        sys.exit(1)

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket-name', type=str, default='automatictrainingcicd-aiplatform', help='GCP bucket name')
    parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
    args = parser.parse_args()
    return args

def main():
    args = get_args()
    start_training(args)

if __name__ == '__main__':
    main()
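Note how quickly int(64 * math.pow(2, counter)) grows: every retry doubles the convolutional filter width passed to get_additional_layer and get_final_layers. A quick check of the widths used at each attempt:

```python
import math

# Filter count per attempt: counter == 0 uses a fixed 64;
# afterwards the width doubles on each retry, up to the 5-attempt cap.
widths = [64] + [int(64 * math.pow(2, counter)) for counter in range(1, 6)]
print(widths)  # [64, 128, 256, 512, 1024, 2048]
```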

Dockerfile

Our Dockerfile is responsible for passing instructions to the Docker daemon to build the proper container. Here's what it looks like:

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
 
WORKDIR /root
 
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev
 
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git
 
RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py /root
RUN mv /root/AutomaticTraining-CodeCommit/task.py /root
RUN mv /root/AutomaticTraining-CodeCommit/data_utils.py /root
RUN mv /root/AutomaticTraining-CodeCommit/email_notifications.py /root
 
ENTRYPOINT ["python","task.py"]

The file above takes the gcr.io/deeplearning-platform-release/tf2-cpu.2-0 image, installs the dependencies, clones the required repositories, moves the files to the home directory, and sets the container's entry point. The ADD lines that fetch random bytes from random.org are a cache-busting trick: because their content changes on every build, they invalidate Docker's layer cache and force the git clone steps to re-run, so the latest code and data are always pulled.
