Cocos2d-JS/Ajax用Protobuf与NodeJS/Java通信

Published: 2016-04-29 Category: JavaScript, Java

Google的Protobuf

Protobuf全称为“Protocol Buffers”,是Google开源出来的一个序列化协议并配备多种编程语言的实现(Java、C、C++、Python等,甚至JavaScript、ActionScript都有对应的实现),其本质是按照协议规范编写proto文件,该proto文件内容由若干个message消息体组成,而message消息体是由编程语言中常用的数据类型(int、long、String等)对应的Protobuf字段类型组合而成的,Protobuf的作用是可以帮你把定义好的message消息体按协议编码(Encode)转为二进制字节流(byte[]),反之亦可帮你把已编码的byte[]字节流再解码(Decode)还原回来。

操作步骤

举个Java的例子,你想使用Protobuf把Java中的对象转成byte[]的话,需要如下这几个步骤:

  1. 根据Protobuf规范编写proto文件:该proto文件内需要定义一个与Java对象匹配的message消息体,该message消息体中的字段类型与Java对象的成员变量字段类型一一对应上
  2. 利用Protobuf对应Java语言的protoc.exe生成工具去根据第1步定义的proto文件生成对应的Protobu编解码Java类
  3. 使用第2步生成的Protobuf编解码Java类对Java对象做编解码的工作,例如编码Java对象为byte[]或者解码byte[]为Java对象

这里用Java代码举例(GitHub上的代码在这里)来解释说明前面介绍使用Protobuf的步骤:

  1. 编写proto文件在其中定义message消息体,这里我们定义了一个名为StudentProto的消息体
package tutorial;  

option java_package = "com.whg.protobuf";  
option java_outer_classname = "StudentProtoBuf";  

message StudentProto {  
  optional int64 id = 1;  
  optional int32 age = 2;  
  optional bool sex = 3; 
  optional string name = 4; 
}
  1. 执行protoc.exe来生成对应的Protobu编解码Java类,这里写个名为exec-protoc.bat的脚本来执行protoc.exe

