読者です 読者をやめる 読者になる 読者になる

sifue's blog

プログラマな二児の父の日常

JavaでノンブロッキングIOを使ったネットワークアプリを学ぶのに最適なNetty 3.5系のGetting Startedを日本語訳しました

Nettyと言えばJavaのノンブロッキングIOのAPIであるNIOをラップしたフレームワークとして、TwitterのFinagleなどで分散ネットワークアプリケーションシステムで使わていて高速で実績のあるライブラリとして有名ですが、ノンブロッキングIOでイベント駆動のサーバークライアントのネットワークアプリケーションを知るのに非常に良い題材ですので、素人翻訳ですがその日本語訳を公開することにしました。


ちなみにNettyがどれぐらいパフォーマンスに優れているのかというと、Herokuの仮想インスタンスを利用した実験の結果が参考になります。Scala(Finagle)がNettyの実装を利用したものになりますが、秒間6000リクエスト時の1dyno(APサーバー)の応答が秒間4000レスポンスで、C(Accept)、Java(Jetty)、Java(Tomcat)、Js(Node)、Python(Bottle)、Ruby(Sinatra)と比べて最も応答性がよく、接続エラーが少ないという結果が出ています。


つまり、Nettyは今ある有名なネットワークアプリを作るフレームワークの中でかなり良いパフォーマンスと安定性を持つ選択肢として考えることができると思います。
Nettyすごい。
NettyはTCP/IPを使った通信のフレームワークを自由に実装できますが、Codec framework、SSL/TLS、HTTP、WebSockets、Google Protocol Bufferなどに関しては予め実装があり、利用することができます。


といってもこういうものを実務で利用しなくては行けない方というのは少ないのではないかと思いますが、TCP/IPの通信ってこんな感じなんだ、Webサーバーの中身の実装ってこうなってるんだとか、リソース管理ってこうするんだというノウハウがこのGetting Startedを通じて学べますので非常に価値があると思います。


元のドキュメントは、
http://static.netty.io/3.5/guide/
となります。素人の意訳なのでご利用は自己責任でお願いします。


なお、ソースコードGitHub
https://github.com/sifue/nettystudy
にて公開しておりますので、動かす場合はチェックアウトして、mavenでビルドしてライブラリを落としてご利用下さい。

The Netty Project 3.x User Guide

高速なネットワークアプリケーション開発のための折り紙つきのアプローチ


はじめに

1. 問題
2. 解決法

1. 問題

今日、わたちたちは一般的に相互通信をさせるためにアプリケーションやライブラリを使います。例えばウェブサーバーから情報を取ったり、ウェブサーバーのリモートプロシージャを実行するのにいつもHTTPクライアントライブラリを利用します。


しかし、一般的なプロトコルやその実装はよくスケールしません。 それは巨大なファイルやeメール、ほぼリアルタイムの金融情報ややマルチプレイヤーのゲームにの通信にHTTPサーバーを使わないというようなことです。特別な目的を果たすための高く最適化されたプロトコルの実装には何が必要なのでしょうか。例えば、AJAXベースのチャットアプリや、メディアストリーミング、大きなファイルの転送にはHTTPサーバーは最適化されたHTTPサーバーの実装が必要になるでしょう。更に正確にあなたの要求を満たすならば、全く新しいプロトコルをデザイン、実装をしたいかもしれません。

2. 解決法

Nettyプロジェクトは、非同期のイベント駆動ネットワークアプリケーションフレームワークの提供に注力し、メンテナンス可能なハイパフォーマンスでハイスケーラビリティのサーバーとクライアントのプロトコルの高速な開発を整備します。


言い換えれば、Nettyは、高速に簡単に、プロトコルサーバークライアントのようなネットワークアプリを開発することを可能にするNIOクライアントサーバーフレームワークです。NettyはTCPUDPソケットサーバー開発のようなネットワークプログラミングを簡単にし、合理化します。


"高速で簡単"はメンテナンス性やパフォーマンスの問題に見舞われることを意味しているのではありません。Nettyは、FTPSMTP、HTTPや様々なバイナリやテキストベースの巨大な古いプロトコルのような多くのプロトコルの実装から得られた経験に基づき、とても注意深く設計されました。結果として、Nettyは簡単な開発、パフォーマンス、安定性、柔軟性を、妥協なしに達成する方法を見つけることに成功しました。


あるユーザーは、既に似たような利点を持つ他のネットワークアプリケーションフレームワークを既に見つけていたり、Nettyがそれらとどう違うのかが聞きたいでしょう。答は哲学的なところです(難訳)。NettyはあなたにAPIの内容とその実装という最も快適な経験をその日から与えるでしょう。ただそれは、あなたがこのガイドを読んで実際にNettyで遊んで得られる哲学のように、触れないものなのです。

