RPC

include: co/so/rpc.h.

co/rpc 是一个类似 grpcthrift 的 RPC 框架,只不过它使用 JSON 格式传输数据,而不是 protobuf, thrift 等二进制协议。

#rpc::Service

rpc::Service 是一个接口类,它表示一个 service,一个 RPC server 中可以包含多个 service。

#Service::name

virtual const char* name() const = 0;
  • 返回 service 的名字,该名字包含包名,如 “xx.yy.HelloWorld”。
  • RPC 请求中,必须带一个 **“service” **字段,它的值就是 service 的名字。

#Service::process

virtual void process(const Json& req, Json& res) = 0;
  • 该方法处理 RPC 请求,结果被填充到参数 res 中。
  • server 接收到 RPC 请求时,根据 req 中的 “service” 字段找到对应的 service,然后调用该 service 的 process() 方法处理该请求。

#rpc::Server

rpc::Server 基于 tcp::Server 实现,它支持 SSL 以及用户名与密码认证。几年前的测试结果显示,单线程 QPS 可以达到 120k+。

#Server::Server

Server();
  • 默认构造函数,用户不需要关心。

#Server::add_service

void add_service(rpc::Service* s);
  • 添加 service,参数 s 必须是用 operator new 动态创建的。
  • 用户可以多次调用此方法,添加多个 service,不同 service 必须有不同的名字。

#Server::add_userpass

void add_userpass(const char* user, const char* pass);
void add_userpass(const char* s);
  • 第 1 个版本添加一对用户名、密码。用户可以多次调用此方法,添加多个用户名、密码。

  • 第 2 个版本,s 是一个包含用户名、密码的 JSON 字符串,它的值类似于 {"user1":"pass1", "user2":"pass2"}。用户可以调用此方法一次性添加多对用户名、密码。

  • rpc::Server 不会保存密码明文,而是保存该密码的 md5 值。

  • 若服务端设置了用户名、密码,客户端连接后,必须用一对用户名、密码进行认证。

  • 示例

DEF_string(password, "", "password");
DEF_string(userpass, "", "usernames and passwords");

rpc::Server s;
s.add_userpass("alice", FLG_password.c_str());
FLG_password.safe_clear();

s.add_userpass(FLG_userpass.c_str());
FLG_userpass.safe_clear();
  • 上面的例子中,定义一个 flag 保存密码,默认值为空,必须从命令行或配置文件传入密码。
  • 调用 add_userpass() 方法后,立即调用 safe_clear() 方法将密码内容清零。

#Server::start

void start(const char* ip, int port, const char* key=NULL, const char* ca=NULL);
  • 启动 RPC server,此方法不会阻塞当前线程。
  • 参数 ip 是服务器 ip,可以是 IPv4 或 IPv6 地址,参数 port 是服务器端口。
  • 参数 key 是存放 SSL private key 的 PEM 文件路径,参数 ca 是存放 SSL 证书的 PEM 文件路径,默认 key 和 ca 是 NULL,不启用 SSL。

#Server::exit

void exit();
  • v2.0.3 新增。
  • 退出 RPC server,关闭 listening socket,不再接收新的连接。
  • 此方法不会关闭之前已经建立的连接。

#RPC server 示例

#定义 proto 文件

// hello_world.proto

package xx  // namespace xx

service HelloWorld {
    hello,
    world,
}
  • 上面是一个简单的 proto 文件,# 或 // 表示注释。
  • package xx 指定包名,在 C++ 中表示将代码生成到命名空间 xx 中,可以用 package xx.yy.zz 生成嵌套命名空间。
  • service HelloWorld 定义一个继承 rpc::Service 的 service 类,它的名字是 “xx.HelloWorld”。该 service 提供 hello, world 两个 RPC 方法。
  • 可以看到,co/rpc 的 proto 文件比 protobuf 简单得多。由于 RPC 请求及响应都是 JSON,用户不需要定义各种结构体
  • **一个 proto 文件只能定义一个 service。**proto 文件中,在 service 定义后面的内容都会被忽略。一般可以在 service 定义后面,加上 RPC 方法的 JSON 参数示例,如下所示:
package xx  // namespace xx

// class HelloWorld : public rpc::Service
service HelloWorld {  
    hello,  // void HelloWorld::hello(const Json& req, Json& res);
    world,  // void HelloWorld::world(const Json& req, Json& res);
}

// All the following are ignored by the code generator.
// param
hello.req {
    "service": "xx.HelloWorld",
    "method": "hello"
}

hello.res {
    "err": 200,
    "errmsg": "ok"
}

world.req {
    "service": "xx.HelloWorld",
    "method": "world"
}

world.res {
    "err": 200,
    "errmsg": "ok"
}

#生成 RPC 框架代码

gen 是 co 提供的 RPC 代码生成器,它可以生成 service 及 client 的相关代码。

