博客
关于我
源码分析Dubbo监控中心实现原理
阅读量:93 次
发布时间:2019-02-25

本文共 19506 字,大约阅读时间需要 65 分钟。

   Dubbo监控的实现基本原理就是在服务调用时收集服务调用并发度、服务响应时间,然后以一定频率向监控中心汇报统计数据。

   1、源码分析MonitorFilter过滤器

  • 过滤器作用
       监控过滤器,向监控中心汇报服务调用数据。
  • 使用场景
       搭建监控中心监控Dubbo服务调用。
  • 阻断条件
       非阻断过滤器。

   1.1 MonitorFilter声明

/** * MonitorFilter. (SPI, Singleton, ThreadSafe) */@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})public class MonitorFilter implements Filter {     // 省略具体代码}

   注:MonitorFilter会在生产者、消费者两端生效。

   1.2 getConcurrent方法详解

// concurrent counter    private AtomicInteger getConcurrent(Invoker
invoker, Invocation invocation) { String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); // @1 AtomicInteger concurrent = concurrents.get(key); if (concurrent == null) { concurrents.putIfAbsent(key, new AtomicInteger()); // @2 concurrent = concurrents.get(key); } return concurrent; }

   主要是获取当前调用服务的调用次数计算器。

   代码@1:使用的是ConcurrentMap< String, AtomicInteger >作为缓存容器,其key为:interfaceName + “.” + methodName。
   代码@2:如果是第一次调用,则创建AtomicInteger,否则返回原先的计数器。
   1.3 invoker方法详解

@Override    public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // @1 RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called // @2 String remoteHost = context.getRemoteHost(); long start = System.currentTimeMillis(); // record start timestamp getConcurrent(invoker, invocation).incrementAndGet(); // count up // @3 try { Result result = invoker.invoke(invocation); // proceed invocation chain // @4 collect(invoker, invocation, result, remoteHost, start, false); // @5 return result; } catch (RpcException e) { collect(invoker, invocation, null, remoteHost, start, true); // @6 throw e; } finally { getConcurrent(invoker, invocation).decrementAndGet(); // count down // @7 } } else { return invoker.invoke(invocation); } }

   代码@1:如果url中存在monitor,则设置了监控中心,收集调用信息。

   代码@2:获取本次服务调用的上下文环境。
   代码@3:服务调用并发次数增加1,(非服务调用总次数,而是当前服务的并发调用)。
   代码@4:执行方法之前先记录当前时间,然后调用下一个过滤器,直到真实服务被调用。
   代码@5:调用collect方法收集调用信息。
   代码@6:如果调用发送RPC异常,则收集错误信息。
   代码@7:一次服务调用结束,并发次数减一。
   接下来分析一下collect方法。
   1.4 invoker方法详解

