本文禁止转载
1. 用gdb跟踪zabbix server对item data的处理过程
1.1 gdb了解zabbix进程架构
后台执行 : ps aux | grep zabbix 得到zabbix进程信息;
进程的命令行,都被子进程修改了,gdb attach 3747 ,跟进configuration 进程,得到堆栈如下:
configuration 进程最终是由 zbx_thread_start 函数创建的(不过zbx_thread_start 函数里并没有看到创建线程的代码,函数整个逻辑只fork了一个子进程!)。
ZBX_THREAD_HANDLE zbx_thread_start(ZBX_THREAD_ENTRY_POINTER(handler), zbx_thread_args_t *thread_args)
{
……
if (0 == (thread = zbx_child_fork())) /* child process */ // 这里创建完子进程就开始走回调逻辑了,根本就没有创建线程
{
(*handler)(thread_args);
zbx_thread_exit(EXIT_SUCCESS);
}
else if (-1 == thread)
{
zbx_error("failed to fork: %s", zbx_strerror(errno));
thread = (ZBX_THREAD_HANDLE)ZBX_THREAD_ERROR;
}
return thread;
}
fork完子进程,子进程开始执行 dbconfig_thread 函数,dbconfig_thread 定义如下,zabbix就是通过调用 zbx_setproctitle 修改子进程命令行名的。
主逻辑是一个for 死循环,每隔 CONFIG_CONFSYNCER_FREQUENCY = 60S 循环一次,主要是从数据库里读取参数刷新zabbix进程的全局缓存(缓存为所有进程共享)。
ZBX_THREAD_ENTRY(dbconfig_thread, args){
……
zbx_setproctitle("%s [waiting %d sec for processes]", get_process_type_string(process_type),
CONFIG_CONFSYNCER_FREQUENCY);
for (;;)
{
DCsync_configuration(ZBX_DBSYNC_UPDATE);
DCupdate_hosts_availability();
dc_flush_history(); /* misconfigured items generate pseudo-historic values to become notsupported */
zbx_sleep_loop(CONFIG_CONFSYNCER_FREQUENCY);
}
}
#define ZBX_THREAD_ENTRY(entry_name, arg_name) \
unsigned entry_name(void *arg_name)
1.2 调试总结
zabbix主进程fork完所有的子进程后就什么都不做了,静静地等待子进程结束,子进程的功能与进程命令行名相对应,要跟踪哪个模块的,直接ps 查看进程命令行名, gdb attach 即可。
zabbix的调试信息由 zabbix_log 接口打印,zabbix_log 是一个宏,展开如下,最终调用的是 __zbx_zabbix_log 。
#ifdef HAVE___VA_ARGS__
# define zabbix_log(level, fmt, ...) __zbx_zabbix_log(level, ZBX_CONST_STRING(fmt), #__VA_ARGS__)
#else
# define zabbix_log __zbx_zabbix_log
#endif
__zbx_zabbix_log 设置了调试级别,通过 宏 ZBX_CHECK_LOG_LEVEL() 限制,ZBX_CHECK_LOG_LEVEL() 实现如下, 大于 log_level(默认是:LOG_LEVEL_WARNING) 级别的日志全部不予打印,修改 log_level=LOG_LEVEL_TRACE 可看到绝大部分调试信息,如果不够,可以直接 修改 log_level = 6 。
#define ZBX_CHECK_LOG_LEVEL(level) \
((LOG_LEVEL_INFORMATION != level && (level > log_level || LOG_LEVEL_EMPTY == level)) ? FAIL : SUCCEED)
有一类函数定义相当奇怪,就是 zbx_vector_ptr 和 zbx_vector_str 开头的一系列函数,直接搜代码是搜不到定义的地方的,这类函数在 vector.c 中由宏拼接定义。
(gdb) bt
#0 zbx_vector_ptr_create (vector=0x7ffff19f5350) at vector.c:28
#1 0x0000000000462e87 in DCupdate_hosts_availability () at dbcache.c:3659
#2 0x000000000042305b in dbconfig_thread (args=<value optimized out>) at dbconfig.c:94
#3 0x00000000004940f2 in zbx_thread_start (handler=0x422f50 <dbconfig_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#4 0x000000000041d5ab in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1055
#5 0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#6 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832
2. 重点代码 dbconfig / dbsyncer
2.1 dbconfig 代码解读,以 hosts为例说明数据库和缓存同步的过程
整个数据库加载逻辑都在 DCsync_configuration () 函数中进行,提取hosts表操作的全部逻辑,hosts更新一共分为三步:
- 加载数据库表中的配置,这是当前设备最新的原始数据;
- 比较缓存和数据库中的配置差异,并提取出差异部分保存在一个 zbx_dbsync_t 结构体里,1,2步都在 zbx_dbsync_compare_hosts() 函数中完成;
- 更新缓存, 这一步由 DCsync_hosts() 函数完成;
void DCsync_configuration(unsigned char mode)
{
zbx_dbsync_t hosts_sync; // hosts中间缓存
zbx_dbsync_init(&hosts_sync, mode); // 初始化中间缓存
zbx_dbsync_compare_hosts(&hosts_sync); // 比较数据库和zabbix缓存的差异,将差异结果保存在 hosts_sync 中
DCsync_hosts(&hosts_sync); // 同步到zabbix全局缓存,通过直接操作 config 变量完成
zbx_dbsync_clear(&hosts_sync); // 释放空间
}
结构体 zbx_dbsync 是一个中间缓存结构,用来保存数据库和缓存中有差异的数据,包括三个部分:新增、更新、删除;定义如下,下面说明几个重要字段的意义。
typedef struct zbx_dbsync
{
unsigned char mode; /* 当前的操作模式,只有两种状态 初始化/更新 */
int columns_num;
int row_index;
zbx_vector_ptr_t rows; // 数据库和缓存中,有差异的项目
DB_RESULT dbresult;
zbx_dbsync_preproc_row_func_t preproc_row_func;
char **row;
zbx_vector_ptr_t columns;
zbx_uint64_t add_num; /* 新增、更新、删除 的数量 */
zbx_uint64_t update_num;
zbx_uint64_t remove_num;
}
zbx_dbsync_t;
全局变量 config 是一段预分配的共享内存,在 init_configuration_cache 中初始化的,如下:
int init_configuration_cache(char **error)
{
if (SUCCEED != (ret = zbx_mem_create(&config_mem, config_size, "configuration cache", "CacheSize", 0, error)))
goto out;
config = __config_mem_malloc_func(NULL, sizeof(ZBX_DC_CONFIG) +
CONFIG_TIMER_FORKS * sizeof(zbx_vector_ptr_t));
}
zbx_mem_create 函数创建了 名为 " configuration cache " 的共享内存,并将内存映射到本进程虚拟地址空间,__config_mem_*_func 系列接口封装了这段共享内存操作的细节,config的内存就是在这里面创建的。DCsync_hosts() 函数直接对config变量做的操作,实际上就是在操作共享内存。
其他表的操作与hosts表操作类似。
3. 熟悉action / escalator / alerter对事件的处理
直接在第四节分析
4. item data的处理过程
4.1 item data 处理整过程概览
item data 在内存中的处理会经历以下过程:
- pooler 进程收集原始数据,把数据包装好,发送 ZBX_IPC_PREPROCESSOR_REQUEST 请求给 preprocessing manager进程 ;
- preprocessing manager进程做预处理,寻找一个空闲的 preprocessing worker 进程,发送ZBX_IPC_PREPROCESSOR_REQUEST 请求给preprocessing worker进程;
- preprocessing worker进程处理item data的数据类型,做简单的格式转换,将结果发送给 preprocessing manager进程 请求是: ZBX_IPC_PREPROCESSOR_RESULT ;
- preprocessing manager进程先将 item data 结果保存在一个 item_values 临时缓冲区中,item_values 缓冲区满了之后,再将item_values clone到 catch->history_items 中;(以上完成了数据在内存中的处理,后面将把结果保存在数据库中。)
4.2 item data数据的来源
poller_thread 子进程 每隔5s 会收集一次 item data,item 被动模式的数据都是通过 get_value_agent () 函数接收的,核心代码如下:
int get_value_agent(DC_ITEM *item, AGENT_RESULT *result)
{
……
if (SUCCEED == (ret = zbx_tcp_connect(&s, CONFIG_SOURCE_IP, item->interface.addr, item->interface.port, 0,
item->host.tls_connect, tls_arg1, tls_arg2)))
{
buffer = zbx_dsprintf(buffer, "%s\n", item->key);
if (SUCCEED != zbx_tcp_send_raw(&s, buffer))
ret = NETWORK_ERROR;
else if (FAIL != (received_len = zbx_tcp_recv_ext(&s, ZBX_TCP_READ_UNTIL_CLOSE, 0)))
ret = SUCCEED;
}
……
set_result_type(result, ITEM_VALUE_TYPE_TEXT, s.buffer);
……
}
通过 zbx_tcp_send_raw() 函数发送命令到agent端, zbx_tcp_recv 或 zbx_tcp_recv_ext 函数接收agent端返回的结果,最终结果保存在 result 变量中。堆栈如下:
(gdb) bt
#0 get_value_agent (item=0x7ffff18798f0, result=0x7ffff19f28f0) at checks_agent.c:59
#1 0x000000000042b93e in get_value (poller_type=0 '\000', nextcheck=0x7ffff19f53bc) at poller.c:353
#2 get_values (poller_type=0 '\000', nextcheck=0x7ffff19f53bc) at poller.c:593
#3 0x000000000042ba36 in poller_thread (args=<value optimized out>) at poller.c:773
#4 0x00000000004940f2 in zbx_thread_start (handler=0x42b950 <poller_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#5 0x000000000041d21b in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1065
#6 0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#7 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832
4.3 item data数据的处理
- 先由 zbx_preprocess_item_value 做预处理,主要是做一些数据压缩的操作;
void zbx_preprocess_item_value(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
AGENT_RESULT *result, zbx_timespec_t *ts, unsigned char state, char *error)
{
……
value.itemid = itemid;
value.item_value_type = item_value_type;
value.result = result;
value.error = error;
value.item_flags = item_flags;
value.state = state;
value.ts = ts;
preprocessor_pack_value(&cached_message, &value); // 这里面会压缩收到的原始数据
cached_values++;
if (MAX_VALUES_LOCAL < cached_values)
zbx_preprocessor_flush();
}
- 接下来,在 zbx_preprocessor_flush 函数中, 会把数据发送给 preprocessing manager 进程处理,命令是 ZBX_IPC_PREPROCESSOR_REQUEST (这个在preprocessing manager 进程中会用到),get_values 处理数据后,会将 result 的空间释放,即后面数据的处理就全部由 preprocessing manager 进程来完成了;
# 找到preprocessing manager 进程的方法
[root@localhost ~/zabbix-3.4.7]#lsof /tmp/zabbix_server_preprocessing.sock
# 或者
[root@localhost ~/zabbix-3.4.7]#ps auxf | grep "preprocessing manager"
- preprocessing manager 进程即代码里的 preprocessing_manager_thread进程,该进程的主逻辑如下,item data的原始数据是 ZBX_IPC_PREPROCESSOR_REQUEST(请求处理) 命令,数据由 preprocessor_add_request() 函数处理,item data会被添加到一个队列中,后面由 preprocessor_assign_tasks 函数统一做任务分发,数据会再一次转入 work进程;
ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
{
……
switch (message->code)
{
case ZBX_IPC_PREPROCESSOR_WORKER:
case ZBX_IPC_PREPROCESSOR_REQUEST:
preprocessor_add_request(&manager, message); // poller work 接收的 item数据在这里处理
break;
case ZBX_IPC_PREPROCESSOR_RESULT:
case ZBX_IPC_PREPROCESSOR_QUEUE:
}
}
static void preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
preprocessor_sync_configuration(manager);
while (offset < message->size)
{
offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
preprocessor_enqueue(manager, &value, NULL); // 数据入队
}
preprocessor_assign_tasks(manager); // 任务分发
preprocessing_flush_queue(manager);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
}
- worker 进程只是做一些结构体填充工作,后面又会把结果发回给 manager进程,对应的请求是: ZBX_IPC_PREPROCESSOR_RESULT(结果处理),manager开始做真正的数据存储,主要顺序是,先将结果保存在 item_values 结构体中,如果item_values满了,就把结果“剪切”到 catch->history_items 中;
/* 调用堆栈如下
case ZBX_IPC_PREPROCESSOR_RESULT:
preprocessor_add_result
preprocessing_flush_queue
preprocessor_flush_value
dc_add_history
dc_local_add_history_* (如: dc_local_add_history_log)
dc_local_get_history_slot
dc_flush_history
*/
static dc_item_value_t *dc_local_get_history_slot(void)
{
if (ZBX_MAX_VALUES_LOCAL == item_values_num)
dc_flush_history();
if (item_values_alloc == item_values_num)
{
item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
item_values = zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
}
return &item_values[item_values_num++];
}
/*dc_flush_history 调用堆栈如下
dc_flush_history
hc_add_item_values
hc_add_item
*/
static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
{
zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, data, data};
return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
}
4.4 数据库处理逻辑 dbsyncer_thread 进程
item data 保存在 cache 中之后,dbsyncer_thread会定时处理,dbsyncer_thread进程每隔5s 会检查一次 cache缓存,主要工作由DCsync_history() 函数完成。DCsync_history()函数主要的任务就是:执行与item相关联的trigger,生成event保存到数据库中;同时对 item、trigger、trends(趋势图)做状态更新,保存到数据库。到这一步item data彻底处理完了。
DCsync_history()函数核心逻辑如下:
int DCsync_history(int sync_type, int *total_num)
{
DCmass_update_items(history, history_num, items, errcodes); // 更新item状态到数据库
DCmass_update_triggers(history, history_num, &trigger_diff); // 更新与item相关的trigger状态到数据库,并返回trigger的函数
DCmass_update_trends(history, history_num); // 更新trends 把item data添加到趋势图中去
if (0 != zbx_process_events(&trigger_diff, &triggerids)) // 核心: 执行trigger的函数
{
DCconfig_triggers_apply_changes(&trigger_diff); // 如果trigger 表达式被触发了,要做相应的更新
zbx_save_trigger_changes(&trigger_diff);
}
}
trigger的表达式(如: {Windows host:system.cpu.load.last()}>50 中的 last() )最终由evaluate_function() 函数执行,evaluate_function 函数调用堆栈如下:
(gdb) bt
#0 evaluate_function (value=0x7ffff19e4540 "99.779489", item=0x168dff0, function=0x167d740 "avg", parameter=0x167d760 "10m", now=1521775653, error=0x7ffff19f4588) at evalfunc.c:2399
#1 0x0000000000457f26 in zbx_evaluate_item_functions (funcs=<value optimized out>, unknown_msgs=0x7ffff19f4f70) at expression.c:4526
#2 0x000000000046244d in substitute_functions (triggers=0x7ffff19f5050) at expression.c:4734
#3 evaluate_expressions (triggers=0x7ffff19f5050) at expression.c:4789
#4 0x00000000004659b3 in DCmass_update_triggers (history=<value optimized out>, history_num=<value optimized out>, trigger_diff=0x7ffff19f5270) at dbcache.c:956
#5 0x0000000000466524 in DCsync_history (sync_type=0, total_num=0x7ffff19f53bc) at dbcache.c:2266
#6 0x0000000000422e25 in dbsyncer_thread (args=<value optimized out>) at dbsyncer.c:77
#7 0x00000000004940f2 in zbx_thread_start (handler=0x422d50 <dbsyncer_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#8 0x000000000041d6eb in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1090
#9 0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#10 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832
至此,整个item data处理完毕!
4.5 action/escalation处理
接下来开始执行告警处理,zabbix 的 action的操作是放在escalation中一起处理的,最终 由 add_message_alert()函数生成告警信息,保存在数据库的 alerts 表中,add_message_alert()函数调用栈如下:
(gdb) bt
#0 add_message_alert (event=0x1669660, r_event=0x0, actionid=7, esc_step=1, userid=4, mediatypeid=1, subject=0x16697c0 "zabbix Problem: Zabbix discoverer processes more than 75% busy", message=0x1669c90 "Problem started at 16:34:35 on 2018.03.23\r\nProblem name: Zabbix discoverer processes more than 75% busy\r\nHost: Zabbix server\r\nSeverity: Average\r\n\r\nOriginal problem ID: 17991\r\n", ackid=0) at escalator.c:971
#1 0x000000000043dd92 in flush_user_msg (user_msg=0x7ffff19f4f70, esc_step=1, event=0x1669660, r_event=0x0, actionid=7) at escalator.c:540
#2 0x000000000043ef83 in escalation_execute_operations (escalation=0x167d6e0, action=0x167d680, event=0x1669660) at escalator.c:1318
#3 escalation_execute (escalation=0x167d6e0, action=0x167d680, event=0x1669660) at escalator.c:1887
#4 0x0000000000440559 in process_db_escalations (now=1521794078, nextcheck=0x7ffff19f53bc, escalations=0x7ffff19f5300, eventids=<value optimized out>, actionids=<value optimized out>) at escalator.c:2207
#5 0x000000000044099c in process_escalations (now=1521794078, nextcheck=0x7ffff19f53bc, escalation_source=<value optimized out>) at escalator.c:2474
#6 0x0000000000440c0f in escalator_thread (args=<value optimized out>) at escalator.c:2539
#7 0x00000000004940f2 in zbx_thread_start (handler=0x440ab0 <escalator_thread>, thread_args=0x7ffff19f6170) at threads.c:128
#8 0x000000000041d72b in MAIN_ZABBIX_ENTRY (flags=<value optimized out>) at server.c:1093
#9 0x000000000048a57a in daemon_start (allow_root=<value optimized out>, user=0x4f99a3 "zabbix", flags=0) at daemon.c:392
#10 0x000000000041dd2b in main (argc=<value optimized out>, argv=<value optimized out>) at server.c:832
4.5 alerter处理
action的动作,即告警信息,最终由alerter进程执行,alerter进程和poller进程一样,有两类:alerter manager进程和alerter worker进程(这个work是强行加上去的,为了好和manager进程区分)。这两个进程之间的逻辑关系如下:
- manager进程先启动,启动之后等待worker进程前来注册;
- worker进程启动之后,立刻向manager进程注册一个“接口”,这个“接口”会接收告警请求,并填充告警信息,发送给manager进程;
- manager进程遍历数据库 alerts 表,发现有新的告警项时,将表信息读出,发送给worker进程;
- worker开始填充告警信息,并执行告警信息“扩散”,如:发邮件等,并将执行结果发送给manager进程;
- manager进程收到worker进程发来的执行结果之后,更新数据库中的告警执行状态;
manager进程遍历数据库逻辑在am_db_get_alerts()函数中完成,am_db_get_alerts()逻辑如下:
/* manger进程核心遍历数据库alerts表函数调用堆栈
alert_manager_thread
am_db_queue_alerts
am_db_get_alerts
*/
static int am_db_get_alerts(zbx_am_t *manager, zbx_vector_ptr_t *alerts, int now)
{
zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
"select a.alertid,a.mediatypeid,a.sendto,a.subject,a.message,a.status,a.retries,"
"e.source,e.object,e.objectid"
" from alerts a" // 信息来自 alerts 表
" left join events e"
" on a.eventid=e.eventid"
" where alerttype=%d"
" and",
ALERT_TYPE_MESSAGE);
result = DBselect_once("%s", sql); // 这里执行sql语句
……
manager进程读取完告警信息之后,会把结果发送给worker进程,并发送相应的请求,如:邮件告警时,发送 ZBX_IPC_ALERTER_EMAIL 请求。 worker进程主要逻辑如下,worker进程其实只干一件事,那就是发送告警(zabbix把这个叫做“扩散”,即escalation):
ZBX_THREAD_ENTRY(alerter_thread, args)
{
for (;;)
{
switch (message.code)
{
case ZBX_IPC_ALERTER_EMAIL: // 邮件告警逻辑
alerter_process_email(&alerter_socket, &message);
break;
……
}
}
}