2014年11月14日金曜日

Scalaで Netty を使った WebSocket Client

JBossのNettyというライブラリを使えばWebSocketクライアントを実装できると聞き、Java製のサンプルコードがあったので、それをScala化してみた。バージョンはわけあって3.9系を使っている。なお、4系ではパッケージ名(org.jboss.netty → io.netty)やBootstrap・ハンドラ周りのAPIが変わっているため、このコードはそのままでは動かず、書き換えが必要になる。

オリジナルのJavaコード:  https://github.com/netty/netty/tree/3.9.1/src/main/java/org/jboss/netty/example/http/websocketx/client

ついでにSBT化もしてみた。構成は以下の3ファイルだけ。sbt 'run ws://localhost:8080/websocket' のように、ポート番号まで含めたURIを引数に渡して実行できる(引数を省略した場合は ws://localhost:8080/websocket に接続する)。
  • WebSocketClient.scala
  • WebSocketClientHandler.scala
  • build.sbt


WebSocketClient.scala
import java.net.InetSocketAddress
import java.net.URI
import java.util.HashMap
import java.util.concurrent.Executors
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.http.HttpRequestEncoder
import org.jboss.netty.handler.codec.http.HttpResponseDecoder
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import org.jboss.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import org.jboss.netty.handler.codec.http.websocketx.WebSocketVersion
/**
 * Minimal WebSocket client built on Netty 3.x, ported from Netty's Java example.
 *
 * @param uri WebSocket endpoint to connect to. Only the `ws` scheme is accepted;
 *            when the URI carries no explicit port, port 80 is used.
 */
class WebSocketClient(uri: URI) {

  /** Port used when the URI does not specify one (`URI.getPort` returns -1). */
  private val DefaultWsPort = 80

  /**
   * Connects to the endpoint, performs the WebSocket handshake, sends a ping
   * and a close frame, then blocks until the channel is closed.
   *
   * The bootstrap's resources are released on every exit path, including
   * failures during validation, connect or handshake.
   *
   * @throws IllegalArgumentException if the URI scheme is not `ws`
   */
  def run(): Unit = {
    val bootstrap = new ClientBootstrap(
      new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool()))
    // Stays empty until the connection has been established.
    var channel: Option[Channel] = None
    try {
      val protocol = uri.getScheme()
      if (protocol != "ws") {
        throw new IllegalArgumentException("Unsupported protocol: " + protocol)
      }

      // A URI such as ws://localhost/ has no port and getPort() yields -1,
      // which InetSocketAddress rejects. Rebuild the URI with an explicit
      // port so that both the connect call and the handshaker see a valid one.
      val target =
        if (uri.getPort() != -1) uri
        else new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), DefaultWsPort,
          uri.getPath(), uri.getQuery(), uri.getFragment())

      val customHeaders = new HashMap[String, String]()
      customHeaders.put("MyHeader", "MyValue")

      // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
      // If you change it to V00, ping is not supported and remember to change
      // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
      // The null argument is the (unused) sub-protocol expected by the Java API.
      val handshaker =
        new WebSocketClientHandshakerFactory().newHandshaker(
          target, WebSocketVersion.V13, null, false, customHeaders)

      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        def getPipeline() = {
          val pipeline = Channels.pipeline()
          pipeline.addLast("decoder", new HttpResponseDecoder())
          pipeline.addLast("encoder", new HttpRequestEncoder())
          pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker))
          pipeline
        }
      })

      // Connect
      println("WebSocket Client connecting")
      val connectFuture =
        bootstrap.connect(new InetSocketAddress(target.getHost(), target.getPort()))
      connectFuture.syncUninterruptibly()
      val ch = connectFuture.getChannel()
      channel = Option(ch)
      handshaker.handshake(ch).syncUninterruptibly()

      // Sending text messages is disabled in this example. To enable it,
      // write frames here, e.g. ch.write(new TextWebSocketFrame("Message #" + i)).
      println("WebSocket Client sending message")

      // Ping
      println("WebSocket Client sending ping")
      ch.write(new PingWebSocketFrame(ChannelBuffers.copiedBuffer(Array[Byte](1, 2, 3, 4, 5, 6))))

      // Close
      println("WebSocket Client sending close")
      ch.write(new CloseWebSocketFrame())

      // WebSocketClientHandler will close the connection when the server
      // responds to the CloseWebSocketFrame.
      ch.getCloseFuture().awaitUninterruptibly()
    } finally {
      // On the normal path the channel is already closed at this point;
      // on a failure path this makes sure it does not stay open.
      channel.foreach(_.close())
      bootstrap.releaseExternalResources()
    }
  }
}
/** Command-line entry point for the WebSocket client example. */
object WebSocketClient {

