Erlang WebサーバCowboy - WebSocket編

2014年10月8日

Cowboy Erlang WebSocket

WebSocketとは

WebSocket(ウェブソケット)は、コンピュータ・ネットワーク用の通信規格の1つです。インターネットの標準化団体であるW3CとIETFがウェブサーバーとウェブブラウザとの間の通信のために規定を予定している双方向通信用の技術規格であり、APIはW3Cが、WebSocket プロトコルはIETFが策定に関与している。プロトコルの仕様は RFC 6455。TCP上で動く。

WebSocketを使うことで、サーバがリクエストをされなくても、クライアントにデータを送信することができます。

目的

Twitterホームの様なアプリケーションを作る。

動作としては、新しいメッセージが投稿されたら、ページを見てるクライアントはWebSocket経由で自動的にそのメッセージが表示される。

利用するコンポネントは。

また、わかりやすい様にJiffyのJSONエンコードにマップを利用するので、Erlang 17以上が必要となります。

メッセージウォール

早速、Cowboyアプリケーションのフォルダーを作成してみましょう。

mkdir message_wall && cd message_wall

erlang.mkをダウンロードします。

wget https://raw.githubusercontent.com/ninenines/erlang.mk/master/erlang.mk

アプリケーションベースファイルを生成する。

make -f erlang.mk bootstrap

このコマンドで下記のファイルが生成されました。

├── Makefile
└── src
    ├── message_wall.app.src
    ├── message_wall_app.erl
    └── message_wall_sup.erl

Cowboy、JiffyとGprocをプロジェクトに追加するため、Makefileを下記のように変更。

PROJECT = hello_erlang
DEPS = cowboy jiffy gproc
dep_gproc = git https://github.com/uwiger/gproc master
include erlang.mk

Gprocはerlang.mkのパッケージに登録されていないため、dep_gprocでGprocのリポジトリを指定します。

アプリケーションリソースファイルsrc/message_wall.app.srcにCowboy、JiffyとGprocの依存を追加します。

{application, message_wall, [
  {description, ""},
    {vsn, "0.1.0"},
    {modules, []},
    {registered, []},
    {applications, [
      kernel,
      stdlib,
      cowboy,
      jiffy,
      gproc
    ]},
    {mod, {message_wall_app, []}},
    {env, []}
]}.

makeコマンドでCowboy等の依存パッケージを一括にダウンロードし、コンパイルします。

make

アプリケーションのメインソースファイルであるsrc/message_wall_app.erlにCowboyを初期化します。

-module(message_wall_app).
-behaviour(application).

-export([start/2]).
-export([stop/1]).

start(_Type, _Args) ->
  % ETSテーブルを初期化
  ets:new(message_wall, [ordered_set, named_table, public]),
  % ルート宣言
  Dispatch = cowboy_router:compile([
    {'_', [
       % cowboy_staticはパスマッチに対して、静的ファイルを読み込む
       % index.htmlを読み込む
       {"/", cowboy_static, {priv_file, message_wall, "index.html"}},
       % /websocketのリクエストをws_handlerに渡す
       {"/websocket", message_wall_handler, []}
      ]}
   ]),
  % Cowboyを起動
  {ok, _} = cowboy:start_http(http, 100, 
      [{port, 8001}],
      [
       {env, [{dispatch, Dispatch}]}
      ]),
  message_wall_sup:start_link().

stop(_State) ->
  ok.

解説:

重要なポイントはETSテーブルの初期化です、メッセージはアプリケーションプロセスのETSテーブルで保存します。
ETSテーブルは軽く、簡単に使えるので今回のデモアプリケーションに最適です。

一つ注意点はETSがメモリ内のテーブルのため、message_wall_appプロセスが終了する際にテーブル内のデータは消えます。
ETSをファイルシステムで保存するDETSもあります、重要なデータの場合はETSを使わない方がいいです。

Erlangとデータベース

Erlangと一緒にデータベースが付いています。

機能と使い方がそれぞれ分かれていますので、興味がある方はぜひ「Learn you some Erlang for great good!」のBears, ETS, BeetsMnesiaと記憶術を読みください。

外部データベースへのドライバーもあります。

フロントエンド - HTML[frontend-html]

一番イメージが湧きやすいフロントエンドページを先に作ります。

フロントエンドはシンプルに

のみとします。

フロントエンドで利用するライブラリは以下です。

