OceanBase中的网络分为两个部分:对外网络和对内网络。对外网络用于接收用户或者客户端的连接及后续的SQL语句等,目前OceanBase社区版中对外网络部分实现的是MySQL的通信协议。对内网络则负责集群中节点之间的通信(例如心跳监测等),OceanBase将这些通信(操作)实现为RPC调用。虽然这两部分网络的服务对象和服务内容大相径庭,但本质上都需要在当前节点上建立网络监听,然后当连接请求到来时触发相应的操作。事实上,OceanBase在libeasy网络框架的基础上实现了对上述两种网络的统一。
libeasy提供一个处理TCP连接的事件驱动的网络框架。框架本身封装好了底层的网络操作,只需要开发者处理其中的各种事件。libeasy的基本概念有:easy_connection_t(连接)、easy_message_t(消息)和easy_request_t(请求)。每个连接上可以有多个消息,通过链表连起来,每个消息可以由多个请求组成,也通过链表连起来。
easy_request_t就相当于应用层的一个具体的包,多个请求组合起来形成一个完整的消息。在一次长连接中,用户可以接收多次消息。每个request只属于一个connection。libeasy是基于epoll的事件模型,程序收到事件后,回调注册的事件函数。调用回调函数的线程被称为I/O线程,线程的个数在创建easy事件时指定。
libeasy框架之上的开发者需要注册一系列回调函数,供libeasy在接受请求时回调。按照回调的顺序,回调函数包括:
1)on_connect:接受TCP连接时,回调该函数,可以在该事件中做密码验证等事情。
2)decode:从网络上读取一段字节流,并且按照定义的协议,解析成数据结构,供之后处理。
3)process:处理从decode中解析出的结构,可以是同步处理,也可以是异步处理。
4)encode:把process的结果转化成字节流(如果process的结果是要输出的结果,则不需要转化),然后把结果挂载到request的输出上。
5)clean_up:在连接断开前执行的操作,如果在之前的操作中分配了一些内存,需要在这里释放。
6)on_disconnect:连接断开时的操作。
OceanBase将这一整套回调函数称为Handler(处理者),在比较高的抽象层次上,Handler被封装为类ObReqHandler,见代码3.1。
代码3.1 Handler被封装为类ObReqHandler
在这样的设计中,可以认为一种Handler用于处理一种网络通信协议(当然是应用层面的),那么针对前述OceanBase中对外和对内的两种网络,每一种网络都有其专用的Handler:ObMySQLHandler和ObRpcHandler。这两种具体化的Handler都是ObReqHandler的子类。
对OceanBase-CE来说,ObMySQLHandler扮演着门户的角色:所有从集群外部进入的请求都由ObMySQLHandler负责处理,而核心的处理过程则在其process方法中。
由于OceanBase-CE可以兼容MySQL协议以及OceanBase自定义的协议,因此process方法需要对不同的协议版本采用不同的处理机制,这种区分体现在协议处理器(Protocol Processor,父类都为ObVirtualCSProtocolProcessor)上。在process方法中会根据连接建立时识别的协议,采用相应的协议处理器来处理当前收到的请求:
1)普通MySQL协议(非加密):采用ObMysqlProtocolProcessor类作为协议处理器。
2)加密MySQL协议:采用ObMysqlCompressProtocolProcessor类作为协议处理器。
3)OB 2.0协议:采用Ob20ProtocolProcessor类作为协议处理器。
这些协议处理器的作用是将从网络接收到的数据按照相应的协议解析成数据包,如图3.5所示,OceanBase中将数据包表示成以ObPacket为顶层类的一系列子类。其中ObMySQLPacket的子类分别服务于以上三种协议,ObRpcPacket则服务于OceanBase内部节点之间的RPC通信。
图3.5 数据包相关类
协议处理器会把收到的网络包中的载荷(Payload)抽取出来放在数据包的cdata_属性(字符串类型)中,这其中就包含着外部的请求内容,对于普通MySQL协议来说,cdata_属性里存放的就是SQL命令,ObMySQLRawPacket中还定义了一个枚举类型的属性cmd_来标识SQL命令的类型(查询、创建数据库等)。
在请求管理上,OceanBase采用的是典型的“生产者-消费者”模式。解析得到的数据包会与接收时间戳、连接信息等一起被包装成由ObRequest类表示的请求,然后利用投送器(Deliver)送到请求队列中,之后租户的工作线程再从请求队列中取出请求并真正执行。
OBServer收到的所有内部请求(即RPC请求)都会被分发给ObRpcHandler,它对RPC请求的处理同样实现在其process方法中。和ObMySQLHandler一样,ObRpcHandler也将待处理的请求交给ObSrvDeliver,投送到请求队列中,最终被工作线程取出处理。
目前OceanBase用ObSrvDeliver类同时承担了外部请求和内部RPC请求的投送工作,其deliver方法中会根据被投送请求的类型来进一步调用deliver_mysql_request或deliver_rpc_request方法将请求投送到合适的请求队列中。
如图3.6所示,deliver_mysql_request将从请求中获得会话信息,其中包括租户信息(ObTenant类)和分组ID(group_id_),然后根据租户信息将请求放入租户的请求队列中。
在租户的实现ObTenant中,将队列划分成快速队列和普通队列两大类,而每一类队列中又分别有三个具有不同优先级(高、中、低/普通)的队列。OceanBase中收到的请求会按照请求的类型和特征被分别放入上述这些队列中,然后由工作线程根据优先级以一对一的方式“消费”这些请求。
图3.6 deliver_mysql_request流程
在将RP C请求放入租户的请求队列中时,有一种特殊的情况需要考虑:嵌套请求。例如处理一个RP C请求时又发出了其他的RP C请求(RP C请求嵌套),如果让这些RP C请求共享同一组工作线程,在高并发时会导致工作线程耗尽而死锁。OceanBase引入了嵌套层级队列来解决这种问题:将用户发出的RP C请求作为第0层,由其引发的RP C请求依次作为第1层、第2层、…、第 n 层,然后为0层以上的每一层RP C请求单独建立一个队列,且为每一个这样的队列分配一些工作线程。这样,各层的RP C请求使用的是不同的工作线程集合,进而避免死锁问题。当然,这种嵌套层次也不能无限制地延伸下去,OceanBase中设置了一个硬上限为8层(由宏MAX_REQUEST_LEVEL定义),超过这个嵌套深度的RPC请求会被放在最高层的队列中。
具体的投送工作通过租户的recv_request方法完成:
1)如果租户已停止,报错。
2)用请求中的分组ID在租户的资源组地图(租户的group_map_属性)中查找对应的资源组(ObResourceGroup类),如果之前不存在则创建一个新的资源组并且插入在资源组地图中,最后将请求插入该资源组的队列中。
3)如果租户没有初始化资源组地图,则按照下面的规则将请求分配到相应的队列中:
①将具有高或者普通优先级的RPC请求放入快速队列。
②将低优先级的RPC请求(通常是无关痛痒的请求)放入普通队列。
③SQL类型的请求(从MySQL协议进入的请求)以普通优先级放入普通队列。
④服务器任务以及会话关闭任务以高优先级放入普通队列。
根据3.3节和3.4节中所述,多租户环境下的工作线程在启动之后就会运行自己的run方法,进而用一个无效的租户ID进入其worker方法中。worker方法内部实际是一个以线程的停止标志(继承自CoKThreadTemp的布尔属性stop_)值为循环条件的循环,在循环中会根据工作线程所服务的租户ID 找到该租户的请求队列,然后从中取下待处理的请求,交由线程(ObThWorker)的process_request方法进行处理。
process_request方式中会调用procor_中的工作线程处理器(ObIWorkerProcessor类)的process方法对请求进行处理。procor_属性的值是通过实例化时构造器的参数等方式获得,其值的源头来自ObServer实例的同名属性(类型是ObWorkerProcessor,它是接口类ObIWorkerProcessor的一种实现)。构造ObServer实例时,会自动调用ObWorkerProcessor的构造器构造出procor_属性中的对象,该构造器会以一个ObSrvXlator对象(从net_frame_属性中ObSrvNetworkFrame实例的xlator_属性得到)为参数。最终构造的效果是将procor_属性的translator_属性指向这个得到的ObSrvXlator对象。
在process方法中首先会调用translator_的translate方法为要处理的请求得到一个请求处理器(ObReqProcessor),然后调用请求处理器的run方法完成对请求的处理。translator_的translate方法继承自其父类ObReqTranslator,其核心是调用get_processor方法获得一个请求处理器。被调用的get_processor方法是ObSrvXlator类中的实现,它会根据请求的类型(MySQL或RPC)返回一个正确的请求处理器。
(1)MySQL请求处理
如果收到的是MySQL请求,ObSrvXlator::get_processor方法中会调用其mysql_xlator_属性(是一个ObSrvMySQLXlator对象)的translate方法,它会根据MySQL请求的进一步细分确定MySQL请求处理器:
1)OB_MYSQL_COM_FIELD_LIST命令:
①连接使用了obproxy:
a)obproxy版本比较低:采用ObMPDefault类对象作为处理器。
b)obproxy版本比较高:使用ObMPQuery类对象作为处理器。
②连接未使用obproxy:使用ObMPQuery类对象作为处理器。
2)其他命令:采用ObMPDefault类对象作为处理器。
MySQL请求处理器的run方法中又会调用该类的process方法,由于ObMPDefault处理器实际上对应着不支持的版本或命令,它的process方法只会简单地报告“不支持的MySQL命令”这一错误。因此,我们仅介绍ObMPQuery这种MySQL请求处理器,其process方法承担了执行请求的主要工作:
1)调用ObThWorker的check_rate_limiter方法检查对应租户的SQL到达率是否过快,如果过快则会放弃执行这个请求,该请求也不会从请求队列中取走,而是等着再次被分配给该租户的工作线程取出。
2)调用ObThWorker的check_qtime_throttle方法检查该请求在队列中的等待时间是否超限,如果超限同样会放弃执行这个请求。
3)取得请求对象(ObRequest)中的锁等待节点(lock_wait_node_属性)。
4)构建一个解析器(ObParser)对SQL请求进行查询解析:
①先调用解析器的pre_parse方法对查询进行预解析。
②调用解析器的split_multiple_stmt方法尝试从SQL字符串中切分出多个查询语句,得到的结果放入一个字符串数组queries中。
③调用请求处理器的try_batched_multi_stmt_optimization方法尝试把多个更新查询作为一个单一查询来执行,这样可以优化RP C开销。
④如果上面的尝试没有成功,则针对queries中的每一个查询进行处理:
a)将查询包装成一个ObMultiStmtItem对象。
b)用process_single_stmt方法处理得到的ObMultiStmtItem对象。
ObMPQuery的process_single_stmt方法实际上蕴含了OceanBase的SQL引擎入口,具体见第5章。
process_single_stmt方法主要完成以下两个任务:
1)检查和刷新本地模式:通过调用check_and_refresh_schema方法来完成,为了在集群的各个节点上都能快速地访问模式信息,OceanBase中的模式信息在每个节点上都会有一份拷贝。这样自然会产生模式信息同步的问题,check_and_refresh_schema方法会比较本地模式信息的版本和全局最新的模式版本,如果发现本地模式信息的版本较低,该方法会进行模式信息的更新。模式的存储在4.1节中有更多分析。
2)处理SQL请求,且在必要时进行重试:在处理SQL请求时,可能会因为所在节点的表副本不是最新版本导致需要重新在该表的Leader副本上进行重试,而且同样的现象在重试的时候还可能会发生。为了控制这种场景,ObMPQuery中有一个重试控制器,它被用于在请求执行过程和会话环境之间传递重试的类型,同时还记录重试的次数。
process_single_stmt方法中调用do_process方法来执行SQL请求,do_process在准备好执行环境之后会将请求的处理移交给SQL引擎(ObSql)的stmt_query方法,然后根据stmt_query的执行反馈来完成后续的处理。
(2)RPC请求处理
如果收到的请求是一个RPC请求,ObSrvXlator::get_processor方法中会调用其rpc_xlator_属性(是一个ObSrvRpcXlator对象)的translate方法:以请求中的RPC请求代码(pcode,代表请求类型)为下标在该对象的funcs_数组中找到相应的RPCProcessFunc类型函数指针,调用该函数获得实际的RP C请求处理器。
funcs_数组中的RPCProcessFunc函数指针通过RPC_PROCESSOR宏(见src/observer/ob_srv_xlator.h)注册在该数组中,它在funcs_数组中的下标决定它所服务的RPC请求代码(见deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h),每一个RPCProcessFunc函数指针所指向的函数的功能是返回一个用于处理相应RP C请求的请求处理器对象。RP C请求处理器都由名为ObRpc ### P 的类表达,它们通过DEFINE_DDL_RS_RPC_PROCESSOR、OB_DEFINE_SQL_CMD_PROCESSOR之类的宏定义。这些RPC处理器类对RPC请求的处理有两种实现方法:
1)在请求处理的类定义中实现process方法,其中含有RPC请求的处理流程,例如OB_TASK_KILL的最终处理过程由ObRpcTaskKillP::process()方法实现。
2)RPC请求处理器继承某个父类(如ObRootServerRPCProcessor),通过父类的process根据RPC请求代码将处理流程导向ObRootService类的某个方法,该方法会含有相应RPC请求的处理流程,例如OB_CREATE_TABLE的最终处理过程由ObRootService::create_table()方法实现。
通常来说,客户端连接到数据库服务器后就形成了一次会话(Session)。在该次连接断开之前,通过该连接执行的命令及其产生的效果都属于该会话。一个典型的例子是,在同一个连接中,第一个命令对某个参数(例如字符集编码)进行了修改,后续的命令都将会受到该次修改的影响,直至连接断开。因此,会话也可以被看成是客户端在数据库服务器端的一个临时工作环境。
对于所谓“专用服务器”模式,即服务器端自始至终都用专门的一个进程或者线程服务一个连接,维持会话很容易,只需要把会话信息保存在该进程或线程的内存空间里即可。但在OceanBase的客户端请求处理模式下,一个客户端(会话)的多个命令可能会由服务器端的不同线程处理,因此会话的维持要略显复杂。
OceanBase对会话的管理基于以下思路:
1)每一个会话都会得到一个全局唯一的会话ID,客户端将保持该会话ID。
2)会话信息保存在服务器端,并以会话ID为标识。
3)客户端发送请求时会携带其会话ID,OBServer根据请求中的会话ID将请求在相应的会话“环境”中执行。
在实现上,OceanBase采用了两个类来管理会话,其主要构成部分如图3.7所示。其中ObSQLSessionInfo对应着会话本身的信息,而ObSQLSessionMgr是会话的管理器。
ObSQLSessionMgr主要提供以下接口:
1)create_session接口:为会话创建新的ObSQLSessionInfo,同时会给该ObSQLSessionInfo增加引用计数。在OBServer建立与客户端连接时一定会调用这个接口,如果客户端是第一次连接集群则会新建会话,否则会根据会话ID调用get_session接口获取之前已经创建的会话。
2)get_session接口:获取已经创建的ObSQLSessionInfo,同时会给该ObSQLSessionInfo增加引用计数。
ObSQLSessionMgr最重要的任务是为每一个连接分配会话ID,并保证会话ID的全局唯一。但有一种特例,即在集群前端部署有OBProxy层时,客户端连接到OBProxy以后,在连接不断开的情况下,该OBProxy与集群中所有OBServer的连接都会采用相同的会话ID,而这一会话ID是由OBProxy来分配。为了实现这一特性,OBServer在处理连接请求时如果发现请求来自OBProxy,则会放弃当前新分配的会话ID,同时使用请求包中携带的会话ID来标识该连接。
会话ID是一个32位无符号整数(uint32_t),为了保证会话ID在集群中全局唯一,这32位被划分为三段:
图3.7 会话管理器
1)第0~17位:这一段在内部也称为sessid_seq,它是创建会话ID的OBServer为会话分配的18位顺序号。OBServer重启后,sessid_seq从0开始分配,每到来一个新连接sessid_seq会增加1。显然,sessid_seq的位宽使得一台OBServer能承受的并发连接数上限为2 18 ,对于一个配置有负载均衡的集群来说,单台OBServer上的并发连接数远不会达到这一限制。
2)第18~30位:内部称为server_id,即OBServer启动从总控服务获得的服务器ID,将server_id和sessid_seq组合形成的会话ID就能保证全局唯一性 。
3)第31位:用来区分会话ID的来源,此位为1表示会话ID是OBServer新分配,为0则表示会话ID是由OBProxy传来。