Engineering Deployment of NLP Models

Published: 2025-02-11

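I. Theory: Microservices, Testing, and GPUs

  • Learning objectives and outcomes

1) Microservice architecture

① Monolithic architecture
② Microservice architecture
③ Characteristics of microservices
④ Main technology stack

1) RESTful interfaces over HTTP
Implementing the RESTful interface with the Flask web framework
2) Remote procedure calls (RPC)
Based on gRPC

  • RESTful or RPC?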

2) Code testing

  • Practical examples
    Code testing
    Load testing

Load-testing tools

3) Using the GPU

The following command refreshes the nvidia-smi output once per second:

watch -n 1 nvidia-smi
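
When the numbers need to be logged or checked from a script rather than watched in a terminal, nvidia-smi can also print machine-readable CSV. The following is a minimal sketch, not part of the original project: the function names `parse_gpu_csv` and `query_gpus` are made up for illustration, and it assumes every queried field is reported as an integer (some GPUs report `[N/A]` for certain fields, which this sketch does not handle).

```python
import subprocess

# Fields requested from nvidia-smi; memory is reported in MiB, utilization in percent
QUERY_FIELDS = "index,memory.used,memory.total,utilization.gpu"


def parse_gpu_csv(text):
    """Parse the CSV lines printed by nvidia-smi into a list of dicts."""
    stats = []
    for line in text.strip().splitlines():
        index, mem_used, mem_total, util = [v.strip() for v in line.split(",")]
        stats.append({
            "index": int(index),
            "memory_used_mib": int(mem_used),
            "memory_total_mib": int(mem_total),
            "utilization_pct": int(util),
        })
    return stats


def query_gpus():
    """Run nvidia-smi once and return the parsed per-GPU statistics."""
    out = subprocess.check_output(
        ["nvidia-smi", f"--query-gpu={QUERY_FIELDS}", "--format=csv,noheader,nounits"],
        text=True,
    )
    return parse_gpu_csv(out)
```

On a machine with an NVIDIA driver installed, calling `query_gpus()` in a loop with a one-second sleep gives roughly the same view as the `watch` command, but in a form that can be written to a log file.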

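II. Practice: Wrapping Microservices, Writing Test Cases and Scripts, and Monitoring the GPU

1) Wrapping the microservice (RESTful and RPC)

① RESTful interface

(1) Configuring the logger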

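import logging.handlers
import os

# Get the root logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Make sure the log directory exists; the file handler does not create it
os.makedirs('./logs', exist_ok=True)

# Create a file handler that writes log records to a file.
# The log rotates once a day and keeps only the 7 most recent backups (the last 7 days).
file_handler = logging.handlers.TimedRotatingFileHandler('./logs/root.log', 'D', 1, 7, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)

# Set up the formatter.
# It prints the time, level, file name, line number, function name and message.
formatter = logging.Formatter(
    '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(funcName)s() - %(message)s'
)

# Attach the formatter to the file handler
file_handler.setFormatter(formatter)

# Attach the handler to the logger
logger.addHandler(file_handler)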

(2) Writing the RESTful API
1> Write hello_resource to provide a health-check route

# hello_resource.py
from flask_restful import Resource


class HelloResource(Resource):
    """
    Hello route.
    Used to quickly check whether the service is healthy.
    """

    def get(self):
        return 'hello'

2> Write seg_resource to provide the segmentation route

import json
import time

from flask import request
from flask_restful import Resource

from online import logger


class SegResource(Resource):
    """
    Segmentation route.
    Mainly calls segment.seg.
    """
    def __init__(self, segment):
        # Use the segment object passed in for the segmentation below
        self.segment = segment

    def post(self):
        # Parse the input JSON into a dict. Fall back to {} so that a missing or
        # invalid body is reported as an ERROR result instead of failing on data.get below
        data = request.get_json(silent=True) or {}

        init_time = time.time()
        result = {
            'status': 'OK',  # status returned for this request
            'msg': ''  # additional information
        }

        request_id = data.get('request_id')  # an optional request_id makes requests easier to trace in production

        try:
            assert data, "Please make sure the input is not empty"

            # Read the user-supplied parameters from data
            content, model, enable_offset, enable_stop_word, use_ner = \
                data['content'], data.get('model'), data.get('enable_offset', False), \
                data.get('enable_stop_word', False), data.get('use_ner', False)
            logger.info('request_id: {}, model: {}, enable_offset: {}, enable_stop_word: {}, use_ner: {}, '
                        'content: {} ...'.format(request_id, model, enable_offset, enable_stop_word, use_ner,
                                                 content[:100]))

            # Call the seg method of the segment object
            r = self.segment.seg(content, model=model, enable_offset=enable_offset,
                                 enable_stop_word=enable_stop_word, use_ner=use_ner)
            result['result'] = list(r)  # store the segmentation result in result

        except Exception as e:
            # On any exception, log the stack trace and mark this request as ERROR
            logger.exception(e)
            result['status'] = 'ERROR'
            result['msg'] = str(e)

        logger.info('request_id: {}, result: {} ..., cost time: {}s'.format(
            request_id, json.dumps(result, ensure_ascii=False)[:200], time.time() - init_time)
        )

        return result

3> Write the remaining resources
4> Write the HTTP server

# server.py
import sys

from flask import Flask
from flask_restful import Api

from online import logger
from online.http.resources.dict_resource import DictResource
from online.http.resources.hello_resource import HelloResource
from online.http.resources.pos_resource import PosResource
from online.http.resources.seg_resource import SegResource
from segment.segment import Segment


def start_server(port=8000):
    # If a first command-line argument is given, parse it as the port number
    if len(sys.argv) > 1:
        port = int(sys.argv[1])

    # Create the Flask app
    app = Flask(__name__)
    app.config.update(RESTFUL_JSON=dict(ensure_ascii=False))  # ensure_ascii=False so Chinese text in responses is returned unescaped
    api = Api(app)

    # Create the segment object that is passed into each resource
    segment = Segment()
    resource_class_kwargs = {'segment': segment}

    # Register the hello, seg, pos and dict routes on the api
    api.add_resource(HelloResource, '/')  # hello route: quick availability check
    api.add_resource(SegResource, '/seg', resource_class_kwargs=resource_class_kwargs)  # seg route: word segmentation
    api.add_resource(PosResource, '/pos', resource_class_kwargs=resource_class_kwargs)  # pos route: part-of-speech tagging
    api.add_resource(DictResource, '/dict', resource_class_kwargs=resource_class_kwargs)  # dict route: dictionary management

    # Start the service with the given host and port.
    # host='0.0.0.0' is required so that other machines can reach the service.
    # threaded=False runs the server single-threaded, so requests are handled one at a time
    # (our word_graph object is not thread-safe).
    logger.info('server starts port {}'.format(port))
    app.run(debug=False, host='0.0.0.0', port=port, threaded=False)


if __name__ == '__main__':
    start_server()
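
The server runs with threaded=False because the word_graph object is not thread-safe. If multi-threaded serving were ever needed, one possible alternative is to serialize calls to the segmenter with a lock. The sketch below is only an illustration of that idea and is not part of the original project: `LockedSegment` is a hypothetical wrapper, and `FakeSegment` stands in for the real `Segment` class so the example runs on its own.

```python
import threading


class LockedSegment:
    """Wrap a segmenter so that only one thread runs seg() at a time."""

    def __init__(self, segment):
        self._segment = segment
        self._lock = threading.Lock()

    def seg(self, content, **kwargs):
        # The lock serializes access to the wrapped, non-thread-safe object
        with self._lock:
            return list(self._segment.seg(content, **kwargs))


class FakeSegment:
    """Stand-in for the real Segment class, used only for this illustration."""

    def seg(self, content, **kwargs):
        return content.split()
```

With a wrapper like this, requests still pass through the segmenter one at a time, so throughput for the segmentation step itself does not improve; only the surrounding request handling could overlap.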

  • Start the app server
    Run the script start_http_server.sh:
cd ..
python -m online.http.server 8000

  • Send requests with Postman
    ① Send a request to the hello route
curl 0.0.0.0:8000

Server output
Client response
② Test the seg (segmentation) route
Send the request from Postman
Send a request to the seg endpoint
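
The same request to /seg can be issued from Python instead of Postman. The sketch below uses only the standard library; the helper names `build_seg_request` and `send` are made up for illustration, and the payload keys are the ones read by SegResource (`content`, `model`, `enable_offset`).

```python
import json
import urllib.request


def build_seg_request(content, model="crf", host="127.0.0.1", port=8000):
    """Build (but do not send) a POST request for the /seg route."""
    payload = {"content": content, "model": model, "enable_offset": True}
    return urllib.request.Request(
        url="http://{}:{}/seg".format(host, port),
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req, timeout=10):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the server running, `send(build_seg_request("我喜欢自然语言处理"))` should return the dict produced by SegResource.post, with `status` set to `OK` or `ERROR`.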

② RPC interface

1> Define the proto interface, then generate the code
2> Basic practice

syntax = "proto3";


service Segment {
  rpc seg (SegRequest) returns (SegResponse) {}
  rpc pos (SegRequest) returns (PosResponse) {}
  rpc add_word (AddWordRequest) returns (Bool) {}
  rpc delete_word (DeleteWordRequest) returns (Bool) {}
}


message SegRequest {
  string content = 1;
  string model = 2;
  bool enable_stop_word = 3;
  bool use_ner = 4;
}

message SegResponse {
  message Term {
    string word = 1;
    int32 start_index = 2;
    int32 end_index = 3;
  }
  repeated Term terms = 1;
}

message PosResponse {
  message Term {
    string word = 1;
    int32 start_index = 2;
    int32 end_index = 3;
    string pos = 4;
  }
  repeated Term terms = 1;
}

message Bool {
  bool status = 1;
}

message AddWordRequest {
  string word = 1;
  string pos = 2;
  int32 freq = 3;
}

message DeleteWordRequest {
  string word = 1;
}

Compile command

cd ..
python -m grpc_tools.protoc -Ionline/rpc/ --python_out=online/rpc/ --grpc_python_out=online/rpc/ segment.proto

Client code

import grpc

from online.rpc.segment_pb2 import SegRequest, AddWordRequest, DeleteWordRequest  # import the Request classes
from online.rpc.segment_pb2_grpc import SegmentStub  # import the stub used to talk to the server


class SegmentClient(object):
    """
    Client code that callers can import and use directly.

    Responsibilities:
        define the available interfaces
        wrap the raw function arguments in a Request object
        send the Request to the server and receive the Response
        convert the Response object to basic Python types and return them to the caller
    """

    def __init__(self, host, port):
        """
        Create a channel for the given host and port,
        then create the stub object from the channel.
        """
        channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.stub = SegmentStub(channel)

    def seg(self, content, model, enable_stop_word=False, use_ner=False):
        """The seg interface."""
        # Wrap the arguments in a request object
        request = SegRequest(content=content, model=model, enable_stop_word=enable_stop_word,
                             use_ner=use_ner)

        # Call stub.seg with the request object to get the response object
        response = self.stub.seg(request)

        # Convert the response object to a list of tuples and return it to the caller
        words = [(term.word, term.start_index, term.end_index) for term in response.terms]

        return words

    def pos(self, content, model, enable_stop_word=False, use_ner=False):
        """The pos interface."""
        request = SegRequest(content=content, model=model, enable_stop_word=enable_stop_word,
                             use_ner=use_ner)
        response = self.stub.pos(request)
        words = [(term.word, term.start_index, term.end_index, term.pos) for term in response.terms]
        return words

    def add_word(self, word, pos, freq):
        """The add_word interface."""
        request = AddWordRequest(word=word, pos=pos, freq=freq)
        response = self.stub.add_word(request)
        status = response.status
        return status

    def delete_word(self, word):
        """The delete_word interface."""
        request = DeleteWordRequest(word=word)
        response = self.stub.delete_word(request)
        status = response.status
        return status

2) Writing tests (unit_test / api_test / load_test)

(1) Unit tests

Unit test code

import unittest
from segment.segment import Segment


class MyTestCase(unittest.TestCase):
    def setUp(self) -> None:
        self.segment = Segment()

    def test_seg(self):
        content = '百年来,我们始终筑牢机的密码。' \
                  '建议稿中突出强调了样表述,这在中还是第一次,彰显的理念。' \
                  '都蕴藏着巨大的力量。' \
                  '一百年来,风雨考验中,矢志不渝为了生活,' \
                  '才能勇往直前,击鼓催征稳驭舟。'
        print('content: ', content)

        print('seg(content)')
        words = list(self.segment.seg(content))
        print(words)

        print('seg(content, model=\'HMM\')')
        words = list(self.segment.seg(content, model='HMM'))
        print(words)

        print('seg(content, model=\'CRF\')')
        words = list(self.segment.seg(content, model='CRF'))
        print(words)

        print('seg(content, model=\'DL\')')
        words = list(self.segment.seg(content, model='DL'))
        print(words)


if __name__ == '__main__':
    unittest.main()

Testing the word graph

import unittest

from segment.word_tokenizer.word_graph import WordGraph, Node


class TestGraph(unittest.TestCase):

    def test_graph(self):
        graph = WordGraph()
        graph.insert_start_word(WordGraph.NODE_S)  # 0
        graph.insert_start_word(Node('我', 1, 'core_dict'))  # 1
        graph.insert_start_word(Node('喜', 2, 'core_dict'))  # 2
        graph.insert_start_word(Node('喜欢', 4, 'model_word_dict'))  # 3
        graph.insert_start_word(Node('欢', 1, 'core_dict'))  # 4

        graph.insert_end_words([1])
        graph.insert_end_words([2, 3])
        graph.insert_end_words([4])
        graph.insert_end_words([5])
        graph.insert_end_words([5])

        route = graph.calculate()

        print(graph)
        print(route)
        self.assertEqual(route[0][0], 5)  # the optimal path should have weight 5


if __name__ == '__main__':
    unittest.main()
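
The internals of WordGraph.calculate are not shown here. One reading that is consistent with the expected weight of 5 is that it returns the highest-weight path through the lattice: 我 (1) followed by 喜欢 (4) gives 5, while 我 (1), 喜 (2), 欢 (1) gives only 4. The standalone sketch below illustrates that reading with dynamic programming over a DAG. It is an assumption about the behavior, not the project's implementation, and it further assumes that the start and end nodes have weight 0.

```python
def best_path(weights, successors):
    """Return (total_weight, node_indices) of the highest-weight path.

    weights[i] is the weight of node i; successors[i] lists the nodes that may
    follow node i. Every edge must go from a lower to a higher index; node 0 is
    the start node and the last node is the end node.
    """
    n = len(weights)
    best = [float("-inf")] * n  # best[i]: highest total weight of a path ending at node i
    prev = [None] * n           # prev[i]: predecessor of node i on that path
    best[0] = weights[0]
    for i in range(n):
        if best[i] == float("-inf"):
            continue  # node i cannot be reached from the start node
        for j in successors[i]:
            candidate = best[i] + weights[j]
            if candidate > best[j]:
                best[j], prev[j] = candidate, i
    # Walk back from the end node to recover the path
    path, node = [], n - 1
    while node is not None:
        path.append(node)
        node = prev[node]
    return best[-1], path[::-1]


# Same lattice as in the test: start, 我, 喜, 喜欢, 欢, end
weights = [0, 1, 2, 4, 1, 0]
successors = [[1], [2, 3], [4], [5], [5], []]
total, path = best_path(weights, successors)
```

Here `total` is 5 and `path` is `[0, 1, 3, 5]`, i.e. start, 我, 喜欢, end.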

(2) API tests

① test_http_api.py

import json
import random
import unittest

import requests

HOST = '127.0.0.1'
PORT = 8000


class MyTestCase(unittest.TestCase):

    def setUp(self) -> None:
        self.samples = []
        with open('tests/data/samples.txt', 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                self.samples.append(line)

    def test_seg(self):
        print('test_seg~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        for sample in self.samples:
            data = {
                'content': sample,
                'model': _sample(['hmm', 'crf', 'dl']),
                'enable_offset': _sample([True, False])
            }
            print(json.dumps(data, ensure_ascii=False))
            r = requests.post('http://{}:{}/seg'.format(HOST, PORT), json=data)
            print(r.text)
            assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'
        print('\n')

    def test_pos(self):
        print('test_pos~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        for sample in self.samples:
            data = {
                'content': sample,
                'model': _sample(['hmm', 'crf']),
            }
            print(json.dumps(data, ensure_ascii=False))
            r = requests.post('http://{}:{}/pos'.format(HOST, PORT), json=data)
            print(r.text)
            assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'
        print('\n')

    def test_dict(self):
        print('test_dict~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')

        print('add word')
        data = {'word': '深度之眼', 'pos': 'nt', 'freq': 50}
        r = requests.post('http://{}:{}/dict'.format(HOST, PORT), json=data)
        print(r.text)
        assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'

        print('delete word')
        data = {'word': '深度之眼'}
        r = requests.delete('http://{}:{}/dict'.format(HOST, PORT), json=data)
        print(r.text)
        assert r.status_code == 200 and json.loads(r.text)['status'] == 'OK'


def _sample(values):
    # Pick one value at random
    return random.choice(values)


if __name__ == '__main__':
    unittest.main()

② test_rpc_api.py

import random
import unittest

from online.rpc.segment_client import SegmentClient

HOST = '127.0.0.1'
PORT = 8000


class MyTestCase(unittest.TestCase):

    def setUp(self) -> None:
        self.client = SegmentClient(host=HOST, port=PORT)
        self.samples = []
        with open('tests/data/samples.txt', 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                self.samples.append(line)

    def test_seg(self):
        print('test_seg~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        for sample in self.samples:
            data = {
                'content': sample,
                'model': _sample(['hmm', 'crf', 'dl'])
            }
            r = self.client.seg(**data)
            print(r)
        print('\n')

    def test_pos(self):
        print('test_pos~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        for sample in self.samples:
            data = {
                'content': sample,
                'model': _sample(['hmm', 'crf']),
            }
            r = self.client.pos(**data)
            print(r)
        print('\n')

    def test_dict(self):
        print('test_dict~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')

        print('add word')
        data = {'word': '深度之眼', 'pos': 'nt', 'freq': 10}
        self.client.add_word(**data)

        print('delete word')
        data = {'word': '深度之眼'}
        self.client.delete_word(**data)


def _sample(values):
    # Pick one value at random
    return random.choice(values)


if __name__ == '__main__':
    unittest.main()

(3) Load testing

① request.json

{
    "content": "【惊险一幕!英国伦敦上空一架飞机被三道闪电击中后继续飞行】6月6日,一架飞机在英国伦敦上空飞行时,遭到来自不同方向的三道闪电击中。视频显示,这架航班正穿过风雨交加的天空时,忽然三道闪电从云层中射出,击中了飞机。闪电击中飞机后,空中爆发出隆隆的雷声,飞机看似未受损坏继续飞行。(北京青年报编辑 许彦明)http://t.cn/A62eV2KY【#老奶奶去探病自己躺病床上睡着了# 老爷爷端着伤手 一脸无",
    "model": "crf"
}

② Run the test script

siege -c 1 -t 1M --content-type "application/json" "http://127.0.0.1:8000/seg POST <request.json"

The siege load-testing tool
Test results
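
Where siege is not available, a rough equivalent can be put together in Python. The sketch below is not a replacement for siege and is not part of the original project: `run_load` is a hypothetical helper that calls a user-supplied function repeatedly from a thread pool and reports basic statistics. It expects `total_requests` to be at least 1.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_load(send_once, concurrency=1, total_requests=100):
    """Call send_once() total_requests times using `concurrency` worker threads."""

    def timed_call(_):
        start = time.perf_counter()
        try:
            send_once()
            ok = True
        except Exception:
            ok = False  # any exception counts as a failed request
        return ok, time.perf_counter() - start

    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        outcomes = list(pool.map(timed_call, range(total_requests)))
    elapsed = time.perf_counter() - wall_start

    latencies = sorted(duration for _, duration in outcomes)
    failures = sum(1 for ok, _ in outcomes if not ok)
    return {
        "requests": total_requests,
        "failures": failures,
        "elapsed_s": elapsed,
        "throughput_rps": total_requests / elapsed if elapsed > 0 else float("inf"),
        "avg_latency_s": sum(latencies) / len(latencies),
        "max_latency_s": latencies[-1],
    }
```

In practice `send_once` would wrap a single POST to the /seg route, for example the `requests.post` call used in test_http_api.py followed by `raise_for_status()`, so that HTTP errors are counted as failures.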
③ Observe CPU usage and memory usage, and check for memory leaks
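
Process-level memory is usually watched from outside the process during the load test. As a complementary in-process check, the standard-library tracemalloc module can show whether repeated calls keep accumulating Python-level allocations. The sketch below is one possible approach, not the method used in the original project: `memory_growth`, `leaky` and `clean` are illustrative names, and tracemalloc only sees memory allocated through Python's allocators, not native buffers such as GPU tensors.

```python
import tracemalloc


def memory_growth(func, iterations=200):
    """Return how many bytes of traced memory remain allocated after
    calling func() `iterations` times."""
    tracemalloc.start()
    try:
        func()  # warm-up call so that one-time allocations are not counted
        before, _ = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            func()
        after, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before


_leak = []


def leaky():
    _leak.append(bytearray(1024))  # keeps every buffer alive: a deliberate leak


def clean():
    bytearray(1024)  # the buffer is freed as soon as the call returns
```

A growth figure that scales with the number of iterations, as for `leaky`, is the pattern to look for; in a real check `func` would be a call to the segmenter with a fixed input.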

3) Using and monitoring the GPU

1) Modify bilstm_crf_predictor / bilstm_crf_model to enable GPU acceleration
2) Start http_server, edit request.json to set the model to dl, and run the load test
3) Use nvidia-smi to observe GPU memory usage

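III. Theory: Docker, CI/CD, and K8s

IV. Practice: Building Images and CI/CD Scripts

1) Building the image

  • Learning objectives
    Learn how to write a Dockerfile, build an image, and apply practical build techniques
  • Workflow

① Dockerfile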

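# Declare the base image version; use the official PyTorch image
FROM pytorch/pytorch:1.3-cuda10.1-cudnn7-runtime

# Copy requirements.txt into the image first
COPY ./requirements.txt /requirements.txt
# Install the Python dependencies (if installation is slow, switch the pip index: -i https://pypi.douban.com/simple/)
RUN pip install -r /requirements.txt

# Add all of the code
COPY . /data/app

# Set the default working directory
WORKDIR /data/app

# Set the system encoding to UTF-8 to avoid garbled Chinese text
ENV LANG C.UTF-8

# Expose port 8000
EXPOSE 8000

# CMD starts the HTTP service (you could also start the RPC service, or both)
CMD cd /data/app/scripts && sh start_http_server.sh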

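② build.sh

img_name=api-segment  # image name, which is also the project name
# Image tag: use the first argument when one is passed (the CI pipeline calls this script with $IMAGE_TAG);
# otherwise fall back to date + time (in practice the tag should be tied to the git commit id)
img_tag=${1:-$(date '+%Y%m%d_%H%M%S')}

docker build -f docker/Dockerfile -t ${img_name}:${img_tag} .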
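
The comment in build.sh notes that the tag should be tied to the git commit id. The CI configuration later in this article builds its tag as the commit date followed by the first 8 characters of the commit hash, and the sketch below reproduces that format in Python. The helper names `make_image_tag` and `current_image_tag` are made up for illustration, and `current_image_tag` assumes it is run inside a git working copy.

```python
import subprocess


def make_image_tag(date, commit_sha):
    """Combine a YYYYMMDD date and a commit hash into '<date>_<first 8 chars of hash>'."""
    return "{}_{}".format(date, commit_sha[:8])


def current_image_tag():
    """Build the tag for the commit that is currently checked out (requires git)."""
    # %cd with --date=format:%Y%m%d prints the commit date; %H prints the full hash
    out = subprocess.check_output(
        ["git", "log", "-1", "--date=format:%Y%m%d", "--pretty=format:%cd %H"],
        text=True,
    )
    date, sha = out.split()
    return make_image_tag(date, sha)
```

Tagging by commit rather than by build time means the same commit always produces the same tag, which makes it easier to trace a running image back to its source.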

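③ Build a dev image for running tests and debugging during development

  • Dockerfile.dev (includes the commands that set up SSH for remote debugging, since the machine may be a remote GPU server rather than a local one)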
FROM pytorch/pytorch:1.3-cuda10.1-cudnn7-runtime
#MAINTAINER old-wang <old-wang@gmail.com>

COPY ./requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

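# Install ssh for remote debugging (running the server's Python interpreter from a local machine requires ssh)
# In practice the local environment usually has no GPU hardware and cannot run GPU code, so remote debugging is needed.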
RUN apt-get update --fix-missing && apt-get install --fix-missing -y openssh-server
RUN mkdir -p /var/run/sshd
RUN echo 'root:root' | chpasswd
RUN sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
RUN echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config
RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
ENV NOTVISIBLE "in users profile"
RUN echo "export VISIBLE=now" >> /etc/profile

COPY . /data/app
WORKDIR /data/app
ENV LANG C.UTF-8
EXPOSE 8000

# Run the unit tests
CMD cd /data/app/ && python tests/unit_tests/env/run_unit_tests.py
  • build_dev.sh
img_name=api-segment:dev

docker build -f docker/Dockerfile.dev -t ${img_name} .

Configuring the local interpreter in PyCharm

Remote configuration

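2) CI/CD scripts

  • Learning objectives
    Understand the CI/CD pipeline, how to write the automation scripts, and how to define the pipeline

  • Configuring the CI/CD pipeline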

stages:
  - unit_test
  - build_image
  - api_test
  - deploy_beta

variables:
  PROJECT_REPO_NAME: api-segment

before_script:
  - export ROOT_PATH=$(pwd)
  - echo 'root path:' $ROOT_PATH
  - docker login -u $DOCKER_USER -p $DOCKER_PWD http://xxxx.dockerhub.com
  - git config --global advice.detachedHead false
  - git config --global user.email "${GITLAB_USER_EMAIL}" && git config --global user.name "${GITLAB_USER_NAME}"
  - git clone --single-branch -b $CI_COMMIT_REF_NAME http://$GITLAB_USER:$GITLAB_PWD@gitlab.xxxxx.com/$PROJECT_REPO_NAME.git
  - cd $PROJECT_REPO_NAME
  - git checkout $CI_COMMIT_SHA
  - echo 'commit id:' $CI_COMMIT_SHA
  - echo 'commit user:' $GITLAB_USER_NAME
  - echo 'commit e-mail:' $GITLAB_USER_EMAIL
  - export COMMIT_MESSAGE=$(git log -p -1 --pretty=format:"%s"|head -1)
  - echo 'commit message:' $COMMIT_MESSAGE
  - export DATE=$(git log --pretty=format:"%cd %H" --date=format:'%Y%m%d' | grep ${CI_COMMIT_SHA} | awk '{print $1}')
  - echo 'date:' $DATE
  - export IMAGE_TAG=$DATE"_"${CI_COMMIT_SHA:0:8}
  - echo 'docker image tag:' $IMAGE_TAG

unit_test_stage:
  stage: unit_test
  script:
    - sh tests/unit_tests/env/run_unit_tests.sh
  artifacts:
    when: always
    name: "${PROJECT_REPO_NAME}_${CI_COMMIT_SHA:0:8}"
    expire_in: 1 week
    paths:
      - TestReport.html
  when: manual
  allow_failure: false

build_image_stage:
  stage: build_image
  script:
    - sh docker/build.sh $IMAGE_TAG
  when: manual
  allow_failure: false

api_test_stage:
  stage: api_test
  script:
    - sh tests/api_tests/env/run_api_tests.sh $IMAGE_TAG
  allow_failure: false

deploy_beta_stage:
  stage: deploy_beta
  script:
    - xxx  # connect to the k8s cluster
  when: manual
  allow_failure: false
  only:
    - master