Chapter 1. Getting Started

1. はじめる前に
2. ディスカードサーバーを書く
3. 受け取ったデータを見る
4. ECHOサーバーを書く
5. TIMEサーバーを書く
6. TIMEクライアントを書く
7. ストリームベースの通信を扱う
7.1. ソケットバッファーに関する一つの小さな警告
7.2. 第一解決策
7.3. 第二解決策
8. ChannelBufferの代わりのPOJOを使って通信する
9. アプリケーションをシャットダウンする
10. まとめ


この章は、あなたが素早くはじめるための簡単な例を用いたNettyの中心的構造のツアーです。この章の最後についた時、あなたはNetty上でクライアントとサーバーが書けるようになっているでしょう。


もしあなたが何かを学ぶのにトップダウンのアプローチの方が好きなら、Chapter 2, Architectural Overview を先に読んでそれから戻ってきてください。

1. はじめる前に

この章の例を動かすのに最低限のものが2つあります。NettyとJDK1.5以上です。Nettyの最新バージョンはプロジェクトのdownloadのページから利用できます。正しいバージョンのJDKに関しては、好きなJDKのベンダーのサイトを参照して下さい。


読む時に、この章で沢山の導入されたクラスについて沢山の疑問を持つでしょう。もしあなたがもっと知りたいとおもったのであれば、APIリファレンスを参照して下さい。このドキュメントの全てのクラス名はAPIリファレンスにリンクしています。同様に、Nettyプロジェクトのコミュニティに連絡を取るのをためらわないでください。不正確な情報、文法間違い、ドキュメント改善に関する良いアイディがあったら教えて下さい。

2. ディスカードサーバーを書く

最もシンプルなプロトコルはこの世界に置いてはハローワールドではなくDISCARDです。このプロトコルは、何かしらのデータを受け取ったら何かのレスポンスをすることなく、無視します。


DISCARDプロトコルを実装するためにはただ、受け取ったデータを無視することを考えればよいだけです。ではNettyによって生成されたIOイベントを操作をハンドラーの実装を始めましょう。

package org.soichiro.nettystudy.example.discard;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class DiscardServerHandler extends SimpleChannelHandler {

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
		e.getCause().printStackTrace();

		Channel ch = e.getChannel();
		ch.close();
	}
}

1. SimpleChannelHandlerを継承したDiscardServerHandlerはChannelHandlerの実装です。SimpleChannelHandlerはあなたがオーバーライドすべき様々なイベントハンドラメソッドを提供します。ここではハンドラを実装しなくても、SimpleChannelHandlerを継承するだけで十分です。
2. messageReceivedイベントハンドラメソッドをここではオーバーライドしています。このメソッドは、 MessageEventと共にコールされ、クライアントから受け取る新しいデータが含まれています。この例では、DISCARDプロトコルの実装のために、受け取ったデータは何もしないことによって無視します。
3. exceptionCaughtは IO例外がイベント処理の中で投げられた時に、ExceptionEventと共に呼ばれます。大抵の場合、キャッチされた例外はログにはかれ、関連するチャンネルをここで閉じます、例外のシチュエーションへの対応実装をどうしたいかは異なるにも関わらずです。例えば、あなたはコネクションが閉じる前にエラーコードとレスポンスメッセージ返したいだろう。


今のところ順調です。もうDISCARDサーバーの半分を実装しました。DiscardServerHandlerを使いサーバーをスタートさせるmainメソッドが残っています。

package org.soichiro.nettystudy.example.discard;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class DiscardServer {

  public static void main(String[] args) throws Exception {
      ChannelFactory factory =
          new NioServerSocketChannelFactory(
                  Executors.newCachedThreadPool(),
                  Executors.newCachedThreadPool());

      ServerBootstrap bootstrap = new ServerBootstrap(factory);

      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() {
              return Channels.pipeline(new DiscardServerHandler());
          }
      });

      bootstrap.setOption("child.tcpNoDelay", true);
      bootstrap.setOption("child.keepAlive", true);

      bootstrap.bind(new InetSocketAddress(8080));
  }
}