{priv_file, message_wall, "index.html"}をアプリケーションに登録しましたので、 インデックスファイルはpriv/index.htmlとなります。

privフォルダーがまだ存在しないため、mkdir priv等のコマンドで作成します。

bodyタグ内はbootstrapのベーシックテンプレートをベースに、下記のソースにします。

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>メッセージウォール(WebSocketデモ)</title>
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap.min.css">
<!-- JQuery -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js"></script>
<!-- bootstrap -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.2.0/js/bootstrap.min.js"></script>
<!-- ua-parser.js -->
<script src="http://cdn.jsdelivr.net/ua-parser.js/0.7.0/ua-parser.min.js"></script>
<!-- moment.js -->
<script src="http://cdnjs.cloudflare.com/ajax/libs/moment.js/2.8.3/moment.min.js"></script>
<script>
// WebSocket Javascriptはここに入ります。
</script>
<!-- HTML5 Shim and Respond.js IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- ナビゲーション -->
<div class="navbar navbar-inverse" role="navigation">
<div class="container">
<div class="navbar-header">
<a class="navbar-brand" href="/">メッセージウォール</a>
</div>
</div>
</div>

<div class="container">

<!-- 投稿エリア -->
<p><textarea class="form-control" id="message" rows="3"></textarea></p>
<p class="text-right"><button type="submit" class="btn btn-default" id="send_message">送信</button></p>
<hr />

<!-- メッセージはここに入ります -->
<div id="wall">
</div>

<!-- メッセージのテンプレート -->
<div id="message_template">
<div class="list-group list-group-item">
<h4 class="list-group-item-heading heading"><span class="date">date</span> <span class="ua">UA</span> <span class="ip">IP</span></h4>
<p class="list-group-item-text message">Message</p>
</div>
</div>

</div><!-- /.container -->

</body>
</html>

WebSocket関連の処理はJavascriptで行いますが、フロントエンドの実装の前に、ハンドラーを作ります。

ハンドラー

erlang.mkのテンプレートシステムを利用して、ハンドラーのベースコードを生成します。

make new t=cowboy_ws n=message_wall_handler

このコマンドでsrc/message_wall_handler.erlが作成されます。

ソースは下記のようになります。

-module(message_wall_handler).
-behaviour(cowboy_websocket_handler).

-export([init/3]).
-export([websocket_init/3]).
-export([websocket_handle/3]).
-export([websocket_info/3]).
-export([websocket_terminate/3]).

-record(state, {
}).

init(_, _, _) ->
	{upgrade, protocol, cowboy_websocket}.

websocket_init(_, Req, _Opts) ->
	Req2 = cowboy_req:compact(Req),
	{ok, Req2, #state{}}.

websocket_handle({text, Data}, Req, State) ->
	{reply, {text, Data}, Req, State};
websocket_handle({binary, Data}, Req, State) ->
	{reply, {binary, Data}, Req, State};
websocket_handle(_Frame, Req, State) ->
	{ok, Req, State}.

websocket_info(_Info, Req, State) ->
	{ok, Req, State}.

websocket_terminate(_Reason, _Req, _State) ->
	ok.

解説:

今回クライアントからWebSocketメッセージは2種類です。

get_listメッセージ

ハンドラーはget_listメッセージを受けた際にETSテーブルに保存されているメッセージの最新の10通を返します。 クライアントからのメッセージとなりますのでwebsocket_handle関数を利用します。

% get_listメッセージの場合はメッセージのリストを返します
websocket_handle({text, <<"get_list">>}, Req, State) ->
  % 最新のメッセージを取得する
  RawMessages = get_recent_messages(10),
  % メッセージをJiffyが変換できる形式に変更
  Messages = format_messages(RawMessages),
  % JiffyでJsonレスポンスを生成
  JsonResponse = jiffy:encode(#{
    <<"type">> => <<"message_list">>,
    <<"messages">> => Messages
  }),
  % JSONを返す
  {reply, {text, JsonResponse}, Req, State};

解説:

get_recent_messages関数

% 最新のNumberメッセージを取得する
get_recent_messages(Number) ->
  case ets:last(?TABLE) of
    '$end_of_table' -> [];
    Key -> get_recent_messages(Key, Number, [])
  end.

