Elasticsearchプラグインを作ってみよう

はじめに

これは Elasticsearch Advent Calendar 2014 – Qiita 、13日目の記事です。
Elasticsearchはプラグインを作成することで、様々な機能を拡張することができますが、今回はプラグインの作り方を紹介します。

前提条件

今回、Elasticsearchプラグインを作成するにあたり、以下のものが準備済みである前提で話を進めます。

  • Java 7 または 8
  • Maven 3

今回作るプラグイン

一応、サンプル的ではありますが、ハローワールドだけでも簡単すぎますし、いきなり難しいことでもモチベーションが下がるかと思うので、今回はElasticsearchのクラスター内に含まれるインデックス数を独自のREST APIで返却するプラグインを作成してみたいと思います。
仕様としては、

$ curl -XGET http://localhost:9200/_index/count
{"size":1}

という感じで、/_index/countにGETでアクセスしたら、sizeとしてインデックス数をJSONで返却するものとします。sizeが1という場合は、インデックスが1個だけ存在している状態です。

プラグインの作成

まずは、Elasticsearch Plugin Archetypeを利用して、空のプロジェクトを作成します。
以下のように、Mavenのarchetype:generateゴールを実行します。

$ mvn archetype:generate \
    -DarchetypeGroupId=org.codelibs \
    -DarchetypeArtifactId=elasticsearch-plugin-archetype \
    -DarchetypeVersion=1.4.0 \
    -DelasticsearchVersion=1.4.1 \
    -DgroupId=com.github.marevol \
    -DartifactId=elasticsearch-indexcount \
    -Dversion=1.0-SNAPSHOT \
    -DpluginName=IndexCount
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository missing. Using the one from [org.codelibs:elasticsearch-plugin-archetype:1.4.1] found in catalog remote
[INFO] Using property: groupId = com.github.marevol
[INFO] Using property: artifactId = elasticsearch-indexcount
[INFO] Using property: version = 1.0-SNAPSHOT
[INFO] Using property: package = com.github.marevol
[INFO] Using property: elasticsearchVersion = 1.4.1
[INFO] Using property: pluginName = IndexCount
[INFO] Using property: restName = hello
[INFO] Using property: riverName = hello
Confirm properties configuration:
groupId: com.github.marevol
artifactId: elasticsearch-indexcount
version: 1.0-SNAPSHOT
package: com.github.marevol
elasticsearchVersion: 1.4.1
pluginName: IndexCount
restName: hello
riverName: hello
 Y: :
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: elasticsearch-plugin-archetype:1.4.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.github.marevol
[INFO] Parameter: artifactId, Value: elasticsearch-indexcount
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.github.marevol
[INFO] Parameter: packageInPathFormat, Value: com/github/marevol
[INFO] Parameter: restName, Value: hello
[INFO] Parameter: package, Value: com.github.marevol
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: elasticsearchVersion, Value: 1.4.1
[INFO] Parameter: groupId, Value: com.github.marevol
[INFO] Parameter: pluginName, Value: IndexCount
[INFO] Parameter: riverName, Value: hello
[INFO] Parameter: artifactId, Value: elasticsearch-indexcount
[INFO] project created from Archetype in dir: /home/shinsuke/workspace/elasticsearch-indexcount
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39.677 s
[INFO] Finished at: 2014-12-08T06:32:35+09:00
[INFO] Final Memory: 13M/240M
[INFO] ------------------------------------------------------------------------

elasticsearchVersionは対象とするElasticsearchのバージョン、groupIdはプロジェクトのグループID、artifactIdはプロジェクト名、versionはプロジェクトのバージョン、pluginNameはプラグイン名です。その辺りの値は環境に合わせて変更してください。
プロジェクトの生成に成功すると以下のようなファイルが生成されます。

$ cd elasticsearch-indexcount/
$ find . -type f
./src/main/assemblies/plugin.xml
./src/main/java/com/github/marevol/module/IndexCountRiverModule.java
./src/main/java/com/github/marevol/module/IndexCountModule.java
./src/main/java/com/github/marevol/rest/IndexCountRestAction.java
./src/main/java/com/github/marevol/IndexCountPlugin.java
./src/main/java/com/github/marevol/service/IndexCountService.java
./src/main/java/com/github/marevol/river/IndexCountRiver.java
./src/main/resources/es-plugin.properties
./pom.xml

プロジェクトを生成した状態では、Riverやサービスクラスなども作成されます。
今回、REST APIを作成するだけので、それらの不要なファイルを削除します。

$ rm -r ./src/main/java/com/github/marevol/module ./src/main/java/com/github/marevol/service ./src/main/java/com/github/marevol/river/

次にプラグインを管理するPluginクラスを編集します。Pluginクラスはどのようなモジュールに機能を差し込むか等の設定を行っています。
デフォルトの状態ではRiver等の初期化するためのコードが含まれるので、削除して以下のようにします。

