本文禁止转载

1. 用gdb跟踪zabbix server对item data的处理过程

1.1 gdb了解zabbix进程架构

后台执行 : ps aux | grep zabbix 得到zabbix进程信息;

进程的命令行,都被子进程修改了,gdb attach 3747 ,跟进configuration 进程,得到堆栈如下:

configuration 进程最终是由 zbx_thread_start 函数创建的(不过zbx_thread_start 函数里并没有看到创建线程的代码,函数整个逻辑只fork了一个子进程!)。

ZBX_THREAD_HANDLE    zbx_thread_start(ZBX_THREAD_ENTRY_POINTER(handler), zbx_thread_args_t *thread_args)
{
 ……
 if (0 == (thread = zbx_child_fork()))    /* child process */ // 这里创建完子进程就开始走回调逻辑了,根本就没有创建线程
 {
  (*handler)(thread_args);
  zbx_thread_exit(EXIT_SUCCESS);
 }
 else if (-1 == thread)
 {
  zbx_error("failed to fork: %s", zbx_strerror(errno));
  thread = (ZBX_THREAD_HANDLE)ZBX_THREAD_ERROR;
 }
 return thread;
}

fork完子进程,子进程开始执行 dbconfig_thread 函数,dbconfig_thread 定义如下,zabbix就是通过调用 zbx_setproctitle 修改子进程命令行名的。

主逻辑是一个for 死循环,每隔 CONFIG_CONFSYNCER_FREQUENCY = 60S 循环一次,主要是从数据库里读取参数刷新zabbix进程的全局缓存(缓存为所有进程共享)。

ZBX_THREAD_ENTRY(dbconfig_thread, args){
……
 zbx_setproctitle("%s [waiting %d sec for processes]", get_process_type_string(process_type),
   CONFIG_CONFSYNCER_FREQUENCY);
for (;;)
 { 
  DCsync_configuration(ZBX_DBSYNC_UPDATE); 
  DCupdate_hosts_availability();
  dc_flush_history();    /* misconfigured items generate pseudo-historic values to become notsupported */
  zbx_sleep_loop(CONFIG_CONFSYNCER_FREQUENCY);
}
}

 #define ZBX_THREAD_ENTRY(entry_name, arg_name)    \
  unsigned entry_name(void *arg_name)

1.2 调试总结

zabbix主进程fork完所有的子进程后就什么都不做了,静静地等待子进程结束,子进程的功能与进程命令行名相对应,要跟踪哪个模块的,直接ps 查看进程命令行名, gdb attach 即可。

zabbix的调试信息由 zabbix_log 接口打印,zabbix_log 是一个宏,展开如下,最终调用的是 __zbx_zabbix_log 。

#ifdef HAVE___VA_ARGS__
#    define zabbix_log(level, fmt, ...) __zbx_zabbix_log(level, ZBX_CONST_STRING(fmt), #__VA_ARGS__)
#else
#    define zabbix_log __zbx_zabbix_log
#endif

__zbx_zabbix_log 设置了调试级别,通过 宏 ZBX_CHECK_LOG_LEVEL() 限制,ZBX_CHECK_LOG_LEVEL() 实现如下, 大于 log_level(默认是:LOG_LEVEL_WARNING) 级别的日志全部不予打印,修改 log_level=LOG_LEVEL_TRACE 可看到绝大部分调试信息,如果不够,可以直接 修改 log_level = 6 。

#define ZBX_CHECK_LOG_LEVEL(level)    \
  ((LOG_LEVEL_INFORMATION != level && (level > log_level || LOG_LEVEL_EMPTY == level)) ? FAIL : SUCCEED)

有一类函数定义相当奇怪,就是 zbx_vector_ptr 和 zbx_vector_str 开头的一系列函数,直接搜代码是搜不到定义的地方的,这类函数在 vector.c 中由宏拼接定义。

(gdb) bt
#0  zbx_vector_ptr_create (vector=0x7ffff19f5350) at vector.c:28
#1  0x0000000000462e87 in DCupdate_hosts_availability () at dbcache.c:3659
#2  0x000000000042305b in dbconfig_thread (args=<value optimized out>) at dbconfig.c:94
#3  0x00000000004940f2 in zbx_thread_start (handler=0x422f50 <dbconfig_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#4  0x000000000041d5ab in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1055
#5  0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#6  0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832

