今回、自分が運営するTwitterボット作成サービス キャラボットのユーザーデータ調査にあたって、GAE/Jのdatastoreのデータを複数のアプリケーションをまたがってダウンロードする必要があったので、そのスクリプトをPythonで作りました。以下に手順を載せていきます。
- 1. GAE/Jのアプリケーションにremote_apiのサーブレット設定する
GAE/Jのプロジェクトのwar/WEB-INF/web.xml内に、remote_apiのサーブレットを加えます。
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd"> <web-app> <!-- サービスの機能として使っている設定 --> <!-- To use remote api --> <servlet> <servlet-name>remoteapi</servlet-name> <servlet-class>com.google.apphosting.utils.remoteapi.RemoteApiServlet</servlet-class> </servlet> <servlet-mapping> <servlet-name>remoteapi</servlet-name> <url-pattern>/remote_api</url-pattern> </servlet-mapping> <security-constraint> <web-resource-collection> <web-resource-name>remoteapi</web-resource-name> <url-pattern>/remote_api</url-pattern> </web-resource-collection> <auth-constraint> <role-name>admin</role-name> </auth-constraint> </security-constraint> </web-app>
あとは、このweb.xmlを含めたwarフォルダ以下をGAE/JのアプリケーションにGoogleのEclipseプラグインやappcfg.shを使ってアップロードします。
- 2. PythonのGAEの環境を作成
残念ながらJavaの開発環境のツールでは、datastoreのアップロード、ダウンロードするためのbulkloaderがついていないので、以下を見ながら環境を作成します。ここでPython2.5などのインストールも済ませてください。
http://code.google.com/intl/ja/appengine/docs/python/gettingstarted/devenvironment.html
$ python
Python 2.5.5 (r255:77872, Mar 9 2010, 21:47:39)
[GCC 4.2.1 (Apple Inc. build 5646) (dot 1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
と、バージョンが出たり、appcfg.pyと打つと
$ appcfg.py
Usage: appcfg.py [options]Action must be one of:
create_bulkloader_config: Create a bulkloader.yaml from a running application.
cron_info: Display information about cron jobs.
...
と出力されれば成功です。
- 3. bulkloaderのための設定ファイル出力し、設定する
そもそものbulkloaderの使い方は、
http://code.google.com/intl/en/appengine/docs/python/tools/uploadingdata.html
に書いてあります(英語のみ)。ただ、今回はこのbulkloaderのダウンロードの機能しか使いません。読み進めていくとどうやらbulkloader.yamlという設定ファイルを作る必要があるようです。というわけで早速、
appcfg.py create_bulkloader_config --filename=bulkloader_cvs.yaml --url=http://
.appspot.com/remote_api
を実行して、bulkloader_cvs.yamlを作成します。ただ、この設定ファイルはただのバイナリがダウンロードできるだけの雛形で、CSVファイルだったりXMLファイルを生成するには、connectorの設定をしていかなくてはいけません。設定例は以上のURLにも書いてありますが、一つのプロパティから複数の情報を出したい時などは、違う書き方をしなくてはいけません。
詳しい例は、
◯Appengine Bulkloader IO 2010
http://bulkloadersample.appspot.com/showfile/bulkloader-presentation.pdf
の記述が役に立ちました。最終的には、bulkloader.pyのソースを読んだりしつつ、設定してみるのも一つです。自分は、JDOを使っているため、エンティティのプロパティではなく、キーの親子関係でデータの親子関係が表現されていたため、以下の設定のように、__key__のプロパティから、全ての親の階層のIDを取り出すということをしました。Slim3を使っているとこのへんの設定は楽なはずです。
また、今回は複数のアプリケーションから情報をダウンロードするため、全てのエンティティは、アプリケーションID、エンティティの名前、自分と自分より親のすべてのIDの3つで一意になります。実際のキャラボットの設定例は以下のようになりました。
# Autogenerated bulkloader.yaml file. # You must edit this file before using it. TODO: Remove this line when done. # At a minimum address the items marked with TODO: # * Fill in connector and connector_options # * Review the property_map. # - Ensure the 'external_name' matches the name of your CSV column, # XML tag, etc. # - Check that __key__ property is what you want. Its value will become # the key name on import, and on export the value will be the Key # object. If you would like automatic key generation on import and # omitting the key on export, you can remove the entire __key__ # property from the property map. # If you have module(s) with your model classes, add them here. Also # change the kind properties to model_class. python_preamble: - import: base64 - import: re - import: google.appengine.ext.bulkload.transform - import: google.appengine.ext.bulkload.bulkloader_wizard - import: google.appengine.api.datastore - import: google.appengine.api.users transformers: - kind: Keyword connector: csv connector_options: property_map: - property: __key__ export: - external_name: twitter_account_key export_transform: transform.key_id_or_name_as_string_n(0) - external_name: post_type_key export_transform: transform.key_id_or_name_as_string_n(1) - external_name: key export_transform: transform.key_id_or_name_as_string_n(2) - property: createdAt external_name: createdAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: createdBy external_name: createdBy import_transform: transform.none_if_empty(users.User) # Assumes email address - property: isActivated external_name: isActivated import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: isRegex external_name: isRegex import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: keyword external_name: keyword - property: sequence external_name: sequence import_transform: transform.none_if_empty(int) - property: updatedAt external_name: updatedAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: updatedBy external_name: updatedBy import_transform: transform.none_if_empty(users.User) # Assumes email address - kind: PostType connector: csv connector_options: property_map: - property: __key__ export: - external_name: twitter_account_key export_transform: transform.key_id_or_name_as_string_n(0) - external_name: key export_transform: transform.key_id_or_name_as_string_n(1) - property: createdAt external_name: createdAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: createdBy external_name: createdBy import_transform: transform.none_if_empty(users.User) # Assumes email address - property: ignoredIDs external_name: ignoredIDs - property: interval external_name: interval import_transform: transform.none_if_empty(int) - property: isUseSleep external_name: isUseSleep import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: postTypeName external_name: postTypeName - property: sequence external_name: sequence import_transform: transform.none_if_empty(int) - property: updatedAt external_name: updatedAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: updatedBy external_name: updatedBy import_transform: transform.none_if_empty(users.User) # Assumes email address - kind: Post connector: csv connector_options: property_map: - property: __key__ export: - external_name: twitter_account_key export_transform: transform.key_id_or_name_as_string_n(0) - external_name: post_type_key export_transform: transform.key_id_or_name_as_string_n(1) - external_name: keyword_key export_transform: transform.key_id_or_name_as_string_n(2) - external_name: key export_transform: transform.key_id_or_name_as_string_n(3) - property: count external_name: count import_transform: transform.none_if_empty(int) - property: createdAt external_name: createdAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: createdBy external_name: createdBy import_transform: transform.none_if_empty(users.User) # Assumes email address - property: message external_name: message - property: sequence external_name: sequence import_transform: transform.none_if_empty(int) - property: updatedAt external_name: updatedAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: updatedBy external_name: updatedBy import_transform: transform.none_if_empty(users.User) # Assumes email address - kind: ServerProperties connector: csv connector_options: property_map: - property: __key__ external_name: key export_transform: transform.key_id_or_name_as_string - property: hasTwitterAccountChange external_name: hasTwitterAccountChange import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: isStopCreateBot external_name: isStopCreateBot import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: lastExecuteTime external_name: lastExecuteTime import_transform: transform.none_if_empty(int) - property: limitTwitterAccount external_name: limitTwitterAccount import_transform: transform.none_if_empty(int) - property: numberRemainingTwitterAccount external_name: numberRemainingTwitterAccount import_transform: transform.none_if_empty(int) - kind: TwitterAccount connector: csv connector_options: property_map: - property: __key__ external_name: key export_transform: transform.key_id_or_name_as_string - property: botName external_name: botName - property: consumerKey external_name: consumerKey - property: consumerSecret external_name: consumerSecret - property: createdAt external_name: createdAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: createdBy external_name: createdBy import_transform: transform.none_if_empty(users.User) # Assumes email address - property: isActivated external_name: isActivated import_transform: transform.regexp_bool('true', re.IGNORECASE) - property: owner external_name: owner import_transform: transform.none_if_empty(users.User) # Assumes email address - property: screenName external_name: screenName - property: secret external_name: secret - property: timeZoneId external_name: timeZoneId - property: token external_name: token - property: updatedAt external_name: updatedAt import_transform: transform.import_date_time('%Y-%m-%dT%H:%M:%S') export_transform: transform.export_date_time('%Y-%m-%dT%H:%M:%S') - property: updatedBy external_name: updatedBy import_transform: transform.none_if_empty(users.User) # Assumes email address
以上で、bulkloader_cvs.yamlの設定が終わったので、実際にCSVがダウンロードできるか以下のコマンドを叩いて試してみます。
appcfg.py download_data --config_file=bulkloader_cvs.yaml --filename=twitter_accounts.csv --kind=TwitterAccount --url=http://
.appspot.com/remote_api
これで、TwitterAccountのCSVファイルが出力されれば成功です。bulkloader_cvs.yamlの設定が間違っていたりすると、丁寧に原因を教えてくれます。
あとここで、気をつけなくてはいけないことは必ず、一つのアプリケーションに対して一つのエンティティ(kind)ずつしかダウンロードできないと言うことです。これを踏まえて以下の、Pythonでsqliteへの取り込みプログラムを実装していきます。
- 4. Pythonでまとめて複数のアプリケーションから全てのエンティティをダウンロードして、sqliteのDBに入れる
というわけで、ここからPythonのプログラムを実装するわけですが、基本方針は、すべてのアプリケーションの全てのエンティティをダウンロードして、applicationIDを付加して、各エンティティのテーブルにて、一意になるようインポートすることです。ちなみに、あとでデータ解析をする時の高速化のためにインデックスもおまけで作ってあります。loader.pyの実装は以下の通り。bulkloader_cvs.yamlと同じフォルダに配置します。文字コードはUTF-8です。
loader.py
# -*- coding: utf-8 -*- ''' Web上からappengineのデータストアを取得し、 sqlite3に格納する 複合主キーの設定と主キーのインデックスを作成する Created on 2010/08/10 @author: soichiro ''' import os import sqlite3 import csv import traceback appcfg_path = "/usr/local/google_appengine/appcfg.py" email = "hogehoge@gmail.com" loader_path = "bulkloader_cvs.yaml" db_path = "datastore.db" # 最初かならずパスワード認証が必要なため、データ量の少ないエンティティから進めるのがオススメ kind_names = ( "ServerProperties", "TwitterAccount", "PostType", "Keyword", "Post" ) primary_key_dic = { "ServerProperties":"applicationId, key", "TwitterAccount":"applicationId, key", "PostType":"applicationId, key, twitter_account_key", "Keyword":"applicationId, key, twitter_account_key, post_type_key", "Post":"applicationId, key, twitter_account_key, post_type_key, keyword_key" } application_ids = ( "charactorbot", "charactorbot2" ) def load(loader_path, application_id, kind_name, email): ''' appengineのremote_apiからcsvファイルを取得する ''' command_template = "%s download_data --email=%s --config_file=%s --filename=%s --kind=%s --url=http://%s.appspot.com/remote_api" command = command_template % (appcfg_path, email, os.path.abspath(loader_path), get_csv_filename(application_id, kind_name), kind_name, application_id) os.system(command) def get_csv_filename(application_id, kind_name): ''' application_id, kind_nameの二つからcsvファイル名を取得する ''' return kind_name + "_" + application_id + ".csv" def import_to_sqlite(application_id, kind_name): ''' application_id, kind_nameのcsvファイルをsqlite3のdbに読み込む ''' csv_filename = get_csv_filename(application_id, kind_name) if not os.path.isfile(csv_filename): return csv_file = open(csv_filename, "r") header = csv_file.readline() csv_file.close() db = sqlite3.connect(db_path) try: try: db.execute("create table " + kind_name + "(" + "applicationId," + header + "," + " primary key("+ primary_key_dic[kind_name] +"))") db.execute("create index primaryKeyIndex on " + kind_name +"("+ primary_key_dic[kind_name] +")") except sqlite3.OperationalError: print traceback.format_exc() print "[import_to_sqlite] Table name %s already exists." % kind_name reader = csv.reader(open(csv_filename, "rb")) count = 0 for row in reader: # headerを無視 if count == 0: count += 1 continue # 先頭にapplicationIdを加えてテーブルを作成してあるのでそれに対応 row.insert(0, application_id) ph = "?," * (len(row) - 1) + "?" sql = "insert into %s VALUES(%s)" % (kind_name, ph) db.execute(sql, tuple(row)) if count % 100 == 0: print ".", count += 1 finally: db.commit() db.close() print "[import_to_sqlite] application_id: %s , kind_name : %s imported." % (application_id, kind_name) def delete_csv_file(application_id, kind_name): ''' application_id, kind_nameのcsvファイルを削除します ''' csv_filename = get_csv_filename(application_id, kind_name) if not os.path.isfile(csv_filename): return os.remove(csv_filename) if __name__ == '__main__': for kind_name in kind_names: for application_id in application_ids: load(loader_path, application_id, kind_name, email) print "[__main__] load finished." for kind_name in kind_names: for application_id in application_ids: import_to_sqlite(application_id, kind_name) print "[__main__] import to sqlite finished." for kind_name in kind_names: for application_id in application_ids: delete_csv_file(application_id, kind_name) print "[__main__] delete csv file." os.system("rm bulkloader-log-*") os.system("rm bulkloader-progress-*") os.system("rm bulkloader-results-*") print "[__main__] delete logs." print "[__main__] loader.py totally finished."
以上を実行することで、無事sqliteに主キーが作られた状態でsqliteのDBにインポートされます。
Lita( http://www.forest.impress.co.jp/article/2009/02/17/lita.html ) などのビュアーでちゃんとデータが入っているか見てみましょう。処理時間の長いSQLを流したり、データ量が1GB近い場合には、コマンドラインから実行しないと処理速度が遅いかもしれません。
なお、すべてのデータはTEXT形式で入っているので、数値として後に解析したい場合には、SQLのround('数値の文字列')関数を使って数値に変換してやる必要があります。
以上が、Google app engine/Java のdatastoreをPythonを使って複数のアプリケーションにまたがりダウンロードし、sqlite3にインポートする方法でした。お疲れ様でした。
あとは、好きなだけSQLを書いたり、Python等で解析してみてください。
追伸
実はPythonでしっかりコードを書いたのは初めてです。ちょっとJavaっぽかったり、Rubyっぽかったりしますが、ご指摘頂けるとうれしいです。