文章目录
1. 前言
FreeSWITCH 是一个开源的电话软交换平台,使用广泛,功能强大。本文基于 FreeSWITCH 1.10 版本,读者如有兴趣可以自行点击链接进入 github 下载源码。下图为 FreeSWITCH 服务启动及 Event Socket 模块运行工作的源代码时序,下文将对源码流程进行代码分析
2. 源码分析
2.1 服务的启动
2.1.1 FreeSWITCH 核心的启动流程
FreeSWITCH 是用 C 语言写的,服务启动的入口为
switch.c#main()
函数。这个函数非常长,不过主要的处理大致分为以下几步:- 调用
switch_core.c#switch_core_set_globals()
函数设置重要的运行时文件夹路径,比如模块文件的文件夹等 - 调用
switch_core.c#switch_core_init_and_modload()
函数初始化系统并加载模块,这部分是重点 - 调用
switch_core.c#switch_core_runtime_loop()
函数开启主线程循环保持系统运行
int main(int argc, char *argv[]) { ...... switch_core_set_globals(); pid = getpid(); memset(pid_buffer, 0, sizeof(pid_buffer)); switch_snprintf(pid_path, sizeof(pid_path), "%s%s%s", SWITCH_GLOBAL_dirs.run_dir, SWITCH_PATH_SEPARATOR, pfile); switch_snprintf(pid_buffer, sizeof(pid_buffer), "%d", pid); pid_len = strlen(pid_buffer); apr_pool_create(&pool, NULL); ...... if (switch_core_init_and_modload(flags, nc ? SWITCH_FALSE : SWITCH_TRUE, &err) != SWITCH_STATUS_SUCCESS) { fprintf(stderr, "Cannot Initialize [%s]\n", err); return 255; } ...... switch_core_runtime_loop(nc); ...... }
- 调用
switch_core.c#switch_core_init_and_modload()
函数是初始化重要组件和模块的入口,不过本文关注的主要是以下几个函数调用,FreeSWITCH 核心启动的主要逻辑其实就是拉起关键组件及模块switch_core.c#switch_core_init()
函数负责初始化 FreeSWITCH 核心的重要组件switch_loadable_module.c#switch_loadable_module_init()
函数将加载 FreeSWITCH 核心之外的可插拔模块
SWITCH_DECLARE(switch_status_t) switch_core_init_and_modload(switch_core_flag_t flags, switch_bool_t console, const char **err) { ...... if (switch_core_init(flags, console, err) != SWITCH_STATUS_SUCCESS) { return SWITCH_STATUS_GENERR; } if (runtime.runlevel > 1) { /* one per customer */ return SWITCH_STATUS_SUCCESS; } runtime.runlevel++; runtime.events_use_dispatch = 1; switch_core_set_signal_handlers(); switch_load_network_lists(SWITCH_FALSE); switch_msrp_init(); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Bringing up environment.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Loading Modules.\n"); if (switch_loadable_module_init(SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) { *err = "Cannot load modules"; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Error: %s\n", *err); return SWITCH_STATUS_GENERR; } switch_load_network_lists(SWITCH_FALSE); switch_load_core_config("post_load_switch.conf"); switch_core_set_signal_handlers(); ...... }
此时回到本节步骤1第3步,
switch_core.c#switch_core_runtime_loop()
函数其实就是根据前后台启动方式来开启主线程循环保持系统运行,至此宏观上 FreeSWITCH 核心启动的流程基本结束如果 FreeSWITCH 以前台方式启动,则调用
switch_console.c#switch_console_loop()
函数启动控制台循环,接收控制台输入的命令并处理SWITCH_DECLARE(void) switch_core_runtime_loop(int bg) { #ifdef WIN32 HANDLE shutdown_event; char path[256] = ""; #endif if (bg) { #ifdef WIN32 switch_snprintf(path, sizeof(path), "Global\\Freeswitch.%d", getpid()); shutdown_event = CreateEvent(NULL, FALSE, FALSE, path); if (shutdown_event) { WaitForSingleObject(shutdown_event, INFINITE); } #else while (runtime.running) { switch_yield(1000000); } #endif } else { /* wait for console input */ switch_console_loop(); } }
2.1.2 事件分发组件的初始化
在 2.1.1节步骤2第1步 中,笔者提到
switch_core.c#switch_core_init()
函数会初始化 FreeSWITCH 核心的重要组件和运行时属性,其大致源码如下,本文涉及的主要是以下几部分:- 调用函数
switch_core_session.c#switch_core_session_init()
初始化 Session 管理器并创建对应的事件队列,这部分本文暂不深入 - 调用
switch_event.c#switch_event_init()
函数初始化事件分发组件,包括启动事件分发线程及队列创建等 - 执行
switch_core.c#switch_load_core_config()
函数解析 FreeSWITCH 的核心配置文件switch.conf.xml
,将配置属性加载进内存中 - 执行
switch_scheduler.c#switch_scheduler_task_thread_start()
函数启动内部定时任务线程,后续可以添加心跳通知等定时任务到内部列表,等待触发执行
SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switch_bool_t console, const char **err) { switch_uuid_t uuid; char guess_ip[256]; int mask = 0; struct in_addr in; if (runtime.runlevel > 0) { /* one per customer */ return SWITCH_STATUS_SUCCESS; } memset(&runtime, 0, sizeof(runtime)); gethostname(runtime.hostname, sizeof(runtime.hostname)); runtime.shutdown_cause = SWITCH_CAUSE_SYSTEM_SHUTDOWN; runtime.max_db_handles = 50; runtime.db_handle_timeout = 5000000; runtime.event_heartbeat_interval = 20; ...... if (!runtime.cpu_count) runtime.cpu_count = 1; if (sqlite3_initialize() != SQLITE_OK) { *err = "FATAL ERROR! Could not initialize SQLite\n"; return SWITCH_STATUS_MEMERR; } /* INIT APR and Create the pool context */ if (apr_initialize() != SWITCH_STATUS_SUCCESS) { *err = "FATAL ERROR! Could not initialize APR\n"; return SWITCH_STATUS_MEMERR; } if (!(runtime.memory_pool = switch_core_memory_init())) { *err = "FATAL ERROR! Could not allocate memory pool\n"; return SWITCH_STATUS_MEMERR; } ...... switch_core_session_init(runtime.memory_pool); ...... switch_event_init(runtime.memory_pool); ...... switch_load_core_config("switch.conf"); ...... switch_scheduler_task_thread_start(); ...... switch_scheduler_add_task(switch_epoch_time_now(NULL), heartbeat_callback, "heartbeat", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL); switch_scheduler_add_task(switch_epoch_time_now(NULL), check_ip_callback, "check_ip", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL | SSHF_OWN_THREAD); ...... return SWITCH_STATUS_SUCCESS; }
- 调用函数
switch_event.c#switch_event_init()
函数会根据当前机器的核心数计算确定事件分发线程的最大数量MAX_DISPATCH,核心流程是调用switch_event.c#check_dispatch()
函数创建并启动事件分发线程
/*
 * Initialise the event subsystem on top of the supplied memory pool.
 *
 * Sizes MAX_DISPATCH from the CPU count, creates the subsystem's locks and
 * hashes, and (unless the core runs with SCF_MINIMAL) brings up the
 * dispatch queue/thread through check_dispatch().
 *
 * pool: must be non-NULL (asserted); stored as both RUNTIME_POOL and
 *       THRUNTIME_POOL.
 * Returns SWITCH_STATUS_SUCCESS on every path visible here.
 */
SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
{
	/* don't need any more dispatch threads than we have CPU's */
	MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
	/* ...but never fewer than two. */
	if (MAX_DISPATCH < 2) {
		MAX_DISPATCH = 2;
	}

	switch_assert(pool != NULL);
	THRUNTIME_POOL = RUNTIME_POOL = pool;

	/* Locks used throughout the event code. */
	switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL);
	switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
	switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
	switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
	switch_mutex_init(&CUSTOM_HASH_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
	switch_core_hash_init(&CUSTOM_HASH);

	/* Minimal mode stops here: no event-channel state, no dispatch thread. */
	if (switch_core_test_flag(SCF_MINIMAL)) {
		return SWITCH_STATUS_SUCCESS;
	}

	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n");

	/* Event-channel manager bookkeeping. */
	switch_core_hash_init(&event_channel_manager.lahash);
	switch_mutex_init(&event_channel_manager.lamutex, SWITCH_MUTEX_NESTED, RUNTIME_POOL);

	switch_thread_rwlock_create(&event_channel_manager.rwlock, RUNTIME_POOL);
	switch_core_hash_init(&event_channel_manager.hash);
	switch_core_hash_init(&event_channel_manager.perm_hash);
	event_channel_manager.ID = 1;

	/* SYSTEM_RUNNING is -1 while the dispatcher is being started... */
	switch_mutex_lock(EVENT_QUEUE_MUTEX);
	SYSTEM_RUNNING = -1;
	switch_mutex_unlock(EVENT_QUEUE_MUTEX);

	//switch_threadattr_create(&thd_attr, pool);
	switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET);
	switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6);

#ifdef SWITCH_EVENT_RECYCLE
	switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
	switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
#endif

	/* Creates the dispatch queue and launches the first dispatch thread. */
	check_dispatch();

	/* ...and flips to 1 once check_dispatch() has returned. */
	switch_mutex_lock(EVENT_QUEUE_MUTEX);
	SYSTEM_RUNNING = 1;
	switch_mutex_unlock(EVENT_QUEUE_MUTEX);

	return SWITCH_STATUS_SUCCESS;
}
switch_event.c#check_dispatch()
函数会创建事件分发队列EVENT_DISPATCH_QUEUE,并设置队列的容量为DISPATCH_QUEUE_LEN * MAX_DISPATCH,最后通过switch_event.c#switch_event_launch_dispatch_threads()
函数创建 1 个事件分发线程
/*
 * Lazily create the event dispatch queue and start dispatching.
 *
 * EVENT_DISPATCH_QUEUE is tested once without the lock and again while
 * holding BLOCK, so the creation body runs only when the queue is still
 * unset under the lock. Does nothing once the queue exists.
 */