4. ChannelFactoryは、 Channelとそれに関わるリソースを作るファクトリーです。全てのIOリクエストを処理し、ChannelEventの生成のためのIOを実行します。Nettyは様々なChannelFactoryの実装を提供しています。この例ではサーバー再度のアプリケーションを実装しています。それはつまり、NioServerSocketChannelFactoryが使われます。別なことを書くと、これ自身はIOスレッドを作りません。あなたがコンストラクタで設定したスレッドプールを使ってスレッドを作成します。そしてセキュリティーマネージャの用いられサーバーのようなアプリケーション実行環境でのスレッドがどのように管理されるべきかという制御をあなたに与えます。
5. ServerBootstrap はサーバーをセットアップするヘルパークラスです。あなたは、Channelを直接設定することができます。しかし、それは殆どの場合あなたに不必要な退屈なプロセスです。
6. ここでChannelPipelineFactoryを設定します。新しい接続がサーバーにアクセプトサれるときはいつでも、新しいChannelPipelineがChannelPipelineFactoryに特徴づけられて作成されます。この新しいパイプラインはDiscardServerHandlerを含みます。もし複雑化してきたときは、もっとハンドラをパイプラインに追加して、匿名クラスをトップレベルクラスに随時抽出しましょう。
7. Channelにパラメータを渡すこともできます。私たちはTCP/IPのサーバーを書いています。だから、tcpNoDelayとkeepAliveをソケットオプションとして設定できます。"child."とプレフィックスを付け加えていることについて書きます。これは、ServerSocketChannelのオプションの代わりに、Channelのオプションとして適用されます。ServerSocketChannelのオプションは以下のようにします。

bootstrap.setOption("reuseAddress", true);

8. 今行く準備ができました。あとは、ポートをバインドしてサーバーをスタートさせるだけです。ここにこのマシンのすべてのNICの8080にバインドしました。違うアドレスならば何度でもバインドすることができます。


おめでとうございます!これであなたはあなたの最初のサーバーをNetty上に構築し終えました。

3. 受け取ったデータを見る

丁度いまわたしたちの最初のサーバーを書きました。本当に動いているかどうか確かめなくてはいけません。最もテストする簡単な方法はtelnetコマンドを使うことです。例えば、"telnet localhost 8080"とコマンドラインで入力して、さらに何かタイプしてみてください。(MacLinuxならすぐtelnetが使えます。Windowsはバージョンによって準備が必要かもしれません...)


しかしながら、サーバーはうまく動いているといっていいのでしょうか?ディスカードサーバーであるためわかりません。何のレスポンスも得られないでしょう。本当にうまく行っているか検査するために、何か受け取ったらプリントしてみるように修正しましょう。


私たちは、既にイベントを受け取るとMessageEventが生成され、messageReceivedが呼び出されることを知っています。ちょっとしたコードを、DiscardServerHandlerのmessageReceivedメソッドに書きましょう。

	 @Override
	 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
	     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
	     while(buf.readable()) {
	         System.out.println((char) buf.readByte());
	         System.out.flush();
	     }
	 }

9. ソケット通信におけるメッセージの型が常にChannelBufferだと推定するのは安全です。ChannelBufferはNettyにおいてバイトシーケンスを保存する基本的なデータ構造です。これは、NIOのByteBufferに似ています。しかし、より簡単で柔軟性があります。例えば、Nettyは不要なメモリコピーを減らしたChannelBufferを複数組み合わせたChannelBufferなどを作ることもできます。
多くの部分がByteBufferに似ているにていますが、APIリファレンスを参照することを強く薦めます。ChannelBufferを正確に学ぶことは難なくNettyを使うために致命的に重要なステップです。


telnetのコマンドをもう一度叩いてみて下さい。サーバーが受け取ったものがprintされるのがわかります。
DISCOARDサーバーの全ソースは、配布物のorg.jboss.netty.example.discardの中にあります。

4. ECHOサーバーを書く

ここまでで、全くレスポンスはせずデータを消費するだけでした。しかしながら、サーバーは通常リクエストに反応します。次は与えられたデータを返すECHOプロトコルを実装して、メッセージをクライアントに返す実装の仕方を学びましょう。
DISCARDサーバーとの唯一の違いは、受け取ったデータをコンソールに出す代わりにクライアントに送り返すところです。つまり、messageReceivedを以下のように修正し直すだけで十分です。

	 @Override
	 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
	     Channel ch = e.getChannel();
	     ch.write(e.getMessage());
	 }

10. ChannelEventオブジェクトは関連するChannelへの参照をもっています。つまり、Channelは受け取ったMessageEventの接続を表しています。Channelを取得しwriteメソッドでリモートの相手に対して何かを戻すことができます。


telnetコマンドをもう一度実行してみましょう。何を送っても返してくれると思います。
この全体のソースコードは、配布物のorg.jboss.netty.example.echoに含まれています。

5. TIMEサーバーを書く

