基本工作流程如下。
① 客户端发起调用请求。
② 将调用的内容序列化后通过网络发给服务端。
③ 服务端接收到调用请求,执行具体服务并获得结果。
④ 将结果序列化后通过网络返回给客户端。
在发起远程调用时,需要基于接口(Interface)来约定客户端与服务端所调用服务的具体内容。为了方便管理依赖关系,这里使用Maven构建应用并编写一些接口,以提供给客户端与服务端使用。
当然也可以使用普通的Java应用来实现此简单微服务框架,只需将该应用编译后的jar包提供给后续的服务端与客户端即可。
groupId:org.book
artifactId:rpc-interface
version:0.0.1-SNAPSHOT
packaging:jar
编写接口。
public interface HelloService {
public String hello(String name);
}
新建用于提供服务的Maven应用,并引入刚编写的接口应用依赖。
groupId:org.book
artifactId:rpc-server
version:0.0.1-SNAPSHOT
packaging:jar
① 在pom.xml文件中引入依赖。
<dependency>
<groupId>org.book</groupId>
<artifactId>rpc-interface</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
② 实现服务接口。
public class HelloServiceImple implements HelloService {
public String hello(String name) {
System.out.println("收到消息:" + name);
return "你好:" + name;
}
}
③ 编写监听服务类。
public class Server {
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
public void register(Class serviceInterface, Class impl) {
//注册服务
serviceRegistry.put(serviceInterface.getName(), impl);
}
public void start(int port) throws IOException {
final ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(port));
System.out.println("服务已启动");
while (true) {
executor.execute(new Runnable() {
public void run() {
Socket socket = null;
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
socket = server.accept();
// 接收到服务调用请求,将码流反序列化定位具体服务
input = new ObjectInputStream(socket.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
// 在服务注册表中根据调用的服务获取到具体的实现类
Class serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " 未找到");
}
Method method = serviceClass.getMethod(methodName, parameterTypes);
// 调用获取结果
Object result = method.invoke(serviceClass.newInstance(), arguments);
// 将结果序列化后发送回客户端
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭资源
try {
if (socket != null) socket.close();
if (input == null) input.close();
if (output == null) output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
}
register()
提供一个数组保存所注册的服务接口及实现类。
start()
启动一个阻塞式的Socket服务用于等待客户端发起的调用请求,当收到请求后将码流反序列化成对象,并根据接口从注册列表中寻找具体实现类,最终通过反射的方式调用该实现类返回结果。
④ 注册服务并启动服务端。
public class App {
public static void main(String[] args) throws IOException {
Server server = new Server();
// 注册服务
server.register(HelloService.class, HelloServiceImple.class);
// 启动并绑定端口
server.start(8020);
}
}
新建用于调用服务的Maven应用,并引入刚编写的接口应用依赖。
groupId:org.book
artifactId:rpc-client
version:0.0.1-SNAPSHOT
packaging:jar
① 在pom.xml文件中引入依赖。
<dependency> <groupId>org.book</groupId> <artifactId>rpc-interface</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
② 编写远程调用类。
public class Client<T> {
@SuppressWarnings("unchecked")
public static <T> T get(final Class<?> serviceInterface, final InetSocketAddress addr) {
T instance = (T) Proxy.newProxyInstance(serviceInterface. getClassLoader(), new Class<?>[]{serviceInterface},
new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try {
// 连接服务端
socket = new Socket();
socket.connect(addr);
// 将调用的接口类、方法名、参数列表等序列后发送给服务提供者
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
// 同步阻塞等待服务器返回应答,获取应答后返回
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
} finally {
if (socket != null) socket.close();
if (output != null) output.close();
if (input != null) input.close();
}
}
});
return instance;
}
}
使用JDK动态代理方式,根据提供的服务接口类将接口序列化成码流,向目标服务端发起Socket远程调用请求,获得服务端反馈的结果并反序列化成对象后返回。
③ 调用测试。
public class App {
public static void main(String[] args) throws IOException {
HelloService service = Client.get(HelloService.class, new InetSocketAddress ("localhost", 8020));
System.out.println(service.hello("RPC"));
}
}
运行结果如下所示:
// 客户端
hello : RPC
// 服务端
服务已启动
收到消息:RPC
本章示例代码详见异步社区网站本书页面。
服务之间的调用已基本实现,但想将它投入正式开发使用还有很多细节需要完善。
当请求过大后会发现,BIO(同步阻塞式)的通信方式会消耗过多的资源导致服务器变慢甚至崩溃。
在发起网络请求前,将对象转换成二进制串便于网络传输;收到消息请求后,将二进制串反转换成对象便于后续处理。序列化及反序列化直接影响到整个RPC框架的效率及稳定性。
发起服务调用时,都需要指定服务提供方的访问地址(ip + 端口),如果当前服务提供方有多个或一个服务部署在多个机器上,调用时每次手动指定访问地址非常麻烦,这时就需要一个公共的注册中心去管理这些服务。
实施微服务的目的是为了让系统在进行横向扩展时能够拥有更多的计算资源,如果发现某一提供服务的机器负载较大,这就需要将新的需求转发到其他空闲的机器上。
服务提供方有可能崩溃无法继续提供服务,在客户端进行调用时就需要将这些无法使用的服务排除掉。
当服务端有异常发生导致无法返回正确的结果时,客户端并不知道该如何处理,只能等待并最终以超时结束此次远程调用请求。