static void check_dispatch(void)
{
	if (!EVENT_DISPATCH_QUEUE) {
		switch_mutex_lock(BLOCK);

		/* Re-test under the lock: another caller may have created it meanwhile. */
		if (!EVENT_DISPATCH_QUEUE) {
			/* Queue capacity scales with the maximum number of dispatch threads. */
			switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, THRUNTIME_POOL);
			switch_event_launch_dispatch_threads(1);

			/* Wait until THREAD_COUNT becomes non-zero before releasing BLOCK. */
			while (!THREAD_COUNT) {
				switch_cond_next();
			}
		}
		switch_mutex_unlock(BLOCK);
	}
}
switch_event.c#switch_event_launch_dispatch_threads()
函数会调用库函数创建事件分发线程,并将switch_event.c#switch_event_dispatch_thread()
函数设置为线程任务。需注意此处会维护一个事件分发线程的数组EVENT_DISPATCH_QUEUE_THREADS
,如果数组下标上已经有线程存在了,则不会重复创建,至此事件分发组件的初始化基本结束
/*
 * Grow the set of event dispatch threads up to `max`.
 *
 * max: desired number of dispatch threads. The call is a no-op when max
 *      exceeds MAX_DISPATCH or is below the current SOFT_MAX_DISPATCH.
 *
 * Threads are created for slots [SOFT_MAX_DISPATCH, min(max, MAX_DISPATCH));
 * slots that already hold a thread are skipped. SOFT_MAX_DISPATCH is then
 * advanced to the index at which the loop stopped.
 */
SWITCH_DECLARE(void) switch_event_launch_dispatch_threads(uint32_t max)
{
	switch_threadattr_t *thd_attr;
	uint32_t index = 0;
	int launched = 0; /* NOTE(review): incremented but never read in this function */
	uint32_t sanity = 200;
	switch_memory_pool_t *pool = RUNTIME_POOL;

	/* Make sure the dispatch queue exists before any thread is pointed at it. */
	check_dispatch();

	if (max > MAX_DISPATCH) {
		return;
	}

	if (max < SOFT_MAX_DISPATCH) {
		return;
	}

	for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) {
		/* Slot already has a thread: do not create a second one. */
		if (EVENT_DISPATCH_QUEUE_THREADS[index]) {
			continue;
		}

		switch_threadattr_create(&thd_attr, pool);
		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
		switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
		switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool);

		/*
		 * Poll in 10 ms steps until the slot's RUNNING flag is set or the
		 * sanity budget runs out.
		 * NOTE(review): `sanity` is shared by all loop iterations and is
		 * unsigned; if it reaches 0 here, the next iteration's --sanity wraps
		 * to a huge value. Confirm whether a per-thread budget was intended.
		 */
		while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);

		/* NOTE(review): index is uint32_t but is printed with %d. */
		if (index == 1) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create event dispatch thread %d\n", index);
		} else {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", index);
		}
		launched++;
	}

	SOFT_MAX_DISPATCH = index;
}
2.1.3 模块 mod_event_socket 的加载运行
在 2.1.1节步骤2第2步 中,执行
switch_loadable_module.c#switch_loadable_module_init()
函数将会加载配置文件中配置的模块,其关键点如下:- 模块配置由三个配置文件 pre_load_modules.conf.xml、modules.conf.xml、post_load_modules.conf.xml 共同组成,三者主要是加载顺序的区别,因为有些模块相互之间可能存在依赖,其他并无不同
- 在加载完前置模块之后,先调用
switch_core_sqldb.c#switch_core_sqldb_init()
函数初始化 sql 数据库 - 加载模块的核心是
switch_loadable_module.c#switch_loadable_module_load_module_ex()
函数,这个函数会将指定文件夹下的模块文件加载进内存 - 模块加载进内存后,会调用
switch_loadable_module.c#switch_loadable_module_runtime()
函数为每一个模块新起线程执行其定义的 runtime 函数
/*
 * Initialise the loadable-module registry and, when `autoload` is true,
 * load every configured module.
 *
 * Load order visible in this function:
 *   1. built-in CORE_SOFTTIMER / CORE_PCM / CORE_SPEEX modules
 *   2. modules listed in pre_load_modules.conf
 *   3. switch_core_sqldb_init(), then the events held back during 1-2 are fired
 *   4. CORE_VPX_MODULE (only when both SWITCH_HAVE_YUV and SWITCH_HAVE_VPX are defined)
 *   5. modules listed in modules.conf
 *   6. modules listed in post_load_modules.conf
 *   7. if no <load> entry was processed at all, every module file found in
 *      SWITCH_GLOBAL_dirs.mod_dir
 *   8. runtime threads for the loaded modules, then the chat thread
 *
 * Returns SWITCH_STATUS_SUCCESS, or SWITCH_STATUS_GENERR when sqldb init
 * fails or the module directory cannot be opened. A failing module flagged
 * critical="true" in steps 2 or 5 aborts the process.
 */
