【Python】Lambda関数を実行する【AWS】

Pythonで
 ・Lambda関数を実行
できます!

この記事では
 ・プロファイルを指定した方法
紹介します!

前提

①外部ライブラリ「boto3」がインストールされていること。
pipコマンドでインストールする場合、以下でインストールできます。

pip install boto3


②AWS CLIのプロファイルが設定済みであること。

実行するLambda関数

ここでは例として
 ・Lambda関数「test_hello_world_function」を実行
します。

Lambda関数「test_hello_world_function」
Lambda関数「test_hello_world_function」


Lambda関数「test_hello_world_function」のコードは以下とします。
※イベント JSONで指定した「key1の値」を、print関数でログへ出力するだけコードです

print('Loading function')

def lambda_handler(event, context):
    print("value1 = " + event['key1'])
    return event['key1'] 

Lambda関数を実行するコード

import json
from boto3.session import Session
from botocore.exceptions import ClientError

# Lambda関数名
lambda_Name = "test_hello_world_function"
# プロファイル名
profile = "develop"

# event(jsonデータ)をdict型で作成
event_dict = {"key1": "hogehoge"}

try:
    # dict型のevent(jsonデータ)をstr型へ変換
    event_str = json.dumps(event_dict)

    session = Session(profile_name=profile)
    lambda_Client = session.client(service_name="lambda")

    # Lambda関数を同期実行
    response = lambda_Client.invoke(FunctionName=lambda_Name, Payload=event_str)
    # 実行結果の確認
    if response.get("FunctionError"):
        print("Lambda関数でエラーが発生しました。")
    else:
        print("Lambda関数が正常終了しました。")
except ClientError as e:
    print("ClientErrorが発生しました。")
    print("エラーコード:" + e.response["Error"]["Code"])
    print("エラーメッセージ:" + e.response["Error"]["Message"])
except Exception as e:
    print("エラーが発生しました。")
    print(e)

以下を指定します(6、8行目)。
※状況に応じて任意の値を指定してください。

・Lambda関数名
・プロファイル名

Lambda関数に渡すevent(jsonデータ)を作成します(11、15行目)。

「invoke」メソッドによりLambda関数を実行します(21行目)。
※引数「InvocationType」により「同期実行(RequestResponse)」または「非同期実行(Event)」を指定できます。
 デフォルト値は「同期実行(RequestResponse)」のため、指定無しの場合は同期実行となります。
※「同期実行」とは「Lambda関数の処理が終了後に、Pythonへ制御が返ってくる実行方法」です。

同期実行の場合、Lambda関数で例外が発生しても、Lambda関数のリトライ処理は行われません。
※非同期実行の場合は、デフォルトでリトライ処理が2回行われます。

実行されたLambda関数でエラーが発生したかどうかを確認します(23~26行目)。

実行結果

プロファイルを指定して、Lambda関数を実行できました。

実行結果①
実行結果①
実行結果②
実行結果②

参考

上記のコードは以下の公式サイトを参考にして作成しました。

タイトルとURLをコピーしました