Amazon EventBridge + AWS Lambda + Amazon DynamoDBでBlueskyのbot機能を実装してみた


皆さんはBlueskyについてはご存知でしょうか? 旧Twitterのようなマイクロブログと呼ばれるテキスト系のSNSで、現在はクローズドベータ版として運用されているため、参加は招待制ですが、イーロン・マスクがオーナーとなって以来混迷を続けている旧Twitterと比較するとシンプルではあるものの、非常に使い勝手の良いUIとどちらかという中道な雰囲気のユーザー層が多いため、安心して利用することができます。

以前、同じ構成で旧Twitterへのbot機能を実装しましたが、日々のPostを徐々に旧TwitterからBlueskyへ移行しつつあるということもあり、同じ機能をBluesky向けにカスタマイズしてみました。

構成もAmazon EventBridge + AWS Lambda + Amazon DynamoDBという全く同じ構成にしていますが、一部構成を変更した箇所があるので、その辺りを中心に書いていきたいと思います。

Amazon DynamoDB

以前はDynamoDBテーブルの属性のうち、コンテンツに該当する部分はタイトルとURLを一緒に格納していたのですが、Bluesky向けにはAPIの仕様の関係もあり、一旦タイトルとURLを分割しました。そしてidをパーティーションキーに、dateをソートキーにしているすことで、この2つのキーで主キーにすることとしました。こうすることで逆に後のコーディングで苦労することになってしまったのですが。。。

AWS Lambda向けのPythonのコード

基本的にはAmazon DynamoDBのテーブルから必要なデータを取得してBlueskyの投稿用APIに対してpostするという仕組みには変わりがないのですが、旧Twitterの場合と違って幾つかの処理に差異があるので、 lambda_handler() 以外にも幾つかの関数を追加しています。

パスワードを取得するための関数 get_app_password()

Blueskyの投稿用APIに対して投稿する場合は、Blueskyにログインする必要があるため、SSMパラメータストアにBlueskyから提供されるサードパーティアプリケーション向けのApp Passwordを生成したものを格納しています。くれぐれもBlueskyユーザーのパスワードを使用するのはもしもの時のためにやめましょう。

DIDを取得するための関数 get_did()

Blueskyは分散型SNSとしての運用が考慮されているため、ユーザアカウント以外にユーザ固有のIDを識別するためのDIDを保持しています。そのDIDを取得するための関数になります。

API Keyを取得するための関数 get_api_key()

DIDとパスワードが取得できたら、その情報を元にしてBlueskyのAPIへのアクセス用のAPI Keyを取得する必要があります。このAPI Keyを取得するための関数になります。

BlueskyのAPIを叩いてスキートをPostするための関数 post_skeet()

これまでの情報と、Amazon DynamoDBのテーブルに格納されている情報を元に、スキートを行う関数になります。

これらを実際に実装した全体像が以下の通りになります。

import os
import boto3
import urllib3
import http
import json
import re
import logging
import traceback
import random
from datetime import datetime
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ids = int(os.environ.get('POST_ID'))

dynamodb = boto3.client('dynamodb')


def lambda_handler(event, context):
    
    text = ""
    url = ""
    id = str(random.randint(0, ids))

    try:
        response = dynamodb.query(
            ExpressionAttributeValues={
                ':v1': {
                    'S': id,
                },
            },
            KeyConditionExpression='id = :v1',
            TableName='BlueskyBotOldBlogPost',
        )
        
        logger.info("response is %s", response)

        if ('Items' in response):
            text = text + response['Items'][0]['description']['S']
            url = url + response['Items'][0]['url']['S']
        
        logger.info("description is %s", text)
        logger.info("url is %s", url)
        
        app_password = get_app_password()
        did = get_did()
        key = get_api_key(did, app_password)
        
        response = post_skeet(did, key, text, url)
        
        return response
        
    except Exception as e:
        logger.error(e)
        logger.error(traceback.format_exc())
        return {
            "statusCode": 500,
            "message": 'An error occured at skeet old Blog post.'
        }


def get_app_password():
    ssm = boto3.client("ssm")

    app_password = ssm.get_parameter(Name="bluesky_password", WithDecryption=False)
    app_password = app_password["Parameter"]["Value"]

    return app_password
    

def get_did():
    http = urllib3.PoolManager()

    HANDLE = "vlayusuke.bsky.social"
    DID_URL = "https://bsky.social/xrpc/com.atproto.identity.resolveHandle"

    did_resolve = http.request("GET", DID_URL, fields={"handle": HANDLE})
    did_resolve = json.loads(did_resolve.data)
    did = did_resolve["did"]

    return did


def get_api_key(did, app_password):
    http = urllib3.PoolManager()

    API_KEY_URL = "https://bsky.social/xrpc/com.atproto.server.createSession"

    post_data = {"identifier": did, "password": app_password}
    headers = {"Content-Type": "application/json"}
    api_key = http.request(
        "POST",
        API_KEY_URL,
        headers = headers,
        body = bytes(json.dumps(post_data), encoding="utf-8"),
    )
    api_key = json.loads(api_key.data)

    return api_key["accessJwt"]


def post_skeet(did, key, text, url):
    http = urllib3.PoolManager()
    now = datetime.today()
    
    text = text + "\n\n" + url
    
    found_uri = find_uri_position(text)
    
    if found_uri:
        uri, start_position, end_position = found_uri
    
    post_feed_url = "https://bsky.social/xrpc/com.atproto.repo.createRecord"

    post_record = {
        "collection": "app.bsky.feed.post",
        "repo": did,
        "record": {
            "text": f"{text}",
            "facets": [{
                "index": {
                    "byteStart": start_position,
                    "byteEnd": end_position + 1
                },
                "features": [
                    {
                        "$type": "app.bsky.richtext.facet#link",
                        "uri": uri
                    }
                ]
            }],
            "createdAt": now.strftime("%Y-%m-%dT%H:%M:%S"),
        }
    }

    post_request = http.request(
        "POST",
        post_feed_url,
        body = json.dumps(post_record),
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"},
    )

    post_request = json.loads(post_request.data)

    return post_request

  
def find_uri_position(text):
    pattern = r'(https?://\S+)'
    match = re.search(pattern, text)

    if match:
        uri = match.group(0)
        start_position = len(text[:text.index(uri)].encode('utf-8'))
        end_position = start_position + len(uri.encode('utf-8')) - 1
        
        return (uri, start_position, end_position)
    else:
        return None

苦労したところ

実装例はいくつかあって参考にさせていただいたものも多いので、それほど苦労したところはなかったのですが、BlueskyのAPIの仕様を元にして、スキートに含まれるURLをきちんとリンクとしてPostする箇所の実装にはだいぶ苦労しました。旧Twitterと違ってBlueskyではURLもプレーンテキストとして認識するので、ここをリッチテキストとして認識させる必要があります。もちろんBlueskyのAPIにはその実装が行われているのですが、そこでだいぶ苦労をしました。

とはいえ、旧Twitterがこの先どういう運命を辿るかわからない中で、次のテキスト系のSNSを選んでいく中で、Blueskyの存在は見逃せないでしょう。APIの内容をもっと理解して、あ様々な使い方を見出していきたいと思います。

カテゴリー: AWS, Private タグ: , , , パーマリンク