SWITCH_DECLARE(switch_status_t) switch_loadable_module_init(switch_bool_t autoload)
{

	apr_finfo_t finfo = { 0 };
	apr_dir_t *module_dir_handle = NULL;
	apr_int32_t finfo_flags = APR_FINFO_DIRENT | APR_FINFO_TYPE | APR_FINFO_NAME;
	char *precf = "pre_load_modules.conf";
	char *cf = "modules.conf";
	char *pcf = "post_load_modules.conf";
	switch_xml_t cfg, xml;
	unsigned char all = 0;
	unsigned int count = 0; /* number of <load> entries processed (failed loads are counted too) */
	const char *err;
	switch_hash_t *event_hash; /* events produced before sqldb is ready are parked here */
	switch_hash_index_t *hi;
	void *hash_val;
	switch_event_t *event;

	/* Platform-specific shared-library suffix, lower and upper case. */
#ifdef WIN32
	const char *ext = ".dll";
	const char *EXT = ".DLL";
#elif defined (MACOSX) || defined (DARWIN)
	const char *ext = ".dylib";
	const char *EXT = ".DYLIB";
#else
	const char *ext = ".so";
	const char *EXT = ".SO";
#endif

	memset(&loadable_modules, 0, sizeof(loadable_modules));
	switch_core_new_memory_pool(&loadable_modules.pool);

#ifdef WIN32
	switch_loadable_module_path_init();
#endif

	/* One hash per interface kind; most are created with the _nocase initialiser. */
	switch_core_hash_init(&loadable_modules.module_hash);
	switch_core_hash_init_nocase(&loadable_modules.endpoint_hash);
	switch_core_hash_init_nocase(&loadable_modules.codec_hash);
	switch_core_hash_init_nocase(&loadable_modules.timer_hash);
	switch_core_hash_init_nocase(&loadable_modules.application_hash);
	switch_core_hash_init_nocase(&loadable_modules.chat_application_hash);
	switch_core_hash_init_nocase(&loadable_modules.api_hash);
	switch_core_hash_init_nocase(&loadable_modules.json_api_hash);
	switch_core_hash_init(&loadable_modules.file_hash);
	switch_core_hash_init_nocase(&loadable_modules.speech_hash);
	switch_core_hash_init_nocase(&loadable_modules.asr_hash);
	switch_core_hash_init_nocase(&loadable_modules.directory_hash);
	switch_core_hash_init_nocase(&loadable_modules.chat_hash);
	switch_core_hash_init_nocase(&loadable_modules.say_hash);
	switch_core_hash_init_nocase(&loadable_modules.management_hash);
	switch_core_hash_init_nocase(&loadable_modules.limit_hash);
	switch_core_hash_init_nocase(&loadable_modules.database_hash);
	switch_core_hash_init_nocase(&loadable_modules.dialplan_hash);
	switch_core_hash_init(&loadable_modules.secondary_recover_hash);
	switch_mutex_init(&loadable_modules.mutex, SWITCH_MUTEX_NESTED, loadable_modules.pool);

	/* Registry only: the caller did not ask for modules to be loaded. */
	if (!autoload) return SWITCH_STATUS_SUCCESS;

	/*
		switch_core_sqldb_init() is not yet ready and is executed after starting modules from pre_load_modules.conf
		Modules loading procedure generates events used by sqldb.
		This is why we should hold those events (storing in the event_hash) not firing them until sqldb is ready.
	*/
	switch_core_hash_init(&event_hash);

	/*
		Pre-load core modules.
		Do not pre-load modules which may use databases,
		use appropriate section.
	*/
	switch_loadable_module_load_module_ex("", "CORE_SOFTTIMER_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
	switch_loadable_module_load_module_ex("", "CORE_PCM_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
	switch_loadable_module_load_module_ex("", "CORE_SPEEX_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);

	/*
		Loading pre-load modules.
		Database modules must be loaded here.
	*/
	if ((xml = switch_xml_open_cfg(precf, &cfg, NULL))) {
		switch_xml_t mods, ld;
		if ((mods = switch_xml_child(cfg, "modules"))) {
			for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
				switch_bool_t global = SWITCH_FALSE;
				const char *val = switch_xml_attr_soft(ld, "module");
				const char *path = switch_xml_attr_soft(ld, "path");
				const char *critical = switch_xml_attr_soft(ld, "critical");
				const char *sglobal = switch_xml_attr_soft(ld, "global");

				/* Reject empty names, and names with a '.' that do not contain this platform's suffix. */
				if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
					continue;
				}
				global = switch_true(sglobal);

				/* No explicit path given: fall back to the global module directory. */
				if (path && zstr(path)) {
					path = SWITCH_GLOBAL_dirs.mod_dir;
				}
				if (switch_loadable_module_load_module_ex((char *)path, (char *)val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_PRELOAD, event_hash) == SWITCH_STATUS_GENERR) {
					if (critical && switch_true(critical)) {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
						switch_core_hash_destroy(&event_hash);

						abort();
					}
				}
				count++;
			}
		}
		switch_xml_free(xml);

	} else {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", precf);
	}

	if (switch_core_sqldb_init(&err) != SWITCH_STATUS_SUCCESS)
	{
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Loading modules interrupted. [Error: %s]\n", err);
		switch_core_hash_destroy(&event_hash);
		return SWITCH_STATUS_GENERR;
	}

	/* sqldb is ready. Fire holding events! */
	for (hi = switch_core_hash_first(event_hash); hi; hi = switch_core_hash_next(&hi)) {
		switch_core_hash_this(hi, NULL, NULL, &hash_val);
		event = (switch_event_t *)hash_val;
		switch_event_fire(&event);
	}

	switch_core_hash_destroy(&event_hash);

	/*
		To prevent locking.
		Core modules which may use databases should be pre-loaded here
		(databases are loaded already).
	*/
#ifdef SWITCH_HAVE_YUV
#ifdef SWITCH_HAVE_VPX
	switch_loadable_module_load_module("", "CORE_VPX_MODULE", SWITCH_FALSE, &err);
#endif
#endif

	/* Loading common modules */
	if ((xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
		switch_xml_t mods, ld;
		if ((mods = switch_xml_child(cfg, "modules"))) {
			for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
				switch_bool_t global = SWITCH_FALSE;
				const char *val = switch_xml_attr_soft(ld, "module");
				const char *path = switch_xml_attr_soft(ld, "path");
				const char *critical = switch_xml_attr_soft(ld, "critical");
				const char *sglobal = switch_xml_attr_soft(ld, "global");
				if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
					continue;
				}
				global = switch_true(sglobal);

				if (path && zstr(path)) {
					path = SWITCH_GLOBAL_dirs.mod_dir;
				}
				/* From here on no event_hash is passed (NULL): sqldb is already initialised. */
				if (switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, NULL) == SWITCH_STATUS_GENERR) {
					if (critical && switch_true(critical)) {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
						abort();
					}
				}
				count++;
			}
		}
		switch_xml_free(xml);

	} else {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", cf);
	}

	/* Post-load modules: same walk, but the "critical" attribute is not consulted. */
	if ((xml = switch_xml_open_cfg(pcf, &cfg, NULL))) {
		switch_xml_t mods, ld;

		if ((mods = switch_xml_child(cfg, "modules"))) {
			for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
				switch_bool_t global = SWITCH_FALSE;
				const char *val = switch_xml_attr_soft(ld, "module");
				const char *path = switch_xml_attr_soft(ld, "path");
				const char *sglobal = switch_xml_attr_soft(ld, "global");
				if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
					continue;
				}
				global = switch_true(sglobal);

				if (path && zstr(path)) {
					path = SWITCH_GLOBAL_dirs.mod_dir;
				}
				switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_POSTLOAD, NULL);
				count++;
			}
		}
		switch_xml_free(xml);

	} else {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", pcf);
	}

	/* Nothing was listed in any of the three files: load whatever is in mod_dir. */
	if (!count) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "No modules loaded, assuming 'load all'\n");
		all = 1;
	}

	if (all) {
		if (apr_dir_open(&module_dir_handle, SWITCH_GLOBAL_dirs.mod_dir, loadable_modules.pool) != APR_SUCCESS) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Can't open directory: %s\n", SWITCH_GLOBAL_dirs.mod_dir);
			return SWITCH_STATUS_GENERR;
		}

		while (apr_dir_read(&finfo, finfo_flags, module_dir_handle) == APR_SUCCESS) {
			const char *fname = finfo.fname;

			/* Regular files only. */
			if (finfo.filetype != APR_REG) {
				continue;
			}

			if (!fname) {
				fname = finfo.name;
			}

			if (!fname) {
				continue;
			}

			/* Skip entries whose name does not contain the shared-library suffix. */
			if (zstr(fname) || (!strstr(fname, ext) && !strstr(fname, EXT))) {
				continue;
			}

			switch_loadable_module_load_module(SWITCH_GLOBAL_dirs.mod_dir, fname, SWITCH_FALSE, &err);
		}
		apr_dir_close(module_dir_handle);
	}

	/* Every load above passed runtime == SWITCH_FALSE; runtime threads are started here in one pass. */
	switch_loadable_module_runtime();

	memset(&chat_globals, 0, sizeof(chat_globals));
	chat_globals.running = 1;
	chat_globals.pool = loadable_modules.pool;
	switch_mutex_init(&chat_globals.mutex, SWITCH_MUTEX_NESTED, chat_globals.pool);

	chat_thread_start(1);

	return SWITCH_STATUS_SUCCESS;
}
switch_loadable_module.c#switch_loadable_module_load_module_ex()
函数的核心处理如下:- 首先根据模块名称计算对应的模块的动态库文件名称,然后通过
switch_core_hash.c#switch_core_hash_find_locked()
方法查找哈希表 loadable_modules.module_hash 中是否存在该文件名,如果不存在说明模块还没有加载过,需要加载 switch_loadable_module.c#switch_loadable_module_load_file()
函数负责加载动态库文件- 如果加载动态库文件成功,则调用
switch_loadable_module.c#switch_loadable_module_process()
将加载完成的模块插入哈希表 loadable_modules.module_hash,并生成模块加载的事件将其推入到 FreeSWITCH 事件组件中
/*
 * Load a single module by name or by file path and register it.
 *
 * dir:        directory used to build the library path when fname is not
 *             itself a file path (NULL is treated as "")
 * fname:      module name or file path; anything from the first '.' of the
 *             file-name part onward is stripped to obtain the module key
 * runtime:    when true, also launch the module's runtime thread right away
 * global:     forwarded to switch_loadable_module_load_file()
 * err:        out; set to a static message ("" on entry)
 * type:       stored in the new module's `type` field
 * event_hash: forwarded to switch_loadable_module_process()
 *
 * Returns SWITCH_STATUS_FALSE on allocation failure or when the module key
 * is already present in loadable_modules.module_hash; otherwise the status
 * of the load-file / process steps.
 */
static switch_status_t switch_loadable_module_load_module_ex(const char *dir, const char *fname, switch_bool_t runtime, switch_bool_t global, const char **err, switch_loadable_module_type_t type, switch_hash_t *event_hash)
{
	switch_size_t len = 0;
	char *path;
	char *file, *dot;
	switch_loadable_module_t *new_module = NULL;
	switch_status_t status = SWITCH_STATUS_SUCCESS;

#ifdef WIN32
	const char *ext = ".dll";
#else
	const char *ext = ".so";
#endif

	*err = "";

	/* Work on a pool-owned copy: the name is truncated in place below. */
	if ((file = switch_core_strdup(loadable_modules.pool, fname)) == 0) {
		*err = "allocation error";
		return SWITCH_STATUS_FALSE;
	}

	if (switch_is_file_path(file)) {
		/* Caller gave a path: keep it as-is for loading, derive the key from its last component. */
		path = switch_core_strdup(loadable_modules.pool, file);
		file = (char *) switch_cut_path(file);
		if ((dot = strchr(file, '.'))) {
			*dot = '\0';
		}
	} else {
		/* Bare module name: build "<dir><sep><name><ext>". */
		if ((dot = strchr(file, '.'))) {
			*dot = '\0';
		}
		len = strlen(switch_str_nil(dir));
		len += strlen(file);
		len += 8; /* headroom for the separator, the extension and the terminating NUL */
		path = (char *) switch_core_alloc(loadable_modules.pool, len);
		switch_snprintf(path, len, "%s%s%s%s", switch_str_nil(dir), SWITCH_PATH_SEPARATOR, file, ext);
	}

	if (switch_core_hash_find_locked(loadable_modules.module_hash, file, loadable_modules.mutex)) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Module %s Already Loaded!\n", file);
		*err = "Module already loaded";
		status = SWITCH_STATUS_FALSE;
	} else if ((status = switch_loadable_module_load_file(path, file, global, &new_module)) == SWITCH_STATUS_SUCCESS) {
		new_module->type = type;

		if ((status = switch_loadable_module_process(file, new_module, event_hash)) == SWITCH_STATUS_SUCCESS && runtime) {
			/* Only modules that define a runtime function get a thread. */
			if (new_module->switch_module_runtime) {
				new_module->thread = switch_core_launch_thread(switch_loadable_module_exec, new_module, new_module->pool);
			}
		} else if (status != SWITCH_STATUS_SUCCESS) {
			*err = "module load routine returned an error";
		}
	} else {
		*err = "module load file routine returned an error";
	}

	return status;
}
- 首先根据模块名称计算对应的模块的动态库文件名称,然后通过
switch_loadable_module.c#switch_loadable_module_load_file()
函数的核心处理步骤如下:- 首先调用
switch_dso.c#switch_dso_open()
函数打开库文件,随后调用switch_dso.c#switch_dso_data_sym()
函数获取动态库中的符号表,这部分涉及到平台库函数,不做深入讨论 - 如果符号表加载成功,则将其赋给函数表 interface_struct_handle,最终将其赋给函数表结构体 mod_interface_functions
- 如果当前加载的模块定义了 load 函数,则需要通过函数指针
load_func_ptr
回调执行,以 Event Socket 模块为例,这里将回调到mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
函数 - 以上处理完毕,生成结构体
switch_loadable_module_t
实例 module 保存模块中的函数等信息,最终通过传入的出参指针 new_module(即 *new_module = module)将其返回给调用方
/*
 * Open a module's shared object, locate its "<filename>_module_interface"
 * function table, run its load routine and wrap the result in a freshly
 * allocated switch_loadable_module_t.
 *
 * path:       library file to open
 * filename:   module key; used to build the interface symbol name
 * global:     initial value of the flag passed to switch_dso_open()
 * new_module: out; set to NULL on entry and to the new module on success
 *
 * Returns SWITCH_STATUS_SUCCESS, or SWITCH_STATUS_GENERR after logging the
 * failure. On failure the dso handle and the per-module pool are destroyed.
 */