get_recent_messages(_Key, 0, Messages) -> lists:reverse(Messages);
get_recent_messages('$end_of_table', _Number, Messages) -> lists:reverse(Messages);
get_recent_messages(Key, Number, Messages) ->
  Message = ets:lookup(?TABLE, Key),
  PreviousKey = ets:prev(?TABLE, Key),
  get_recent_messages(PreviousKey, Number-1, [Message|Messages]).

解説:

% ETS結果メッセージをJiffyが変換できる形式に変更
format_message([{Time, Message, Ip, Ua}]) ->
  #{
    <<"date">> => unicode:characters_to_binary(iso8601(Time)),
    <<"ip">>   => unicode:characters_to_binary(format_ip(Ip)),
    <<"text">> => Message,
    <<"ua">>   => Ua
  }.

% IPタプルを文字列に変換
format_ip({I1,I2,I3,I4}) ->
  io_lib:format("~w.~w.~w.~w",[I1,I2,I3,I4]);
format_ip(Ip) -> Ip.

% erlangのdatetimeをISO8601形式に変換
iso8601(Time) ->
  {{Year, Month, Day},{Hour, Minut, Second}} = calendar:now_to_universal_time(Time),
  io_lib:format("~4..0B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", [Year, Month, Day, Hour, Minut, Second]). 

解説:

% テーブル名をマクロで定義
-define(TABLE, message_wall).

一般メッセージ

get_list以外の場合はメッセージをETSテーブルに保存し、 接続中のクライアントにそのメッセージをプッシュします。

処理の流れ。

  1. クライアントからget_list以外のメッセージリクエストを受けます。
  2. メッセージをETSテーブルに保存。
  3. gprocnew_messageイベントを送ります。
  4. gprocに登録されているプロセスにnew_messageメッセージを送ります。
  5. プロセスがWebSocket経由で新しいメッセージをクライアントにプッシュします。
% get_list以外のメッセージは保存する
websocket_handle({text, Text}, Req, #state{ua=Ua, ip=Ip} = State) ->
  Time = now(),
  Message = {Time, Text, Ip, Ua},
  save_message(Message),
  % gprocにイベントを公開し、
  % 全ての接続クライアントにwebsocket_info({gproc_ps_event, new_message, Time}, Req, State)を呼び出します
  gproc_ps:publish(l, new_message, Time),
  {ok, Req, State};

% ETSにメッセージを保存する
save_message({Time, Text, Ip, Ua}) ->
  ets:insert(?TABLE, {Time, unicode:characters_to_binary(Text), Ip, Ua}).

解説:

gproc_ps:publishを利用するためにプロセスをgprocに登録することが必要です。
gprocの登録はwebsocket_initで行います。

% websocket_init はwebsocket接続が開始された時に実行されます
websocket_init(_, Req, _Opts) ->
  % プロセスをgproc pubsubに登録する
  gproc_ps:subscribe(l, new_message),
  % stateを設定する
  Ip = get_ip(Req),
  {UserAgent, _Req} = cowboy_req:header(<<"user-agent">>, Req),
  State = #state{ip=Ip, ua=UserAgent},
  % WebSocketリクエストは長くなる可能性があるため
  % 不要なデータをReqから削除
  Req2 = cowboy_req:compact(Req),
  % 自動切断を10分に設定する(60万ミリ秒)
  {ok, Req2, State, 600000, hibernate}.

解説:

gprocからのメッセージを受ける処理は下記の様になります。

% websocket_infoは本プロセスにErlangメッセージが届いた時に実行されます
% gprocからnew_messageメッセージの場合はそのメッセージをWebSocketに送信します
websocket_info({gproc_ps_event, new_message, Key}, Req, State) ->
  RawMessage = ets:lookup(?TABLE, Key),
  % ETS結果をマップに変換
  Message = format_message(RawMessage),
  JsonResponse = jiffy:encode(#{
    <<"type">> => <<"new_message">>,
    <<"message">> => Message
  }),
  {reply, {text, JsonResponse}, Req, State};

解説:

ハンドラーコード

src/message_wall_handler.erl

-module(message_wall_handler).
-behaviour(cowboy_websocket_handler).

-export([init/3]).
-export([websocket_init/3]).
-export([websocket_handle/3]).
-export([websocket_info/3]).
-export([websocket_terminate/3]).

% プロセスステートにIPアドレスとユーザエージェントを保存
-record(state, {ip, ua}).

% テーブル名をマクロで定義
-define(TABLE, message_wall).

% ========================
%  初期化
% ========================