package com.github.marevol;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.rest.RestModule;
import com.github.marevol.rest.IndexCountRestAction;
public class IndexCountPlugin extends AbstractPlugin {
    @Override
    public String name() {
        return "IndexCountPlugin";
    }
    @Override
    public String description() {
        return "This is a elasticsearch-indexcount plugin.";
    }
    // for Rest API
    public void onModule(final RestModule module) {
        module.addRestAction(IndexCountRestAction.class);
    }
}

次にRESTの応答をするRestActionクラスを以下のように編集します。

package com.github.marevol.rest;
import static org.elasticsearch.rest.RestStatus.*;
import java.io.IOException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
public class IndexCountRestAction extends BaseRestHandler {
    @Inject
    public IndexCountRestAction(final Settings settings, final Client client,
            final RestController controller) {
        super(settings, controller, client);
        // /_index/countにGETでアクセスされた場合に呼ばれるように設定
        controller.registerHandler(RestRequest.Method.GET, "/_index/count",
                this);
    }
    @Override
    protected void handleRequest(final RestRequest request,
            final RestChannel channel, final Client client) {
        // インデックス情報を取得する
        client.admin().indices().prepareGetIndex()
                .execute(new ActionListener<GetIndexResponse>() {
                    @Override
                    public void onResponse(GetIndexResponse response) {
                        try {
                            // レスポンスのJSONを生成する
                            final XContentBuilder builder = JsonXContent
                                    .contentBuilder();
                            builder.startObject();
                            builder.field("size", response.getIndices().length);
                            builder.endObject();
                            // レスポンスを返却する
                            channel.sendResponse(new BytesRestResponse(OK,
                                    builder));
                        } catch (final IOException e) {
                            handleErrorRequest(channel, e);
                        }
                    }
                    @Override
                    public void onFailure(Throwable e) {
                        handleErrorRequest(channel, e);
                    }
                });
    }
    private void handleErrorRequest(final RestChannel channel, final Throwable e) {
        try {
            // エラーレスポンスを返却する
            channel.sendResponse(new BytesRestResponse(channel, e));
        } catch (final IOException e1) {
            logger.error("Failed to send a failure response.", e1);
        }
    }
}

中身を順に見ていくと、コンストラクタで

controller.registerHandler(RestRequest.Method.GET, "/_index/count",
                this);

とすることで、指定したURLでの応答ができます。今回は、/_index/countにGETで応答するため、上記のように登録しています。
/{index}/{type}/_index/… というにプレースホルダで指定にすると、RestRequestでparam(“index”)という感じのメソッド呼び出しで値を取得することができます(確か、indexとtypeしか取れなかったと思います…)。
インデックス数の取得には、clientからprepareGetIndexメソッドを呼ぶことで取得できます。
executeメソッドには、execute(ActionListener)で非同期に呼ぶものとexecute().actionGet()で同期的に呼び出す方法があります。私は過去のバージョンにおいて同期的な呼び出しをしていてハマったことがあったので、基本的に非同期で呼び出す方法を利用しています。というわけで、今回はexecute(ActionListener)を利用します。
あとは、hadnleRequestメソッドがリクエストのたびに呼ばれるので、以下のようにXContentBuilderでJSONのレスポンスを構築して、channelの渡すとレスポンスが返却されます。

// レスポンスのJSONを生成する
final XContentBuilder builder = JsonXContent
                                    .contentBuilder();
builder.startObject();
builder.field("size", response.getIndices().length);
builder.endObject();
// レスポンスを返却する
channel.sendResponse(new BytesRestResponse(OK,
                                    builder));

最後に、es-plugin.propertiesでPluginクラスが正しく指定されていることを確認してください。

plugin=com.github.marevol.IndexCountPlugin

今回、プラグイン内で利用する追加のライブラリはないので、変更はしていませんが、追加するライブラリ等がある場合はplugin.xmlとpom.xmlを修正してください。
plugin.xmlの詳しい編集方法が必要な場合は、Maven Assembly Pluginを参照してください。

プラグインのテスト方法

プラグインの開発において、編集するたびにElasticsearchにプラグインをインストールして確認していては開発効率が上がりません。そこでJUnitのテストケースでElasticsearchを立ち上げて、検証していく方法を紹介します。
Elasticsearchではjava testing frameworkを提供していますが、CodeLibsでもElasticsearch Cluster Runnerを提供しています。大きな観点で見るとだいたい同じ機能を持っているかとは思いますが、java testing frameworkはランダムテスト的な感じで継続テストをしたいときに向いているかと思います。
一方、Elasticsearch Cluster RunnerはJUnitのテストでの利用もできますが、バッチ等での実際の環境でも利用できるような汎用性を目指した作りになっています(なので、できるだけ本番でも利用できるものを…)。
今回のJUnitでは、Cluster Runnerを利用したテストケースの導入を紹介します。
まず、必要な依存関係をpom.xmlに加えます。
以下では、elasticsearch-cluster-runnerとjunitを加えています(必要であれば、groovy-allも加えてください)。

...
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.codelibs</groupId>
            <artifactId>elasticsearch-cluster-runner</artifactId>
            <version>1.4.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
...