この項ではTIMEプロトコルを実装します。前の例と違うのは、何も受け取らず32bitの整数を含むメッセージを送り、送った後にすぐに切断するところです。この例では、メッセージの構築の仕方と完全にコネクションを閉じることを学びます。
どんなデータを受け取っても無視するためコネクションが確立した瞬間にメッセージを送ります。この度はmessageReceivedは使えません。代わりに、channelConnectedメソッドを使います。以下のような実装になります。

package org.soichiro.nettystudy.example.time;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeServerHandler extends SimpleChannelHandler {

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        Channel ch = e.getChannel();
        
        ChannelBuffer time = ChannelBuffers.buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        ChannelFuture f = ch.write(time);
        
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                Channel ch = future.getChannel();
                ch.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}

11. 説明すると、channelConnectedメソッドは接続が確立した時に呼ばれます。そして、現在時間の秒数を表す32bitのintegerを書きます。
12. 新しいメッセージを送るため、メッセージを含む新しいバッファが必要です。32bitのintegerを書くので、ChannelBufferのキャパシティは4バイトです(4 x 8bit)。ChannelBuffersは新しいバッファを確保するためのヘルパークラスです。bufferメソッドに加え、多くの便利な関連するメソッドを持っています。もっと知るために、APIリファレンスを参照して下さい。
一方で、ChannelBuffersをstaticインポートすると、以下のように書けます。

 import static org.jboss.netty.buffer.ChannelBuffers.*;
 ...
 ChannelBuffer  dynamicBuf = dynamicBuffer(256);
 ChannelBuffer ordinaryBuf = buffer(1024);

13. 普段は、構築されたメッセージを書きますがちょっとまってください、filpメソッドはどこでしょうか?私たちは、NIOにメッセージを送る際にByteBuffer.flip()を読んでいません。ChannelBufferはそのようなメソッドをもっていません。なぜならそこには2つのポインタしかないからです。一つは読む操作のためのもの、もうひとつは書く操作のためのものです。(ちなみにNIOのByteBufferのposition, limit, capacityの話は、http://kimama2index.info/coolJava/channel/buffer.html の日本語の資料が参考になる) writer indexはChannelBufferに書き込みを行った場合に増加します。その場合reader indexは増加しない。reader indexとwriter indexはそれぞれ、メッセージの最初と終わりをそれぞえ表しています。
対照的に、NIO のバッファはflipメソッドを除いてメッセージの最初と終わりを表す明確な方法を提供していません。flipし忘れて不正なデータを送ったり送れなかったりするトラブルに見舞わるでしょう。Nettyの場合そんなエラーは発生しません。なぜなら、操作の異なる違うポインターを用意しているためです。あなたはflipをしない生活がとても簡単だということにきづくでしょう。
writeメソッドが返すChannelFutureの別な点を記述します。ChannelFutureはまだ起こっていないIOの操作を表すものです。つまり、まだ実行されていないリクエストです。なぜならば、Nettyの全ての操作は非同期だからです。例えば、以下のコードではメッセージを送るまえに 接続を閉じてしまいます。

Channel ch = ...;
ch.write(message);
ch.close();

つまり、closeメソッドは、ChannelFutureでwriteメソッドの結果が返されてその結果が通知された後実行される必要があります。同様にclose処理もすぐに行われるわけではなく、ChannelFutureを返します。
14. どのようにwriteリクエスト終了したかをきづくのでしょうか。これはシンプルでChannelFutureListenerをChannelFutureに追加することで出来ます。私たちは、オペレーションが行われたあとで、Channelを閉じる非同期のChannelFutureListenerを追加しました。
別に言い換えれば、あなたは元々定義されたリスナーを使って以下のようにシンプルにも書けます。

f.addListener(ChannelFutureListener.CLOSE);

このサーバーが正しく動いているかテストするために、UNIXのrdateコマンドを使えます。

$ rdate -o <port> -p <host>

ポートはmainメソッドで設定されたもので、hostはいつもどおりlocalhostです。(Macにはrdateコマンドを入れる方法がありませんでした、残念。次にTIMEクライアントを書くのでそこで検証します。)


なおTimeServerの実装は、例が配布物に含まれていませんが以下のとおり。

package org.soichiro.nettystudy.example.time;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class TimeServer {

  public static void main(String[] args) throws Exception {
      ChannelFactory factory =
          new NioServerSocketChannelFactory(
                  Executors.newCachedThreadPool(),
                  Executors.newCachedThreadPool());

      ServerBootstrap bootstrap = new ServerBootstrap(factory);

      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() {
              return Channels.pipeline(new TimeServerHandler());
          }
      });

      bootstrap.setOption("child.tcpNoDelay", true);
      bootstrap.setOption("child.keepAlive", true);

      bootstrap.bind(new InetSocketAddress(8080));
  }
}
6. TIMEクライアントを書く

