はじめに
これまでに幾つか、AWS上で動くリソースを制御するための自動化便利ツールを作ってきましたが、これをきっかけに日常の色々な作業を自動化するための布石として、なんかいいネタないかなぁと考えながら、builders.flashを眺めていたら、よくあるパターンのTwitterのbotをAmazon EventBridge + AWS Lambda + Amazon DynamoDBで実装してみるというちょうど良いネタがあったので、サクッと実装しつつ、若干のカスタマイズをしてみました。
シリーズものなのでまだまだ途中段階ですが、元記事は以下のリンクに示されているので、ここでは自分なりにカスタマイズした箇所のみを紹介していこうと思います。
カスタマイズしたところ
主に第2回に書かれている箇所が中心になるのですが、私の場合は過去に投稿した記事をランダムにTweetさせるように作りたかったため、まずはDynamoDBのテーブル項目に、パーティーションキーとして”Id”を設定しました。こんな感じ。これがまずは下準備になります。
次に、以下のコードのline.32のように、その時にTweetさせたい過去の記事のidを、random.randomint()関数を用いてランダムに生成させ、line.35で生成させたidをキーにしてget_itemさせるようにしました。もちろん、記事の数は(多分)これからもどんどん増えていくので、ここでidの数をハードコードせずに、一旦line.13でLambda関数の環境変数から取得するように実装を追加しています。
また、記事のコードでは現段階ではエラー処理が実装されていないので、エラーハンドラも仮で実装しています。その全体像がこんな感じ。なんか一部余分な処理が入っていますが、そこは大目に見てください。。。
import json
import os
import boto3
import logging
import traceback
import random
from requests_oauthlib import OAuth1Session
from datetime import datetime, timedelta, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ids = int(os.environ.get('POST_ID'))
dynamodb = boto3.resource('dynamodb')
twitter_bot_message_table = dynamodb.Table('TwitterBotDailyMessage')
JST = timezone(timedelta(hours=+9), 'JST')
# API Key, Access Token(元記事にも書かれているとおりで本来はハードコードしてはいけないです。連載が進んだ時に直します)
consumer_key = "Twitterから取得したAPI Key"
client_secret = "Twitterから取得したAPI Key Secret"
access_token = "Twitterから取得したAccess Token"
access_token_secret = "Twitterから取得したAccess Token Secret"
oauth = OAuth1Session(consumer_key, client_secret, access_token, access_token_secret)
def lambda_handler(event, context):
text = ""
id = str(random.randint(0, ids))
try:
response = twitter_bot_message_table.get_item(Key={'Id': id})
if ('Item' in response):
text = response['Item']['Message']
payload = {'text': text}
response = oauth.post(
"https://api.twitter.com/2/tweets",
json=payload
)
if response.status_code != 201:
raise Exception(
"[Error] {} {}".format(response.status_code, response.text)
)
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
return {
"statusCode": 500,
"message": 'An error occured at tweet old Blog post.'
}
EventBridgeの設定はお好きなようにという感じで。私の場合は、あまり頻繁に過去記事を投稿するのもなんだかなぁ、ということで、以下のcron式を設定。とりあえず週末だけ動かします。
cron(5 5 ? * SAT-SUN *)
で、EventBridgeを発火させたらこんな感じで無事にTweet成功。
今後やりたいこと
ここまで実装した段階だと、Blogの記事を投稿する度にDynamoDBに項目追加をして、Lambda関数の環境変数も手作業で修正しなければいけないのが面倒なところなので、記事を投稿したタイミングをトリガーにしてDynamoDBに項目を追加しつつ、環境変数もさらに外出しして完全自動化していきたいところ。やり方はたくさんあると思うのですが、まだ絶賛連載が進行中のようなので、ゆっくり機能拡張していきたいと思います。