ログを集める時に気をつけたいポイント

どすこい!去年放置したブログに久々にログインする季節になりました!

今日はCyberAgent Developers Advent Calendar 2016 - Adventarの10日目です。

 

昨日は@shotaTsugeさんのCloud Deployment Managerを少し触ってみて感じた事でした。

 

最近この記事を書きました。

developers.cyberagent.co.jp

 

CyberAgentに入社してから主にログ転送基盤の構築・運用をやってきました私ですが、今回はその経験を踏まえてログ収集系のミドルウェア使う時やストリーミング処理を行う際の注意点についていくつか書きたいと思います。

はじめに

ログ収集基盤

CyberAgentでは以前からApache Flumeを利用してログの収集システムを構築・運用しております。

Flumeを活用したAmebaにおける大規模ログ収集システム

こちらのスライドは2013年のHadoop Conference Japan 2013の資料ですが、当時はソシャゲはブラウザゲームが多くほとんどオンプレのサーバを利用していたのでログは基本的にDC内のサーバから送られるものでした。

 

しかし、その間に弊社のログ収集周りの状況は大きく変わりました。

まずソシャゲはブラウザからネイティブアプリが一般的になりました。それに従って、アプリのSDKからもログが送られるようになりました。

また、クラウドサービスの拡充により、弊社でもAWS/GCPを利用するサービスが増えました。

それと弊社のサービスも増えましたのもありますね。

その結果が次のスライドのP12に現れています。

サイバーエージェントにおけるデータの品質管理について #cwt2016

これは2016年のCloudera World Tokyoにおいて弊社のデータの品質管理についての発表資料ですが、数々のミドルウェアからログが送られる黒いサーバの部分を管理するのが私の仕事です。

動的に転送先を選択できるようにしたり、データの変換やフィルタリングなどストリーミング処理をかませていたりします。

Push型 or Pull型

まずログ収集のミドルウェアはデータの取得方法によってPush型とPull型に分けることができます。

ミドルウェアが設定された送り先にログを転送するシステムはPush型に分類されます。有名どころでいうとApache Flume, Fluentd, Scribe, LogstashなどがPush型に分類されます。

一方、ログを集権的に管理するミドルウェアに対し必要なデータをクライアント側が明示的に指定して取得するのがPull型になります。Apache Kafkaや、ミドルウェアではないですがGoogle Cloud Pub/SubやKinesis StreamなどいわゆるPubSubモデルものはPull型に分類されます。

用語の定義

これから注意点を上げていきますが、ミドルウェアによって呼称が違う部分があります。

なので以下の説明では用語を統一して解説していきますので、みなさまの親しんでいる呼び名に脳内置換していただけると幸いです。

レコード => ミドルウェアで扱うデータの単位。Flumeのイベントに相当。

Source => Push型ミドルウェアにおいてレコードを受け取る口のこと。FluentdでいうInputに相当。

Channel => Pushミドルウェアにおいてレコードの送信が完了するまで保存しておく領域。Fluentdでいうbufferに相当。 

Sink => Pushミドルウェアにおけるレコードの転送先。Fluentdでいうoutputに相当。

上流 => レコードを送る側。

下流 => レコードを受け取る側。

またここから出てくるPull型はPubSubを想定しているのでクライアントを送信するクライアントをPublisher、レコードを取得するクライアントをSubscriberと呼び、Subscriberが参照するレコードのストリームをトピックと呼ぶこととします。

気をつけたいポイント

1. ログの詰まり

Push型の場合

今となってはいろんなところで使われているので今更こんなことを言われてもという感じではありますが、Push型のミドルウェアは下流への送達確認をしてからChannelからレコードを削除します。プロセスダウンなどで下流にレコードは送れない場合は、送信可能な状態になるまでChannelに貯め続けます。この仕組みによってPush型はデータロストを防いでいるのですが、この仕組みが首を絞めることもあります。

それはなんらかの理由で下流にデータが流せなくなり、Channelに溜まり続けてしまう。この状態をログが詰まってる状態と我々は読んでいます。