// collect info    private void collect(Invoker
invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { // @1 try { // ---- service statistics ---- // @2 start long elapsed = System.currentTimeMillis() - start; // invocation cost int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY); String service = invoker.getInterface().getName(); // service name String method = RpcUtils.getMethodName(invocation); // method name String group = invoker.getUrl().getParameter(Constants.GROUP_KEY); String version = invoker.getUrl().getParameter(Constants.VERSION_KEY); URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); // @2 end Monitor monitor = monitorFactory.getMonitor(url); // @3 if (monitor == null) { return; } int localPort; String remoteKey; String remoteValue; if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { // @4 // ---- for service consumer ---- localPort = 0; remoteKey = MonitorService.PROVIDER; remoteValue = invoker.getUrl().getAddress(); } else { // @5 // ---- for service provider ---- localPort = invoker.getUrl().getPort(); remoteKey = MonitorService.CONSUMER; remoteValue = remoteHost; } String input = "", output = ""; if (invocation.getAttachment(Constants.INPUT_KEY) != null) { // @6 input = invocation.getAttachment(Constants.INPUT_KEY); } if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { // @7 output = result.getAttachment(Constants.OUTPUT_KEY); } monitor.collect(new URL(Constants.COUNT_PROTOCOL, // @8 NetUtils.getLocalHost(), localPort, service + "/" + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), Constants.INPUT_KEY, input, Constants.OUTPUT_KEY, output, Constants.GROUP_KEY, group, Constants.VERSION_KEY, version)); } catch (Throwable t) { logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } }

   代码@1:参数说明。

      Invoker< ? > invoker :服务调用Invoker。
      Invocation invocation :本次服务调用信息
      Result result :执行结果
      String remoteHost :调用者host信息。
      long start :服务开始调用时间。
      boolean error :是否发生错误。
   代码@2:统计基础信息字段说明:
      elapsed :服务调用时长。
      concurrent :当前并发度。(当前服务并发调用次数)。
      application :服务归属应用名。
      service :服务名。
      method :方法名。
      group :服务所属组。
      version :服务版本号
      URL url:监控中心url。
   代码@3:根据监控中心获取监控中心实现类,这是监控中心实现扩展点,默认使用com.alibaba.dubbo.monitor.dubbo.DubboMonitor。
   代码@4:如果是消费端,由于Monitor在消费端与服务端都会生效:
      localPort :本地端口设置为0;
      remoteKey:MonitorService.PROVIDER,表示为服务端。
      remoteValue:为invoker.getUrl().getAddress(),其值为(注册中心地址)或服务提供者地址(客户端直连服务端)。
   代码@5:如果为服务端:
      localPort :为服务端的服务端口号。
      remoteKey:MonitorService.CONSUMER,表示远端为服务消费者。
      remoteValue:消费者host(ip:port)。
   代码@6:获取本次服务调用请求包的字节数,在服务端解码时会在RpcContext中。
   代码@7:获取本次服务调用响应包的字节数,在服务端对响应包编码时会写入,具体代码请参考DubboCountCodec类。
   代码@8:调用monitor#collect收集调用信息,Monitor默认实现为DubboMonitor。使用的协议为count://localhost:localPort/service/method?application=applicationName&remoteKey=remoteValue&success|failure=1&elapsed=调用开销&concurrent=并发调用次数&input=入参字节数&output=响应字节数&group=服务所属组&version=版本。
   2、源码分析DubboMonitor实现原理
   Dubbo中默认的Monitor监控实现类为DubboMonitor:
这里写图片描述
   核心属性介绍:

  • private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory(“DubboMonitorSendTimer”,

    true)):定时调度线程池,使用3个线程的线程池,线程名称以DubboMonitorSendTimer。

  • private final ScheduledFuture< ? > sendFuture:调度任务future。

          private final Invoker< MonitorService > monitorInvoker:监控调度Invoker,Dubbo中的监控中心会作为服务提供者暴露服务,服务提供者,服务消费者可以通过注册中心订阅服务,通过该Invoker向监控中心汇报调用统计数据,也就是一次上报就是一次Dubbo RPC服务调用,其实现类为DubboInvoker,也就是可以通过该Invoker使用dubbo协议调用远程Monitor服务。

  • private final MonitorService monitorService:对monitorInvoker的proxy代理,主要是对toString、hashcode、equals无需通过RPC向MonitorServer服务提供者发起调

    用。主要是通过AbstractProxyFactory#getProxy创建,默认子类为JavassistProxyFactory,动态代理的InvokerHandler为:
    com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        String methodName = method.getName();        Class