DISCARDやECHOサーバーと異なり、TIMEプロトコルはクライアントが必要です。なぜなら、32bitのバイナリデータをカレンダー上の日付に人間は訳せないからです。
この項では、どうやってサーバーが正しく動いているか確かめる方法を議論し、Nettyでクライアントを書きます。


Nettyにおけるサーバーとクライアントの最も大きい唯一の違いは、違うBootstrapとChannelFactoryが必要なことです。以下のコードを見て下さい。

package org.soichiro.nettystudy.example.time;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class TimeClient {

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = Integer.parseInt("8080");

        ChannelFactory factory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

        ClientBootstrap bootstrap = new ClientBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new TimeClientHandler());
            }
        });
        
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);

        bootstrap.connect(new InetSocketAddress(host, port));
    }
}

15. NioServerSocketChannelFactoryの代わりにNioClientSocketChannelFactoryをクライアントサイドのChannelを作るために使います
16. ClientBootstrapがServerBootstrapのクライアントサイドに相当するものです。
17. child.というプレフィックスは不要です。SocketChannelはクライアントサイドでは親を持っていません。
18. bindメソッドの代わりにconnectメソッドを使う必要があります。


見てみるとサーバーサイドのスタートアップとの違いは内容に見えます。ChannelHandlerの方はどうでしょうか?32bitのintegerをサーバーから受け取り、人間が読めるフォーマットに変換し、出力して接続を閉じなくてはいけません。

package org.soichiro.nettystudy.example.time;

import java.util.Date;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeClientHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        long currentTimeMillis = buf.readInt() * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}

とてもシンプルでサーバーサイドの例とは違って見えると思います。しかしながら、このハンドラはIndexOutOfBoundsExceptionを生じて動かないでしょう。なぜこのようなことが起きたのかは次の項で議論します。

7. ストリームベースの通信を扱う
7.1. ソケットバッファーに関する一つの小さな警告

TCP/IPのようなストリームベースの通信では、受け取られたデータはソケットのrecive bufferで受け取られる。不幸なことに、ストリームペースの通信のバッファは、パケットのqueueではなく、byteのqueueとなっています。これは、2つのメッセージを異なる独立したパケットとして送った場合、オペレーションシステムはこれらを2つのメッセージとしては扱わず、ただバイトの束として扱うということです。つまり、リモートで書かれたものが正確に読まれるという保証がないのです。例えば、OSのTCP/IPが3つのパケットを受け取ったと想定します。

                                    • +
ABC DEF GHI
                                    • +

ストリームベースのプロトコルの一般的な性質のため、あなたのアプリケーション上では以下の様な断片で受け取る可能性が高い。

                                          • +
AB CDEFG H I
                                          • +

つまり、サーバーサイドかクライアントサイドかによらず受け取りの部分では、受け取った一つまたは複数のフレームを結合し、アプリケーションのロジックが簡単に理解できるようにしなくてはならない。上の例で言うならば、データは以下のようにフレーム分けされないといけません。

                                    • +
ABC DEF GHI
                                    • +
7.2. 第一解決策

TIMEクライアントの例に戻ろう。わたちたちは同じ問題に面しています。32bitのintegerはとても小さなデータです。そして、よく断片化しそうな幹事ではない。しかし、実際には断片化しています。断片化の可能性はトラフィックが増えるほど増す。
最も簡単な解決案は、内部的な累積バッファを作り、全ての4バイトを内部バッファないに受け取るまで待つことです。以下は、そのようにしてTimeClientHandlerを実装して問題を修正した例です。

package org.soichiro.nettystudy.example.time;

import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;

import java.util.Date;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
 
 public class TimeClientHandler extends SimpleChannelHandler {
 
     private final ChannelBuffer buf = dynamicBuffer();
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
         ChannelBuffer m = (ChannelBuffer) e.getMessage();
         buf.writeBytes(m);
         
         if (buf.readableBytes() >= 4) {
             long currentTimeMillis = buf.readInt() * 1000L;
             System.out.println(new Date(currentTimeMillis));
             e.getChannel().close();
         }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
         e.getCause().printStackTrace();
         e.getChannel().close();
     }
 }

19. 必要に応じて容量を増加させるChannelBufferは動的バッファであります。メッセージの長さがわからない時にはとても便利であります。
20. 最初に、全てのデータがbufの中に蓄積されます。
21. そして、bufの長さが十分かどうかチェックし、この例では4バイトに達した時に実施のビジネスロジックを実行します。一方で、もしもっとデータが到着した場合、NettyはmessageReceivedをもう一度呼び出し、最終的に4バイトずつ累積します。