static switch_status_t switch_loadable_module_load_file(char *path, char *filename, switch_bool_t global, switch_loadable_module_t **new_module)
{
	switch_loadable_module_t *module = NULL;
	switch_dso_lib_t dso = NULL;
	apr_status_t status = SWITCH_STATUS_SUCCESS;
	switch_loadable_module_function_table_t *interface_struct_handle = NULL;
	switch_loadable_module_function_table_t *mod_interface_functions = NULL;
	char *struct_name = NULL;
	switch_module_load_t load_func_ptr = NULL;
	int loading = 1;
	switch_loadable_module_interface_t *module_interface = NULL;
	char *derr = NULL;
	const char *err = NULL;
	switch_memory_pool_t *pool = NULL; /* becomes module->pool on success */
	switch_bool_t load_global = global;

	switch_assert(path != NULL);

	switch_core_new_memory_pool(&pool);
	*new_module = NULL;

	struct_name = switch_core_sprintf(pool, "%s_module_interface", filename);

	/*
	 * First attempt: look for the interface symbol in a platform-specific
	 * handle (FreeSwitch.dll, libfreeswitch.dylib, or a NULL path) instead
	 * of the module file itself.
	 */
#ifdef WIN32
	dso = switch_dso_open("FreeSwitch.dll", load_global, &derr);
#elif defined (MACOSX) || defined(DARWIN)
	{
		char *lib_path = switch_mprintf("%s/libfreeswitch.dylib", SWITCH_GLOBAL_dirs.lib_dir);
		dso = switch_dso_open(lib_path, load_global, &derr);
		switch_safe_free(lib_path);
	}
#else
	dso = switch_dso_open(NULL, load_global, &derr);
#endif
	if (!derr && dso) {
		interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
	}

	/* Errors from the first attempt are discarded. */
	switch_safe_free(derr);

	/* Second attempt: open the module file given by `path`. */
	if (!interface_struct_handle) {
		if (dso) switch_dso_destroy(&dso);
		dso = switch_dso_open(path, load_global, &derr);
	}

	/* Runs until the module is built (loading = 0) or an error breaks out; `continue` retries after a reopen. */
	while (loading) {
		if (derr) {
			err = derr;
			break;
		}

		if (!interface_struct_handle) {
			interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
		}

		if (derr) {
			err = derr;
			break;
		}

		if (interface_struct_handle && interface_struct_handle->switch_api_version != SWITCH_API_VERSION) {
			err = "Trying to load an out of date module, please rebuild the module.";
			break;
		}

		/* The module's table carries SMODF_GLOBAL_SYMBOLS: reopen with load_global set and start over. */
		if (!load_global && interface_struct_handle && switch_test_flag(interface_struct_handle, SMODF_GLOBAL_SYMBOLS)) {
			load_global = SWITCH_TRUE;
			switch_dso_destroy(&dso);
			interface_struct_handle = NULL;
			dso = switch_dso_open(path, load_global, &derr);
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loading module with global namespace at request of module\n");
			continue;
		}

		if (interface_struct_handle) {
			mod_interface_functions = interface_struct_handle;
			load_func_ptr = mod_interface_functions->load;
		}

		if (load_func_ptr == NULL) {
			err = "Cannot locate symbol 'switch_module_load' please make sure this is a valid module.";
			break;
		}

		/* Call the module's own load routine. */
		status = load_func_ptr(&module_interface, pool);

		if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_NOUNLOAD) {
			err = "Module load routine returned an error";
			module_interface = NULL;
			break;
		}

		if (!module_interface) {
			err = "Module failed to initialize its module_interface. Is this a valid module?";
			break;
		}

		if ((module = switch_core_alloc(pool, sizeof(switch_loadable_module_t))) == 0) {
			abort();
		}

		/* A load routine returning NOUNLOAD bumps the module's `perm` counter. */
		if (status == SWITCH_STATUS_NOUNLOAD) {
			module->perm++;
		}

		loading = 0;
	}


	if (err) {

		if (dso) {
			switch_dso_destroy(&dso);
		}

		if (pool) {
			switch_core_destroy_memory_pool(&pool);
		}
		/* err may alias derr, so it is logged before derr is freed. */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error Loading module %s\n**%s**\n", path, err);
		switch_safe_free(derr);
		return SWITCH_STATUS_GENERR;
	}

	module->pool = pool;
	module->filename = switch_core_strdup(module->pool, path);
	module->module_interface = module_interface;
	module->switch_module_load = load_func_ptr;

	if (mod_interface_functions) {
		module->switch_module_shutdown = mod_interface_functions->shutdown;
		module->switch_module_runtime = mod_interface_functions->runtime;
	}

	module->lib = dso;

	/* Hand the finished module back through the out-parameter. */
	*new_module = module;
	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Successfully Loaded [%s]\n", module_interface->module_name);

	switch_core_set_signal_handlers();

	return SWITCH_STATUS_SUCCESS;

}
- 首先调用
mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
函数源码中重要的处理如下:- 调用
switch_event.c#switch_event_bind_removable()
注册事件监听,这里是监听SWITCH_EVENT_ALL
所有事件,并将函数mod_event_socket.c#event_handler()
作为回调传入 - 通过宏调用
switch_loadable_module.h#SWITCH_ADD_APP()
将mod_event_socket.c#socket_function()
函数作为一个名为 socket 的 Application 注册到 FreeSwitch 核心中供系统调用,也就是外联 Outbound 模式主动连接外部的入口
/*
 * Load entry point of mod_event_socket.
 *
 * Resets the module's global state, binds event_handler to SWITCH_EVENT_ALL,
 * binds socket_logger as a log sink, and registers the "socket" application
 * and the "event_sink" API on the newly created module interface.
 *
 * Returns SWITCH_STATUS_GENERR when the event binding fails, otherwise
 * SWITCH_STATUS_SUCCESS.
 */
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
{
	switch_application_interface_t *app_interface;
	switch_api_interface_t *api_interface;

	memset(&globals, 0, sizeof(globals));

	switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);

	memset(&listen_list, 0, sizeof(listen_list));
	switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool);

	/* The binding node is kept in globals.node. */
	if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
		return SWITCH_STATUS_GENERR;
	}

	switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);

	/* connect my internal structure to the blank pointer passed to me */
	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
	SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
	SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "<web data>");

	/* indicate that the module should continue to be loaded */
	return SWITCH_STATUS_SUCCESS;
}
- 调用
switch_event.c#switch_event_bind_removable()
函数的处理比较直观,可以看到核心就是将调用方传入的回调等信息封装为一个switch_event_node_t
结构体实例,并将其加入到监听节点数组 EVENT_NODES 中
/*
 * Register `callback` for events of type `event` (optionally narrowed to a
 * subclass) and hand the binding node back to the caller.
 *
 * id:            binder identifier; duplicated into the node
 * event:         event type; must be <= SWITCH_EVENT_ALL
 * subclass_name: optional subclass; reserved on the fly when not yet known
 * callback:      function stored in the node
 * user_data:     opaque pointer stored in the node
 * node:          optional out; receives the new node (NULL on failure)
 *
 * Returns SWITCH_STATUS_SUCCESS on success, SWITCH_STATUS_FALSE when the
 * subclass could not be reserved, SWITCH_STATUS_MEMERR when `event` is out
 * of range.
 */