2. 重点代码 dbconfig / dbsyncer

2.1 dbconfig 代码解读,以 hosts为例说明数据库和缓存同步的过程

整个数据库加载逻辑都在 DCsync_configuration () 函数中进行,提取hosts表操作的全部逻辑,hosts更新一共分为三步:

  1. 加载数据库表中的配置,这是当前设备最新的原始数据;
  2. 比较缓存和数据库中的配置差异,并提取出差异部分保存在一个 zbx_dbsync_t 结构体里,1,2步都在 zbx_dbsync_compare_hosts() 函数中完成;
  3. 更新缓存, 这一步由 DCsync_hosts() 函数完成;
void    DCsync_configuration(unsigned char mode)
{
    zbx_dbsync_t hosts_sync;       // hosts中间缓存
    zbx_dbsync_init(&hosts_sync, mode);    // 初始化中间缓存
    zbx_dbsync_compare_hosts(&hosts_sync);    // 比较数据库和zabbix缓存的差异,将差异结果保存在 hosts_sync 中
    DCsync_hosts(&hosts_sync);    // 同步到zabbix全局缓存,通过直接操作 config 变量完成
    zbx_dbsync_clear(&hosts_sync);    // 释放空间
}

结构体 zbx_dbsync 是一个中间缓存结构,用来保存数据库和缓存中有差异的数据,包括三个部分:新增、更新、删除;定义如下,下面说明几个重要字段的意义。

typedef struct zbx_dbsync
{
 unsigned char    mode;     /* 当前的操作模式,只有两种状态  初始化/更新 */
 int    columns_num;
 int    row_index;
 zbx_vector_ptr_t    rows;    // 数据库和缓存中,有差异的项目
 DB_RESULT    dbresult;
 zbx_dbsync_preproc_row_func_t    preproc_row_func;
 char    **row;
 zbx_vector_ptr_t    columns;
 zbx_uint64_t    add_num;             /* 新增、更新、删除 的数量 */
 zbx_uint64_t    update_num;
 zbx_uint64_t    remove_num;
}
zbx_dbsync_t;

全局变量 config 是一段预分配的共享内存,在 init_configuration_cache 中初始化的,如下:

int    init_configuration_cache(char **error)
{
     if (SUCCEED != (ret = zbx_mem_create(&config_mem, config_size, "configuration cache", "CacheSize", 0, error))) 
          goto out;
     config = __config_mem_malloc_func(NULL, sizeof(ZBX_DC_CONFIG) + 
   CONFIG_TIMER_FORKS * sizeof(zbx_vector_ptr_t));
}

zbx_mem_create 函数创建了 名为 " configuration cache " 的共享内存,并将内存映射到本进程虚拟地址空间,__config_mem_*_func 系列接口封装了这段共享内存操作的细节,config的内存就是在这里面创建的。DCsync_hosts() 函数直接对config变量做的操作,实际上就是在操作共享内存。

其他表的操作与hosts表操作类似。

3. 熟悉action / escalator / alerter对事件的处理

直接在第四节分析

4. item data的处理过程

4.1 item data 处理整过程概览

item data 在内存中的处理会经历以下过程:

  1. pooler 进程收集原始数据,把数据包装好,发送 ZBX_IPC_PREPROCESSOR_REQUEST 请求给 preprocessing manager进程 ;
  2. preprocessing manager进程做预处理,寻找一个空闲的 preprocessing worker 进程,发送ZBX_IPC_PREPROCESSOR_REQUEST 请求给preprocessing worker进程;
  3. preprocessing worker进程处理item data的数据类型,做简单的格式转换,将结果发送给 preprocessing manager进程 请求是: ZBX_IPC_PREPROCESSOR_RESULT ;
  4. preprocessing manager进程先将 item data 结果保存在一个 item_values 临时缓冲区中,item_values 缓冲区满了之后,再将item_values clone到 catch->history_items 中;(以上完成了数据在内存中的处理,后面将把结果保存在数据库中。)

4.2 item data数据的来源