次にsrc/test/java/com/github/marevolあたりにIndexCountPluginTestのテストクラスを置きます。

$ mkdir -p ./src/test/java/com/github/marevol

で作ったら、IndexCountPluginTest.javaを置きます。

package com.github.marevol;
import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.*;
import static org.junit.Assert.*;
import java.util.Map;
import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
import org.codelibs.elasticsearch.runner.net.Curl;
import org.codelibs.elasticsearch.runner.net.CurlResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class IndexCountPluginTest {
    private ElasticsearchClusterRunner runner;
    @Before
    public void setUp() throws Exception {
        // Cluster Runnerのインスタンス作成
        runner = new ElasticsearchClusterRunner();
        // Cluster Runnerの初期化
        runner.onBuild(new ElasticsearchClusterRunner.Builder() {
            @Override
            public void build(final int number, final Builder settingsBuilder) {
                // elasticsearch.ymlに書くような内容を記述
                settingsBuilder.put("http.cors.enabled", true);
                settingsBuilder.put("index.number_of_replicas", 0);
            }
            // Elasticsearchのノード数を1に設定
        }).build(newConfigs().ramIndexStore().numOfNode(1));
        // クラスタ状態がYellowになるのを待つ
        runner.ensureYellow();
    }
    @After
    public void cleanUp() throws Exception {
        // Cluster Runnerの終了
        runner.close();
        // 作成したものを削除(ただし、Windowsでは消せない…)
        runner.clean();
    }
    @Test
    public void test_runCluster() throws Exception {
        final String index = "dataset";
        final String type = "item";
        // インデックスの作成
        runner.createIndex(index, null);
        // 適当に1000ドキュメント作成
        for (int i = 1; i <= 1000; i++) {
            final IndexResponse indexResponse1 = runner.insert(index, type,
                    String.valueOf(i), "{\"msg\":\"test " + i + "\", \"id\":\""
                            + i + "\"}");
            assertTrue(indexResponse1.isCreated());
        }
        runner.refresh();
        // 1000ドキュメント作成されたか確認
        {
            final SearchResponse searchResponse = runner.search(index, type,
                    null, null, 0, 10);
            assertEquals(1000, searchResponse.getHits().getTotalHits());
        }
        // /_index/count にGETでアクセス
        try (CurlResponse curlResponse = Curl.get(runner.node(),
                "/_index/count").execute()) {
            assertEquals(200, curlResponse.getHttpStatusCode());
            // レスポンスで返却されたJSONをMapで取得
            Map<String, Object> content = curlResponse.getContentAsMap();
            // {"size":1} が返却される想定
            assertEquals(1, content.get("size"));
        }
    }
}

setUpメソッドでは、Cluster RunnerでElasticsearchを立ち上げています。
cleanUpメソッドは起動したElasticsearchを終了処理を実行しています。
test_runClusterメソッドにテストを記述しています。
test_runClusterではdatasetというインデックスを作成して、そこに1000ドキュメントを登録しています。そのあと、ドキュメント数を確認して、今回、作成した/_index/countのREST APIのテストを実施しています。
Elasticsearch Cluster RunnerではCurlクラスを提供しており、HTTPでのアクセスを支援するユーティリティクラスになります。
Curlを利用することで、結果のJSONのレスポンスもMapで扱うこともできます!
ここでは、インデックスを1個作成しているので、プラグインで作ったREST APIの応答結果はsizeが1になります。
という感じで、楽しくプラグイン開発ができます。
(私がさくさくプラグインを作っていっているのは、この辺があるおかげですー)

成果物を作成

ここまでできれば、あとはプラグインのzipを作るだけです。
mvn コマンドを叩きます。

$ mvn package

ビルドが成功すると、target/releases/elasticsearch-indexcount-1.0-SNAPSHOT.zip にプラグインのzipファイルが生成されます。
あとは、このzipをElasticsearchのpluginsディレクトリにindexcountディレクトリを作成してその中に展開して、Elasticsearchを再起動すると利用することができるようになります。Mavenセントラルリポジトリにアップロードするように設定してmvn deployすれば、プラグインのzipファイルがMavenセントラルリポジトリに置かれ、Elasticsearchに含まれるpluginコマンドで簡単にインストールすることもできます。

$ cd $ES_HOME/plugins
$ mkdir indexcount
$ cd indexcount
$ unzip ...indexcountプラグインの場所.../target/releases/elasticsearch-indexcount-1.0-SNAPSHOT.zip

以上です。

まとめ

上記のようにElasticsearchプラグインは開発、テスト、インストールまで簡単に行うことができます。Elasticsearchのバージョンによっては、Elasticsearch自体のI/Fが変わるので、OSSでプラグインを作成してElasticsearchの更新に追随するのを維持するのはなかなか大変ではあります。しかし、JUnit等でテストを書いておけば、知らぬ間に微妙な変更があったとしても検知することができますし、そもそもの動きの確認などにも役立つことでしょう。
このように簡単にElasticsearchを拡張ができますので、ぜひ、試してみてはいかがでしょうか?(今回作成したものはここにあります)

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です