fluentdのtimeの仕様とネストしたフィールドの扱い方


今回はfluentd(v0.14)についてです。

仕事柄、データ・ログを扱うことが多いので、このツールは大変重宝しています。そんな便利なfluentdですが、timeに関する仕様を毎回忘れて困っているので、これを機にまとめておきます。

 

fluentdのtimeとは

fluentdは読み込んだデータを time, tag, record の3つの要素に分けて保持します。具体的なイメージは以下のページのExampleを見るとよいかと思います。

https://docs.fluentd.org/v0.14/articles/parser_json

fluentdのtimeでよくある問題

このrecordの部分をOutput pluginによってどこかに出力したりするので、よく出力してみるとtimeフィールドが消えていた、などの問題が発生します。

このtimeの扱い方を書いていきます。

fluentdのtimeの仕様とconf

まず、timeの値がどのように設定されるかについて考える必要があります。これは利用するParserによって変わります。何らかのファイルをformat noneで読み込む場合は、読み込んだシステム時刻をtimeとして設定します。

これは以下のParser noneのドキュメントにあります。(current time)

https://docs.fluentd.org/v0.14/articles/parser_none

format jsonとした場合はjsonデータ内のtimeフィールドの値を利用します。これはInput pluginではなく、json Parser pluginの仕様です。詳細は以下を参照すると良いです。

https://docs.fluentd.org/v0.14/articles/parser_json

time_keyというパラメタのデフォルトがtimeなので、json内のtimeフィールドを読むようになっています。必要に応じて変えてください。

time_formatというパラメタはデフォルトnilで、こうするとtimeフィールドは秒を示すintegerであるという解釈になります。これも必要に応じて変えてください。大抵は

%Y-%m-%d %H:%M:%S

というような指定になると思います。何が指定できるのかは公式ドキュメントにもある通り、以下のRubyのTimeクラスを参照すると良いです。

http://ruby-doc.org/stdlib-2.4.1/libdoc/time/rdoc/Time.html#method-c-strptime

keep_time_keyというパラメタがtimeフィールドを残すのか、取り除くのか、です。デフォルトはfalseなので、取り除かれます。これも必要に応じて変えてください。

dockerでfluentdを動かして実験してみる

https://github.com/fluent/fluentd-docker-image

fluentdをgemで自分のマシンにインストールしても良いですが、環境を汚したくない人はdockerを使うと良いでしょう。

# 設定ファイルなど色々準備
% mkdir fluentd-docker
% cd fluentd-docker
% mkdir conf
% mkdir data

# 好きなエディタで以下のようにconfを作成
% cat conf/fluent.conf
<source>
  @type       tail
  @label      @mainstream
  tag         record.*
  format      json
  path        /fluentd/log/in
  pos_file    /fluentd/log/in.pos
</source>

<label @mainstream>
  <match **>
    @type stdout
  </match>
</label>
# docker containerを起動
% docker run -v $(pwd)/data:/fluentd/log -v $(pwd)/conf:/fluentd/etc fluent/fluentd:v0.14.21
2017-10-01 00:00:00 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/fluent.conf"
2017-10-01 00:00:00 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type tail
    @label @mainstream
    tag "record.*"
    format json
    path "/fluentd/log/in"
    pos_file "/fluentd/log/in.pos"
    <parse>
      @type json
    </parse>
  </source>
  <label @mainstream>
    <match **>
      @type stdout
    </match>
  </label>
</ROOT>
2017-10-01 00:00:00 +0000 [info]: starting fluentd-0.14.21 pid=7
2017-10-01 00:00:00 +0000 [info]: spawn command to main:  cmdline=["/usr/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/bin/fluentd", "-c", "/fluentd/etc/fluent.conf", "-p", "/fluentd/plugins", "--under-supervisor"]
2017-10-01 00:00:00 +0000 [info]: gem 'fluentd' version '0.14.21'
2017-10-01 00:00:00 +0000 [info]: adding match in @mainstream pattern="**" type="stdout"
2017-10-01 00:00:00 +0000 [info]: adding source type="tail"
2017-10-01 00:00:00 +0000 [info]: #0 starting fluentd worker pid=17 ppid=7 worker=0
2017-10-01 00:00:00 +0000 [info]: #0 following tail of /fluentd/log/in
2017-10-01 00:00:00 +0000 [info]: #0 fluentd worker is now running worker=0

# 別のterminalから
% echo '{"time":1500000000,"field1":"a","field2":"b"}' >> data/in

# docker runしたterminalに出る
2017-07-14 02:40:00.000000000 +0000 record.fluentd.log.in: {"field1":"a","field2":"b"}

