Bài viết sẽ bàn về một framework C++ RP, ko yêu cầu bước code generation để glue code. Trước khi đi vào chi tiết, hãy tìm hiểu một số tính năng cơ bản của công cụ:
Đoạn mã nguồn trên vẫn chưa hoàn chỉnh. Mục tiêu của nó chỉ là để thể hiện nền tảng xây dựng framework. Hơn nữa, để rút ngắn, đoạn code là hỗn hợp của code từ repo lúc viết và custom sample code, nên có thể sẽ xảy ra lỗi.
Một số đoạn trong mã nguồn (không liên quan trực tiếp đến nội dung ta đang bàn đến) sẽ chỉ được xem xét nhanh mà không quá quan tâm đến hiệu năng. Bất cứ cải tiến nào sẽ được thêm vào repo mã nguồn sau.
Modern C++ (C++11/14)
Yêu cầu ít nhất Visual Studio 2015. Clang/GCC cũng tạm ổn.
Type-safe
Framework xác định những RPC call không hợp lệ trong lúc compile, như tên RPC không rõ, số thông số sai, hoặc kiểu thông số sai.
API tương đối nhỏ, không dài dòng
Nhiều cách xử lý RPC reply
Bộ xử lý không đồng bộ (Asynchronous handler)
Futures
Client có thể xác định nếu một RPC gây exception bên phía server
Cho phép sử dụng (gần như) bất kỳ kiểu RPC parameter nào
Giả sử nguời dùng chạy đúng hàm để xử lý kiểu parameter đó
RPC hai chiều (server có thể call RPC trên client)
Thông thường, client code không thể tin tưởng được, nhưng vì framework nhằm sử dụng giữa các bên uy tín, nên đây không phải là vấn đề quá lớn
Không xâm nhập
Một object dùng cho RPC call không cần biết bất cứ thứ gì về RPC hoặc network cả.
Từ đó có thể bọc các class bên thứ ba cho RPC call.
Chi phí băng thông tối thiểu mỗi RPC call
Không phụ thuộc bên ngoài
Dù supplied transport (trong repo mã nguồn) dùng Asio/Boost Asio, nhưng bản thân framework không phụ thuộc vòa đó. Bạn có thể tự cắm transport của mình.
Không có tính năng bảo mật
Vì frame work được thiết kế sử dụng cho các bên tin tưởng lẫn nhau (như giữa server chẳng hạn).
Ứng dụng có thể tự chọn transport riêng, nên sẽ có cơ hội encrypt bất cứ thứ gì nếu cần thiết.
Mặc dù mã nguồn vẫn chưa hoàn thiện, bài viết vẫn rất nặng về code. Code được tách thành nhiều phần nhỏ và một phần sau sẽ dựa theo phần trước. Để bạn hình dung được, sau đây là một mẫu code hoạt động hoàn chỉnh sử dụng repo mã nguồn tại thời điểm viết bài:
//////////////////////////////////////////////////////////////////////////// Useless RPC-agnostic class that performs calculations.//////////////////////////////////////////////////////////////////////////classCalculator{public:doubleadd(double a,double b){return a + b;}};//////////////////////////////////////////////////////////////////////////// Define the RPC table for the Calculator class// This needs to be seen by both the server and client code//////////////////////////////////////////////////////////////////////////#define RPCTABLE_CLASS Calculator#define RPCTABLE_CONTENTS
REGISTERRPC(add)#include"crazygaze/rpc/RPCGenerate.h"//////////////////////////////////////////////////////////////////////////// A Server that only accepts 1 client, then shuts down// when the client disconnects//////////////////////////////////////////////////////////////////////////voidRunServer(){
asio::io_service io;// Start thread to run Asio's the io_service// we will be using for the server
std::thread th = std::thread([&io]{
asio::io_service::work w(io);
io.run();});// Instance we will be using to serve RPC calls.// Note that it's an object that knows nothing about RPCsCalculator calc;// start listening for a client connection.// We specify what Calculator instance clients will use,auto acceptor =AsioTransportAcceptor<Calculator,void>::create(io, calc);// Start listening on port 9000.// For simplicity, we are only expecting 1 clientusingConType=Connection<Calculator,void>;
std::shared_ptr<ConType> con;
acceptor->start(9000,[&io,&con](std::shared_ptr<ConType> con_){
con = con_;// Since this is just a sample, close the server once the first client// disconnectsreinterpret_cast<BaseAsioTransport*>(con->transport.get())->setOnClosed([&io]{ io.stop();});});
th.join();}//////////////////////////////////////////////////////////////////////////// A client that connects to the server, calls 1 RPC// then disconnects, causing everything to shut down//////////////////////////////////////////////////////////////////////////voidRunClient(){// Start a thread to run our Asio io_service
asio::io_service io;
std::thread th = std::thread([&io]{
asio::io_service::work w(io);
io.run();});// Connect to the server (localhost, port 9000)auto con =AsioTransport<void,Calculator>::create(io,"127.0.0.1",9000).get();// Call one RPC (the add method), specifying an asynchronous handler for// when the result arrives
CZRPC_CALL(*con,add,1,2).async([&io](Result<double> res){
printf("Result=%fn", res.get());// Prints 3.0// Since this is a just a sample, stop the io_service after we get// the result,// so everything shuts down
io.stop();});
th.join();}// For testing simplicity, run both the server and client on the same machine,voidRunServerAndClient(){auto a = std::thread([]{RunServer();});auto b = std::thread([]{RunClient();});
a.join();
b.join();}
Đây đa phần là setup code, vì transport được nhắc đến sử dụng Asio. Bản thân RPC có thể đơn giản như:
// RPC call using asynchronous handler to handle the result
CZRPC_CALL(*con,add,1,2).async([](Result<double> res){
printf("Result=%fn", res.get());// Prints 3.0});// RPC call using std::future to handle the resultResult<double> res = CZRPC_CALL(*con,add,1,2).ft().get();
printf("Result=%fn", res.get());// Prints 3.0
Tôi đang làm việc với một game có code named G4 cũng được vài năm rồi, trò chơi cung cấp cho player những chiếc máy tính mô phỏng nhỏ in-game mà thậm chí có thể code được. Điều này yêu cầu tôi phải cho nhiều server cùng chạy song song:
Gameplay Server(s)
VM Server(s) (mô phỏng máy tính ingame)
Để vẫn có thể mô phỏng máy tính ngay cả khi người chơi hiện không online
VM Disk Server(s)
Xử lý lưu trữ của máy tính ingame, như đĩa mềm hoặc ổ cứng.
Database server(s)
Login server(s)
Tất cả những server này cần trao đổi dữ liệu, vì vậy ta cần một RPC framework linh hoạt.
Ban đầu, giải pháp tôi tìm đến là tag method của một class với attribute cụ thể, rồi từ parser gốc Clang (clReflect) tạo bất cứ serialization cần thiết nào, và glue code.
Dù cách này vẫn áp dụng được, nhưng nhiều năm qua tôi vẫn luôn canh cánh làm cách nào tôi có thể dùng tính năng C++11/14 mới để tạo một minimal type safe C++ RPC framework, một phương pháp không cần bước code generation cho glue code, mà vẫn giữa một API chấp nhận được.
Với serialization của non-fundamental type, code genaration vẫn còn hữu dụng, vậy nên tôi không cần phải xác định thủ công làm sao để serialize tất cả các field của một struct/class cho trước. (dù khâu xác định thủ công này cũng chả phiền phức cho lắm).
RPC Parameters
Xét một function, để có type safe RPC call, ta phải làm được:
Xác định lúc compile nếu function là một RPC function hợp lệ (đúng số thông ố, đúng kiểu parameter,…)
Kiểm tra liệu các parameter được supply khớp (hoặc convert được) sang thứ mà function signature chỉ ra.
Serialize tất cả parameter.
Deserialize tất cả parameter.
Call những function mong muốn
Parameter Traits
Vấn đề đầu tiên bạn đối mặt là việc xác định kiểu parameter nào được chấp nhập. Một số RPC framework chỉ chấp nhật một số kiểu giới hạn, như Thrift. Hãy xem thử vấn đề.
Vậy làm cách nào ta bắt compile kiểm tra theo parameter? Những kiểu fundamental type khá dễ và chắc chắn phải được framework hỗ trợ. Một bản copy dumb memory là đủ cho trường hợp như thế này, trừ khi bạn muốn cắt giảm số bit cần thiết để hy sinh một ít hiệu năng lấy bandwidth. Vậy chòn với các kiểu phức tạp như std::string, std::vector, hoặc chính class của mình? Còn pointer, reference, const reference, rvalue thì sao?
Ta có thể lấy một số ý tưởng từ những thứ C++ Standard Library đang làm trong type_traits header. Ta cần truy vấn một kiểu cho trước dựa theo tính chất RPC của nó. Hãy thử biến ý tưởng trên thành một template class `ParamTraits<T>`, với layout như sau:
Member constants
valid
true if T is valid for RPC parameters, false otherwise
Member types
store_type
Type dùng để giữ copy tạm thời cần cho deserializing
Member functions
write
Ghi parameter đến một stream
read
Đọc parameter vào một store_type
get
Xét một store_type parameter, kết quả nó trả có thể được chuyển đến RPC function làm parameter
Trong ví dụ này,hãy thực thi `ParamTraits<T>` cho kiểu thuật toán, trong trường hợp ta có stream class với method `write` và `read`:
namespace cz {namespace rpc {// By default, all types for which ParamTraits is not specialized are invalidtemplate<typename T,typename ENABLED =void>structParamTraits{using store_type =int;staticconstexprbool valid =false;};// Specialization for arithmetic typestemplate<typename T>structParamTraits<
T,typename std::enable_if<std::is_arithmetic<T>::value>::type>{using store_type =typename std::decay<T>::type;staticconstexprbool valid =true;template<typename S>staticvoid write(S& s,typename std::decay<T>::type v){
s.write(&v,sizeof(v));}template<typename S>staticvoid read(S& s, store_type& v){
s.read(&v,sizeof(v));}static store_type get(store_type v){return v;}};}// namespace rpc}// namespace cz
`ParamTraits<T>` còn được sử dụng để kiểm tra nếu return type hợp lệ, và vì một void function hợp lệ, ta cần phải chuyên hóa `ParamTraits` cho void nữa.
namespace cz {namespace rpc {// void type is validtemplate<>structParamTraits<void>{staticconstexprbool valid =true;using store_type =void;};}// namespace rpc}// namespace cz
Một điều khá kỳ lạ là khi chuyên hóa cho `void` là còn chỉ định một `store_type` nữa. Chúng ta không thể sử dụng nó để lưu trữ bất cư thứ gì, nhưng sẽ làm một vìa đoạn template code về sau dễ dàng hơn.
Với những ví dụ `ParamTraits`, reference không phải là RPC parameter hợp lệ. Trong thực tế, bạn ít nhất sẽ muốn cho phép const reference, đặc biệt với fundamental type. Bạn có thể thêm một tweak để kích hoạt hỗ trợ cho `const T&` cho bất kỳ T hợp lệ nào khi ứng dụng của bạn cần đến.
// Make "const T&" valid for any valid T#define CZRPC_ALLOW_CONST_LVALUE_REFS
namespace cz {namespace rpc {template<typename T>structParamTraits<const T&>:ParamTraits<T>{static_assert(ParamTraits<T>::valid,"Invalid RPC parameter type. Specialize ParamTraits if ""required.");};}}
Bạn cũng có thể thực hiện những tweak tương tự để kích hoạt hỗ trợ `T&` hoặc `T&&` nếu cần thiết, mặc dù nếu function thay đổi đến những parameter này, những thay đổi đó sẽ mất đi.
Hãy thử thêm hỗ trợ cho các kiểu phức tạp như `std::vector<T>`. Để hỗ trợ `std::vector<T>`, ta buộc phải hỗ trợ thêm cả `T`:
namespace cz {namespace rpc {template<typename T>structParamTraits<std::vector<T>>{using store_type = std::vector<T>;staticconstexprbool valid =ParamTraits<T>::valid;static_assert(ParamTraits<T>::valid ==true,"T is not valid RPC parameter type.");// std::vector serialization is done by writing the vector size, followed by// each elementtemplate<typename S>staticvoid write(S& s,const std::vector<T>& v){int len =static_cast<int>(v.size());
s.write(&len,sizeof(len));for(auto&& i : v)ParamTraits<T>::write(s, i);}template<typename S>staticvoid read(S& s, std::vector<T>& v){int len;
s.read(&len,sizeof(len));
v.clear();while(len--){
T i;ParamTraits<T>::read(s, i);
v.push_back(std::move(i));}}static std::vector<T>&&get(std::vector<T>&& v){return std::move(v);}};}// namespace rpc}// namespace cz// A simple testvoid testVector(){
TEST(ParamTraits<std::vector<int>>::valid);// true// true if support for const refs was enabled
TEST(ParamTraits<const std::vector<int>&>::valid);}
Để tiện hơn, chúng ta có thể dùng toán tử `<<` và `>>` với class `stream` (không thế hiện ở đây). Với những toán tử này, hãy đơn thuần call các function `ParamTraits<T> read` và `ParamTraits<T> write` tương ứng.
Giờ đây ta đã có thể kiểm tra xem một type nhất định có được phép cho RPC parameter hay không, chúng ta có thể build trên đó kiểm tra xem một function có được dùng cho RPC hay không. Cách này được thực thi với variadic template.
Trước hết hãy tạo một template cho ta biết nếu nhóm parameter có hợp lệ hay không.
namespace cz {namespace rpc {//// Validate if all parameter types in a parameter pack can be used for RPC// calls//template<typename... T>structParamPack{staticconstexprbool valid =true;};template<typenameFirst>structParamPack<First>{staticconstexprbool valid =ParamTraits<First>::valid;};template<typenameFirst,typename...Rest>structParamPack<First,Rest...>{staticconstexprbool valid =ParamTraits<First>::valid &&ParamPack<Rest...>::valid;}}// namespace rpc}// namespace cz// Usage example:void testParamPack(){
TEST(ParamPack<>::valid);// true (No parameters is a valid too)
TEST((ParamPack<int,double>::valid));// true
TEST((ParamPack<int,int*>::valid));// false}UsingParamPack, we can now create a FunctionTraitstemplate to query a function's properties.
namespace cz {
namespace rpc {
template <class F>
struct FunctionTraits {};
// For free function pointers
template <class R, class... Args>
struct FunctionTraits<R (*)(Args...)> : public FunctionTraits<R(Args...)> {};
// For method pointers
template <class R, class C, class... Args>
struct FunctionTraits<R (C::*)(Args...)> : public FunctionTraits<R(Args...)> {
using class_type = C;
};
// For const method pointers
template <class R, class C, class... Args>
struct FunctionTraits<R (C::*)(Args...) const>
: public FunctionTraits<R(Args...)> {
using class_type = C;
};
template <class R, class... Args>
struct FunctionTraits<R(Args...)> {
// Tells if both the return type and parameters are valid for RPC calls
static constexpr bool valid =
ParamTraits<R>::valid && ParamPack<Args...>::valid;
using return_type = R;
// Number of parameters
static constexpr std::size_t arity = sizeof...(Args);
// A tuple that can store all parameters
using param_tuple = std::tuple<typename ParamTraits<Args>::store_type...>;
// Allows us to get the type of each parameter, given an index
template <std::size_t N>
struct argument {
static_assert(N < arity, "error: invalid parameter index.");
using type = typename std::tuple_element<N, std::tuple<Args...>>::type;
};
};
} // namespace rpc
} // namespace cz
// A simple test...
struct FuncTraitsTest {
void func1() const {}
void func2(int) {}
int func3(const std::vector<int>&) { return 0; }
int* func4() { return 0; }
};
void testFunctionTraits() {
TEST(FunctionTraits<decltype(&FuncTraitsTest::func1)>::valid); // true
TEST(FunctionTraits<decltype(&FuncTraitsTest::func2)>::valid); // true
TEST(FunctionTraits<decltype(&FuncTraitsTest::func3)>::valid); // true
TEST(FunctionTraits<decltype(&FuncTraitsTest::func4)>::valid); // false
}
`FunctionTraits` cho ta một vài tính chất sẽ được sử dụng sau. Lưu ý với ví dụ rằng `FunctionTraits::param_tuple` build trên `ParamTraits<T>::store_type`. Việc này cần thiết, vì ở một thời điểm nào đó, chung ta cần cách ể deserialize tất cả parameter thành một tuple trước khi call function.
Serialization
Vì chúng ta giờ đây có code cần thiết để truy vấn parameter, return type và validating function, chúng ta có thể gộp code lại để serialize một function call. Hơn nữa, đây chính là type safe. Nó sẽ không complie nếu nhập sai số hoặc kiểu parameter, hoặc nếu bản thân function không hợp lệ cho RPC (không hỗ trợ các return/parameter type).
namespace cz {namespace rpc {namespace details {template<typename F,int N>structParameters{template<typename S>staticvoid serialize(S&){}template<typename S,typenameFirst,typename...Rest>staticvoid serialize(S& s,First&& first,Rest&&... rest){usingTraits=ParamTraits<typenameFunctionTraits<F>::template argument<N>::type>;Traits::write(s, std::forward<First>(first));Parameters<F, N +1>::serialize(s, std::forward<Rest>(rest)...);}};}// namespace detailstemplate<typename F,typename...Args>void serializeMethod(Stream& s,Args&&... args){usingTraits=FunctionTraits<F>;static_assert(Traits::valid,"Function signature not valid for RPC calls. Check if ""parameter types are valid");static_assert(Traits::arity ==sizeof...(Args),"Invalid number of parameters for RPC call.");
details::Parameters<F,0>::serialize(s, std::forward<Args>(args)...);}}// namespace rpc}// namespace cz//// A simple test// structSerializeTest{void func1(int,const std::vector<int>){}void func2(int*){}};void testSerializeCall(){Stream s;
serializeMethod<decltype(&SerializeTest::func1)>(s,1,
std::vector<int>{1,2,3});// These fail to compile because of the wrong number of parameters// serializeMethod<decltype(&SerializeTest::func1)>(s);// serializeMethod<decltype(&SerializeTest::func1)>(s, 1);// Doesn't compile because of wrong type of parameters// serializeMethod<decltype(&SerializeTest::func1)>(s, 1, 2);// Doesn't compile because the function can't be used for RPCs.// int a;// serializeMethod<decltype(&SerializeTest::func2)>(s, &a);}
Như đã đề cập ở trên, `FunctionTraits<F>::param_tuple` là type `std::tuple` ta dùng được để giữ tất cả parameter của function. Để có thể sử dụng tuple này để deserialize parameter, chúng ta phải chuyên hóa `ParamTraits` cho các tuple. Bên cạnh đó, ta còn có thể sử dụng `std::tuple` cho RPC parameter.
namespace cz {namespace rpc {namespace details {template<typename T,boolDone,int N>structTuple{template<typename S>staticvoid deserialize(S& s, T& v){
s >> std::get<N>(v);Tuple<T, N == std::tuple_size<T>::value-1, N +1>::deserialize(s, v);}template<typename S>staticvoid serialize(S& s,const T& v){
s << std::get<N>(v);Tuple<T, N == std::tuple_size<T>::value-1, N +1>::serialize(s, v);}};template<typename T,int N>structTuple<T,true, N>{template<typename S>staticvoid deserialize(S&, T&){}template<typename S>staticvoid serialize(S&,const T&){}};}// namespace detailstemplate<typename... T>structParamTraits<std::tuple<T...>>{using tuple_type = std::tuple<T...>;// for internal useusing store_type = tuple_type;staticconstexprbool valid =ParamPack<T...>::valid;static_assert(ParamPack<T...>::valid ==true,"One or more tuple elements is not a valid RPC parameter type.");template<typename S>staticvoid write(S& s,const tuple_type& v){
details::Tuple<tuple_type, std::tuple_size<tuple_type>::value==0,0>::serialize(s, v);}template<typename S>staticvoid read(S& s, tuple_type& v){
details::Tuple<tuple_type, std::tuple_size<tuple_type>::value==0,0>::deserialize(s, v);}static tuple_type&&get(tuple_type&& v){return std::move(v);}};}// namespace rpc}// namespace cz// A simple testvoid testDeserialization(){Stream s;
serializeMethod<decltype(&SerializeTest::func1)>(s,1,
std::vector<int>{1,2});// deserialize the parameters into a tuple.// the tuple is of type std::tuple<int,std::vector<int>>FunctionTraits<decltype(&SerializeTest::func1)>::param_tuple params;
s >>params;};
Sau khi deserialize tất cả parameter thành tuple, chúng ta giờ đâu phải tìm cách tháo tuble để call matching function. Một lần nữa, ta có thể dùng variadic template.
namespace cz {namespace rpc {namespace detail {template<typename F,typenameTuple,boolDone,intTotal,int... N>struct callmethod_impl {staticdecltype(auto) call(typenameFunctionTraits<F>::class_type& obj, F f,Tuple&& t){return callmethod_impl<F,Tuple,Total==1+sizeof...(N),Total, N...,sizeof...(N)>::call(obj, f,
std::forward<Tuple>(t));}};template<typename F,typenameTuple,intTotal,int... N>struct callmethod_impl<F,Tuple,true,Total, N...>{staticdecltype(auto) call(typenameFunctionTraits<F>::class_type& obj, F f,Tuple&& t){usingTraits=FunctionTraits<F>;return(obj.*f)(ParamTraits<typenameTraits::template argument<N>::type>::get(
std::get<N>(std::forward<Tuple>(t)))...);}};}// namespace detailstemplate<typename F,typenameTuple>decltype(auto) callMethod(typenameFunctionTraits<F>::class_type& obj, F f,Tuple&& t){static_assert(FunctionTraits<F>::valid,"Function not usable as RPC");typedeftypename std::decay<Tuple>::type ttype;return detail::callmethod_impl<
F,Tuple,0== std::tuple_size<ttype>::value,
std::tuple_size<ttype>::value>::call(obj, f, std::forward<Tuple>(t));}}// namespace rpc}// namespace cz// A simple testvoid testCall(){Stream s;// serialize
serializeMethod<decltype(&SerializeTest::func1)>(s,1,
std::vector<int>{1,2});// deserializeFunctionTraits<decltype(&SerializeTest::func1)>::param_tuple params;
s >>params;// Call func1 on an object, unpacking the tuple into parametersSerializeTest obj;
callMethod(obj,&SerializeTest::func1, std::move(params));};
Vậy, giờ đây ta đã biết cách xác thực một function, serialize, deserialize, và call function đó. Vậy là đã xong bước code “level thấp” rồi. Lớp tiếp theo chúng ta sắp sửa phủ lên code này sẽ là RPC API thực sự.
The RPC API
Header
Header có chứa những thông tin sau:
Field
size
Tổng dung lượng (byte) của RPC. Có dung lượng là một phần của header sẽ đơn giản hóa mạnh mẽ, vì chúng ta có thể kiểm tra nếu ta nhận tất cả dữ liệu trước khi thử xử lý RPC.
counter
Số call. Mỗi lúc ta gọi RPC, một counter sẽ tăng và được chỉ định đến RPC call đó.
rpcid
The function to call
isReply
Nếu true, đây là reply đến RPC. Nếu false, đây là RPC call.
success
Chỉ áp dụng cho reply (isReply==true). Nếu true, call thành công và data là reply. Nấu false, data là exception information
Counter và rpcid hình thành một key xác định RPC call instance, cần khi ghép một incoming RPC reply đế RPC call gây ra nó.
// Small utility struct to make it easier to work with the RPC headers
struct Header {
enum {
kSizeBits = 32,
kRPCIdBits = 8,
kCounterBits = 22,
};
explicit Header() {
static_assert(sizeof(*this) == sizeof(uint64_t),
“Invalid size. Check the bitfields”);
all_ = 0;
}
struct Bits {
uint32_t size : kSizeBits;
unsigned counter : kCounterBits;
unsigned rpcid : kRPCIdBits;
unsigned isReply : 1; // Is it a reply to a RPC call ?
unsigned success : 1; // Was the RPC call a success ?
};
Chúng ta đã serialize và deserialize một RPC, nhưng không phải là cách map một RPC đã serialize đến đúng function bên phía server. Để giải quyết vấn đề này, chúng ta cần chỉ định một ID đến mỗi function. Client sẽ biết nó muốn call function nào, và điền đúng ID vào header. Server kiểm tra header, và khi biết được ID, nó sẽ gửi đến đúng handler. Hãy tạo một số ID cơ bản để xác định bảng phân phối tương tự.
namespace cz {namespace rpc {//// Helper code to dispatch a call.namespace details {// Handle RPCs with return valuestemplate<typename R>structCallHelper{template<typename OBJ,typename F,typename P>staticvoid impl(OBJ& obj, F f, P&¶ms,Stream&out){out<< callMethod(obj, f, std::move(params));}};// Handle void RPCstemplate<>structCallHelper<void>{template<typename OBJ,typename F,typename P>staticvoid impl(OBJ& obj, F f, P&¶ms,Stream&out){
callMethod(obj, f, std::move(params));}};}structBaseRPCInfo{BaseRPCInfo(){}virtual~BaseRPCInfo(){};
std::string name;};classBaseTable{public:BaseTable(){}virtual~BaseTable(){}bool isValid(uint32_t rpcid)const{return rpcid < m_rpcs.size();}protected:
std::vector<std::unique_ptr<BaseRPCInfo>> m_rpcs;};template<typename T>classTableImpl:publicBaseTable{public:usingType= T;structRPCInfo:publicBaseRPCInfo{
std::function<void(Type&,Stream&in,Stream&out)> dispatcher;};template<typename F>void registerRPC(uint32_t rpcid,constchar* name, F f){assert(rpcid == m_rpcs.size());auto info = std::make_unique<RPCInfo>();
info->name = name;
info->dispatcher =[f](Type& obj,Stream&in,Stream&out){usingTraits=FunctionTraits<F>;typenameTraits::param_tuple params;in>>params;using R =typenameTraits::return_type;
details::CallHelper<R>::impl(obj, f, std::move(params),out);};
m_rpcs.push_back(std::move(info));}};template<typename T>classTable:publicTableImpl<T>{static_assert(sizeof(T)==0,"RPC Table not specified for the type.");};}// namespace rpc}// namespace cz
Với một ID,function `get` sẽ trả dispatcher về đúng method `Calculator`. Rồi ta có thể chuyển `Calculator` instance, input và output streams sang dispatcher để xử lý mọi thứ còn lại.
Quá trình chuyên hóa khá dài dòng và không thể tránh khỏi sai sót, vì enum và `registerRPC` call phải khớp. Nhưng chúng ta có thể rút ngắn rõ rệt quá trình với một vài macro. Đầu tiên, hãy xem thử một ví dụ dài dòng để thấy cách dùng bảng này:
void testCalculatorTable(){// Both the client and server need to have access to the necessary tableusingCalcT=Table<Calculator>;//// Client sends RPCStream toServer;RPCHeader hdr;
hdr.bits.rpcid =(int)CalcT::RPCId::add;
toServer << hdr;
serializeMethod<decltype(&Calculator::add)>(toServer,1.0,9.0);//// Server receives RPC, and sends back a replyCalculator calc;// object used to receive the RPCs
toServer >> hdr;auto&& info =CalcT::get(hdr.bits.rpcid);Stream toClient;// Stream for the reply// Call the desired Calculator function.
info->dispatcher(calc, toServer, toClient);//// Client receives a replydouble r;
toClient >> r;
printf("%2.0fn", r);// Will print "10"}
Một lần nữa, ví dụ này khá dài dòng, chỉ để thể hiện code flow. Chúng ta sẽ có ví dụ tối ưu hơn sau.
Vậy, chúng ta đơn giản quá trình chuyên hóa bảng như thế nào? Nếu ta đặt gist của chuyên hóa bảng trong một header không được guard, bạn chỉ cần một vài define theo sau là một include của header (không guard) đó để tạo tương tự.
Thêm vào đó, việc chuyên hóa bảng thế này giúp ta hỗ trợ inheritance dễ dàng hơn. Hãy tưởng tượng chúng ta có một `ScientificCalculator` kế thừa từ Calculator:
Chúng ta phải xác định dữ liệu được chuyển đổi như thế nào giữa client và server. Hãy đặt dữ liệu đó vào `Transport` interface class. Interface bị cố ý để rất đơn giản để ứng dụng xác định một custom transport. Tất cả chúng ta cần là method để gửi, nhận, và đóng.
namespace cz {namespace rpc {classTransport{public:virtual~Transport(){}// Send one single RPCvirtualvoid send(std::vector<char> data)=0;// Receive one single RPC// dst : Will contain the data for one single RPC, or empty if no RPC// available// return: true if the transport is still alive, false if the transport// closedvirtualbool receive(std::vector<char>& dst)=0;// Close connection to the peervirtualvoid close()=0;};}// namespace rpc}// namespace cz
Chuyên hóa `Result<void>` rất cần thiết, vì trong trường hợp đó không có kết quả, nhưng caller vẫn muốn biết neus RPC call có được xử lý đúng cách hay không. Mới đầu, tôi từng xem xét sử dụng `Expected<T>` cho RPC reply. Nhưng `Expected<T>` về cơ bản có hai trạng thái (Value hoặc Exception), mà chúng ta lại cần tới 3 (Value, Exception, và Aborted). Ta hay cho rằng Aborted có thể được xem là exceptiom, nhưng từ góc nhìn của client, điều này không phải lúc nào cũng đúng. Trong một số trường hợp bạn sẽ muốn biết một RPC fail vì đóng kết nối, mà không phải vì server phản hồi exception.
Video: Sendo.vn xây dựng kiến trúc hệ thống mở rộng để đáp ứng tăng trưởng 10x mỗi năm như thế nào?
OutProcessor
Chúng ta cần phải theo dõi những RPC call đang diễn ra để user code nhận kết quả khi chúng đến. Ta có thể handle một kết quả theo hai cách. Thông qua handle không đồng bộ (tương tự Asio), hoặc với một future.
Ta cần hai class cho việc này: một outgoing processor và một wrapper cho một RPC call duy nhất. Một class khác (Connection) để buộc các outgoing processor và incoming processor với nhau. Class này sẽ được giới thiệu sau.
namespace cz {namespace rpc {classBaseOutProcessor{public:virtual~BaseOutProcessor(){}protected:template<typename R>friendclassCall;template<typename L,typename R>friendstructConnection;template<typename F,typename H>void commit(Transport& transport,uint32_t rpcid,Stream& data,
H&& handler){
std::unique_lock<std::mutex> lk(m_mtx);Header hdr;
hdr.bits.size = data.writeSize();
hdr.bits.counter =++m_replyIdCounter;
hdr.bits.rpcid = rpcid;*reinterpret_cast<Header*>(data.ptr(0))= hdr;
m_replies[hdr.key()]=[handler = std::move(handler)](Stream*in,Header hdr){using R =typenameParamTraits<typenameFunctionTraits<F>::return_type>::store_type;if(in){if(hdr.bits.success){
handler(Result<R>::fromStream((*in)));}else{
std::string str;(*in)>> str;
handler(Result<R>::fromException(std::move(str)));}}else{// if the stream is nullptr, it means the result is being aborted
handler(Result<R>());}};
lk.unlock();
transport.send(data.extract());}void processReply(Stream&in,Header hdr){
std::function<void(Stream*,Header)> h;{
std::unique_lock<std::mutex> lk(m_mtx);auto it = m_replies.find(hdr.key());assert(it != m_replies.end());
h = std::move(it->second);
m_replies.erase(it);}
h(&in, hdr);}void abortReplies(){decltype(m_replies) replies;{
std::unique_lock<std::mutex> lk(m_mtx);
replies = std::move(m_replies);}for(auto&& r : replies){
r.second(nullptr,Header());}};
std::mutex m_mtx;uint32_t m_replyIdCounter =0;
std::unordered_map<uint32_t, std::function<void(Stream*,Header)>>
m_replies;};template<typename T>classOutProcessor:publicBaseOutProcessor{public:usingType= T;template<typename F,typename...Args>auto call(Transport& transport,uint32_t rpcid,Args&&... args){usingTraits=FunctionTraits<F>;static_assert(
std::is_member_function_pointer<F>::value&&
std::is_base_of<typenameTraits::class_type,Type>::value,"Not a member function of the wrapped class");Call<F> c(*this, transport, rpcid);
c.serializeParams(std::forward<Args>(args)...);return std::move(c);}protected:};// Specialization for when there is no outgoing RPC calls// If we have no outgoing RPC calls, receiving a reply is therefore an error.template<>classOutProcessor<void>{public:OutProcessor(){}void processReply(Stream&,Header){assert(0&&"Incoming replies not allowed for OutProcessor<void>");}void abortReplies(){}};}// namespace rpc}// namespace cz
Nên nhớ `OutProcessor<T>` không cần một reference/pointer đến object của T. Nó chỉ cần biết type mà ta gửi RPC đến, để biết được phải dùng `Table<T>`.
Đây là ví dụ sử dụng OutProcessor:
//// "trp" is a transport that sends data to a "Calculator" servervoid testOutProcessor(Transport& trp){// A processor that calls RPCs on a "Calculator" serverOutProcessor<Calculator> outPrc;// Handle with an asynchronous handler
outPrc.call<decltype(&Calculator::add)>(
trp,(int)Table<Calculator>::RPCId::add,1.0,2.0).async([](Result<double> res){
printf("%2.0fn", res.get());// prints '3'});// Handle with a futureResult<double> res = outPrc.call<decltype(&Calculator::add)>(
trp,(int)Table<Calculator>::RPCId::add,1.0,3.0).ft().get();
printf("%2.0fn", res.get());// prints '4'}
Một lần nữa, hơi quá dài dòng, vì tôi chưa giới thiệu tất cả code để áp dụng tối ưu. Nhưng chí ít, ta đã thấy được interface `OutProcessor<T>` và `Call` làm việc như thế nào. Thực thi `std::future` build đơn giản trên thực thi không đồng bộ.
InProcessor
Giờ đây ta có thể gửi một RPC và đợi kết quả, hãy xem thử ta cần gì ở bên kia. Phải làm gì khi server nhận được một RPC call.
Hãy tạo class `InProcessor<T>`. Trái với `OutProcessor<T>`, `InProcessor<T>` cần phải giữ một reference tới một object thuộc kiểu T. Điều này sảy ra khi nhận được RPC, nó có thể gọi requested method lên object đó, và gửi kết quả trở lại client.
namespace cz {namespace rpc {classBaseInProcessor{public:virtual~BaseInProcessor(){}};template<typename T>classInProcessor:publicBaseInProcessor{public:usingType= T;InProcessor(Type* obj,bool doVoidReplies =true): m_obj(*obj), m_voidReplies(doVoidReplies){}void processCall(Transport& transport,Stream&in,Header hdr){Streamout;// Reuse the header as the header for the reply, so we keep the counter// and rpcid
hdr.bits.size =0;
hdr.bits.isReply =true;
hdr.bits.success =true;auto&& info =Table<Type>::get(hdr.bits.rpcid);#if CZRPC_CATCH_EXCEPTIONStry{#endifout<< hdr;// Reserve space for the header
info->dispatcher(m_obj,in,out);#if CZRPC_CATCH_EXCEPTIONS}catch(std::exception& e){out.clear();out<< hdr;// Reserve space for the header
hdr.bits.success =false;out<< e.what();}#endifif(m_voidReplies ||(out.writeSize()>sizeof(hdr))){
hdr.bits.size =out.writeSize();*reinterpret_cast<Header*>(out.ptr(0))= hdr;
transport.send(out.extract());}}protected:Type& m_obj;bool m_voidReplies =false;};template<>classInProcessor<void>{public:InProcessor(void*){}void processCall(Transport&,Stream&,Header){assert(0&&"Incoming RPC not allowed for void local type");}};}// namespace rpc}// namespace cz
Define `CZRPC_CATCH_EXCEPTIONS` cho phép chúng ta tweak nếu ta muốn exception phía server được chuyển sang phía client.
Nếu việc sử dụng `InProcessor<T>` (và `Table<T>`) cho phép call RPC lên object (không biết gì về RPC hoặc netwrk). Ví dụ như, hay xem xét giả dụ sau:
oid calculatorServer(){// The object we want to use for RPC callsCalculator calc;// The server processor. It will call the appropriate methods on 'calc' when// an RPC is receivedInProcessor<Calculator> serverProcessor(&calc);while(true){// calls to serverProcessor::processCall whenever there is data}}
Object `Calculator` được dùng cho RPC không biết gì về RPC. `InProcessor<Calculator>` sẽ đảm nhiệm mọi nhiệm vụ liên quan. Từ đó ta không thể sử dụng các class bên thức ba cho PRC. Trong một số trường hợp, chung ta muốn class dùng cho RPC biết biết về RPC và/hoặc network. Ví dụ như, nếu bạn đang tạo một chat system, bạn sẽ khiến client gửi message (RPC call) đến server. Server cần biết client được kết nối đến thứ gì, để có thể truyền phát message.
Connection
Ta giờ đây đã có thể gửi nhận RPC. dù API có hơi dài một chút. Những template class `OutProcessor<T>` và `InProcessor<T>` sẽ xử lý những sự kiện sảy ra với data tại cả hai đầu kết nối. Vậy, hiện ta cần chính điều này. Một `Connection` để buộc mọi thức cần để gửi nhận dữ liệu, và đơn giản là API, về một chỗ.
namespace cz {namespace rpc {structBaseConnection{virtual~BaseConnection(){}//! Process any incoming RPCs or replies// Return true if the connection is still alive, false otherwisevirtualbool process()=0;};template<typename LOCAL,typename REMOTE>structConnection:publicBaseConnection{usingLocal= LOCAL;usingRemote= REMOTE;usingThisType=Connection<Local,Remote>;Connection(Local* localObj, std::shared_ptr<Transport> transport): localPrc(localObj), transport(std::move(transport)){}template<typename F,typename...Args>auto call(Transport& transport,uint32_t rpcid,Args&&... args){return remotePrc.template call<F>(transport, rpcid,
std::forward<Args>(args)...);}staticThisType* getCurrent(){auto it =Callstack<ThisType>::begin();return(*it)==nullptr?nullptr:(*it)->getKey();}virtualbool process()override{// Place a callstack marker, so other code can detect we are serving an// RPCtypenameCallstack<ThisType>::Context ctx(this);
std::vector<char> data;while(true){if(!transport->receive(data)){// Transport is closed
remotePrc.abortReplies();returnfalse;}if(data.size()==0)returntrue;// No more pending data to processHeader hdr;Streamin(std::move(data));in>> hdr;if(hdr.bits.isReply){
remotePrc.processReply(in, hdr);}else{
localPrc.processCall(*transport,in, hdr);}}}InProcessor<Local> localPrc;OutProcessor<Remote> remotePrc;
std::shared_ptr<Transport> transport;};}// namespace rpc}// namespace cz
Từ đây ta có output processor, the input processor, và the transport. Để user code có thể xác định liệu mó có đang phục vụ RPC không. nó sử dụng một class Callstack. Class cho phép tạo RPC/network aware code nếu cần thiết, giống server class.
Vậy, ta đơn giản hóa API bằng cách nào? Vì `Connection<T>` có mọi thứ ta cần, một macro dưới dạng paramether của connection object, một function name và parameter, xử lý mọi thứ, bao gồm cả type check để nó không compile nếu là call không hợp lệ.
Khi dùng marcro này, cú pháp RPC call sẽ trở nên cực kỳ đơn giản. Hãy xe, thử client code ví dụ dưới đây:
// Some class to use for RPC callsclassMagicSauce{public:int func1(int a,int b){return a + b;}int func2(int a,int b){return a + b;}};// Define RPC table for MagicSauce#define RPCTABLE_CLASS MagicSauce#define RPCTABLE_CONTENTS REGISTERRPC(func1)#include"RPCGenerate.h"// 'trp' is a fully functional transportvoid test_Connection(std::shared_ptr<Transport> trp){Connection<void,MagicSauce> con(nullptr, trp);// Doesn't compile : Invalid number of parameters// CZRPC_CALL(con, func1, 1);// Doesn't compile : Wrong type of parameters// CZRPC_CALL(con, func1, 1, "hello");// Doesn't compile: func3 is not a MagicSauce method// CZRPC_CALL(con, func3, 1, 2);// Doesn't compile: func2 is a method of MagicSauce, but not registered as// RPC// CZRPC_CALL(con, func2, 1, 2);// Compiles fine, since everything is valid
CZRPC_CALL(con, func1,1,2).async([](Result<int> res){
printf("%dn", res.get());// print '3'});}
Bạn có để ý là `void` và `nullptr` được sử dụng khi tạo kết nối với `Connection<void`, `MagicSauce> con(nullptr, trp);`? Thao tác này sẽ điều tiết bidirectional RPC (server cũng có thể call RPC lên một client). Trong trường hợp này, chúng ta không trong đợi vào client side RPC, vậy nên client side `Connection` object không có một local object để call RPC.
Một ví dụ đơn giản (nhưng không hoạt động) của bidirectional RPC:
classChatClient;classChatServer{public:// Called by clients to post new messagesvoid msg(constchar* msg);void addNewClient(std::shared_ptr<Transport> trp);private:// Connection specifies both a LOCAL, and REMOTE object types
std::vector<std::unique_ptr<Connection<ChatServer,ChatClient>>> m_clients;};#define RPCTABLE_CLASS ChatServer#define RPCTABLE_CONTENTS REGISTERRPC(msg)#include"RPCGenerate.h"classChatClient{public:void onMsg(constchar* msg);};#define RPCTABLE_CLASS ChatClient#define RPCTABLE_CONTENTS REGISTERRPC(onMsg)#include"RPCGenerate.h"voidChatServer::msg(constchar* msg){// Simply broadcast the message to all clientsfor(auto&& c : m_clients){
CZRPC_CALL(*c, onMsg, msg);}}voidChatServer::addNewClient(std::shared_ptr<Transport> trp){auto con = std::make_unique<Connection<ChatServer,ChatClient>>(this, trp);
m_clients.push_back(std::move(con));}voidChatClient::onMsg(constchar* msg){
printf("%sn", msg);}void test_ChatServer(){ChatServer server;while(true){// Wait for connections, and call ChatServer::addClient}}// 'trp' is some fully functional transport connected to the ChatServervoid test_ChatClient(std::shared_ptr<Transport> trp){ChatClient client;// In this case, we have a client side object to answer RPCsConnection<ChatClient,ChatServer> con(&client, trp);while(true){// call the 'msg' RPC whenever the user types something, like this:
CZRPC_CALL(con, msg,"some message");// The server will call our client 'onMsg' when other clients send a// message}}
Các parameter của `Connection` template trên server và client được đảo ngược. Khi nhận được dữ liệu, object `Connection` sẽ forward xử lý đến `InProcessor` nếu là một incoming RPC call, hoặc đến `OutProcessor` nếu đó là reply đến outgoing RPC call trước đó. Data flow của bidirectional RPC trông như thế này:
Improvements
Có một số phần được cố ý không đưa vào framework, để từ đó ứng dụng có thể quyết định thế nào là tốt nhất. Ví dụ:
Transport initialization
Giao diện transport rất đơn giản, nên nó không quá chú trọng vào hướng khởi tạo cụ thể hoặc xác định incoming data. Việc cung cấp một transport hiệu quả đến class `Connecttion` hoàn toàn phụ thuộc vào ứng dụng. Đây cũng là lý do tại sao tôi tránh hiển thị transport initialization, vì tôi phải trình bày một transport implementation hoạt động hoàn chỉnh.
Tại thời điểm viết bài, repo mã nguồn có một transport implementation dùng Boost Asio (hoặc Asio biệt lập)
Xác định disconnection
Như với initialization, có những đoạn code để xử lý và xác định shutdown.Ứng dụng sẽ có nhiệm vụ quyết định thao tác như thế nào với bất cứ custom implementation nó cung cấp.