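Design of a Kubernetes-Based Machine Learning Microservice System

1 Overview

  This article describes how Kubernetes, containers (Docker), and microservices are applied in machine learning practice. It covers the design and implementation of a machine learning text classification system in detail, together with its deployment as a distributed system in the cloud.

2 System Introduction

2.1 Feature Overview

  The features the system must provide are shown in the mind map of Figure 1: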

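Figure 1 Feature overview of the cloud-based microservice machine learning system

2.2 Core Features

  Main features:

  1. Release as Docker images and deployment on Kubernetes in the cloud;
  2. Microservice design with autonomous services that can be scaled out and in;
  3. Load balancing, system resource monitoring, and resource orchestration;
  4. A unified, lightweight RESTful API framework that accepts JSON requests;
  5. Multiple machine learning algorithms, configured with JSON parameters;
  6. Chinese word segmentation: RobinSeg (RS), IKAnalyzer (IK), JEAnalysis (JE), MmSeg4j (MS), PaoDing (PD), SmallSeg4j (SS), etc.;
  7. Feature selection: Document Frequency (DF), Information Gain (IG), Chi-Square Test (χ², CHI), Mutual Information (MI), Matrix Projection (MP), etc.;
  8. Classification: k-Nearest Neighbor (kNN), Naïve Bayes (NB), Support Vector Machine (SVM), Normalized Vector (NLV), etc.;
  9. A graphical web UI for machine learning performance evaluation and data visualization.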
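  For CHI in item 7, the usual score of a term t for a category c comes from a 2 × 2 contingency table over the N training documents: A = documents in c that contain t, B = documents outside c that contain t, C = documents in c without t, D = documents outside c without t, and χ²(t, c) = N(AD − BC)² / ((A + C)(B + D)(A + B)(C + D)). The sketch below computes this textbook formula only; it is not the system's feature-selection code (which this section does not show), and `ChiSquareSketch` is a hypothetical name.

```java
/** Illustrative only: chi-square score of one term for one category. */
final class ChiSquareSketch {
    private ChiSquareSketch() {}

    /**
     * @param a documents in the category that contain the term
     * @param b documents outside the category that contain the term
     * @param c documents in the category that do not contain the term
     * @param d documents outside the category that do not contain the term
     * @return the chi-square statistic, or 0 when a marginal total is zero
     */
    static double chiSquare(long a, long b, long c, long d) {
        double n = a + b + c + d;
        double denominator = (double) (a + c) * (b + d) * (a + b) * (c + d);
        if (denominator == 0) {
            return 0.0; // the term or the category never (or always) occurs: no evidence
        }
        double diff = (double) a * d - (double) b * c;
        return n * diff * diff / denominator;
    }
}
```

  A feature-selection step would score every term this way (with several categories, commonly taking the maximum or a weighted average over c) and keep the highest-scoring terms.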

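3 System Architecture

3.1 Cloud Architecture Diagram

  The architecture of the cloud-based microservice machine learning system is shown in Figure 2: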

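Figure 2 Architecture of the cloud-based microservice machine learning system

3.2 Architecture Description

  The whole system follows a cloud computing architecture. It can be deployed on traditional virtualization (such as KVM) or on cloud IaaS services (such as OpenStack), and its PaaS layer uses Kubernetes + Docker.
  The focus of the system is the design and development of the SaaS layer, i.e. the microservice-based machine learning system; the area with the red border and blue background in Figure 2 is the core of the system.
  The main functional modules are the common library Comm-lib, the microservice cores (Chinese word segmentation, pretreatment, feature selection, classifier), the RESTful microservice framework (microservice core loading, HTTP API), the application service + web, and management and maintenance.
  Common library Comm-lib: basic functions such as logging, configuration, and mathematical computation.
  RESTful microservice framework: unifies the microservice interface and the RESTful API, and decouples them from the business logic.
  Microservice core: implements the microservice interface and concentrates on its own business logic, i.e. the independent functions of Chinese word segmentation, pretreatment, feature selection, and classification.
  Management and maintenance: building and publishing Docker images, resource monitoring for Kubernetes, Docker, and the microservices, and resource orchestration.
  Application web: if the microservices are regarded as the deep server side, this part comprises the shallow server-side application and the web client. The server side receives web classification task requests, schedules them, and manages their lifecycle. The web client displays task status and machine learning results, as well as resource monitoring.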
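  As a hypothetical illustration of this layering (not code from the system), the application server could drive a classification task by passing one JSON document through the microservices in order, each returning an updated document, much as the segmentation service returns its request with the texts segmented. Here each stage is a local `UnaryOperator` over a `Map` instead of an HTTP call; `PipelineSketch` and the stage names are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical orchestration sketch: pass one document through named stages in order. */
final class PipelineSketch {
    private final Map<String, UnaryOperator<Map<String, Object>>> stages = new LinkedHashMap<>();
    private final List<String> trace = new ArrayList<>();

    /** Registers a stage; LinkedHashMap keeps registration order. */
    PipelineSketch add(String name, UnaryOperator<Map<String, Object>> stage) {
        stages.put(name, stage);
        return this;
    }

    /** Runs all stages; each stage's output is the next stage's input. */
    Map<String, Object> run(Map<String, Object> request) {
        Map<String, Object> doc = request;
        for (Map.Entry<String, UnaryOperator<Map<String, Object>>> stage : stages.entrySet()) {
            trace.add(stage.getKey());         // which service handled the document
            doc = stage.getValue().apply(doc); // in a real deployment: an HTTP POST to that service
        }
        return doc;
    }

    /** Names of the stages run so far, in order. */
    List<String> trace() {
        return trace;
    }
}
```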

4 Cloud Deployment

4.1 Deployment Diagram

  The deployment of the cloud-based microservice machine learning system is shown in Figure 3:

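Figure 3 Deployment of the cloud-based microservice machine learning system

4.2 Deployment Description

  The deployed service components are ETCD, Docker, Kubernetes Master, Kubernetes Node, Docker Private Registry, GlassFish Server, and Flannel.
  ETCD: an open-source, distributed key-value store that provides shared configuration and service registration and discovery. ETCD is the default storage system of Kubernetes and holds all cluster data, so a backup plan for the etcd data is needed. ETCD also stores Flannel's network configuration, the allocated subnets, and any auxiliary data (such as each host's public IP).
  Docker: an application container engine that lets developers package an application and its dependencies into a portable container and publish it to any popular Linux machine; it also provides a form of virtualization.
  Kubernetes Master: the management and control center of the cluster.
  Kubernetes Node: provides the Kubernetes runtime environment and maintains Pods.
  Docker Private Registry: a private Docker image registry that stores the microservice images developed for the system.
  GlassFish Server: a web application server that hosts the web application; it can also be deployed as an image.
  Flannel: a commonly used network configuration tool that builds a layer 3 (network layer) network fabric.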

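4.3 Deployment Example

  As shown in Figure 3, the example is deployed on a single server.

  Server configuration:
    CPU: 2 × Intel Xeon E5-2670 (8 cores, 16 threads each)
    Memory: 32 GB = 8 × 4 GB PC3-10600R 1333
    Disk: 240 GB SSD + 4 TB HDD
    OS: Ubuntu 14.04.5 LTS
  Six virtual machines are started with KVM.

  Virtual machine configuration:
    CPU: 4 cores
    Memory: 4 GB
    Disk: 80 GB
    OS: CentOS 7.5

  The VMs are connected through a network bridge configured with the Linux bridge management tools. Kubernetes Master and ETCD share one VM. Four VMs are Kubernetes Nodes and run Docker and Flannel. The remaining VM hosts the private Docker registry and runs Docker.

  Software versions:
    ETCD: 3.0.0
    Docker: 1.12.6
    Kubernetes: 1.6.7
    Flannel: 0.9.1
    Docker Registry: 2.5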

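5 Design and Implementation

5.1 RESTful Microservice Framework

  To keep the interfaces between microservices uniform, the system uses one common framework built on Jersey, an open-source RESTful framework that implements the JAX-RS specifications (JSR 311 and JSR 339).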
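  With this framework, a client reaches a microservice by POSTing JSON to `/robin/{microService}`, the path declared by `CommonResource` below, on the port set in `config.properties` (8084 in the sample). The sketch uses only the JDK's `HttpURLConnection`; the class and method names are hypothetical and error handling is minimal.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the unified RESTful entry point (JDK only). */
final class MicroServiceClient {
    private MicroServiceClient() {}

    /** http://host:port/robin/{service}: @Path("robin") on the class plus @Path("{microService}"). */
    static String endpoint(String host, int port, String service) {
        return "http://" + host + ":" + port + "/robin/" + service;
    }

    /** POSTs a JSON string and returns the response body (the error body for HTTP >= 400). */
    static String post(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            conn.setRequestProperty("Accept", "application/json");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            InputStream in = conn.getResponseCode() < 400 ? conn.getInputStream() : conn.getErrorStream();
            if (in == null) {
                return ""; // e.g. an error response without a body
            }
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            try (InputStream src = in) {
                byte[] chunk = new byte[4096];
                int n;
                while ((n = src.read(chunk)) != -1) {
                    body.write(chunk, 0, n);
                }
            }
            return new String(body.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }
}
```

  Against the sample configuration, `post(endpoint("localhost", 8084, "business"), requestJson)` would target the service whose `mircoServer.name` is `business`; a name that does not match is answered with an `ERROR` status by `CommonResource`.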

5.1.1 Microservice Framework Diagram

  The implementation flow of the RESTful framework is shown in Figure 4:

Figure 4 Implementation flow of the RESTful framework

5.1.2 Microservice Framework Implementation

  The configuration file config.properties contains:

#restful API config
listen.ip=0.0.0.0
listen.port=8084

#thread pool config
thread.core.pool.size=4
thread.max.pool.size=4

#micro server config
mircoServer.name=business
jar.path=file:business-1.0.jar
jar.actionClass=com.robin.action.BusinessAction

#log config
log.path=log/
log.prefix=business
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG 
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3
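  Because `config.properties` is an ordinary Java properties file, `java.util.Properties` can read it. The sketch below mirrors the thread-pool rules that `RestfulServer.startServer()` applies: a missing, empty or zero size, or a core size larger than the max size, means "keep Grizzly's default pool". `ConfigSketch` and `poolSizes` are illustrative names, not the project's `ConfigUtil` API, and treating malformed or negative numbers as missing is an assumption of this sketch (the server code itself lets a malformed number throw at startup).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical reader for the thread-pool part of config.properties. */
final class ConfigSketch {
    private ConfigSketch() {}

    /** Parses properties text (comments starting with '#' are ignored). */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // not expected for an in-memory reader
        }
        return props;
    }

    /** Returns {core, max}, or empty when the server should keep its default pool. */
    static Optional<int[]> poolSizes(Properties props) {
        int core = parsePositive(props.getProperty("thread.core.pool.size"));
        int max = parsePositive(props.getProperty("thread.max.pool.size"));
        if (core == 0 || max == 0 || core > max) {
            return Optional.empty();
        }
        return Optional.of(new int[] {core, max});
    }

    /** Blank, malformed or non-positive values become 0 ("not configured"). */
    private static int parsePositive(String value) {
        if (value == null || value.trim().isEmpty()) {
            return 0;
        }
        try {
            int n = Integer.parseInt(value.trim());
            return n > 0 ? n : 0;
        } catch (NumberFormatException ex) {
            return 0;
        }
    }
}
```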

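  Common resource class:

package com.robin.restful;

import com.robin.config.ConfigUtil;
import com.robin.loader.MircoServiceAction;
import com.robin.log.RobinLogger;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Common resource class</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> V1.0 Date:2018-05-21</I>
 */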
@Path("robin")
public class CommonResource {
    // Logger
    private static final Logger LOGGER = RobinLogger.getLogger();
    // Microservice core loaded from the configured jar
    private static MircoServiceAction mircoServer;
    // Configured microservice name
    private static final String CFG_MS_NAME;

    static {
        // Read the microservice name from the configuration
        CFG_MS_NAME = ConfigUtil.getConfig("mircoServer.name");

        String jarPath = ConfigUtil.getConfig("jar.path");
        URL url = null;
        try {
            url = new URL(jarPath);
        } catch (MalformedURLException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
            System.exit(-1); // without a valid jar URL there is nothing to load
        }

        // Load the business Action class from the jar, delegating to the context class loader
        URLClassLoader classLoader = new URLClassLoader(new URL[]{url},
                Thread.currentThread().getContextClassLoader());
        String actionClassName = ConfigUtil.getConfig("jar.actionClass");
        try {
            Class<?> actionClass = classLoader.loadClass(actionClassName);
            mircoServer = (MircoServiceAction) actionClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException ex) {
            LOGGER.log(Level.SEVERE, "Cannot load action class [" + actionClassName + "]: " + ex);
            System.exit(-1);
        }
    }

    /**
     * Method handling HTTP GET requests: a liveness check that returns the
     * configured microservice name as "text/plain".
     *
     * @return status message sent as a text/plain response
     */
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String getIt() {
        String cfgMsName = ConfigUtil.getConfig("mircoServer.name");
        return "Micro server [" + cfgMsName + "] is running...\n";
    }

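    /**
     * Handles HTTP POST /robin/{microService}: checks that the requested name
     * matches the configured microservice and passes the JSON body to its Action.
     *
     * @param serverName microservice name from the URL path
     * @param reqJson request body
     * @return response of the microservice core, or an error JSON
     */
    @POST
    @Path("{microService}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public JSONObject requestService(
            @PathParam("microService") String serverName,
            JSONObject reqJson) {
        if (!serverName.equals(CFG_MS_NAME)) {
            return errorJson("Micro service name [" + serverName + "] error.");
        }
        if (null == mircoServer) {
            return errorJson("Micro service [" + serverName + "] is not loaded.");
        }
        return (JSONObject) mircoServer.action(reqJson);
    }

    // Builds {"status": "ERROR", "msg": msg}
    private static JSONObject errorJson(String msg) {
        JSONObject rspJson = new JSONObject();
        try {
            rspJson.put("status", "ERROR");
            rspJson.put("msg", msg);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return rspJson;
    }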
}
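  `CommonResource` instantiates the class named by `jar.actionClass` reflectively and uses it only through the `MircoServiceAction` interface. That reflective step can be sketched in isolation as below; to run on its own, the sketch resolves the class from the current class path rather than from an external jar, and `ActionLoader` and `EchoAction` are hypothetical names. `getDeclaredConstructor().newInstance()` is used rather than `Class.newInstance()`, which is deprecated since Java 9.

```java
import java.util.Collections;

/** Local copy of the plug-in contract so the sketch compiles on its own. */
interface MircoServiceAction {
    Object action(Object obj);
}

/** Hypothetical plug-in used to exercise the loader. */
final class EchoAction implements MircoServiceAction {
    public EchoAction() {
    }

    @Override
    public Object action(Object obj) {
        return Collections.singletonMap("echo", obj);
    }
}

/** Loads an Action implementation by class name through a given class loader. */
final class ActionLoader {
    private ActionLoader() {}

    static MircoServiceAction load(String className, ClassLoader loader) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            if (!MircoServiceAction.class.isAssignableFrom(clazz)) {
                throw new IllegalStateException(className + " does not implement MircoServiceAction");
            }
            return (MircoServiceAction) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot instantiate " + className, ex);
        }
    }
}
```

  In the service itself the loader argument would be the `URLClassLoader` built from `jar.path`, so the framework never needs a compile-time dependency on the business jar.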

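  RESTful server class:

import com.robin.config.ConfigUtil;
import com.robin.log.RobinLogger;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.threadpool.GrizzlyExecutorService;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;

/**
 * <DT><B>Description:</B></DT>
 * <DD>RESTful server class</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> V1.0 Date:2018-05-22</I>
 */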
public class RestfulServer {

    private static final Logger LOGGER = RobinLogger.getLogger();
    private static URI uri;
    private static HttpServer server;

    public static HttpServer getServer() {
        return server;
    }

    public static URI getUri() {
        if (null == uri) {
            String listenAddr = ConfigUtil.getConfig("listen.ip");
            String listenPort = ConfigUtil.getConfig("listen.port");
            String baseUri = "http://" + listenAddr + ":" + listenPort + "/";
            uri = URI.create(baseUri);
        }
        return uri;
    }

    /**
     * Starts Grizzly HTTP server exposing JAX-RS resources defined in this application.
     *
     */
    public static void startServer() {
        // create a resource config that scans for JAX-RS resources and providers
        // in com.robin.restful package
        final ResourceConfig rc = new ResourceConfig();
        rc.packages("com.robin.restful");
        rc.register(JettisonFeature.class);

        // create and start a new instance of grizzly http server
        // exposing the Jersey application at URI
        server = GrizzlyHttpServerFactory.createHttpServer(getUri(), rc);

        String corePoolSizeStr = ConfigUtil.getConfig("thread.core.pool.size");
        String maxPoolSizeStr = ConfigUtil.getConfig("thread.max.pool.size");
        int corePoolSize = 0;
        int maxPoolSize = 0;
        if ((corePoolSizeStr != null) && (!corePoolSizeStr.trim().isEmpty())) {
            corePoolSize = Integer.parseInt(corePoolSizeStr.trim());
        }

        if ((maxPoolSizeStr != null) && (!maxPoolSizeStr.trim().isEmpty())) {
            maxPoolSize = Integer.parseInt(maxPoolSizeStr.trim());
        }

        if ((corePoolSize == 0) || (maxPoolSize == 0)) {
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        if (corePoolSize > maxPoolSize) {
            LOGGER.log(Level.SEVERE, "Core pool size greater than max pool size in configuration.");
            LOGGER.log(Level.INFO, "Use default thread pool configuration.");
            return;
        }

        // See http://jersey.576304.n2.nabble.com/It-s-very-hard-to-increase-the-number-of-worker-threads-in-Jersey-Grizzly-module-td7579570.html
        // GrizzlyHttpServerFactory names its listener "grizzly"; resize its worker pool in place
        NetworkListener nl = server.getListener("grizzly");
        LOGGER.log(Level.FINE, nl.toString());
        TCPNIOTransport transport = nl.getTransport();
        ThreadPoolConfig config = transport.getWorkerThreadPoolConfig();
        config.setCorePoolSize(corePoolSize);
        String info = "Set thread core pool size [" + corePoolSize + "].";
        LOGGER.log(Level.INFO, info);
        config.setMaxPoolSize(maxPoolSize);
        info = "Set thread max pool size [" + maxPoolSize + "].";
        LOGGER.log(Level.INFO, info);
        GrizzlyExecutorService threadPool = (GrizzlyExecutorService) transport.getWorkerThreadPool();
        threadPool.reconfigure(config);
    }

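    /**
     * Entry point: starts the HTTP server and keeps the process running.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        startServer();
        if (server.isStarted()) {
            LOGGER.log(Level.INFO, "Start http server successfully.");
        } else {
            LOGGER.log(Level.SEVERE, "Start http server failed.");
            return;
        }
        // Block the main thread so that the JVM (and its container) keeps running
        // even if all Grizzly threads are daemon threads.
        Thread.currentThread().join();
    }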
}

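  Microservice entry Action interface:

package com.robin.loader;
/**
 * <DT><B>Description:</B></DT>
 * <DD>Microservice entry Action interface</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-05-04</I>
 */
public interface MircoServiceAction {
    /**
     * Runs the business logic of the microservice core.
     *
     * @param obj request (a JSONObject in this system)
     * @return response (a JSONObject in this system)
     */
    Object action(Object obj);
}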
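  `SegmentAction` and `PretreatAction` below each re-implement the same `kind`/`version` checks and error responses before their own logic. One way to factor that out, sketched here as a hypothetical template-method base class rather than anything in the project, is to validate once in `action()` and delegate to an abstract `handle()`. Plain `Map`s stand in for Jettison's `JSONObject` so the sketch runs without dependencies; the status names follow the ones used in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Local copy of the plug-in contract so the sketch compiles on its own. */
interface MircoServiceAction {
    Object action(Object obj);
}

/** Hypothetical base class: common request checks, then the concrete microservice logic. */
abstract class AbstractJsonAction implements MircoServiceAction {
    private final String kind;

    protected AbstractJsonAction(String kind) {
        this.kind = kind;
    }

    @Override
    public final Object action(Object obj) {
        if (!(obj instanceof Map)) {
            return error("JSON_ERR", "The action argument is not a JSON object.");
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> req = (Map<String, Object>) obj;
        if (!kind.equals(req.get("kind"))) {
            return error("KIND_ERR", "The value of kind must be " + kind + ".");
        }
        if (!"v1".equals(req.get("version"))) {
            return error("VERSION_ERR", "The value of version must be v1.");
        }
        Map<String, Object> rsp = new LinkedHashMap<>();
        rsp.put("status", "OK");
        rsp.put("result", handle(req));
        return rsp;
    }

    /** Business logic of the concrete microservice core. */
    protected abstract Object handle(Map<String, Object> req);

    /** Error response in the {"status", "msg"} shape used by the services. */
    static Map<String, Object> error(String status, String msg) {
        Map<String, Object> err = new LinkedHashMap<>();
        err.put("status", status);
        err.put("msg", msg);
        return err;
    }
}
```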

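5.2 Chinese Word Segmentation Microservice

  The segmentation microservice supports RobinSeg (RS), IKAnalyzer (IK), JEAnalysis (JE), MmSeg4j (MS), PaoDing (PD), and SmallSeg4j (SS). RS is described in my article "知更鸟中文分词RS设计实现" (design and implementation of the RobinSeg Chinese segmenter); the other methods wrap their published jar packages.

5.2.1 Design Patterns

  The design mainly uses the facade, adapter, factory, and singleton patterns. The class diagram of the segmentation microservice is shown in Figure 5: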

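Figure 5 Class diagram of the segmentation microservice

  Design principles: (1) program to an interface, not an implementation; (2) interact only with your closest collaborators; (3) encapsulate what varies; (4) strive for loosely coupled designs.
  Facade pattern: provides a unified interface to a set of interfaces in a subsystem; the facade defines a higher-level interface that makes the subsystem easier to use. A single segmentation facade class wraps the various segmenter interfaces behind one consistent high-level interface.
  Adapter pattern: converts the interface of a class into another interface that clients expect, so that classes with incompatible interfaces can work together. Each segmenter's proprietary interface is adapted to one common interface.
  Factory pattern: defines an interface for creating an object but lets subclasses decide which class to instantiate. Here a unified segmentation factory creates the segmenter instances; strictly speaking, SegmentFactory below is a simple static factory that selects the instance with a switch on the method name.
  Singleton pattern: ensures a class has only one instance and provides a global point of access to it. Creating a segmenter and loading its dictionaries takes a lot of memory and time, so all segmenter instances are controlled through the adapters and each is created only once.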
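  The way the four patterns fit together can be condensed into a small self-contained sketch. `ThirdPartyTokenizer` stands in for a real library such as IKAnalyzer, and the two toy segmenters (a whitespace split and one token per character) are placeholders, not real Chinese segmentation; all names here are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;

/** Target interface, in the role of AbstractSegmenter. */
abstract class Segmenter {
    abstract String segment(String text, String separator);
}

/** Hypothetical third-party library class with an incompatible API. */
final class ThirdPartyTokenizer {
    String[] tokenize(String text) {
        return text.trim().split("\\s+"); // placeholder: whitespace split only
    }
}

/** Adapter: exposes ThirdPartyTokenizer through the Segmenter interface. */
final class ThirdPartyAdapter extends Segmenter {
    private final ThirdPartyTokenizer tokenizer = new ThirdPartyTokenizer();

    @Override
    String segment(String text, String separator) {
        return String.join(separator, tokenizer.tokenize(text));
    }
}

/** Second toy segmenter: one token per non-whitespace character. */
final class CharSegmenter extends Segmenter {
    @Override
    String segment(String text, String separator) {
        StringBuilder sb = new StringBuilder();
        text.codePoints().filter(cp -> !Character.isWhitespace(cp)).forEach(cp -> {
            if (sb.length() > 0) {
                sb.append(separator);
            }
            sb.appendCodePoint(cp);
        });
        return sb.toString();
    }
}

enum SegMethod { WS, CHAR }

/** Factory that also caches one instance per method (the singleton role). */
final class SegmenterFactory {
    private static final Map<SegMethod, Segmenter> CACHE = new EnumMap<>(SegMethod.class);

    private SegmenterFactory() {}

    static synchronized Segmenter get(SegMethod method) {
        SegMethod m = (method == null) ? SegMethod.CHAR : method; // default, like SegmentFactory's RS fallback
        return CACHE.computeIfAbsent(m, k -> k == SegMethod.WS ? new ThirdPartyAdapter() : new CharSegmenter());
    }
}

/** Facade: the single entry point callers use. */
final class SegmentFacadeSketch {
    private SegmentFacadeSketch() {}

    static String split(SegMethod method, String text, String separator) {
        return SegmenterFactory.get(method).segment(text, separator);
    }
}
```

  Callers only ever see `SegmentFacadeSketch.split`; adding a new library means one new adapter and one new enum constant, without touching callers.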

5.2.2 Code Implementation

Abstract class for the Chinese segmentation interface

package com.robin.segment;

import com.robin.log.RobinLogger;
import java.util.logging.Logger;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Abstract class for the Chinese word segmentation interface</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> Date:2018-04-18</I>
 */
public abstract class AbstractSegmenter {

    /** Logger */
    protected static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * Segments text into words.
     *
     * @param text text to segment
     * @param separator separator inserted between words
     * @return segmented text
     */
    public abstract String segment(String text, String separator);
}

Unified segmenter facade class

package com.robin.segment;

import com.robin.log.RobinLogger;
import com.robin.segment.SegmentFactory.SegmentMethod;
import com.robin.segment.robinseg.RobinSeg;
import com.robin.segment.robinseg.SegmentArgs;
import java.util.logging.Logger;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Unified segmenter facade class</DD>
 * <DD>Facade pattern</DD>
 *
 * @version 1.0
 * @author Robin
 * @version <I> Date:2018-04-19</I>
 */
public class SegmentFacade {

    // Logger
    private static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * Gets the configuration argument object of a segmenter.
     *
     * @param methodName segmentation method
     * @return SegmentArgs for RobinSeg, or null for methods without configurable arguments
     */
    public static SegmentArgs getSegmentArgsObj(SegmentMethod methodName) {
        AbstractSegmenter segmenter = SegmentFactory.getSegInstance(methodName);
        // Only RobinSeg exposes arguments; instanceof also covers the null -> RS fallback
        if (segmenter instanceof RobinSeg) {
            return ((RobinSeg) segmenter).getSegmentConfInstance();
        }
        return null;
    }

    /**
     * <DD>Segments text with the chosen algorithm;</DD>
     * <DD>falls back to RobinSeg when no method (null) is given.</DD>
     *
     * @param methodName segmentation method: SegmentMethod.IK, .JE, .MS, .PD, .SS, .RS
     * @param text text to segment
     * @param separator separator
     * @return text segmented with the separator
     */
    public static String split(SegmentMethod methodName, String text, String separator) {
        AbstractSegmenter segmenter = SegmentFactory.getSegInstance(methodName);
        return segmenter.segment(text, separator);
    }
}

Segmentation Action implementation class

package com.robin.segment.action;

import com.robin.loader.MircoServiceAction;
import com.robin.log.RobinLogger;
import com.robin.segment.SegmentFacade;
import com.robin.segment.SegmentFactory.SegmentMethod;
import com.robin.segment.robinseg.SegmentArgs;
import com.robin.segment.robinseg.SegmentArgs.SegAlgorithm;
import java.util.HashSet;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Segmentation Action implementation class</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-06-05</I>
 */
public class SegmentAction implements MircoServiceAction {

    private static final Logger LOGGER = RobinLogger.getLogger();

    public enum StatusCode {
        OK,
        JSON_ERR,
        KIND_ERR,
        VERSION_ERR,
        SEGMETHOD_ERR,
        SEPARATOR_ERR,
        SEGMENT_FAILED,
        TEXTS_NULL,
    }

    private class ActionStatus {

        StatusCode statusCode;
        String msg;

    }

    private JSONObject getErrorJson(ActionStatus actionStatus) {
        JSONObject errJson = new JSONObject();
        try {
            errJson.put("status", actionStatus.statusCode.toString());
            errJson.put("msg", actionStatus.msg);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return errJson;
    }

    private ActionStatus checkJSONObjectTerm(JSONObject jsonObj,
            String key,
            HashSet<String> valueSet,
            StatusCode errStatusCode) {
        ActionStatus actionStatus = new ActionStatus();

        try {
            if (!jsonObj.isNull(key)) {
                String value = jsonObj.getString(key);
                if (!valueSet.contains(value)) {
                    actionStatus.msg = "The value [" + value + "] of " + key + " is error.";
                    actionStatus.statusCode = errStatusCode;
                    return actionStatus;
                }
            } else {
                actionStatus.msg = "The input parameter is missing " + key + ".";
                actionStatus.statusCode = errStatusCode;
                return actionStatus;
            }

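        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
            // A value that cannot be read must not be reported as OK
            actionStatus.msg = "The value of " + key + " cannot be read.";
            actionStatus.statusCode = errStatusCode;
            return actionStatus;
        }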

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    private ActionStatus checkInputJSONObject(JSONObject jsonObj) {
        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        JSONObject argsJson;
        HashSet<String> valueSet = new HashSet<>();

        try {
            valueSet.add("segment");
            retActionStatus = checkJSONObjectTerm(jsonObj, "kind", valueSet, StatusCode.KIND_ERR);
            if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
                return retActionStatus;
            }

            valueSet.clear();
            valueSet.add("v1");
            retActionStatus = checkJSONObjectTerm(jsonObj, "version", valueSet, StatusCode.VERSION_ERR);
            if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
                return retActionStatus;
            }

            JSONObject segmentMetadata = jsonObj.getJSONObject("metadata").getJSONObject("segment");

            valueSet.clear();
            valueSet.add("RS");
            valueSet.add("IK");
            valueSet.add("JE");
            valueSet.add("MS");
            valueSet.add("PD");
            valueSet.add("SS");
            retActionStatus = checkJSONObjectTerm(segmentMetadata, "method", valueSet, StatusCode.SEGMETHOD_ERR);
            if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
                return retActionStatus;
            }

            valueSet.clear();
            valueSet.add(" ");
            valueSet.add("|");
            valueSet.add("/");
            retActionStatus = checkJSONObjectTerm(segmentMetadata, "separator", valueSet, StatusCode.SEPARATOR_ERR);
            if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
                return retActionStatus;
            }

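            // Set RobinSeg arguments. NOTE: the arguments object presumably belongs to the
            // shared RobinSeg singleton, so these settings also affect concurrent requests.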
            String method = segmentMetadata.getString("method");
            SegmentMethod segmentMethod = SegmentMethod.valueOf(method);
            if ((segmentMethod.equals(SegmentMethod.RS)) && (!segmentMetadata.isNull("args"))) {
                argsJson = segmentMetadata.getJSONObject("args");
                SegmentArgs segmentArgs = SegmentFacade.getSegmentArgsObj(segmentMethod);
                if (null != segmentArgs) {
                    if (!argsJson.isNull("algorithm")) {
                        String algorithm = argsJson.getString("algorithm");
                        segmentArgs.setSegAlgorithm(SegAlgorithm.valueOf(algorithm.toUpperCase()));
                    }
                    if (!argsJson.isNull("cleanSymbol")) {
                        Boolean flag = argsJson.getBoolean("cleanSymbol");
                        segmentArgs.setCleanSymbolFlag(flag);
                    }
                    if (!argsJson.isNull("markNewWord")) {
                        Boolean flag = argsJson.getBoolean("markNewWord");
                        segmentArgs.setMarkNewWordFlag(flag);
                    }
                    if (!argsJson.isNull("downcasing")) {
                        Boolean flag = argsJson.getBoolean("downcasing");
                        segmentArgs.setDowncasingFlag(flag);
                    }
                    if (!argsJson.isNull("mergePattern")) {
                        Boolean flag = argsJson.getBoolean("mergePattern");
                        segmentArgs.setMergePatternFlag(flag);
                    }
                    if (!argsJson.isNull("retrievalPattern")) {
                        Boolean flag = argsJson.getBoolean("retrievalPattern");
                        segmentArgs.setRetrievalPatternFlag(flag);
                    }
                }
            }
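        } catch (JSONException | IllegalArgumentException ex) {
            // Missing metadata.segment, a non-boolean flag or an unknown algorithm name
            actionStatus.statusCode = StatusCode.JSON_ERR;
            actionStatus.msg = "Invalid segment request: " + ex.getMessage();
            return actionStatus;
        }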

        actionStatus.statusCode = StatusCode.OK;

        return actionStatus;
    }

    @Override
    public Object action(Object obj) {

        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        if (!(obj instanceof JSONObject)) {
            actionStatus.msg = "The action arguments is not JSONObject.";
            LOGGER.log(Level.SEVERE, actionStatus.msg);
            actionStatus.statusCode = StatusCode.JSON_ERR;
            return this.getErrorJson(actionStatus);
        }

        JSONObject jsonObj = (JSONObject) obj;
        retActionStatus = this.checkInputJSONObject(jsonObj);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            LOGGER.log(Level.SEVERE, retActionStatus.msg);
            return this.getErrorJson(retActionStatus);
        }

        SegmentMethod segmentMethod;
        String separator;
        JSONObject texts;

        try {
            JSONObject segmentMetadata = jsonObj.getJSONObject("metadata").getJSONObject("segment");
            String method = segmentMetadata.getString("method");
            segmentMethod = SegmentMethod.valueOf(method);
            separator = segmentMetadata.getString("separator");
            // optJSONObject returns null when "texts" is missing (getJSONObject would throw)
            texts = jsonObj.optJSONObject("texts");
            long beginTime = System.currentTimeMillis();
            if (null == texts) {
                actionStatus.statusCode = StatusCode.TEXTS_NULL;
                actionStatus.msg = "The input texts is null.";
                LOGGER.log(Level.SEVERE, actionStatus.msg);
                return this.getErrorJson(actionStatus);
            }

            // texts: {label: [{"text": ...}, ...], ...}; each text is replaced by its segmented form
            Iterator<?> labelsIt = texts.keys();
            while (labelsIt.hasNext()) {
                String label = (String) labelsIt.next();
                JSONArray aLabelTexts = texts.getJSONArray(label);
                int len = aLabelTexts.length();
                for (int i = 0; i < len; i++) {
                    JSONObject textJson = aLabelTexts.getJSONObject(i);
                    String text = textJson.optString("text", null);
                    if (null != text) {
                        String result = SegmentFacade.split(segmentMethod, text, separator);
                        textJson.put("text", result);
                    }
                }
            }

            long endTime = System.currentTimeMillis();
            int spendTime = (int) (endTime - beginTime);
            segmentMetadata.put("spendTime", spendTime);
        } catch (JSONException ex) {
            // Report the failure instead of falling through to an "OK" response
            actionStatus.statusCode = StatusCode.SEGMENT_FAILED;
            actionStatus.msg = "Segmentation failed: " + ex.getMessage();
            LOGGER.log(Level.SEVERE, actionStatus.msg);
            return this.getErrorJson(actionStatus);
        }

        JSONObject rsp = new JSONObject();
        try {
            rsp.put("status", "OK");
            rsp.put("result", jsonObj);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return rsp;
    }
}

Factory class for segmenter instances

package com.robin.segment;

import com.robin.segment.adapter.SmallSeg4jAdapter;
import com.robin.segment.adapter.MmSeg4jAdapter;
import com.robin.segment.adapter.IKAnalyzerAdapter;
import com.robin.segment.adapter.JEAnalysisAdapter;
import com.robin.segment.adapter.PaoDingAdapter;
import com.robin.log.RobinLogger;
import com.robin.segment.robinseg.RobinSeg;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Factory class for segmenter instances</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> Date:2018-04-19</I>
 */
public class SegmentFactory {

    // Logger
    private static final Logger LOGGER = RobinLogger.getLogger();

    /** Segmentation algorithm names */
    public enum SegmentMethod {

        /** JE  = "JEAnalysis" */
        JE,
        /** IK  = "IKAnalyzer"*/
        IK,
        /** MS  = "MmSeg4j" */
        MS,
        /** PD  = "PaoDing" */
        PD,
        /** SS  = "SmallSeg4j" */
        SS,
        /** RS  = "RobinSeg" */
        RS
    }

    /**
     * Returns the (single) instance of the requested segmenter.
     *
     * @param methodName segmentation method: SegmentMethod.IK, .JE, .MS, .PD, .SS, .RS; null falls back to RS
     * @return segmenter instance
     */
    public static AbstractSegmenter getSegInstance(SegmentMethod methodName) {
        if (null == methodName) {
            methodName = SegmentMethod.RS;
        }
        switch (methodName) {
            case JE:
                return JEAnalysisAdapter.getInstance();
            case IK:
                return IKAnalyzerAdapter.getInstance();
            case MS:
                return MmSeg4jAdapter.getInstance();
            case PD:
                return PaoDingAdapter.getInstance();
            case SS:
                return SmallSeg4jAdapter.getInstance();
            case RS:
                return RobinSeg.getInstance();
            default:
                LOGGER.log(Level.WARNING, "Unknown segmentation method, falling back to RobinSeg.");
                return RobinSeg.getInstance();
        }
    }
}

IK adapter class

package com.robin.segment.adapter;

import com.robin.segment.AbstractSegmenter;
import java.io.IOException;
import java.io.StringReader;
import java.util.logging.Level;

import org.wltea.analyzer.IKSegmentation;
import org.wltea.analyzer.Lexeme;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Adapter for the IKAnalyzer 3.2.0 segmenter</DD>
 * <DD>Adapter pattern, singleton pattern</DD>
 *
 * @version Version1.0
 * @author  Robin
 * @version <I> Date:2018-04-17</I>
 */
public class IKAnalyzerAdapter extends AbstractSegmenter {

    /** Segmenter instance */
    protected static AbstractSegmenter instance = null;

    private IKAnalyzerAdapter() {
    }

    /**
     * Segments text with the given separator.
     *
     * @param text text to segment
     * @param separator separator appended after each word
     * @return segmented text
     */
    @Override
    public String segment(String text, String separator) {

        // Defensive check
        if (null == text || "".equals(text)) {
            return "";
        }

        // Create the segmenter in maximum-word-length mode
        IKSegmentation ikSeg = new IKSegmentation(new StringReader(text), true);
        StringBuilder sb = new StringBuilder();
        try {
            Lexeme l;
            while ((l = ikSeg.next()) != null) {
                sb.append(l.getLexemeText()).append(separator);
            }
        } catch (IOException e) {
            LOGGER.log(Level.SEVERE, e.getMessage());
        }

        return sb.toString();
    }

    /**
     * Gets the IKAnalyzer segmenter instance.
     * synchronized: Grizzly worker threads may call this concurrently.
     *
     * @return the single segmenter instance
     */
    public static synchronized AbstractSegmenter getInstance() {
        if (null == instance) {
            instance = new IKAnalyzerAdapter();
        }
        return instance;
    }
}
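  Segmenter singletons are shared by all Grizzly worker threads (four in the sample configuration), so lazy creation must be thread-safe: an unsynchronized `if (null == instance)` check lets two threads each build an instance and load the dictionaries twice. Besides a `synchronized` accessor, the initialization-on-demand holder idiom gives lazy, thread-safe creation without taking a lock on every call, because the JVM initializes a class at most once. The sketch uses a hypothetical `ExpensiveSegmenter` whose constructor only counts how often it runs, standing in for dictionary loading.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical expensive-to-build segmenter using the holder idiom. */
final class ExpensiveSegmenter {
    /** Counts constructions so the single-instance guarantee can be checked. */
    static final AtomicInteger CREATED = new AtomicInteger();

    private ExpensiveSegmenter() {
        CREATED.incrementAndGet(); // stands in for loading a large dictionary
    }

    /** Initialized by the JVM once, on first access to Holder.INSTANCE. */
    private static final class Holder {
        static final ExpensiveSegmenter INSTANCE = new ExpensiveSegmenter();
    }

    static ExpensiveSegmenter getInstance() {
        return Holder.INSTANCE;
    }
}
```

  Applied to an adapter, the private constructor stays as it is and `getInstance()` simply returns the holder's field.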

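5.2.3 JSON Format

  A segmentation request carries "kind" ("segment"), "version" ("v1"), and "metadata.segment" with "method" (RS, IK, JE, MS, PD or SS), "separator" (a space, "|" or "/") and, for RS only, an optional "args" object (algorithm, cleanSymbol, markNewWord, downcasing, mergePattern, retrievalPattern), plus "texts", an object that maps each category label to an array of {"text": ...} objects.

  The response is {"status": "OK", "result": ...}, where "result" is the request with every "text" replaced by its segmented form and "metadata.segment.spendTime" set to the elapsed milliseconds; errors are returned as {"status": <error code>, "msg": ...}.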
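  The input checks in `SegmentAction.checkInputJSONObject` can be condensed into one dependency-free function. The sketch works on nested `Map`/`List` values instead of Jettison objects so that it runs on its own; `SegmentRequestCheck` is a hypothetical name, it returns the `StatusCode` names used by `SegmentAction`, and mapping a missing `metadata.segment` to `SEGMETHOD_ERR` is a choice of this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, dependency-free condensation of SegmentAction's input checks. */
final class SegmentRequestCheck {
    private static final Set<String> METHODS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("RS", "IK", "JE", "MS", "PD", "SS")));
    private static final Set<String> SEPARATORS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(" ", "|", "/")));

    private SegmentRequestCheck() {}

    /** Returns "OK" or the status name of the first failed check. */
    static String check(Map<String, Object> req) {
        if (!"segment".equals(req.get("kind"))) {
            return "KIND_ERR";
        }
        if (!"v1".equals(req.get("version"))) {
            return "VERSION_ERR";
        }
        Map<?, ?> segment = child(child(req, "metadata"), "segment");
        if (segment == null || !METHODS.contains(segment.get("method"))) {
            return "SEGMETHOD_ERR";
        }
        if (!SEPARATORS.contains(segment.get("separator"))) {
            return "SEPARATOR_ERR";
        }
        Object texts = req.get("texts");
        if (!(texts instanceof Map)) {
            return "TEXTS_NULL";
        }
        for (Object docs : ((Map<?, ?>) texts).values()) {
            if (!(docs instanceof List)) { // each label must map to an array of {"text": ...}
                return "TEXTS_NULL";
            }
        }
        return "OK";
    }

    /** Nested map under key, or null when absent or not a map. */
    private static Map<?, ?> child(Map<?, ?> parent, String key) {
        if (parent == null) {
            return null;
        }
        Object value = parent.get(key);
        return (value instanceof Map) ? (Map<?, ?>) value : null;
    }
}
```

  Keeping the allowed values in constant sets also avoids rebuilding a `HashSet` for every request, as the original method does.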

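5.3 Pretreatment Microservice

  The main tasks of the pretreatment stage are stop-word removal, construction of the index dictionary, and conversion of the texts into a word-document matrix.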
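  The three pretreatment steps can be sketched with plain collections: drop stop words from an already-segmented text, index every term whose corpus frequency reaches `minFrequency`, and turn each document into a sparse term-count vector. This is an illustration only: the real `DicIndex` and `WordDocMatrix` classes are not shown in this section, counting total occurrences (rather than document frequency) is an assumption, and the `index:count` output format is hypothetical (the service's own format is a string that `PretreatAction` splits on `-` into a word total and the vector text).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical sketch of stop-word removal, dictionary indexing and vectorization. */
final class PretreatSketch {
    private PretreatSketch() {}

    /** Step 1: split a segmented text (separatorRegex is a regex, e.g. "\\|") and drop stop words. */
    static List<String> tokens(String segmented, String separatorRegex, Set<String> stopWords) {
        return Arrays.stream(segmented.split(separatorRegex))
                .filter(t -> !t.isEmpty() && !stopWords.contains(t))
                .collect(Collectors.toList());
    }

    /** Step 2: index terms occurring at least minFrequency times; TreeMap gives stable indices. */
    static Map<String, Integer> buildDictionary(List<List<String>> docs, int minFrequency) {
        Map<String, Integer> freq = new TreeMap<>();
        for (List<String> doc : docs) {
            for (String term : doc) {
                freq.merge(term, 1, Integer::sum);
            }
        }
        Map<String, Integer> dic = new TreeMap<>();
        for (Map.Entry<String, Integer> e : freq.entrySet()) {
            if (e.getValue() >= minFrequency) {
                dic.put(e.getKey(), dic.size()); // next free index
            }
        }
        return dic;
    }

    /** Step 3: sparse term-count vector as "index:count" pairs in index order. */
    static String toVector(List<String> doc, Map<String, Integer> dic) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String term : doc) {
            Integer idx = dic.get(term);
            if (idx != null) { // terms below minFrequency are not in the dictionary
                counts.merge(idx, 1, Integer::sum);
            }
        }
        return counts.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(" "));
    }
}
```

  An empty dictionary here corresponds to the `MIN_FREQUENCY_ERR` case in `PretreatAction`: no term reaches the requested `minFrequency`.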

5.3.1 Code Implementation

Pretreatment Action implementation class

package com.robin.pretreatment.action;

import com.robin.loader.MircoServiceAction;
import com.robin.log.RobinLogger;
import com.robin.pretreatment.DicIndex;
import com.robin.pretreatment.DicIndex.Language;
import com.robin.pretreatment.WordDocMatrix;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Pretreatment Action implementation class</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-04-08</I>
 */
public class PretreatAction implements MircoServiceAction {

    private static final Logger LOGGER = RobinLogger.getLogger();

    public enum StatusCode {
        OK,
        JSON_ERR,
        KIND_ERR,
        VERSION_ERR,
        MIN_FREQUENCY_ERR,
        TEXTS_NULL,
    }

    private class ActionStatus {

        StatusCode statusCode;
        String msg;

    }

    private JSONObject getErrorJson(ActionStatus actionStatus) {
        JSONObject errJson = new JSONObject();
        try {
            errJson.put("status", actionStatus.statusCode.toString());
            errJson.put("msg", actionStatus.msg);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return errJson;
    }

    private ActionStatus checkJSONObjectTerm(JSONObject jsonObj,
            String key,
            HashSet<String> valueSet,
            StatusCode errStatusCode) {
        ActionStatus actionStatus = new ActionStatus();

        try {
            if (!jsonObj.isNull(key)) {
                String value = jsonObj.getString(key);
                if (!valueSet.contains(value)) {
                    actionStatus.msg = "The value [" + value + "] of " + key + " is error.";
                    actionStatus.statusCode = errStatusCode;
                    return actionStatus;
                }
            } else {
                actionStatus.msg = "The input parameter is missing " + key + ".";
                actionStatus.statusCode = errStatusCode;
                return actionStatus;
            }

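        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
            // A value that cannot be read must not be reported as OK
            actionStatus.msg = "The value of " + key + " cannot be read.";
            actionStatus.statusCode = errStatusCode;
            return actionStatus;
        }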

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    private ActionStatus checkInputJSONObject(JSONObject jsonObj) {
        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        HashSet<String> valueSet = new HashSet<>();

        valueSet.add("pretreatment");
        retActionStatus = checkJSONObjectTerm(jsonObj, "kind", valueSet, StatusCode.KIND_ERR);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            return retActionStatus;
        }

        valueSet.clear();
        valueSet.add("v1");
        retActionStatus = checkJSONObjectTerm(jsonObj, "version", valueSet, StatusCode.VERSION_ERR);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            return retActionStatus;
        }

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    @Override
    public Object action(Object obj) {

        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        if (!(obj instanceof JSONObject)) {
            actionStatus.msg = "The action arguments is not JSONObject.";
            LOGGER.log(Level.SEVERE, actionStatus.msg);
            actionStatus.statusCode = StatusCode.JSON_ERR;
            return this.getErrorJson(actionStatus);
        }

        JSONObject preJson = (JSONObject) obj;
        retActionStatus = this.checkInputJSONObject(preJson);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            LOGGER.log(Level.SEVERE, retActionStatus.msg);
            return this.getErrorJson(retActionStatus);
        }

        try {
            long beginTime = System.currentTimeMillis();

            JSONObject textsObj = preJson.optJSONObject("texts"); // null when "texts" is missing
            if (null == textsObj) {
                actionStatus.statusCode = StatusCode.TEXTS_NULL;
                actionStatus.msg = "The input texts is null.";
                LOGGER.log(Level.SEVERE, actionStatus.msg);
                return this.getErrorJson(actionStatus);
            }
            DicIndex dicIndex;
            String lang = preJson.getJSONObject("metadata").getJSONObject("corpus").getString("lang");
            if (lang.equals("en")) {
                dicIndex = new DicIndex(Language.EN);
            } else {
                dicIndex = new DicIndex(Language.CN);
            }
            JSONObject preMetadataJson = preJson.getJSONObject("metadata").getJSONObject("pretreatment");
            dicIndex.create(preJson, preMetadataJson.getInt("minFrequency"));
            HashMap<String, Integer> dicMap = dicIndex.getDicMap(preJson);
            if (dicMap.isEmpty()) {
                // Same {"status", "msg"} shape as every other error response
                actionStatus.statusCode = StatusCode.MIN_FREQUENCY_ERR;
                actionStatus.msg = "The minFrequency is too big: no word reaches it.";
                LOGGER.log(Level.SEVERE, actionStatus.msg);
                return this.getErrorJson(actionStatus);
            }

            // Iterate over all texts
            Iterator<String> labelsIt = textsObj.keys();
            while (labelsIt.hasNext()) {
                String label = labelsIt.next();
                JSONArray aLabelTextsArr = textsObj.getJSONArray(label);

                int len = aLabelTextsArr.length();
                for (int i = 0; i < len; i++) {
                    JSONObject textJson = aLabelTextsArr.getJSONObject(i);
                    String text = textJson.optString("text", null);
                    if (null != text) {
                        String result = WordDocMatrix.create(text, dicMap);
                        String[] wordsDocArr = result.split("-");
                        textJson.remove("text");
                        textJson.put("totalWords", Integer.valueOf(wordsDocArr[0]));
                        textJson.put("text", wordsDocArr[1]);
                    }
                }
            }

            long endTime = System.currentTimeMillis();
            int spendTime = (int) (endTime - beginTime);
            preMetadataJson.put("spendTime", spendTime);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        JSONObject rsp = new JSONObject();
        try {
            rsp.put("status", "OK");
            rsp.put("result", preJson);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return rsp;
    }
}
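The action above builds a term dictionary filtered by `minFrequency` (`DicIndex.create`). It then rewrites each text with `WordDocMatrix.create`, whose result is split on `-` into a total word count and an encoded text. Neither helper's source is listed here, so the following is only a minimal sketch under stated assumptions. Texts are assumed to be already segmented into tokens. `minFrequency` is assumed to count total occurrences across the corpus. Codes are assumed to be assigned in first-seen order, and the encoded string uses a hypothetical `total-code:count,...` layout. `DictionarySketch` and its methods are illustrative names, not the project's API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DictionarySketch {

    private DictionarySketch() {
    }

    /** Assigns an integer code to every token occurring at least minFrequency times (assumption). */
    static Map<String, Integer> build(List<List<String>> texts, int minFrequency) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (List<String> tokens : texts) {
            for (String token : tokens) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        Map<String, Integer> dic = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= minFrequency) {
                dic.put(e.getKey(), dic.size()); // codes 0, 1, 2, ... in first-seen order
            }
        }
        return dic; // an empty map corresponds to the MIN_FREQUENCY_ERR case above
    }

    /** Encodes one text as "totalWords-code:count,code:count" (hypothetical format). */
    static String encode(List<String> tokens, Map<String, Integer> dic) {
        Map<Integer, Integer> termCounts = new TreeMap<>();
        for (String token : tokens) {
            Integer code = dic.get(token);
            if (code != null) { // tokens outside the dictionary are dropped
                termCounts.merge(code, 1, Integer::sum);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Integer> e : termCounts.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        // totalWords counts every token of the text, including dropped ones (assumption)
        return tokens.size() + "-" + sb;
    }
}
```

Splitting the result on `-` then yields the two parts that the action stores as `totalWords` and `text`.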

Stop words class

package com.robin.pretreatment;

import com.robin.config.ConfigUtil;
import java.util.Arrays;

import com.robin.file.FileUtil;
import com.robin.log.RobinLogger;
import java.util.HashSet;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Stop words class</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> Date:2018-04-21</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public class StopWords {

    // Logger
    private static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * Stop words constructor
     */
    public StopWords() {
    }

    /**
     * Gets the Chinese stop words
     *
     * @return Chinese stop words
     */
    public HashSet<String> getChineseSet() {
        String cnStopWordsPath = ConfigUtil.getConfig("stopWords.chinese");
        return this.load(cnStopWordsPath);
    }

    /**
     * Gets the English stop words
     *
     * @return English stop words
     */
    public HashSet<String> getEnglishSet() {
        String enStopWordsPath = ConfigUtil.getConfig("stopWords.english");
        return this.load(enStopWordsPath);
    }

    /**
     * Gets the special symbols
     *
     * @return special symbols
     */
    public HashSet<String> getSymbolSet() {
        String symbolPath = ConfigUtil.getConfig("stopWords.symbol");
        return this.load(symbolPath);
    }

    /**
     * Loads a stop words file
     *
     * @param stopWordsPath path of the stop words file
     * @return set of stop words, or null if the file cannot be read
     */
    private HashSet<String> load(String stopWordsPath) {
        HashSet<String> set = new HashSet<>();
        String stopWordsText = FileUtil.readText(stopWordsPath);
        if (null == stopWordsText) {
            LOGGER.log(Level.SEVERE, "Failed to read the stop words file; check the file and its path.");
            return null;
        }
        // Split on any run of whitespace so space- and newline-separated files both work
        String[] words = stopWordsText.trim().split("\\s+");
        set.addAll(Arrays.asList(words));
        return set;
    }
}
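`StopWords` only loads the three sets; how the pretreatment step applies them is not shown in this section. A minimal sketch, assuming segmented tokens are filtered against the union of the Chinese, English and symbol sets and that Latin tokens should match case-insensitively (both assumptions). `StopWordFilter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class StopWordFilter {

    private final Set<String> stopWords = new HashSet<>();

    /** Merges any number of stop-word sets; null sets (StopWords.load failures) are skipped. */
    @SafeVarargs
    StopWordFilter(Collection<String>... sets) {
        for (Collection<String> set : sets) {
            if (set != null) {
                for (String w : set) {
                    stopWords.add(w.toLowerCase(Locale.ROOT));
                }
            }
        }
    }

    /** Keeps tokens that are non-blank and not stop words (case-insensitive for Latin text). */
    List<String> filter(List<String> tokens) {
        List<String> kept = new ArrayList<>();
        for (String token : tokens) {
            String t = token.trim();
            if (!t.isEmpty() && !stopWords.contains(t.toLowerCase(Locale.ROOT))) {
                kept.add(t);
            }
        }
        return kept;
    }
}
```

Usage: `new StopWordFilter(sw.getChineseSet(), sw.getEnglishSet(), sw.getSymbolSet())`, where `sw` is a `StopWords` instance.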

5.3.2 Response JSON Format

  The response JSON format of the pretreatment service is as follows:


5.4 Feature Selection Microservice

  The feature selection microservice implements the following feature selection algorithms: Document Frequency (DF), Information Gain (IG), Chi-Square Test (CHI, χ²), Mutual Information (MI) and Matrix Projection (MP).
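For one term t and class c, DF, CHI and MI are commonly computed from a 2×2 contingency table over the training documents. A = documents in c containing t, B = documents outside c containing t, C = documents in c without t, D = documents outside c without t, and N = A + B + C + D. The sketch below shows the textbook forms, DF = A + B, CHI = N(AD − BC)² / ((A+C)(B+D)(A+B)(C+D)) and the common MI estimate log(A·N / ((A+C)(A+B))). These are not necessarily the exact formulas in the service's `FeatureFactory` subclasses, and MP, the author's matrix projection method, is not reproduced. `TermScores` is a hypothetical name.

```java
final class TermScores {

    private TermScores() {
    }

    /** Document frequency: number of training documents that contain the term. */
    static long df(long a, long b) {
        return a + b;
    }

    /** Chi-square statistic of term/class independence; 0 when a marginal total is empty. */
    static double chi(long a, long b, long c, long d) {
        double n = a + b + c + d;
        double denom = (double) (a + c) * (b + d) * (a + b) * (c + d);
        if (denom == 0) {
            return 0.0;
        }
        double diff = (double) a * d - (double) b * c;
        return n * diff * diff / denom;
    }

    /** Mutual information estimate log(A*N / ((A+C)(A+B))); negative infinity when A == 0. */
    static double mi(long a, long b, long c, long d) {
        double n = a + b + c + d;
        if (a == 0) {
            return Double.NEGATIVE_INFINITY;
        }
        return Math.log(a * n / ((double) (a + c) * (a + b)));
    }
}
```

A term that appears in every document of its class and nowhere else, for example A = 10, B = 0, C = 0, D = 10, gets CHI = N = 20, the maximum for that N.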

5.4.1 Feature Selection Class Diagram

  The feature selection class diagram is shown in Figure 6:

Figure 6 Class diagram of the feature selection microservice

5.4.2 Code Implementation

Feature selection Action class

package com.robin.feature.action;

import com.robin.feature.corpus.CorpusManager;
import com.robin.feature.AbstractFeature;
import com.robin.feature.FeatureFactory;
import com.robin.feature.FeatureFactory.FeatureMethod;
import com.robin.loader.MircoServiceAction;
import com.robin.log.RobinLogger;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Feature selection Action class</DD>
 *
 * Adapts the service to resource calls from the Jersey server
 *
 * @version Version1.0
 * @author Robin
 * @version <I> Date:2018-04-01</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public class FeatureSelectAction implements MircoServiceAction {

    private static final Logger LOGGER = RobinLogger.getLogger();

    /**
     * Action status codes
     */
    public enum StatusCode {
        OK,
        JSON_ERR,
        KIND_ERR,
        VERSION_ERR,
        TRAIN_SCALE_ERR,
        METHOD_ERR,
        TEXTS_NULL,
    }

    /**
     * Inner class holding an action status
     */
    private class ActionStatus {

        StatusCode statusCode;
        String msg;

    }

    /**
     * Builds the JSONObject returned for an error status
     *
     * @param actionStatus
     * @return JSONObject
     */
    private JSONObject getErrorJson(ActionStatus actionStatus) {
        JSONObject errJson = new JSONObject();
        try {
            errJson.put("status", actionStatus.statusCode.toString());
            errJson.put("msg", actionStatus.msg);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return errJson;
    }

    /**
     * Checks one item of the input JSON object
     *
     * @param jsonObj
     * @param key
     * @param valueSet
     * @param errStatusCode
     * @return ActionStatus
     */
    private ActionStatus checkJSONObjectTerm(JSONObject jsonObj,
            String key,
            HashSet<String> valueSet,
            StatusCode errStatusCode) {
        ActionStatus actionStatus = new ActionStatus();

        try {
            if (!jsonObj.isNull(key)) {
                String value = jsonObj.getString(key);
                if (!valueSet.contains(value)) {
                    actionStatus.msg = "The value [" + value + "] of " + key + " is invalid.";
                    actionStatus.statusCode = errStatusCode;
                    return actionStatus;
                }
            } else {
                actionStatus.msg = "The input parameter is missing " + key + ".";
                actionStatus.statusCode = errStatusCode;
                return actionStatus;
            }

        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    /**
     * Checks the input JSON object
     *
     * @param jsonObj
     * @return ActionStatus
     */
    private ActionStatus checkInputJSONObject(JSONObject jsonObj) {
        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        HashSet<String> valueSet = new HashSet<>();

        valueSet.add("feature");
        retActionStatus = checkJSONObjectTerm(jsonObj, "kind", valueSet, StatusCode.KIND_ERR);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            return retActionStatus;
        }

        valueSet.clear();
        valueSet.add("v1");
        retActionStatus = checkJSONObjectTerm(jsonObj, "version", valueSet, StatusCode.VERSION_ERR);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            return retActionStatus;
        }

        try {
            double trainScale = jsonObj.getJSONObject("metadata").getJSONObject("feature").getDouble("trainScale");
            if ((trainScale >= 1.0) || (trainScale <= 0)) {
                actionStatus.statusCode = StatusCode.TRAIN_SCALE_ERR;
                actionStatus.msg = "The input trainScale [" + trainScale + "] is invalid; it must be in (0, 1).";
                return actionStatus;
            }

            valueSet.clear();
            valueSet.add("DF");
            valueSet.add("CHI");
            valueSet.add("MP");
            valueSet.add("IG");
            valueSet.add("MI");

            JSONArray methods = jsonObj.getJSONObject("metadata").getJSONObject("feature").getJSONArray("method");
            for (int i = 0; i < methods.length(); i++) {
                String method = methods.getString(i);
                if (!valueSet.contains(method)) {
                    actionStatus.statusCode = StatusCode.METHOD_ERR;
                    actionStatus.msg = "The input method [" + method + "] is invalid.";
                    return actionStatus;
                }
            }
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        actionStatus.statusCode = StatusCode.OK;
        return actionStatus;
    }

    /**
     * Implements the action method declared by MircoServiceAction<BR>
     * Performs the actual feature selection processing
     *
     * @param obj
     * @return Object
     */
    @Override
    public Object action(Object obj) {
        ActionStatus actionStatus = new ActionStatus();
        ActionStatus retActionStatus;

        if (!(obj instanceof JSONObject)) {
            actionStatus.msg = "The action argument is not a JSONObject.";
            LOGGER.log(Level.SEVERE, actionStatus.msg);
            actionStatus.statusCode = StatusCode.JSON_ERR;
            return this.getErrorJson(actionStatus);
        }

        JSONObject corpusJson = (JSONObject) obj;
        retActionStatus = this.checkInputJSONObject(corpusJson);
        if (!retActionStatus.statusCode.equals(StatusCode.OK)) {
            LOGGER.log(Level.SEVERE, retActionStatus.msg);
            return this.getErrorJson(retActionStatus);
        }

        try {
            long beginTime = System.currentTimeMillis();
            // optJSONObject returns null for a missing key; getJSONObject would throw instead
            JSONObject texts = corpusJson.optJSONObject("texts");
            if (null == texts) {
                actionStatus.statusCode = StatusCode.TEXTS_NULL;
                actionStatus.msg = "The input texts are null.";
                LOGGER.log(Level.SEVERE, actionStatus.msg);
                return this.getErrorJson(actionStatus);
            }

            // Split the corpus into a training set and a test set
            CorpusManager.divide(corpusJson);
            JSONObject testSetJson = (JSONObject) corpusJson.remove("testSet");
            JSONObject trainSetJson = (JSONObject) corpusJson.remove("trainSet");

            JSONObject metadataFeatureJson = corpusJson.getJSONObject("metadata").getJSONObject("feature");
            boolean globalFeature = metadataFeatureJson.getBoolean("globalFeature");
            int globalDimension = metadataFeatureJson.getInt("globalDimension");
            boolean localFeature = metadataFeatureJson.getBoolean("localFeature");
            int localDimension = metadataFeatureJson.getInt("localDimension");

            JSONObject featureSelectJson = new JSONObject();
            JSONObject globalFeatureJson = new JSONObject();
            JSONObject localFeatureJson = new JSONObject();

            // Feature selection
            JSONArray methodArr = metadataFeatureJson.getJSONArray("method");
            for (int i = 0; i < methodArr.length(); i++) {
                String selectMethod = methodArr.getString(i);
                AbstractFeature selecter = FeatureFactory.creatInstance(trainSetJson, FeatureMethod.valueOf(selectMethod));
                if (globalFeature) {
                    List<Map.Entry<Integer, Double>> featureList = selecter.selectGlobalFeature(globalDimension);
                    JSONArray featureArr = new JSONArray();
                    featureList.forEach((entry) -> {
                        featureArr.put(entry.getKey());
                    });
                    globalFeatureJson.put(selectMethod, featureArr);
                }
                if (localFeature) {
                    Map<String, List<Map.Entry<Integer, Double>>> labelsMap = selecter.selectLocalFeature(localDimension);
                    JSONObject labelFeatureJson = new JSONObject();
                    Iterator<String> labelsIt = labelsMap.keySet().iterator();
                    while (labelsIt.hasNext()) {
                        String label = labelsIt.next();
                        JSONArray labelFeatureArr = new JSONArray();
                        List<Map.Entry<Integer, Double>> localFeatureList = labelsMap.get(label);
                        localFeatureList.forEach((entry) -> {
                            labelFeatureArr.put(entry.getKey());
                        });
                        labelFeatureJson.put(label, labelFeatureArr);
                    }
                    localFeatureJson.put(selectMethod, labelFeatureJson);
                }
            }
            featureSelectJson.put("globalFeature", globalFeatureJson);
            featureSelectJson.put("localFeature", localFeatureJson);
            corpusJson.put("featureSelect", featureSelectJson);
            corpusJson.put("trainSet", trainSetJson);
            corpusJson.put("testSet", testSetJson);

            JSONObject preMetadataJson = corpusJson.getJSONObject("metadata").getJSONObject("feature");
            long endTime = System.currentTimeMillis();
            int spendTime = (int) (endTime - beginTime);
            preMetadataJson.put("spendTime", spendTime);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }

        JSONObject rsp = new JSONObject();
        try {
            rsp.put("status", "OK");
            rsp.put("result", corpusJson);
        } catch (JSONException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage());
        }
        return rsp;
    }
}
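`CorpusManager.divide(corpusJson)` splits the corpus into `trainSet` and `testSet` according to `metadata.feature.trainScale`, which `checkInputJSONObject` restricts to the open interval (0, 1). `AbstractFeature` later iterates `trainSetJson.keys()` as class labels, so the split is per label. Its source is not listed. The following is a minimal sketch assuming the first ⌊n·trainScale⌋ texts of each label go to training; a real implementation might shuffle first. `CorpusSplit` is a hypothetical name, and plain lists stand in for the JSON objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CorpusSplit {

    final Map<String, List<String>> trainSet = new LinkedHashMap<>();
    final Map<String, List<String>> testSet = new LinkedHashMap<>();

    /** Splits every label's texts: the first floor(n * trainScale) train, the rest test. */
    CorpusSplit(Map<String, List<String>> textsByLabel, double trainScale) {
        if (trainScale <= 0 || trainScale >= 1.0) { // same bounds as checkInputJSONObject
            throw new IllegalArgumentException("trainScale must be in (0, 1): " + trainScale);
        }
        for (Map.Entry<String, List<String>> e : textsByLabel.entrySet()) {
            List<String> texts = e.getValue();
            int trainCount = (int) Math.floor(texts.size() * trainScale);
            trainSet.put(e.getKey(), new ArrayList<>(texts.subList(0, trainCount)));
            testSet.put(e.getKey(), new ArrayList<>(texts.subList(trainCount, texts.size())));
        }
    }
}
```

Splitting per label keeps every class represented in both sets in roughly the same proportion, which matters for the per-class local features.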

Feature selection abstract class

package com.robin.feature;

import com.robin.container.MapSort;
import com.robin.feature.corpus.CorpusManager;
import com.robin.log.RobinLogger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Feature selection abstract class</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> Date:2018-04-05</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public abstract class AbstractFeature {

    /**
     * Logger
     */
    protected static final Logger LOGGER = RobinLogger.getLogger();
    /**
     * Set of all terms in the training set
     */
    protected Set<Integer> globalTermsSet;
    /**
     * Term-document frequency of every training class: <class label, <term code, frequency>>
     */
    protected HashMap<String, HashMap<Integer, Integer>> everyClassDFMap;

    // Training set JSON object
    protected JSONObject trainSetJson;

    // Global features: feature -> feature value
    protected HashMap<Integer, Double> globalFeatureValueMap;

    // Local features: class label -> (feature -> feature value)
    protected HashMap<String, HashMap<Integer, Double>> allLocalFeatureValueMap;

    /**
     * Constructor of the feature selection abstract class
     *
     * @param trainSetJson
     */
    public AbstractFeature(JSONObject trainSetJson) {
        this.trainSetJson = trainSetJson;
        this.allLocalFeatureValueMap = new HashMap<>();
        initEveryClassDFMap();
    }

    /**
     * Gets the number of distinct terms
     *
     * @return number of distinct terms
     */
    public int getAllTermTotal() {
        if (globalTermsSet != null) {
            return globalTermsSet.size();
        }
        return 0;
    }

    /**
     * Gets the number of global features
     *
     * @return number of global features, or 0 if they have not been computed yet
     */
    public int getGlobalFeatureSize() {
        if (null != globalFeatureValueMap) {
            return globalFeatureValueMap.size();
        }
        return 0;
    }

    /**
     * Computes the global feature values
     *
     * @return HashMap
     */
    protected abstract HashMap<Integer, Double> computeGlobalFeatureValue();

    /**
     * Computes the local feature values of one class
     *
     * @param label class label
     * @return HashMap
     */
    protected abstract HashMap<Integer, Double> computeLocalFeatureValue(String label);

    /**
     * Selects the top dimension features globally
     *
     * @param dimension
     * @return List
     */
    public List<Map.Entry<Integer, Double>> selectGlobalFeature(int dimension) {
        if (null == globalFeatureValueMap) {
            // Compute the global feature scores once and cache them
            globalFeatureValueMap = this.computeGlobalFeatureValue();
        }
        List<Map.Entry<Integer, Double>> featureList = new MapSort<Integer, Double>().descendSortByValue(globalFeatureValueMap);
        for (int i = featureList.size() - 1; dimension <= i; i--) {
            featureList.remove(i);
        }
        return featureList;
    }

    /**
     * Selects the top dimension features for every class
     *
     * @param dimension
     * @return Map
     */
    public Map<String, List<Map.Entry<Integer, Double>>> selectLocalFeature(int dimension) {
        Map<String, List<Map.Entry<Integer, Double>>> localFeatuerListMap = new HashMap<>();
        // Compute the feature scores of all terms in every class
        Iterator<String> labelsIt = this.trainSetJson.keys();
        while (labelsIt.hasNext()) {
            String label = labelsIt.next();

            HashMap<Integer, Double> localMPMap = allLocalFeatureValueMap.get(label);
            if (null == localMPMap) {
                localMPMap = this.computeLocalFeatureValue(label);
                allLocalFeatureValueMap.put(label, localMPMap);
            }
            List<Map.Entry<Integer, Double>> localFeatuerList = new MapSort<Integer, Double>().descendSortByValue(localMPMap);
            for (int i = localFeatuerList.size() - 1; dimension <= i; i--) {
                localFeatuerList.remove(i);
            }
            localFeatuerListMap.put(label, localFeatuerList);
        }
        return localFeatuerListMap;
    }

    /**
     * Initializes the term-document frequency map of every training class
     */
    protected final void initEveryClassDFMap() {
        this.everyClassDFMap = new HashMap<>();
        this.globalTermsSet = new HashSet<>();

        Iterator<String> labelsIt = this.trainSetJson.keys();
        while (labelsIt.hasNext()) {
            String label = labelsIt.next();
            HashMap<Integer, Integer> termDFMap = this.getTermDFMap(label);
            this.everyClassDFMap.put(label, termDFMap);
        }
    }

    /**
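     * Gets every term of one training class and the number of documents it occurs in.<BR>
     * Term code -1 holds the number of documents in the class.<BR>
     * Because each text's term-document map uses term code -1 to record the text's total word count,<BR>
     * counting the texts that contain -1 directly yields the total number of texts.
     *
     * @param label class label
     * @return HashMap of every term in the class and the number of documents it occurs in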
     */
    protected HashMap<Integer, Integer> getTermDFMap(String label) {
        // Terms of one class and how many documents contain each: HashMap<term code, document count>
        HashMap<Integer, Integer> thisDFMap = new HashMap<>();
        // HashMap<text ID, HashMap<term code, term count>>
        HashMap<String, HashMap<Integer, Integer>> tdmMap = CorpusManager.getTdmMap(this.trainSetJson, label);
        if (null == tdmMap) {
            LOGGER.severe("The term-document matrix map is empty or null!");
            return thisDFMap;
        }
        Set<String> textsIdSet = tdmMap.keySet();
        Iterator<String> textIdit = textsIdSet.iterator();
        while (textIdit.hasNext()) {
            String textId = textIdit.next();
            HashMap<Integer, Integer> textMap = tdmMap.get(textId);
            Set<Integer> termCodeSet = textMap.keySet();
            Iterator<Integer> it = termCodeSet.iterator();
            while (it.hasNext()) {
                Integer termCode = it.next();
                Integer num = 1;
                Integer thisNum = thisDFMap.get(termCode);
                if (null != thisNum) {
                    num += thisNum;
                }
                thisDFMap.put(termCode, num);
                globalTermsSet.add(termCode); // to save time, this line also builds the set of all terms
            }
        }
        return thisDFMap;
    }

    /**
     * Gets the term-frequency map of one class
     *
     * @param label class label
     * @return HashMap<Integer, Integer> term-frequency map
     */
    protected HashMap<Integer, Integer> getTermTFMap(String label) {
        // HashMap<term code, term frequency>
        HashMap<Integer, Integer> thisTFMap = new HashMap<>();
        // HashMap<text ID, HashMap<term code, term frequency>>
        HashMap<String, HashMap<Integer, Integer>> tdmMap = CorpusManager.getTdmMap(this.trainSetJson, label);
        if (null == tdmMap) {
            LOGGER.log(Level.SEVERE, "The term-document matrix map is empty or null!");
            return thisTFMap;
        }
        Set<String> textIdSet = tdmMap.keySet();
        Iterator<String> textsIt = textIdSet.iterator();
        while (textsIt.hasNext()) {
            String textId = textsIt.next();
            HashMap<Integer, Integer> textMap = tdmMap.get(textId);
            Set<Integer> termCodeSet = textMap.keySet();
            Iterator<Integer> it = termCodeSet.iterator();
            while (it.hasNext()) {
                Integer termCode = it.next();
                if (termCode == -1) {
                    continue;
                }
                Integer num = textMap.get(termCode); // term frequency works better here
                Integer thisNum = thisTFMap.get(termCode);
                if (null != thisNum) {
                    num += thisNum;
                }
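                // Currently unused. Open question: why are the counts not divided by the number of texts?
                thisTFMap.put(termCode, num);
                globalTermsSet.add(termCode); // to save time, this line also builds the set of all terms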
            }
        }
        return thisTFMap;
    }

    /**
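     * Sums the term-document frequencies (taken from everyClassDFMap, <class label, <term code, frequency>>)
     * of every class except the one currently being processed
     *
     * @param currLabel class currently being processed
     * @return HashMap<Integer, Integer> combined term-document frequencies of the other classes, <term code, frequency>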
     */
    protected HashMap<Integer, Integer> getOtherClassDFMap(String currLabel) {
        // Term-document frequency map of the other classes
        HashMap<Integer, Integer> otherClassDFMap = new HashMap<>();

        Iterator<String> labelsIt = this.trainSetJson.keys();
        while (labelsIt.hasNext()) {
            String label = labelsIt.next();
            if (!label.equals(currLabel)) {
                HashMap<Integer, Integer> otherDFMap = everyClassDFMap.get(label);
                Set<Integer> otherTermSet = otherDFMap.keySet();
                Iterator<Integer> it = otherTermSet.iterator();
                while (it.hasNext()) {
                    Integer termCode = it.next();
                    Integer docNum = otherDFMap.get(termCode);
                    Integer otherDocNum = otherClassDFMap.get(termCode);
                    if (null != otherDocNum) {
                        docNum += otherDocNum;
                    }
                    otherClassDFMap.put(termCode, docNum);
                }
            }
        }
        return otherClassDFMap;
    }
}
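`getTermDFMap` relies on a convention: every text's term map contains the pseudo term code `-1`, which holds the text's total word count. The document frequency of `-1` therefore equals the number of texts in the class. The sketch below reproduces that counting on plain maps. It adds a DF-based global selection that sums the class DFs, sorts in descending order and keeps the top `dimension` codes, in the spirit of `selectGlobalFeature`. Assumptions: `-1` is excluded from the candidates, and ties are broken by ascending code. `DfSelection` is a hypothetical name; the real subclasses supply their own scores through `computeGlobalFeatureValue`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DfSelection {

    /** Pseudo term code whose document frequency equals the number of texts. */
    static final int DOC_COUNT_CODE = -1;

    private DfSelection() {
    }

    /** Counts, for one class, how many texts contain each term code (like getTermDFMap). */
    static Map<Integer, Integer> classDf(List<Map<Integer, Integer>> textsTermCounts) {
        Map<Integer, Integer> df = new HashMap<>();
        for (Map<Integer, Integer> text : textsTermCounts) {
            for (Integer code : text.keySet()) {
                df.merge(code, 1, Integer::sum);
            }
        }
        return df;
    }

    /** Sums the class DFs into a global DF and returns the dimension highest-scoring codes. */
    static List<Integer> topGlobal(Map<String, Map<Integer, Integer>> everyClassDf, int dimension) {
        Map<Integer, Integer> global = new HashMap<>();
        for (Map<Integer, Integer> df : everyClassDf.values()) {
            df.forEach((code, n) -> global.merge(code, n, Integer::sum));
        }
        global.remove(DOC_COUNT_CODE); // the document counter is not a real feature
        List<Map.Entry<Integer, Integer>> entries = new ArrayList<>(global.entrySet());
        // Descending by DF; ties broken by ascending code so the result is deterministic
        entries.sort((x, y) -> y.getValue().equals(x.getValue())
                ? Integer.compare(x.getKey(), y.getKey())
                : Integer.compare(y.getValue(), x.getValue()));
        List<Integer> codes = new ArrayList<>();
        for (int i = 0; i < Math.min(dimension, entries.size()); i++) {
            codes.add(entries.get(i).getKey());
        }
        return codes;
    }
}
```

Summing per-class DFs gives the corpus-wide DF because the classes partition the training documents.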

5.4.3 Response JSON Format

  The response JSON format of the feature selection service is as follows:


5.5 Classifier Microservice

  The classifier microservice implements the following classification algorithms: k-Nearest Neighbor (kNN), Naïve Bayes (NB), Support Vector Machine (SVM) and Normalized Vector (NLV).
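Of these algorithms, kNN is the easiest to show in isolation. A minimal sketch, assuming each document is a sparse term-code to weight map (for example, counts restricted to the selected features) and that similarity is cosine. The value of `k`, the tie-breaking rule and the class name `KnnSketch` are illustrative choices, not the service's implementation. NLV and its `ROOT`/`INNER_PRODUCT` options are specific to this project and are not reproduced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KnnSketch {

    /** A labelled training document: sparse vector of term code -> weight. */
    static final class Doc {
        final String label;
        final Map<Integer, Double> vector;

        Doc(String label, Map<Integer, Double> vector) {
            this.label = label;
            this.vector = vector;
        }
    }

    private KnnSketch() {
    }

    /** Cosine similarity of two sparse vectors; 0 if either vector is all zeros. */
    static double cosine(Map<Integer, Double> x, Map<Integer, Double> y) {
        double dot = 0, nx = 0, ny = 0;
        for (Map.Entry<Integer, Double> e : x.entrySet()) {
            nx += e.getValue() * e.getValue();
            Double other = y.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other;
            }
        }
        for (double v : y.values()) {
            ny += v * v;
        }
        return (nx == 0 || ny == 0) ? 0.0 : dot / Math.sqrt(nx * ny);
    }

    /** Majority vote among the k most similar documents; ties go to the higher summed similarity. */
    static String classify(List<Doc> train, Map<Integer, Double> query, int k) {
        List<Doc> sorted = new ArrayList<>(train);
        sorted.sort((a, b) -> Double.compare(cosine(b.vector, query), cosine(a.vector, query)));
        Map<String, Integer> votes = new HashMap<>();
        Map<String, Double> simSum = new HashMap<>();
        for (Doc d : sorted.subList(0, Math.min(k, sorted.size()))) {
            votes.merge(d.label, 1, Integer::sum);
            simSum.merge(d.label, cosine(d.vector, query), Double::sum);
        }
        String best = null;
        for (String label : votes.keySet()) {
            if (best == null
                    || votes.get(label) > votes.get(best)
                    || (votes.get(label).equals(votes.get(best)) && simSum.get(label) > simSum.get(best))) {
                best = label;
            }
        }
        return best;
    }
}
```

The comparator recomputes similarities during sorting, which is fine for a sketch; a production version would compute each similarity once.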

5.5.1 Main Classifier Class Diagram

  The main class diagram of the classifier service is shown in Figure 7:

Figure 7 Main class diagram of the classifier service

5.5.2 Code Implementation

Text classification: abstract trainer class

package com.robin.classifier;

import com.robin.log.RobinLogger;
import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Text classification: trainer</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> Date:2018-04-06</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public abstract class AbstractTrainer {

    // Logger
    protected static final Logger LOGGER = RobinLogger.getLogger();

    // Training set JSON
    protected JSONObject trainSetJson;

    /**
     * Constructor of the abstract trainer
     *
     * @param trainSetJson
     */
    public AbstractTrainer(JSONObject trainSetJson) {
        this.trainSetJson = trainSetJson;
    }

    /**
     * Abstract method that trains the classifier
     */
    public abstract void trains();
}

Text classification: abstract classifier class

package com.robin.classifier;

import com.robin.log.RobinLogger;
import java.util.HashMap;
import java.util.Map;

import java.util.logging.Logger;
import org.codehaus.jettison.json.JSONObject;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Text classification: abstract classifier class</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> Date:2018-04-06</I>
 * @author  <I> E-mail:[email protected]</I>
 */
public abstract class AbstractClassifier {

    // Logger
    protected static final Logger LOGGER = RobinLogger.getLogger();
    // Matrix holding the classification results
    protected Map<String, HashMap<String, Integer>> matrixMap;
    // Test set JSON
    protected JSONObject testSetJson;

    /**
     * Classifier constructor
     *
     * @param testSetJson
     */
    public AbstractClassifier(JSONObject testSetJson) {
        this.testSetJson = testSetJson;
    }

    /**
     * Abstract classification method
     */
    public abstract void classifies();

    /**
     * Gets the simple classification result matrix
     *
     * @return simple result matrix
     */
    public Map<String, HashMap<String, Integer>> getMatrixMap() {
        return matrixMap;
    }
}
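`AbstractTrainer.trains()` and `AbstractClassifier.classifies()` separate learning from prediction, and the classifier leaves its results in `matrixMap`. The sketch below shows one way a concrete pair could look, using multinomial Naive Bayes with Laplace smoothing on plain maps instead of the JSON sets. The nesting `matrix.get(predicted).get(actual)` is an assumption, chosen to match the rows (predicted) and columns (actual) of the confusion matrix printed in 5.5.4. `NbSketch` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class NbSketch {

    private final Map<String, Integer> docsPerClass = new HashMap<>();
    private final Map<String, Map<Integer, Integer>> termCounts = new HashMap<>();
    private final Map<String, Integer> totalTerms = new HashMap<>();
    private final Set<Integer> vocabulary = new HashSet<>();
    private int totalDocs;

    /** Training step (the role of trains()): accumulate class and term counts. */
    void train(String label, Map<Integer, Integer> doc) {
        totalDocs++;
        docsPerClass.merge(label, 1, Integer::sum);
        Map<Integer, Integer> counts = termCounts.computeIfAbsent(label, l -> new HashMap<>());
        for (Map.Entry<Integer, Integer> e : doc.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Integer::sum);
            totalTerms.merge(label, e.getValue(), Integer::sum);
            vocabulary.add(e.getKey());
        }
    }

    /** Returns the label maximising log P(c) + sum n(t) log P(t|c), Laplace-smoothed. */
    String predict(Map<Integer, Integer> doc) {
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (String label : docsPerClass.keySet()) {
            double score = Math.log((double) docsPerClass.get(label) / totalDocs);
            double denom = totalTerms.getOrDefault(label, 0) + vocabulary.size();
            Map<Integer, Integer> counts = termCounts.get(label);
            for (Map.Entry<Integer, Integer> e : doc.entrySet()) {
                double p = (counts.getOrDefault(e.getKey(), 0) + 1) / denom;
                score += e.getValue() * Math.log(p);
            }
            if (score > bestScore) {
                bestScore = score;
                best = label;
            }
        }
        return best;
    }

    /** Classification step (the role of classifies()): fill matrix[predicted][actual]. */
    Map<String, HashMap<String, Integer>> confusion(List<String> actualLabels, List<Map<Integer, Integer>> docs) {
        Map<String, HashMap<String, Integer>> matrix = new HashMap<>();
        for (int i = 0; i < docs.size(); i++) {
            String predicted = predict(docs.get(i));
            matrix.computeIfAbsent(predicted, p -> new HashMap<>())
                  .merge(actualLabels.get(i), 1, Integer::sum);
        }
        return matrix;
    }
}
```

Working in log space avoids underflow when multiplying many small term probabilities.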

5.5.3 JSON Format

Classification request JSON:


Classification response JSON:


5.5.4 Example DEBUG Output

INFO:   NLV-500-MP-GLOBAL-OPEN-ROOT-2-INNER_PRODUCT
INFO:   ----Config----------------------------
INFO:   FeatureRange       : GLOBAL
INFO:   Dimension          : 500
INFO:   TestType           : OPEN
INFO:   AlgorithmName      : NLV
INFO:   FeatureName        : MP
INFO:   ----Args------------------------------
INFO:   RootIndex          : 2
INFO:   SimilarityType     : INNER_PRODUCT
INFO:   NormalizedType     : ROOT
INFO:
   ------------------ Classification Confusion Matrix -----------------
   --------------------------------------------------------------------
   Class         Talent  Sports    Info   Entmt  Estate    Auto   Total
   --------------------------------------------------------------------
   Talent            64       0       0       1       4       1      70
   Sports             0      97       0       1       0       2     100
   Info               1       0     113       0       1       1     116
   Entmt              0       4       1      90       2       1      98
   Estate             0       0       1       0      94       0      95
   Auto               1       0       0       0       1      84      86
   --------------------------------------------------------------------
   Text count        66     101     115      92     102      89     565
   --------------------------------------------------------------------
   Recall         96.97   96.04   98.26   97.83   92.16   94.38   95.94
   --------------------------------------------------------------------
   Precision      91.43   97.00   97.41   91.84   98.95   97.67   95.72
   --------------------------------------------------------------------
   F1             94.12   96.52   97.84   94.74   95.43   96.00   95.83
   --------------------------------------------------------------------

INFO:    Train time(ms)    : 9
INFO:    Classify time(ms) : 9
INFO:    MacroRecall(%)    : 95.94
INFO:    MacroPrecision(%) : 95.72
INFO:    Macro F1(%)       : 95.83
INFO:    Micro F1(%)       : 95.93
INFO:    Current time(ms)  : 1541671120162
INFO:   ==========================================================
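The summary lines can be recomputed from the confusion matrix. Each row is a predicted class, whose row sums form the last column, and each column is an actual class, whose column sums form the text-count row. Recall therefore divides the diagonal entry by the column sum, and precision divides it by the row sum. The printed figures are consistent with Macro F1 being the harmonic mean of MacroPrecision and MacroRecall (2·95.72·95.94/(95.72+95.94) ≈ 95.83), not the mean of the per-class F1 values (≈ 95.78). They are also consistent with Micro F1 equalling overall accuracy, 542/565 ≈ 95.93%. A minimal sketch under those readings; `Metrics` is a hypothetical name.

```java
final class Metrics {

    private Metrics() {
    }

    /** m[p][a] = number of texts of actual class a predicted as class p. */
    static double recall(int[][] m, int c) {
        int col = 0;
        for (int[] row : m) {
            col += row[c];
        }
        return col == 0 ? 0.0 : (double) m[c][c] / col;
    }

    static double precision(int[][] m, int c) {
        int row = 0;
        for (int v : m[c]) {
            row += v;
        }
        return row == 0 ? 0.0 : (double) m[c][c] / row;
    }

    static double macroRecall(int[][] m) {
        double sum = 0;
        for (int c = 0; c < m.length; c++) {
            sum += recall(m, c);
        }
        return sum / m.length;
    }

    static double macroPrecision(int[][] m) {
        double sum = 0;
        for (int c = 0; c < m.length; c++) {
            sum += precision(m, c);
        }
        return sum / m.length;
    }

    /** Harmonic mean of macro precision and macro recall, as the log above appears to use. */
    static double macroF1(int[][] m) {
        double p = macroPrecision(m);
        double r = macroRecall(m);
        return (p + r) == 0 ? 0.0 : 2 * p * r / (p + r);
    }

    /** With one label per text, micro precision = micro recall = accuracy, so micro F1 = accuracy. */
    static double microF1(int[][] m) {
        int correct = 0;
        int total = 0;
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m[i].length; j++) {
                total += m[i][j];
                if (i == j) {
                    correct += m[i][j];
                }
            }
        }
        return total == 0 ? 0.0 : (double) correct / total;
    }
}
```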

6 Deployment Configuration

6.1 Building the Docker Images

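  The following uses the segment microservice as an example to describe how a Docker image is built.
Contents of the microservice configuration file config.properties: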

#restful API config
listen.ip=0.0.0.0
listen.port=8081

#micro server config
mircoServer.name=segment
jar.path=file:segment-1.0.jar
jar.actionClass=com.robin.segment.action.SegmentAction

#log config
log.path=log/
log.prefix=segment
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG 
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3

#robin segment dictionary config
dic.base=dic/RS/base.dic
dic.numeral=dic/RS/numeral.dic
dic.quantifier=dic/RS/quantifier.dic
dic.encoding=UTF-8

#paoding config
paoding.analysis.properties=lib/paoding/paoding-analysis.properties
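The RESTful framework (JerseyServer) presumably reads this file and loads the class named by `jar.actionClass` from `jar.path`; its loader source is not included in this section. A minimal sketch of that loading step, using only standard JDK classes (`Properties`, `URLClassLoader`, reflection). It assumes the action class has a public no-argument constructor. `ActionLoader` is a hypothetical name, and the real server would presumably cast the result to `MircoServiceAction`.

```java
import java.io.IOException;
import java.io.Reader;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Properties;

final class ActionLoader {

    private ActionLoader() {
    }

    /** Reads key=value pairs; lines starting with '#' are comments, as in config.properties. */
    static Properties read(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    /** Instantiates jar.actionClass through the given loader via its no-arg constructor. */
    static Object newAction(Properties props, ClassLoader loader) throws ReflectiveOperationException {
        String className = props.getProperty("jar.actionClass");
        if (className == null) {
            throw new ClassNotFoundException("jar.actionClass is not configured");
        }
        Class<?> cls = Class.forName(className.trim(), true, loader);
        return cls.getDeclaredConstructor().newInstance();
    }

    /** Class loader over jar.path, e.g. "file:segment-1.0.jar" relative to the working directory. */
    static URLClassLoader jarLoader(Properties props) throws IOException {
        URL jarUrl = new URL(props.getProperty("jar.path"));
        return new URLClassLoader(new URL[] {jarUrl}, ActionLoader.class.getClassLoader());
    }
}
```

Usage: `newAction(props, jarLoader(props))` would load `com.robin.segment.action.SegmentAction` from `segment-1.0.jar`, which the image script copies next to `config.properties`.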

Dockerfile:

FROM hub.c.163.com/public/ubuntu:14.04-common
MAINTAINER robin [email protected]

ADD jdk-8u162-linux-x64.tar.gz /usr/bin/
ENV JAVA_HOME /usr/bin/jdk1.8.0_162
ENV CLASSPATH ${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
ENV PATH ${JAVA_HOME}/bin:$PATH
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
RUN echo "Asia/Shanghai" > /etc/timezone && dpkg-reconfigure -f noninteractive tzdata

COPY JerseyServer /opt/JerseyServer/

Shell script for building and distributing the image:

#!/bin/bash

RUN_PATH=$(readlink -f .)

INSTALL_DIR=JerseyServer
CONFIG_FILE=$RUN_PATH/../config.properties
LIB_DIR=$RUN_PATH/../lib
START_DATA=$RUN_PATH/../dic
START_SCRIPT=startup.sh

JERSEY_SERVER_JAR_PATH=../../JerseyServer/target/JerseyServer-1.0.jar
MICRO_SERVICE_JAR_PATH=$RUN_PATH/../dist/segment-1.0.jar
IMAGE_TAR_NAME=ubuntu-segment-0.1.tar
IMAGE_TAG=docker.robin.com/robin/ubuntu-segment:0.1

echo "----------------Prepare project--------------------------------------"
rm -fr $INSTALL_DIR
mkdir -p $INSTALL_DIR
cp $JERSEY_SERVER_JAR_PATH $INSTALL_DIR
cp $CONFIG_FILE $MICRO_SERVICE_JAR_PATH $INSTALL_DIR
cp -r $LIB_DIR $INSTALL_DIR
cp -r $START_DATA $INSTALL_DIR
cp $START_SCRIPT $INSTALL_DIR

echo "----------------Clean images ----------------------------------------"
rm -fr $IMAGE_TAR_NAME
docker rmi $IMAGE_TAG

echo "----------------Build images ----------------------------------------"
docker build -t $IMAGE_TAG .

echo "----------------Save images: ${IMAGE_TAR_NAME}----------------------------"
docker save $IMAGE_TAG > $IMAGE_TAR_NAME

echo "----------------Copy images: ${IMAGE_TAR_NAME}----------------------------"
NODE2_ADDRESS=192.168.1.12
NODE3_ADDRESS=192.168.1.13
NODE4_ADDRESS=192.168.1.14

scp $IMAGE_TAR_NAME $NODE2_ADDRESS:/home/dockerImg
ssh [email protected]$NODE2_ADDRESS "docker rmi $IMAGE_TAG"
ssh [email protected]$NODE2_ADDRESS "cd /home/dockerImg; docker load --input $IMAGE_TAR_NAME"

scp $IMAGE_TAR_NAME $NODE3_ADDRESS:/home/dockerImg
ssh [email protected]$NODE3_ADDRESS "docker rmi $IMAGE_TAG"
ssh [email protected]$NODE3_ADDRESS "cd /home/dockerImg; docker load --input $IMAGE_TAR_NAME"

scp $IMAGE_TAR_NAME $NODE4_ADDRESS:/home/dockerImg
ssh [email protected]$NODE4_ADDRESS "docker rmi $IMAGE_TAG"
ssh [email protected]$NODE4_ADDRESS "cd /home/dockerImg; docker load --input $IMAGE_TAR_NAME"

Listing the local images:

[[email protected] home]# docker images
REPOSITORY                                     TAG                 IMAGE ID            CREATED             SIZE
docker.robin.com/robin/ubuntu-pretreatment     0.1                 7da0704a794b        22 hours ago        761.3 MB
docker.robin.com/robin/ubuntu-classifier       0.1                 4b39c146e6c0        4 days ago          761.6 MB
docker.robin.com/robin/ubuntu-feature-select   0.1                 4d6ca3e5e6db        4 days ago          761.4 MB
docker.robin.com/robin/ubuntu-segment          0.1                 49285ef474f3        4 days ago          774.7 MB
gcr.io/google_containers/pause-amd64           3.0                 99e59f495ffa        2 years ago         746.9 kB
hub.c.163.com/public/ubuntu                    14.04-common        493d50b6de79        2 years ago         369.3 MB

Querying the images in the private Docker registry:

[[email protected] home]# curl http://docker.robin.com/v2/_catalog
{"repositories":["robin/ubuntu-classifier","robin/ubuntu-feature-select","robin/ubuntu-pretreatment","robin/ubuntu-segment"]}

6.2 Creating the K8S RC

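  The following uses the classifier microservice as an example to describe how the RC (ReplicationController) is created.
classifier-rc.yaml configuration: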

apiVersion: v1
kind: ReplicationController
metadata:
  name: classifier-rc
spec:
  replicas: 4
  selector:
    app: ubuntu-classifier
  template:
    metadata:
      labels:
        app: ubuntu-classifier
    spec:
      containers:
      - name: ubuntu-classifier
        image: docker.robin.com/robin/ubuntu-classifier:0.1
        command: ["/bin/sh", "-c"]
        args: ["cd /opt/JerseyServer;./startup.sh"]
        resources:
          limits:
            cpu: 1500m
            memory: 1280Mi
          requests:
            cpu: 1000m
            memory: 1024Mi
      imagePullSecrets:
        - name: robin-registrykey
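The `resources` block reserves 1000m CPU and 1024Mi memory per classifier pod (`requests`, used by the scheduler) and caps each pod at 1500m and 1280Mi (`limits`). In Kubernetes quantities, `m` means thousandths of a CPU core and `Mi` means 2^20 bytes. The four replicas therefore request 4 cores and 4 GiB in total. A small sketch converting these quantities; it covers only the `m`, `Ki`/`Mi`/`Gi` and plain-number forms used here, while the real quantity grammar has more suffixes. `Quantity` is a hypothetical helper.

```java
final class Quantity {

    private Quantity() {
    }

    /** CPU quantity to millicores: "1500m" -> 1500, "2" -> 2000, "0.5" -> 500. */
    static long cpuMillis(String q) {
        String s = q.trim();
        if (s.endsWith("m")) {
            return Long.parseLong(s.substring(0, s.length() - 1));
        }
        return Math.round(Double.parseDouble(s) * 1000);
    }

    /** Memory quantity with a binary suffix Ki/Mi/Gi (or none) to bytes: "1280Mi" -> 1342177280. */
    static long memoryBytes(String q) {
        String s = q.trim();
        String[] suffixes = {"Ki", "Mi", "Gi"};
        for (int i = 0; i < suffixes.length; i++) {
            if (s.endsWith(suffixes[i])) {
                long base = Long.parseLong(s.substring(0, s.length() - 2));
                return base << (10 * (i + 1)); // multiply by 2^10, 2^20 or 2^30
            }
        }
        return Long.parseLong(s);
    }
}
```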

Creating the RC:

kubectl create -f classifier-rc.yaml

Querying the RCs:

[[email protected] yaml]# kubectl get rc
NAME                DESIRED   CURRENT   READY     AGE
classifier-rc       4         4         4         4d
feature-select-rc   1         1         1         4d
pretreatment-rc     1         1         1         22h
segment-rc          1         1         1         4d

Querying the endpoints:

[[email protected] yaml]# kubectl get endpoints
NAME                 ENDPOINTS                                               AGE
classifier-svc       10.0.1.2:8084,10.0.7.2:8084,10.0.8.2:8084 + 1 more...   4d
feature-select-svc   10.0.1.3:8083                                           4d
kubernetes           192.168.1.10:6443                                       172d
pretreatment-svc     10.0.7.4:8082                                           22h
segment-svc          10.0.7.3:8081                                           4d

6.3 Creating the K8S Service

Classifier Service configuration:

apiVersion: v1
kind: Service
metadata:
  name: classifier-svc
spec:
  type: NodePort
  ports:
  - port: 8004
    targetPort: 8084
    nodePort: 30084
  selector:
    app: ubuntu-classifier
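The three port fields of this Service play different roles. `targetPort` (8084) is the container port the classifier listens on. `port` (8004) is exposed on the Service's ClusterIP inside the cluster. `nodePort` (30084) is opened on every node for clients outside the cluster, and must fall in the API server's node-port range, 30000-32767 by default. kube-proxy forwards traffic arriving at either entry point to one of the pods selected by `app: ubuntu-classifier`; the `classifier-svc` row of `kubectl get endpoints` lists those pods on port 8084. The sketch below only builds the three kinds of address; `ServiceAddress` and its methods are hypothetical.

```java
final class ServiceAddress {

    /** Default --service-node-port-range of kube-apiserver. */
    static final int NODE_PORT_MIN = 30000;
    static final int NODE_PORT_MAX = 32767;

    final int port;       // ClusterIP port, e.g. 8004
    final int targetPort; // container port, e.g. 8084
    final int nodePort;   // port opened on every node, e.g. 30084

    ServiceAddress(int port, int targetPort, int nodePort) {
        if (nodePort < NODE_PORT_MIN || nodePort > NODE_PORT_MAX) {
            throw new IllegalArgumentException("nodePort outside the default range: " + nodePort);
        }
        this.port = port;
        this.targetPort = targetPort;
        this.nodePort = nodePort;
    }

    /** Address used by clients inside the cluster. */
    String inCluster(String clusterIp) {
        return clusterIp + ":" + port;
    }

    /** Address used by clients outside the cluster. */
    String external(String nodeIp) {
        return nodeIp + ":" + nodePort;
    }

    /** Pod endpoint that kube-proxy ultimately forwards to. */
    String podEndpoint(String podIp) {
        return podIp + ":" + targetPort;
    }
}
```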

Creating the Service:

kubectl create -f classifier-svc.yaml

Querying the Services:

[[email protected] yaml]# kubectl get services
NAME                 CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
classifier-svc       192.168.8.56    <nodes>       8004:30084/TCP   4d
feature-select-svc   192.168.8.169   <nodes>       8003:30083/TCP   4d
kubernetes           192.168.8.1     <none>        443/TCP          172d
pretreatment-svc     192.168.8.11    <nodes>       8002:30082/TCP   22h
segment-svc          192.168.8.29    <nodes>       8001:30081/TCP   4d

7 Application Service

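  Here, the application means the machine learning task application. It mainly covers task scheduling, the task state machine, microservice discovery, microservice access, and monitoring of microservice resources.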
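A classification task passes through the microservice chain segment, pretreatment, feature selection and classifier, which correspond to the four RCs listed in 6.2. The actual states and transitions of the task state machine are not listed in this section. The enum below is a hypothetical sketch of such a machine, in which every non-terminal state can also move to a failure state.

```java
import java.util.EnumSet;
import java.util.Set;

enum TaskState {
    CREATED, SEGMENTING, PRETREATING, SELECTING_FEATURES, CLASSIFYING, FINISHED, FAILED;

    /** States that may legally follow this one (hypothetical transition table). */
    Set<TaskState> next() {
        switch (this) {
            case CREATED:            return EnumSet.of(SEGMENTING, FAILED);
            case SEGMENTING:         return EnumSet.of(PRETREATING, FAILED);
            case PRETREATING:        return EnumSet.of(SELECTING_FEATURES, FAILED);
            case SELECTING_FEATURES: return EnumSet.of(CLASSIFYING, FAILED);
            case CLASSIFYING:        return EnumSet.of(FINISHED, FAILED);
            default:                 return EnumSet.noneOf(TaskState.class); // terminal states
        }
    }

    /** Returns the target state, or throws if the transition is not allowed. */
    TaskState moveTo(TaskState target) {
        if (!next().contains(target)) {
            throw new IllegalStateException(this + " -> " + target + " is not allowed");
        }
        return target;
    }
}
```

Keeping the transition table in one place makes illegal jumps, such as skipping feature selection, fail loudly instead of silently producing a broken request.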

7.1 Resource Monitoring

7.1.1 Resource Monitoring Class Diagram

  The resource monitoring class diagram is shown in Figure 8:

Figure 8 Resource monitoring class diagram
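`MonitorCenter`, listed below, keeps the monitored RCs in a static `HashMap` and refreshes them from a hand-started `MonitorThread`. The refresh period comes from `kubernetes.rc.refresh.time`, apparently in seconds given the `1000 *` factor. If `addRC`/`removeRC` can run on request threads while the monitor iterates, a `ConcurrentHashMap` and a `ScheduledExecutorService` are a safer fit. A minimal sketch of that alternative; `RcRegistry` and the `Refreshable` interface, which stands in for the project's `ReplicationController`, are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class RcRegistry {

    /** Stand-in for the project's ReplicationController (hypothetical interface). */
    interface Refreshable {
        void refresh();
    }

    private final Map<String, Refreshable> rcs = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Looks up and registers an RC at most once; a null lookup result is not stored. */
    void add(String name, Function<String, Refreshable> lookup) {
        rcs.computeIfAbsent(name, lookup);
    }

    void remove(String name) {
        rcs.remove(name);
    }

    int size() {
        return rcs.size();
    }

    /** Refreshes every registered RC at a fixed delay; safe against concurrent add/remove. */
    void start(long intervalSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            for (Refreshable rc : rcs.values()) {
                try {
                    rc.refresh();
                } catch (RuntimeException e) {
                    // swallow: an uncaught exception would suppress all later runs of this task
                }
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```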

7.1.2 Code Implementation

Service control center class:

package com.robin.monitor;

import com.robin.config.ConfigUtil;
import com.robin.log.RobinLogger;
import com.robin.task.ClassifyTaskPool;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * <DT><B>Description:</B></DT>
 * <DD>Service control center</DD>
 *
 * @version Version1.0
 * @author Robin
 * @version <I> V1.0 Date:2018-08-08</I>
 */
public class MonitorCenter {

    private static final Logger LOGGER = RobinLogger.getLogger();

    // Concurrent map: the monitor thread iterates over and mutates it while
    // other threads may call the public addRC/getRC/removeRC methods.
    private static final Map<String, ReplicationController> RC_MAP;
    private static final KubeApiServer KUBE_APISERVER;
    private static MonitorThread monitor = null;
    private static final ExecutorService TASK_THREAD_POOL;

    static {
        RC_MAP = new ConcurrentHashMap<>();
        String kubeApiAddress = ConfigUtil.getConfig("kubernetes.api.address");
        int kubeApiPort = Integer.parseInt(ConfigUtil.getConfig("kubernetes.api.port"));
        KUBE_APISERVER = new KubeApiServer(kubeApiAddress, kubeApiPort);
        int threadPoolSize = Integer.parseInt(ConfigUtil.getConfig("macro.request.thread.pool.size"));
        TASK_THREAD_POOL = Executors.newFixedThreadPool(threadPoolSize);
    }

    public static KubeApiServer getKUBE_APISERVER() {
        return KUBE_APISERVER;
    }

    public static ExecutorService getTASK_THREAD_POOL() {
        return TASK_THREAD_POOL;
    }

    public static void addRC(String rcName) {
        if (RC_MAP.containsKey(rcName)) {
            return;
        }

        ReplicationController rc = KUBE_APISERVER.getReplicationController(rcName);
        if (rc == null) {
            return;
        }
        rc.startMonitor();
        RC_MAP.put(rcName, rc);
    }

    // synchronized so that two concurrent callers cannot start two monitor threads
    public static synchronized void monitor() {
        if (monitor == null) {
            monitor = new MonitorThread();
            Thread t = new Thread(monitor);
            t.start();
        }
    }

    public static ReplicationController getRC(String rcName) {
        return RC_MAP.get(rcName);
    }

    public static void removeRC(String rcName) {
        RC_MAP.remove(rcName);
    }

    static class MonitorThread implements Runnable {

        private static final int INTERVAL_TIME = 1000 * Integer.valueOf(ConfigUtil.getConfig("kubernetes.rc.refresh.time"));

        @Override
        public void run() {
            LOGGER.log(Level.INFO, "++++ RCMonitor thread start. ++++");
            while (true) {

                // Clear leftover (timed-out) classification tasks
                ClassifyTaskPool.clearTimeoutTask();
                
                // Drop RCs that were deleted or changed on the API server; they are re-registered below.
                if (!RC_MAP.isEmpty()) {
                    Set<String> rmSet = new HashSet<>();
                    Iterator<String> it = RC_MAP.keySet().iterator();
                    while (it.hasNext()) {
                        String rcName = it.next();
                        ReplicationController oldRc = RC_MAP.get(rcName);
                        ReplicationController newRc = KUBE_APISERVER.getReplicationController(rcName);
                        if (newRc == null) {
                            rmSet.add(rcName);
                            continue;
                        }
                        if (!oldRc.equals(newRc)) {
                            rmSet.add(rcName);
                        }
                    }

                    if (!rmSet.isEmpty()) {
                        it = rmSet.iterator();
                        while (it.hasNext()) {
                            String rcName = it.next();
                            ReplicationController oldRc = RC_MAP.remove(rcName);
                            // Another thread may already have removed it via removeRC.
                            if (oldRc != null) {
                                oldRc.clean();
                            }
                        }
                    }
                }

                // (Re)register the four microservice RCs; addRC skips names already tracked.
                addRC("segment-rc");
                addRC("pretreatment-rc");
                addRC("feature-select-rc");
                addRC("classifier-rc");

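                try {
                    Thread.sleep(INTERVAL_TIME);
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag and stop monitoring instead of looping on.
                    LOGGER.log(Level.SEVERE, "RCMonitor thread interrupted, exiting", ex);
                    Thread.currentThread().interrupt();
                    return;
                }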
            }
        }
    }
}
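The article does not list `KubeApiServer.getReplicationController`. One plausible shape is sketched below. It issues a plain GET on the Kubernetes core v1 REST path `/api/v1/namespaces/{namespace}/replicationcontrollers/{name}` and returns `null` on a 404, which `MonitorCenter` then treats as "remove this RC". Several details are assumptions:

- the class name `RcFetcherSketch`;
- the `default` namespace in the usage note;
- plain HTTP against `kubernetes.api.port=8080`, which on older clusters was the API server's unauthenticated port;
- returning raw JSON instead of a parsed `ReplicationController`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical minimal client that reads one ReplicationController. */
final class RcFetcherSketch {
    private final String baseUrl;

    RcFetcherSketch(String address, int port) {
        // Assumes plain HTTP; a secured API server needs HTTPS plus credentials.
        this.baseUrl = "http://" + address + ":" + port;
    }

    /** Core v1 REST path of a ReplicationController. */
    String rcUrl(String namespace, String rcName) {
        return baseUrl + "/api/v1/namespaces/" + namespace
                + "/replicationcontrollers/" + rcName;
    }

    /** Returns the RC as raw JSON, or null if the API server answers 404. */
    String fetchRcJson(String namespace, String rcName) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)
                URI.create(rcUrl(namespace, rcName)).toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            int code = conn.getResponseCode();
            if (code == HttpURLConnection.HTTP_NOT_FOUND) {
                return null; // RC deleted: the caller should stop monitoring it
            }
            if (code != HttpURLConnection.HTTP_OK) {
                throw new IOException("Unexpected HTTP status " + code + " for " + rcName);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }
}
```

For `classifier-rc` in the `default` namespace, the request URL is `http://192.168.1.10:8080/api/v1/namespaces/default/replicationcontrollers/classifier-rc`. The sketch leaves out parsing the JSON and deciding what `ReplicationController.equals` compares (for example `spec.replicas` or `metadata.resourceVersion`), because the article does not say which fields the author uses.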

7.2 Classification Task Application

7.2.1 Classification Task Class Diagram

  The classification task class diagram is shown in Figure 9:

Figure 9 Classification task class diagram

7.2.2 Data Structure

  The classification task data structure is shown in Figure 10:

Figure 10 Classification task data structure

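  At the top level, a task pool manages the classification tasks. Each task holds several classification subtasks, and each subtask in turn holds several task instances.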
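The pool → task → subtask → instance hierarchy can be sketched with maps keyed by ID. All names here are hypothetical: `TaskPoolSketch`, `ClassifyTask`, `SubTask`, and `TaskInstance`. The article's own `ClassifyTaskPool` may be organized differently, and it does not say what a single subtask or instance represents. The sketch uses `ConcurrentHashMap` on the assumption that web request threads and the monitor thread access the pool concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical three-level structure: pool -> task -> subtask -> instance. */
final class TaskPoolSketch {

    /** Level 3: one instance of a subtask. */
    static final class TaskInstance {
        final String instanceId;
        TaskInstance(String instanceId) { this.instanceId = instanceId; }
    }

    /** Level 2: a subtask and its instances (list guarded by its own lock). */
    static final class SubTask {
        final String subTaskId;
        final List<TaskInstance> instances = new ArrayList<>();
        SubTask(String subTaskId) { this.subTaskId = subTaskId; }
    }

    /** Level 1: a classification task and its subtasks. */
    static final class ClassifyTask {
        final String taskId;
        final Map<String, SubTask> subTasks = new ConcurrentHashMap<>();
        ClassifyTask(String taskId) { this.taskId = taskId; }
    }

    // Top level: the task pool, keyed by task ID.
    private final Map<String, ClassifyTask> pool = new ConcurrentHashMap<>();

    /** Adds an instance, creating its task and subtask on first use. */
    void addInstance(String taskId, String subTaskId, String instanceId) {
        ClassifyTask task = pool.computeIfAbsent(taskId, ClassifyTask::new);
        SubTask sub = task.subTasks.computeIfAbsent(subTaskId, SubTask::new);
        synchronized (sub.instances) {
            sub.instances.add(new TaskInstance(instanceId));
        }
    }

    /** Total instances under one task; 0 if the task is unknown. */
    int instanceCount(String taskId) {
        ClassifyTask task = pool.get(taskId);
        if (task == null) {
            return 0;
        }
        int count = 0;
        for (SubTask sub : task.subTasks.values()) {
            synchronized (sub.instances) {
                count += sub.instances.size();
            }
        }
        return count;
    }

    int taskCount() {
        return pool.size();
    }
}
```

`computeIfAbsent` creates intermediate levels lazily, so callers never have to check whether a task or subtask already exists.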

7.2.3 Scheduling State Transitions

  The scheduling state transition diagram for classification tasks is shown in Figure 11:

Figure 11 Classification task scheduling state transition diagram

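  A task has six states: Created, Running, Waiting, Completed, Timed out, and Dead. Which state it enters depends on the conditions shown in Figure 11.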
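The six states map onto a Java enum with an explicit table of allowed transitions. The text does not reproduce Figure 11's exact edges, so the transitions below are assumptions. Examples include Waiting returning to Running, and both Completed and Timed out ending in Dead. The timeout check is also an assumption: it borrows the `classify.task.timeout=60` (seconds) setting from section 7.3. A periodic sweep such as the `ClassifyTaskPool.clearTimeoutTask()` call in `MonitorCenter` could apply `checkTimeout` to each task. `TaskState` and `TrackedTask` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** The six task states; the transition table is assumed, not taken from Figure 11. */
enum TaskState {
    CREATED, RUNNING, WAITING, COMPLETED, TIMEOUT, DEAD;

    private static final Map<TaskState, Set<TaskState>> NEXT = new EnumMap<>(TaskState.class);

    static {
        NEXT.put(CREATED, EnumSet.of(RUNNING, WAITING, TIMEOUT));
        NEXT.put(RUNNING, EnumSet.of(WAITING, COMPLETED, TIMEOUT));
        NEXT.put(WAITING, EnumSet.of(RUNNING, TIMEOUT));
        NEXT.put(COMPLETED, EnumSet.of(DEAD));
        NEXT.put(TIMEOUT, EnumSet.of(DEAD));
        NEXT.put(DEAD, EnumSet.noneOf(TaskState.class));
    }

    boolean canMoveTo(TaskState target) {
        return NEXT.get(this).contains(target);
    }
}

/** Hypothetical task wrapper that enforces the transition table and the timeout. */
final class TrackedTask {
    private TaskState state = TaskState.CREATED;
    private final long createdAtMillis;
    private final long timeoutMillis;

    TrackedTask(long createdAtMillis, long timeoutSeconds) {
        this.createdAtMillis = createdAtMillis;
        this.timeoutMillis = timeoutSeconds * 1000L;
    }

    synchronized TaskState state() {
        return state;
    }

    /** Moves to target, rejecting transitions that are not in the table. */
    synchronized void moveTo(TaskState target) {
        if (!state.canMoveTo(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }

    /** Marks an unfinished task past its deadline as TIMEOUT; returns true if it did. */
    synchronized boolean checkTimeout(long nowMillis) {
        boolean unfinished = state == TaskState.CREATED
                || state == TaskState.RUNNING
                || state == TaskState.WAITING;
        if (unfinished && nowMillis - createdAtMillis > timeoutMillis) {
            state = TaskState.TIMEOUT;
            return true;
        }
        return false;
    }
}
```

`checkTimeout` takes the current time as a parameter instead of reading the clock itself. This keeps the state logic deterministic and easy to test.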

7.3 Application Web Deployment

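  This part is deployed in the Glassfish Server container on a virtual machine. It can also be deployed in a server running inside a Docker container.
Web deployment configuration file: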

#web config
web.axis.title.font.name=Arial\u0020Italic\u0020Bold
web.axis.title.font.size=9
web.axis.title.font.color=000000

#docker config
docker.monitor.port=5257

#kubernetes config
kubernetes.rc.refresh.time=15
kubernetes.api.address=192.168.1.10
kubernetes.api.port=8080

#macro server ip type  Cluster/Host
macro.server.ip.type=Cluster
macro.request.thread.pool.size=10

#classify task config
classify.task.timeout=60

#log config
log.path=log/
log.prefix=robin-ml
# Level.ALL Level.FINEST Level.FINER Level.FINE Level.CONFIG 
# Level.INFO Level.WARNING Level.SEVERE Level.OFF
log.level=Level.INFO
log.file.limit=1048576
log.file.count=3
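The file uses the standard Java properties format, so `java.util.Properties` can read it. `Properties.load` also decodes the `\u0020` escapes, which turns the font name into `Arial Italic Bold`. The sketch below shows how the values might be typed and converted:

- `kubernetes.rc.refresh.time` is read as seconds, matching `MonitorThread`, which multiplies it by 1000.
- `log.level=Level.INFO` is mapped to a `java.util.logging.Level` by stripping the `Level.` prefix.

`log.file.limit` and `log.file.count` presumably feed a `java.util.logging.FileHandler(pattern, limit, count)`, giving 1 MiB per file and 3 rotating files. The article does not show `RobinLogger`, though, nor the real `ConfigUtil`. `WebConfigSketch` and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.logging.Level;

/** Hypothetical typed view over the web deployment properties. */
final class WebConfigSketch {
    private final Properties props = new Properties();

    WebConfigSketch(String propertiesText) {
        try {
            // Properties.load skips '#' comment lines and decodes Unicode escapes.
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the trimmed value, failing fast on a missing key. */
    String get(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing config key: " + key);
        }
        return value.trim();
    }

    int getInt(String key) {
        return Integer.parseInt(get(key));
    }

    /** kubernetes.rc.refresh.time is in seconds; the monitor thread sleeps in milliseconds. */
    long rcRefreshMillis() {
        return 1000L * getInt("kubernetes.rc.refresh.time");
    }

    /** "Level.INFO" -> Level.INFO; a bare name such as "INFO" also works. */
    Level logLevel() {
        String raw = get("log.level");
        String name = raw.startsWith("Level.") ? raw.substring("Level.".length()) : raw;
        return Level.parse(name);
    }
}
```

Failing fast on a missing key surfaces a broken deployment file at startup. Otherwise it would show up later as a `NullPointerException` deep inside the monitor thread.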

7.4 Data Visualization

  The application's user interface is shown in Figure 12:

Figure 12 Application UI

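It includes:

  • Microservice configuration and classification task configuration;
  • Microservice resource monitoring with dynamic display;
  • Dataset analysis charts and classification comparison charts;
  • Overall comparison, parameter tuning, and more.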

8 Visualization Demo

8.1 Results

  A classification task run is shown in Figure 13:

Figure 13 Animated classification task run (JE-MP)

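  Figure 13 uses JE word segmentation with MP feature selection. Figure 14 uses RS word segmentation with CHI feature selection, and its result is shown below: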

Figure 14 Animated classification task run (RS-MP)

8.2 Notes on the Visualization

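  The microservices support a wide range of comparative experiments across machine learning algorithms. The visualization UI I developed shows only about 20% of the functionality the microservices provide.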

9 Summary

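  This article has described in detail the design and implementation of a Kubernetes-based machine learning microservice system. Interested readers are welcome to discuss it with me.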