Geographica restituta per globi trientes

まとめ

Elastic MapReduceは、amazonのAWS上でHadoopを使えるサービスです。一時的にインスタンスをたくさん立ち上げることで、重めのバッチ処理を分散処理できます。料金はEC2インスタンス分 +αで使用できます。
楽! インスタンスいっぱいたちあげます、計算します、結果まとめますという一連の処理を気軽に書けて、コマンド一発で実行できます。特に集計処理、バッチ処理には便利です。
ただデバッグは手間がかかります。


MapReduceについて

MapReduceは、大量のデータを複数のマシンで分散して扱うための技術です(デザインパターン的な)。
基本的な考え方は、処理を以下の2種類に分けるだけです。
  •  map: データ1行に対する処理
  •  reduce: 集計処理



mrjobの紹介

Hadoopは基本的にJavaで書くんですが、Hadoop Streamingと言って他の言語の標準入出力を利用することもできます。今回はこちらを利用してます。
pythonのmrjobというライブラリをつかえば、Hadoopのめんどうな部分をラップして使うことができます。

動かすための設定ファイルは以下のようになります。個々のパラメーターは実行時オプションでも上書きできますが、重要なのはAWSにアクセスするための aws_access_key_id と aws_secret_access_key です(アクセスキー自体はAWSのWebコンソールで発行できます)。
↓以下のファイルを ~/.mrjob.confという名前で保存
 
runners:
  emr:
    ec2_instance_type: m1.large
    num_ec2_instances: 4
    aws_access_key_id: ひみつ
    aws_secret_access_key: ひみつ


実行するスクリプト

サンプルとして、アクセスログから時間帯ごとのPVを抽出するスクリプトを掲載します。
#!/usr/bin/env python
# coding:utf-8

import traceback
from cStringIO import StringIO
import csv
import datetime
import mrjob
from mrjob.job import MRJob
import sys

class CountHourly(MRJob):
    def configure_options(self):
        super(CountHourly, self).configure_options()
    def __init__(self, args=None):
        super(CountHourly, self).__init__(args)
    def mapper(self, key, value):
        # mapper: ログの各行を処理する部分。ここではアクセスログをパースし、時間を取得している。
        try:
            reader = csv.reader(StringIO(value), delimiter=' ', quotechar='"')
            row = reader.next()
            if not row:
                return
            actime = datetime.datetime.strptime(row[1]+' '+row[2], '%Y-%m-%d %H:%M:%S')
            hour   = actime.hour
            key = [hour]
            yield hour, 1
        except Exception:
            sys.stderr.write(traceback.format_exc() + "\n")

    def reducer(self, key, data):
        # reducer: mapperのアウトプットを集計する。ここでは時間帯別に データの長さを取得するだけ。
        yield key, sum(data)

if __name__ == '__main__':
    CountHourly.run()



以下のように実行します。はじめはローカルでデバッグするのがおすすめです。


# デバッグ用にローカルで
python counthourly.py path/to/log > result
# elastic mapreduce利用
python counthourly.py -r emr s3://mybucket/path/to/log > result


ディレクトリを引数に指定すると、その下の全ファイルを入力として実行します。またbz2形式は勝手に解凍して処理してくれます。
内部でインスタンスを起動し、スクリプトをコピーし、実行して結果をS3から取得しますが、そのあたりはmrjobがラップしてくれます。