protoc -I=../proto --java_out=../proto ../proto/*.proto

pause

其中-I代表Input输入proto文件的目录,而--java_out代表输出Java类的目录,最后的参数是一个通配符匹配输入proto文件的目录下的所有以.proto的文件,即达到批量生成Protobuf Java类的效果

  1. 然后我们编写与message消息体对应的Java对象,例如这里的Student类对应的就是StudentProto消息体,注意字段名和类型一一对应上了,其实Java字段名可以不必与消息体名称一样,但这么写也算是一种约定吧,一目了然嘛
public class Student {

    private long id;
    private int age;
    private boolean sex;
    private String name;

    public Student() {

    }

    public Student(StudentProto proto){
        id = proto.getId();
        age = proto.getAge();
        sex = proto.getSex();
        if(proto.hasName()){
            name = proto.getName();
        }
    }

    public byte[] toByteArray(){
        StudentProto.Builder builder = StudentProto.newBuilder();
        builder.setId(id);
        builder.setAge(age);
        builder.setSex(sex);
        if(name != null){
            builder.setName(name);
        }
        return builder.build().toByteArray();
    }

    public static Student parse(byte[] bytes){
        StudentProto proto = null;
        try {
            proto = StudentProto.parseFrom(bytes);
        } catch (InvalidProtocolBufferException ex) {
            throw new IllegalArgumentException(ex);
        }
        return new Student(proto);
    }

    //省略setter/getter方法

}

然后具体使用就像下面这样在byte[]和Java对象之间互相编解码转换了

public static void main(String[] args) {
    Student student = new Student();
    student.setId(300);
    student.setAge(30);

    byte[] bytes = student.toByteArray();
    Parser.printHex(bytes);
    Parser.printInt(bytes);
    Parser.printBinary(bytes);

    Student student2 = Student.parse(bytes);
    System.out.println(student2.getId());
    System.out.println(student2.getAge());
}

有了Protobuf这个序列化byte[]编解码的利器,相较于文本协议的Xml和Json来说的话,相当于做了很大的压缩!所以无论是在需要序列化存储的场景,还是在网络序列化传输场景,Protobuf都不失为一个好抉择!

序列化传输

例如在网络传输的场景下,我们可以用Protobuf在发送端把Java对象编码为byte[]后发送出去,然后在接收端再用Protobuf解码(Decode)byte[]来还原成对应的Java对象供程序使用,这样就可以在网络传输方面极大的缩减网络流量了。

序列化存储

而在存储的场景下,无论是基于内存/磁盘/RMDB的存储,都能节约成本:

  1. 内存——可以在使用Protobuf编码为byte[]后存储进memcached/redis中
  2. 磁盘——同样可以用Protobuf编码为byte[]后存储成文件,需要存储多个byte[]则需要加入分隔符用于区分单个,类似网络传输中的成帧/解析
  3. RMDB——形如MySql的RMDB,也都有支持byte[]二进制存储的Blob字段

总结与思考

总结一下,因为Protobuf已经message转换为二进制字节流byte[]了,而计算机对二进制字节流的操作最在行了,所以除了压缩节约成本外,其可用性也接近计算机底层处理的本质了:因为无论是什么东西在计算机内的表示都是字节(即8个二进制位)!字符串String可以转为byte[]、图片可以转为byte[]、任何东西想在计算机内表示都必须是byte[]!

前端使用Protobuf发送/接收二进制数据

前面提到过Protobuf网络传输的场景,这里我们就来看看:前端(Cocos2d-JS/Ajax)如何使用Protobuf与后端(NodeJS/Java)通信?由于Protobuf能把proto文件定义的消息体转换为二进制字节流(byte[]),所以问题就变成:前端(Cocos2d-JS/Ajax)如何使用二进制与后端(NodeJS/Java)通信?

网络通信一般分为2类:短连接和长连接。短连接一般说的是基于HTTP协议的请求/响应的连接;而长连接则是基于TCP/IP协议的3次握手不随意中断的连接;当然其实HTTP协议是基于TCP/IP协议的,只是请求/响应这种模式令其相较TCP/IP来说更“随意”中断了一点,但中断的后果是太浪费底层TCP/IP连接了,所以之后的HTTP1.1以及2.0为了减少浪费提出了Keep-Alive及多路复用等改进,甚至演化出了Html5的WebSocket协议这种基于HTTP协议升级版的全双通长连接,发展趋势轨迹:TCP/IP长连接 --> HTTP短连接 --> WebSocket长连接;这也令我想起后端服务器处理请求IO模型的进化轨迹:单线程 --> 多线程 --> 事件驱动单线程

下面根据这2个分类连接说说基于JavaScript的前端(Cocos2d-JS/Ajax)如何用Protobuf与后端(NodeJS/Java)通信

短连接——HTTP

无论是Cocos2d-JS还是Ajax,其进行HTTP通信都是基于JavaScript的XMLHttpRequest对象!所以只要搞清楚XMLHttpRequest对象如何与后端通信发送/接收二进制即可。使用如下几步来操作XMLHttpRequest发送Protobuf二进制数据:

1. 获取XMLHttpRequest

Cocos2d-JS里就有XMLHttpRequest对象的支持,直接使用cc.loader.getXMLHttpRequest()即可获取到;而Ajax里面的XMLHttpRequest对象由于浏览器支持不同,可以使用如下代码获取

function createXMLHttpRequest(){
    if(window.ActiveXObject){ //IE only
        return new ActiveXObject("Microsoft.XMLHTTP");
    }else if(window.XMLHttpRequest){ //others
        return new XMLHttpRequest();
    }
}

2. 设置XMLHttpRequest头部支持Protobuf协议

无论是Cocos2d-JS还是Ajax,其XMLHttpRequest对象本质都一样,所以如下open(打开)url以及setRequestHeader(设置请求头部)等代码都是通用的

var xhr = cc.loader.getXMLHttpRequest(); // or use createXMLHttpRequest() in Ajax 
xhr.open("POST", "http://localhost:3000/protobuf");

xhr.setRequestHeader("Content-Type","application/x-protobuf");
xhr.setRequestHeader("Accept","application/x-protobuf");

if (xhr.overrideMimeType){
    //这个是必须的,否则返回的是字符串,导致protobuf解码错误
    //具体见http://www.ruanyifeng.com/blog/2012/09/xmlhttprequest_level_2.html
    xhr.overrideMimeType("text/plain; charset=x-user-defined");
}

3. 前端使用protobuf.js来编解码Protobuf

protobuf.js是GitHub上使用JavaScript实现Protobuf Buffer协议编解码的项目,这里我们使用它来作为前端JavaScript编解码Protobuf的利器

3.1 引入protobuf.js

这里引入的protobuf.js版本为5.0.1,其中主要使用到了long.js、bytebuffer.js和protobuf.js这3个JS文件,如果使用NodeJS的话,直接在package.json添加dependencies依赖配置

"protobufjs": "~5.0.1"

然后使用

npm install

即可完成对该依赖的下载,在node_modules文件夹下找到那3个JS文件拷贝到前端JS文件夹,然后在前端的index.html中引入protobuf.js

<script src="../protobuf/long.js"></script>
<script src="../protobuf/bytebuffer.js"></script>
<script src="../protobuf/protobuf.js"></script>
<script>
    if (typeof dcodeIO === "undefined" || !dcodeIO.ProtoBuf) {
        throw(new Error("ProtoBuf.js is not present. Please see www/index.html for manual setup instructions."));
    }
</script>

3.2 使用protobuf.js

引入protobuf.js后就可以在JS代码中使用protobuf.js了,我们这里用于测试的TestProtobuf.proto文件如下

package TestProtobuf;

option java_package = "com.why.game.protobuf";
option java_outer_classname = "TestProtobuf";

message TestProto{
    optional int32 id = 1;
    optional string name = 2;
}

然后在JS中加载该TestProtobuf.proto文件,并把该proto文件中定义的TestProto消息体赋值为JS局部变量TestProto

var ProtoBuf = dcodeIO.ProtoBuf,
    TestProtobuf = ProtoBuf.loadProtoFile("../protobuf/TestProtobuf.proto").build("TestProtobuf"),
    TestProto = TestProtobuf.TestProto;

如此一来我们就可以用TestProtobuf.proto文件中定义的消息体来发送/接收二进制了:发送的时候使用XMLHttpRequest对象的send方法发送经由TestProto编码(encode)后的buffer数组(本质也是二进制字节流),接收的时候同样使用TestProto解码(decode)接收到的二进制数据

xhr.onreadystatechange = function(){
    if (xhr.readyState == 4 && xhr.status == 200) {
        var data = xhr.responseText;
        var protobufResp = TestProto.decode(str2bytes(data));
        var jsonResp = JSON.stringify(protobufResp);
        console.log(jsonResp);
    }
};

var testProto = new TestProto({
    id:10014,
    name:"testProtoName测试987"
});
xhr.send(testProto.toBuffer());

这里因为浏览器会把Ajax返回的二进制数据当做文本数据,所以写个str2bytes方法把接收到的文本数据按字节一个个做与运算来还原成二进制byte

function str2bytes(str){
    var bytes = [];
    for (var i = 0, len = str.length; i < len; ++i) {
        var c = str.charCodeAt(i);
        var byte = c & 0xff;
        bytes.push(byte);
    }
    return bytes;
}

长连接——SocketIO/WebSocket

可以说整个互联网的普及依靠的是浏览器和HTTP协议这一最佳拍档的完美组合,老早前所说的上网冲浪就是打开浏览器,输入网页地址,然后等待浏览器渲染显示网页后阅览;但HTTP协议的一个短板就是不能即时刷新,即需要自己手动刷新页面,这也就是为什么贴吧/论坛有“F5已烂”这一说法,因为最新的信息不会自动呈现出来。

虽然到了Web2.0时代由于Ajax的应用这一短板的用户体验有了大幅度的改善,但Ajax的本质依旧还是基于HTTP协议的短连接只不过是浏览器异步加载完成的响应信息而已;甚至还有使用“轮询”机制模仿长连接即时性的做法(即定时的用Ajax“拉取”服务器的信息来更新页面),但由于HTTP短连接本质就不是一个真实的双通道全开的“稳定”的连接,所以其即时性方面无论如何蹩脚的去模拟总会有或多或少的不爽(例如实现起来费劲麻烦等)。

于是乎Html5的到来顺便携带了WebSocket:这一在HTTP协议基础上做出“升级”的“稳定”的长连接协议,其本质上是完全双通道全开,即服务器和客户端之间的通道随时可以进行互相推送消息。而SocketIO协议则是考虑到不是所有的浏览器都支持WebSocket,于是做了层WebSocket的封装,对于不支持WebSocket的浏览器其内部可能使用的是Ajax模拟的长连接。

因为SocketIO封装了WebSocket,所以其API接口和WebSocket大同小异。下面分别介绍使用SocketIO/WebSocket来整合Protobuf发送/接收二进制数据的步骤

引入SocketIO客户端socket.io-client

socket.io-client是GitHub上使用JavaScript实现SocketIO协议的客户端,这里引入的socket.io-client版本为1.4.5,其中主要使用到了socket.io.js这个JS文件,如果使用NodeJS的话,直接在package.json添加dependencies依赖配置

"socket.io" : "~1.4.5"

然后使用

npm install

即可完成对该依赖的下载,在node_modules文件夹下找到那个JS文件拷贝到前端JS文件夹,然后在前端的index.html中引入socket.io.js

<script type="text/javascript" src="static/js/lib/socket.io/socket.io.js"></script>

使用SocketIO客户端

然后我们在JS代码中结合protobuf.js来使用socket.io.js来发送/接收二进制消息,这里的测试example.proto文件如下

message Message {
    required string text = 1;
}

接着使用protobuf.js加载上面的example.proto文件,注意同前面的TestProtobuf.proto对比区别下有无package包声明其protobuf.js加载和构造消息体的不同之处

var ProtoBuf = dcodeIO.ProtoBuf;
var Message = ProtoBuf.loadProtoFile("./example.proto").build("Message");

// Connect to our SocketIO server: node server.js
var socket = io.connect("http://localhost:3000");

socket.on("connect", function () {
    log.value += "Connected\n";
});

socket.on("disconnect", function () {
    log.value += "Disconnected\n";
});

socket.on("message", function (message) {
    try{
        var msg = Message.decode(message);
        log.value += "Received: " + msg.text + "\n";
    }catch(err){
        log.value += "Error: " + err + "\n";
    }
});

function send() {
    if (socket.connected) {
        var msg = new Message(text.value);
        socket.send(msg.toBuffer());
        log.value += "Sent: " + msg.text + "\n";
    } else {
        log.value += "Not connected\n";
    }
}

使用WebSocket

下面使用Html5 WebSocket API重写上面SocketIO发送/接收Protobuf二进制的例子,可以看到其实是大同小异的,除了协议不是HTTP而是WebSocket,其API基本类似

// Connect to our server: node server.js
var socket = new WebSocket("ws://localhost:8080/ws");
socket.binaryType = "arraybuffer"; // We are talking binary

socket.onopen = function() {
    log.value += "Connected\n";
};

socket.onclose = function() {
    log.value += "Disconnected\n";
};

socket.onmessage = function(evt) {
    try {
        var msg = Message.decode(evt.data);
        log.value += "Received: "+msg.text+"\n";
    } catch (err) {
        log.value += "Error: "+err+"\n";
    }
};

function send() {
    if (socket.readyState == WebSocket.OPEN) {
        var msg = new Message(text.value);
        socket.send(msg.toBuffer());
        log.value += "Sent: "+msg.text+"\n";
    } else {
        log.value += "Not connected\n";
    }
}

后端使用Protobuf发送/接收二进制数据

这里的后端使用NodeJS和Java实现Protobuf二进制数据的发送/接收,且同样看看区分短连接和长连接的实现

短连接——HTTP

在原生的NodeJS中可以自己编写代码开启一个简单的HTTP服务器,并自定义实现对HTTP请求的处理,当然你也可以使用一些现成的Web MVC框架例如Express来简化开发;而在Java中常见的还是使用Tomcat/JBoss这类已经久经沙场的Web容器比较方便,再配合上SpringMVC/Struts2等Web MVC框架的使用话,可以让Java Web开发人员把精力集中在业务逻辑处理方面;

NodeJS

不得不说基于JavaScript语言的后端开发平台NodeJS确实很强大,它把浏览器Ajax这种事件驱动的异步编程模型的写法从前端照搬到了后端,其核心库完美的实现了很多底层模块并提供友好的对外API,令你启动一个HTTP服务器也就只需要写几行代码的事情,除此之外引入的模块化机制完美的避开了JS中常见的“命名污染”,还有类似Java中的Maven一样的依赖包管理工具——NPM,简直让你觉得真的是“处处都运行着JavaScript”,Java处处运行的梦想好像要被JavaScript替代了似的

NodeJS使用protobuf.js处理Protobuf

由于NodeJS基于JavaScript语言,所以我们还是和前端的JavaScript代码一样使用protobuf.js来处理Protobuf,且使用了前面提到的TestProtobuf.proto

var ProtoBuf = require("protobufjs");

var TestProtobuf = ProtoBuf.loadProtoFile(protobufDir+"TestProtobuf.proto").build("TestProtobuf"),
    TestProto = TestProtobuf.TestProto;

NodeJS启动HTTP服务并接收/发送二进制数据

在NodeJS中真的是就几句代码就启动HTTP服务器了

var http = require("http");

var server = http.createServer(function(request, response){
    //处理request和返回response响应
});

server.listen(3000);

但这是只一个啥事都没干的HTTP服务器,真正的HTTP服务器至少能提供静态文件浏览服务,在NodeJS上这也需要我们自己去实现,写个serveStatic方法:其原理是根据请求路径去读取磁盘上的文件,如果存在的话读取成功后返回给前端,不存在就报404错误,为了避免每次都从磁盘读取我们还可以加入缓存

除了处理静态文件外,我们的重点还是放在NodeJS使用Protobuf发送/接收二进制数据:当我们识别一个来自客户端的请求参数是二进制数据时(这里是请求方法是POST且包含protobuf关键字),我们需要先收集完全部的二进制数据后方可解析,由于网络的传输可能不是一次到位全部传输过来,而是一段段(chunk)的过来,所以就有个收集的过程,这里使用了bufferhelper库简化收集网络二进制数据的过程,具体代码如下

var server = http.createServer(function(request, response){
    var filePath = false;
    if(request.url == "/"){
        filePath = "index.html";
    }else if(request.method === "POST"){
        if(request.url.indexOf("protobuf") != -1){
            //BufferHelper参考链接 http://www.infoq.com/cn/articles/nodejs-about-buffer/
            var bufferHelper = new BufferHelper();
            request.on("data", function (chunk) {
                bufferHelper.concat(chunk);
            });
            request.on("end", function () {
                var buffer = bufferHelper.toBuffer();
                var testProtoData = TestProto.decode(buffer);
                response.writeHead(200, {"Content-Type": "application/x-protobuf"});
                response.end(testProtoData.toBuffer());
            });
        }

        return;
    }else{
        filePath = request.url;
    }

    var absPath = webRoot+filePath;
    serveStatic(response, cache, absPath);
});

可见在收集完二进制数据后的end回调方法中使用了TestProto来解码二进制,然后再原封不动的转换为Buffer后通过response的end方法作为响应返回给HTTP客户端

Java SpringMVC与Protobuf

Java SpringMVC从4.1.6开始使支持Protobuf协议的自动编解码,所以需要确保pom.xml文件中的Spring核心包以及SpringMVC包的版本都是4.1.6+,当然也需要确保依赖了Protobuf的Java包

<!-- springframework 4.0.7 RELEASE -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>

<!-- spring mvc -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>commons-fileupload</groupId>
    <artifactId>commons-fileupload</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- protobuf -->
<dependency>
    <groupId>com.google.code</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.0a</version>
</dependency>
<dependency>
    <groupId>com.googlecode.protobuf-java-format</groupId>
    <artifactId>protobuf-java-format</artifactId>
    <version>1.2</version>
</dependency>

然后web.xml配置了SpringMVC及其mvc.xml文件位置以及匹配后缀名

<!-- 引入上下文配置文件 -->
<context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
    <servlet-name>spring-web</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:spring/mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
</servlet>

<servlet-mapping>
    <servlet-name>spring-web</servlet-name>
    <url-pattern>*.why</url-pattern>
</servlet-mapping>

关键的部分在mvc.xml配置中,这里使用mvc:annotation-driven的配置写法配置了消息转换器为ProtobufHttpMessageConverter令SpringMVC自动支持Protobuf的编解码

<!-- 配置只扫描web下面类文件,即controller和interceptors,只关注mvc的配置,整个应用的配置在applicationContext.xml -->
<context:component-scan base-package="com.why.game.web.*" />

<mvc:interceptors>
    <bean class="com.why.game.web.interceptor.ControllerInterceptor" />
</mvc:interceptors>
<mvc:annotation-driven>  
    <mvc:message-converters>  
        <bean class="org.springframework.http.converter.protobuf.ProtobufHttpMessageConverter"/>  
    </mvc:message-converters>  
</mvc:annotation-driven>

最后在SpringMVC的Controller中可使用RequestEntity直接操作传递过来的TestProto,并使用ResponseEntity把TestProto作为响应返回去给HTTP客户端

@Controller
@RequestMapping("/")
public class TestController {

    @RequestMapping(value="/protobuf")
    @ResponseBody
    public ResponseEntity<TestProto> protobuf(RequestEntity<TestProto> requestProto){
        TestProto testProto = requestProto.getBody();
        String s = new String(testProto.toByteArray());
        System.out.println(s);
        System.out.println(testProto);
        HttpServiceCaller.printProtoStr(s);
        return ResponseEntity.ok(testProto);
    }

}

长连接——SocketIO/WebSocket

这里以SocketIO为例来看看其在NodeJS和Java中的使用,其实WebSocket的使用方法也是大同小异,仅仅是API略微差别,但思想步骤是一样适用的

NodeJS中的SocketIO

与前面介绍的SocketIO客户端socket.io-client相对应的NodeJS服务端是socket.io,我们需要在package.json添加dependencies依赖配置

"socket.io" : "~1.4.5"

并使用npm install下载安装后,就可以直接在NodeJS中使用socket.io库来构建了SocketIO服务器了,下面实现一个简单的业务逻辑:把接收到的数据转换成大写后再发送回去给客户端

var ProtoBuf = require("protobufjs");
var socketio = require("socket.io");

// Initialize from .proto file
var builder = ProtoBuf.loadProtoFile(path.join(__dirname, "www", "example.proto")),
    Message = builder.build("Message");

// SocketIO adapter
var io = socketio.listen(server);
io.set("log level", 1);
io.sockets.on("connection", function(socket){
    console.log(socket.id+" connecting...");
    socket.on("disconnect", function() {
        console.log("WebSocket disconnected");
    });
    socket.on("message", function(data) {
        try {
            // Decode the Message
            var msg = Message.decode(data);
            console.log("Received: "+msg.text);
            // Transform the text to upper case
            msg.text = msg.text.toUpperCase();
            // Re-encode it and send it back
            socket.send(msg.toBuffer());
            //socket.emit('message', msg.toBuffer());
            console.log("Sent: "+msg.text);
        } catch (err) {
            console.log("Processing failed:", err);
        }
    });
});

注意上面的代码先使用require引入了Protobuf和SocketIO模块,然后初始化Protobuf的消息体并让SocketIO启动监听,这里SocketIO监听的server其实就是NodeJS创建的HTTP服务器,因为在NodeJS里面HTTP服务器和SocketIO服务器共用同一个端口;接下来就是Protobuf对接收到的二进制数据进行解码打印,然后把字母转换为大写后再编码发送出去

NodeJS中的WebSocket

在NodeJS中使用WebSocket最简便的方式是使用GitHub上名为ws的项目,其号称可能是NodeJS里面速度最快的WebSocket库,我们可以在package.json添加dependencies依赖配置

"ws": "~0.4"

并使用npm install下载安装后,就可以在NodeJS里使用ws库来构建WebSocket服务器了,实现与上面SokcetIO服务器相同逻辑的代码如下,可见WebSocket与SocketIO的API是大同小异的

// WebSocket adapter
var wss = new ws.Server({server: server});
wss.on("connection", function(socket) {
    console.log("New WebSocket connection");
    socket.on("close", function() {
        console.log("WebSocket disconnected");
    });
    socket.on("message", function(data, flags) {
        if (flags.binary) {
            try {
                // Decode the Message
                var msg = Message.decode(data);
                console.log("Received: "+msg.text);
                // Transform the text to upper case
                msg.text = msg.text.toUpperCase();
                // Re-encode it and send it back
                socket.send(msg.toBuffer());
                console.log("Sent: "+msg.text);
            } catch (err) {
                console.log("Processing failed:", err);
            }
        } else {
            console.log("Not binary data");
        }
    });
});

Java中的SocketIO

在Java中我们使用了GitHub上一个名为netty-socketio的项目,由名字可看出其是在Netty框架基础上实现的SocketIO协议,并提供了事件驱动注册监听器的写法,当你从NodeJS转换代码过来时会发现其写法大同小异:即NodeJS使用on方法来注册监听事件,netty-socketio中使用addEventListener方法来实现;NodeJS使用emit触发事件,而netty-socketio中使用sendEvent来触发事件等

首先在pom.xml中加入netty-socketio的依赖以及Protobuf的依赖:

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.11-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.7</version>
</dependency>

<!-- protobuf -->
<dependency>
    <groupId>com.google.code</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.0a</version>
</dependency>
<dependency>
    <groupId>com.googlecode.protobuf-java-format</groupId>
    <artifactId>protobuf-java-format</artifactId>
    <version>1.2</version>
</dependency>

接下来就是写一个实现了ConnectListener和DisconnectListener这2个分别代表连接监听与断开监听的接口的SocketIO服务器类,然后该类内部再使用addEventListener来监听感兴趣的事件,对应上面NodeJS SocketIO服务器逻辑的Java SocketIO服务器类如下

public class SocketIOProtoServer implements ConnectListener, DisconnectListener{

    private static final String HOST = "localhost";
    private static final int PORT = 3001;

    private final SocketIOServer server;

    public SocketIOProtoServer(){
        server = new SocketIOServer(config());
    }

    private Configuration config(){
        Configuration config = new Configuration();
        config.setHostname(HOST);
        config.setPort(PORT);
        config.setMaxFramePayloadLength(1024 * 1024);
        config.setMaxHttpContentLength(1024 * 1024);
        return config;
    }

    public void start(){
        server.addConnectListener(this);
        server.addDisconnectListener(this);   
        server.addEventListener("message", byte[].class, new DataListener<byte[]>() {
            @Override
            public void onData(SocketIOClient client, byte[] data, AckRequest ackRequest) {
                Message message = Message.parse(data);
                System.out.println("Received: "+message.getText());
                // Transform the text to upper case
                message.setText(message.getText().toUpperCase());
                // Re-encode it and send it back
                client.sendEvent("message", message.toByteArray());
                System.out.println("Sent: "+message.getText());
            }
        });

        server.start();
        System.out.println("\n------ "+this.getClass().getSimpleName()+"start on "+PORT+" ------\n");
    }

    public void stop(){
        server.stop();
    }

    @Override
    public void onConnect(SocketIOClient client) {
        System.out.println(client.getSessionId()+" connecting...");
    }

    @Override
    public void onDisconnect(SocketIOClient client) {
        System.out.println(client.getSessionId()+" disconnecting...");
    }

    public static void main(String[] args){
        new SocketIOProtoServer().start();
    }

}

Netty-SocketIO除了基于接口实现(例如上面的ConnectListener和DisconnectListener与DataListener这3个接口)完成监听外,还提供了基于注解的监听机制(对应上面接口实现的3个注解分别是@OnConnect和@OnDisconnect与@OnEvent),如下基于注解的代码和上面基于接口实现效果是一样的,注意添加监听器部分使用server.addListeners(annotationInstance)即可

public class SocketIOProtoServer{

    private static final String HOST = "localhost";
    private static final int PORT = 3001;

    private final SocketIOServer server;

    public SocketIOProtoServer(){
        server = new SocketIOServer(config());
    }

    private Configuration config(){
        Configuration config = new Configuration();
        config.setHostname(HOST);
        config.setPort(PORT);
        config.setMaxFramePayloadLength(1024 * 1024);
        config.setMaxHttpContentLength(1024 * 1024);
        return config;
    }

    public void start(){
        server.addListeners(this);

        server.start();
        System.out.println("\n------ "+this.getClass().getSimpleName()+"start on "+PORT+" ------\n");
    }

    public void stop(){
        server.stop();
    }

    @OnConnect
    public void onConnect(SocketIOClient client) {
        System.out.println(client.getSessionId()+" connecting...");
    }

    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        System.out.println(client.getSessionId()+" disconnecting...");
    }

    @OnEvent("message")
    public void onData(SocketIOClient client, byte[] data, AckRequest ackRequest) {
        Message message = Message.parse(data);
        System.out.println("Received: "+message.getText());
        // Transform the text to upper case
        message.setText(message.getText().toUpperCase());
        // Re-encode it and send it back
        client.sendEvent("message", message.toByteArray());
        System.out.println("Sent: "+message.getText());
    }

    public static void main(String[] args){
        new SocketIOProtoServer().start();
    }

}

可见其接收/发送Protobuf二进制的代码与NodeJS相比其实是相当类似的,然后封装的Message.java如下

public class Message {

    private String text;

    public Message() {

    }

    public Message(Example.Message proto){
        text = proto.getText();
    }

    public byte[] toByteArray(){
        Example.Message.Builder builder = Example.Message.newBuilder();
        builder.setText(text);
        return builder.build().toByteArray();
    }

    public static Message parse(byte[] bytes){
        Example.Message proto = null;
        try {
            proto = Example.Message.parseFrom(bytes);
        } catch (InvalidProtocolBufferException ex) {
            throw new IllegalArgumentException(ex);
        }
        return new Message(proto);
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

}

Java中的WebSocket库

此处我们基于Netty4下的WebSocket实现包来看看上面SocketIO服务类在Netty中长什么样,其中大部分代码均源自Netty自带的example包里的;首先看看Netty中的WebSocketServer

public final class WebSocketServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "3000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new WebSocketServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();

            System.out.println("Open your web browser and navigate to " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

如下是WebSocketServerInitializer,里面设置了各种HTTP解码器,然后还有由HTTP协议升级到WebSocket协议的处理器类WebSocketServerProtocolHandler,最后是WebSocket帧处理器类WebSocketFrameHandler

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    private static final String WEBSOCKET_PATH = "/ws";

    private final SslContext sslCtx;

    public WebSocketServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
        pipeline.addLast(new WebSocketFrameHandler());
    }
}

最后看看实现与之前的服务器业务逻辑(把接收到的数据转换成大写后再发送回去给客户端相同的WebSocketFrameHandler类的写法:

public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketFrameHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // ping and pong frames already handled

        if (frame instanceof TextWebSocketFrame) {
            // Send the uppercase string back.
            String request = ((TextWebSocketFrame) frame).text();
            logger.info("{} received {}", ctx.channel(), request);
            ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.US)));
        } else if(frame instanceof BinaryWebSocketFrame){
            ByteBuf byteBuf = ((BinaryWebSocketFrame) frame).content();
            byte[] data = new byte[byteBuf.capacity()];
            byteBuf.readBytes(data);
            Message message = Message.parse(data);
            System.out.println("Received: "+message.getText());
            // Transform the text to upper case
            message.setText(message.getText().toUpperCase());
            // Re-encode it and send it back
            byte[] bytes = message.toByteArray();
            ByteBuf payload = ctx.alloc().buffer(bytes.length);
            payload.writeBytes(bytes);
            ctx.channel().writeAndFlush(new BinaryWebSocketFrame(payload));
            System.out.println("Sent: "+message.getText());
        } else {
            String message = "unsupported frame type: " + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }
    }
}

可见在使用Message类编解码的使用方式是一样的,只是Netty中接收/发送二进制数据需要基于ByteBuf类去转换为byte[]给Message编解码;而在SocketIO中是以泛型编程的方式直接声明接收二进制数据byte[];

这也导致了在Netty里面可以写一个统一处理WebSocket的Handler,在处理WebSocket帧时可以判定是字符帧(TextWebSocketFrame还是字节帧(BinaryWebSocketFrame,然后分别做处理;但是在SocketIO里面在添加监听器addEventListener时就决定了处理类型到底是byte[]还是String,不能是一个泛泛的Object对象然后区分处理,除非自己自定义一个泛泛的SocketIOFrame类然后根据什么内部bit位去判断到底是转换为byte[]还是String后才分别处理,这就需要看看netty-socketio的源码实现去了解了。

源码

参考

(完)

  • 版权声明:自由转载-非商用-非衍生-保持署名 | BY-NC-SA
  • 文章作者:「whg」
  • 发表日期:2016年04月29日