poller_thread 子进程 每隔5s 会收集一次 item data,item 被动模式的数据都是通过 get_value_agent () 函数接收的,核心代码如下:

int    get_value_agent(DC_ITEM *item, AGENT_RESULT *result)
{
    ……
    if (SUCCEED == (ret = zbx_tcp_connect(&s, CONFIG_SOURCE_IP, item->interface.addr, item->interface.port, 0,
    item->host.tls_connect, tls_arg1, tls_arg2)))
    {
     buffer = zbx_dsprintf(buffer, "%s\n", item->key);
     if (SUCCEED != zbx_tcp_send_raw(&s, buffer))
      ret = NETWORK_ERROR;
     else if (FAIL != (received_len = zbx_tcp_recv_ext(&s, ZBX_TCP_READ_UNTIL_CLOSE, 0)))
      ret = SUCCEED;
    }
    ……
    set_result_type(result, ITEM_VALUE_TYPE_TEXT, s.buffer);
    ……
}

通过 zbx_tcp_send_raw() 函数发送命令到agent端, zbx_tcp_recv 或 zbx_tcp_recv_ext 函数接收agent端返回的结果,最终结果保存在 result 变量中。堆栈如下:

(gdb) bt
#0  get_value_agent (item=0x7ffff18798f0, result=0x7ffff19f28f0) at checks_agent.c:59
#1  0x000000000042b93e in get_value (poller_type=0 '\000', nextcheck=0x7ffff19f53bc) at poller.c:353
#2  get_values (poller_type=0 '\000', nextcheck=0x7ffff19f53bc) at poller.c:593
#3  0x000000000042ba36 in poller_thread (args=<value optimized out>) at poller.c:773
#4  0x00000000004940f2 in zbx_thread_start (handler=0x42b950 <poller_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#5  0x000000000041d21b in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1065
#6  0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#7  0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832

4.3 item data数据的处理

  • 先由 zbx_preprocess_item_value 做预处理,主要是做一些数据压缩的操作;
void    zbx_preprocess_item_value(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
  AGENT_RESULT *result, zbx_timespec_t *ts, unsigned char state, char *error)
{
……
 value.itemid = itemid;
 value.item_value_type = item_value_type;
 value.result = result;
 value.error = error;
 value.item_flags = item_flags;
 value.state = state;
 value.ts = ts;
 preprocessor_pack_value(&cached_message, &value);    // 这里面会压缩收到的原始数据
 cached_values++;
 if (MAX_VALUES_LOCAL < cached_values)
  zbx_preprocessor_flush();
}
  • 接下来,在 zbx_preprocessor_flush 函数中, 会把数据发送给 preprocessing manager 进程处理,命令是 ZBX_IPC_PREPROCESSOR_REQUEST (这个在preprocessing manager 进程中会用到),get_values 处理数据后,会将 result 的空间释放,即后面数据的处理就全部由 preprocessing manager 进程来完成了;
# 找到preprocessing manager 进程的方法
[root@localhost ~/zabbix-3.4.7]#lsof /tmp/zabbix_server_preprocessing.sock
# 或者 
[root@localhost ~/zabbix-3.4.7]#ps auxf | grep "preprocessing manager"
  • preprocessing manager 进程即代码里的 preprocessing_manager_thread进程,该进程的主逻辑如下,item data的原始数据是 ZBX_IPC_PREPROCESSOR_REQUEST(请求处理) 命令,数据由 preprocessor_add_request() 函数处理,item data会被添加到一个队列中,后面由 preprocessor_assign_tasks 函数统一做任务分发,数据会再一次转入 work进程;
ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
{
……
switch (message->code)
   {
    case ZBX_IPC_PREPROCESSOR_WORKER:
    case ZBX_IPC_PREPROCESSOR_REQUEST: 
     preprocessor_add_request(&manager, message);         // poller work 接收的 item数据在这里处理
     break;
    case ZBX_IPC_PREPROCESSOR_RESULT:
    case ZBX_IPC_PREPROCESSOR_QUEUE: 
     }
}
static void    preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
 preprocessor_sync_configuration(manager);
 while (offset < message->size)
 {
  offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
  preprocessor_enqueue(manager, &value, NULL);     // 数据入队
 }
 preprocessor_assign_tasks(manager);     // 任务分发
 preprocessing_flush_queue(manager);
 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
}
  • worker 进程只是做一些结构体填充工作,后面又会把结果发回给 manager进程,对应的请求是: ZBX_IPC_PREPROCESSOR_RESULT(结果处理),manager开始做真正的数据存储,主要顺序是,先将结果保存在 item_values 结构体中,如果item_values满了,就把结果“剪切”到 catch->history_items 中;