xmake -b gen             # 构建 gen
cp gen /usr/local/bin    # 将 gen 放到 /usr/local/bin 目录
gen hello_world.proto    # 生成代码
gen *.proto              # 批量生成

hello_world.proto 生成的文件是 hello_world.h,下面是该文件中 service 类的定义:

// Autogenerated. DO NOT EDIT. All changes will be undone.

#pragma once

#include "co/so/rpc.h"

namespace xx {

class HelloWorld : public rpc::Service {
  public:
    typedef void (HelloWorld::*Fun)(const Json&, Json&);

    HelloWorld() : _name("xx.HelloWorld") {
        _methods[hash64("hello")] = &HelloWorld::hello;
        _methods[hash64("world")] = &HelloWorld::world;
    }

    virtual ~HelloWorld() {}

    virtual const char* name() const {
        return _name.c_str();
    }

    virtual void process(const Json& req, Json& res) {
        json::Value method = req["method"];
        if (!method.is_string()) {
            res.add_member("err", 400);
            res.add_member("errmsg", "req has no method");
            return;
        }

        auto it = _methods.find(hash64(method.get_string(), method.string_size()));
        if (it == _methods.end()) {
            res.add_member("err", 404);
            res.add_member("errmsg", "method not found");
            return;
        }

        (this->*(it->second))(req, res);
    }

    virtual void hello(const Json& req, Json& res) = 0;

    virtual void world(const Json& req, Json& res) = 0;

  private:
    std::unordered_map<uint64, Fun> _methods;
    fastring _name;
};

} // xx
  • 可以看到,HelloWorld 类继承于 rpc::Service,它已经实现了 rpc::Service 中的 name() 与 process() 方法。
  • process() 方法中,会根据 req 中的 “method” 字段,调用相应的 RPC 方法。
  • 用户只需要继承 HelloWorld 类,实现 hello 与 world 两个方法即可。

#业务实现

#include "hello_world.h"

namespace xx {

class HelloWorldImpl : public HelloWorld {
  public:
    HelloWorldImpl() = default;
    virtual ~HelloWorldImpl() = default;

    virtual void hello(const Json& req, Json& res) {
        res.add_member("method", "hello");
        res.add_member("err", 200);
        res.add_member("errmsg", "ok");
    }

    virtual void world(const Json& req, Json& res) {
        res.add_member("method", "world");
        res.add_member("err", 200);
        res.add_member("errmsg", "ok");
    }
};

} // xx
  • 上面只是一个很简单的例子,实际应用中,一般需要根据 req 中的参数,进行相应的业务处理,然后填充 res。

#启动 RPC server

rpc::Server s;
s.add_userpass("alice", "nice");
s.add_service(new xx::HelloWorldImpl);

// without ssl
s.start("127.0.0.1", 7788);

// with ssl
s.start("127.0.0.1", 7788, "privkey.pem", "certificate.pem");
  • 第 2 行调用 add_userpass() 添加一对用户名密码,是可选的。
  • 第 3 行调用 add_service() 添加 HelloWorld service 的实现。
  • 第 6 行与第 9 行启动 RPC server,第 9 行多加了两个参数,指定用 SSL 传输。

#rpc::Client

#Client::Client

Client(const char* ip, int port, bool use_ssl=false);
  • 构造函数。参数 ip 是服务器的 ip,可以是域名、IPv4 或 IPv6 地址;参数 port 是服务器端口;参数 use_ssl 表示是否启用 SSL 传输,默认为 false,不启用 SSL。
  • rpc::Client 构建时,并没有建立连接。

#Client::~Client

Client::~Client();
  • 析构函数,关闭连接。

#Client::call

void call(const Json& req, Json& res);
  • 执行 RPC 请求,必须在协程中调用。
  • 参数 req 中必须带 “service”“method” 两个字段。
  • 参数 res 是 RPC 请求的响应结果。
  • 若 RPC 请求没有发送出去,或者没有收到服务端的响应,res 将不会被填充。
  • 此方法在发送 RPC 请求前,会检查连接状态,未连接时,先建立连接。

#Client::close

void close();
  • 关闭连接,多次调用此函数是安全的。

#Client::ping

void ping();
  • 给 rpc::Server 发送心跳。

#Client::set_userpass

void set_userpass(const char* user, const char* pass);
  • 设置用户名、密码。
  • 若服务端设置了用户名与密码,则客户端需要调用此方法设置用户名、密码,以在连接后进行用户名与密码认证。
  • 此方法只需调用一次,若多次调用,后面的值会覆盖之前的值。
  • 此方法必须在发起 RPC 请求前调用。

#RPC client 示例

#使用 rpc::Client

DEF_bool(use_ssl, false, "use ssl if true");
DEF_string(password, "", "password");
DEF_int32(n, 3, "request num");

