Published Aug 18, 2021
git clone --branch=v2.4.20 --depth=1 git@github.com:TarsCloud/Tars.git --recursive
find . -name .git | xargs rm -rf
git init
git add -f .
git commit --allow-empty-message -m ""
clang-format -style=google -dump-config > .clang-format
find . -regex '.*\.\(h\|hh\|hpp\|hxx\|H\|c\|cc\|cpp\|cxx\|C\)' -exec clang-format -style=file -i {} \;
git rm -rf cpp docs_en go java nodejs php
find . -name .gitmodules | xargs git rm -rf
修改编译选项,方便调试:
-O2
CMAKE_BUILD_TYPE
framework/build/build.sh
需要重新构建数据库时:
DROP DATABASE ...
db_base, db_cache_web, db_tars, db_tars_web, db_user_system, tars_property, tars_stat
cd /usr/local/tars/cpp/deploy/
./linux-install.sh 172.29.233.88 123456 eth0 true false root 3306
https://tarscloud.github.io/TarsDocs/demo/tarscpp/tars_cpp_quickstart.html
https://tarscloud.github.io/TarsDocs/hello-world/tarscpp.html
服务开发
mkdir Hello
cd Hello
/usr/local/tars/cpp/script/cmake_tars_server.sh TestApp HelloServer Hello
/usr/local/tars/cpp/tools/tars2cpp Hello.tars
修改代码后
cd HelloServer/build
cmake .. -DCMAKE_BUILD_TYPE=Debug
make -j4
make HelloServer-tar // make tar
make release
/usr/local/app/tars/tars-stop.sh
frame/build/build.sh all
frame/build/build.sh install
/usr/local/app/tars/tars-start.sh
/usr/local/app/web/log/
/usr/local/app/tars/app_log/
/usr/local/app/patchs/tars.upload/
/usr/local/app/patchs/tars/TARSBatchPatching/
/usr/local/app/tars/tarsnode/data/tmp/download/BatchPatchingLoad/
应用:TestApp
服务名称:HelloServer
服务类型:tars_cpp
模板:tars.default
节点:172.29.233.88
OBJ:HelloObj
端口:20001
GET http://39.105.159.68:3000/pages/server/api/server_exist?application=TestApp&server_name=HelloServer&node_name=172.29.233.88
{
"data": false,
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_conf`
POST http://39.105.159.68:3000/pages/server/api/deploy_server
{
"application": "TestApp",
"server_name": "HelloServer",
"server_type": "tars_cpp",
"template_name": "tars.default",
"node_name": "172.29.233.88",
"enable_set": false,
"set_name": "",
"set_area": "",
"set_group": "",
"operator": "",
"developer": "",
"adapters": [
{
"obj_name": "HelloObj",
"bind_ip": "172.29.233.88",
"port": "20001",
"port_type": "tcp",
"protocol": "tars",
"thread_num": 5,
"max_connections": 100000,
"queuecap": 50000,
"queuetimeout": 20000
}
]
}
{
"data": {
"server_conf": {
"id": 11,
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"server_type": "tars_cpp",
"enable_set": false,
"set_name": null,
"set_area": null,
"set_group": null,
"setting_state": "inactive",
"present_state": "inactive",
"bak_flag": false,
"template_name": "tars.default",
"profile": "",
"async_thread_num": 3,
"base_path": "",
"exe_path": "",
"start_script_path": "",
"stop_script_path": "",
"monitor_script_path": "",
"patch_time": "1969-12-31T16:00:00.000Z",
"patch_version": "",
"process_id": 0,
"posttime": "2021-08-27 15:42:08"
},
"tars_node_rst": []
},
"ret_code": 200,
"err_msg": ""
}
web:插入 `db_tars`.`t_server_conf`,`db_tars`.`t_adapter_conf`
web:查询 `db_tars`.`t_server_conf`
GET http://39.105.159.68:3000/pages/server/api/tree?searchKey=&type=
{
"data": [
{
"id": "1tars",
"name": "tars",
"pid": "root",
"is_parent": true,
"open": false,
"children": [
{
"id": "1tars.5tarsAdminRegistry",
"name": "tarsAdminRegistry",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsconfig",
"name": "tarsconfig",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarslog",
"name": "tarslog",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsnotify",
"name": "tarsnotify",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarspatch",
"name": "tarspatch",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsproperty",
"name": "tarsproperty",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsqueryproperty",
"name": "tarsqueryproperty",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsquerystat",
"name": "tarsquerystat",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsregistry",
"name": "tarsregistry",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
},
{
"id": "1tars.5tarsstat",
"name": "tarsstat",
"pid": "1tars",
"is_parent": false,
"open": false,
"children": []
}
],
"order": 0
},
{
"id": "1TestApp",
"name": "TestApp",
"pid": "root",
"is_parent": true,
"open": false,
"children": [
{
"id": "1TestApp.5HelloServer",
"name": "HelloServer",
"pid": "1TestApp",
"is_parent": false,
"open": false,
"children": []
}
],
"order": 0
}
],
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/server_notify_list?tree_node_id=1TestApp.5HelloServer&page_size=20&curr_page=1
{
"data": {
"count": 0,
"rows": []
},
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_notifys`
GET http://39.105.159.68:3000/pages/server/api/server_list?tree_node_id=1TestApp
{
"data": [
{
"id": 11,
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"server_type": "tars_cpp",
"enable_set": false,
"set_name": null,
"set_area": null,
"set_group": null,
"setting_state": "inactive",
"present_state": "inactive",
"bak_flag": false,
"template_name": "tars.default",
"profile": "",
"async_thread_num": 3,
"base_path": "",
"exe_path": "",
"start_script_path": "",
"stop_script_path": "",
"monitor_script_path": "",
"patch_time": "1970-01-01 00:00:00",
"patch_version": "",
"patch_user": "",
"process_id": 0,
"posttime": "2021-08-27 15:42:08",
"enable_group": false,
"ip_group_name": "",
"flow_state": "active"
}
],
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_conf`
GET http://39.105.159.68:3000/pages/server/api/server_now_version?application=TestApp&serverName=HelloServer&enableSet=false&setName=&setArea=&setGroup=&nodeName=172.29.233.88
{
"data": [],
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_conf`
GET http://39.105.159.68:3000/pages/server/api/server_patch_list?application=TestApp&module_name=HelloServer&curr_page=1&page_size=50
{
"data": {
"count": 0,
"rows": []
},
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_patchs`
POST http://39.105.159.68:3000/pages/server/api/upload_patch_package
{
"data": {
"id": 12,
"server": "TestApp.HelloServer",
"tgz": "TestApp.HelloServer_suse_1630068875734.tgz",
"comment": "",
"posttime": "2021-08-27 20:54:35"
},
"ret_code": 200,
"err_msg": ""
}
web:插入 `db_tars`.`t_server_patchs`
web:插入 `db_tars_web`.`t_patch_task`
GET http://39.105.159.68:3000/pages/server/api/server_patch_list?application=TestApp&module_name=HelloServer&curr_page=1&page_size=50
{
"data": {
"count": 1,
"rows": [
{
"package_type": 0,
"default_version": 1,
"id": 12,
"server": "TestApp.HelloServer",
"tgz": "TestApp.HelloServer_suse_1630068875734.tgz",
"publish_time": "",
"comment": "",
"posttime": "2021-08-27 20:54:35",
"upload_time": "2021-08-27 20:54:35",
"upload_user": "admin"
}
]
},
"ret_code": 200,
"err_msg": ""
}
web:查询 `db_tars`.`t_server_patchs`
POST http://39.105.159.68:3000/pages/server/api/add_task
{
"items": [
{
"command": "patch_tars",
"parameters": {
"bak_flag": false,
"group_name": "",
"patch_id": "12",
"update_text": "first"
},
"server_id": "11"
}
],
"serial": true
}
{
"data": "527c2b4facc14fd382c2dcb07797b40b",
"ret_code": 200,
"err_msg": ""
}
JS
let ret = await adminRegPrx.addTaskReq(taskReq,{
hashCode: getHashNumber(taskReq.taskNo)
});
C++
int AdminRegistryImp::addTaskReq(const TaskReq& taskReq,
tars::CurrentPtr current) {
db_tars
.t_task_item
db_tars
.t_task
void TaskListSerial::doTask() {
EMTaskItemStatus TaskList::executeSingleTask(size_t index,
const TaskItemReq &req) {
......
} else if (req.command == "patch_tars") {
ret = patch(index, req, log);
if (ret == EM_I_SUCCESS && get("bak_flag", req.parameters) != "1") {
//不是备机, 需要重启
ret = restart(req, log);
}
}
EMTaskItemStatus TaskList::patch(size_t index, const TaskItemReq &req,
string &log) {
......
ret = _adminPrx->batchPatch_inner(patchReq, log);
iRet = _patchPrx->tars_set_timeout(timeout)->preparePatchFile(
req.appname, req.servername, patchFile);
......
ret = _adminPrx->batchPatch_inner(patchReq, log);
iRet = proxy->tars_set_timeout(timeout)->patchPro(reqPro, result);
node
g_BatchPatchThread->push_back(req, server);
void BatchPatchThread::doPatchRequest(const tars::PatchRequest& request,
ServerObjectPtr server) {
......
CommandPatch command(server, _downloadPath, request);
......
if (command.execute(sError) == 0) {
[root@iZ2ze7qslbwa07f03lfmehZ HelloServer]# find / -name TestApp.HelloServer
/usr/local/app/tars/tarsnode/data/TestApp.HelloServer
/usr/local/app/tars/tarsnode/data/tmp/TestApp.HelloServer
/usr/local/app/tars/tarsnode/data/tmp/download/BatchPatchingLoad/TestApp.HelloServer
/usr/local/app/tars/tarsnode/data/tmp/download/BatchPatching/TestApp.HelloServer
EMTaskItemStatus TaskList::restart(const TaskItemReq &req, string &log) {
int ret = -1;
try {
ret = _adminPrx->restartServer_inner(req.application, req.serverName,
req.nodeName, log);
iRet = (tarsErrCode)nodePrx->stopServer(application, serverName, result);
return nodePrx->startServer(application, serverName, result);
GET http://39.105.159.68:3000/pages/server/api/task?task_no=527c2b4facc14fd382c2dcb07797b40b
{
"data": {
"task_no": "527c2b4facc14fd382c2dcb07797b40b",
"serial": true,
"status": 1,
"create_time": "2021-08-27 20:58:30",
"userName": "",
"items": [
{
"task_no": "527c2b4facc14fd382c2dcb07797b40b",
"item_no": "2e5afd76dc2e40a991ae32b355d02ef8",
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"command": "patch_tars",
"start_time": "2021-08-27 20:58:30",
"end_time": "",
"status": 1,
"status_info": "EM_I_RUNNING",
"execute_info": "",
"percent": 0
}
]
},
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/task?task_no=527c2b4facc14fd382c2dcb07797b40b
{
"data": {
"task_no": "527c2b4facc14fd382c2dcb07797b40b",
"serial": true,
"status": 2,
"create_time": "2021-08-27 20:58:30",
"userName": "",
"items": [
{
"task_no": "527c2b4facc14fd382c2dcb07797b40b",
"item_no": "2e5afd76dc2e40a991ae32b355d02ef8",
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"command": "patch_tars",
"start_time": "2021-08-27 20:58:30",
"end_time": "2021-08-27 20:58:32",
"status": 2,
"status_info": "EM_I_SUCCESS",
"execute_info": "startServer [TestApp.HelloServer] from 172.29.233.88 :server is activating, please check: ",
"percent": 100
}
]
},
"ret_code": 200,
"err_msg": ""
}
POST http://39.105.159.68:3000/pages/server/api/add_task
{
"items": [
{
"command": "stop",
"server_id": 11
}
],
"serial": true
}
{
"data": "be72686aa9274ed89656bced8cabfbd7",
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/task?task_no=be72686aa9274ed89656bced8cabfbd7
{
"data": {
"task_no": "be72686aa9274ed89656bced8cabfbd7",
"serial": true,
"status": 1,
"create_time": "2021-08-27 21:25:43",
"userName": "",
"items": [
{
"task_no": "be72686aa9274ed89656bced8cabfbd7",
"item_no": "0b78cca7ee3a472fa6c62605c5a0c89f",
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"command": "stop",
"start_time": "2021-08-27 21:25:43",
"end_time": "",
"status": 1,
"status_info": "EM_I_RUNNING",
"execute_info": "",
"percent": 0
}
]
},
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/task?task_no=be72686aa9274ed89656bced8cabfbd7
{
"data": {
"task_no": "be72686aa9274ed89656bced8cabfbd7",
"serial": true,
"status": 2,
"create_time": "2021-08-27 21:25:43",
"userName": "",
"items": [
{
"task_no": "be72686aa9274ed89656bced8cabfbd7",
"item_no": "0b78cca7ee3a472fa6c62605c5a0c89f",
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"command": "stop",
"start_time": "2021-08-27 21:25:43",
"end_time": "2021-08-27 21:25:43",
"status": 2,
"status_info": "EM_I_SUCCESS",
"execute_info": "stopServer [TestApp.HelloServer] from 172.29.233.88 succ:",
"percent": 0
}
]
},
"ret_code": 200,
"err_msg": ""
}
POST http://39.105.159.68:3000/pages/server/api/add_task
{
"items": [
{
"command": "restart",
"server_id": 11
}
],
"serial": true
}
{
"data": "98b61e0a9fe744df833a0cc7c8bc718d",
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/task?task_no=98b61e0a9fe744df833a0cc7c8bc718d
{
"data": {
"task_no": "98b61e0a9fe744df833a0cc7c8bc718d",
"serial": true,
"status": 2,
"create_time": "2021-08-27 23:29:05",
"userName": "",
"items": [
{
"task_no": "98b61e0a9fe744df833a0cc7c8bc718d",
"item_no": "5fad4f068f8f451c8b3103647a897140",
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"command": "restart",
"start_time": "2021-08-27 23:29:05",
"end_time": "2021-08-27 23:29:05",
"status": 2,
"status_info": "EM_I_SUCCESS",
"execute_info": "startServer [TestApp.HelloServer] from 172.29.233.88 :server is activating, please check: ",
"percent": 0
}
]
},
"ret_code": 200,
"err_msg": ""
}
GET http://39.105.159.68:3000/pages/server/api/server_list?tree_node_id=1TestApp.5HelloServer
{
"data": [
{
"id": 11,
"application": "TestApp",
"server_name": "HelloServer",
"node_name": "172.29.233.88",
"server_type": "tars_cpp",
"enable_set": false,
"set_name": null,
"set_area": null,
"set_group": null,
"setting_state": "active",
"present_state": "activating",
"bak_flag": false,
"template_name": "tars.default",
"profile": "",
"async_thread_num": 3,
"base_path": "",
"exe_path": "",
"start_script_path": "",
"stop_script_path": "",
"monitor_script_path": "",
"patch_time": "2021-08-27 20:58:41",
"patch_version": "12",
"patch_user": "admin",
"process_id": 19925,
"posttime": "2021-08-27 15:42:08",
"enable_group": false,
"ip_group_name": "",
"flow_state": "active"
}
],
"ret_code": 200,
"err_msg": ""
}
Tars的C++实现,客户端和服务端对IO组件分别采取了两种封装方式,有些繁琐。
muduo中的EventLoopThread,即IO线程,队列和线程是1:1的“IO单线程池”;EventLoopThreadPool是“IO单线程池”的集合;ThreadPool中队列和线程是1:N。
tars中的线程和队列总是保持1:1。
tars client:
N1个caller线程(N1数目不确定),N2个IO线程,N3个异步线程(回调异步rpc所设置的回调函数)
tars这样解决每个caller线程选取IO线程的并发问题:每个caller线程都有线程局部存储,存储N2个IO线程所对应的N2个队列。每个caller线程发起调用时,在caller线程局部存储内选择一个队列,push msg,唤醒相应IO线程。IO线程通过queue指针,pop msg,之后进行IO操作。
tars server:
1+N1个IO线程(1个处理所有servant listen socket的监听,N1个处理所有conn socket的读写事件),N2个servant(XXXObjAdapter),每个servant配置N3个业务线程。
客户端和服务端的网络线程数,无法通过web修改,且数据库不存储,默认都是单线程。
1.main listen socket线程: 1
2.log: 1
int main(int argc, char *argv[]) {
g_app.main(argc, argv);
main(op);
main(config);
//初始化Server部分
initializeServer();
LocalRollLogger::getInstance()->setLogInfo(
ServerConfig::Application, ServerConfig::ServerName,
ServerConfig::LogPath, ServerConfig::LogSize, ServerConfig::LogNum,
_communicator, ServerConfig::Log);
_local.start(1);
_thread = new std::thread(&TC_LoggerThreadGroup::run, this);
3.客户端业务回调线程: asyncthread=3
LocalRollLogger::getInstance()->setLogInfo(
ServerConfig::Application, ServerConfig::ServerName,
ServerConfig::LogPath, ServerConfig::LogSize, ServerConfig::LogNum,
_communicator, ServerConfig::Log);
//设置染色日志信息
_logger.getWriteT().setDyeingLogInfo(sApp, sServer, sLogpath, iMaxSize,
iMaxNum, comm, sLogObj);
_logPrx = comm->stringToProxy<LogPrx>(sLogObj);
stringToProxy<T>(objectName, prx, setName);
ServantProxy* pServantProxy = getServantProxy(objectName, setName);
Communicator::initialize();
_asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge));
start();
_th = new std::thread(&TC_Thread::threadEntry, this);
4.客户端网络线程: netthread=1
Communicator::initialize();
_communicatorEpoll[i]->start();
_th = new std::thread(&TC_Thread::threadEntry, this);
5.时间TC_TimeProvider: 1
void CommunicatorEpoll::run() {
......
//处理超时请求
doTimeout();
int64_t iNow = TNOWMS;
#define TNOWMS TC_TimeProvider::getInstance()->getNowMs()
std::thread t(&TC_TimeProvider::run, g_tp);
6.状态上报StatReport: 1
Communicator::initialize();
_statReport->setReportInfo(
statPrx, propertyPrx, ClientConfig::ModuleName, ClientConfig::LocalIp,
sSetDivision, iReportInterval, 0, 0, iMaxReportSize, iReportTimeout);
start();
_th = new std::thread(&TC_Thread::threadEntry, this);
7.业务AdminReapThread: 1
void Application::main(const string& config) {
......
//业务应用的初始化
initialize();
_reapThread.start();
_th = new std::thread(&TC_Thread::threadEntry, this);
8.业务检查超时ExecuteTask:1
ExecuteTask::getInstance()->init();
ExecuteTask::ExecuteTask() {
_adminImp = NULL;
_terminate = false;
start();
}
_th = new std::thread(&TC_Thread::threadEntry, this);
9.服务端业务线程
AdminAdapter: 1
tars.tarsAdminRegistry.AdminRegObjAdapter: threads=5
g_app.waitForShutdown();
_epollServer->waitForShutdown();
if (!isMergeHandleNetThread()) startHandle();
hds[i]->start();
_th = new std::thread(&TC_Thread::threadEntry, this);
10.服务端网络线程: netthread=1
_netThreads[i]->start();
_th = new std::thread(&TC_Thread::threadEntry, this);
ServantPrx
_patchPrx = CommunicatorFactory::getInstance()
->getCommunicator()
->stringToProxy<PatchPrx>("tars.tarspatch.PatchObj");
ServantPrx prx =
Application::getCommunicator()->stringToProxy<ServantPrx>(server.objName);
string obj = "AdminObj@" + conf["/tars/application/server<local>"];
ServantPrx prx =
Application::getCommunicator()->stringToProxy<ServantPrx>(obj);
文章出处:
https://newdoc.tarsyun.com/#/markdown/TarsCloud/TarsDocs/articles/technical_articles.md
https://cloud.tencent.com/developer/column/88122
参照TarsFramework-v2.4.13,在原文基础上进行了修改。
作者:Cony 导语:微服务开源框架TARS的RPC调用包含客户端与服务端,《微服务开源框架TARS的RPC源码解析》系列文章将从初识客户端、客户端的同步及异步调用、初识服务端、服务端的工作流程四部分,以C++语言为载体,深入浅出地带你了解TARS RPC调用的原理。
TARS是腾讯使用十年的微服务开发框架,目前支持C++、Java、PHP、Node.js、Go语言。该开源项目为用户提供了涉及到开发、运维、以及测试的一整套微服务平台PaaS解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。目前该框架应用在腾讯各大核心业务,基于该框架部署运行的服务节点规模达到数十万。 TARS的通信模型中包含客户端和服务端。客户端服务端之间主要是利用RPC进行通信。本系列文章分上下两篇,对RPC调用部分进行源码解析。本文是上篇,我们将以C++语言为载体,带大家了解一下TARS的客户端。
TARS的客户端最重要的类是Communicator,一个客户端只能声明出一个Communicator类实例(建议),用户可以通过CommunicatorPtr& Application::getCommunicator()获取线程安全的Communicator类单例。Communicator类聚合了两个比较重要的类,一个是CommunicatorEpoll,负责网络线程的建立与通过ObjectProxyFactory生成ObjectProxy;另一个是ServantProxyFactory,生成不同的RPC服务句柄,即ServantProxy,用户通过ServantProxy调用RPC服务。Communicator还聚合了多个异步调用处理线程AsyncProcThread,CommunicatorEpoll接收到异步的响应包之后,将响应包交给该线程处理。下面简单介绍几个类的作用。
一个Communicator实例就是一个客户端(连接池),负责与服务端建立连接,生成RPC服务句柄,可以通过CommunicatorPtr& Application::getCommunicator()获取Communicator实例,用户最好不要自己声明定义新的Communicator实例。
ServantProxy就是一个服务代理,ServantProxy可以通过ServantProxyFactory工厂类生成,用户往往通过Communicator的template void stringToProxy()接口间接调用ServantProxyFactory的ServantPrx::element_type* getServantProxy()接口以获取服务代理,通过服务代理ServantProxy,用户就可以进行RPC调用了。ServantProxy内含多个服务实体ObjectProxy(详见下文第4小点),能够帮助用户在同一个服务代理内进行负载均衡。
CommunicatorEpoll类代表客户端的网络模块,内含TC_Epoller作为IO复用,能够同时处理不同主调线程(caller线程)的多个请求。CommunicatorEpoll内含服务实体工厂类ObjectProxyFactory(详见下文第4小点),意味着在同一网络线程中,能够产生不同服务的实体,能够完成不同的RPC服务调用。
ObjectProxy类是一个服务实体,注意与ServantProxy类是一个服务代理相区别,前者表示一个网络线程上的某个服务实体A,后者表示对所有网络线程上的某服务实体A的总代理,更详细的介绍可见下文。ObjectProxy通过ObjectProxyFactory生成,而ObjectProxyFactory类的实例是CommunicatorEpoll的成员变量,意味着一个网络线程CommunicatorEpoll能够产生各种各样的服务实体ObjectProxy,发起不同的RPC服务。ObjectProxy通过AdapterProxy来管理对服务端的连接。 好了,介绍完所有的类之后,先通过类图理一理他们之间的关系,这个类图在之后的文章中将会再次出现。
图(1-1)客户端类图
根据用户配置,Communicator拥有n个网络线程,即n个CommunicatorEpoll。每个CommunicatorEpoll拥有一个ObjectProxyFactory类,每个ObjectProxyFactory可以生成一系列的不同服务的实体对象ObjectProxy,因此,假如Communicator拥有两个CommunicatorEpoll,并有foo与bar这两类不同的服务实体对象,那么如下图(1-2)所示,每个CommunicatorEpoll可以通过 ObjectProxyFactory创建两类ObjectProxy,这是TARS客户端的第一层负载均衡,每个线程都可以分担所有服务的RPC请求,因此,一个服务的阻塞可能会影响其他服务,因为网络线程是多个服务实体ObjectProxy所共享的。
图(1-2)Communicator中的CommunicatorEpoll与ObjectProxy
Communicator类下另一个比较重要的类ServantProxyFactory的作用是依据实际服务端的信息(如服务器的socket标志)与Communicator中客户端的信息(如网络线程数)而生成ServantProxy句柄,通过句柄调用RPC服务。举个例子,如下图(1-3)所示,Communicator实例通过ServantProxyFactory成员变量的getServantProxy()接口在构造fooServantProxy句柄的时候,会获取Communicator实例下的所有CommunicatorEpoll(即CommunicatorEpoll-1与CommunicatorEpoll-2)中的fooObjectProxy(即fooObjectProxy-1与fooObjectProxy-2),并作为构造fooServantProxy的参数。Communicator通过ServantProxyFactory能够获取foo与bar这两类ServantProxy,ServantProxy与相应的ObjectProxy存在相应的聚合关系:
图(1-3)Communicator中的ServantProxyFactory与ObjectProxy
另外,每个ObjectProxy都拥有一个EndpointManager,例如,fooObjectProxy 的EndpointManager管理fooObjectProxy 下面的所有fooAdapterProxy,每个AdapterProxy连接到一个提供相应foo服务的服务端物理机socket上。通过EndpointManager还可以以不同的负载均衡方式获取连接AdapterProxy。假如foo服务有两台物理机,bar服务有一台物理机,那么ObjectProxy,EndpointManager与AdapterProxy关系如下图(1-4)所示。上面提到,不同的网络线程CommunicatorEpoll均可以发起同一RPC请求,对于同一RPC服务,选取不同的ObjectProxy(或可认为选取不同的网络线程CommunicatorEpoll))是第一层的负载均衡,而对于同一个被选中的ObjectProxy,通过EndpointManager选择不同的AdapterProxy socket连接(假如ObjectProxy有大于1个的AdapterProxy,如图(1-4)的fooObjectProxy)是第二层的负载均衡。
图(1-4)ObjectProxy与AdapterProxy的关系
小结:
ServantProxy确定服务 –> ObjectProxy确定IO线程 –> AdapterProxy确定服务端物理地址
第一层负载均衡 第二层负载均衡
在客户端进行初始化时,必须建立上面介绍的关系,因此相应的类图如图(1-5)所示,通过类图可以看出各类的关系,以及初始化需要用到的函数。
图(1-5)客户端初始化后建立的类图
现在,通过代码跟踪来看看,在客户端初始化过程中,各个类是如何被初始化出来并建立上述的架构关系的。在简述之前,可以先看看函数的调用流程图,若看不清晰,可以将图片保存下来,用看图软件放大查看,强烈建议结合文章的代码解析以TARS源码一起查看,文章后面的所有代码流程图均如此。 接下来,将会按照函数调用流程图来一步一步分析客户端代理是如何被初始化出来的:
图(1-6) 初始化函数调用过程图
在客户端程序中,一开始会执行下面的代码进行整个客户端代理的初始化:
Communicator comm;
try {
HelloPrx prx;
comm.stringToProxy(
"TestApp.HelloServer.HelloObj@tcp -h 172.29.233.88 -p 20001", prx);
先声明一个Communicator变量comm(其实不建议这么做)以及一个ServantProxy类的指针变量prx,在此处,服务为Hello,因此声明一个HelloPrx prx。注意,一个客户端只能拥有一个Communicator(建议)。为了能够获得RPC的服务句柄,我们调用Communicator::stringToProxy(),并传入服务端的信息与prx变量,函数返回后,prx就是RPC服务的句柄。 进入Communicator::stringToProxy()函数中,我们通过Communicator::getServantProxy()来依据objectName与setName获取服务代理ServantProxy:
/**
* 生成代理
* @param T
* @param objectName
* @param setName 指定set调用的setid
* @param proxy
*/
template <class T>
void stringToProxy(const string& objectName, T& proxy,
const string& setName = "") {
ServantProxy* pServantProxy = getServantProxy(objectName, setName);
proxy = (typename T::element_type*)(pServantProxy);
}
进入Communicator::getServantProxy(),首先会执行Communicator::initialize()来初始化Communicator,
ServantProxy *Communicator::getServantProxy(const string &objectName,
const string &setName) {
Communicator::initialize();
需要注意一点,Communicator:: initialize()只会被执行一次,下一次执行Communicator::getServantProxy()将不会再次执行Communicator:: initialize()函数:
void Communicator::initialize() {
TC_LockT<TC_ThreadRecMutex> lock(*this);
if (_initialized) return;
_initialized = true;
进入Communicator::initialize()函数中,在这里,将会new出上文介绍的与Communicator密切相关的类ServantProxyFactory与n个CommunicatorEpoll,n为客户端的网络线程数,最小为1,最大为MAX_CLIENT_THREAD_NUM:
void Communicator::initialize() {
......
_servantProxyFactory = new ServantProxyFactory(this);
//网络线程
_clientThreadNum = TC_Common::strto<size_t>(getProperty("netthread", "1"));
if (0 == _clientThreadNum) {
_clientThreadNum = 1;
} else if (MAX_CLIENT_THREAD_NUM < _clientThreadNum) {
_clientThreadNum = MAX_CLIENT_THREAD_NUM;
}
......
for (size_t i = 0; i < _clientThreadNum; ++i) {
_communicatorEpoll[i] = new CommunicatorEpoll(this, i);
_communicatorEpoll[i]->start();
}
在CommunicatorEpoll的构造函数中,ObjectProxyFactory被创建出来,这是构造图(1-2)关系的前提。
CommunicatorEpoll::CommunicatorEpoll(Communicator *pCommunicator,
size_t netThreadSeq)
......
// ObjectProxyFactory 对象
_objectProxyFactory = new ObjectProxyFactory(this);
除此之外,还可以看到获取相应配置,创建并启动若干个异步回调后的处理线程。创建完成后,调用CommunicatorEpoll::start()启动网络线程。
void Communicator::initialize() {
......
//异步线程数
_asyncThreadNum = TC_Common::strto<size_t>(getProperty("asyncthread", "3"));
if (_asyncThreadNum == 0) {
_asyncThreadNum = 3;
}
if (_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) {
_asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
}
......
for (size_t i = 0; i < _asyncThreadNum; ++i) {
_asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge));
}
const size_t MAX_CLIENT_THREAD_NUM = 64; //客户端最大网络线程数
const size_t MAX_CLIENT_ASYNCTHREAD_NUM = 1024; //客户端最大异步线程数
const size_t MAX_CLIENT_NOTIFYEVENT_NUM =
2048; //客户端每个网络线程拥有的最大通知事件的数目,客户端最大caller线程数
至此,Communicator::initialize()顺利执行。通过下图回顾上面的过程:
图(1-7)执行Communicator的初始化函数流程
代码回到Communicator::getServantProxy()中,Communicator::getServantProxy()会执行ServantProxyFactory::getServantProxy(),并返回相应的服务代理:
ServantProxy *Communicator::getServantProxy(const string &objectName,
const string &setName) {
......
return _servantProxyFactory->getServantProxy(objectName, setName);
}
进入ServantProxyFactory::getServantProxy(),首先会加锁,从map<string, ServantPrx> _servantProxy中查找目标,若查找成功直接返回。若查找失败,TARS需要构造出相应的ServantProxy,ServantProxy的构造需要如图(1-3)所示的相对应的ObjectProxy作为构造函数的参数,在ServantProxyFactory::getServantProxy()中有如下获取ObjectProxy指针数组的代码:
ObjectProxy** ppObjectProxy = new ObjectProxy*[_comm->getClientThreadNum()];
assert(ppObjectProxy != NULL);
for (size_t i = 0; i < _comm->getClientThreadNum(); ++i) {
ppObjectProxy[i] =
_comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}
ObjectProxy *CommunicatorEpoll::getObjectProxy(const string &sObjectProxyName,
const string &setName) {
return _objectProxyFactory->getObjectProxy(sObjectProxyName, setName);
}
代码来到ObjectProxyFactory::getObjectProxy(),同样,会首先加锁,再从map<string, ObjectProxy*> _objectProxys中查找是否已经拥有目标ObjectProxy,若查找成功直接返回。若查找失败,需要新建一个新的ObjectProxy,通过类图可知,ObjectProxy需要一个CommunicatorEpoll对象进行初始化,由此关联管理自己的CommunicatorEpoll,CommunicatorEpoll之后便可以通过getObjectProxy()接口获取属于自己的ObjectProxy。
ObjectProxy* ObjectProxyFactory::getObjectProxy(const string& sObjectProxyName,
const string& setName) {
TC_LockT<TC_ThreadRecMutex> lock(*this);
string tmpObjName = sObjectProxyName + ":" + setName;
map<string, ObjectProxy*>::iterator it = _objectProxys.find(tmpObjName);
if (it != _objectProxys.end()) {
return it->second;
}
ObjectProxy* pObjectProxy =
new ObjectProxy(_communicatorEpoll, sObjectProxyName, setName);
_objectProxys[tmpObjName] = pObjectProxy;
_vObjectProxys.push_back(pObjectProxy);
_objNum++;
return pObjectProxy;
}
详细过程可见下图:
图(1-8)获取ObjectProxy流程
新建ObjectProxy的过程同样非常值得关注,在ObjectProxy::ObjectProxy()中,关键代码是:
ObjectProxy::ObjectProxy(CommunicatorEpoll* pCommunicatorEpoll,
const string& sObjectProxyName, const string& setName)
......
_endpointManger.reset(new EndpointManager(
this, _communicatorEpoll->getCommunicator(), _sObjectProxyName,
_communicatorEpoll->isFirstNetThread(), _invokeSetId));
每个ObjectProxy都有属于自己的EndpointManager,负责管理到服务端的所有socket连接AdapterProxy,每个AdapterProxy连接到一个提供相应服务的服务端物理机socket上。通过EndpointManager还可以以不同的负载均衡方式获取与服务器的socket连接AdapterProxy。 ObjectProxy:: ObjectProxy()是图(1-6)或者图(1-8)中的略1,具体的代码流程如图(1-9)所示。ObjectProxy创建一个EndpointManager对象,在EndpointManager的初始化过程中,依据客户端提供的信息,直接创建连接到服务端物理机的TCP/UDP连接AdapterProxy或者从代理中获取服务端物理机socket列表后再创建TCP/UDP连接AdapterProxy。
图(1-9)ObjectProxy::ObjectProxy()函数流程图
按照图(1-9)的程序流程执行完成后,便会建立如图(1-4)所示的一个ObjectProxy对多个AdapterProxy的关系。 新建ObjectProxy之后,当然,需要将ObjectProxy对象插入ObjectProxyFactory的成员变量objectProxys与vObjectProxys中,方便下次直接返回ObjectProxy对象。
退出层层的函数调用栈,代码再次回 ServantProxyFactory::getServantProxy(),此时,ServantProxyFactory已经获得相应的ObjectProxy数组ObjectProxy** ppObjectProxy,接着便可以调用:
ServantPrx sp =
new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
进行ServantProxy的构造。构造完成便可以呈现出如图(1-10)的关系。在ServantProxy的构造函数中可以看到,ServantProxy在新建一个EndpointManagerThread变量,这是对外获取路由请求的类,是TARS为调用逻辑而提供的多种解决跨地区调用等问题的方案。
ServantProxy::ServantProxy(Communicator *pCommunicator,
ObjectProxy **ppObjectProxy, size_t iClientThreadNum)
......
_endpointInfo.reset(
new EndpointManagerThread(pCommunicator, (*_objectProxy)->name()));
同时可以看到:
for (size_t i = 0; i < _objectProxyNum; ++i) {
(*(_objectProxy + i))->setServantProxy(this);
}
建立了ServantProxy与ObjectProxy的相互关联关系。剩下的是读取配置文件,获取相应的信息。 构造ServantProxy变量完成后,ServantProxyFactory::getServantProxy()获取一些超时参数,赋值给ServantProxy变量,同时将其放进map<string, ServantPrx> _servantProxy中,方便下次直接查找获取。 ServantProxyFactory::getServantProxy()将刚刚构造的ServantProxy指针变量返回给调用他的Communicator::getServantProxy(),
ServantProxy *Communicator::getServantProxy(const string &objectName,
const string &setName) {
.......
return _servantProxyFactory->getServantProxy(objectName, setName);
}
Communicator::getServantProxy()直接将返回值返回给调用起Communicator::getServantProxy()的Communicator::stringToProxy()。可以看到:
/**
* 生成代理
* @param T
* @param objectName
* @param setName 指定set调用的setid
* @param proxy
*/
template <class T>
void stringToProxy(const string& objectName, T& proxy,
const string& setName = "") {
ServantProxy* pServantProxy = getServantProxy(objectName, setName);
proxy = (typename T::element_type*)(pServantProxy);
}
Communicator::stringToProxy()将返回值强制转换为客户端代码中与HelloPrx prx同样的类型HelloPrx。由于函数参数proxy就是prx的引用。那么实际就是将句柄prx成功初始化了,用户可以利用句柄prx进行RPC调用了。
当我们获得一个ServantProxy句柄后,便可以进行RPC调用了。Tars提供四种RPC调用方式,分别是同步调用,异步调用,promise调用与协程调用。其中最简单最常见的RPC调用方式是同步调用,接下来,将简单分析Tars的同步调用。
现假设有一个TestApp.HelloServer.HelloObj的服务,提供一个RPC接口是testHello,传入一个string类型的变量,返回一个string类型变量结果。而且假设有两台服务器,socket标识分别是172.29.233.88:20001与172.29.233.89:20001,设置客户端的网络线程数为3,那么执行如下代码:
Communicator comm;
try {
HelloPrx prx;
comm.stringToProxy(
"TestApp.HelloServer.HelloObj@tcp -h 172.29.233.88 -p 20001", prx);
comm.stringToProxy(
"TestApp.HelloServer.HelloObj@tcp -h 172.29.233.89 -p 20001", prx);
经过上文关于客户端初始化的分析介绍可知,可以得出如下图所示的关系图:
图(1-10)StringServant中ServantProxy,ObjectProxy与AdapterProxy的关系
获取StringServantPrx _proxy后,直接调用:
string sReq("hello world");
string sRsp("");
int iRet = prx->testHello(sReq, sRsp);
成功进行RPC同步调用后,返回的结果是sRsp = “hello world”。
同样,我们先看看与同步调用相关的类图,如下图所示:
图(1-11) 客户端同步调用涉及的类图
HelloProxy是继承自ServantProxy的,StringServantProxy提供了RPC同步调用的接口Int32 testHello(),当用户发起同步调用prx->testHello(sReq, sRsp)时,所进行的函数调用过程如下图所示。
图(1-12)同步调用过程
在函数HelloProxy::testHello()中,程序会先构造ServantProxy::tars_invoke()所需要的参数,如请求包类型,RPC方法名,方法参数等,需要值得注意的是,在同步调用中,最终的返回结果会放置在ResponsePacket类型的变量上。接下来便直接调用了ServantProxy::tars_invoke()方法:
shared_ptr<tars::ResponsePacket> rep = tars_invoke(tars::TARSNORMAL,"testHello", _os, context, _mStatus);
在ServantProxy::tars_invoke()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,同步调用的超时时间(单位为毫秒)等。最后,调用ServantProxy::invoke()进行远程方法调用。
无论同步调用还是各种异步调用,ServantProxy::invoke()都是RPC调用的必经之地。在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。客户端的每个caller线程都有属于自己的维护调用上下文的线程私有数据,如hash属性,消息染色信息。最关键的还是每条caller线程与每条客户端网络线程CommunicatorEpoll进行信息交互的桥梁——通信队列ReqInfoQueue数组,数组中的每个ReqInfoQueue元素负责与一条网络线程进行交互,如图(1-13)所示,图中橙色阴影代表数组ReqInfoQueue[],阴影内的圆柱体代表数组元素ReqInfoQueue。假如客户端create两条线程(下称caller线程)发起HelloServant服务的RPC请求,且客户端网络线程数设置为2,那么两条caller线程各自有属于自己的线程私有数据请求队列数组ReqInfoQueue[],数组里面的ReqInfoQueue元素便是该数组对应的caller线程与两条网络线程的通信桥梁,一条网络线程对应着数组里面的一个元素,通过网络线程ID进行数组索引。整个关系有点像生产者消费者模型,生产者caller线程向自己的线程私有数据ReqInfoQueue[]中的第N个元素ReqInfoQueue[N] push请求包,消费者客户端第N个网络线程就会从这个队列中pop请求包。
图(1-13)caller线程与网络线程的通信
阅读代码可能会发现几个常量值,如MAX_CLIENT_THREAD_NUM=64,这是最大网络线程数,在图(1-13)中为单个请求队列数组ReqInfoQueue[]的最大size;MAX_CLIENT_NOTIFYEVENT_NUM=2048,在图(1-13)中,可以看作caller线程的最大数量,或者请求队列数组ReqInfoQueue[]的最大数量(反正两者一一对应,每个caller线程都有自己的线程私有数据ReqInfoQueue[])。
接着依据caller线程的线程私有数据进行第一次的负载均衡——选取ObjectProxy(即选择网络线程CommunicatorEpoll)和与之相对应的ReqInfoQueue:
ObjectProxy *pObjProxy = NULL;
ReqInfoQueue *pReqQ = NULL;
//选择网络线程
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);
在ServantProxy::selectNetThreadInfo()中,通过轮询的形式来选取ObjectProxy与ReqInfoQueue。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的pReqQ,稍后通过pObjectProxy来发送RPC请求,请求信息会暂存在pReqQ中。
由于是同步调用,需要新建一个条件变量去监听RPC的完成,可见:
//同步调用 new 一个ReqMonitor
assert(msg->pMonitor == NULL);
if (msg->eType == ReqMessage::SYNC_CALL) {
msg->bMonitorFin = false;
if (pSptd->_sched) {
msg->bCoroFlag = true;
msg->sched = pSptd->_sched;
msg->iCoroId = pSptd->_sched->getCoroutineId();
} else {
msg->pMonitor = new ReqMonitor;
}
}
创建完条件变量,接下来往ReqInfoQueue中push_back()请求信息包msg,并通知pObjProxy所属的CommunicatorEpoll进行数据发送:
if (!pReqQ->push_back(msg, bEmpty)) {
TLOGERROR("[ServantProxy::invoke msgQueue push_back error num:"
<< pSptd->_netSeq << "]" << endl);
delete msg;
msg = NULL;
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
throw TarsClientQueueException("client queue full");
}
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
来到CommunicatorEpoll::notify()中,往请求事件通知数组NotifyInfo notify[]中添加请求事件,通知CommunicatorEpoll进行请求包的发送。注意了,这个函数的作用仅仅是通知网络线程准备发送数据,通过TC_Epoller::mod()或者TC_Epoller::add()触发一个EPOLLIN | EPOLLOUT事件,从而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的网络线程CommunicatorEpoll被唤醒,并设置唤醒后的epoll_event中的联合体epoll_data_t变量为_notify[iSeq]: |
//业务线程调用, 通知对应的网络线程醒过来
// iSeq业务线程号
void CommunicatorEpoll::notify(size_t iSeq, ReqInfoQueue *msgQueue) {
assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);
if (_notify[iSeq] == NULL) {
_notify[iSeq] = new FDInfo();
_notify[iSeq]->iType = FDInfo::ET_C_NOTIFY;
_notify[iSeq]->p = (void *)msgQueue;
_notify[iSeq]->iSeq = iSeq;
_notify[iSeq]->notify.init(&_ep);
_notify[iSeq]->notify.add((uint64_t)_notify[iSeq]);
} else {
_notify[iSeq]->notify.notify();
}
}
就是经过这么一个操作,网络线程就可以被唤醒,唤醒后通过epoll_event变量可获得_notify[iSeq]。接下来的请求发送与响应的接收会在后面会详细介绍。
随后,代码再次回到ServantProxy::invoke(),阻塞于:
if (!msg->bMonitorFin) {
TC_ThreadLock::Lock lock(*(msg->pMonitor));
//等待直到网络线程通知过来
if (!msg->bMonitorFin) {
msg->pMonitor->wait();
}
}
等待网络线程接收到数据后,对其进行唤醒。接收到响应后,检查是否成功获取响应,是则直接退出函数即可,响应信息在传入的参数msg中:
//判断eStatus来判断状态
assert(msg->eStatus != ReqMessage::REQ_REQ);
// TLOGTARS("[ServantProxy::invoke line: " << __LINE__ << " status: "
// << msg->eStatus << ", ret: " <<msg->response->iRet << endl);
if (msg->adapter) {
pSptd->_szHost = msg->adapter->endpoint().desc();
}
if (msg->eStatus == ReqMessage::REQ_RSP &&
msg->response->iRet == TARSSERVERSUCCESS) {
//成功
return;
}
若接收失败,会抛出异常,并删除msg:
if (msg->eStatus == ReqMessage::REQ_TIME) {
//超时
delete msg;
msg = NULL;
throw TarsSyncCallTimeoutException(os.str());
}
//异常调用
int ret = msg->response->iRet;
delete msg;
msg = NULL;
TarsException::throwException(ret, os.str());
若接收成功,退出ServantProxy::invoke()后,回到ServantProxy::tars_invoke(),获取ResponsePacket类型的响应信息,并删除msg包:
shared_ptr<ResponsePacket> rsp = msg->response;
// rsp = msg->response;
delete msg;
msg = NULL;
代码回到HelloServantProxy::testHello(),此时经过同步调用,可以直接获取RPC返回值并回到客户端中。
上面提到,当在ServantProxy::invoke()中,调用CommunicatorEpoll::notify()通知网络线程进行请求发送后,接下来,网络线程的具体执行流程如下图所示:
图(1-14)网络线程发送请求包
由于CommunicatorEpoll继承自TC_Thread,在上文1.2.2节中,初始化CommunicatorEpoll之后便调用其CommunicatorEpoll::start()函数启动网络线程,网络线程在CommunicatorEpoll::run()中一直等待_ep.wait(100)。由于在上一节的描述中,在CommunicatorEpoll::notify(),caller线程发起了通知notify,网络线程在CommunicatorEpoll::run()就会调用CommunicatorEpoll::handle()处理通知:
void CommunicatorEpoll::run() {
ServantProxyThreadData *pSptd = ServantProxyThreadData::getData();
assert(pSptd != NULL);
pSptd->_netThreadSeq = (int)_netThreadSeq;
while (!_terminate) {
try {
//考虑到检测超时等的情况 这里就wait100ms吧
int num = _ep.wait(100);
if (_terminate) break;
//先处理epoll的网络事件
for (int i = 0; i < num; ++i) {
const epoll_event &ev = _ep.get(i);
uint64_t data = TC_Epoller::getU64(ev);
if (data == 0) continue; // data非指针, 退出循环
// int64_t ms = TNOWMS;
handle((FDInfo *)data, ev);
}
//处理超时请求
doTimeout();
//数据上报
doStat();
reConnect();
} catch (exception &e) {
TLOGERROR("[CommunicatorEpoll:run exception:" << e.what() << "]" << endl);
} catch (...) {
TLOGERROR("[CommunicatorEpoll:run exception.]" << endl);
}
}
}
在CommunicatorEpoll::handle()中,通过传递进来的epoll_event中的data成员变量获取前面被选中的ObjectProxy并调用其ObjectProxy::invoke()函数:
void CommunicatorEpoll::handle(FDInfo *pFDInfo, const epoll_event &ev) {
......
} else if (FDInfo::ET_C_NOTIFY == pFDInfo->iType) {
//队列有消息通知过来
ReqInfoQueue *pInfoQueue = (ReqInfoQueue *)pFDInfo->p;
ReqMessage *msg = NULL;
size_t maxProcessCount = 0;
try {
while (pInfoQueue->pop_front(msg)) {
......
try {
msg->pObjectProxy->invoke(msg);
} catch (exception &e) {
在ObjectProxy::invoke()中将进行第二次的负载均衡,像图(1-4)所示,每个ObjectProxy通过EndpointManager可以以不同的负载均衡方式对AdapterProxy进行选取:
void ObjectProxy::invoke(ReqMessage* msg) {
......
//选择一个远程服务的Adapter来调用
AdapterProxy* pAdapterProxy = NULL;
bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
在EndpointManager:: selectAdapterProxy()中,有多种负载均衡的方式来选取AdapterProxy,如getHashProxy(),getWeightedProxy(),getNextValidProxy()等。
获取AdapterProxy之后,便将选择到的AdapterProxy赋值给EndpointManager:: selectAdapterProxy()函数中的引用参数pAdapterProxy,随后执行:
void ObjectProxy::invoke(ReqMessage* msg) {
......
msg->adapter = pAdapterProxy;
......
pAdapterProxy->invoke(msg);
调用pAdapterProxy将请求信息发送出去。而在AdapterProxy::invoke()中,AdapterProxy将调用Transceiver::sendRequset()进行请求的发送。
int AdapterProxy::invoke(ReqMessage* msg) {
......
return invoke_connection_parallel(msg);
int AdapterProxy::invoke_connection_parallel(ReqMessage* msg) {
......
if (_timeoutQueue->sendListEmpty()) {
int ret = _trans->sendRequest(msg->sReqData);
至此,对应同步调用的网络线程发送请求的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。
当网络线程CommunicatorEpoll接收到响应数据之后,如同之前发送请求那样, 在CommunicatorEpoll::run()中,程序获取活跃的epoll_event的变量,并将其中的epoll_data_t data传递给CommunicatorEpoll::handle():
//先处理epoll的网络事件
for (int i = 0; i < num; ++i) {
const epoll_event &ev = _ep.get(i);
uint64_t data = TC_Epoller::getU64(ev);
if (data == 0) continue; // data非指针, 退出循环
// int64_t ms = TNOWMS;
handle((FDInfo *)data, ev);
}
接下来的程序流程如下图所示:
图(1-15)网络线程接收响应包
在CommunicatorEpoll::handle()中,从epoll_data_t data中获取Transceiver指针,并调用CommunicatorEpoll::handleInputImp():
Transceiver *pTransceiver = (Transceiver *)pFDInfo->p;
//连接出错 直接关闭连接
if (TC_Epoller::errorEvent(ev)) {
try {
pTransceiver->close();
} catch (exception &e) {
TLOGERROR("CommunicatorEpoll::handle exp:" << e.what() << " ,line:"
<< __LINE__ << endl);
} catch (...) {
TLOGERROR("CommunicatorEpoll::handle|" << __LINE__ << endl);
}
return;
}
//先收包
if (TC_Epoller::readEvent(ev)) {
try {
handleInputImp(pTransceiver);
} catch (exception &e) {
TLOGERROR("CommunicatorEpoll::handle exp:" << e.what() << " ,line:"
<< __LINE__ << endl);
} catch (...) {
TLOGERROR("CommunicatorEpoll::handle|" << __LINE__ << endl);
}
}
在CommunicatorEpoll::handleInputImp()中,除了对连接的判断外,主要做两件事,调用TcpTransceiver::doResponse()以及AdapterProxy::finishInvoke(),前者的工作是从socket连接中获取响应数据并判断接收的数据是否为一个完整的RPC响应包。后者的作用是将响应结果返回给客户端,同步调用的会唤醒阻塞等待在条件变量中的caller线程,异步调用的会在异步回调处理线程中执行回调函数。
void Transceiver::finishInvoke(shared_ptr<ResponsePacket> &rsp) {
if (_adapterProxy->endpoint().authType() == AUTH_TYPELOCAL &&
_authState != AUTH_SUCC) {
std::string ret(rsp->sBuffer.begin(), rsp->sBuffer.end());
tars::AUTH_STATE tmp = AUTH_SUCC;
tars::stoe(ret, tmp);
tars::AUTH_STATE newstate = tmp;
TLOGTARS("[Transceiver::finishInvoke state: " << etos(_authState) << " -> "
<< etos(newstate) << endl);
setAuthState(newstate);
if (newstate == AUTH_SUCC) {
// flush old buffered msg when auth is not complete
_adapterProxy->doInvoke(true);
} else {
TLOGERROR("[Transceiver::finishInvoke newstate: "
<< etos(newstate) << ", error close!" << endl);
close();
}
return;
}
_adapterProxy->finishInvoke(rsp);
}
void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket>& rsp) {
if (_objectProxy->getRootServantProxy()->tars_connection_serial() > 0) {
finishInvoke_serial(rsp);
} else {
finishInvoke_parallel(rsp);
}
}
在AdapterProxy::finishInvoke(shared_ptr
void AdapterProxy::finishInvoke_parallel(shared_ptr<ResponsePacket>& rsp) {
......
ReqMessage* msg = NULL;
......
//这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
//获取请求信息
bool retErase = _timeoutQueue->erase(rsp->iRequestId, msg);
在找回响应包对应的请求信息msg的同时,将其在超时队列中剔除出来。接着执行:
assert(msg->eStatus == ReqMessage::REQ_REQ);
msg->eStatus = ReqMessage::REQ_RSP;
}
msg->response = rsp;
finishInvoke(msg);
程序调用另一个重载函数AdapterProxy::finishInvoke(ReqMessage*),不同的RPC调用方式会执行不同的动作,例如同步调用会唤醒对应的caller线程:
//同步调用,唤醒ServantProxy线程
if (msg->eType == ReqMessage::SYNC_CALL) {
if (!msg->bCoroFlag) {
assert(msg->pMonitor);
TC_ThreadLock::Lock sync(*(msg->pMonitor));
msg->bMonitorFin = true;
msg->pMonitor->notify();
} else {
msg->sched->put(msg->iCoroId);
}
return;
}
至此,对应同步调用的网络线程接收响应的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。 综上,客户端同步调用的过程如下图所示。
图(1-16)同步调用图示
在Tars中,除了最常见的同步调用之外,还可以进行异步调用,异步调用可分三种:普通的异步调用,promise异步调用与协程异步调用,这里简单介绍普通的异步调用,看看其与上文介绍的同步调用有何异同。
异步调用不会阻塞整个客户端程序,调用完成(请求发送)之后,用户可以继续处理其他事情,等接收到响应之后,Tars会在异步处理线程当中执行用户实现好的回调函数。在这里,会用到《Effective C++》中条款35所介绍的“藉由Non-Virtual Interface手法实现Template Method模式”,用户需要继承一个XXXPrxCallback基类,并实现里面的虚函数,异步回调线程会在收到响应包之后回调这些虚函数,具体的异步调用客户端示例这里不作详细介绍,在Tars的Example中会找到相应的示例代码。
本文第一章已经详细介绍了客户端的初始化,这里再简单提一下,在第一章的“1.2.2初始化代码- 2.执行Communicator的初始化函数”中,已经提到说,在Communicator::initialize()中,会创建_asyncThreadNum条异步线程,等待异步调用的时候处理响应数据:
//异步线程数
_asyncThreadNum = TC_Common::strto<size_t>(getProperty("asyncthread", "3"));
if (_asyncThreadNum == 0) {
_asyncThreadNum = 3;
}
if (_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) {
_asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
}
//异步队列的大小
size_t iAsyncQueueCap =
TC_Common::strto<size_t>(getProperty("asyncqueuecap", "100000"));
if (iAsyncQueueCap < 10000) {
iAsyncQueueCap = 10000;
}
for (size_t i = 0; i < _asyncThreadNum; ++i) {
_asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge));
}
在开始讲述异步调用与接收响应之前,先看看大致的调用过程,与图(1-16)的同步调用来个对比。
图(1-17)客户端同步异步调用图示
跟同步调用的示例一样,现在有一TestApp.HelloServer.HelloObj的服务,提供一个RPC接口是testHello,传入一个string类型的变量,返回一个string类型变量的结果。在执行tars2cpp而生成的文件中,定义了回调函数基类HelloPrxCallback,用户需要public继承这个基类并实现自己的方法,例如:
class HelloCallBack : public HelloPrxCallback {
public:
HelloCallBack() {}
virtual ~HelloCallBack() {}
virtual void callback_testHello(tars::Int32 ret, const std::string& sRsp) {
cout << "callback_testHello ret:" << ret << "|sRsp:" << sRsp << endl;
}
virtual void callback_testHello_exception(tars::Int32 ret) {
cout << "callback_testHello_exception ret:" << ret << endl;
}
};
然后,用户就可以通过prx->async_testHello(cb, sReq)进行异步调用了,调用过程与上文的同步调用差不多,函数调用流程如下图所示,可以与图(1-12)进行比较,看看同步调用与异步调用的异同。
图(1-18)异步调用过程
在异步调用中,客户端发起异步调用prx->async_testHello(cb, sReq)后,在函数HelloProxy::async_testHello()中,程序同样会先构造ServantProxy::tars_invoke_async()所需要的参数,如请求包类型,RPC方法名,方法参数等,与同步调用的一个区别是,还传递了承载回调函数的派生类实例。接下来便直接调用了HelloProxy::tars_invoke_async()方法:
tars_invoke_async(tars::TARSNORMAL,"testHello", _os, context, _mStatus, callback);
在ServantProxy::tars_invoke_async()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,异步调用的超时时间(单位为毫秒)以及异步调用后的回调函数ServantProxyCallbackPtr callback(等待异步调用返回响应后回调里面的函数)等。最后,与同步调用一样,调用ServantProxy::invoke()进行远程方法调用。
在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。与同步调用一样,利用ServantProxy::selectNetThreadInfo()来轮询选取ObjectProxy(网络线程CommunicatorEpoll)与对应的ReqInfoQueue,详细可看同步调用中的介绍,注意区分客户端中的调用者caller线程与网络线程,以及之间的通信桥梁。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的pReqQ,在异步调用中,不需要建立条件变量来阻塞进程,直接通过pObjectProxy来发送RPC请求,请求信息会暂存在ReqInfoQueue中:
if (!pReqQ->push_back(msg, bEmpty)) {
TLOGERROR("[ServantProxy::invoke msgQueue push_back error num:"
<< pSptd->_netSeq << "]" << endl);
delete msg;
msg = NULL;
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
throw TarsClientQueueException("client queue full");
}
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
在之后,就不需要做任何的工作,退出层层函数调用,回到客户端中,程序可以继续执行其他动作。
异步调用的请求发送过程与同步调用的一致,都是在网络线程中通过ObjectProxy去调用AdapterProxy来发送数据。但是在接收到响应之后,通过图(1-15)可以看到,在函数AdapterProxy::finishInvoke(ReqMessage*)中,同步调用会通过msg->pMonitor->notify()唤醒客户端的caller线程来接收响应包,而在异步调用中,则是如图(1-19)所示,Communicator与AsyncProcThread的关系如图(1-20)所示。
图(1-19)异步回调的收包处理
图(1-20)Communicator与AsyncProcThread
在函数AdapterProxy::finishInvoke(ReqMessage*)中,程序通过:
//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
将信息包msg(带响应信息)放到异步回调处理线程中,在Communicator::pushAsyncThreadQueue()中,通过轮询的方式选择异步回调处理线程处理接收到的响应包,异步处理线程数默认是3,最大是1024。
void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage *msg) {
_communicator->pushAsyncThreadQueue(msg);
}
void Communicator::pushAsyncThreadQueue(ReqMessage *msg) {
if (msg->pObjectProxy->getRootServantProxy()->_callback) {
ReqMessagePtr msgPtr = msg;
msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr);
} else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash) {
//先不考虑每个线程队列数目不一致的情况
_asyncThread[((uint32_t)msg->adapter->trans()->fd()) % _asyncThreadNum]
->push_back(msg);
} else {
//先不考虑每个线程队列数目不一致的情况
_asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg);
}
}
选取之后,通过AsyncProcThread::push_back(),将msg包放在响应包队列AsyncProcThread::_msgQueue中,然后通过AsyncProcThread:: notify()函数通知本异步回调处理线程进行处理,AsyncProcThread:: notify()函数可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的异步处理线程被唤醒。
void AsyncProcThread::push_back(ReqMessage* msg) {
if (_merge) {
//合并了, 直接回调
callback(msg);
} else {
if (_msgQueue->size() >= _iQueueCap) {
TLOGERROR("[AsyncProcThread::push_back] async_queue full:"
<< _msgQueue->size() << ">=" << _iQueueCap << endl);
delete msg;
} else {
_msgQueue->push_back(msg);
TC_ThreadLock::Lock lock(*this);
notify();
}
}
}
void AsyncProcThread::run() {
while (!_terminate) {
ReqMessage* msg;
if (_msgQueue->pop_front(msg)) {
callback(msg);
} else {
TC_ThreadLock::Lock lock(*this);
timedWait(1000);
}
}
}
在AsyncProcThread::run()中,主要执行下面的程序进行函数回调:
void AsyncProcThread::callback(ReqMessage* msg) {
......
try {
ReqMessagePtr msgPtr = msg;
msg->callback->dispatch(msgPtr);
} catch (exception& e) {
TLOGERROR("[AsyncProcThread exception]:" << e.what() << endl);
} catch (...) {
TLOGERROR("[AsyncProcThread exception.]" << endl);
}
}
通过msg->callback,程序可以调用回调函数基类HelloPrxCallback里面的onDispatch()函数。在HelloPrxCallback:: onDispatch()中,分析此次响应所对应的RPC方法名,获取响应结果,并通过动态多态,执行用户所定义好的派生类的虚函数。通过ReqMessagePtr的引用计数,还可以将ReqMessage* msg删除掉,与同步调用不同,同步调用的msg的新建与删除都在caller线程中,而异步调用的msg在caller线程上构造,在异步回调处理线程中析构。
TARS可以在考虑到易用性和高性能的同时快速构建系统并自动生成代码,帮助开发人员和企业以微服务的方式快速构建自己稳定可靠的分布式应用,从而令开发人员只关注业务逻辑,提高运营效率。多语言、敏捷研发、高可用和高效运营的特性使 TARS 成为企业级产品。 《微服务开源框架TARS的RPC源码解析》系列文章分上下两篇,对RPC调用部分进行源码解析。本文是上篇,我们带大家了解了一下TARS的客户端。敬请期待下篇《初识TARS C++服务端》
TARS是腾讯使用十年的微服务开发框架,目前支持C++、Java、PHP、Node.js、Go语言。该开源项目为用户提供了涉及到开发、运维、以及测试的一整套微服务平台PaaS解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。目前该框架应用在腾讯各大核心业务,基于该框架部署运行的服务节点规模达到数十万。
TARS的通信模型中包含客户端和服务端。客户端服务端之间主要是利用RPC进行通信。本系列文章分上下两篇,对RPC调用部分进行源码解析。本文是下篇,我们将以C++语言为载体,带大家了解一下TARS的服务端。
在使用TARS构建RPC服务端的时候,TARS会帮你生成一个XXXServer类,这个类是继承自Application类的,声明变量XXXServer g_app,以及调用函数:
g_app.main(argc, argv);
g_app.waitForShutdown();
便可以开启TARS的RPC服务了。在开始剖析TARS的服务端代码之前,先介绍几个重要的类,让大家有一个大致的认识。
正如前面所言,一个服务端就是一个Application,Application帮助用户读取配置文件,根据配置文件初始化代理(假如这个服务端需要调用其他服务,那么就需要初始化代理了)与服务,新建以及启动网络线程与业务线程。
TC_EpollServer才是真正的服务端,如果把Application比作风扇,那么TC_EpollServer就是那个马达。TC_EpollServer掌管两大模块——网络模块与业务模块,就是下面即将介绍的两个类。
/**
* 网络线程
* Network Thread
*/
std::vector<NetThread *> _netThreads;
/**
*
*/
vector<BindAdapterPtr> _bindAdapters;
/**
* 监听socket
* Listening socket
*/
unordered_map<int, BindAdapterPtr> _listeners;
代表着网络模块,内含TC_Epoller作为IO复用,TC_Socket建立socket连接(Connection),ConnectionList记录众多对客户端的socket连接(Connection)。任何与网络相关的数据收发都与NetThread有关。在配置文件中,利用/tars/application/server 下的netthread配置NetThread的个数。
代表一个RPC服务实体,在配置文件中的/tars/application/server下面的xxxAdapter就是对BindAdapter的配置,一个BindAdapter代表一个服务实体,看其配置就知道BindAdapter的作用是什么了,其代表一个RPC服务对外的监听套接字,还声明了连接的最大数量,接收队列的大小,业务线程数,RPC服务名,所使用的协议等。
BindAdapter本身可以认为是一个服务的实例,能建立真实存在的监听socket并对外服务,与网络模块NetThread以及业务模块HandleGroup都有关联,例如,多个NetThread的第一个线程负责对BindAdapter的listen socket进行监听,有客户连接到BindAdapter的listen socket就随机在多个NetThread中选取一个,将连接放进被选中的NetThread的ConnectionList中。BindAdapter则通常会与一组HandleGroup进行关联,该HandleGroup里面的业务线程就执行BindAdapter对应的服务。可见,BindAdapter与网络模块以及业务模块都有所关联。
/**
* Adapter所用的HandleGroup
* the HandleGroup used by Adapter
*/
vector<HandlePtr> _handles;
代表着业务模块,Handle是执行PRC服务的一个线程,而众多Handle组成的Handle数组就是同一个RPC服务的一组业务线程了。业务线程负责调用用户定义好的服务代码,并将处理结果放到发送缓存中等待网络模块发送,下文将会详细讲解业务线程是如何调用用户定义的代码的,这里用到了简单的C++反射,这点在很多资料中都没有被提及。在配置文件中,利用/tars/application/server/xxxAdapter 下的threads配置一个Handle数组中的Handle(业务线程)的个数。
好了,介绍完这几个类之后,通过类图看看他们之间的关系:
图(2-1)服务端相关类图
服务端TC_EpollServer管理类图中左侧的网络模块与右侧的业务模块,前者负责建立与管理服务端的网络关系,后者负责执行服务端的业务代码,两者通过BindAdapter构成一个整体,对外进行RPC服务。
与客户端一样,服务端也需要进行初始化,来构建上面所说的整体,按照上面的介绍,可以将初始化分为两模块——网络模块的初始化与业务模块的初始化。初始化的所有代码在Application的void main()以及void waitForQuit()中,初始化包括屏蔽pipe信号,读取配置文件等,这些将忽略不讲,主要看看其如何通过epoll与建立listen socket来构建网络部分,以及如何设置业务线程组构建业务部分。
在初始化网络模块与业务模块之前,TC_EpollServer需要先初始化,主要代码在:
void Application::main(const string& config) {
......
//初始化Server部分
initializeServer();
在initializeServer()中会填充ServerConfig里面的各个静态成员变量,留待需要的时候取用。可以看到有_epollServer = new TC_EpollServer(ServerConfig::NetThread),服务端TC_EpollServer被创建出来,而且网络线程NetThread也被建立出来了:
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
......
if (_netThreadNum < 1) {
_netThreadNum = 1;
}
//网络线程的配置数目不能15个
if (_netThreadNum > 15) {
_netThreadNum = 15;
}
......
for (size_t i = 0; i < _netThreadNum; ++i) {
TC_EpollServer::NetThread *netThreads =
new TC_EpollServer::NetThread(this, i);
_netThreads.push_back(netThreads);
}
此后,其实有一个AdminAdapter被建立,但其与一般的RPC服务BindAdapter不同,这里不展开介绍。
好了,TC_EpollServer被构建之后,如何给他安排左(网络模块)右(业务模块)护法呢?
在讲解网络模块之前,再认真地看看网络模块的相关类图:
图(2-2)网络模块类图
先看看Application中哪些代码与网络模块的初始化有关吧:
void Application::main(const string& config) {
......
vector<TC_EpollServer::BindAdapterPtr> adapters;
//绑定对象和端口
bindAdapter(adapters);
void Application::waitForShutdown() {
......
_epollServer->waitForShutdown();
void TC_EpollServer::waitForShutdown() {
......
createEpoll();
网络部分的初始化,离不开建立各RPC服务的监听端口(socket,bind,listen),接收客户端的连接(accept),建立epoll等。那么何时何地调用这些函数呢?大致过程如下图所示:
图(2-3)网络模块的初始化
首先在Application::main(const string& config)中,调用:
vector<TC_EpollServer::BindAdapterPtr> adapters;
//绑定对象和端口
bindAdapter(adapters);
在Application::bindAdapter()建立一个个服务实体BindAdapter,通过读取配置文件中的/tars/application/server下面的xxxAdapter来确定服务实体BindAdapter的个数及不同服务实体的配置,然后再调用:
TC_EpollServer::BindAdapterPtr bindAdapter =
new TC_EpollServer::BindAdapter(_epollServer.get());
_epollServer->bind(bindAdapter);
来确定服务实体的listen socket。
int TC_EpollServer::bind(BindAdapterPtr &lsPtr) {
auto it = _listeners.begin();
while (it != _listeners.end()) {
if (it->second->getName() == lsPtr->getName()) {
throw TC_Exception("bind name '" + lsPtr->getName() + "' conflicts.");
}
++it;
}
const TC_Endpoint &ep = lsPtr->getEndpoint();
TC_Socket &s = lsPtr->getSocket();
bind(ep, s, lsPtr->isManualListen());
_listeners[s.getfd()] = lsPtr;
_bindAdapters.push_back(lsPtr);
return s.getfd();
}
可以看到,接下来继续调用TC_EpollServer::bind(BindAdapterPtr &lsPtr),其负责做一些准备工作,实际创建socket的是在TC_EpollServer::bind(BindAdapterPtr &lsPtr)中执行的TC_EpollServer::bind(const TC_Endpoint &ep, TC_Socket &s, bool manualListen):
void TC_EpollServer::bind(const TC_Endpoint &ep, TC_Socket &s,
bool manualListen) {
#if TARGET_PLATFORM_WINDOWS
int type = ep.isIPv6() ? AF_INET6 : AF_INET;
#else
int type = ep.isUnixLocal() ? AF_LOCAL : ep.isIPv6() ? AF_INET6 : AF_INET;
#endif
#if TARGET_PLATFORM_LINUX
if (ep.isTcp()) {
s.createSocket(SOCK_STREAM | SOCK_CLOEXEC, type);
} else {
s.createSocket(SOCK_DGRAM | SOCK_CLOEXEC, type);
}
#else
if (ep.isTcp()) {
s.createSocket(SOCK_STREAM, type);
} else {
s.createSocket(SOCK_DGRAM, type);
}
#endif
#if TARGET_PLATFORM_WINDOWS
s.bind(ep.getHost(), ep.getPort());
if (ep.isTcp())
#else
if (ep.isUnixLocal()) {
s.bind(ep.getHost().c_str());
} else {
s.bind(ep.getHost(), ep.getPort());
}
if (ep.isTcp() && !ep.isUnixLocal())
#endif
{
if (!manualListen) {
//手工监听
s.listen(10240);
}
s.setKeepAlive();
s.setTcpNoDelay();
//不要设置close wait否则http服务回包主动关闭连接会有问题
s.setNoCloseWait();
}
s.setblock(false);
}
执行到这里,已经创建了服务实体BindAdapter的listen socket了,代码退回到TC_EpollServer::bind(BindAdapterPtr &lsPtr)后,还可以看到TC_EpollServer记录fd其所负责监听的BindAdapter:
_listeners[s.getfd()] = lsPtr;
下图是对创建服务实体的listen socket的流程总结
图(2-4)创建服务实体的listen socket
代码回到Application::waitForShutdown()中,通过执行:
_epollServer->waitForShutdown();
void TC_EpollServer::waitForShutdown() {
if (!isMergeHandleNetThread()) startHandle();
createEpoll();
来让TC_EpollServer在main线程中将上面创建的listen sockets加入epoll中:
void TC_EpollServer::createEpoll() {
uint32_t maxAllConn = 0;
//监听socket
auto it = _listeners.begin();
while (it != _listeners.end()) {
if (it->second->getEndpoint().isTcp()) {
//获取最大连接数
maxAllConn += it->second->getMaxConns();
_epoller.add(it->first, it->first, EPOLLIN);
} else {
maxAllConn++;
}
++it;
}
if (maxAllConn >= (1 << 22)) {
error("createEpoll connection num: " + TC_Common::tostr(maxAllConn) +
" >= " + TC_Common::tostr(1 << 22));
maxAllConn = (1 << 22) - 1;
}
for (size_t i = 0; i < _netThreads.size(); ++i) {
_netThreads[i]->createEpoll(maxAllConn);
}
//必须先等所有网络线程调用createEpoll(),初始化list后,才能调用initUdp()
for (size_t i = 0; i < _netThreads.size(); ++i) {
_netThreads[i]->initUdp(_listeners);
}
}
代码来到NetThread::createEpoll(uint32_t maxAllConn),
void TC_EpollServer::NetThread::createEpoll(uint32_t maxAllConn) {
_list.init((uint32_t)maxAllConn, _threadIndex + 1);
}
这个函数可以作为网络线程NetThread的初始化函数,在函数里面初始化了连接管理链表ConnectionList _list。看下图对本流程的总结:
图(2-5)创建epoll
由于NetThread是线程,需要执行其start()函数才能启动线程。而这个工作在Application::waitForShutdown()中的_epollServer->waitForShutdown()完成,
void TC_EpollServer::waitForShutdown() {
......
for (size_t i = 0; i < _netThreadNum; ++i) {
_netThreads[i]->start();
}
跟着下面的流程图看代码,就清楚明白了:
图(2-6)启动网络线程
同样,与网络模块一样,在讲解业务模块之前,先认真地看看业务模块的相关类图:
图(2-7)业务模块相关类图
在业务模块初始化中,我们需要理清楚两个问题:业务模块如何与用户填充实现的XXXServantImp建立联系,从而使请求到来的时候,Handle能够调用用户定义好的RPC方法?业务线程在何时何地被启动,如何等待着请求的到达?
看看Application中哪些代码与业务模块的初始化有关吧:
void Application::main(const string& config) {
......
vector<TC_EpollServer::BindAdapterPtr> adapters;
//绑定对象和端口
bindAdapter(adapters);
......
//捕捉初始化可能的异常
try {
TC_Common::msleep(100);
//业务应用的初始化
initialize();
在bindAdapter(adapters)与initialize()中解决了前面提到的第一个问题,bindAdapter(adapters)中剩下的代码实现了handle业务线程组的创建与启动。
如何进行关联?先看看下面的代码流程图:
图(2-8)通过ServantHelperManager关联BindAdapter与服务Servant
如何让业务线程能够调用用户自定义的代码?这里引入了ServantHelperManager,先简单剧透一下,通过ServantHelperManager作为桥梁,业务线程可以通过BindAdapter的ID索引到服务ID,然后通过服务ID索引到用户自定义的XXXServantImp类的生成器,有了生成器,业务线程就可以生成XXXServantImp类并调用里面的方法了。下面一步一步分析。
在Application::main()调用的Application::bindAdapter()中看到有下面的代码:
void Application::bindAdapter(
vector<TC_EpollServer::BindAdapterPtr>& adapters) {
......
for (size_t i = 0; i < adapterName.size(); i++) {
string servant =
_conf.get("/tars/application/server/" + adapterName[i] + "<servant>");
checkServantNameValid(servant, sPrefix);
_servantHelper->setAdapterServant(adapterName[i], servant);
举个例子,adapterName[i]为TestApp.HelloServer.HelloObjAdapter,而servant为TestApp.HelloServer.HelloObj,这些都是在配置文件中读取的,前者是BindAdapter的ID,而后者是服务ID。在ServantHelperManager:: setAdapterServant()中,仅仅是执行:
void ServantHelperManager::setAdapterServant(const string &sAdapter,
const string &sServant) {
_adapter_servant[sAdapter] = sServant;
_servant_adapter[sServant] = sAdapter;
}
而这两个成员变量仅仅是:
/**
* Adapter包含的Servant(Adapter名称:servant名称)
*/
map<string, string> _adapter_servant;
/**
* Adapter包含的Servant(Servant名称:Adapter名称)
*/
map<string, string> _servant_adapter;
在这里仅仅是作一个映射记录,后续可以通过BindAdapter的ID可以索引到服务的ID,通过服务的ID可以利用简单的C++反射得出用户实现的XXXServantImp类,从而得到用户实现的方法。
如何实现从服务ID到类的反射?同样需要通过ServantHelperManager的帮助。在Application::main()中,执行完Application::bindAdapter()会执行initialize(),这是一个纯虚函数,实际会执行派生类XXXServer的函数,类似:
void HelloServer::initialize() {
// initialize application here:
//...
addServant<HelloImp>(ServerConfig::Application + "." +
ServerConfig::ServerName + ".HelloObj");
}
代码最终会执行ServantHelperManager:: addServant():
/**
* 添加Servant
* @param T
* @param id
*/
template <typename T>
void addServant(const string &id, Application *application,
bool check = false) {
if (check && _servant_adapter.end() == _servant_adapter.find(id)) {
cerr << "[TARS]ServantHelperManager::addServant " << id
<< " not find adapter.(maybe not conf in the web)" << endl;
throw runtime_error("[TARS]ServantHelperManager::addServant " + id +
" not find adapter.(maybe not conf in the web)");
}
_servant_creator[id] = new ServantCreation<T>(application);
}
其中参数const string& id是服务ID,例如上文的TestApp.HelloServer.HelloObj,T是用户填充实现的HelloImp类。
上面代码的_servant_creator[id] = new ServantCreation
/**
* Servant
*/
template <class T>
struct ServantCreation : public ServantHelperCreation {
ServantCreation(Application *application) : _application(application) {}
ServantPtr create(const string &s) {
T *p = new T;
p->setName(s);
p->setApplication(_application);
return p;
}
Application *_application;
};
以上就是通过服务ID生成相应XXXServantImp类的简单反射技术,业务线程组里面的业务线程只需要获取到所需执行的业务的BindAdapter的ID,就可以通过ServantHelperManager获得服务ID,有了服务ID就可以获取XXXServantImp类的生成器,从而生成XXXServantImp类,执行里面由用户定义好的RPC方法。现在重新看图(2-8)就大致清楚整个流程了。
剩下的部分就是Handle数组的创建,并将其与BindAdapter进行绑定关联,同时也需要绑定TC_EpollServer,随后创建/启动Handle业务线程,启动Handle的过程涉及上文“将BindAdapter与用户定义的方法关联起来”提到的获取服务类生成器。先看看大致的代码流程图:
图(2-9) 业务线程组的建立流程
在这里分两部分,第一部分是在Application::bindAdapter()中执行下列代码:
if (_conf.getDomainVector("/tars/application/server", adapterName)) {
for (size_t i = 0; i < adapterName.size(); i++) {
......
bindAdapter->setHandle<ServantHandle>(
TC_Common::strto<int>(_conf.get(sLastPath + "<threads>", "1")), this);
遍历在配置文件中定义好的每一个BindAdapter(例如TestApp.HelloServer.HelloObjAdapter),并为其设置业务线程Handle数组,让线程组的所有线程都可以执行该BindAdapter所对应的RPC方法。注意,ServantHandle是Handle的派生类,就是业务处理线程类,随后来到:
/**
* 初始化处理线程,线程将会启动
* Initialize the processing thread, which will start
*/
template <typename T, typename... Args>
void setHandle(size_t n, Args &&... args) {
if (!_handles.empty()) {
getEpollServer()->error(
"[BindAdapter::setHandle] handle is not empty!");
return;
}
_iHandleNum = n;
_threadDataQueue.resize(_iHandleNum + 1);
_threadDataQueue[0] = std::make_shared<BindAdapter::DataQueue>();
if (_pEpollServer->isMergeHandleNetThread()) {
_iHandleNum = _pEpollServer->_netThreadNum;
}
for (int32_t i = 0; i < _iHandleNum; ++i) {
HandlePtr handle = new T(args...);
handle->setHandleIndex(i);
handle->setEpollServer(this->getEpollServer());
handle->setBindAdapter(this);
_handles.push_back(handle);
}
}
真正创建业务线程Handle数组,并将线程组与BindAdapter,TC_EpollServer关联起来。
执行完上面的代码,就可以得到下面的类图了:
图(2-10)再看业务模块相关类图
这里再通过函数流程图简单复习一下上述代码的流程,主要内容均在BindAdapter::setHandle()中:
图(2-11)建立业务模块
随着函数的层层退出,代码来到Application::waitForShutdown()中,随后执行:
void Application::waitForShutdown() {
......
_epollServer->waitForShutdown();
}
void TC_EpollServer::waitForShutdown() {
if (!isMergeHandleNetThread()) startHandle();
在TC_EpollServer::startHandle()中,遍历TC_EpollServer控制的_bindAdapters中的所有bindAdapter,并遍历bindAdapter内的各个Handle,执行其start()方法进行线程的启动:
void TC_EpollServer::startHandle() {
if (!this->isMergeHandleNetThread()) {
if (!_handleStarted) {
_handleStarted = true;
for (auto &bindAdapter : _bindAdapters) {
const vector<TC_EpollServer::HandlePtr> &hds =
bindAdapter->getHandles();
for (uint32_t i = 0; i < hds.size(); ++i) {
if (!hds[i]->isAlive()) {
hds[i]->start();
}
}
}
}
}
}
由于Handle是继承自TC_Thread的,在执行Handle::start()中,会执行虚函数Handle::run(),在Handle::run()中主要是执行两个函数,一个是ServantHandle::initialize(),另一个是Handle::handleImp():
void TC_EpollServer::Handle::run() {
initialize();
handleImp();
}
ServantHandle::initialize()的主要作用是取得用户实现的RPC方法,其实现原理与上文(“2.2.3业务模块的初始化”中的第1小点“将BindAdapter与用户定义的方法关联起来”)提及的一样,借助与其关联的BindAdapter的ID号,以及ServantHelpManager,来查找到用户填充实现的XXXServantImp类的生成器并生成XXXServantImp类的实例,将这个实例与服务名构成pair <string, ServantPtr>变量,放进unordered_map<string, ServantPtr> ServantHandle:: _servants中,等待业务线程Handle需要执行用户自定义方法的时候,从map<string, ServantPtr> ServantHandle:: _servants中查找:
void ServantHandle::initialize() {
ServantPtr servant =
_application->getServantHelper()->create(_bindAdapter->getName());
if (servant) {
_servants[servant->getName()] = servant;
} else {
TLOGERROR(
"[ServantHandle initialize createServant ret null, for adapter `" +
_bindAdapter->getName() + "`]"
<< endl);
cerr << "ServantHandle initialize createServant ret null, for adapter `" +
_bindAdapter->getName() + "`]"
<< endl;
RemoteNotify::getInstance()->report(
"initialize createServant error: no adapter:" +
_bindAdapter->getName());
TC_Common::msleep(100);
exit(-1);
}
而Handle::handleImp()的主要作用是使业务线程阻塞在等待条件变量上,在这里,可以看到l.timedWait(waitTime)函数,阻塞等待在条件变量上:
void TC_EpollServer::Handle::handleImp() {
// by goodenpei, 判断是否启用顺序模式
_bindAdapter->initThreadRecvQueue(getHandleIndex());
startHandle();
while (!getEpollServer()->isTerminate()) {
//等待
wait();
void TC_EpollServer::Handle::wait() {
if (allAdapterIsEmpty() && allFilterIsEmpty()) {
_bindAdapter->waitAtQueue(_handleIndex, _iWaitTime);
}
}
void TC_EpollServer::BindAdapter::waitAtQueue(uint32_t handleIndex,
uint32_t waitTime) {
TC_ThreadLock &l = getLock(handleIndex);
TC_ThreadLock::Lock lock(l);
l.timedWait(waitTime);
}
Handle线程通过条件变量来让所有业务线程阻塞等待被唤醒 ,因为本章是介绍初始化,因此代码解读到这里先告一段落,稍后再详解服务端中的业务线程Handle被唤醒后,如何通过map<string, ServantPtr> ServantHandle:: _servants查找并执行业务。现在通过函数流程图复习一下上述的代码流程:
图(2-12)启动Handle业务线程
经过了初始化工作后,服务端就进入工作状态了,服务端的工作线程分为两类,正如前面所介绍的网络线程与业务线程,网络线程负责接受客户端的连接与收发数据,而业务线程则只关注执行用户所定义的PRC方法,两种线程在初始化的时候都已经执行start()启动了。
大部分服务器都是按照accept()->read()->write()->close()的流程执行的,大致工作流程图如下图所示:
图(2-13)普通服务器工作流程
TARS的服务端也不例外。
判定逻辑采用Epoll IO复用模型实现,每一条网络线程NetThread都有一个TC_Epoller来做事件的收集、侦听、分发。
正如前面所介绍,只有main线程会执行连接的监听工作,接受新的连接之后,就会构造一个Connection实例,并选择处理这个连接的网络线程。
请求被读入后,将暂存在接收队列中,并通知业务线程进行处理,在这里,业务线程终于登场了,处理完请求后,将结果放到发送队列。
发送队列有数据,自然需要通知网络线程进行发送,接收到发送通知的网络线程会将响应发往客户端。
TARS服务器的工作流程大致就是如此,与如上图所示的普通服务器工作流程没有多大的区别,下面将按照接受客户端连接,读入RPC请求,处理RPC请求,发送RPC响应四部分逐一介绍介绍服务端的工作。
讨论服务器接受请求,从main线程的TC_EpollServer::waitForShutdown()开始分析,在上面说到的创建TC_Epoller并将监听fd放进TC_Epoller的时候,执行的是:
void TC_EpollServer::waitForShutdown() {
......
// 初始化 listen sockets,加入epoll管理
// 初始化每个网络线程内的连接列表
createEpoll();
void TC_EpollServer::createEpoll() {
......
_epoller.add(it->first, it->first, EPOLLIN);
int TC_Epoller::add(SOCKET_TYPE fd, uint64_t data, int32_t event) {
#if TARGET_PLATFORM_IOS
return ctrl(fd, data, event, EV_ADD | EV_ENABLE);
#else
return ctrl(fd, data, event, EPOLL_CTL_ADD);
#endif
}
那么从epoll_wait()返回的时候,epoll_event中的events将会是EPOLLIN,然后执行下面:
//监听端口有请求
if (TC_Epoller::readEvent(ev)) {
#if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
bool ret;
do {
ret = accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET);
} while (ret);
#else
accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET);
#endif
}
而ret = accept(fd, it->second->_ep.isIPv6() ? AF_INET6 : AF_INET)的整个函数流程如下图所示:
图(2-14)服务端accept一位客户端
在讲解之前,先复习一下网络线程相关类图,以及通过图解对accept有个大致的印象:
图(2-15)网络模块类图
图(2-16)服务端接受一个客户端连接
好了,跟着图(2-14),现在从TC_EpollServer::waitForShutdown()的TC_EpollServer::accept(int fd, int domain)讲起。
进入TC_EpollServer::accept(int fd, int domain),可以看到代码执行了:
TC_Socket cs;
cs.setOwner(false);
//接收连接
TC_Socket s;
s.init(fd, false, domain);
int iRetCode = s.accept(cs, (struct sockaddr *)stSockAddr, iSockAddrSize);
通过TC_Socket::accept(),调用系统函数accept()接受了客户端的辛辛苦苦三次握手来的socket连接,然后对客户端的IP与端口进行打印以及检查,并分析对应的BindAdapter是否过载,过载则关闭连接。随后对客户端socket进行设置:
cs.setblock(false);
cs.setKeepAlive();
cs.setTcpNoDelay();
cs.setCloseWaitDefault();
到此,对应图(2-16)的第一步——接受客户端连接(流程如下图所示),已经完成。
图(2-17)accept客户端
接下来是为新来的客户端socket创建一个Connection,在TC_EpollServer::accept(int fd, int domain)中,创建Connection的代码如下:
int timeout = _listeners[fd]->getEndpoint().getTimeout() / 1000;
Connection *cPtr =
new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout),
cs.getfd(), ip, port);
构造函数中的参数依次是,这次新客户端所对应的BindAdapter指针,BindAdapter对应的listen socket的fd,超时时间,客户端socket的fd,客户端的ip以及端口。在Connection的构造函数中,通过fd也关联其TC_Socket:
TC_EpollServer::Connection::Connection(
TC_EpollServer::BindAdapter *pBindAdapter, int lfd, int timeout, int fd,
const string &ip, uint16_t port)
......
_sock.init(fd, true, pBindAdapter->_ep.isIPv6() ? AF_INET6 : AF_INET);
那么关联TC_Socket之后,通过Connection实例就可以操作的客户端socket了。至此,对应图(2-16)的第二步——为客户端socket创建Connection就完成了(流程如下图所示)。
图(2-18)创建Connection
最后,就是为这个Connection选择一个网络线程,将其加入网络线程对应的ConnectionList,在TC_EpollServer::accept(int fd, int domain)中,执行:
addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
TC_EpollServer::addConnection()的代码如下所示:
void TC_EpollServer::addConnection(TC_EpollServer::Connection *cPtr, int fd,
TC_EpollServer::CONN_TYPE iType) {
TC_EpollServer::NetThread *netThread = getNetThreadOfFd(fd);
if (iType == TCP_CONNECTION) {
netThread->addTcpConnection(cPtr);
} else {
netThread->addUdpConnection(cPtr);
}
// 回调
if (_acceptFunc != NULL) {
_acceptFunc(cPtr);
}
}
看到,先为Connection* cPtr选择网络线程,在流程图中,被选中的网络线程称为Chosen_NetThread。选网络线程的函数是TC_EpollServer::getNetThreadOfFd(int fd),根据客户端socket的fd求余数得到,具体代码如下:
/**
* 选择网络线程
* Select network threads
* @param fd
*/
inline NetThread *getNetThreadOfFd(int fd) {
return _netThreads[fd % _netThreads.size()];
}
接着调用被选中线程的NetThread::addTcpConnection()方法(或者NetThread::addUdpConnection(),这里只介绍TCP的方法),将Connection加入被选中网络线程的ConnectionList中,最后会执行_epoller.add(cPtr->getfd(), cPtr->getId(), EPOLLIN | EPOLLOUT)将客户端socket的fd加入本网络线程的TC_Epoller中,让本网络线程负责对本客户端的数据收发。至此对应图(2-16)的第三步就执行完毕了(具体流程如下图所示)。 |
图(2-19)为Connection选择一个网络线程
讨论服务器接收RPC请求,从网络线程的NetThread::run()开始分析,上面提到,将客户端socket的fd加入TC_Epoller来监听其读写,采用的是_epoller.add(cPtr->getfd(), cPtr->getId(), EPOLLIN | EPOLLOUT),传递给函数的第二个参数是32位的整形cPtr->getId(),而函数的第二个参数要求必须是64位的整型,因此,这个参数将会是高32位是0,低32位是cPtr->getId()的64位整形。而第二个参数的作用是当该注册的事件引起epoll_wait()退出的时候,会作为激活事件epoll_event 结构体中的64位联合体epoll_data_t data返回给用户。因此,看下面NetThread::run()代码: |
void TC_EpollServer::NetThread::run() {
_threadId = std::this_thread::get_id();
if (_epollServer->isMergeHandleNetThread()) {
vector<TC_EpollServer::BindAdapterPtr> adapters =
_epollServer->getBindAdapters();
for (auto adapter : adapters) {
adapter->getHandle(_threadIndex)->setNetThread(this);
adapter->getHandle(_threadIndex)->initialize();
}
}
//循环监听网路连接请求
while (!_bTerminate) {
_list.checkTimeout(TNOW);
int iEvNum = _epoller.wait(1000);
//没有网络事件
if (iEvNum == 0) {
//在这里加上心跳逻辑,获取所有的bindAdpator,然后发心跳
if (_epollServer->isMergeHandleNetThread()) {
vector<TC_EpollServer::BindAdapterPtr> adapters =
_epollServer->getBindAdapters();
for (auto adapter : adapters) {
adapter->getHandle(_threadIndex)->heartbeat();
}
}
}
if (_bTerminate) break;
for (int i = 0; i < iEvNum; ++i) {
try {
const epoll_event &ev = _epoller.get(i);
uint32_t fd = TC_Epoller::getU32(ev, false);
if (fd == (uint32_t)_notify.notifyFd()) {
//检查是否是通知消息
processPipe();
} else {
processNet(ev);
}
} catch (exception &ex) {
error("run exception:" + string(ex.what()));
}
}
}
}
Linux下,代码中的fd是64位联合体epoll_data_t data的低32位,经过上面分析,客户端socket若因为接收到数据而引起epoll_wait()退出的话,epoll_data_t data的高32位是0,低32位是cPtr->getId(),因此fd将会是cPtr->getId()。因此客户端socket有数据来到的话,会执行else分支。下面看看执行else分支的函数流程图。
图(2-20)服务端接收RPC请求流程图
收到RPC请求,进入到NetThread::processNet(),服务器需要知道是哪一个客户端socket被激活了,因此在NetThread::processNet()中执行:
void TC_EpollServer::NetThread::processNet(const epoll_event &ev) {
uint32_t uid = TC_Epoller::getU32(ev, false);
Connection *cPtr = getConnectionPtr(uid);
正如上面说的,epoll_data_t data的高32位是0,低32位是cPtr->getId(),那么获取到uid之后,通过NetThread::getConnectionPtr()就可以从ConnectionList中返回此时此刻所需要读取RPC请求的Connection了。之后对获取的Connection进行简单的检查工作,并看看epoll_event::events是否是EPOLLERR或者EPOLLHUP(具体流程如下图所示)。
图(2-21)获取收到数据的Connection
接着,就需要接收客户端的请求数据了,有数据接收意味着epoll_event::events是EPOLLIN,看下面代码,主要是NetThread::recvBuffer()读取RPC请求数据,以及以及Connection:: insertRecvQueue()唤醒业务线程发送数据。
if (TC_Epoller::readEvent(ev)) {
int ret = cPtr->recv();
if (ret < 0) {
delConnection(cPtr, true, EM_CLIENT_CLOSE);
return;
}
}
int TC_EpollServer::Connection::recv() {
return isTcp() ? recvTcp() : recvUdp();
}
Connection::recv()会依照不同的传输层协议(若UDP传输,lfd==-1),执行不同的接收方法,例如TCP会执行:
int TC_EpollServer::Connection::recvTcp() {
int recvCount = 0;
TC_NetWorkBuffer *rbuf = &_recvBuffer;
while (true) {
char buffer[BUFFER_SIZE] = {0x00};
int iBytesReceived = _sock.recv((void *)buffer, BUFFER_SIZE);
int TC_Socket::recv(void *pvBuf, size_t iLen, int iFlag) {
return ::recv(_sock, (char *)pvBuf, (int)iLen, iFlag);
}
根据数据接收情况,如收到FIN分节,errno==EAGAIN等执行不同的动作。若收到真实的请求信息包,会将接收到的数据放在TC_NetWorkBuffer Connection::_recvbuffer中,然后调用Connection:: parseProtocol()。
在Connection:: parseProtocol()中会回调协议解析函数对接收到的数据进行检验,检验通过后,会构造线程安全队列中的元素shared_ptr
} else if (b == TC_NetWorkBuffer::PACKET_FULL) {
shared_ptr<RecvContext> recv = std::make_shared<RecvContext>(
getId(), _ip, _port, getfd(), _pBindAdapter);
recv->buffer().swap(ro);
if (_pBindAdapter->getEndpoint().isTcp() &&
_pBindAdapter->_authWrapper &&
_pBindAdapter->_authWrapper(this, recv))
continue;
//收到完整的包才算
this->_bEmptyConn = false;
//收到完整包
insertRecvQueue(recv);
} else {
到此,RPC请求数据已经被完全获取并放置在线程安全队列中(具体过程如下图所示)。
图(2-22)接收客户请求
在Connection:: insertRecvQueue()中,会先对BindAdapter进行过载判断,分为未过载,半过载以及全过载三种情况。若全过载会丢弃线程安全队列中的所有RPC请求数据,否则会执行BindAdapter::insertRecvQueue()。
void TC_EpollServer::Connection::insertRecvQueue(
const shared_ptr<TC_EpollServer::RecvContext> &recv) {
if (_pBindAdapter->getEpollServer()->isMergeHandleNetThread()) {
int index = _pBindAdapter->getEpollServer()
->getNetThreadOfFd(recv->fd())
->getIndex();
//直接在网络线程中调用handle的process
_pBindAdapter->getHandle(index)->process(recv);
} else {
int iRet = _pBindAdapter->isOverloadorDiscard();
if (iRet == 0) //未过载
{
_pBindAdapter->insertRecvQueue(recv);
} else if (iRet == -1) //超过接受队列长度的一半,需要进行overload处理
{
recv->setOverload();
_pBindAdapter->insertRecvQueue(recv); //, false);
} else //接受队列满,需要丢弃
{
_pBindAdapter->getEpollServer()->error(
"[Connection::insertRecvQueue] overload discard package");
}
}
}
在BindAdapter::insertRecvQueue()中,代码主要有两个动作,第一个是将获取到的RPC请求包放进BindAdapter的接收队列——_threadDataQueue[idx]->_rbuffer中,第二个是唤醒等待条件变量的Handle线程组:
void TC_EpollServer::BindAdapter::insertRecvQueue(
const shared_ptr<RecvContext> &recv) {
_iRecvBufferSize++;
size_t idx = 0;
if (isQueueMode()) {
//相同连接过来的进入同一个buffer, 被Handle的同一个线程处理
idx = recv->fd() % _iHandleNum + 1;
}
_threadDataQueue[idx]->_rbuffer.push_back(recv);
//通知对应的线程队列醒过来
TC_ThreadLock::Lock lock(_threadDataQueue[idx]->_monitor);
_threadDataQueue[idx]->_monitor.notify();
}
现在,服务端的网络线程在接收RPC请求数据后,终于唤醒了业务线程(具体流程看下图所示),接下来轮到业务模块登场,看看如何处理RPC请求了。
图(2-23)唤醒业务线程
与前文接收到请求数据后,唤醒业务线程组Handle(就是刚刚才介绍完的_threadDataQueue[idx]->_monitor.notify())遥相呼应的地方是在“2.2.3业务模块的初始化”第2小点“Handle业务线程的启动”中提到的,在Handle::handleImp()函数中的l.timedWait(waitTime)。通过条件变量,业务线程组Handle里面的业务线程一起阻塞等待着网络线程对其发起唤醒。现在,终于对条件变量发起通知了,接下来将会如何处理请求呢?在这里,需要先对2.2.3节进行复习,了解到ServantHandle::_servants里面究竟承载着什么。
好了,处理RPC请求分为三步:构造请求上下文,调用用户实现的方法处理请求,将响应数据包push到线程安全队列中并通知网络线程,具体函数流程如下图所示,现在进一步分析:
图(2-24)服务端处理RPC请求流程图
当业务线程从条件变量上被唤醒之后,从其负责的BindAdapter中获取请求数据:bindAdapter->waitForRecvQueue(_handleIndex, recv),在BindAdapter::waitForRecvQueue()中,将从线程安全队列recv_queue BindAdapter::DataQueue:: rbuffer中获取数据:
bool TC_EpollServer::BindAdapter::waitForRecvQueue(
uint32_t handleIndex, shared_ptr<RecvContext> &data) {
bool bRet = getRecvQueue(handleIndex).pop_front(data);
if (!bRet) {
return bRet;
}
--_iRecvBufferSize;
return bRet;
}
TC_EpollServer::recv_queue &TC_EpollServer::BindAdapter::getRecvQueue(
uint32_t handleIndex) {
if (isQueueMode()) {
return _threadDataQueue[handleIndex + 1]->_rbuffer;
}
return _threadDataQueue[0]->_rbuffer;
}
还记得在哪里将数据压入线程安全队列的吗?对,就在“2.3.2接收RPC请求”的第3点“线程安全队列非空,唤醒业务线程发送”中。
接着,调用ServantHandle::handle()对接收到的RPC请求数据进行处理。
处理的第一步正如本节小标题所示——构造请求上下文,用的是ServantHandle::createCurrent():
void ServantHandle::handle(
const shared_ptr<TC_EpollServer::RecvContext> &data) {
CurrentPtr current = createCurrent(data);
在ServantHandle::createCurrent()中,先new出TarsCurrent实例,然后调用其initialize()方法,在Current::initialize(const shared_ptr<TC_EpollServer::RecvContext> &data)中,将RPC请求包的内容放进请求上下文CurrentPtr current中,后续只需关注这个请求上下文即可。下面稍微总结一下这小节的流程:
图(2-25)构造请求上下文
当获取到请求上下文之后,就需要对其进行处理了。
void ServantHandle::handle(
const shared_ptr<TC_EpollServer::RecvContext> &data) {
CurrentPtr current = createCurrent(data);
if (!current) return;
if (current->getBindAdapter()->isTarsProtocol()) {
handleTarsProtocol(current);
} else {
handleNoTarsProtocol(current);
}
}
本RPC框架支持TARS协议与非TARS协议,下面只会介绍对TARS协议的处理,对于非TARS协议,分析流程也是差不多,对非TARS协议感兴趣的读者可以对比着来分析非TARS协议部分。在介绍之前,先看看服务相关的继承体系,下面不要混淆这三个类了:
图(2-26)服务类继承体系
好了,现在重点放在ServantHandle::handleTarsProtocol(const TarsCurrentPtr ¤t)函数上面。先贴代码:
void ServantHandle::handleTarsProtocol(const CurrentPtr ¤t) {
TLOGTARS("[ServantHandle::handleTarsProtocol current:"
<< current->getIp() << "|" << current->getPort() << "|"
<< current->getMessageType() << "|" << current->getServantName()
<< "|" << current->getFuncName() << "|" << current->getRequestId()
<< "|" << TC_Common::tostr(current->getRequestStatus()) << "]"
<< endl);
//检查set调用合法性
if (!checkValidSetInvoke(current)) {
return;
}
//处理染色消息
string dyeingKey = "";
TarsDyeingSwitch dyeSwitch;
if (processDye(current, dyeingKey)) {
dyeSwitch.enableDyeing(dyeingKey);
}
//处理cookie
map<string, string> cookie;
CookieOp cookieOp;
if (processCookie(current, cookie)) {
cookieOp.setCookie(cookie);
current->setCookie(cookie);
}
#ifdef TARS_OPENTRACKING
//处理tracking信息
processTracking(current);
#endif
auto sit = _servants.find(current->getServantName());
if (sit == _servants.end()) {
current->sendResponse(TARSSERVERNOSERVANTERR);
#ifdef TARS_OPENTRACKING
finishTracking(TARSSERVERNOSERVANTERR, current);
#endif
return;
}
int ret = TARSSERVERUNKNOWNERR;
string sResultDesc = "";
ResponsePacket response;
// vector<char> buffer;
try {
//业务逻辑处理
ret = sit->second->dispatch(current, response.sBuffer);
} catch (TarsDecodeException &ex) {
TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]"
<< endl);
ret = TARSSERVERDECODEERR;
sResultDesc = ex.what();
} catch (TarsEncodeException &ex) {
TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]"
<< endl);
ret = TARSSERVERENCODEERR;
sResultDesc = ex.what();
} catch (exception &ex) {
TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]"
<< endl);
ret = TARSSERVERUNKNOWNERR;
sResultDesc = ex.what();
} catch (...) {
TLOGERROR("[ServantHandle::handleTarsProtocol unknown error]" << endl);
ret = TARSSERVERUNKNOWNERR;
sResultDesc = "handleTarsProtocol unknown exception error";
}
//单向调用或者业务不需要同步返回
if (current->isResponse()) {
current->sendResponse(ret, response, Current::TARS_STATUS(), sResultDesc);
}
#ifdef TARS_OPENTRACKING
finishTracking(ret, current);
#endif
}
进入函数中,会先对请求上下文进行预处理,例如set调用合法性检查,染色处理等。随后,就依据上下文中的服务名来获取服务对象:auto sit = _servants.find(current->getServantName()),_servants在“2.2.3业务模块的初始化”第2小点“Handle业务线程的启动”中被赋予内容,其key是服务ID(或者叫服务名),value是用户实现的服务XXXServantImp实例指针。
随后就可以利用XXXServantImp实例指针来执行RPC请求了:ret = sit->second->dispatch(current, response.sBuffer),在Servant:: dispatch()(如图(2-26)因为XXXServantImp是继承自XXXServant,而XXXServant继承自Servant,所以实际是执行Servant的方法)中,使用不同的协议会有不同的处理方式,这里只介绍TARS协议的,调用了XXXServant::onDispatch(tars::TarsCurrentPtr _current, vector
int Servant::dispatch(CurrentPtr current, vector<char>& buffer) {
int ret = TARSSERVERUNKNOWNERR;
if (current->getFuncName() == "tars_ping") {
TLOGTARS("[Servant::dispatch] tars_ping ok from ["
<< current->getIp() << ":" << current->getPort() << "]" << endl);
ret = TARSSERVERSUCCESS;
} else if (!current->getBindAdapter()->isTarsProtocol()) {
TC_LockT<TC_ThreadRecMutex> lock(*this);
ret = doRequest(current, buffer);
} else {
TC_LockT<TC_ThreadRecMutex> lock(*this);
ret = onDispatch(current, buffer);
}
return ret;
}
XXXServant类就是执行Tars2Cpp的时候生成的,会依据用户定义的tars文件来生成相应的纯虚函数,以及onDispatch()方法,该方法的动作有:
1.找出在本服务类中与请求数据相对应的函数; 2.解码请求数据中的函数参数; 3.执行XXXServantImp类中用户定义的相应RPC方法; 4.编码函数执行后的结果; 5.return tars::TARSSERVERSUCCESS。
上述步骤是按照默认的服务端自动回复的思路去阐述,在实际中,用户可以关闭自动回复功能(如:current->setResponse(false)),并自行发送回复(如:Hello::async_response_testHello(current, ret, sRsp))。
void tars::Current::setResponse(bool value)
static void TestApp::Hello::async_response_testHello(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sRsp)
到此,服务端已经执行了RPC方法,下面稍微总结一下本小节的内容:
图(2-27)处理TARS协议的请求
处理完RPC请求,执行完RPC方法之后,需要将结果(下面代码中的buffer)回送给客户端:
void ServantHandle::handleTarsProtocol(const CurrentPtr ¤t) {
......
//单向调用或者业务不需要同步返回
if (current->isResponse()) {
current->sendResponse(ret, response, Current::TARS_STATUS(), sResultDesc);
}
由于业务与网络是独立开来的,网络线程收到请求包之后利用条件变量来通知业务线程,而业务线程又有什么方式来通知网络线程呢?由前面可知,网络线程是阻塞在epoll中的,因此需要利用epoll来通知网络线程。这次先看图解总结,再分析代码:
图(2-28)数据push到队列中并通知网络线程
在ServantHandle::handleTarsProtocol()中,最后的一步就是回送响应包。
void Current::sendResponse(int iRet, ResponsePacket &response,
const map<string, string> &status,
const string &sResultDesc) {
......
_servantHandle->sendResponse(send);
void TC_EpollServer::Handle::sendResponse(
const shared_ptr<TC_EpollServer::SendContext> &data) // uint32_t uid, const
// vector<char>
// &sSendBuffer, const
// string &ip, int
// port, int fd)
{
_pEpollServer->send(data);
}
void TC_EpollServer::send(const shared_ptr<SendContext> &data) {
TC_EpollServer::NetThread *netThread = getNetThreadOfFd(data->fd());
netThread->send(data);
}
数据包的回送经历的步骤是:编码响应信息——找出与接收请求信息的网络线程,因为我们需要通知他来干活——将响应包放进该网络线程的发送队列——利用epoll的特性唤醒网络线程,我们重点看看NetThread::send():
void TC_EpollServer::NetThread::send(const shared_ptr<SendContext> &data) {
if (_threadId == std::this_thread::get_id()) {
//发送包线程和网络线程是同一个线程,直接发送即可
Connection *cPtr = getConnectionPtr(data->uid());
if (cPtr) {
cPtr->send(data);
}
} else {
//发送包线程和网络线程不是同一个线程, 需要先放队列, 再唤醒网络线程去发送
_sbuffer.push_back(data);
//通知epoll响应, 有数据要发送
if (!_notifySignal) {
_notifySignal = true;
_notify.notify();
}
}
}
void TC_Epoller::NotifyInfo::notify() {
_ep->mod(_notify.getfd(), _data, EPOLLIN | EPOLLOUT);
}
到此,服务器中的业务模块已经完成他的使命,后续将响应数据发给客户端是网络模块的工作了。
获取了请求,当然需要回复响应,从上面知道业务模块是通过_ep->mod(_notify.getfd(), _data, EPOLLIN | EPOLLOUT)通知网络线程的,再加上之前分析“2.3.1接受客户端连接”以及“2.3.2接收RPC请求”的经验,我们知道,这里必须从NetThread::run()开始讲起,而且是进入if分支: |
void TC_EpollServer::NetThread::run() {
......
if (fd == (uint32_t)_notify.notifyFd()) {
//检查是否是通知消息
processPipe();
}
在NetThread::processPipe()中,先从线程安全队列中取得响应信息包:shared_ptr
图(2-29)服务端向客户端返回响应数据
这里用图解总结一下服务端的工作过程:
图(2-30)服务端工作图
TARS可以在考虑到易用性和高性能的同时快速构建系统并自动生成代码,帮助开发人员和企业以微服务的方式快速构建自己稳定可靠的分布式应用,从而令开发人员只关注业务逻辑,提高运营效率。多语言、敏捷研发、高可用和高效运营的特性使 TARS 成为企业级产品。
《微服务开源框架TARS的RPC源码解析》系列文章分上下两篇,对RPC调用部分进行源码解析。本文是下篇,我们带大家了解了一下TARS的服务端。欢迎阅读上篇《初识TARS C++客户端》
作者 herman
简介
本文源自herman的系列文章之一《鹅厂开源框架TARS之运营服务监控》。相关代码已按TARS开源社区最新版本更新。
TARS框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。 它集可扩展协议编解码、高性能RPC通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体。其中发布监控,日志统计等运维功能依靠着TARS框架中的运维服务和工具,本文将对各运维服务的功能和作用进行分析,并通过部分源码帮助读者进一步理解TARS的工作原理。
如上图,可以清楚看到TARS框架的运维服务和工具:包括主控 Registry
、发布平台 Patch
、配置文件中心Config
、远程 Log
、指标统计 Stat
、业务信息 Property
、异常信息 Notify
等主要服务,再结合 TarsWeb 平台对这些服务进行可视化操作和运用,对开发和运维人员算是非常方便和人性化了。
TarsWeb 服务管理平台用于服务的管理与运维,功能包括:
TarsWeb 可视化管理平台对服务的管理和运维功能都是基于TARS框架运维服务的接口来提供服务的。
Registry 服务提供对象名称寻址服务,返回 IP:Port
列表,为客户端提供可用服务列表信息。
同时,它提供TARS框架核心管理功能 :服务部署、服务起停、服务状态信息查询、发布、配置管理、命令通知。
Registry
提供的接口有
同时主控提供了 AdminRegImp
(关联控制接口类),提供如下接口:
下面来看看 startServer
接口如何实现。
startServer
接口的实现中,先调用 updateServerState
更新服务的状态
int AdminRegistryImp::startServer(const string &application,
const string &serverName,
const string &nodeName, string &result,
tars::CurrentPtr current) {
TLOGDEBUG("AdminRegistryImp::startServer: "
<< application << "." << serverName << "_" << nodeName << "|"
<< current->getHostName() << ":" << current->getPort() << endl);
int iRet = EM_TARS_UNKNOWN_ERR;
try {
//更新数据库server的设置状态
DBPROXY->updateServerState(application, serverName, nodeName,
"setting_state", tars::Active);
然后判断 server 是否为 DNS:DNS直接通过db修改状态,非DNS则通过Node节点启动服务(下节介绍Node节点)
vector<ServerDescriptor> server;
server = DBPROXY->getServers(application, serverName, nodeName, true);
//判断是否为dns 非dns才需要到node启动服务
if (server.size() != 0 && server[0].serverType == "tars_dns") {
TLOGINFO(" '" + application + "." + serverName + "_" + nodeName +
"' is tars_dns server"
<< endl);
iRet = DBPROXY->updateServerState(application, serverName, nodeName,
"present_state", tars::Active);
} else {
NodePrx nodePrx = DBPROXY->getNodePrx(nodeName);
TLOGINFO("call node into " << __FUNCTION__ << "|" << application << "."
<< serverName << "_" << nodeName << "|"
<< current->getHostName() << ":"
<< current->getPort() << endl);
current->setResponse(false);
NodePrxCallbackPtr callback = new StartServerCallbackImp(
application, serverName, nodeName, current);
nodePrx->async_startServer(callback, application, serverName);
}
Patch服务提供服务的发布功能,用于实现服务发布包的上传、管理与发布,配合TarsWeb平台,能够管理所有需要发布的服务和文件的目录,如下
Patch 服务中定义了以下四个接口
/**
* 获取路径下所有文件列表信息
* @param path, 目录路径, 相对_directory的路径, 不能有..
* @param vector<FileInfo>, 文件列表信息
* @return int
*/
int listFileInfo(const string &path, vector<FileInfo> &vf,
TarsCurrentPtr current);
/**
* 下载文件
* @param file, 文件完全路径
* @param pos, 从什么位置开始下载
* @return vector<byte>, 文件内容
*/
int download(const string &file, int pos, vector<char> &vb,
TarsCurrentPtr current);
/**
* 准备好需要patch的文件,将发布的文件从上传目录复制到发布目录
* @param app, 应用名
* @param serverName, 服务名
* @param patchFile, 需要发布的文件名
* @return int, 0: 成功, <0: 失败
*/
int preparePatchFile(const string &app, const string &serverName,
const string &patchFile, TarsCurrentPtr current);
/**
* delete patch file
* @param app
* @param serverName
* @param patchFile
* @param current
* @return
*/
int deletePatchFile(const string &app, const string &serverName,
const string &patchFile, TarsCurrentPtr current);
Config
服务用于提供整套框架的配置文件保存和读取等操作,后台使用mysql
存储。配置拉取服务化,服务只需调用配置服务的接口即可获取到配置文件。
为了能灵活管理配置文件,配置文件分为几个级别:应用配置、Set配置、服务配置和节点配置。应用配置为最高一级的配置文件,它是多个服务配置提炼出来的公共配置,服务配置通过引用它来使用其配置内容。 Set配置是具体一个Set分组下所有服务的公共配置,在应用配置的基础上进行补充追加。 服务配置是具体一个服务下所有节点的公共配置,可以引用应用配置。 节点配置是一个应用节点的个性化配置,它和服务配置合并成为具体一个服务节点的配置。
下图是服务配置管理页面
在服务业务代码中,可以通过调用 addAppConfig
和 addConfig
来分别添加应用级配置文件和服务配置文件。接下来我们来对 addAppConfig
和 addConfig
的添加配置的过程进行分析。
addAppConfig
中会调用 RemoteConfig::getInstance()->addConfig(filename, result, true)
函数;getRemoteFile
函数从 Config
服务远程获取配置文件信息。 getRemoteFile
函数中,通过 _configPrx
向 Config
服务发起 rpc
调用实现,如下bool Application::addAppConfig(const string& filename) {
string result = "";
// true-只获取应用级别配置
if (RemoteConfig::getInstance()->addConfig(filename, result, true))
bool RemoteConfig::addConfig(const string& sFileName, string& buffer,
bool bAppConfigOnly) {
TC_LockT<TC_ThreadMutex> lock(_mutex);
try {
string sFullFileName = _basePath + FILE_SEP + sFileName;
string newFile = getRemoteFile(sFileName, bAppConfigOnly);
string RemoteConfig::getRemoteFile(const string& sFileName,
bool bAppConfigOnly) {
if (_configPrx) {
string stream;
int ret = -1;
for (int i = 0; i < 2; i++) {
try {
if (_setdivision.empty()) {
ret =
_configPrx->loadConfig(_app, (bAppConfigOnly ? "" : _serverName),
sFileName, stream, ServerConfig::Context);
} else {
ConfigImp::loadConfig
函数是 Config
的接口函数 ,如果是应用级别的配置文件,则执行 select id,config from t_config_files...
并且把结果使用string
的方式回传给调用者。
/**
* 加载配置文件
* param app :应用
* param server: server名
* param filename: 配置文件名
*
* return : 配置文件内容
*/
virtual int loadConfig(const std::string &app, const std::string &server,
const std::string &filename, string &config,
tars::TarsCurrentPtr current);
int ConfigImp::loadAppConfig(const std::string &appName,
const std::string &fileName, string &config,
tars::TarsCurrentPtr current) {
TLOGDEBUG("ConfigImp::loadAppConfig appName:" << appName << "|fileName:"
<< fileName << endl);
int iRet = 0;
config = "";
try {
string sNULL;
//查公有配置
string sSql =
"select id,config from t_config_files "
调用者获取到文件内容,根据文件的名字生成在本地以供后续程序使用文件:
string RemoteConfig::getRemoteFile(const string& sFileName,
bool bAppConfigOnly) {
......
std::ofstream out(newFile.c_str());
和 addAppConfig
相似, addConfig
也会调用 RemoteConfig::getInstance()->addConfig(filename, result, false)
。最后一个参数 bAppConfigOnly
设置为 false
,表示获取服务配置,而不是应用配置。
接着进行 rpc
调用
ret =
_configPrx->loadConfig(_app, (bAppConfigOnly ? "" : _serverName),
和加载应用级别的配置文件有所不同,这里调用 _configPrx
服务器的参数,增加了 _sServerName
,即 rpc
调用 loadConfig
接口的时候 server
参数不为空字符串,会改调用 loadConfigByHost
函数
int ConfigImp::loadConfig(const std::string &app, const std::string &server,
const std::string &fileName, string &config,
tars::TarsCurrentPtr current) {
// string nodeName = current->getContext()["node_name"];
// if(nodeName.empty())
// {
// nodeName = current->getIp();
// }
TLOGDEBUG("ConfigImp::loadConfig app:" << app << "|server:" << server
<< "|fileName:" << fileName << "|host:"
<< current->getHostName() << endl);
CHECKLIMIT(app, server, current->getHostName(), fileName);
if (!server.empty()) {
return loadConfigByHost(app + "." + server, fileName,
current->getHostName(), config, current);
} else {
return loadAppConfig(app, fileName, config, current);
}
}
loadConfigByHost
函数同样是通过mysql查询 t_config_files
表格,但修改了 where
查询条件,appName
替换为 appServerName
,即查询服务配置,如下
"where server_name = '" +
_mysqlConfig.escapeString(appServerName) +
至此,实现了从指定服务器拉取配置文件的功能 (这里还关系到引用配置等功能,多个同名文件还涉及到文件合并等,这里先不做详细说明)。
TARS框架的日志服务,用于接收远程日志。
提供两个接口,如下
/**
* 输出日志信息到指定文件
* @param app 业务名称
* @param server 服务名称
* @param file 日志文件名称
* @param format 日志输出格式
* @param buffer 日志内容
*
*
*/
void logger(const string& app, const string& server, const string& file,
const string& format, const vector<string>& buffer,
tars::TarsCurrentPtr current);
/**
* 获取数据
* @param info
* @param buffer
*
*/
void loggerbyInfo(const LogInfo& info, const vector<std::string>& buffer,
tars::TarsCurrentPtr current);
业务服务以框架层的 API 异步发送日志到日志服务器,例如 Stat 服务的 ReapSSDThread::run
中通过 FDLOG
发送日志
FDLOG("CountStat") << "stat ip:" << ServerConfig::LocalIp
<< "|Buffer Index:" << iBufferIndex
<< "|ReapSSDThread::run insert record num:"
<< iTotalNum << "|tast patch finished." << endl;
在TarsWeb上可以查询到相应的日志
FDLOG
的定义在库文件的 RemoteLogger.h
,定义如下:
#define FDLOG(x) (RemoteTimeLogger::getInstance()->logger(x)->any())
RemoteTimeLogger::getInstance()->logger(x)
函数生成并返回:TimeLogger*
,TimeLogger
的定义如下:
//定义按时间滚动的日志
typedef TC_Logger<TimeWriteT, TC_RollByTime> TimeLogger;
其中 TC_Logger
为日志基类模板:模板第一个参数 TimeWriteT
负责写Logger。在 Applicantion 服务启动的时候会调用设置远程日志服务器对象的服务,例如 log=tars.tarslog.LogObj
,然后调用 setLogInfo
设置本地信息
void Application::initializeServer() {
......
ServerConfig::Log = _conf.get("/tars/application/server<log>");
......
RemoteTimeLogger::getInstance()->setLogInfo(
_communicator, ServerConfig::Log, ServerConfig::Application,
ServerConfig::ServerName, ServerConfig::LogPath, setDivision(),
bLogStatReport);
这里底层实现函数如下,会获取远程日志服务器的地址
void RemoteTimeLogger::setLogInfo(const CommunicatorPtr &comm,
const string &obj, const string &sApp,
const string &sServer, const string &sLogpath,
const string &setdivision,
const bool &bLogStatReport) {
_app = sApp;
_server = sServer;
_logpath = sLogpath;
_comm = comm;
_setDivision = setdivision;
_logStatReport = bLogStatReport;
if (!obj.empty()) {
_logPrx = _comm->stringToProxy<LogPrx>(obj);
//单独设置超时时间
_logPrx->tars_timeout(3000);
if (_defaultLogger) {
_defaultLogger->getWriteT().setLogPrx(_logPrx);
}
}
//创建本地目录
TC_File::makeDirRecursive(_logpath + FILE_SEP + _app + FILE_SEP + _server);
}
具体写日志的时候,TimeWriteT
类重载了 operator()
,会调用远程日志服务的logger
接口写远程日志,从而实现了日志从本地到远程日志服务器的功能
void TimeWriteT::operator()(ostream &of,
const deque<pair<size_t, string> > &buffer) {
......
_logPrx->logger(DYEING_DIR, DYEING_FILE, "day", "%Y%m%d", vDyeingLog,
ServerConfig::Context);
Stat 服务用于监控服务进程的运行质量,提供服务模块间调用信息统计上报的功能。Stat采集的数据包含
以下为 Stat 服务定义的接口:
/**
* 上报模块间调用信息
* @param statmsg, 上报信息
* @return int, 返回0表示成功
*/
virtual int reportMicMsg(
const map<tars::StatMicMsgHead, tars::StatMicMsgBody>& statmsg,
bool bFromClient, tars::TarsCurrentPtr current);
/**
* 上报模块间调用采样信息
* @param sample, 上报信息
* @return int, 返回0表示成功
*/
virtual int reportSampleMsg(const vector<StatSampleMsg>& msg,
tars::TarsCurrentPtr current);
Property服务提供用户自定义属性上报功能,用于监控业务的运行质量情况和相关指标的统计。采集的数据包含
Property服务提供接口 reportPropMsg
进行服务特性上报,接口声明如下
/**
* 上报性属信息
* @param statmsg, 上报信息
* @return int, 返回0表示成功
*/
virtual int reportPropMsg(
const map<StatPropMsgHead, StatPropMsgBody>& propMsg,
tars::TarsCurrentPtr current);
官方文档定义为异常信息,用于获取服务的业务异常上报的report,输出的信息可以在TarsWeb平台里面看到。
具体服务业务代码中,通过获取 RemoteNotify
实例调用 report
上报异常信息,例如:
RemoteNotify::getInstance()->report("exit: " + string(ex.what()));
源码实现主要部分如下:
void RemoteNotify::report(const string &sMessage, const string &app,
const string &serverName, const string &sNodeName) {
try {
if (_notifyPrx) {
ReportInfo info;
info.eType = REPORT;
info.sApp = app;
info.sServer = serverName;
info.sSet = "";
info.sMessage = sMessage;
info.sNodeName = sNodeName;
_notifyPrx->async_reportNotifyInfo(NULL, info);
}
} catch (exception &ex) {
TLOGERROR("[RemoteNotify::notify error:" << ex.what() << "]" << endl);
} catch (...) {
TLOGERROR("[RemoteNotify::notify unknown error"
<< "]" << endl);
}
}
Notify 服务的实现,主要就是把数据插入 t_server_notifys_
数据库。这里默认都是使用异步的接口,否则效率可能会有问题,毕竟是直接操作db,相关操作源码如下
void NotifyImp::reportNotifyInfo(const tars::ReportInfo& info,
tars::TarsCurrentPtr current) {
......
string sql;
TC_Mysql::RECORD_DATA rd;
rd["application"] = make_pair(TC_Mysql::DB_STR, info.sApp);
rd["server_name"] = make_pair(TC_Mysql::DB_STR, info.sServer);
rd["container_name"] = make_pair(TC_Mysql::DB_STR, info.sContainer);
rd["server_id"] = make_pair(
TC_Mysql::DB_STR, info.sApp + "." + info.sServer + "_" + nodeId);
rd["node_name"] = make_pair(TC_Mysql::DB_STR, nodeId);
rd["thread_id"] = make_pair(TC_Mysql::DB_STR, info.sThreadId);
if (!info.sSet.empty()) {
vector<string> v = TC_Common::sepstr<string>(info.sSet, ".");
if (v.size() != 3 || (v.size() == 3 && (v[0] == "*" || v[1] == "*"))) {
TLOGERROR("NotifyImp::reportNotifyInfo bad set name:" << info.sSet
<< endl);
} else {
rd["set_name"] = make_pair(TC_Mysql::DB_STR, v[0]);
rd["set_area"] = make_pair(TC_Mysql::DB_STR, v[1]);
rd["set_group"] = make_pair(TC_Mysql::DB_STR, v[2]);
}
} else {
rd["set_name"] = make_pair(TC_Mysql::DB_STR, "");
rd["set_area"] = make_pair(TC_Mysql::DB_STR, "");
rd["set_group"] = make_pair(TC_Mysql::DB_STR, "");
}
rd["result"] = make_pair(TC_Mysql::DB_STR, info.sMessage);
rd["notifytime"] = make_pair(TC_Mysql::DB_INT, "now()");
string sTable = "t_server_notifys";
try {
_mysqlConfig.insertRecord(sTable, rd);
} catch (TC_Mysql_Exception& ex) {
服务节点可以认为是服务所实际运行的一个具体的操作系统实例,可以是物理主机或者虚拟主机、云主机。每台服务节点上均有一个Node服务和多个业务服务,Node服务会对业务服务进行统一管理,包括:
Node
服务提供的主要接口:
Registry 服务的 startServer
会调用本节中 Node 服务的 startServer
接口,startServer
再通过调用 CommandStart
类的 startByScript
函数实现服务的启动,该派生自 ServerCommand
,同理也有 CommandStop
, CommandPatch
等,具体详见源码。
startByScript
通过拉取服务的启动脚本启动服务,启动脚本一般随服务conf存储在DB中,然后调用以下函数执行脚本启动服务
_serverObjectPtr->getActivator()->activate(sStartScript, sMonitorScript,
sResult);
最终通过C语言exec
系列函数执行启动脚本启动服务。
bool Activator::doScript(const string& strScript, string& strResult,
map<string, string>& mResult, const string& sEndMark) {
......
string sCmd = strScript + sRedirect + " " + _server->getServerId() + " &";
FILE* fp = popen2(sCmd.c_str(), "r");
FILE* Activator::popen2(const char* cmdstring, const char* type) {
......
execl(SHELL, "sh", "-c", cmdstring, (char*)0);
除了以上主要的管理服务的接口外,还有用于获取Node服务信息的接口,如 getState
用于获取指定服务状态,实现如下:
ServerState NodeImp::getState(const string& application,
const string& serverName, string& result,
TarsCurrentPtr current) {
string serverId = application + "." + serverName;
result = string(__FUNCTION__) + " [" + serverId + "] ";
ServerObjectPtr pServerObjectPtr =
ServerFactory::getInstance()->getServer(application, serverName);
if (pServerObjectPtr) {
result += "succ";
return pServerObjectPtr->getState();
}
result += "server not exist";
NODE_LOG(serverId)->error() << "NodeImp::getState " << result << endl;
return tars::Inactive;
}
本文介绍分析了TARS框架中如何通过不同运维服务工具实现对服务的运营和监控,为开发和运维人员提供方便、人性化的服务管理和维护功能。
TARS可以在考虑到易用性和高性能的同时快速构建系统并自动生成代码,帮助开发人员和企业以微服务的方式快速构建自己稳定可靠的分布式应用,从而令开发人员只关注业务逻辑,提高运营效率。多语言、敏捷研发、高可用和高效运营的特性使 TARS 成为企业级产品。