  /**
   * Runs the client against the URI given as the first argument, falling
   * back to `ws://localhost:8080/websocket` when no argument is supplied.
   *
   * @param args command-line arguments; only the first one is read
   */
  def main(args: Array[String]): Unit = {
    val endpoint = args.headOption.getOrElse("ws://localhost:8080/websocket")
    new WebSocketClient(new URI(endpoint)).run()
  }
}
WebSocketClientHandler.scala
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.handler.codec.http.HttpResponse
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame
import org.jboss.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame
import org.jboss.netty.util.CharsetUtil
/**
 * Upstream handler that finishes the WebSocket handshake and then logs the
 * frames received from the server.
 *
 * @param handshaker handshaker whose completion state decides whether an
 *                   incoming message is treated as the handshake response
 *                   or as a WebSocket frame
 */
class WebSocketClientHandler(handshaker: WebSocketClientHandshaker) extends SimpleChannelUpstreamHandler {

  /** Logs that the channel has been closed. */
  override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
    println("WebSocket Client disconnected!")

  /**
   * Handles one incoming message: the first message completes the handshake,
   * every later one is dispatched as a WebSocket frame.
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val channel = ctx.getChannel()
    val message = e.getMessage()
    if (handshaker.isHandshakeComplete()) {
      handleAfterHandshake(channel, message)
    } else {
      // Until the handshake is complete the message is taken to be the
      // server's HTTP response; anything else fails the cast.
      handshaker.finishHandshake(channel, message.asInstanceOf[HttpResponse])
      println("WebSocket Client connected!")
    }
  }

  /** Dispatches a message that arrived after the handshake was completed. */
  private def handleAfterHandshake(channel: Channel, message: AnyRef): Unit =
    message match {
      case response: HttpResponse =>
        val status = response.getStatus()
        val content = response.getContent().toString(CharsetUtil.UTF_8)
        throw new Exception("Unexpected HttpResponse (status=" + status + ", content=" + content + ")")
      case other =>
        // The cast is kept on purpose: a message that is not a WebSocket
        // frame still fails with ClassCastException, as before.
        other.asInstanceOf[WebSocketFrame] match {
          case text: TextWebSocketFrame =>
            println("WebSocket Client received message: " + text.getText())
          case _: PongWebSocketFrame =>
            println("WebSocket Client received pong")
          case _: CloseWebSocketFrame =>
            println("WebSocket Client received closing")
            channel.close()
          case ping: PingWebSocketFrame =>
            println("WebSocket Client received ping, response with pong")
            channel.write(new PongWebSocketFrame(ping.getBinaryData()))
          case _ =>
            () // any other frame kind is ignored
        }
    }

  /** Prints the stack trace of the failure and closes the affected channel. */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    e.getCause().printStackTrace()
    e.getChannel().close()
  }
}
build.sbt
// Project name reported by sbt.
name := "Netty Websocket Client Example for Scala"

// The only dependency: Netty 3.9.x, which provides the org.jboss.netty packages.
libraryDependencies += "io.netty" % "netty" % "3.9.5.Final"
view raw build.sbt hosted with ❤ by GitHub
 Javaのコードを構文だけ単純に置き換えたものなので、Scalaらしい書き方にはなっていない。

0 件のコメント:

コメントを投稿