/* 调用堆栈如下
case ZBX_IPC_PREPROCESSOR_RESULT:
     preprocessor_add_result
           preprocessing_flush_queue
            preprocessor_flush_value
                    dc_add_history
                        dc_local_add_history_* (如: dc_local_add_history_log)
                            dc_local_get_history_slot
                                    dc_flush_history
*/
static dc_item_value_t    *dc_local_get_history_slot(void)
{
 if (ZBX_MAX_VALUES_LOCAL == item_values_num)
  dc_flush_history();
 if (item_values_alloc == item_values_num)
 {
  item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
  item_values = zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
 }
 return &item_values[item_values_num++];
}
/*dc_flush_history 调用堆栈如下
dc_flush_history
    hc_add_item_values
        hc_add_item
*/
static zbx_hc_item_t    *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
{
 zbx_hc_item_t    item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, data, data};
 return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
}

4.4 数据库处理逻辑 dbsyncer_thread 进程

item data 保存在 cache 中之后,dbsyncer_thread会定时处理,dbsyncer_thread进程每隔5s 会检查一次 cache缓存,主要工作由DCsync_history() 函数完成。DCsync_history()函数主要的任务就是:执行与item相关联的trigger,生成event保存到数据库中;同时对 item、trigger、trends(趋势图)做状态更新,保存到数据库。到这一步item data彻底处理完了。

DCsync_history()函数核心逻辑如下:

int    DCsync_history(int sync_type, int *total_num)
{
   DCmass_update_items(history, history_num, items, errcodes);    // 更新item状态到数据库
   DCmass_update_triggers(history, history_num, &trigger_diff);     // 更新与item相关的trigger状态到数据库,并返回trigger的函数
   DCmass_update_trends(history, history_num);      // 更新trends 把item data添加到趋势图中去
   if (0 != zbx_process_events(&trigger_diff, &triggerids))     // 核心: 执行trigger的函数
   {
    DCconfig_triggers_apply_changes(&trigger_diff);   // 如果trigger 表达式被触发了,要做相应的更新
    zbx_save_trigger_changes(&trigger_diff);
   }
}

trigger的表达式(如: {Windows host:system.cpu.load.last()}>50 中的 last() )最终由evaluate_function() 函数执行,evaluate_function 函数调用堆栈如下:

(gdb) bt
#0 evaluate_function (value=0x7ffff19e4540 "99.779489", item=0x168dff0, function=0x167d740 "avg", parameter=0x167d760 "10m", now=1521775653, error=0x7ffff19f4588) at evalfunc.c:2399
#1 0x0000000000457f26 in zbx_evaluate_item_functions (funcs=<value optimized out>, unknown_msgs=0x7ffff19f4f70) at expression.c:4526
#2 0x000000000046244d in substitute_functions (triggers=0x7ffff19f5050) at expression.c:4734
#3 evaluate_expressions (triggers=0x7ffff19f5050) at expression.c:4789
#4 0x00000000004659b3 in DCmass_update_triggers (history=<value optimized out>, history_num=<value optimized out>, trigger_diff=0x7ffff19f5270) at dbcache.c:956
#5 0x0000000000466524 in DCsync_history (sync_type=0, total_num=0x7ffff19f53bc) at dbcache.c:2266
#6 0x0000000000422e25 in dbsyncer_thread (args=<value optimized out>) at dbsyncer.c:77
#7 0x00000000004940f2 in zbx_thread_start (handler=0x422d50 <dbsyncer_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#8 0x000000000041d6eb in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1090
#9 0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#10 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832

至此,整个item data处理完毕!

4.5 action/escalation处理