7.3. 第二解決策

第一解決策でTIMEクライアントの問題を解決したが、修正したハンドラはクリーンには見えない。もっと複雑なプロトコルを想像しよう。様々なフィールドを複数持つようなものです。あなたのChannelHandlerはメンテナンス不可だが速いというものになるだろう。
あなたが多くに気を払う時、あなたは一つ以上のChannelHandlerをChannelPipelineに足すことができる。つまり、あなたのアプリケーションの複雑性を減らすために複数のモジュールに巨大なひとつのChannelHandlerを分割することができる。例えば、TimeClientHandlerは2つのハンドラに分割できる。

  • 断片化問題を扱うTimeDecoder
  • 最初のシンプルなTimeClientHandler

幸運なことに、Nettyは最初の物を箱の外に書くような拡張クラスを提供しています。

package org.soichiro.nettystudy.example.time;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class TimeDecoder extends FrameDecoder{
	 
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
            
        if (buffer.readableBytes() < 4) {
            return null; 
        }
        
        return buffer.readBytes(4);
    }
}

22. FrameDecoderはおの断片化問題を簡単に扱うChannelHandlerの実装です。
23. FrameDecoderはdecodeメソッドをメンテナンスされた累積バッファを用いて呼ぶ。
24. nullが返った場合、まだデータが十分ではないということです。FrameDecoderは十分の量のデータが来た時にもう一度呼ばれる。
25. もしnullではないデータが帰った場合、decodeメソッドが成功したということであります。FrameDecoderは読まれた部分を内部バッファから破棄します。複数のメッセージをデコードする必要が無いことを思い出して欲しい。FrameDecoderはdecorderメソッドをnullの間呼び続ける。

なお、TimeClientのパイプラインの部分は以下の様な実装となる。

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                		new TimeDecoder(),
                		new TimeClientHandler());
            }
        });


もしあなたが一歩進んだ人なら、decorderをもっとシンプルにするReplayingDecoderを試してみたいだろう。もしもっと情報が欲しい場合にはAPIリファレンスをもっと読む必要があります。

package org.soichiro.nettystudy.example.time;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import org.jboss.netty.handler.codec.replay.VoidEnum;

public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
	 
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer, VoidEnum state) {
            
        return buffer.readBytes(4);
    }
}

加えて、Nettyはもっとプロトコルを簡単にし、巨大なメンテナンスできないハンドラの実装を避けることを助けるout-of-the-boxのデコーダーを提供します。以下のパッケージの例を参照して欲しい。

8. ChannelBufferの代わりのPOJOを使って通信する

いま見てきた全ての例において、プロトコルのメッセージとしてずっとChannelBufferが使われてきました。この項では、TIMEプロトコルクライアント、サーバーをChannelBufferの代わりにPOJO(Plain Old Java Object)を使うように改善します。
POJOをあなたのChannelHandlerで使う利点は、明白性にあります。あなたのハンドラはよりメンテナンスできるようになりChannelBufferから情報を抽出することでコードが分割されて再利用性が高まります。このTIMEクライアント、サーバーの例では、32bitのintegerのみを読むので、ChannelBufferを直接読む問題はありません。しかし、実装と現地つ世界のプロトコルを分割することが重要なことを理解してもらいます。
まず最初に、UnixTypeという型を定義しましょう。

package org.soichiro.nettystudy.example.time;

import java.util.Date;

public class UnixTime {
	private final int value;

	public UnixTime(int value) {
		this.value = value;
	}

	public int getValue() {
		return value;
	}

	@Override
	public String toString() {
		return new Date(value * 1000L).toString();
	}
}

TimeDecoderをUnixTImeをChannelBufferの代わりに返すように改定します。

	 @Override
	 protected Object decode(
	         ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
	     if (buffer.readableBytes() < 4) {
	         return null;
	     }
	 
	     return new UnixTime(buffer.readInt());
	 }

26. FrameDecoderとReplayingDecoderはどんな型のオブジェクトでも返せるようになっています。もしChannelBufferだけを返すように制限したい場合は、ChannelBufferをUnixTimeに変更する別なChannelHandlerを追加しなくてはいけません。


このdecoderの更新で、TimeClientHandlerもうChannelBufferを使えなくなりました。

	 @Override
	 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
	     UnixTime m = (UnixTime) e.getMessage();
	     System.out.println(m);
	     e.getChannel().close();
	 }

