ablog

不器用で落着きのない技術者のメモ

Step Functions で Glueジョブの成功/失敗を判定する

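Glue ジョブの成功/失敗を Step Functions で判定し、後続ジョブの実行を制御する。タスクを `glue:startJobRun.sync` で呼び出すとステートマシンは Glue ジョブの完了を待ち、ジョブが失敗した場合は `States.TaskFailed` エラーになるので、これを `Catch` で捕捉して SNS 通知(NotifyFailure)に遷移させる。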

  • ステートマシン
{
  "StartAt": "ReadFilterJob",
  "States": {
    "ReadFilterJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "rs_query",
        "Arguments": {
          "--db": "reviews",
          "--db_creds": "reviewssecret",
          "--bucket": "stepfunctionredshift-scriptbucket-13...",
          "--file": "sql/etl.sql"
        }
      },
      "Catch": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "ResultPath": "$.cause",
          "Next": "NotifyFailure"
        }
      ],
      "Next": "ReportJob"
    },
    "ReportJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "rs_query",
        "Arguments": {
          "--db": "reviews",
          "--db_creds": "reviewssecret",
          "--bucket": "stepfunctionredshift-scriptbucket-13...",
          "--file": "sql/topreviews.sql"
        }
      },
      "Catch": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "ResultPath": "$.cause",
          "Next": "NotifyFailure"
        }
      ],
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:0123456789012:StepFunctionStack-FailNotificationSNS-RR7WOSB4VTBF",
        "Message.$": "$.cause"
      },
      "End": true
    }
  }
}
  • Glueジョブ(Python shell)
from redshift_module import pygresql_redshift_common as rs_common
import sys
from awsglue.utils import getResolvedOptions
import boto3

# Resolve the job arguments supplied on the command line
# (--db, --db_creds, --bucket, --file).
args = getResolvedOptions(sys.argv, ['db', 'db_creds', 'bucket', 'file'])
db = args['db']
db_creds = args['db_creds']
bucket = args['bucket']
file = args['file']

# Download the SQL script from S3 and split it into individual statements.
# NOTE: this is a plain split on ';', so a semicolon inside a string literal
# or a comment is also treated as a statement separator.
s3 = boto3.client('s3')
script = s3.get_object(Bucket=bucket, Key=file)['Body'].read().decode('utf-8')
# Strip every piece and drop the blank ones instead of blindly discarding the
# last piece: the final statement is then still executed when the script has
# no trailing ';', and whitespace-only fragments are never sent to the database.
sqls = [stmt.strip() for stmt in script.split(';') if stmt.strip()]

# Open the database connection.
print('connecting...')
con = rs_common.get_connection(db, db_creds)

# Run the statements in file order. Nothing is caught here, so any error
# raised by rs_common.query propagates and terminates the script.
# NOTE(review): presumably that is what marks the Glue job run as failed for
# the state machine's Catch -- confirm against the Glue job configuration.
print("connected...running query...")
results = []
for sql in sqls:
    # Re-append the terminator that split(';') removed.
    result = rs_common.query(con, sql + ';')
    print(result)
    results.append(result)

print(results)