SWITCH_DECLARE(switch_status_t) switch_event_bind_removable(const char *id, switch_event_types_t event, const char *subclass_name,
															switch_event_callback_t callback, void *user_data, switch_event_node_t **node)
{
	switch_event_node_t *event_node;
	switch_event_subclass_t *subclass = NULL;

	switch_assert(BLOCK != NULL);
	switch_assert(RUNTIME_POOL != NULL);

	if (node) {
		*node = NULL;
	}

	if (subclass_name) {
		switch_mutex_lock(CUSTOM_HASH_MUTEX);

		if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
			switch_event_reserve_subclass_detailed(id, subclass_name);
			subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name);
			/*
			 * The reservation may have failed, leaving the lookup NULL.
			 * Guard the write so the error path below is actually reached
			 * (the quoted 1.10 source dereferences unconditionally here).
			 */
			if (subclass) {
				subclass->bind = 1;
			}
		}

		switch_mutex_unlock(CUSTOM_HASH_MUTEX);

		if (!subclass) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not reserve subclass. '%s'\n", subclass_name);
			return SWITCH_STATUS_FALSE;
		}
	}

	if (event <= SWITCH_EVENT_ALL) {
		switch_zmalloc(event_node, sizeof(*event_node));
		switch_thread_rwlock_wrlock(RWLOCK);
		switch_mutex_lock(BLOCK);
		/* <LOCKED> ----------------------------------------------- */
		event_node->id = DUP(id);
		event_node->event_id = event;
		if (subclass_name) {
			event_node->subclass_name = DUP(subclass_name);
		}
		event_node->callback = callback;
		event_node->user_data = user_data;

		/* Push onto the head of this event type's node list. */
		if (EVENT_NODES[event]) {
			event_node->next = EVENT_NODES[event];
		}

		EVENT_NODES[event] = event_node;
		switch_mutex_unlock(BLOCK);
		switch_thread_rwlock_unlock(RWLOCK);
		/* </LOCKED> ----------------------------------------------- */

		if (node) {
			*node = event_node;
		}

		return SWITCH_STATUS_SUCCESS;
	}

	return SWITCH_STATUS_MEMERR;
}
此时模块的加载流程基本结束,回到本节步骤1第4步
switch_loadable_module.c#switch_loadable_module_runtime()
函数调用,可以看到这个函数的主要逻辑是判断加载的模块是否有定义 runtime 函数,有的话就调用switch_core.c#switch_core_launch_thread()
函数新建线程,将switch_loadable_module.c#switch_loadable_module_exec()
函数作为线程任务传入static void switch_loadable_module_runtime(void) { switch_hash_index_t *hi; void *val; switch_loadable_module_t *module; switch_mutex_lock(loadable_modules.mutex); for (hi = switch_core_hash_first(loadable_modules.module_hash); hi; hi = switch_core_hash_next(&hi)) { switch_core_hash_this(hi, NULL, NULL, &val); module = (switch_loadable_module_t *) val; if (module->switch_module_runtime) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting runtime thread for %s\n", module->module_interface->module_name); module->thread = switch_core_launch_thread(switch_loadable_module_exec, module, loadable_modules.pool); } } switch_mutex_unlock(loadable_modules.mutex); }
switch_loadable_module.c#switch_loadable_module_exec()
函数逻辑清晰,主要逻辑就是执行模块的 runtime 函数,以 Event Socket 模块为例,这里将回调到mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
函数,至此加载进内存的 Event Socket 模块已经运行起来,其运行原理将在下节分析
/*
 * Thread body for a module's runtime function.
 *
 * obj is the thread-session wrapper; its first object slot holds the module.
 * The runtime function is called again and again until it returns
 * SWITCH_STATUS_TERM or the module is flagged as shutting down. The
 * wrapper's pool, if any, is destroyed before the thread exits.
 */
static void *SWITCH_THREAD_FUNC switch_loadable_module_exec(switch_thread_t *thread, void *obj)
{
	switch_core_thread_session_t *ts = obj;
	switch_loadable_module_t *module = ts->objs[0];
	switch_status_t status = SWITCH_STATUS_SUCCESS;
	int restarts = 0;

	switch_assert(thread != NULL);
	switch_assert(module != NULL);

	while (status != SWITCH_STATUS_TERM && !module->shutting_down) {
		status = module->switch_module_runtime();
		restarts++;
	}

	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Thread ended for %s\n", module->module_interface->module_name);

	if (ts->pool) {
		/* Destroy through a local copy of the pool pointer. */
		switch_memory_pool_t *pool = ts->pool;

		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying Pool for %s\n", module->module_interface->module_name);
		switch_core_destroy_memory_pool(&pool);
	}

	switch_thread_exit(thread, 0);
	return NULL;
}
2.2 mod_event_socket 的运行原理
FreeSWITCH 中的 EventSocket 有如下图两种交互模式,从功能上来说二者几乎没有差别,主要区别是 FreeSWITCH 在连接中担任的角色不同
2.2.1 事件订阅命令的处理
mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
函数是 Event Socket 模块运行的关键,其核心处理如下:- 首先调用
mod_event_socket.c#config()
函数读取配置文件的属性,将其加载到内存中 - 根据配置属性,绑定监听本机端口,通过
switch_apr.c#switch_socket_accept()
调用底层接口等待远程 Inbound 连接 - 一旦接收到远程 Inbound 连接,则将其封装为
listener_t
结构体实例 listener 并添加到 socket 监听列表中 - 最后调用
mod_event_socket.c#launch_listener_thread()
函数创建一个线程单独处理这条 Inbound 连接上的数据交互
/*
 * Runtime entry point of mod_event_socket: the inbound accept loop.
 *
 * Phase 1 reads the configuration and keeps trying to create, bind and
 * listen on prefs.ip:prefs.port until it succeeds (or gives up when
 * stop_on_bind_error is set). Phase 2 accepts connections, wraps each in a
 * listener_t that owns its own memory pool, and hands it to
 * launch_listener_thread().
 *
 * Always returns SWITCH_STATUS_TERM.
 */
SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
{
	switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
	switch_status_t rv;
	switch_sockaddr_t *sa;
	switch_socket_t *inbound_socket = NULL;
	listener_t *listener;
	uint32_t x = 0;
	uint32_t errs = 0; /* consecutive accept failures */

	if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
		return SWITCH_STATUS_TERM;
	}

	config();

	/* Phase 1: bring up the listening socket, retrying every 100 ms on failure. */
	while (!prefs.done) {
		rv = switch_sockaddr_info_get(&sa, prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool);
		if (rv) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot get information about IP address %s\n", prefs.ip);
			/* NOTE(review): the fail label sits after all cleanup, so `pool` is not destroyed on this path. */
			goto fail;
		}
		rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool);
		if (rv)
			goto sock_fail;
		rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1);
		if (rv)
			goto sock_fail;
#ifdef WIN32
		/* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
		if (switch_sockaddr_get_family(sa) == AF_INET6) {
			rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_IPV6_V6ONLY, 0);
			if (rv) goto sock_fail;
		}
#endif
		rv = switch_socket_bind(listen_list.sock, sa);
		if (rv)
			goto sock_fail;
		rv = switch_socket_listen(listen_list.sock, 5);
		if (rv)
			goto sock_fail;
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", prefs.ip, prefs.port);

		if (prefs.nat_map) {
			switch_nat_add_mapping(prefs.port, SWITCH_NAT_TCP, NULL, SWITCH_FALSE);
		}

		/* Listening socket is up: leave the retry loop. */
		break;
	  sock_fail:
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port);
		if (prefs.stop_on_bind_error) {
			prefs.done = 1;
			goto fail;
		}
		switch_yield(100000);
	}

	listen_list.ready = 1;

	/* Phase 2: accept loop. Each iteration gets a fresh pool that the listener takes over. */
	while (!prefs.done) {
		if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
			goto fail;
		}

		if ((rv = switch_socket_accept(&inbound_socket, listen_list.sock, listener_pool))) {
			if (prefs.done) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n");
				goto end;
			} else {
				/* I wish we could use strerror_r here but its not defined everywhere =/ */
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
				/* Give up after more than 100 consecutive failures. */
				if (++errs > 100) {
					goto end;
				}
				/* NOTE(review): below that threshold, execution falls through and builds a listener
				   around whatever inbound_socket currently holds — confirm this is intended. */
			}
		} else {
			errs = 0;
		}

		if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
			break;
		}

		switch_thread_rwlock_create(&listener->rwlock, listener_pool);
		switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener_pool);
		switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener_pool);

		listener->sock = inbound_socket;
		listener->pool = listener_pool;
		/* Ownership moved to the listener; clear the local so the end: cleanup does not destroy it. */
		listener_pool = NULL;
		listener->format = EVENT_FORMAT_PLAIN;
		switch_set_flag(listener, LFLAG_FULL);
		switch_set_flag(listener, LFLAG_ALLOW_LOG);

		switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
		switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);

		switch_core_hash_init(&listener->event_hash);
		switch_socket_create_pollset(&listener->pollfd, listener->sock, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);

		/* Record the peer address; the listener thread starts only when a non-zero remote port was obtained. */
		if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
			switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
			if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
				if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
					continue;
			}
		}

		/* Any failure above: drop this connection and keep accepting. */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initilizing connection\n");
		close_socket(&listener->sock);
		expire_listener(&listener);

	}

  end:

	close_socket(&listen_list.sock);

	if (prefs.nat_map && switch_nat_get_type()) {
		switch_nat_del_mapping(prefs.port, SWITCH_NAT_TCP);
	}

	if (pool) {
		switch_core_destroy_memory_pool(&pool);
	}

	if (listener_pool) {
		switch_core_destroy_memory_pool(&listener_pool);
	}

	/* Release the ACL strings held in prefs.acl. */
	for (x = 0; x < prefs.acl_count; x++) {
		switch_safe_free(prefs.acl[x]);
	}

  fail:
	return SWITCH_STATUS_TERM;
}
- 首先调用
mod_event_socket.c#config()
函数逻辑比较简洁,就是读取配置文件event_socket.conf.xml
属性,并将其保存到 prefs 结构体中
/**
 * Load event_socket.conf into the global prefs structure.
 *
 * prefs is zeroed first, then every <param> under <settings> is applied.
 * Missing values fall back to defaults: listen-ip 127.0.0.1, password
 * "ClueCon", ACL "loopback.auto" and port 8021.
 *
 * The "nat-map" setting is honoured: it is enabled only when the configured
 * value is true and a NAT type has been detected. Earlier code reset
 * prefs.nat_map to 0 after parsing, which made the option a no-op.
 *
 * Always returns 0; a missing configuration file is logged and the defaults
 * are used.
 */
