PixelCascade: Building a Scalable Image Processing Pipeline with AWS Fanout Architecture

Introduction

PixelCascade is a mini-project that demonstrates how to build a scalable image processing pipeline using AWS services like S3, Lambda, SNS, SQS, and DynamoDB. By leveraging a fanout architecture, the project lets users upload images in JPEG, PNG, or GIF format and processes each upload into a converted JPEG, resized versions, and extracted metadata.

This article provides a step-by-step guide to implementing PixelCascade, from architecture design to testing the pipeline. You can find the complete source code for this project in the GitHub Repository.

Background of the Project and Architecture

The primary objective of PixelCascade is to create a scalable and efficient workflow for processing images. Below are the project goals, workflow, and architecture details.

Goals

  1. User Uploads: Accept images in JPEG, PNG, and GIF formats.
  2. Outputs:
    • Converted images (JPEG)
    • Resized images (thumbnail, medium, and large)
    • Extracted metadata
  3. Scalability: Handle high volumes of image uploads without bottlenecks.

Workflow

  1. Image Upload: Users upload a master image to the master folder in an S3 bucket.
  2. Lambda Trigger: A Lambda function processes the S3 upload event and publishes a message to an SNS topic.
  3. SNS Fanout: The SNS topic fans out the message to three SQS queues, each consumed by its own Lambda:
    • Format Conversion Queue: its consumer converts the master image to JPEG format and stores it in S3.
    • Metadata Extraction Queue: its consumer extracts metadata from the master image and stores it in DynamoDB.
    • Resizing Queue: its consumer resizes the master image into thumbnail, medium, and large sizes and stores them in S3.
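
Whichever queue it lands in, the message that travels through the pipeline is deliberately small: the Job Publisher Lambda (shown in step 3) publishes only the bucket name and object key as JSON, for example:

{
  "bucket": "pixel-cascade",
  "key": "master/aaa.jpg"
}

Each consumer Lambda unmarshals exactly this payload from its SQS record body (assuming raw message delivery on the SNS subscriptions, covered in step 6).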

Architecture Diagram

High Level Architecture Diagram

Requirements

Before setting up PixelCascade, ensure you have the following:

  • AWS Account: Access to AWS services like S3, Lambda, SNS, SQS, and DynamoDB.
  • Golang: Required for writing and deploying Lambda functions.
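
The Lambda functions in this guide depend on the AWS SDK for Go v2, the Lambda Go runtime library, and the nfnt/resize package. A typical module setup, with an illustrative module name, looks like this:

$ go mod init pixel-cascade
$ go get github.com/aws/aws-lambda-go/lambda
$ go get github.com/aws/aws-sdk-go-v2/config \
      github.com/aws/aws-sdk-go-v2/service/s3 \
      github.com/aws/aws-sdk-go-v2/service/sns \
      github.com/aws/aws-sdk-go-v2/service/dynamodb
$ go get github.com/nfnt/resize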

Step-by-Step Implementation

1. Create an SNS Topic

Create an SNS topic named new-image-uploaded to handle messages for every master image uploaded.

Steps:

  1. Navigate to SNS Console in AWS.
  2. Click Create Topic.
  3. Select Standard as the topic type.
  4. Enter new-image-uploaded as the topic name.
  5. Click Create Topic.
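
If you prefer the AWS CLI, the equivalent (assuming configured credentials) is a single command:

$ aws sns create-topic --name new-image-uploaded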

Create SNS Topic

2. Create an S3 Bucket and Folders

Create an S3 bucket named pixel-cascade with the following folders:

  • master: For storing user-uploaded images.
  • converted: For storing converted images.
  • resized: For storing resized images.

Steps:

  1. Navigate to S3 Console in AWS.
  2. Click Create Bucket and name it pixel-cascade.
  3. Open the bucket and create three folders: master, converted, and resized.
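
The same setup via the CLI (bucket names are globally unique, so yours may need a different name; the "folders" are just zero-byte key prefixes):

$ aws s3 mb s3://pixel-cascade
$ aws s3api put-object --bucket pixel-cascade --key master/
$ aws s3api put-object --bucket pixel-cascade --key converted/
$ aws s3api put-object --bucket pixel-cascade --key resized/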

Create S3 Bucket and Folders

3. Create and Deploy the Job Publisher Lambda

The Job Publisher Lambda is triggered by S3 events and publishes messages to the SNS topic.

Steps:

Create an IAM Policy:
  • Name: LambdaJobPublisherPolicy.
  • Grant permissions to write CloudWatch Logs and publish to the SNS topic. (The S3 event itself carries the bucket name and object key, so this function needs no S3 read permission.)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:*:*:new-image-uploaded"
    }
  ]
}

Create Job Publisher Policy

Create an IAM Role:
  • Name: LambdaJobPublisherExecutionRole.
  • Attach the LambdaJobPublisherPolicy.

Create Job Publisher Execution Role

Write the Lambda Code:
  • Use Golang to write a Lambda function that:
    • Processes S3 events.
    • Publishes messages to the new-image-uploaded SNS topic.
package main

import (
    "context"
    "encoding/json"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sns"
    "log"
)

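// ARN of the topic created in step 1; replace the region and account ID with your own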
var newImageUploadedTopicArn = "arn:aws:sns:us-east-1:116981782358:new-image-uploaded"

func handler(ctx context.Context, s3Event events.S3Event) error {
    sdkConfig, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Printf("failed to load default config: %s\n", err)
        return err
    }
    snsClient := sns.NewFromConfig(sdkConfig)

    for _, record := range s3Event.Records {
        bucket := record.S3.Bucket.Name
        key := record.S3.Object.URLDecodedKey
        log.Printf("success receive s3 event trigger for bucket: %s and key: %s\n", bucket, key)

        payload := struct {
            Bucket string `json:"bucket"`
            Key    string `json:"key"`
        }{
            Bucket: bucket,
            Key:    key,
        }

        m, err := json.Marshal(payload)
        if err != nil {
            log.Printf("failed to marshal payload: %s\n", err)
            return err
        }
        message := string(m)

        resp, err := snsClient.Publish(ctx, &sns.PublishInput{
            Message:  &message,
            TopicArn: &newImageUploadedTopicArn,
        })
        if err != nil {
            log.Printf("failed to publish message to SNS: %s\n", err)
            return err
        }

        log.Printf("message sent to SNS with message ID: %s\n", *resp.MessageId)
    }

    return nil
}

func main() {
    lambda.Start(handler)
}

Add Trigger:
  • Set the trigger to the pixel-cascade bucket.
  • Prefix: master/.
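
If you set up the trigger from the CLI instead of the Lambda console, the notification configuration would look roughly like the sketch below (the Lambda ARN is a placeholder; note that the console also grants S3 permission to invoke the function for you, which the CLI route requires you to do separately with aws lambda add-permission):

$ aws s3api put-bucket-notification-configuration \
    --bucket pixel-cascade \
    --notification-configuration '{
      "LambdaFunctionConfigurations": [{
        "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:job-publisher-function",
        "Events": ["s3:ObjectCreated:*"],
        "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "master/"}]}}
      }]
    }'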

Add Trigger Job Publisher

Deploy the Lambda:
  • Create the Lambda function with a custom runtime (provided.al2 or provided.al2023, since the build produces a bootstrap binary) and attach the LambdaJobPublisherExecutionRole.
  • Package the code into a .zip file and upload it via the AWS Lambda Console, as shown below.

Create Lambda Job Publisher

$ cd job-publisher-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip job-publisher-function.zip bootstrap
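
Alternatively, the function can be created from the CLI. A sketch, assuming the provided.al2023 custom runtime and a placeholder account ID:

$ aws lambda create-function \
    --function-name job-publisher-function \
    --runtime provided.al2023 \
    --handler bootstrap \
    --architectures x86_64 \
    --zip-file fileb://job-publisher-function.zip \
    --role arn:aws:iam::123456789012:role/LambdaJobPublisherExecutionRole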

Deploy Job Publisher

4. Create SQS Queues

Create three SQS queues: format-conversion-queue, metadata-extraction-queue, and resizing-queue.

Steps:

  1. Navigate to SQS Console in AWS.
  2. Click Create Queue and choose Standard Queue.
  3. Enter the queue name and repeat for the other two queues.
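
Or, from the CLI:

$ aws sqs create-queue --queue-name format-conversion-queue
$ aws sqs create-queue --queue-name metadata-extraction-queue
$ aws sqs create-queue --queue-name resizing-queue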

Create SQS Queue

5. Edit Access Policies for SQS Queues

Allow the new-image-uploaded SNS topic to send messages to the SQS queues.
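
For each queue, the access policy needs a statement that lets SNS deliver messages, scoped to this specific topic. A sketch for format-conversion-queue (region and account ID are placeholders; repeat with the matching Resource for the other two queues):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "sns.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:format-conversion-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:us-east-1:123456789012:new-image-uploaded"
        }
      }
    }
  ]
}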

Edit SQS Access Policy

6. Create SNS Subscriptions

Subscribe each SQS queue to the new-image-uploaded SNS topic.

Steps:

  1. Navigate to the SNS Console.
  2. Open the new-image-uploaded topic.
  3. Click Create Subscription and set the protocol to SQS.
  4. Provide the ARN of each SQS queue.
  5. Enable Raw message delivery. This matters for this pipeline: the consumer Lambdas unmarshal the SQS record body directly into a {bucket, key} payload, so the body must contain only the published message rather than the full SNS envelope.
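
The same subscription from the CLI, with raw message delivery enabled (ARNs are placeholders):

$ aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:123456789012:new-image-uploaded \
    --protocol sqs \
    --notification-endpoint arn:aws:sqs:us-east-1:123456789012:format-conversion-queue \
    --attributes RawMessageDelivery=true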

Create SNS Subscription

7. Deploy Format Converter Lambda

The Format Converter Lambda converts the master image to JPEG and stores it in the converted folder.

Steps:

Create an IAM Policy:
  • Name: LambdaFormatConverterPolicy.
  • Grant permissions to consume from SQS and to read and write objects in the pixel-cascade bucket.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:format-conversion-queue"
    }
  ]
}

Create Format Converter Policy

Create an IAM Role:
  • Name: LambdaFormatConverterExecutionRole.
  • Attach the LambdaFormatConverterPolicy.

Create Format Converter Execution Role

Write the Lambda Code:
  • Use Golang to write a Lambda function that:
    • Reads messages from the format-conversion-queue.
    • Converts the image to JPEG format.
    • Stores the result in the converted folder.
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "image"
    _ "image/gif"
    "image/jpeg"
    _ "image/png"
    "log"
    "path/filepath"
    "strings"
)

var s3Client *s3.Client

const (
    ConvertedFolder = "converted/"
)

type messageBody struct {
    Bucket string `json:"bucket"`
    Key    string `json:"key"`
}

func handler(ctx context.Context, event events.SQSEvent) error {
    sdkConfig, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Printf("failed to load default config: %s\n", err)
        return err
    }
    s3Client = s3.NewFromConfig(sdkConfig)

    for _, record := range event.Records {
        log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

        var body messageBody
        err := json.Unmarshal([]byte(record.Body), &body)
        if err != nil {
            log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
            return err
        }

        objOutput, err := getObject(ctx, body.Bucket, body.Key)
        if err != nil {
            log.Printf("failed to get object from S3: %s\n", err)
            return err
        }
        defer objOutput.Body.Close()
        log.Printf("content type: %s\n", *objOutput.ContentType)
        log.Printf("content length: %d\n", *objOutput.ContentLength)

        // Decode the image
        img, format, err := image.Decode(objOutput.Body)
        if err != nil {
            log.Printf("failed to decode image: %s\n", err)
            return err
        }
        log.Printf("decoded image format: %s\n", format)

        jpegImg, err := convertToJPEG(img)
        if err != nil {
            log.Printf("failed to convert image: %s\n", err)
            return err
        }

        err = putObject(ctx, body.Bucket, ConvertedFolder+extractFilename(body.Key)+".jpg", jpegImg)
        if err != nil {
            log.Printf("failed to put object to S3: %s\n", err)
            return err
        }
    }

    return nil
}

// extractFilename extracts the filename from the input string
func extractFilename(input string) string {
    // Extract the base name (e.g., aaa.jpg)
    base := filepath.Base(input)
    // Remove the file extension (e.g., aaa)
    name := strings.TrimSuffix(base, filepath.Ext(base))
    return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
    output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        log.Printf("failed to get object from S3: %s\n", err)
        return nil, err
    }

    log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)

    return output, nil
}

// putObject uploads an object to S3
func putObject(ctx context.Context, bucket string, key string, body []byte) error {
    _, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: &bucket,
        Key:    &key,
        Body:   bytes.NewReader(body),
    })
    if err != nil {
        log.Printf("failed to put object to S3: %s\n", err)
        return err
    }

    log.Printf("successfully put object to S3 bucket: %s and key: %s\n", bucket, key)

    return nil
}

// convertToJPEG converts an image to JPEG format
func convertToJPEG(img image.Image) ([]byte, error) {
    var buf bytes.Buffer

    // Set JPEG options (quality 100)
    options := &jpeg.Options{Quality: 100}

    err := jpeg.Encode(&buf, img, options)
    if err != nil {
        log.Printf("failed to convert image to JPEG: %s\n", err)
        return nil, err
    }
    return buf.Bytes(), nil
}

func main() {
    lambda.Start(handler)
}

Add Trigger:
  • Set the trigger to the format-conversion-queue.

Add Trigger Format Converter

Set Timeout:
  • Edit the Lambda configuration to set the execution timeout to 30 seconds.

Edit Format Converter Timeout

Deploy the Lambda:
  • Create the Lambda function (same custom-runtime setup as the Job Publisher) with the LambdaFormatConverterExecutionRole.
  • Package the code into a .zip file and upload it to the AWS Lambda Console.

Create Lambda Format Converter

$ cd format-converter-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip format-converter-function.zip bootstrap

Deploy Format Converter

8. Deploy Pixel Resizer Lambda

The Pixel Resizer Lambda resizes the master image into three sizes (thumbnail, medium, and large) and stores them in the resized folder.

Steps:

Create an IAM Policy:
  • Name: LambdaPixelResizerPolicy.
  • Grant permissions to consume from SQS and to read and write objects in the pixel-cascade bucket.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:resizing-queue"
    }
  ]
}

Create Pixel Resizer Policy

Create an IAM Role:
  • Name: LambdaPixelResizerExecutionRole.
  • Attach the LambdaPixelResizerPolicy.

Create Pixel Resizer Execution Role

Write the Lambda Code:
  • Use Golang to write a Lambda function that:
    • Reads messages from the resizing-queue.
    • Resizes the image into three sizes (thumbnail, medium, large).
    • Stores the results in the resized folder.
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/nfnt/resize"
    "image"
    _ "image/gif"
    "image/jpeg"
    _ "image/png"
    "log"
    "path/filepath"
    "strings"
    "sync"
)

var s3Client *s3.Client

const (
    ResizedFolder = "resized/"
)

type messageBody struct {
    Bucket string `json:"bucket"`
    Key    string `json:"key"`
}

type resizeTask struct {
    SizeName string
    Width    uint
}

func handler(ctx context.Context, event events.SQSEvent) error {
    sdkConfig, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Printf("failed to load default config: %s\n", err)
        return err
    }
    s3Client = s3.NewFromConfig(sdkConfig)

    for _, record := range event.Records {
        log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

        var body messageBody
        err := json.Unmarshal([]byte(record.Body), &body)
        if err != nil {
            log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
            return err
        }

        objOutput, err := getObject(ctx, body.Bucket, body.Key)
        if err != nil {
            log.Printf("failed to get object from S3: %s\n", err)
            return err
        }
        defer objOutput.Body.Close()
        log.Printf("content type: %s\n", *objOutput.ContentType)
        log.Printf("content length: %d\n", *objOutput.ContentLength)

        // Decode the image
        img, format, err := image.Decode(objOutput.Body)
        if err != nil {
            log.Printf("failed to decode image: %s\n", err)
            return err
        }
        log.Printf("decoded image format: %s\n", format)

        // Resize the image
        resizeTasks := []resizeTask{
            {"thumbnail", 100},
            {"medium", 500},
            {"large", 1000},
        }

        var wg sync.WaitGroup

        for _, task := range resizeTasks {
            wg.Add(1)
            go func(task resizeTask) {
                defer wg.Done()

                // resize.Resize scales to the target width (height 0 preserves
                // the aspect ratio) and returns the image directly; it does not
                // return an error, so no error check is needed here.
                resizedImg := resize.Resize(task.Width, 0, img, resize.Lanczos3)

                jpegImg, err := convertToJPEG(resizedImg)
                if err != nil {
                    log.Printf("failed to convert image: %s\n", err)
                    return
                }

                err = putObject(ctx, body.Bucket, ResizedFolder+extractFilename(body.Key)+"/"+task.SizeName+".jpg", jpegImg)
                if err != nil {
                    log.Printf("failed to put object to S3: %s\n", err)
                    return
                }
            }(task)
        }

        wg.Wait()

        log.Printf("all resize tasks completed for image: %s\n", body.Key)
    }

    return nil
}

// extractFilename extracts the filename from the input string
func extractFilename(input string) string {
    // Extract the base name (e.g., aaa.jpg)
    base := filepath.Base(input)
    // Remove the file extension (e.g., aaa)
    name := strings.TrimSuffix(base, filepath.Ext(base))
    return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
    output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        log.Printf("failed to get object from S3: %s\n", err)
        return nil, err
    }

    log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)

    return output, nil
}

// putObject uploads an object to S3
func putObject(ctx context.Context, bucket string, key string, body []byte) error {
    _, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: &bucket,
        Key:    &key,
        Body:   bytes.NewReader(body),
    })
    if err != nil {
        log.Printf("failed to put object to S3: %s\n", err)
        return err
    }

    log.Printf("successfully put object to S3 bucket: %s and key: %s\n", bucket, key)

    return nil
}

// convertToJPEG converts an image to JPEG format
func convertToJPEG(img image.Image) ([]byte, error) {
    var buf bytes.Buffer

    // Set JPEG options (quality 100)
    options := &jpeg.Options{Quality: 100}

    err := jpeg.Encode(&buf, img, options)
    if err != nil {
        log.Printf("failed to convert image to JPEG: %s\n", err)
        return nil, err
    }
    return buf.Bytes(), nil
}

func main() {
    lambda.Start(handler)
}

Add Trigger:
  • Set the trigger to the resizing-queue.

Add Trigger Pixel Resizer

Set Timeout:
  • Edit the Lambda configuration to set the execution timeout to 30 seconds.

Edit Pixel Resizer Timeout

Deploy the Lambda:
  • Create the Lambda function (same custom-runtime setup as before) with the LambdaPixelResizerExecutionRole.
  • Package the code into a .zip file and upload it to the AWS Lambda Console.

Create Lambda Pixel Resizer

$ cd pixel-resizer-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip pixel-resizer-function.zip bootstrap

Deploy Pixel Resizer

9. Create a DynamoDB Table

Create a DynamoDB table named pixel-metadata with name as the partition key.
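
Via the CLI (on-demand billing keeps this simple for a demo workload):

$ aws dynamodb create-table \
    --table-name pixel-metadata \
    --attribute-definitions AttributeName=name,AttributeType=S \
    --key-schema AttributeName=name,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST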

Create DynamoDB Table

10. Deploy Metadata Extractor Lambda

The Metadata Extractor Lambda generates metadata from the master image and stores it in DynamoDB.

Steps:

Create an IAM Policy:
  • Name: LambdaMetadataExtractorPolicy.
  • Grant permissions to read from SQS, fetch the master image from S3, and write items to DynamoDB, as sketched below.
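
The screenshot below shows the policy in the console; a sketch that mirrors the other two policies and matches what the function code actually calls (s3:GetObject for the master image, dynamodb:PutItem for the metadata) would be:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/pixel-metadata"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:metadata-extraction-queue"
    }
  ]
}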

Create Metadata Extractor Policy

Create an IAM Role:
  • Name: LambdaMetadataExtractorExecutionRole.
  • Attach the LambdaMetadataExtractorPolicy.

Create Metadata Extractor Execution Role

Write the Lambda Code:
  • Use Golang to write a Lambda function that:
    • Reads messages from the metadata-extraction-queue.
    • Extracts metadata from the image.
    • Stores the metadata in the DynamoDB table.
package main

import (
    "context"
    "encoding/json"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "image"
    _ "image/gif"
    _ "image/jpeg"
    _ "image/png"
    "io"
    "log"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "time"
)

var s3Client *s3.Client
var dynamoClient *dynamodb.Client
var tableName = "pixel-metadata"

type messageBody struct {
    Bucket string `json:"bucket"`
    Key    string `json:"key"`
}

func handler(ctx context.Context, event events.SQSEvent) error {
    sdkConfig, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Printf("failed to load default config: %s\n", err)
        return err
    }
    s3Client = s3.NewFromConfig(sdkConfig)
    dynamoClient = dynamodb.NewFromConfig(sdkConfig)

    for _, record := range event.Records {
        log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

        var body messageBody
        err := json.Unmarshal([]byte(record.Body), &body)
        if err != nil {
            log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
            return err
        }

        metadata, err := generateMetadata(ctx, body.Bucket, body.Key)
        if err != nil {
            log.Printf("failed to extract metadata: %s\n", err)
            return err
        }

        // Creating DynamoDB item
        err = createDynamoDBItem(ctx, map[string]types.AttributeValue{
            "name":     &types.AttributeValueMemberS{Value: extractFilename(body.Key)},
            "bucket":   &types.AttributeValueMemberS{Value: body.Bucket},
            "metadata": &types.AttributeValueMemberM{Value: convertToAttributeValue(metadata)},
        })
        if err != nil {
            log.Printf("failed to create DynamoDB item: %s\n", err)
            return err
        }
    }

    return nil
}

// generateMetadata generates metadata for the image file
func generateMetadata(ctx context.Context, bucket, key string) (map[string]string, error) {
    objOutput, err := getObject(ctx, bucket, key)
    if err != nil {
        log.Printf("failed to get object from S3: %s\n", err)
        return nil, err
    }
    defer objOutput.Body.Close()
    log.Printf("content type: %s\n", *objOutput.ContentType)
    log.Printf("content length: %d\n", *objOutput.ContentLength)

    // Read all bytes from S3 object
    bodyBytes, err := io.ReadAll(objOutput.Body)
    if err != nil {
        log.Printf("failed to read object body: %s\n", err)
        return nil, err
    }

    // Write bytes to temp file
    localFilePath := filepath.Join("/tmp", filepath.Base(key))
    err = os.WriteFile(localFilePath, bodyBytes, 0644)
    if err != nil {
        log.Printf("failed to write to local file: %s\n", err)
        return nil, err
    }
    defer os.Remove(localFilePath)

    // Open file for reading
    file, err := os.Open(localFilePath)
    if err != nil {
        log.Printf("failed to open local file: %s\n", err)
        return nil, err
    }
    defer file.Close()

    // Decode image to get dimensions
    cfg, format, err := image.DecodeConfig(file)
    if err != nil {
        log.Printf("failed to decode image: %s\n", err)
        return nil, err
    }

    // Stat the file to get its size and last-modified time
    fileInfo, err := os.Stat(localFilePath)
    if err != nil {
        log.Printf("failed to get file info: %s\n", err)
        return nil, err
    }

    // Construct metadata map
    metadata := map[string]string{
        "width":         strconv.Itoa(cfg.Width),
        "height":        strconv.Itoa(cfg.Height),
        "format":        format,
        "file_size":     strconv.FormatInt(fileInfo.Size(), 10), // File size in bytes
        "file_name":     fileInfo.Name(),
        "last_modified": fileInfo.ModTime().Format(time.RFC3339),
    }

    return metadata, nil
}

// convertToAttributeValue converts a map of string to AttributeValue
func convertToAttributeValue(data map[string]string) map[string]types.AttributeValue {
    attributeValue := make(map[string]types.AttributeValue)
    for key, value := range data {
        attributeValue[key] = &types.AttributeValueMemberS{Value: value}
    }
    return attributeValue
}

// extractFilename extracts the filename from the input string
func extractFilename(input string) string {
    // Extract the base name (e.g., aaa.jpg)
    base := filepath.Base(input)
    // Remove the file extension (e.g., aaa)
    name := strings.TrimSuffix(base, filepath.Ext(base))
    return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
    output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        log.Printf("failed to get object from S3: %s\n", err)
        return nil, err
    }

    log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)

    return output, nil
}

// createDynamoDBItem creates a new item in DynamoDB
func createDynamoDBItem(ctx context.Context, item map[string]types.AttributeValue) error {
    _, err := dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
        TableName: &tableName,
        Item:      item,
    })
    if err != nil {
        log.Printf("failed to put item in DynamoDB: %s\n", err)
        return err
    }

    log.Printf("successfully put item in DynamoDB table: %s\n", tableName)

    return nil
}

func main() {
    lambda.Start(handler)
}

Add Trigger:
  • Set the trigger to the metadata-extraction-queue.

Add Trigger Metadata Extractor

Set Timeout:
  • Edit the Lambda configuration to set the execution timeout to 30 seconds.

Edit Metadata Extractor Timeout

Deploy the Lambda:
  • Create the Lambda function (same custom-runtime setup as before) with the LambdaMetadataExtractorExecutionRole.
  • Package the code into a .zip file and upload it to the AWS Lambda Console.

Create Lambda Metadata Extractor

$ cd metadata-extractor-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip metadata-extractor-function.zip bootstrap

Deploy Metadata Extractor

11. Test End-to-End

  1. Upload an image to the master folder.

Upload Master Image

  2. Monitor Lambda execution logs in CloudWatch.

Monitor Lambda execution logs in CloudWatch

  3. Verify the converted and resized folders in S3.

Verify Output in S3

Verify Output in S3

  4. Check the metadata in DynamoDB.

Check the metadata in DynamoDB

Conclusion

PixelCascade demonstrates how to build a scalable, serverless image processing pipeline using AWS services. The fanout architecture ensures scalability and decoupling, making the system robust and adaptable to high workloads. By following this guide, you can build and customize your own version of PixelCascade. Happy building!