sifue's blog

プログラマな二児の父の日常

Google app engine/Java のdatastoreのデータを複数のアプリケーションにまたがりダウンロードし、sqlite3にインポートする方法

今回、自分が運営するTwitterボット作成サービス キャラボットのユーザーデータ調査にあたって、GAE/Jのdatastoreのデータを複数のアプリケーションをまたがってダウンロードする必要があったので、そのスクリプトPythonで作りました。以下に手順を載せていきます。



- 1. GAE/Jのアプリケーションにremote_apiサーブレット設定する

GAE/Jのプロジェクトのwar/WEB-INF/web.xml内に、remote_apiサーブレットを加えます。

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE web-app
    PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
    "http://java.sun.com/dtd/web-app_2_3.dtd">

<web-app>
 <!-- 
 
 
         サービスの機能として使っている設定
 
 
  -->

 <!-- To use remote api -->
  <servlet>
    <servlet-name>remoteapi</servlet-name>
    <servlet-class>com.google.apphosting.utils.remoteapi.RemoteApiServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>remoteapi</servlet-name>
    <url-pattern>/remote_api</url-pattern>
  </servlet-mapping>
  <security-constraint>
    <web-resource-collection>
      <web-resource-name>remoteapi</web-resource-name>
      <url-pattern>/remote_api</url-pattern>
    </web-resource-collection>
    <auth-constraint>
      <role-name>admin</role-name>
    </auth-constraint>
  </security-constraint>
   
</web-app>

あとは、このweb.xmlを含めたwarフォルダ以下をGAE/JのアプリケーションにGoogleEclipseプラグインやappcfg.shを使ってアップロードします。



- 2. PythonのGAEの環境を作成
残念ながらJavaの開発環境のツールでは、datastoreのアップロード、ダウンロードするためのbulkloaderがついていないので、以下を見ながら環境を作成します。ここでPython2.5などのインストールも済ませてください。

http://code.google.com/intl/ja/appengine/docs/python/gettingstarted/devenvironment.html

コマンドラインで、pythonと打つと

