こんにちは。takada-at です。


最近 KLabデータ分析グループではAmazon Web Services(AWS)が提供するデータ分析に特化したデータベースであるAmazon Redshiftを導入し、活用しはじめています。
そこで、いくつか運用で学んだRedshiftのノウハウを書いていこうと思います。


今回はデータインポートの際のノウハウのひとつです。
MySQLには、データインポート時、LOAD DATA INFILE コマンドにREPLACEというキーワードをつけることで、データが存在しなければINSERT、データが存在すればUPDATEという動作を実現することができます。


RedshiftにはS3またはDynamoDBから高速にデータをインポートするCOPYコマンドが用意されていますが、このコマンドにはMySQLのREPLACE相当の機能はありません。


ただし、多少めんどうですが、Redshiftの公式ドキュメントでも紹介されている以下の方法で、REPLACE相当の機能を実現できます。


まず更新対象のテーブルに対しステージング用のテーブルを用意します。
例.
本番テーブル: salesdata
ステージングテーブル: salesdata_staging


以下の手順でコマンドを実行することで、高速にINSERT + UPDATEを実現できます。

1. salesdata_stagingのTRUNCATE
まずステージングテーブルを空にしておきます。 
 
2. salesdata_stagingへのCOPY
先にステージングテーブルにデータをインポートします。 

3. salesdataのUPDATE
UPDATE  salesdata
  SET col1=s.col1, col2=s.col2 
FROM
  salesdata_staging
WHERE
  salesdata.unique_id=salesdata_staging.unique_id  

ステージングテーブルのデータを使って本テーブルを更新します。  
 
4. salesdata_staging から salesdataへのINSERT
INSERT INTO salesdata
SELECT * FROM salesdata_staging
LEFT JOIN salesdata
  ON salesdata.unique_id=salesdata_staging.unique_id
WHERE
   salesdata.id IS NULL
 
salesdataに存在しないデータのみをINSERTします。


以上です。KLabデータ分析グループではpythonライブラリ内にこれを実現する関数を用意し、簡単に呼び出せるようにしています。
1から2ノードのRedshiftで使用して、数万件から数十万件程度のデータなら数十秒程度で完了します。


以下にpythonのサンプルスクリプトを掲載しておきます。


def upsert(db, table, stagingtable, joincolnames, updatecolnames, data, enable_update=True):
    u"""
    db -- dbコネクション
    table -- 更新対象
    stagingtable -- ステージングテーブル
    joincolnames -- ステージングテーブルとターゲットテーブルのJOINに使用するカラム。ユニークになる条件を指定すること
    updatecolnames -- アップデート対象のカラム
    data -- 取り込むデータ
    enable_update -- Falseの場合はUPDATEしない
    """
    joinconditions = ["{table}.{colname}={stagingtable}.{colname}".format(table=table, stagingtable=stagingtable, colname=colname) for colname in joincolnames]
    joincondition = " AND ".join(joinconditions)

    updateexprs = ["{colname}={stagingtable}.{colname}".format(table=table, stagingtable=stagingtable, colname=colname) for colname in updatecolnames]
    updateexpr  = ", ".join(updateexprs)

    updatestatement = "UPDATE {table} SET {updateexpr} FROM {stagingtable} WHERE {joincondition}"
    updatestatement = updatestatement.format(table=table, updateexpr=updateexpr, stagingtable=stagingtable, joincondition=joincondition)
    insertstatement = '''INSERT INTO {table} SELECT {stagingtable}.* FROM {stagingtable}
                         LEFT JOIN {table}
                         ON {joincondition}
                         WHERE {table}.{somecollumn} IS NULL
                      '''
    insertstatement = insertstatement.format(table=table, stagingtable=stagingtable, joincondition=joincondition, somecollumn=joincolnames[0])
    # ↑ データが存在するかどうかの判定をjoincolnames[0]がNULLかどうかで行なっているので、ちょっと注意。
    # nullableなカラムがjoincolnamesに入ってくることはおそらく無いだろうと仮定している

    # ステージのTRUNCATE
    db.execute("TRUNCATE {table}".format(table=stagingtable))
    # ステージにCOPY
    copytotable(db, stagingtable, data)
    if enable_update:
        # 重複するデータのupdate
        db.execute(updatestatement)

    # まだ存在しないデータのinsert
    db.execute(insertstatement)