echoしたデータにtimeフィールドがあり、unix timeの秒数が入っているので、それに従って、最終的な出力のtimeも2017/07/14 02:40になっています。

他のoptionを試すために、以下のようにsourceを変えて、再度fluentdを実行します。

<source>
  @type       tail
  @label      @mainstream
  tag         record.*
  format      json
  time_key    dateTime
  time_format %Y-%m-%d %H:%M:%S
  keep_time_key true
  path        /fluentd/log/in
  pos_file    /fluentd/log/in.pos
</source>

この状態で、以下のように少し変えたechoを行うと、以下のような出力になります。

% echo '{"dateTime":"2017-09-01 12:34:56","field1":"a","field2":"b"}' >> data/in

# docker runのterminal
2017-09-01 12:34:56.000000000 +0000 record.fluentd.log.in: {"dateTime":"2017-09-01 12:34:56","field1":"a","field2":"b"}

optionの意図通り、dateTimeをtimeフィールドとして扱い、Y-m-d…のフォーマットで読み取り、recordにtime(dateTime)を含める、という設定になっています。

余談ですがOutput pluginの種類によってはoutput_include_timeというtimeのフィールドを出力に含めるかどうかを切り替えるパラメタもあります。具体的には tagomoris/fluent-plugin-file-alternativefluent/fluent-plugin-kafka にあります。状況に応じて使い分けてください。

time_keyに指定したいフィールドがネストしていたらどうするか

ここからが本題です。上記のoptionは状況に応じて設定すれば良いだけですが、もしtimeフィールドがネストしていた場合はどうなるのでしょうか。

例えば

time_key a.dateTime

という設定で以下のデータを流すと

% echo '{"a":{"dateTime":"2017-09-01 12:34:56"},"field1":"a","field2":"b"}' >> data/in

以下のようにtimeが正しく設定されません(current timeになる)

2017-10-03 11:20:17.947279042 +0000 record.fluentd.log.in: {"a":{"dateTime":"2017-09-01 12:34:56"},"field1":"a","field2":"b"}

ネストに対応するにはrecord_transformerを使います。これを使うとレコード自体も書き換え可能ですが、他にもtimeを書き換えできます。

以下のようにrecord_transformerのfilter設定を行い、

% cat conf/fluent.conf
<source>
  @type       tail
  @label      @mainstream
  tag         record.*
  format      json
  keep_time_key true
  path        /fluentd/log/in
  pos_file    /fluentd/log/in.pos
</source>

<label @mainstream>
  <filter **>
    @type record_transformer
    enable_ruby
    renew_time_key t
    remove_keys t # removed keys after apply <record> and renew_time_key
    <record>
      t ${record["a"]["dateTime"]}
    </record>
  </filter>
  <match **>
    @type stdout
  </match>
</label>

以下のようなデータを入れると

% echo '{"a":{"dateTime":1500000000},"field1":"a","field2":"b"}' >> data/in

ネストしていますが、以下のように正しく読み込みできます。(2017/7/14 02:40:00になっている)

2017-07-14 02:40:00.000000000 +0000 record.fluentd.log.in: {"a":{"dateTime":1500000000},"field1":"a","field2":"b"}

record_transformerの仕様はこちらです。

https://docs.fluentd.org/v0.14/articles/filter_record_transformer

1つ注意が必要で、renew_time_keyで読み取る値はunix timeでなければなりません。日付の文字列が来る場合は、enable_rubyによってrubyのコードを書けるので、それで変換を行う必要がありそうです。

fluentdのネストレコードの対応状況と今後について

現時点ではfluentd v0.14はstableではありませんし、fluentd v0.12はネストをサポートしていません。

ですが、サポートは考えられていて、fluentd v0.14.20からrecord_accessorという仕組みが入りました。

詳しくはこちらを御覧ください。

Fluentd v0.14.20 has been released

簡単に言うと $.event.level というように$と.を使ってネストしたフィールドにアクセスできる、というものです。

現在対応中のようで、v0.14.21で一部、このrecord_accessorの対応が入ったようです。

引用 https://github.com/fluent/fluentd/blob/master/CHANGELOG.md

試してみたところ、今回のtime_keyにはまだ使えないようでしたが、今後これらのパラメタにも対応が入るかもしれません。

今後はネストしたレコードを扱うときは、このrecord_accessorの対応状況を調べると良いかと思います。

まとめ

  • timeの読み込みは各種Parserのドキュメントを参照
  • 適宜設定を行ってtime_keyの変更やフォーマット調整などを行う
  • time_keyがネストしている場合はrecord_transformerを使う
  • 今後はネストの対応はrecord_accessorをチェック

以上です。