よりシンプルでエレガントではないでしょうか?同じテクニックをサーバーサイドにも適用します。まずはTimeServerHandler。

	 @Override
	 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
	     UnixTime time = new UnixTime((int)System.currentTimeMillis() / 1000);
	     ChannelFuture f = e.getChannel().write(time);
	     f.addListener(ChannelFutureListener.CLOSE);
	 }

今、かけている要素としてはエンコーダががあるが、それはUnixTimeをChannelBufferに翻訳するChannelHandlerの実装です。パケットの断片化の問題とアッセンブルのに配慮する必要がないためとてもシンプルに書ける。

package org.soichiro.nettystudy.example.time;

import static org.jboss.netty.buffer.ChannelBuffers.buffer;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeEncoder extends SimpleChannelHandler {

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
        UnixTime time = (UnixTime) e.getMessage();
        
        ChannelBuffer buf = buffer(4);
        buf.writeInt(time.getValue());
        
        Channels.write(ctx, e.getFuture(), buf);
    }
}

27. エンコーダーは書き込みリクエストに割り込むためにwriteRequestedメソッドをオーバーライドします。MessageEventパラメーターに注目してください。messageReceivedで設定されたものと同じタイプですが、異なる割り込みをします。ChannelEventはアップストリームかダウンストリームかのどちらかのイベントで、イベントの流れに依存します。例えば、MessageEventは、messageReceivedで呼ばれるときはアップストリームでり、writeRequestedで呼ばれるときはダウンストリームとなります。アップストリームイベントとダウンストリームイベントの差をもっと知りたい場合は、リファレンスを参照下さい。
28. POJOをChannelBufferに一度変換したら、新しいバッファを前のChannelDownstreamHandlerにChannelPipelineで転送しなくてはいけません。ChannelsはChannelEventを生成したり送ったりする様々なヘルパーメソッドを提供しています。この例では、Channels.write(...) メソッドは新しいMessageEventが作られ、ChannelPipelineの中でそれを前のChannelDownstreamHandlerに送っています。
一方で、Channelsのstaticインポートを使うとこうも書けます。

import static org.jboss.netty.channel.Channels.*;
...
ChannelPipeline pipeline = pipeline();
write(ctx, e.getFuture(), buf);
fireChannelDisconnected(ctx);

最後のタスクはTimeEndocerをサーバーサイドのChannelPipelineに挿入し、あとは取るに足らないエクササイズが残るのみとなります。

   bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() {
              return Channels.pipeline(new TimeServerHandler(),
            		  new TimeEncoder());
          }
      });
9. アプリケーションをシャットダウンする

TimeClientを起動すると、アプリケーションが何もせずに起動し続けていることにきづくだろう。全てのスタックトレースから見ると、2つのIOのスレッドが動き続けていることわかる。これらのIOスレッドをシャットダウンし、上品にアプリケーションを終了させるには、ChannelFactoryのリソースを開放する必要があります。
ネットワークアプリケーションの形式的なシャットダウンのプロセスは、以下の3つで構成されます。

  1. なんであれ、全部サーバーのソケットを閉じる
  2. なんであれ、サーバーではない全部のソケットを閉じる(クライアントソケットやアクセプトソケットなど)
  3. ChannelFactoryが持つ全てのリソースを開放する


この3つのステップをTimeClientに適用すると、TimeClient.main() は自身を上品にシャットダウンするため、唯一のクライアントコネクションを閉じて、ChannelFactoryの全リソースを開放します。

package org.soichiro.nettystudy.example.time;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class TimeClient {

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = Integer.parseInt("8080");

        ChannelFactory factory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

        ClientBootstrap bootstrap = new ClientBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                		new TimeDecoder(),
                		new TimeClientHandler());
            }
        });
        
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);

        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getCause().printStackTrace();
        }
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        factory.releaseExternalResources();

    }
}

29. ClientBootstrapのconnectメソッドはChannelFutureを返す。これは接続が成功または失敗した場合に通知します。同様に接続に関連するChannelへの参照を持つ。
30. ChannelFutureは、接続が成功したかしてないかが判定されるまでを待ちます。
31. もし失敗した場合、なぜ失敗したかをプリントします。ChannelFutureのgetCause()メソッドは接続が成功もキャンセルもサれなかった場合、どのような失敗が起こったのかを返す。
32. 接続への試みが終了した場合、ChannelのcloseFutreで接続が閉じられるのを待つ必要があります。全てのChannelは自身のcloseFutreを持っており、閉じられた時に実行され通知を行う。
たとえ接続への試みが失敗してもcloseFutreは呼ばれる。なぜなら、Channelは接続が失敗したら自動的に閉じられるためであります。
33. 全てのコネクションはこの時点で閉じられています。残ったタスクはChannelFactoryの全てのリソースを開放することであります。やり方は、releaseExternalResources()メソッドをよぶだけであります。NIO Selectorsやスレッドプールを含む全てのリソースがシャットダウンし、自動的に終わらせられる。