void client_fun() {
    rpc::Client c("127.0.0.1", 7788, FLG_use_ssl);
    c.set_userpass("alice", FLG_password.c_str());
    FLG_password.safe_clear(); // clear password in the memory

    for (int i = 0; i < FLG_n; ++i) {
        Json req, res;
        req.add_member("service", "xx.HelloWorld");
        req.add_member("method", "hello");
        c.call(req, res);
        co::sleep(1000);
    }

    c.close();
}

go(client_fun);
  • RPC 请求中,“service” 与 “method” 是必带的字段。
  • 使用 rpc::Client,需要用户手动设置 “service” 与 “method” 两个字段。

#使用自动生成的 HelloWorldClient

前面 hello_world.proto 生成的代码中,还包含了一份客户端代码:

class HelloWorldClient {
  public:
    HelloWorldClient(const char* ip, int port, bool use_ssl=false)
        : _rpc_cli(ip, port, use_ssl), _serv_name("xx.HelloWorld") {
    }

    HelloWorldClient(const HelloWorldClient& c)
        : _rpc_cli(c._rpc_cli), _serv_name(c._serv_name) {
    }

    ~HelloWorldClient() {}

    void set_userpass(const char* user, const char* pass) {
        _rpc_cli.set_userpass(user, pass);
    }

    void close() {
        _rpc_cli.close();
    }

    Json make_req_hello() {
        Json req;
        req.add_member("service", _serv_name);
        req.add_member("method", "hello");
        return req;
    }

    Json make_req_world() {
        Json req;
        req.add_member("service", _serv_name);
        req.add_member("method", "world");
        return req;
    }

    Json perform(const Json& req) {
        Json res;
        _rpc_cli.call(req, res);
        return res;
    }

    void ping() {
        _rpc_cli.ping();
    }

  private:
    rpc::Client _rpc_cli;
    fastring _serv_name;
};
  • HelloWorldClient 只是简单的包装了 rpc::Client,它比 rpc::Client 稍微方便一点,不用手动设置 “service” 与 “method” 字段。
#include "hello_world.h"

std::unique_ptr<xx:HelloWorldClient> proto;

co::Pool p(
    []() { return (void*) new xx::HelloWorldClient(*proto); },
    [](void* p) { delete (xx::HelloWorldClient*) p; }
);

void client_fun() {
    co::PoolGuard<xx::HelloWorldClient> c(p);

    for (int i = 0; i < 10; ++i) {
        Json req = c->make_req_hello();
        req.add_member("xx", "123");
        Json res = c->perform(req);
        co::sleep(1000);
    }
}

proto.reset(new xx::HelloWorldClient("127.0.0.1", 7788));
proto->set_userpass("alice", "nice");

for (int i = 0; i < 8; ++i) {
    go(client_fun);
}
  • 上面的例子,使用 co::Pool 保存客户端,多个协程可以共享这些客户端。
  • co::PoolGuard 创建时自动从 co::Pool 中取出一个空闲的客户端,析构时自动将该客户端放回 co::Pool。
  • co::Pool 的 ccb 利用拷贝构造的方式从 proto 复制一个客户端,这样只需要在 proto 客户端中设置一次 ip, port 及用户名与密码。
  • HelloWorldClient 提供的 make_req_xxx() 方法,返回一个已经填充了 “service” 与 “method” 字段的 Json 对象,为用户免去手动设置的麻烦。
  • HelloWorldClient 提供的 perform() 方法,执行 RPC 请求,并返回 RPC 响应结果。

#配置项

#rpc_conn_idle_sec

DEF_int32(rpc_conn_idle_sec, 180, "#2 connection may be closed if no data...");
  • rpc::Server 空闲连接超时时间,单位为秒。一个连接在此时间内没有收到任何数据,server 可能会关掉此连接。

#rpc_conn_timeout

DEF_int32(rpc_conn_timeout, 3000, "#2 connect timeout in ms");
  • rpc::Client 连接超时时间,单位为毫秒。

#rpc_log

DEF_bool(rpc_log, true, "#2 enable rpc log if true");
  • 是否打印 RPC 日志,默认为 true,rpc::Server 与 rpc::Client 会打印 RPC 请求与响应。

#rpc_max_idle_conn

DEF_int32(rpc_max_idle_conn, 128, "#2 max idle connections");
  • rpc::Server 最大空闲连接数,默认为 128,超过这个数量时,server 会关闭部分空闲连接。

#rpc_max_msg_size

DEF_int32(rpc_max_msg_size, 8 << 20, "#2 max size of rpc message, default: 8M");
  • RPC 消息的最大长度,默认为 8M。

#rpc_recv_timeout

DEF_int32(rpc_recv_timeout, 1024, "#2 recv timeout in ms");
  • RPC 接收超时时间,单位为毫秒。

#rpc_send_timeout

DEF_int32(rpc_send_timeout, 1024, "#2 send timeout in ms");
  • RPC 发送超时时间,单位为毫秒。