Auroraを活用してAWS Glueでデータ分析機能を構築した話

alt ベネッセ i-キャリアの小島です。 dodaキャンパスでは、AWS AthenaやQuickSightを利用して、日々データ分析活用を行っていますが、 今回は、Auroraを活用して、AWS Glueでデータ分析機能を構築しました。 その際に選定したアーキテクトとその理由を中心にお話させていただきたいと思います。

背景

dodaキャンパスでは現在、以下のサービスを用いてデータを取り扱っています。

  • DynamoDB
    • サービスのデータ管理として利用
  • Athena
    • SQLを用いたデータ分析に利用
  • QuickSight
    • AthenaをデータソースとしたBIツールとして利用

今回、QuickSightで実行しているデータ分析のような機能を法人ユーザーにも提供することになりました。 要件は以下です

  • 前日までの分析データのためデータの更新は1日1回でよい
  • Readは大量かつ高速であることが求められる
  • グラフを用いた描画を提供する

アーキテクト概要

既存の構成および、今回の要件を図にすると以下のようになります。 今回は、赤枠の部分をどうするかというお話です。

データマートの検討

まずはデータマート部分の検討です。

結論から申しますと、データの構造や読み込みクエリの速度、データ量や認証などを考慮した結果、AWS Auroraを利用することになりました。 参考までに、採用を検討した他のアイデアと採用しなかった簡単な理由を掲載します。

採用しなかった案と理由

  • DynamoDB
    • 複雑なクエリでの読み込みができないため
  • Elasticsearch
    • Elasticsearchに専用のデータ形式に変換してしまうため
  • Athena
    • クエリ実行速度が早くない
  • QuickSight埋め込みによるデータ提供
  • RedShift
    • 扱いたいデータ量に比べて、Redshiftが得意としているデータ量が大規模だった
    • 数十TBクラスのデータを取り扱うようになったら検討すると良いとのこと

Tips : Auroraの利用方法についての検討

Aurora(RDS)を利用するにあたり、以下の方法を検討しました。 下記理由により、採用しませんでしたが、要件によっては候補に上がるかもしれません。

  • RDS Proxy
    • Auroraに対して、大量のReadを行う前提だとRead Replica + Reader End Pointの構成が推奨されるが、RDS ProxyはReader Endpointに対して適用できないため十分に利用できない
  • Aurora Serverless
    • 制限があったりSLAがない他、FailOverがAuroraほど高速ではない。Read Replicaが少ない小さいシステムであれば向いている。

ETL方法の検討

次にETLの方法を検討しました。 結論としては、すでにサービスのETL処理に利用していたAWS Glueを採用することにしました。

  • Lambda
    • 実行時間が長い処理に向かないため、大量のデータをロードすることができない。
  • インスタンスやコンテナ上で動作するスクリプト
    • 管理およびそれを整備するための実装コストがかかるため

Glue用のスクリプト

メインのロジック部分は以下のようなコードを記載しました。

athena = boto3.client('athena')
ATHENA_OUTPUT_DIR = "s3://%s/%s" % (args['S3BUCKET_NAME'], datetime.strftime(datetime.now(pytz.timezone('Asia/Tokyo')) - timedelta(days=1), '%Y/%m/%d'))
s3 = boto3.client('s3')

ATHENA_QUERY = """
SELECT xxxxxx
  ...(略)
"""


@retry(stop_max_attempt_number = 10,
       wait_fixed = 60 * 1000 * 3)
def poll_status(_id):
    result = athena.get_query_execution(
        QueryExecutionId = _id
    )

    state = result['QueryExecution']['Status']['State']
    if state == 'SUCCEEDED':
        return result
    elif state == 'FAILED':
        return result
    else:
        raise Exception("poll_statusで例外発生(state:%s)" % (state))

def query_to_athena():
    send_slack('query_to_athena start')
    output_location = ATHENA_OUTPUT_DIR

    result = athena.start_query_execution(
       QueryString = ATHENA_QUERY,
       QueryExecutionContext = {
           'Database': args['ATHENA_DATABASE_NAME']
       },
       ResultConfiguration = {
           'OutputLocation': output_location,
       }
    )
    result = poll_status(result['QueryExecutionId'])
    return result

def query_to_aurora(QueryExecutionId):
    send_slack('query_to_aurora start')
    # 一旦別テーブルにデータを入れて、renameすることにより、データが空の時間を最小限にする
    truncate_sql = "TRUNCATE TABLE %s_tmp" %(args['AURORA_TABLE_NAME'])
    load_sql = """
        LOAD DATA FROM S3 '%s/%s.csv' INTO TABLE %s_tmp FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '' LINES TERMINATED BY '\n' IGNORE 1 ROWS
            SET
                created_at = NULLIF(created_at, '0000-00-00 00:00:00'),
                ...(略)
""" % (ATHENA_OUTPUT_DIR, QueryExecutionId, args['AURORA_TABLE_NAME'])
    rename_sql_1st = "RENAME TABLE %s TO %s_old" %(args['AURORA_TABLE_NAME'], args['AURORA_TABLE_NAME'])
    rename_sql_2st = "RENAME TABLE %s_tmp TO %s" %(args['AURORA_TABLE_NAME'], args['AURORA_TABLE_NAME'])
    rename_sql_3st = "RENAME TABLE %s_old TO %s_tmp" %(args['AURORA_TABLE_NAME'], args['AURORA_TABLE_NAME'])
    try:
        aurora_cur.execute(truncate_sql)
        aurora_cur.execute(load_sql)
        aurora_cur.execute(rename_sql_1st)
        aurora_cur.execute(rename_sql_2st)
        aurora_cur.execute(rename_sql_3st)
        aurora_conn.commit()
    except mysql.connector.Error as err:
        send_slack(err, "danger")
        aurora_conn.rollback()

aurora_conn = mysql.connector.connect(**aurora_config, use_pure=True)
aurora_cur = aurora_conn.cursor(buffered=True)

try:
    result = query_to_athena()
    if result['QueryExecution']['Status']['State'] == 'SUCCEEDED':
        query_to_aurora(result['QueryExecution']['QueryExecutionId'])
    else:
        send_slack("athenaデータの取得に失敗しました", "danger")

except NameError:
    send_slack("athenaデータの取得に失敗しました (NameError)", "danger")
except Exception as err:
    send_slack(err, "danger")

aurora_cur.close()
aurora_conn.close()

その他工夫点

認証

RDSにアクセスするLambdaにはPWなどの認証情報を定義したくなかったため、IAMによる認証を採用しました。 こちらの記事が詳しいので参照させていただきました IAM認証によるRDS接続を試してみた | Developers.IO

まとめ

最終的に、以下のような構成になりました。

最終的な構成

記載しなかったものも含めて、今回、主に以下の観点で選定を行いました。

  • Read / Write の頻度
  • 必要な検索の種類
  • データ容量(現在・将来)
  • 今後のデータ構造変更の頻度
  • 運用コスト
  • 認証方法
  • 可用性
  • ETLの要件

AWSには様々なデータ系のサービスがあるので、既に利用しているサービスだけにとらわれず、上記のような特色に合わせて検討することが必要だと感じました。

alt

小島 慧 Satoshi Ojima

株式会社ベネッセi-キャリア 商品サービス本部 dodaキャンパス企画部 IT企画課

SESでバックエンド系のシステムを中心に、様々な企業でサーバサイドアプリケーションの開発やAWSを用いたクラウド構築・運用などの業務を担当。2018年9月よりパーソルキャリアに入社。リードエンジニアとしてdoda Campusの設計・開発・運用保守等を担当。

※2021年2月現在の情報です。