Managing Complex Workflows with AWS Step Functions

AWS Step Functions is a serverless workflow service for building and coordinating distributed applications with state machines. It lets you create, run, and visualize complex workflows without managing any infrastructure. This article covers what AWS Step Functions is, how it works, and how it can be used to manage complex workflows.

What is it for?

Step Functions integrates with other AWS services such as AWS Lambda, Amazon SNS, and Amazon SQS, and it can call external services through REST and HTTP APIs. Because the service is fully managed, you define the workflow and AWS runs the servers behind it.

How does AWS Step Functions work?

AWS Step Functions works by creating and managing state machines that define the steps of a workflow. A state machine is a collection of states, each describing one step. A state can perform a task, make a choice, wait, or run branches in parallel, and it names the state that runs next. The workflow is written in a JSON-based language called Amazon States Language, which lets you define state machines that handle different kinds of input and output, interact with AWS services, and wait for external events.
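To make the structure concrete, here is a minimal sketch of an Amazon States Language definition written as a JavaScript object, together with a small check that the states are wired together consistently. The state names, the placeholder Lambda ARN, and the checkDefinition helper are illustrative assumptions and are not part of Step Functions itself.

```javascript
// A hypothetical two-state workflow in Amazon States Language (ASL).
// The ARN below is a placeholder, not a real function.
const definition = {
  Comment: "Illustrative example only",
  StartAt: "Validate",
  States: {
    Validate: {
      Type: "Task",
      Resource: "arn:aws:lambda:us-east-1:123456789012:function:validate",
      Next: "Done",
    },
    Done: { Type: "Succeed" },
  },
};

// Hypothetical helper: returns a list of wiring problems (empty when none are found).
// It only checks StartAt/Next references; it is not a full ASL validator.
function checkDefinition(def) {
  const problems = [];
  const names = Object.keys(def.States || {});
  if (!names.includes(def.StartAt)) {
    problems.push("StartAt refers to unknown state: " + def.StartAt);
  }
  for (const name of names) {
    const state = def.States[name];
    if (state.Next !== undefined && !names.includes(state.Next)) {
      problems.push(name + ".Next refers to unknown state: " + state.Next);
    }
  }
  return problems;
}

console.log(JSON.stringify(definition, null, 2)); // the JSON form of the definition
console.log(checkDefinition(definition));         // empty array when the wiring is consistent
```

Serializing the object with JSON.stringify gives the text you would supply as the state machine definition. A check like this catches misspelled state names before deployment.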

Once you have created a state machine, you start an execution by passing it a JSON input. That input is handed to the first state. Each state receives JSON input, can modify it, and produces JSON output that becomes the input of the next state. The output of the last state is the output of the entire execution.
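This data flow can be illustrated locally without AWS. The sketch below is a toy runner, not the Step Functions service. It assumes each state is a plain function and that each state's output fully replaces its input, which is the default behavior of a Task state when no ResultPath or OutputPath is set. The names runWorkflow, flow, and localHandlers are hypothetical.

```javascript
// Toy runner: follows StartAt/Next and feeds each state's output to the next state.
function runWorkflow(definition, handlers, input) {
  let current = definition.StartAt;
  let data = input;
  while (true) {
    const state = definition.States[current];
    data = handlers[current](data); // output of this state
    if (state.End) return data;     // the last state's output is the workflow output
    current = state.Next;
  }
}

// A made-up two-step workflow: double a number, then add one.
const flow = {
  StartAt: "Double",
  States: {
    Double: { Next: "AddOne" },
    AddOne: { End: true },
  },
};

// Local stand-ins for the work each state would do.
const localHandlers = {
  Double: (input) => ({ value: input.value * 2 }),
  AddOne: (input) => ({ value: input.value + 1 }),
};

const result = runWorkflow(flow, localHandlers, { value: 5 });
console.log(result); // { value: 11 }
```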

How can AWS Step Functions be used to manage complex workflows?

AWS Step Functions can be used to manage complex workflows in a number of ways. For example, it can be used to manage workflows that involve long-running tasks or multiple services. It can also be used to create workflows that require multiple steps, such as data processing, machine learning, or ETL tasks. Here are a few examples of how AWS Step Functions can be used to manage complex workflows:

Long-Running Tasks

AWS Step Functions can be used to manage workflows that involve long-running tasks. For example, you can use Step Functions to create a workflow that processes large files or performs a series of computations that take a long time to complete. You can break the workflow down into smaller steps, each of which can be executed by a separate Lambda function. AWS Step Functions coordinates the execution of the steps and can retry failed steps automatically, according to the Retry policy you define on each state.
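Retries are configured per state through a Retry field in Amazon States Language. The sketch below shows such a policy as a JavaScript object and computes the wait before each retry, assuming the interval grows by BackoffRate on every attempt. The retryDelays helper is hypothetical, and it ignores optional settings such as MaxDelaySeconds and JitterStrategy.

```javascript
// A Retry entry as it would appear on a Task state in Amazon States Language.
const retryPolicy = {
  ErrorEquals: ["States.TaskFailed"],
  IntervalSeconds: 2, // wait before the first retry
  MaxAttempts: 3,     // maximum number of retries
  BackoffRate: 2.0,   // multiplier applied to the wait after each retry
};

// Hypothetical helper: seconds waited before each retry attempt.
function retryDelays(policy) {
  const delays = [];
  for (let attempt = 0; attempt < policy.MaxAttempts; attempt++) {
    delays.push(policy.IntervalSeconds * Math.pow(policy.BackoffRate, attempt));
  }
  return delays;
}

console.log(retryDelays(retryPolicy)); // [ 2, 4, 8 ]
```

With these values, a failing task would be retried after 2, 4, and 8 seconds before the state fails or hands over to a Catch handler, if one is defined.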

Multiple Services

AWS Step Functions can be used to manage workflows that involve multiple services. For example, you can use Step Functions to create a workflow that involves calling multiple REST APIs, storing the results in a database, and triggering other services based on the output. AWS Step Functions can manage the dependencies between the services and ensure that each service is executed in the correct order.

Data Processing

AWS Step Functions can be used to manage data processing workflows. For example, you can use Step Functions to create a workflow that processes data from multiple sources, transforms it, and loads it into a data warehouse. You can use services like AWS Lambda, Amazon SNS, and Amazon SQS to perform the different steps of the workflow. AWS Step Functions manages the dependencies between the services and can retry failed steps according to the retry policy you configure.

Example

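Here's an example CloudFormation template that deploys a Step Functions state machine with two steps. The first step invokes a Lambda function that reads a CSV file from an S3 bucket and converts it to JSON, and the second step writes the converted data to a DynamoDB table. The template does not connect the S3 bucket to the state machine, so an execution has to be started with an S3 event notification as its input.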

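# Parameters referenced with !Ref in the resources below
Parameters:
  BucketName:
    Type: String
  TableName:
    Type: String
  CSVToJSONLambdaName:
    Type: String
  LambdaExecutionRoleName:
    Type: String

Resources:
  # The S3 bucket which will receive the CSV file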
  S3Bucket:
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName: !Ref BucketName

  # DynamoDB table which will receive the JSON representation of the CSV file
  DynamoDBTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      TableName: !Ref TableName
      AttributeDefinitions:
        - AttributeName: "id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "id"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  # This lambda converts the csv file to JSON
  CSVToJSONLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: !Ref CSVToJSONLambdaName
      Handler: "index.handler"
      Runtime: "nodejs14.x"
      Timeout: 300
      MemorySize: 128
      Code:
        ZipFile: !Sub |
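          // Uses AWS SDK for JavaScript v2 ("aws-sdk"), which Lambda bundles only with
          // Node.js runtimes up to nodejs16.x; newer runtimes bundle SDK v3 instead.
          const AWS = require('aws-sdk');
          const s3 = new AWS.S3();
          const dynamodb = new AWS.DynamoDB.DocumentClient();

          // Naive CSV parser (no quoted fields). Inline ZipFile code cannot bundle
          // third-party modules such as csvtojson; package that library with the
          // function if you need full CSV support.
          const parseCsv = (text) => {
            const [headerLine, ...lines] = text.trim().split(/\r?\n/);
            const headers = headerLine.split(',');
            return lines.map(line => {
              const values = line.split(',');
              return Object.fromEntries(headers.map((h, i) => [h, values[i]]));
            });
          };

          // Assumes the input has the shape of an S3 event notification.
          exports.handler = async (event) => {
            const bucketName = event.Records[0].s3.bucket.name;
            // Object keys in S3 events are URL-encoded, with spaces sent as "+"
            const objectKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
            const data = await s3.getObject({ Bucket: bucketName, Key: objectKey }).promise();
            const jsonArray = parseCsv(data.Body.toString());

            // Each row must contain the table's "id" key.
            // BatchWriteItem accepts at most 25 requests per call.
            const putRequests = jsonArray.map(item => ({ PutRequest: { Item: item } }));
            const batches = [];
            for (let i = 0; i < putRequests.length; i += 25) {
              batches.push(putRequests.slice(i, i + 25));
            }
            await Promise.all(batches.map(batch =>
              dynamodb.batchWrite({ RequestItems: { '${TableName}': batch } }).promise()
            ));

            // Returned so the next state can read $.ConvertedData
            return { ConvertedData: JSON.stringify(jsonArray) };
          };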
      Role: !GetAtt [LambdaExecutionRole, Arn]

  # Execution role for the lambda function to get access to S3 & DynamoDB
  LambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: !Ref LambdaExecutionRoleName
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: "S3GetObjectPolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "s3:GetObject"
                Resource: !Join [ "", [ "arn:aws:s3:::", !Ref BucketName, "/*" ] ]
        - PolicyName: "DynamoDBPutItemPolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "dynamodb:BatchWriteItem"
                  - "dynamodb:PutItem"
                Resource: !GetAtt [ DynamoDBTable, Arn ]

  # Step function role required for execution of the state machine
  # gives it access to invoke lambda functions (our csv to json)
  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service: states.amazonaws.com
          Action: sts:AssumeRole
      Path: /
      Policies:
      - PolicyName: StepFunctionAccess
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:PutLogEvents
            Resource: arn:aws:logs:*:*:*
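          - Effect: Allow
            Action:
            - lambda:InvokeFunction
            Resource: !GetAtt CSVToJSONLambda.Arn
          # Needed by the "Put to DynamoDB" state
          - Effect: Allow
            Action:
            - dynamodb:PutItem
            Resource: !GetAtt DynamoDBTable.Arn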

  # Step function config
  MyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      DefinitionString: !Sub |
        {
          "StartAt": "CSV to JSON",
          "States": {
            "CSV to JSON": {
              "Type": "Task",
              "Resource": "${CSVToJSONLambda.Arn}",
              "Next": "Put to DynamoDB"
            },
            "Put to DynamoDB": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:putItem",
              "Parameters": {
                "TableName": "${DynamoDBTable}",
                "Item": {
                  "id": {
                    "S.$": "$$.Execution.Name"
                  },
                  "json_data": {
                    "S.$": "$.ConvertedData"
                  }
                }
              },
              "End": true
            }
          }
        }
      RoleArn: !GetAtt StepFunctionRole.Arn
      StateMachineName: MyStateMachine

In this CloudFormation template, MyStateMachine is a resource of type AWS::StepFunctions::StateMachine. Its DefinitionString property holds the state machine definition in Amazon States Language. The state machine has two states: CSV to JSON and Put to DynamoDB.

The first state is a Task state that invokes the CSVToJSONLambda function to convert the CSV file to JSON. The Next property specifies that the next state to execute is Put to DynamoDB.

The second state is also a Task state. It uses the arn:aws:states:::dynamodb:putItem resource, a direct service integration, to write the converted JSON into the DynamoDB table. The Parameters property supplies the table name and the item. The id attribute, which is the table's key, is taken from the execution name in the context object ($$.Execution.Name), and json_data is read from the ConvertedData field of the first state's output. The End property marks this as the last state in the state machine.

The RoleArn property uses !GetAtt to look up the ARN of the IAM role that the state machine assumes when it runs. Inside the !Sub string, ${CSVToJSONLambda.Arn} resolves to the ARN of the Lambda function and ${DynamoDBTable} resolves to the name of the DynamoDB table.
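The batching step inside the Lambda function can be tried locally. DynamoDB's BatchWriteItem accepts at most 25 put or delete requests per call, which is why the function splits the rows into groups of 25. The sketch below isolates that step; chunk is a hypothetical helper name and the rows are made-up sample data.

```javascript
// Split an array into consecutive chunks of at most `size` items.
function chunk(items, size) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// 60 made-up rows, wrapped the same way the Lambda wraps them for batchWrite.
const rows = Array.from({ length: 60 }, (_, i) => ({ id: String(i) }));
const putRequests = rows.map((item) => ({ PutRequest: { Item: item } }));
const batches = chunk(putRequests, 25);

console.log(batches.map((b) => b.length)); // [ 25, 25, 10 ]
```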

Conclusion

AWS Step Functions is a powerful tool for managing complex workflows. It provides an easy way to create, run, and visualize workflows, all without having to worry about infrastructure or the underlying servers that power them. By using AWS

This article was written by Gen-AI GPT-3. Articles published after 2023 are written by GPT-4, GPT-4o or GPT-o1

3137 words authored by Gen-AI! So please do not take it seriously, it's just for fun!