init(_, _, _) ->
  % 接続をWebSocketにアップグレード
  {upgrade, protocol, cowboy_websocket}.

% websocket_init はwebsocket接続が開始された時に実行されます
websocket_init(_, Req, _Opts) ->
  % プロセスをgproc pubsubに登録する
  gproc_ps:subscribe(l, new_message),
  % stateを設定する
  Ip = get_ip(Req),
  {UserAgent, _Req} = cowboy_req:header(<<"user-agent">>, Req),
  State = #state{ip=Ip, ua=UserAgent},
  % WebSocketリクエストは長くなる可能性があるため
  % 不要なデータをReqから削除
  Req2 = cowboy_req:compact(Req),
  % 自動切断を10分に設定する(60万ミリ秒)
  {ok, Req2, State, 600000, hibernate}.


% ========================
%  メッセージハンドリング
% ========================

% get_listメッセージの場合はメッセージのリストを返します
websocket_handle({text, <<"get_list">>}, Req, State) ->
  % 最新のメッセージを取得する
  RawMessages = get_recent_messages(10),
  % メッセージをJiffyが変換できる形式に変更
  Messages = format_messages(RawMessages),
  % JiffyでJsonレスポンスを生成
  JsonResponse = jiffy:encode(#{
    <<"type">> => <<"message_list">>,
    <<"messages">> => Messages
  }),
  % JSONを返す
  {reply, {text, JsonResponse}, Req, State};

% それ以外のメッセージは保存する
websocket_handle({text, Text}, Req, #state{ua=Ua, ip=Ip} = State) ->
  Time = now(),
  Message = {Time, Text, Ip, Ua},
  save_message(Message),
  % gprocにイベントを公開し、
  % 全ての接続クライアントにwebsocket_info({gproc_ps_event, new_message, Time}, Req, State)を呼び出します
  gproc_ps:publish(l, new_message, Time),
  {ok, Req, State};

websocket_handle(_Frame, Req, State) ->
  {ok, Req, State}.

% websocket_info は本プロセスにErlangメッセージが届いた時に実行されます
% gprocからnew_messageメッセージの場合はそのメッセージをWebSocketに送信します
websocket_info({gproc_ps_event, new_message, Key}, Req, State) ->
  RawMessage = ets:lookup(?TABLE, Key),
  % ETSエントリーをマップに変換
  Message = format_message(RawMessage),
  JsonResponse = jiffy:encode(#{
    <<"type">> => <<"new_message">>,
    <<"message">> => Message
  }),
  {reply, {text, JsonResponse}, Req, State};

websocket_info(_Info, Req, State) ->
  {ok, Req, State}.

websocket_terminate(_Reason, _Req, _State) ->
  ok.


% ========================
%  メッセージ関連
% ========================

% ETSにメッセージを保存する
save_message({Time, Text, Ip, Ua}) ->
  ets:insert(?TABLE, {Time, unicode:characters_to_binary(Text), Ip, Ua}).

% 最新のNumberメッセージを取得する
get_recent_messages(Number) ->
  case ets:last(?TABLE) of
    '$end_of_table' -> [];
    Key -> get_recent_messages(Key, Number, [])
  end.

get_recent_messages(_Key, 0, Messages) -> lists:reverse(Messages);
get_recent_messages('$end_of_table', _Number, Messages) -> lists:reverse(Messages);
get_recent_messages(Key, Number, Messages) ->
  Message = ets:lookup(?TABLE, Key),
  PreviousKey = ets:prev(?TABLE, Key),
  get_recent_messages(PreviousKey, Number-1, [Message|Messages]).

% IP取得
get_ip(Req) ->
  % プロキシ経由対応
  case cowboy_req:header(<<"x-real-ip">>, Req) of
    {undefined, _Req} ->
      {{Ip, _Port}, _Req} = cowboy_req:peer(Req),
      Ip;
    {Ip, _Req} -> Ip
  end.

% ETS結果メッセージをJiffyが変換できる形式に変更
format_messages(RawMessages) ->
  lists:map(fun(Message) -> format_message(Message) end, RawMessages).

% ETS結果メッセージをJiffyが変換できる形式に変更
format_message([{Time, Message, Ip, Ua}]) ->
  #{
    <<"date">> => unicode:characters_to_binary(iso8601(Time)),
    <<"ip">>   => unicode:characters_to_binary(format_ip(Ip)),
    <<"text">> => Message,
    <<"ua">>   => Ua
  }.

