fluentdのtimeの仕様とネストしたフィールドの扱い方
今回はfluentd(v0.14)についてです。
仕事柄、データ・ログを扱うことが多いので、このツールは大変重宝しています。そんな便利なfluentdですが、timeに関する仕様を毎回忘れて困っているので、これを機にまとめておきます。
fluentdのtimeとは
fluentdは読み込んだデータを time, tag, record の3つの要素に分けて保持します。具体的なイメージは以下のページのExampleを見るとよいかと思います。
https://docs.fluentd.org/v0.14/articles/parser_json
fluentdのtimeでよくある問題
このrecordの部分をOutput pluginによってどこかに出力したりするので、よく出力してみるとtimeフィールドが消えていた、などの問題が発生します。
このtimeの扱い方を書いていきます。
fluentdのtimeの仕様とconf
まず、timeの値がどのように設定されるかについて考える必要があります。これは利用するParserによって変わります。何らかのファイルをformat noneで読み込む場合は、読み込んだシステム時刻をtimeとして設定します。
これは以下のParser noneのドキュメントにあります。(current time)
https://docs.fluentd.org/v0.14/articles/parser_none
format jsonとした場合はjsonデータ内のtimeフィールドの値を利用します。これはInput pluginではなく、json Parser pluginの仕様です。詳細は以下を参照すると良いです。
https://docs.fluentd.org/v0.14/articles/parser_json
time_keyというパラメタのデフォルトがtimeなので、json内のtimeフィールドを読むようになっています。必要に応じて変えてください。
time_formatというパラメタはデフォルトnilで、こうするとtimeフィールドは秒を示すintegerであるという解釈になります。これも必要に応じて変えてください。大抵は
%Y-%m-%d %H:%M:%S
というような指定になると思います。何が指定できるのかは公式ドキュメントにもある通り、以下のRubyのTimeクラスを参照すると良いです。
http://ruby-doc.org/stdlib-2.4.1/libdoc/time/rdoc/Time.html#method-c-strptime
keep_time_keyというパラメタがtimeフィールドを残すのか、取り除くのか、です。デフォルトはfalseなので、取り除かれます。これも必要に応じて変えてください。
dockerでfluentdを動かして実験してみる
https://github.com/fluent/fluentd-docker-image
fluentdをgemで自分のマシンにインストールしても良いですが、環境を汚したくない人はdockerを使うと良いでしょう。
# 設定ファイルなど色々準備
% mkdir fluentd-docker
% cd fluentd-docker
% mkdir conf
% mkdir data
# 好きなエディタで以下のようにconfを作成
% cat conf/fluent.conf
<source>
@type tail
@label @mainstream
tag record.*
format json
path /fluentd/log/in
pos_file /fluentd/log/in.pos
</source>
<label @mainstream>
<match **>
@type stdout
</match>
</label>
# docker containerを起動
% docker run -v $(pwd)/data:/fluentd/log -v $(pwd)/conf:/fluentd/etc fluent/fluentd:v0.14.21
2017-10-01 00:00:00 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/fluent.conf"
2017-10-01 00:00:00 +0000 [info]: using configuration file: <ROOT>
<source>
@type tail
@label @mainstream
tag "record.*"
format json
path "/fluentd/log/in"
pos_file "/fluentd/log/in.pos"
<parse>
@type json
</parse>
</source>
<label @mainstream>
<match **>
@type stdout
</match>
</label>
</ROOT>
2017-10-01 00:00:00 +0000 [info]: starting fluentd-0.14.21 pid=7
2017-10-01 00:00:00 +0000 [info]: spawn command to main: cmdline=["/usr/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/bin/fluentd", "-c", "/fluentd/etc/fluent.conf", "-p", "/fluentd/plugins", "--under-supervisor"]
2017-10-01 00:00:00 +0000 [info]: gem 'fluentd' version '0.14.21'
2017-10-01 00:00:00 +0000 [info]: adding match in @mainstream pattern="**" type="stdout"
2017-10-01 00:00:00 +0000 [info]: adding source type="tail"
2017-10-01 00:00:00 +0000 [info]: #0 starting fluentd worker pid=17 ppid=7 worker=0
2017-10-01 00:00:00 +0000 [info]: #0 following tail of /fluentd/log/in
2017-10-01 00:00:00 +0000 [info]: #0 fluentd worker is now running worker=0
# 別のterminalから
% echo '{"time":1500000000,"field1":"a","field2":"b"}' >> data/in
# docker runしたterminalに出る
2017-07-14 02:40:00.000000000 +0000 record.fluentd.log.in: {"field1":"a","field2":"b"}
echoしたデータにtimeフィールドがあり、unix timeの秒数が入っているので、それに従って、最終的な出力のtimeも2017/07/14 02:40になっています。
他のoptionを試すために、以下のようにsourceを変えて、再度fluentdを実行します。
<source>
@type tail
@label @mainstream
tag record.*
format json
time_key dateTime
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
path /fluentd/log/in
pos_file /fluentd/log/in.pos
</source>
この状態で、以下のように少し変えたechoを行うと、以下のような出力になります。
% echo '{"dateTime":"2017-09-01 12:34:56","field1":"a","field2":"b"}' >> data/in
# docker runのterminal
2017-09-01 12:34:56.000000000 +0000 record.fluentd.log.in: {"dateTime":"2017-09-01 12:34:56","field1":"a","field2":"b"}
optionの意図通り、dateTimeをtimeフィールドとして扱い、Y-m-d…のフォーマットで読み取り、recordにtime(dateTime)を含める、という設定になっています。
余談ですがOutput pluginの種類によってはoutput_include_timeというtimeのフィールドを出力に含めるかどうかを切り替えるパラメタもあります。具体的には tagomoris/fluent-plugin-file-alternative や fluent/fluent-plugin-kafka にあります。状況に応じて使い分けてください。
time_keyに指定したいフィールドがネストしていたらどうするか
ここからが本題です。上記のoptionは状況に応じて設定すれば良いだけですが、もしtimeフィールドがネストしていた場合はどうなるのでしょうか。
例えば
time_key a.dateTime
という設定で以下のデータを流すと
% echo '{"a":{"dateTime":"2017-09-01 12:34:56"},"field1":"a","field2":"b"}' >> data/in
以下のようにtimeが正しく設定されません(current timeになる)
2017-10-03 11:20:17.947279042 +0000 record.fluentd.log.in: {"a":{"dateTime":"2017-09-01 12:34:56"},"field1":"a","field2":"b"}
ネストに対応するにはrecord_transformerを使います。これを使うとレコード自体も書き換え可能ですが、他にもtimeを書き換えできます。
以下のようにrecord_transformerのfilter設定を行い、
% cat conf/fluent.conf
<source>
@type tail
@label @mainstream
tag record.*
format json
keep_time_key true
path /fluentd/log/in
pos_file /fluentd/log/in.pos
</source>
<label @mainstream>
<filter **>
@type record_transformer
enable_ruby
renew_time_key t
remove_keys t # removed keys after apply <record> and renew_time_key
<record>
t ${record["a"]["dateTime"]}
</record>
</filter>
<match **>
@type stdout
</match>
</label>
以下のようなデータを入れると
% echo '{"a":{"dateTime":1500000000},"field1":"a","field2":"b"}' >> data/in
ネストしていますが、以下のように正しく読み込みできます。(2017/7/14 02:40:00になっている)
2017-07-14 02:40:00.000000000 +0000 record.fluentd.log.in: {"a":{"dateTime":1500000000},"field1":"a","field2":"b"}
record_transformerの仕様はこちらです。
https://docs.fluentd.org/v0.14/articles/filter_record_transformer
1つ注意が必要で、renew_time_keyで読み取る値はunix timeでなければなりません。日付の文字列が来る場合は、enable_rubyによってrubyのコードを書けるので、それで変換を行う必要がありそうです。
fluentdのネストレコードの対応状況と今後について
現時点ではfluentd v0.14はstableではありませんし、fluentd v0.12はネストをサポートしていません。
ですが、サポートは考えられていて、fluentd v0.14.20からrecord_accessorという仕組みが入りました。
詳しくはこちらを御覧ください。
Fluentd v0.14.20 has been released
簡単に言うと $.event.level というように$と.を使ってネストしたフィールドにアクセスできる、というものです。
現在対応中のようで、v0.14.21で一部、このrecord_accessorの対応が入ったようです。
- filter_parser: Support record_accessor in key_name https://github.com/fluent/fluentd/pull/1654
- buffer: Support record_accessor in chunk keys https://github.com/fluent/fluentd/pull/1662
引用 https://github.com/fluent/fluentd/blob/master/CHANGELOG.md
試してみたところ、今回のtime_keyにはまだ使えないようでしたが、今後これらのパラメタにも対応が入るかもしれません。
今後はネストしたレコードを扱うときは、このrecord_accessorの対応状況を調べると良いかと思います。
まとめ
- timeの読み込みは各種Parserのドキュメントを参照
- 適宜設定を行ってtime_keyの変更やフォーマット調整などを行う
- time_keyがネストしている場合はrecord_transformerを使う
- 今後はネストの対応はrecord_accessorをチェック
以上です。