static int config(void)
{
	char *cf = "event_socket.conf";
	switch_xml_t cfg, xml, settings, param;

	memset(&prefs, 0, sizeof(prefs));

	if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
	} else {
		if ((settings = switch_xml_child(cfg, "settings"))) {
			for (param = switch_xml_child(settings, "param"); param; param = param->next) {
				char *var = (char *) switch_xml_attr_soft(param, "name");
				char *val = (char *) switch_xml_attr_soft(param, "value");

				if (!strcmp(var, "listen-ip")) {
					set_pref_ip(val);
				} else if (!strcmp(var, "debug")) {
					globals.debug = atoi(val);
				} else if (!strcmp(var, "nat-map")) {
					if (switch_true(val) && switch_nat_get_type()) {
						prefs.nat_map = 1;
					}
				} else if (!strcmp(var, "listen-port")) {
					prefs.port = (uint16_t) atoi(val);
				} else if (!strcmp(var, "password")) {
					set_pref_pass(val);
				} else if (!strcasecmp(var, "apply-inbound-acl") && !zstr(val)) {
					if (prefs.acl_count < MAX_ACL) {
						prefs.acl[prefs.acl_count++] = strdup(val);
					} else {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Max acl records of %d reached\n", MAX_ACL);
					}
				} else if (!strcasecmp(var, "stop-on-bind-error")) {
					prefs.stop_on_bind_error = switch_true(val) ? 1 : 0;
				}
			}
		}
		switch_xml_free(xml);
	}

	if (zstr(prefs.ip)) {
		set_pref_ip("127.0.0.1");
	}

	if (zstr(prefs.password)) {
		set_pref_pass("ClueCon");
	}

	if (!prefs.acl_count) {
		prefs.acl[prefs.acl_count++] = strdup("loopback.auto");
	}

	if (!prefs.port) {
		prefs.port = 8021;
	}

	return 0;
}
远程 Inbound 连接到来时,
mod_event_socket.c#launch_listener_thread()
函数将被调用,可以看到此处核心处理是新建线程,并将mod_event_socket.c#listener_run()
函数作为线程任务执行static switch_status_t launch_listener_thread(listener_t *listener) { switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; switch_threadattr_create(&thd_attr, listener->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); }
mod_event_socket.c#listener_run()
函数源码如下,可以看到其关键的处理分为以下几步:- 首先是尝试对连接进行 acl 检查,实际上只会对 Inbound 连接进行 acl 检查,Outbound 连接跳过这步
- 接下来调用
mod_event_socket.c#add_listener()
函数将当前 listener 添加到内部 listen_list 列表 - 接着是用户授权认证,同样是只会对 Inbound 连接进行,Outbound 连接默认已经授权。授权完成后,这个连接可以正式开始交互,则在 while 循环中不断调用
mod_event_socket.c#read_packet()
函数读取对端数据 - 读取到对端数据后,再调用
mod_event_socket.c#parse_command()
函数解析传输过来的命令并执行
/**
 * Per-connection worker thread.
 *
 * Steps, in order:
 *   1. read-lock the attached session, if the listener has one;
 *   2. reject the peer if any configured ACL does not match its address;
 *   3. register the listener and run the authentication handshake
 *      (a listener that already has a session and LFLAG_AUTHED instead
 *      processes one initial packet);
 *   4. loop on read_packet()/parse_command() and send the replies;
 *   5. tear everything down at the done label.
 *
 * obj is the listener_t to serve. Always returns NULL.
 */
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{
	listener_t *listener = (listener_t *) obj;
	char buf[1024];
	switch_size_t len;
	switch_status_t status;
	switch_event_t *event = NULL;	/* initialised so the NULL test below is always defined */
	char reply[512] = "";
	switch_core_session_t *session = NULL;
	switch_channel_t *channel = NULL;
	switch_event_t *revent = NULL;
	const char *var;
	int locked = 1;

	switch_mutex_lock(globals.listener_mutex);
	prefs.threads++;
	switch_mutex_unlock(globals.listener_mutex);

	switch_assert(listener != NULL);

	if ((session = listener->session)) {
		if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
			locked = 0;
			session = NULL;
			goto done;
		}

		listener->lock_acquired = 1;
	}

	if (!listener->sock) {
		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Listener socket is null!\n");
		switch_clear_flag_locked(listener, LFLAG_RUNNING);
		goto done;
	}

	switch_socket_opt_set(listener->sock, SWITCH_SO_TCP_NODELAY, TRUE);
	switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);

	/* ACL check: only applies when the remote address is known. */
	if (prefs.acl_count && listener->sa && !zstr(listener->remote_ip)) {
		uint32_t x = 0;

		for (x = 0; x < prefs.acl_count; x++) {
			if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
				const char message[] = "Access Denied, go away.\n";
				int mlen = (int)strlen(message);

				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "IP %s Rejected by acl \"%s\"\n", listener->remote_ip,
								  prefs.acl[x]);

				switch_snprintf(buf, sizeof(buf), "Content-Type: text/rude-rejection\nContent-Length: %d\n\n", mlen);
				len = strlen(buf);
				switch_socket_send(listener->sock, buf, &len);
				len = mlen;
				switch_socket_send(listener->sock, message, &len);
				goto done;
			}
		}
	}

	if (globals.debug > 0) {
		if (zstr(listener->remote_ip)) {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open\n");
		} else {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open from %s:%d\n", listener->remote_ip,
							  listener->remote_port);
		}
	}

	switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
	switch_set_flag_locked(listener, LFLAG_RUNNING);
	add_listener(listener);

	if (session && switch_test_flag(listener, LFLAG_AUTHED)) {
		switch_event_t *ievent = NULL;

		switch_set_flag_locked(listener, LFLAG_SESSION);
		status = read_packet(listener, &ievent, 25);

		if (status != SWITCH_STATUS_SUCCESS || !ievent) {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Socket Error!\n");
			switch_clear_flag_locked(listener, LFLAG_RUNNING);
			goto done;
		}

		if (parse_command(listener, &ievent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
			switch_clear_flag_locked(listener, LFLAG_RUNNING);
			goto done;
		}

	} else {
		switch_snprintf(buf, sizeof(buf), "Content-Type: auth/request\n\n");

		len = strlen(buf);
		switch_socket_send(listener->sock, buf, &len);

		/* Wait for the first complete command; the main loop below keeps
		 * handling further commands (including repeated auth attempts). */
		while (!switch_test_flag(listener, LFLAG_AUTHED)) {
			status = read_packet(listener, &event, 25);
			if (status != SWITCH_STATUS_SUCCESS) {
				goto done;
			}
			if (!event) {
				continue;
			}

			if (parse_command(listener, &event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
				switch_clear_flag_locked(listener, LFLAG_RUNNING);
				goto done;
			}
			if (*reply != '\0') {
				if (*reply == '~') {
					switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
				} else {
					switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
				}
				len = strlen(buf);
				switch_socket_send(listener->sock, buf, &len);
			}
			break;
		}
	}

	while (!prefs.done && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) {
		len = sizeof(buf);
		memset(buf, 0, len);
		status = read_packet(listener, &revent, 0);

		if (status != SWITCH_STATUS_SUCCESS) {
			break;
		}

		if (!revent) {
			continue;
		}

		if (parse_command(listener, &revent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
			switch_clear_flag_locked(listener, LFLAG_RUNNING);
			break;
		}

		if (revent) {
			switch_event_destroy(&revent);
		}

		if (*reply != '\0') {
			if (*reply == '~') {
				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
			} else {
				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
			}
			len = strlen(buf);
			switch_socket_send(listener->sock, buf, &len);
		}
	}

  done:

	if (revent) {
		switch_event_destroy(&revent);
	}

	remove_listener(listener);

	if (globals.debug > 0) {
		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
	}

	switch_thread_rwlock_wrlock(listener->rwlock);
	flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE);
	switch_mutex_lock(listener->filter_mutex);
	if (listener->filters) {
		switch_event_destroy(&listener->filters);
	}
	switch_mutex_unlock(listener->filter_mutex);

	if (listener->session && locked) {
		channel = switch_core_session_get_channel(listener->session);
	}

	if (channel && switch_channel_get_state(channel) != CS_HIBERNATE &&
		!switch_channel_test_flag(channel, CF_REDIRECT) && !switch_channel_test_flag(channel, CF_TRANSFER) && !switch_channel_test_flag(channel, CF_RESET) &&
		(switch_test_flag(listener, LFLAG_RESUME) || ((var = switch_channel_get_variable(channel, "socket_resume")) && switch_true(var)))) {
		switch_channel_set_state(channel, CS_RESET);
	}

	if (listener->sock) {
		send_disconnect(listener, "Disconnected, goodbye.\nSee you at ClueCon! http://www.cluecon.com/\n");
		close_socket(&listener->sock);
	}

	switch_thread_rwlock_unlock(listener->rwlock);

	if (globals.debug > 0) {
		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Closed\n");
	}

	switch_core_hash_destroy(&listener->event_hash);

	if (listener->allowed_event_hash) {
		switch_core_hash_destroy(&listener->allowed_event_hash);
	}

	if (listener->allowed_api_hash) {
		switch_core_hash_destroy(&listener->allowed_api_hash);
	}

	if (listener->session) {
		if (locked) {
			switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
		}
		switch_clear_flag_locked(listener, LFLAG_SESSION);
		if (locked) {
			switch_core_session_rwunlock(listener->session);
		}
	} else if (listener->pool) {
		switch_memory_pool_t *pool = listener->pool;

		/* The listener was allocated from this pool by the accept loop, so
		 * it must not be touched once the pool is destroyed: publish the
		 * finished flag first and forget the pointer. */
		listener->finished = 1;
		listener = NULL;
		switch_core_destroy_memory_pool(&pool);
	}

	switch_mutex_lock(globals.listener_mutex);
	prefs.threads--;
	switch_mutex_unlock(globals.listener_mutex);

	if (listener) {
		listener->finished = 1;
	}

	return NULL;
}
mod_event_socket.c#read_packet()
函数体比较琐碎,简单来说关键处理是以下几步:- 通过
switch_apr.c#switch_socket_recv()
函数调用底层接口读取 socket 数据,有数据则将其处理后封装到switch_event_t
结构体实例 event 中,跳出循环 - 如果没有读到数据,并且当前 listener 对应的远端已经订阅了事件,则将
listener->event_queue
队列中的事件出队,按照订阅方的订阅格式通过switch_apr.c#switch_socket_send()
函数发送给对端
/*
 * read_packet (excerpt; the "......" markers are parts elided by the article).
 *
 * Visible behaviour:
 *  - Loops while the listener still has a socket and prefs.done is not set.
 *  - Receives one byte per switch_socket_recv() call (mlen = 1) into mbuf,
 *    growing the buffer by block_len whenever it is about to fill up.
 *  - Leading CR/LF bytes are discarded; crcount counts consecutive '\n'
 *    (ignoring '\r'), and reaching 2, or hitting max_len bytes, ends the
 *    header section.
 *  - The first line becomes the "Command" header of a new
 *    SWITCH_EVENT_CLONE event; every following "name: value" line becomes a
 *    header. A positive "content-length" makes the function read exactly
 *    that many more bytes and attach them as the event body.
 *  - While the buffer is empty it moves diverted session events (channel
 *    flag CF_DIVERT_EVENTS, state below CS_HANGUP) into
 *    listener->event_queue, and, when LFLAG_EVENTS is set, drains that
 *    queue to the socket in the listener's format (plain, json or xml),
 *    each event preceded by Content-Length / Content-Type headers.
 *  - A receive error (anything that is neither success nor a BREAK status)
 *    or prefs.done jumps to `end` with SWITCH_STATUS_FALSE.
 *
 * NOTE(review): content-length comes from the peer and is used as an
 * allocation size without an upper bound visible here - confirm a limit is
 * enforced elsewhere.
 */
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout) { ...... while (listener->sock && !prefs.done) { uint8_t do_sleep = 1; mlen = 1; if (bytes == buf_len - 1) { char *tmp; int pos; pos = (int)(ptr - mbuf); buf_len += block_len; tmp = realloc(mbuf, buf_len); switch_assert(tmp); mbuf = tmp; memset(mbuf + bytes, 0, buf_len - bytes); ptr = (mbuf + pos); } status = switch_socket_recv(listener->sock, ptr, &mlen); if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) { switch_goto_status(SWITCH_STATUS_FALSE, end); } if (mlen) { bytes += mlen; do_sleep = 0; if (*mbuf == '\r' || *mbuf == '\n') { /* bah */ ptr = mbuf; mbuf[0] = '\0'; bytes = 0; continue; } if (*ptr == '\n') { crcount++; } else if (*ptr != '\r') { crcount = 0; } ptr++; if (bytes >= max_len) { crcount = 2; } if (crcount == 2) { char *next; char *cur = mbuf; while (cur) { if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) { while (*next == '\r' || *next == '\n') { next++; } } count++; if (count == 1) { switch_event_create(event, SWITCH_EVENT_CLONE); switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", mbuf); } else if (cur) { char *var, *val; var = cur; strip_cr(var); if (!zstr(var)) { if ((val = strchr(var, ':'))) { *val++ = '\0'; while (*val == ' ') { val++; } } if (val) { switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val); if (!strcasecmp(var, "content-length")) { clen = atoi(val); if (clen > 0) { char *body; char *p; switch_zmalloc(body, clen + 1); p = body; while (clen > 0) { mlen = clen; status = switch_socket_recv(listener->sock, p, &mlen); if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) { free(body); switch_goto_status(SWITCH_STATUS_FALSE, end); } /* if (channel && !switch_channel_ready(channel)) { status = SWITCH_STATUS_FALSE; break; } */ clen -= (int) mlen; p += mlen; } switch_event_add_body(*event, "%s", body); free(body); } } } } } 
cur = next; } break; } } if (!*mbuf) { ...... if (listener->session) { switch_channel_t *chan = switch_core_session_get_channel(listener->session); if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) { switch_event_t *e = NULL; while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) { switch_core_session_queue_event(listener->session, &e); break; } } } } if (switch_test_flag(listener, LFLAG_EVENTS)) { while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { char hbuf[512]; switch_event_t *pevent = (switch_event_t *) pop; char *etype; do_sleep = 0; if (listener->format == EVENT_FORMAT_PLAIN) { etype = "plain"; switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE); } else if (listener->format == EVENT_FORMAT_JSON) { etype = "json"; switch_event_serialize_json(pevent, &listener->ebuf); } else { switch_xml_t xml; etype = "xml"; if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) { listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE); switch_xml_free(xml); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n"); goto endloop; } } switch_assert(listener->ebuf); len = strlen(listener->ebuf); switch_snprintf(hbuf, sizeof(hbuf), "Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n", len, etype); len = strlen(hbuf); switch_socket_send(listener->sock, hbuf, &len); len = strlen(listener->ebuf); switch_socket_send(listener->sock, listener->ebuf, &len); switch_safe_free(listener->ebuf); endloop: switch_event_destroy(&pevent); } } } ...... }
- 通过
此时回到本节步骤4第4步,
mod_event_socket.c#parse_command()
函数非常长,主要逻辑是各个命令的处理,可以看到其关键处理如下:- 首先调用
switch_event.h#switch_event_get_header()
函数从上一步封装好的switch_event_t
结构体中获取到命令名称 - 命令的处理非常繁冗,此处以 event 订阅事件命令为例解析,该命令的主要处理逻辑是解析命令参数,将订阅的事件记录在
listener->event_list
标记数组中并给 listener 添上LFLAG_EVENTS
标记位,至此 Event Socket 事件订阅处理基本结束
/*
 * parse_command (excerpt; the "......" markers are parts elided by the article).
 *
 * Visible behaviour:
 *  - Clears reply, then fetches the "command" header from *event. If the
 *    event or the header is missing, LFLAG_RUNNING is cleared and reply is
 *    set to "-ERR command parse error.".
 *  - Commands are matched by case-insensitive prefix ("sendevent" is tested
 *    before "event").
 *  - "event" branch: the arguments after the first space are split on
 *    spaces. An optional first token xml/plain/json selects the listener's
 *    output format. Each following event name is resolved with
 *    switch_name_event() and recorded in listener->event_list; ALL marks
 *    every slot and registers custom subclasses. After CUSTOM has been
 *    seen, later tokens are stored as subclass names in
 *    listener->event_hash. Permission failures answer
 *    "-ERR permission denied"; no recognised keyword answers
 *    "-ERR no keywords supplied". On success LFLAG_EVENTS is set and reply
 *    is "+OK event listener enabled <format>".
 *  - At `done`, an empty reply is replaced by "-ERR command not found".
 *    At `done_noreply`, the event is destroyed and status is returned.
 */
static switch_status_t parse_command(listener_t *listener, switch_event_t **event, char *reply, uint32_t reply_len) { switch_status_t status = SWITCH_STATUS_SUCCESS; char *cmd = NULL; char unload_cheat[] = "api bgapi unload mod_event_socket"; char reload_cheat[] = "api bgapi reload mod_event_socket"; *reply = '\0'; if (!event || !*event || !(cmd = switch_event_get_header(*event, "command"))) { switch_clear_flag_locked(listener, LFLAG_RUNNING); switch_snprintf(reply, reply_len, "-ERR command parse error."); goto done; } ...... if (!strncasecmp(cmd, "sendevent", 9)) { ...... } else if (!strncasecmp(cmd, "event", 5)) { char *next, *cur; uint32_t count = 0, key_count = 0; uint8_t custom = 0; strip_cr(cmd); cur = cmd + 5; if ((cur = strchr(cur, ' '))) { for (cur++; cur; count++) { switch_event_types_t type; if ((next = strchr(cur, ' '))) { *next++ = '\0'; } if (!count) { if (!strcasecmp(cur, "xml")) { listener->format = EVENT_FORMAT_XML; goto end; } else if (!strcasecmp(cur, "plain")) { listener->format = EVENT_FORMAT_PLAIN; goto end; } else if (!strcasecmp(cur, "json")) { listener->format = EVENT_FORMAT_JSON; goto end; } } if (custom) { if (!listener->allowed_event_hash || switch_core_hash_find(listener->allowed_event_hash, cur)) { switch_core_hash_insert(listener->event_hash, cur, MARKER); } else { switch_snprintf(reply, reply_len, "-ERR permission denied"); goto done; } } else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) { if (switch_test_flag(listener, LFLAG_AUTH_EVENTS) && !listener->allowed_event_list[type] && !switch_test_flag(listener, LFLAG_ALL_EVENTS_AUTHED)) { switch_snprintf(reply, reply_len, "-ERR permission denied"); goto done; } key_count++; if (type == SWITCH_EVENT_ALL) { uint32_t x = 0; for (x = 0; x < SWITCH_EVENT_ALL; x++) { listener->event_list[x] = 1; } if (!listener->allowed_event_hash) { set_all_custom(listener); } else { set_allowed_custom(listener); } } if (type <= SWITCH_EVENT_ALL) { listener->event_list[type] = 1; } if (type == 
SWITCH_EVENT_CUSTOM) { custom++; } } end: cur = next; } } if (!key_count) { switch_snprintf(reply, reply_len, "-ERR no keywords supplied"); goto done; } if (!switch_test_flag(listener, LFLAG_EVENTS)) { switch_set_flag_locked(listener, LFLAG_EVENTS); } switch_snprintf(reply, reply_len, "+OK event listener enabled %s", format2str(listener->format)); } ...... done: if (zstr(reply)) { switch_snprintf(reply, reply_len, "-ERR command not found"); } done_noreply: if (event) { switch_event_destroy(event); } return status; }
- 首先调用
2.2.2 事件分发的流程
经过上一节步骤5第2步的分析,我们知道远程连接监听的事件其实都来自于 listener 的内部队列,所以要了解事件的分发流程,关键是要知道队列中的数据是从哪里来。此时回顾2.1.3节步骤4第1步,Event Socket 模块在加载时注册了监听到 FreeSWITCH 的事件组件中,而事件组件的事件来源其实是宏定义
switch_event.h#switch_event_fire()
调用switch_event.c#switch_event_fire_detailed()
函数。该函数比较简洁,核心处理如下:- 根据 runtime.events_use_dispatch 属性决定外部调用方投递的事件的最终目的地,该属性默认值为1,也就是默认会调用
switch_event.c#switch_event_queue_dispatch_event()
投递到 EVENT_DISPATCH_QUEUE switch_event.c#switch_event_deliver_thread_pool()
函数执行时会将事件投递到 session_manager.thread_queue 队列,并根据线程池当前空闲状态决定是否创建新线程
/**
 * Fire an event into the event subsystem.
 *
 * When the system is not running the event is destroyed and success is
 * reported. Otherwise user_data (if any) is attached and the event is either
 * queued for the dispatch threads (runtime.events_use_dispatch set) or handed
 * to the delivery thread pool.
 *
 * Returns SWITCH_STATUS_FALSE only when queuing for dispatch fails, in which
 * case the event has been destroyed; SWITCH_STATUS_SUCCESS otherwise.
 */
SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data)
{
	/* The subsystem must have been initialised before anything is fired. */
	switch_assert(BLOCK != NULL);
	switch_assert(RUNTIME_POOL != NULL);
	switch_assert(EVENT_QUEUE_MUTEX != NULL);

	if (SYSTEM_RUNNING <= 0) {
		/* sorry we're closed */
		switch_event_destroy(event);
		return SWITCH_STATUS_SUCCESS;
	}

	if (user_data) {
		(*event)->event_user_data = user_data;
	}

	if (!runtime.events_use_dispatch) {
		switch_event_deliver_thread_pool(event);
		return SWITCH_STATUS_SUCCESS;
	}

	check_dispatch();

	if (switch_event_queue_dispatch_event(event) == SWITCH_STATUS_SUCCESS) {
		return SWITCH_STATUS_SUCCESS;
	}

	switch_event_destroy(event);
	return SWITCH_STATUS_FALSE;
}
- 根据 runtime.events_use_dispatch 属性决定外部调用方投递的事件的最终目的地,该属性默认值为1,也就是默认会调用
switch_event.c#switch_event_queue_dispatch_event()
函数的核心处理如下:- 检查 EVENT_DISPATCH_QUEUE 队列的积压情况,如果队列中的数据量已经超过阈值(DISPATCH_QUEUE_LEN × 当前分发线程数),则检查当前事件分发线程数量是否达到最大值,最终决定是否需要新建事件分发线程
- 如果需要新建线程,则执行
switch_event.c#switch_event_launch_dispatch_threads()
函数启动新线程 - 最后调用函数
switch_apr.c#switch_queue_push()
将事件添加到 EVENT_DISPATCH_QUEUE 队列
/**
 * Hand an event over to the dispatch queue.
 *
 * If no launch is pending and the backlog exceeds
 * DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT, one more dispatch thread is
 * requested (as long as SOFT_MAX_DISPATCH + 1 stays below MAX_DISPATCH)
 * before the event is pushed.
 *
 * On the push path *eventp is set to NULL, so the caller no longer owns the
 * event. Returns SWITCH_STATUS_FALSE when the system is not running,
 * SWITCH_STATUS_SUCCESS otherwise.
 */