なんらかの状態で考えられるのは以下のとおりです。

  • 下流のプロセスが死んでる。
  • バグなどが原因でSinkで転送リトライし続けてデータが流れない。
  • 上流から流れるレコード量が下流への転送量を超えている。
  • 下流Aと下流Bにレコードを流す場合、下流Aに流せない場合。

Channelの最大容量を超えると、その上流で詰まり、最終的にはデータロストを引き起こします。。。(((( ;゚Д゚)))ガクガクブルブル

基本的にはChannelの容量を常に監視しておき、下流のスループットに余裕をもたせておけば問題ないので難しいことはないんですが、サービスが増えデータの送り先が増えるとこの管理も結構かなり煩わしくなります。

Pull型の場合

Pull型ではデータを集積しているミドルウェアとデータを取得するクライアントが疎結合なので、それぞれのクライアントが独立してデータを参照します。そのためPush型に見られた「下流Aにデータを送れないため下流Bも巻添えを食らう」という状態が起こらないからです。

Pull型にするとミドルウェアを管理する側としてはChannelの容量を監視しなくていいのである程度楽になります。(まあディスク容量は気にしなくならなければいけなくなるのですが、、、)

逆に言えば、Pull型ではSubscriber側が転送の詰まりを気にする必要が出てきます。そのため「Subscriberのスループットの上限 > Publisherのスループットが常に満たされていなければなりません。なので、Subscriberの参照しているレコードと追加されるレコードのラグが大きくならないかモニタリングしておく必要が有ります。

たとえばKafkaではJMXで最新のレコードのオフセットとSubscribeしているオフセットの最大差MaxLagという値が取れます。*1

これで追加されたレコードと参照されたレコードの間にキューイングされているレコード数を確認することができます。この値が大きくなったらSubscriberを増設するなどしてスループットをあげましょう。

またこのようなメトリクスが用意されていない場合は、レコードに含まれる時刻と現在の時刻の差分を出すようにするのもありだと思います。つまり送信されたレコードの時刻と現在の時刻の差分が大きければ、転送が詰まり気味と見なせるわけです。

 

f:id:sitotkfm:20161210191943p:plain

こんな感じでsubscriberのラグをグラフ化して監視しています。面白いグラフじゃなくてすみません。

ただし、これはレコードの時刻がレコードをPublishした時刻として扱えない場合は効果を発揮しません。

この時刻について次のそのまま話をつなげます。 

2. ログの時刻の扱い

当たり前ですが、ログの時刻はアクションを起こした時刻が入ります。ですが、実際にログがミドルウェアに届く時間はこのアクションを起こした時刻と一致することはあり得ません。転送遅延であったり溜まったイベントをワンショットで送られる場合もあります。

例えば前述のログの時刻差を監視する場合、上流で過去分のログを流したら転送が遅延すると誤認される可能性があります。これはちょっと間抜けな例ですが、常にログの時刻≒現在時刻と考えているとうまくいかない場合というのはままあります。

そして分散環境ではログの順序は保証されないことが多いです。アクションがA→Bに怒っても、AとBのアクションは異なるログサーバに送られる可能性がありますし、ミドルウェアで順序を保証していないケースは多いです。*2

ログ収集基盤はログの時系列が保たれていないことを常に考慮しておかなければなりません。

この話はどちらかというとログを送った先の解析やデータストアでどうにかする話のことが多いです。

3. ログ再送のすすめ

あまり考えたくないですが、ログの転送が失敗してデータが一部ロストしたとします。

データストリームの中からロストしたレコードを特定し補完するというのはあまり現実的ではなく、実際の運用ではロストした期間を特定して解析側のデータを削除してから期間内のログを全て再送します。このとき解析側のデータを削除できない場合、ロストしていないデータは2度送られることになります。その後、再解析したときに重複した分、数値が変わってしまってはいけないわけです。

ログは重複して送信されてもいいようにべき等性を担保しておく必要が有ります。

例えばログを一意に識別する値を付与し、DISTINCTを取ればべき等性を担保できます。

DISTINCTはデータストア側で気にすることが多いですが、実はこのべき等性っていうのはログ転送でも重要です。

メッセージの送達保証というのはExactlly-once(正確に一度到達)、At-Least-Once(一度以上到達)、At-Most-Once(高々一度到達)の三つに分類できます。もちろん一番望ましいのはExactlly-onceなんですが、これを保証するのは結構コストが高いです。

なので界隈ではAt-Least-Onceとべき等性を組み合わせることでメッセージの一意性を保証するEffective-Onceが注目されています。

まとめ

  • ログが詰まるとしんどい。しんどい。。。
  • Pull型(というかKafka)が持て囃されてるけど、利用する側がちゃんとしないとダメ。
  • べき等性が肝心
  • 時刻については後で書く。

長々書きましたが、これらのことは上手くやればそんなに苦にもならない話です。

どちらかといえばデータパイプラインをうまく管理方法すればいいかの方が重要です。とりあえず、Kafkaのトピックのうまい管理方法のアイディアを持ってる人は教えてください。。。

明日は@baba5246さんです。

ではごっつあんでした!

*1:http://kafka.apache.org/documentation.html#monitoring

*2:例えばApache Kafkaは同じトピックでもパーティションが異なれば順序の保証はないです。

LMAX Disruptorを調べたり触ったり

これはCyberAgent Advent Calender 16日目の記事です

CyberAgent エンジニア Advent Calendar 2015 - Adventar 

昨日は@choheyさんの「ボードゲーム部(仮)の紹介をするよ。」でした。

ボードゲーム部(仮)の紹介をするよ。 | 今日も元気にがんばるぞぞい

 

さてみなさん、

どすこい!

もう説明不要ですね!どすこいエンジニアの@sitotkfmです
何度も会社のブログで書くのもなんなので今回はブログを作ってみました。
おそらく今日が最初で最後の更新です。
 
さていきなりですが私は秋葉原ラボという組織に在籍しているのですが、秋葉原ラボの有志で分散システム勉強会という輪読会を毎週行っています。
この会は輪読会メンバーの多数決で票が集まった論文を担当の人が資料を作って発表するという極めて一般的な輪読会です。
例えば僕が担当の時にはFaceBookのGorillaという時系列データベース(と書いているけど読んだ感じはキャッシュのようなデータストア)について発表しました。
 
そして今週の月曜日にこの輪読会が行われたのですが、今回の発表はMicrosoftのFaRMというRDMAを使って論文の発表でした。
内容としてはデータストアと行ってもかなり低レイヤな話だったのでWeb企業ではあまり意識しないような領域の話で大変興味深かったのですが、その中でも私の興味を特に引いたのがCircular bufferというデータ構造でした。日本語のwikipediaだとリングバッファという記事になってます。
 リングバッファ - Wikipedia
 
さてCircular buffer、このデータ構造の話題になると必ず出てくるのが「LMAX Disruptor」というLibraryがあるそうです。
あるそうですと結んでいますが、恥ずかしながら私このLibraryについてその時まで全く知りませんでした。
なので今回は勉強がてらこのLMAX Distruptorでちょっと遊んでみたいと思います。
 

LMAX Disruptorとはなんぞや

複数のスレッド間でのデータのやり取りを行うためのです。データを渡す方(Producer, Publisher)からデータを使う方(Comsumer, EventProcessor)に渡す橋渡し的な存在です。
機能はキューなんですが、単純なスレッドセーフなキューではなくComsumerに依存関係をもたせたりすることもできるそうです。
 
Log4j 2にも採用されたLMAX Disruptorはなぜ狂ったように速いのか? | JUMPERZ.NET Blogという上の記事の通りlog4j2にも使われているそうですが、かの有名な活火山NoSQLであるHBaseでもWALに書き込むときに使用しています。
// At the core is a ring buffer.  Our ring buffer is the LMAX Disruptor.  It tries to
// minimize synchronizations and volatile writes when multiple contending threads as is the case
// here appending and syncing on a single WAL.  
 

ちょっとした内部構造の解説

全体を説明すると長くなりそうなのでここでは速さを実現するための工夫を少しだけ紹介します。

Memory Allocation

DisruptorはCircular Buffer(Disruptorの流儀ではRing bufferですが)にエントリを保持します。このデータ構造はサイズがあらかじめ決まっいるので先にメモリを確保してしまいます。
エントリを使い回すことでGCによる性能への影響を回避しています。

False sharing

コンピュータのストレージ構造というのはCPUを頂点として考えた時にCPUに近いほどアクセス速度が高くてサイズが小さく、CPUから離れていくに連れ速度は遅くなりますがその分サイズが大きくなります。
ですので一般的にデータアクセス速度を向上させるためにはいかに頻繁に使われるデータをCPUの近くに配置するかが鍵になるわけです。
まあCSの基本中の基本なんで今更という話なんですが、これが並列処理だと問題になる場合があります。
 
メモリ上のキャッシュというのはCache lineという単位でデータを保存しています。このCache lineは一般的に64byteです。例えば変数がlongの場合longは8byteなので一つのCache lineに8つのlongの変数が同居する可能性があります。
複数のスレッドでCache line上の異なる変数の更新した場合、競合してキャッシュが全く乗らなくなることが起こりえます。これがFalse Sharingという状態です。
 
LMAX DisruptorではFlase sharingを防ぐためにlong値のCache lineにpaddingとしてlongを呼ぶことで回避しています。
 

使ってみよう 

じゃあ早速使ってみましょう。
versionは最新の3.3.0を使っています。
  gistb5bd4e02e2f85d6eb2d5 gist34ce826c0b0ecaa3ff60 
 
へーって感じですね。
 
そりゃほぼGetting Started · LMAX-Exchange/disruptor Wiki · GitHubのコピペなのでそうですよね。
 
ではここでhandleEventsWithの部分をちょっと変更してみましょう。
 
このときdisruptor.handleEventsWithを二回呼び出すことでEventを二股に流しています。
 
Publish ---------> Handler1-1 ----- 1s -----> Handler1-2 
                   \                     ---> Handler2-2 - 0.5s -> Handler2-2
 
その結果がこちらになります。
 
 
この様にHandlerに依存関係をもたせたりHandlerのチェインを作ることができます。
(といってもEventを変更して次のHandlerに渡せるわけではないです。)
これ以外にもhandleEventsWithに複数のEventHandlerを登録することで並列に実行することも可能です。 
 
あとConsumerのWait Strategyを変えることで高速化を試みることもできます。
  • BlockingWaitStrategy : デフォルト。LockとConditionを使って同期をとる
  • SleepingWaitStrategy : いわゆるビジーウェイト
  • YieldingWaitStrategy : Thread.yieldを呼び出しまくる
  • BusySpinWaitStrategy : ほとんど何もしない

基本的に下に行くほど早くなるそうですが楽しそう

まとめ

あんまり掘り下げられなかったですが、スレッド間の通信は解析系のアプリを作るときには必要になることも少なくないと思うので色々検討しようと思います。枯れてそうな雰囲気なのも好印象です。

本当は速度検証をした方がよかったかもしれませんがそれは本家でやっているのを参考にしてください。

Performance Results · LMAX-Exchange/disruptor Wiki · GitHub

 

おまけ

ちなみになんでdisruptorという名前にしたかというと

Why is it called the Disruptor?

Well there are two reasons. Primarily we wanted to disrupt the common assumptions in this space because we think that they are wrong. But, to be honest, we also couldn’t resist the temptation; There was some talk about Phasers in Java at the time when we named it and, for those of you too young to care, Phasers were the Federation weapon and Disruptors the Klingon equivalent in Star Trek.
どうやらスタートレックにPhaserとDisruptorという兵器が出てくるらしく、JavaのPhaserクラスとかけてこの名前をつけたみたいです。
 
へーって感じですね。
 
参考ブログ

golang - 750,000MPSを達成したsurgemqの秘密に迫る - Qiita

Log4j 2にも採用されたLMAX Disruptorはなぜ狂ったように速いのか? | JUMPERZ.NET Blog

記事中で取り上げた記事 

 

 明日は@principia_caなんで社内でもすごい人が記事を書いてくれるはずです。

 

ではごっつあんでした!