クライアントのシャットダウンはとても簡単です。しかし、サーバーはどうだろうか?まずポートをアンバインドし、全部のacceptedコネクションを閉じる。これをやるために、アクティブな接続のリストのトラックを維持するデータ構造が必要になるだろう。そして、これは平凡な仕事ではない。幸運なことにそれには解決策があります。ChannelGroupです。


ChannelGroupはJava Collections APIの特別な拡張です。これは開いているChannelのセットを表す。もしChannelがChannelGroupに加えられた場合、そして、Channelが閉じられた場合、自動的にChannelGroupから取り除かれる。あなたはただ同じグループの全てのチャンネルに一つの操作を実行すれば良い。例えば、サーバーにおいて全てのChannelGroupのChannelを閉じれば良い。


開いたソケットのトラックを維持するために、TimeServerHandlerでChannelが開いた際にグローバルのChannelGroupにChannelを追加しよう。TimeServer.allChannelsだと

 @Override
 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
     TimeServer.allChannels.add(e.getChannel());
 }

34. もちろんChannelGroupはスレッドセーフです。

今全てのアクティブなチャンネルは自動的にメンテナンスされています。サーバーをシャットダウンするは、クライアントをシャットダウンするのと同じように簡単です。

package org.soichiro.nettystudy.example.time;

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class TimeServer {

  final static ChannelGroup allChannels = new DefaultChannelGroup("time-server");
  
  public static void main(String[] args) throws Exception {
	  
      ChannelFactory factory =
          new NioServerSocketChannelFactory(
                  Executors.newCachedThreadPool(),
                  Executors.newCachedThreadPool());

      ServerBootstrap bootstrap = new ServerBootstrap(factory);

      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() {
              return Channels.pipeline(new TimeServerHandler(),
            		  new TimeEncoder());
          }
      });

      bootstrap.setOption("child.tcpNoDelay", true);
      bootstrap.setOption("child.keepAlive", true);

      Channel channel = bootstrap.bind(new InetSocketAddress(8080));
      allChannels.add(channel);
      waitForShutdownCommand();
      ChannelGroupFuture future = allChannels.close();
      future.awaitUninterruptibly();
      factory.releaseExternalResources();
  }
  
  private static final CountDownLatch shutdownSignal = new CountDownLatch(1);
  private static void waitForShutdownCommand() {
	  Thread shutdown = new Thread() {
	      public void run() { 
	    	  shutdownSignal.countDown();
	      }
	  };
	  Runtime.getRuntime().addShutdownHook(shutdown);
	  try {
		shutdownSignal.await();
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
  }
}

35. DefaultChannelGroupはグループ名を引数として必要とします。グループ名は単独で他のグループと区別が付く名前です。
36. ServerBootstrapのbindメソッドは、ローカルアドレスが設定されたChannelを返します。close()メソッドが呼ばれるとChannelはローカルアドレスをアンバインドします。
37. サーバーサイド、クライアントサイド、アクセプテッドに限らずどんなタイプのChannelもChannelGroupに追加することができます。つまりアクセプテッドなチャンネルを一回で、Channel領域まるまるサーバーをしゃっとだうんですることができます。
38. waitForShutdownCommand()はサーバーのシャットダウンを待つ仮想のメソッドです。特権的なクライアントのからのメッセージやJVMのシャットダウンをフックするように実装してください。
39. 同じChannelGroupに置いてすべてのチャンネルが同じオペレーションが実行されます。この場合、全てのチャンネルを閉じます。これは、サーバーサイドのチャンネル領域を意味し、これらはアンバウンドされて全てのアクセプテッドなコネクションは非同期的に閉じられます。全てのコネクションが閉じたれたことを通知するため、ChannelGroupもChannelFutureを返します。

10. まとめ

このチャプターでは、ネットワーク上で動くNetty上のアプリケーションを書くデモン捨てレーションを交えてNettyのクイックツアーを行いました。
Nettyに関する詳細は、今後の章で紹介します。あと、 org.jboss.netty.example package以下にある他の例も見て見ることをおすすめします。
あと、Nettyのコミュニティはいつも、あなたの質問とあなたを助ける質問を待ち、それらのフィードバックを得てNettyを改良し続けていることを知っていて下さい。



以上となります。お疲れ様でした。