はじめに
Lambdaを1分より短いサイクルで定期的に動かしたいと思ったこと、ありませんか?
Lambdaを定期的に実行する方法としてまず最初に思い浮かぶのはEventBridgeです。ただ、EventBridgeの場合、最短でも1分サイクルでの実行が限界になります。長時間かかる処理の状態を1分より短いサイクルでポーリングしたい...というときにはcronベースの指定ではどうしても限界がありますね。そこでどうするか?というのが今回のお題です。
どうするのか
今回はSQSを使って実装してみました。メッセージの表示遅延秒数をタイマーとして使用し、そこにSQS+Lambdaのメッセージトリガーを組み合わせる方法です。そしてLambda自身が新たなメッセージを投入することで定期実行を実現しています。
アーキテクチャ図はタイトル画像の通りです。LambdaとSQSの間でメッセージが行ったり来たり、という構図です。
キーとなる設定ポイントはDelaySeconds
です。これはメッセージタイマーと呼ばれるもので、メッセージ投入後、投入タイミングから起算して指定した時間だけメッセージが表示されないようにするために指定するパラメータです。メッセージ単位で実現する場合はメッセージタイマーが使えますが、キュー全体で表示遅延を設定したい場合は遅延キューを設定することで同様の設定が実現できます。また、遅延キューとメッセージタイマーの両方を指定した場合はメッセージタイマーの値が優先されます。オーバーライドされる挙動としては直感的でわかりやすいですね。
作ってみる
今回は下記のテンプレートのような構成で組んでみました。主要なリソースはLambda関数(AWS::Lambda::Function)とSQSキュー(AWS::SQS::Queue)のふたつです。また、SQSのメッセージトリガーを利用するためにイベントソースマッピング(AWS::Lambda::EventSourceMapping)を構成しています。
CloudWatchロググループ(AWS::Logs::LogGroup)は、直接は必要ないリソースですがあわせて作成しています。これは、ロググループを事前作成せずにLambdaを実行した場合における課題を解決するのが目的です。
- 保持期間が無制限となるロググループが作成されてしまう
→あらかじめ保存期限を指定したロググループを作ることで、ログの肥大化を抑止できるようになります。 - ロググループがみなしごリソースになってしまう
→最初からテンプレートに組み込んでおくことで、「立つ鳥跡を濁さず」を実現できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
AWSTemplateFormatVersion: 2010-09-09 Description: '' Parameters: DelaySeconds: Type: Number MinValue: 10 Default: 60 Resources: Role: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: 'sts:AssumeRole' Path: / Policies: - PolicyName: test PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: 'sqs:*' Resource: '*' ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' Queue: Type: 'AWS::SQS::Queue' EventSourceMapping: Type: 'AWS::Lambda::EventSourceMapping' Properties: BatchSize: 1 EventSourceArn: !Sub "${Queue.Arn}" FunctionName: !Ref Function Function: Type: 'AWS::Lambda::Function' Properties: Code: ZipFile: | import boto3 import json import os sqs = boto3.client('sqs') def lambda_handler(event, context): message = event['Records'][0]['body'] print(message) res = sqs.send_message(QueueUrl = os.environ['QUEUEURL'], MessageBody = message, DelaySeconds = int(os.environ['DELAY'])) print(json.dumps(res)) Environment: Variables: QUEUEURL: !Ref Queue DELAY: !Ref DelaySeconds Handler: index.lambda_handler MemorySize: 128 ReservedConcurrentExecutions: 1 Role: !GetAtt - Role - Arn Runtime: python3.10 Timeout: 3 LogGroup: Type: 'AWS::Logs::LogGroup' Properties: LogGroupName: !Sub "/aws/lambda/${Function}" RetentionInDays: 1 |
パラメータには、メッセージタイマーで遅延させる秒数を指定しておきます。今回は30(秒)を設定してみました。
なお、このテンプレートではLambda関数と同時にその実行用ロールを作っていますので、「AWS CloudFormation によって IAM リソースが作成される場合があることを承認します。」のチェックを忘れずに入れておきます。
試してみる
作った環境にはまだメッセージがひとつもなく、Lambdaも動かない状態ですので、どこかで「着火」する必要があります。作成されたキューの画面で、「メッセージの送信」ボタンからメッセージを送ってみます。内容はなんでもOKです。ここではTTEESSTT
の文字列を送ってみました。
なお、今回作った仕組みではキューにメッセージがない場合のエラーハンドリングは行っていませんので、Lambda関数のテストで着火するのは不可能です。テストした場合、定義関数の1行目でKeyError
が発生して関数がエラーになっちゃいます。
当初の目的は「Lambdaを定期的に実行したい」でしたね。CloudFormationのテンプレートで指定したDelay
パラメータがsqs#send_message()
のDelaySeconds
パラメータに渡されるようになっていますので、その間隔で実行されるはずです。CloudWatchの画面に移動し、ログの出力間隔を見てみましょう。
指定した通りの頻度で実行されているのが確認できますね。
もうひとつメッセージを投入してみましょう。次はHHOOGGEE
の文字列を送ってみます。
先ほどのTTEESSTT
とは独立したサイクルで実行されていますね。このように、それぞれのメッセージごとに処理が定期実行されるため、シーケンシャルで処理する場合に起こりがちな「前の処理が長引くと実行が遅延していく」といった事象とはおさらばできそうです。
応用
今回作成した構成はPoCに近いバラックレベルの環境ですが、いろいろ応用の余地がありそうなので考えてみました。
- SQSで保持するメッセージの内容にカウンターとなる値を保持させる
定期的な実行を有限な回数で終わらせることができるようになります。 - 実行間隔を動的に変更する
何かの処理の進捗をチェックするケースにおいて、ステータスに進行率を持っている場合、80%までは120秒ごと、80%を超えたら60秒ごとにチェック頻度を上げる、といったことができるようになります。
まとめ
今回はLambdaを定期的に実行させるためにSQSを使ったアーキテクチャを紹介しました。SQSの定番処理といえばメッセージのシリアライズが挙げられますが、意外な使い方もできるんですね。ではまた!
あ、テストしたあとはちゃんと作成したスタックを削除しましょうね!
投稿者プロフィール
- 根っこはインフラ屋な古いおじさん。
最新の投稿
- AWS2023年11月14日DLQを積み重ねる
- SQS2023年10月2日1分より短いサイクルで定期的にLambdaを実行する
- CloudFormation2022年12月22日ToJsonString関数を試してみた
- AWS2022年12月22日AWS固有のパラメータタイプを学ぶ