$ python
Python 2.5.5 (r255:77872, Mar 9 2010, 21:47:39)
[GCC 4.2.1 (Apple Inc. build 5646) (dot 1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

と、バージョンが出たり、appcfg.pyと打つと

$ appcfg.py
Usage: appcfg.py [options]

Action must be one of:
create_bulkloader_config: Create a bulkloader.yaml from a running application.
cron_info: Display information about cron jobs.
...

と出力されれば成功です。



- 3. bulkloaderのための設定ファイル出力し、設定する
そもそものbulkloaderの使い方は、

http://code.google.com/intl/en/appengine/docs/python/tools/uploadingdata.html

に書いてあります(英語のみ)。ただ、今回はこのbulkloaderのダウンロードの機能しか使いません。読み進めていくとどうやらbulkloader.yamlという設定ファイルを作る必要があるようです。というわけで早速、

appcfg.py create_bulkloader_config --filename=bulkloader_cvs.yaml --url=http://.appspot.com/remote_api

を実行して、bulkloader_cvs.yamlを作成します。ただ、この設定ファイルはただのバイナリがダウンロードできるだけの雛形で、CSVファイルだったりXMLファイルを生成するには、connectorの設定をしていかなくてはいけません。設定例は以上のURLにも書いてありますが、一つのプロパティから複数の情報を出したい時などは、違う書き方をしなくてはいけません。


詳しい例は、

◯Appengine Bulkloader IO 2010
http://bulkloadersample.appspot.com/showfile/bulkloader-presentation.pdf

の記述が役に立ちました。最終的には、bulkloader.pyのソースを読んだりしつつ、設定してみるのも一つです。自分は、JDOを使っているため、エンティティのプロパティではなく、キーの親子関係でデータの親子関係が表現されていたため、以下の設定のように、__key__のプロパティから、全ての親の階層のIDを取り出すということをしました。Slim3を使っているとこのへんの設定は楽なはずです。


また、今回は複数のアプリケーションから情報をダウンロードするため、全てのエンティティは、アプリケーションID、エンティティの名前、自分と自分より親のすべてのIDの3つで一意になります。実際のキャラボットの設定例は以下のようになりました。

# Autogenerated bulkloader.yaml file.
# You must edit this file before using it. TODO: Remove this line when done.
# At a minimum address the items marked with TODO:
#  * Fill in connector and connector_options
#  * Review the property_map.
#    - Ensure the 'external_name' matches the name of your CSV column,
#      XML tag, etc.
#    - Check that __key__ property is what you want. Its value will become
#      the key name on import, and on export the value will be the Key
#      object.  If you would like automatic key generation on import and
#      omitting the key on export, you can remove the entire __key__
#      property from the property map.

# If you have module(s) with your model classes, add them here. Also
# change the kind properties to model_class.
python_preamble:
- import: base64
- import: re
- import: google.appengine.ext.bulkload.transform
- import: google.appengine.ext.bulkload.bulkloader_wizard
- import: google.appengine.api.datastore
- import: google.appengine.api.users

transformers:

- kind: Keyword
  connector: csv
  connector_options:
  property_map:
    - property: __key__
      export:
      - external_name: twitter_account_key
        export_transform: transform.key_id_or_name_as_string_n(0)
      - external_name: post_type_key
        export_transform: transform.key_id_or_name_as_string_n(1)
      - external_name: key
        export_transform: transform.key_id_or_name_as_string_n(2)

    - property: createdAt
      external_name: createdAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: createdBy
      external_name: createdBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

    - property: isActivated
      external_name: isActivated
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: isRegex
      external_name: isRegex
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: keyword
      external_name: keyword

    - property: sequence
      external_name: sequence
      import_transform: transform.none_if_empty(int)

    - property: updatedAt
      external_name: updatedAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: updatedBy
      external_name: updatedBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address


- kind: PostType
  connector: csv
  connector_options:
  property_map:
    - property: __key__
      export:
      - external_name: twitter_account_key
        export_transform: transform.key_id_or_name_as_string_n(0)
      - external_name: key
        export_transform: transform.key_id_or_name_as_string_n(1)

    - property: createdAt
      external_name: createdAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: createdBy
      external_name: createdBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

    - property: ignoredIDs
      external_name: ignoredIDs

    - property: interval
      external_name: interval
      import_transform: transform.none_if_empty(int)

    - property: isUseSleep
      external_name: isUseSleep
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: postTypeName
      external_name: postTypeName

    - property: sequence
      external_name: sequence
      import_transform: transform.none_if_empty(int)

    - property: updatedAt
      external_name: updatedAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: updatedBy
      external_name: updatedBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address


- kind: Post
  connector: csv
  connector_options:
  property_map:
    - property: __key__
      export:
      - external_name: twitter_account_key
        export_transform: transform.key_id_or_name_as_string_n(0)
      - external_name: post_type_key
        export_transform: transform.key_id_or_name_as_string_n(1)
      - external_name: keyword_key
        export_transform: transform.key_id_or_name_as_string_n(2)
      - external_name: key
        export_transform: transform.key_id_or_name_as_string_n(3)

    - property: count
      external_name: count
      import_transform: transform.none_if_empty(int)

    - property: createdAt
      external_name: createdAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: createdBy
      external_name: createdBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

    - property: message
      external_name: message

    - property: sequence
      external_name: sequence
      import_transform: transform.none_if_empty(int)

    - property: updatedAt
      external_name: updatedAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: updatedBy
      external_name: updatedBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address


- kind: ServerProperties
  connector: csv
  connector_options:
  property_map:
    - property: __key__
      external_name: key
      export_transform: transform.key_id_or_name_as_string

    - property: hasTwitterAccountChange
      external_name: hasTwitterAccountChange
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: isStopCreateBot
      external_name: isStopCreateBot
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: lastExecuteTime
      external_name: lastExecuteTime
      import_transform: transform.none_if_empty(int)

    - property: limitTwitterAccount
      external_name: limitTwitterAccount
      import_transform: transform.none_if_empty(int)

    - property: numberRemainingTwitterAccount
      external_name: numberRemainingTwitterAccount
      import_transform: transform.none_if_empty(int)


- kind: TwitterAccount
  connector: csv
  connector_options:
  property_map:
    - property: __key__
      external_name: key
      export_transform: transform.key_id_or_name_as_string

    - property: botName
      external_name: botName

    - property: consumerKey
      external_name: consumerKey

    - property: consumerSecret
      external_name: consumerSecret

    - property: createdAt
      external_name: createdAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: createdBy
      external_name: createdBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

    - property: isActivated
      external_name: isActivated
      import_transform: transform.regexp_bool('true', re.IGNORECASE)

    - property: owner
      external_name: owner
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

    - property: screenName
      external_name: screenName

    - property: secret
      external_name: secret

    - property: timeZoneId
      external_name: timeZoneId

    - property: token
      external_name: token

    - property: updatedAt
      external_name: updatedAt
      import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S')
      export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S')

    - property: updatedBy
      external_name: updatedBy
      import_transform: transform.none_if_empty(users.User)  # Assumes email address

以上で、bulkloader_cvs.yamlの設定が終わったので、実際にCSVがダウンロードできるか以下のコマンドを叩いて試してみます。

appcfg.py download_data --config_file=bulkloader_cvs.yaml --filename=twitter_accounts.csv --kind=TwitterAccount --url=http://.appspot.com/remote_api


これで、TwitterAccountのCSVファイルが出力されれば成功です。bulkloader_cvs.yamlの設定が間違っていたりすると、丁寧に原因を教えてくれます。


あとここで、気をつけなくてはいけないことは必ず、一つのアプリケーションに対して一つのエンティティ(kind)ずつしかダウンロードできないと言うことです。これを踏まえて以下の、Pythonでsqliteへの取り込みプログラムを実装していきます。



- 4. Pythonでまとめて複数のアプリケーションから全てのエンティティをダウンロードして、sqliteのDBに入れる

というわけで、ここからPythonのプログラムを実装するわけですが、基本方針は、すべてのアプリケーションの全てのエンティティをダウンロードして、applicationIDを付加して、各エンティティのテーブルにて、一意になるようインポートすることです。ちなみに、あとでデータ解析をする時の高速化のためにインデックスもおまけで作ってあります。loader.pyの実装は以下の通り。bulkloader_cvs.yamlと同じフォルダに配置します。文字コードはUTF-8です。


loader.py

# -*- coding: utf-8 -*-
'''
Web上からappengineのデータストアを取得し、
sqlite3に格納する
複合主キーの設定と主キーのインデックスを作成する
Created on 2010/08/10
@author: soichiro
'''
import os
import sqlite3
import csv
import traceback

appcfg_path = "/usr/local/google_appengine/appcfg.py"
email = "hogehoge@gmail.com"
loader_path = "bulkloader_cvs.yaml"
db_path = "datastore.db"

# 最初かならずパスワード認証が必要なため、データ量の少ないエンティティから進めるのがオススメ
kind_names = (
              "ServerProperties",
              "TwitterAccount",
              "PostType",
              "Keyword",
              "Post"
              )

primary_key_dic = {
                    "ServerProperties":"applicationId, key",
                    "TwitterAccount":"applicationId, key",
                    "PostType":"applicationId, key, twitter_account_key",
                    "Keyword":"applicationId, key, twitter_account_key, post_type_key",
                    "Post":"applicationId, key, twitter_account_key, post_type_key, keyword_key"
                   }

application_ids = (
                    "charactorbot",
                    "charactorbot2"
                            )

def load(loader_path, application_id, kind_name, email):
    '''
    appengineのremote_apiからcsvファイルを取得する
    '''
    command_template = "%s download_data --email=%s --config_file=%s --filename=%s --kind=%s --url=http://%s.appspot.com/remote_api"
    command = command_template % (appcfg_path,
                                  email, 
                                  os.path.abspath(loader_path),
                                  get_csv_filename(application_id, kind_name),
                                  kind_name,
                                  application_id)
    os.system(command)

def get_csv_filename(application_id, kind_name):
    '''
    application_id, kind_nameの二つからcsvファイル名を取得する
    '''
    return kind_name + "_" + application_id + ".csv"

def import_to_sqlite(application_id, kind_name):
    '''
    application_id, kind_nameのcsvファイルをsqlite3のdbに読み込む
    '''
    csv_filename = get_csv_filename(application_id, kind_name)
    if not os.path.isfile(csv_filename): return
       
    csv_file = open(csv_filename, "r")
    header = csv_file.readline()
    csv_file.close()
    db = sqlite3.connect(db_path)
    try:
        try:
            db.execute("create table " + kind_name +
                       "(" +
                       "applicationId," +
                       header + "," +
                       " primary key("+  primary_key_dic[kind_name]  +"))")
            db.execute("create index primaryKeyIndex on " + kind_name +"("+ primary_key_dic[kind_name] +")")
        except sqlite3.OperationalError:
            print traceback.format_exc()
            print "[import_to_sqlite] Table name %s already exists." % kind_name
        
        reader = csv.reader(open(csv_filename, "rb"))

        count = 0  
        for row in reader:
            # headerを無視
            if count == 0:
                count += 1
                continue
            
            # 先頭にapplicationIdを加えてテーブルを作成してあるのでそれに対応
            row.insert(0, application_id)
            
            ph = "?," * (len(row) - 1) + "?"
            sql = "insert into %s VALUES(%s)" % (kind_name, ph)
            db.execute(sql, tuple(row))
            
            if count % 100 == 0:
                print ".",
                
            count += 1
            
    finally:
        db.commit()
        db.close()
    print "[import_to_sqlite] application_id: %s , kind_name : %s imported." % (application_id, kind_name)
        
def delete_csv_file(application_id, kind_name):
    '''
    application_id, kind_nameのcsvファイルを削除します
    '''
    csv_filename = get_csv_filename(application_id, kind_name)
    if not os.path.isfile(csv_filename): return
    os.remove(csv_filename)


if __name__ == '__main__':
    
    for kind_name in kind_names:
        for application_id in application_ids:
            load(loader_path,
                  application_id,
                  kind_name,
                  email)
    print "[__main__] load finished."
    
    for kind_name in kind_names:
        for application_id in application_ids:
            import_to_sqlite(application_id,
                             kind_name)
    print "[__main__] import to sqlite finished."
    
    for kind_name in kind_names:
        for application_id in application_ids:
            delete_csv_file(application_id,
                             kind_name)
    print "[__main__] delete csv file."
    
    os.system("rm bulkloader-log-*")
    os.system("rm bulkloader-progress-*")
    os.system("rm bulkloader-results-*")
    print "[__main__] delete logs."
    print "[__main__] loader.py totally finished."

以上を実行することで、無事sqliteに主キーが作られた状態でsqliteのDBにインポートされます。
Lita( http://www.forest.impress.co.jp/article/2009/02/17/lita.html ) などのビュアーでちゃんとデータが入っているか見てみましょう。処理時間の長いSQLを流したり、データ量が1GB近い場合には、コマンドラインから実行しないと処理速度が遅いかもしれません。


なお、すべてのデータはTEXT形式で入っているので、数値として後に解析したい場合には、SQLのround('数値の文字列')関数を使って数値に変換してやる必要があります。


以上が、Google app engine/Java のdatastoreをPythonを使って複数のアプリケーションにまたがりダウンロードし、sqlite3にインポートする方法でした。お疲れ様でした。
あとは、好きなだけSQLを書いたり、Python等で解析してみてください。


追伸
実はPythonでしっかりコードを書いたのは初めてです。ちょっとJavaっぽかったり、Rubyっぽかったりしますが、ご指摘頂けるとうれしいです。