[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
  • private final long monitorInterval:向监控中心汇报的频率,也就是调用MonitorService RPC服务的调用频率,默认为1分钟。
  • private final ConcurrentMap< Statistics, AtomicReference< long[]>> statisticsMap:统计信息Map。

   2.1 构造函数分析

public DubboMonitor(Invoker
monitorInvoker, MonitorService monitorService) { this.monitorInvoker = monitorInvoker; this.monitorService = monitorService; this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000); // @1 // collect timer for collecting statistics data sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // @2 @Override public void run() { // collect data try { send(); } catch (Throwable t) { logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t); } } }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS); }

   代码@1,从url参数中获取interval属性,如果为空,默认为60000,代表60S。

   代码@2:启动定时调度任务,默认60S的间隔执行send()方法,向监控中心汇报服务调用统计数据。
   2.2 collect 收集统计信息方法

public void collect(URL url) {        // data to collect from url        int success = url.getParameter(MonitorService.SUCCESS, 0);        int failure = url.getParameter(MonitorService.FAILURE, 0);        int input = url.getParameter(MonitorService.INPUT, 0);        int output = url.getParameter(MonitorService.OUTPUT, 0);        int elapsed = url.getParameter(MonitorService.ELAPSED, 0);        int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);        // init atomic reference        Statistics statistics = new Statistics(url);        AtomicReference
reference = statisticsMap.get(statistics); if (reference == null) { statisticsMap.putIfAbsent(statistics, new AtomicReference
()); reference = statisticsMap.get(statistics); } // use CompareAndSet to sum long[] current; long[] update = new long[LENGTH]; do { current = reference.get(); if (current == null) { update[0] = success; update[1] = failure; update[2] = input; update[3] = output; update[4] = elapsed; update[5] = concurrent; update[6] = input; update[7] = output; update[8] = elapsed; update[9] = concurrent; } else { update[0] = current[0] + success; update[1] = current[1] + failure; update[2] = current[2] + input; update[3] = current[3] + output; update[4] = current[4] + elapsed; update[5] = (current[5] + concurrent) / 2; update[6] = current[6] > input ? current[6] : input; update[7] = current[7] > output ? current[7] : output; update[8] = current[8] > elapsed ? current[8] : elapsed; update[9] = current[9] > concurrent ? current[9] : concurrent; } } while (!reference.compareAndSet(current, update)); }

   收集的信息主要是10个字段

      update[0] :调用成功的次数
      update[1] :调用失败的次数
      update[2] :总调用流量(请求包的总大小)。
      update[3] :总响应流量(响应包的总大小)。
      update[4] :总响应时长(总服务调用开销)。
      update[5] :一次收集周期的平均TPS。
      update[6] :最大请求包大小。
      update[7] :最大响应包大小。
      update[8] :最大响应时间。
      update[9] :最大TPS。

   2.3 send方法

   通过monitorService,最终通过monitorInvoker去调用RPC服务向监控中心汇报数据。接下来看一下监控中心的具体实现。

   3、Dubbo监控中心实现原理

   Dubbo官方提供了简易版本的监控中心,其项目为dubbo-ops:dubbo-monitor-simple。该项目是个spring-boot项目,启动后可以看到后台管理界面。
该项目服务提供者文件如下:
这里写图片描述
   从中可以看出,监控中心服务提供者实现类为SimpleMonitorService,其实现接口为MonitorService。
   接下来重点分析SimpleMonitorService监控中心的实现,关注如下两个点:
   1、监控数据持久化。
   2、监控报表生成逻辑。
   核心属性说明:

  • ScheduledExecutorService scheduledExecutorService:定时调度线程,将监控数据写入饼图的定时任务,固定1个线程。
  • Thread writeThread:监控数据持久化线程。
  • BlockingQueue< URL > queue:持久化数据任务阻塞队列。
  • String statisticsDirectory = “statistics”:数据持久化目录,SimpleMonitorService将数据持久化到磁盘文件。该值指定目录名称。
  • String chartsDirectory = “charts”:饼图存储目录。
  • private volatile boolean running = true:持久化数据线程是否处于运行状态。

    3.1 SimpleMonitorService构造函数

public SimpleMonitorService() {        queue = new LinkedBlockingQueue
(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000"))); // @1 writeThread = new Thread(new Runnable() { // @2 start public void run() { while (running) { try { write(); // write statistics } catch (Throwable t) { logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t); try { Thread.sleep(5000); // retry after 5 secs } catch (Throwable t2) { } } } } }); writeThread.setDaemon(true); writeThread.setName("DubboMonitorAsyncWriteLogThread"); writeThread.start(); // @2 end chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { try { draw(); // draw chart } catch (Throwable t) { logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t); } } }, 1, 300, TimeUnit.SECONDS); // @3 statisticsDirectory = ConfigUtils.getProperty("dubbo.statistics.directory"); chartsDirectory = ConfigUtils.getProperty("dubbo.charts.directory"); // @4 }

   代码@1:创建有界阻塞队列LinkedBlockingQueue,容量默认为100000个,可通过配置参数dubbo.monitor.queue改变默认值,如果队列中已挤压未被处理,后续监控数据将被默认丢弃。

   代码@2:创建持久化监控数据线程,名称为DubboMonitorAsyncWriteLogThread,其使命是从LinkedBlockingQueue中获取监控原始数据,如果队列中没数据则被阻塞,然后写入文件中。
   代码@3:开启定时调度任务,已每个5分钟的频率,根据持久化的监控数据,生成饼图。
   代码@4:获取数据持久化目录与饼图存放目录。
   3.2 SimpleMonitorService#write

private void write() throws Exception {        URL statistics = queue.take();        if (POISON_PROTOCOL.equals(statistics.getProtocol())) {            return;        }        String timestamp = statistics.getParameter(Constants.TIMESTAMP_KEY);        Date now;        if (timestamp == null || timestamp.length() == 0) {            now = new Date();        } else if (timestamp.length() == "yyyyMMddHHmmss".length()) {            now = new SimpleDateFormat("yyyyMMddHHmmss").parse(timestamp);        } else {            now = new Date(Long.parseLong(timestamp));        }        String day = new SimpleDateFormat("yyyyMMdd").format(now);        SimpleDateFormat format = new SimpleDateFormat("HHmm");        for (String key : types) {            try {                String type;                String consumer;                String provider;                if (statistics.hasParameter(PROVIDER)) {                    type = CONSUMER;                    consumer = statistics.getHost();                    provider = statistics.getParameter(PROVIDER);                    int i = provider.indexOf(':');                    if (i > 0) {                        provider = provider.substring(0, i);                    }                } else {                    type = PROVIDER;                    consumer = statistics.getParameter(CONSUMER);                    int i = consumer == null ? -1 : consumer.indexOf(':');                    if (i > 0) {                        consumer = consumer.substring(0, i);                    }                    provider = statistics.getHost();                }                String filename = statisticsDirectory                        + "/" + day                        + "/" + statistics.getServiceInterface()                        + "/" + statistics.getParameter(METHOD)                        + "/" + consumer                        + "/" + provider                        + "/" + type + "." + key;                File file = new File(filename);                File dir = file.getParentFile();                if (dir != null && !dir.exists()) {                    dir.mkdirs();                }                FileWriter writer = new FileWriter(file, true);                try {                    writer.write(format.format(now) + " " + statistics.getParameter(key, 0) + "\n");                    writer.flush();                } finally {                    writer.close();                }            } catch (Throwable t) {                logger.error(t.getMessage(), t);            }        }    }

   数据存储在物理磁盘上,其文件为为:" d u b b o . s t a t i s t i c s . d i r e c t o r y / {dubbo.statistics.directory} / dubbo.statistics.directory/{day}/ i n t e r f a c e n a m e / {interfacename}/ interfacename/{method}/${consumer}/ ${provider}/[consume|provider]/key",

key:{SUCCESS, FAILURE, ELAPSED, CONCURRENT, MAX_ELAPSED, MAX_CONCURRENT},分别调用成功次数、调用失败次数、调用开销(响应时间),TPS、最大响应时间,最大TPS。其文件存储如下:
这里写图片描述
   以provider.concurrent为例,说明一下其内容:
这里写图片描述
   其内容组织方式为:时间(时分:采集的值)。
   3.3 draw
   根据持久化的数据,在特定的目录下创建饼图,创建饼图方法createChart,具体使用JFreeChart相关类图,在这里就不细细讲解了,感兴趣的朋友可以百度查询相关用法。
   3.4 监控中心使用效果一览
   3.4.1 应用一览表
   这个功能可以描述系统与系统的关联关系。
这里写图片描述
   表格字段说明:
   1、Application Name:应用名称
   2、Providers:该应用包含的服务提供者信息,点击进去可以查看具体的服务提供者URL。
   3、Consumers(1):该应用包含的服务消费者信息,点击进去可以查看具体的服务消费者URL。
   4、Depends On:该应用依懒的应用。
这里写图片描述
   5、Used By:该应用被依懒的应用。
这里写图片描述
   3.4.2服务一览表
这里写图片描述
   表格字段说明:
      Service Name:服务名。
      Application:服务所属应用名。
      Providers:服务提供者信息,点击进去,可以看到详细的服务提供者信息。
这里写图片描述
Consumers:该服务的消费者信息。
这里写图片描述
      Statistics:表格统计信息
这里写图片描述
      Charts:饼图统计信息
这里写图片描述
      饼图统计信息,主要从两个维度展示:QPS(接口每秒请求数)、平均响应时间(包含最大,最小响应时间)。

      3.4.3、 Dubbo简易监控中心使用方法

   1、安装Dubbo简易监控中心
      从github dubbo仓库中下载dubbo-simple-monitor即可。
   2、应用程序如何使用Dubbo监控中心
      成功安装完监控中心还只是第一步,为了监控中心能收集服务调用信息,需要在Dubbo服务提、Dubbo消费者供者所在的应用的dubbo配置文件中加上如下内容:
   < dubbo:monitor protocol=“registry” />,表示从注册中心发现监控中心的地址,并将服务调用信息提交到监控中心。
   服务提供者默认以一分钟的频率(可配置)调用监控中心的dubbo服务,向监控中心上报服务调用信息。监控中心宕机,并不影响消费者,服务提供者的正常工作。
   如果要配置其调用频率,可通过如下配置,默认建议保持一分钟的频率,甚至更小,这个频率设置低点,对整个服务器的压力不会增加更大

< dubbo:monitor protocol="registry">	

   注:Dubbo监控中心,服务提供者、服务消费者都可以单独按需配置。


欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:

1、
2、
3、
4、
5、
6、
7、
8、
9、

你可能感兴趣的文章
MySQL 常见的开放性问题
查看>>
Mysql 常见错误
查看>>
mysql 常见问题
查看>>
MYSQL 幻读(Phantom Problem)不可重复读
查看>>
mysql 往字段后面加字符串
查看>>
mysql 快照读 幻读_innodb当前读 与 快照读 and rr级别是否真正避免了幻读
查看>>
MySQL 快速创建千万级测试数据
查看>>
mysql 快速自增假数据, 新增假数据,mysql自增假数据
查看>>
MySql 手动执行主从备份
查看>>
Mysql 批量修改四种方式效率对比(一)
查看>>
mysql 批量插入
查看>>
Mysql 报错 Field 'id' doesn't have a default value
查看>>
MySQL 报错:Duplicate entry 'xxx' for key 'UNIQ_XXXX'
查看>>
Mysql 拼接多个字段作为查询条件查询方法
查看>>
mysql 排序id_mysql如何按特定id排序
查看>>
Mysql 提示:Communication link failure
查看>>
mysql 插入是否成功_PDO mysql:如何知道插入是否成功
查看>>
Mysql 数据库InnoDB存储引擎中主要组件的刷新清理条件:脏页、RedoLog重做日志、Insert Buffer或ChangeBuffer、Undo Log
查看>>
mysql 数据库中 count(*),count(1),count(列名)区别和效率问题
查看>>
mysql 数据库备份及ibdata1的瘦身
查看>>