static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
{
	switch_event_t *pending_event = *eventp;
	int need_thread = 0;

	if (!SYSTEM_RUNNING) {
		return SWITCH_STATUS_FALSE;
	}

	if (!pending_event) {
		return SWITCH_STATUS_SUCCESS;
	}

	/* Decide under the lock whether another dispatch thread is needed. */
	switch_mutex_lock(EVENT_QUEUE_MUTEX);
	if (!PENDING && switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)
		&& SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
		need_thread = 1;
		PENDING++;
	}
	switch_mutex_unlock(EVENT_QUEUE_MUTEX);

	if (need_thread) {
		/* Re-checked outside the lock: the limit may have moved meanwhile. */
		if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
			switch_event_launch_dispatch_threads(SOFT_MAX_DISPATCH + 1);
		}

		switch_mutex_lock(EVENT_QUEUE_MUTEX);
		PENDING--;
		switch_mutex_unlock(EVENT_QUEUE_MUTEX);
	}

	*eventp = NULL;
	/* NOTE(review): the push result is not checked, so a failed push would
	 * drop the event without destroying it - confirm whether that can happen. */
	switch_queue_push(EVENT_DISPATCH_QUEUE, pending_event);

	return SWITCH_STATUS_SUCCESS;
}
事件已经入队,实际对 EVENT_DISPATCH_QUEUE 队列进行处理的是在 2.1.2节步骤4 提到的线程任务
switch_event.c#switch_event_dispatch_thread()
中。可以看到这个函数的核心操作是在 for 空循环中将 EVENT_DISPATCH_QUEUE 队列的数据轮询出来,随后调用switch_event.c#switch_event_deliver()
函数进行处理static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj) { switch_queue_t *queue = (switch_queue_t *) obj; int my_id = 0; switch_mutex_lock(EVENT_QUEUE_MUTEX); THREAD_COUNT++; DISPATCH_THREAD_COUNT++; for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) { if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) { break; } } if ( my_id >= MAX_DISPATCH_VAL ) { switch_mutex_unlock(EVENT_QUEUE_MUTEX); return NULL; } EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1; switch_mutex_unlock(EVENT_QUEUE_MUTEX); for (;;) { void *pop = NULL; switch_event_t *event = NULL; if (!SYSTEM_RUNNING) { break; } if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { continue; } if (!pop) { break; } event = (switch_event_t *) pop; switch_event_deliver(&event); switch_os_yield(); } switch_mutex_lock(EVENT_QUEUE_MUTEX); EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0; THREAD_COUNT--; DISPATCH_THREAD_COUNT--; switch_mutex_unlock(EVENT_QUEUE_MUTEX); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id); return NULL; }
switch_event.c#switch_event_deliver()
函数非常简洁,核心处理就是遍历 EVENT_NODES 列表,调用注册方的回调函数将事件分发到订阅方。此处结合2.1.3节步骤4第1步,可以知道会调用到mod_event_socket.c#event_handler()
函数SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event) { switch_event_types_t e; switch_event_node_t *node; if (SYSTEM_RUNNING) { switch_thread_rwlock_rdlock(RWLOCK); for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) { for (node = EVENT_NODES[e]; node; node = node->next) { if (switch_events_match(*event, node)) { (*event)->bind_user_data = node->user_data; node->callback(*event); } } if (e == SWITCH_EVENT_ALL) { break; } } switch_thread_rwlock_unlock(RWLOCK); } switch_event_destroy(event); }
mod_event_socket.c#event_handler()
函数源码比较繁琐,不过核心处理并不复杂,关键如下:- 遍历全局列表 listen_list.listeners,判断当前事件是否当前 listener 订阅的
- 是的话再检查 listener 上是否有 filter,有则经过 filter 过滤后最终确定需要发送该事件则将其拷贝一份通过
switch_apr.c#switch_queue_trypush()
函数将事件添加到 listener 私属的事件队列。至此事件已经入队,结合2.2.1节步骤5第2步,事件最终发送给了外部订阅方,事件分发流程的分析基本结束
/**
 * Event-subsystem callback of mod_event_socket.
 *
 * Walks listen_list.listeners under globals.listener_mutex and, for every
 * listener that subscribed to this event and whose filters accept it, pushes
 * a private copy onto the listener's event_queue. Stateful listeners that
 * expired are unlinked on the way. A listener that loses more than
 * MAX_MISSED events is killed.
 *
 * event is the event being delivered; it is only read here.
 */
static void event_handler(switch_event_t *event)
{
	switch_event_t *clone = NULL;
	listener_t *l, *lp, *last = NULL;
	time_t now = switch_epoch_time_now(NULL);
	switch_status_t qstatus;

	switch_assert(event != NULL);

	if (!listen_list.ready) {
		return;
	}

	switch_mutex_lock(globals.listener_mutex);

	lp = listen_list.listeners;

	while (lp) {
		int send = 0;

		/* Advance first: l may be expired and unlinked below. */
		l = lp;
		lp = lp->next;

		if (switch_test_flag(l, LFLAG_STATEFUL) && (l->expire_time || (l->timeout && now - l->last_flush > l->timeout))) {
			if (expire_listener(&l) == SWITCH_STATUS_SUCCESS) {
				if (last) {
					last->next = lp;
				} else {
					listen_list.listeners = lp;
				}
				continue;
			}
		}

		if (l->expire_time || !switch_test_flag(l, LFLAG_EVENTS)) {
			last = l;
			continue;
		}

		/* Subscription check; CUSTOM events must also match a subclass. */
		if (l->event_list[SWITCH_EVENT_ALL]) {
			send = 1;
		} else if ((l->event_list[event->event_id])) {
			if (event->event_id != SWITCH_EVENT_CUSTOM || !event->subclass_name || (switch_core_hash_find(l->event_hash, event->subclass_name))) {
				send = 1;
			}
		}

		if (send) {
			switch_mutex_lock(l->filter_mutex);

			if (l->filters && l->filters->headers) {
				switch_event_header_t *hp;
				const char *hval;

				send = 0;

				for (hp = l->filters->headers; hp; hp = hp->next) {
					if ((hval = switch_event_get_header(event, hp->name))) {
						const char *comp_to = hp->value;
						int pos = 1, cmp = 0;

						/* Leading '+'/'-' select accept/reject semantics. */
						while (comp_to && *comp_to) {
							if (*comp_to == '+') {
								pos = 1;
							} else if (*comp_to == '-') {
								pos = 0;
							} else if (*comp_to != ' ') {
								break;
							}
							comp_to++;
						}

						if (send && pos) {
							continue;
						}

						if (!comp_to) {
							continue;
						}

						if (*hp->value == '/') {
							switch_regex_t *re = NULL;
							int ovector[30];

							cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
							switch_regex_safe_free(re);
						} else {
							cmp = !strcasecmp(hval, comp_to);
						}

						if (cmp) {
							if (pos) {
								send = 1;
							} else {
								send = 0;
								break;
							}
						}
					}
				}
			}

			switch_mutex_unlock(l->filter_mutex);
		}

		if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
			char *uuid = switch_event_get_header(event, "unique-id");
			/* Resolve the listener's own uuid once; a listener without a
			 * session has none and must not be passed to strcmp(). */
			const char *my_uuid = l->session ? switch_core_session_get_uuid(l->session) : NULL;

			if (!uuid || (my_uuid && strcmp(uuid, my_uuid))) {
				send = 0;
			}

			if (my_uuid && !strcmp(my_uuid, switch_event_get_header_nil(event, "Job-Owner-UUID"))) {
				send = 1;
			}
		}

		if (send) {
			if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
				qstatus = switch_queue_trypush(l->event_queue, clone);
				if (qstatus == SWITCH_STATUS_SUCCESS) {
					if (l->lost_events) {
						int le = l->lost_events;

						l->lost_events = 0;
						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
					}
				} else {
					char errbuf[512] = {0};
					unsigned int qsize = switch_queue_size(l->event_queue);

					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
							"Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n",
							(int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":"");
					if (++l->lost_events > MAX_MISSED) {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN);
						kill_listener(l, "killed listener because of lost events\n");
					}
					/* The copy was never queued, so it is still ours to free. */
					switch_event_destroy(&clone);
				}
			} else {
				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_ERROR, "Memory Error!\n");
			}
		}

		last = l;
	}

	switch_mutex_unlock(globals.listener_mutex);
}