% IPタプルを文字列に変換
format_ip({I1,I2,I3,I4}) ->
  io_lib:format("~w.~w.~w.~w",[I1,I2,I3,I4]);
format_ip(Ip) -> Ip.

% erlangのdatetimeをISO8601形式に変換
iso8601(Time) ->
  {{Year, Month, Day},{Hour, Minut, Second}} = calendar:now_to_universal_time(Time),
  io_lib:format("~4..0B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", [Year, Month, Day, Hour, Minut, Second]). 

フロントエンド - Javascript

最後はフロントエンドのJavascriptを作ります。

WebSocketの基本JavascriptチュートリアルはMozillaのサイトにあります。

下記のスクリプトは[フロントエンド - HTML][frontend-html]の// WebSocket Javascriptはここに入ります。の代わりに入ります。(フールソース)

// Websocketエンドポイント宣言
var ws_url = "ws://" + window.location.host + "/websocket";
// WebSocket接続変数
var ws_connection;

// JS初期化
$(document).ready(function(){
  ws_init();
  // 送信ボタンを押した際の処理
  $("#send_message").click(function(){
    var message = $.trim( $("#message").val() );
    if( message != "" ){
      ws_connection.send(message);
    }
    // テキストエリアを空に
    $("#message").val("");
    // ボタンのフォカスをはすず
    $(this).blur();
    return false;
  });
});

// WebSocketの初期化
function ws_init(){
  if(!("WebSocket" in window)){  
    // WebSocketが対応しないブラウザの場合にアラートを表示。
    display_alert("お使いのブラウザはWebSocketを対応していません。");
  } else {
    connect();
  }
}

// WebSocket接続関数
function connect(){
  ws_connection = new WebSocket(ws_url);
  // 接続が出来たら、メッセージのリストを取得します。

  ws_connection.onopen = function(e){
    // CowboyのWebSocketにget_listテキストを送信
    ws_connection.send('get_list');
  }; 

  // WebSocketからメッセージが届くときに実行される関数
  ws_connection.onmessage = function (e) {
    var msg = JSON.parse(e.data);
    switch(msg.type) {
      case "message_list":
        populate_wall(msg.messages);
        break;
      case "new_message":
        add_message(msg.message);
        break;
    }
  };

  // WebSocketが切断されるときにアラートを表示
  ws_connection.onclose = function(e) {
    display_alert("WebSocket接続は切断されました、ページをリロードしてください。");
  }

}

function display_alert(message) {
  $("#wall").prepend('<div class="alert alert-danger" role="alert">'+message+'</div>');
}

function populate_wall(messages){
  for(var i=messages.length-1; i>=0; i--){
    add_message(messages[i]);
  }
}

// メッセージをウォールに追加する
function add_message(message){
  // テンプレートHTMLをコピー
  var template = $("#message_template>div").clone();
  $.ua.set(message.ua);
  var message_time = moment(message.date).format("YYYY/M/D H:mm:ss");
  // テンプレートにメッセージの情報を入れる
  template.find(".date").text(message_time);
  template.find(".message").text(message.text);
  template.find(".ua").text($.ua.browser.name+" "+$.ua.browser.major+"("+$.ua.os.name+")");
  template.find(".ip").text(message.ip);
  $("#wall").prepend(template);
  $("#wall>div:gt(9)").remove();
}

解説:

動作確認

ソースをコンパイルします。

make

リリースを起動します。

./_rel/message_wall_release/bin/message_wall_release console

http://localhost:8001にアクセスすれば、メッセージウォールをを試すことができます。

同時に2つのブラウザで開けば、一つのブラウザで投稿したメッセージが自動的に他のブラウザにプッシュされます。

Nginxと組み合わせる

nginx.confserver文を以下の様に設定すれば、NginxをCowboyアプリのプロキシとして使えます。

     server {
         listen       80;
         # 利用するドメイン名
         server_name  mw.l-lab.jp;
 
         location / {
             proxy_pass http://localhost:8001;
             # WebSocketが切断されるまでの秒数
             proxy_read_timeout 600s;
             proxy_http_version 1.1;
             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
             proxy_set_header X-Real-IP $remote_addr;
             proxy_set_header Upgrade $http_upgrade;
             proxy_set_header Connection "upgrade";
         }
 
         error_page   500 502 503 504  /50x.html;
         location = /50x.html {
             root   /usr/share/nginx/html;
         }
     }