接下来开始执行告警处理,zabbix 的 action的操作是放在escalation中一起处理的,最终 由 add_message_alert()函数生成告警信息,保存在数据库的 alerts 表中,add_message_alert()函数调用栈如下:

(gdb) bt
#0  add_message_alert (event=0x1669660, r_event=0x0, actionid=7, esc_step=1, userid=4, mediatypeid=1, subject=0x16697c0 "zabbix Problem: Zabbix discoverer processes more than 75% busy", message=0x1669c90 "Problem started at 16:34:35 on 2018.03.23\r\nProblem name: Zabbix discoverer processes more than 75% busy\r\nHost: Zabbix server\r\nSeverity: Average\r\n\r\nOriginal problem ID: 17991\r\n", ackid=0) at escalator.c:971
#1  0x000000000043dd92 in flush_user_msg (user_msg=0x7ffff19f4f70, esc_step=1, event=0x1669660, r_event=0x0, actionid=7) at escalator.c:540
#2  0x000000000043ef83 in escalation_execute_operations (escalation=0x167d6e0, action=0x167d680, event=0x1669660) at escalator.c:1318
#3  escalation_execute (escalation=0x167d6e0, action=0x167d680, event=0x1669660) at escalator.c:1887
#4  0x0000000000440559 in process_db_escalations (now=1521794078, nextcheck=0x7ffff19f53bc, escalations=0x7ffff19f5300, eventids=<value optimized out>, actionids=<value optimized out>) at escalator.c:2207
#5  0x000000000044099c in process_escalations (now=1521794078, nextcheck=0x7ffff19f53bc, escalation_source=<value optimized out>) at escalator.c:2474
#6  0x0000000000440c0f in escalator_thread (args=<value optimized out>) at escalator.c:2539
#7  0x00000000004940f2 in zbx_thread_start (handler=0x440ab0 <escalator_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#8  0x000000000041d72b in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1093
#9  0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#10 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832

4.5 alerter处理

action的动作,即告警信息,最终由alerter进程执行,alerter进程和poller进程一样,有两类:alerter manager进程和alerter worker进程(这个work是强行加上去的,为了好和manager进程区分)。这两个进程之间的逻辑关系如下:

  1. manager进程先启动,启动之后等待worker进程前来注册;
  2. worker进程启动之后,立刻向manager进程注册一个“接口”,这个“接口”会接收告警请求,并填充告警信息,发送给manager进程;
  3. manager进程遍历数据库 alerts 表,发现有新的告警项时,将表信息读出,发送给worker进程;
  4. worker开始填充告警信息,并执行告警信息“扩散”,如:发邮件等,并将执行结果发送给manager进程;
  5. manager进程收到worker进程发来的执行结果之后,更新数据库中的告警执行状态;

manager进程遍历数据库逻辑在am_db_get_alerts()函数中完成,am_db_get_alerts()逻辑如下:

/*    manger进程核心遍历数据库alerts表函数调用堆栈
alert_manager_thread
    am_db_queue_alerts
        am_db_get_alerts
*/
static int    am_db_get_alerts(zbx_am_t *manager, zbx_vector_ptr_t *alerts, int now)
{
zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
   "select a.alertid,a.mediatypeid,a.sendto,a.subject,a.message,a.status,a.retries,"
    "e.source,e.object,e.objectid"
   " from alerts a"                // 信息来自 alerts 表
   " left join events e"
    " on a.eventid=e.eventid"
   " where alerttype=%d"
   " and",
   ALERT_TYPE_MESSAGE);
 result = DBselect_once("%s", sql);    // 这里执行sql语句
……

manager进程读取完告警信息之后,会把结果发送给worker进程,并发送相应的请求,如:邮件告警时,发送 ZBX_IPC_ALERTER_EMAIL 请求。 worker进程主要逻辑如下,worker进程其实只干一件事,那就是发送告警(zabbix把这个叫做“扩散”,即escalation):

ZBX_THREAD_ENTRY(alerter_thread, args)
{
    for (;;) 
    {
        switch (message.code) 
{
   case ZBX_IPC_ALERTER_EMAIL:        //    邮件告警逻辑
    alerter_process_email(&alerter_socket, &message);
    break;
  ……
}
    }
}