PixelCascade: Building a Scalable Image Processing Pipeline with AWS Fanout Architecture
Introduction
PixelCascade is a mini-project that demonstrates how to build a scalable image processing pipeline using AWS services like S3, Lambda, SNS, SQS, and DynamoDB. By leveraging a fanout architecture, this project allows users to upload images in formats such as JPEG, PNG, and GIF, and process them into formatted outputs, resized versions, and extracted metadata.
This article provides a step-by-step guide to implementing PixelCascade, from architecture design to testing the pipeline. You can find the complete source code for this project in the accompanying GitHub repository.
Background of the Project and Architecture
The primary objective of PixelCascade is to create a scalable and efficient workflow for processing images. Below are the project goals, workflow, and architecture details.
Goals
- User Uploads: Accept images in JPEG, PNG, and GIF formats.
- Outputs:
  - Converted images (JPEG)
  - Resized images (thumbnail, medium, and large)
  - Extracted metadata
- Scalability: Handle high volumes of image uploads without bottlenecks.
Workflow
- Image Upload: Users upload a master image to the `master` folder in an S3 bucket.
- Lambda Trigger: A Lambda function processes the S3 upload event and publishes a message to an SNS topic.
- SNS Fanout: The SNS topic fans out the message to three SQS queues:
  - Format Conversion Queue: Converts the master image to JPEG format and stores it in S3.
  - Metadata Extraction Queue: Extracts metadata from the master image and stores it in DynamoDB.
  - Resizing Queue: Resizes the master image into thumbnail, medium, and large sizes and stores them in S3.
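With raw message delivery enabled on the subscriptions (configured in step 6 below), each queue receives the same small JSON payload that the Job Publisher Lambda publishes, identifying the uploaded object. For example:

```json
{
  "bucket": "pixel-cascade",
  "key": "master/aaa.jpg"
}
```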
Architecture Diagram
Requirements
Before setting up PixelCascade, ensure you have the following:
- AWS Account: Access to AWS services like S3, Lambda, SNS, SQS, and DynamoDB.
- Golang: Required for writing and deploying Lambda functions.
Step-by-Step Implementation
1. Create an SNS Topic
Create an SNS topic named `new-image-uploaded` to handle messages for every master image uploaded.
Steps:
- Navigate to SNS Console in AWS.
- Click Create Topic.
- Select Standard as the topic type.
- Enter `new-image-uploaded` as the topic name.
- Click Create Topic.
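If you prefer the AWS CLI to the console, the equivalent command is:

```sh
$ aws sns create-topic --name new-image-uploaded
```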
2. Create an S3 Bucket and Folders
Create an S3 bucket named `pixel-cascade` with the following folders:
- master: For storing user-uploaded images.
- converted: For storing converted images.
- resized: For storing resized images.
Steps:
- Navigate to S3 Console in AWS.
- Click Create Bucket and name it `pixel-cascade`.
- Open the bucket and create three folders: `master`, `converted`, and `resized`.
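The same setup via the CLI (an S3 "folder" is just a zero-byte object whose key ends in a slash):

```sh
$ aws s3 mb s3://pixel-cascade
$ aws s3api put-object --bucket pixel-cascade --key master/
$ aws s3api put-object --bucket pixel-cascade --key converted/
$ aws s3api put-object --bucket pixel-cascade --key resized/
```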
3. Create and Deploy the Job Publisher Lambda
The Job Publisher Lambda is triggered by S3 events and publishes messages to the SNS topic.
Steps:
Create an IAM Policy:
- Name: `LambdaJobPublisherPolicy`.
- Grant permissions to write CloudWatch logs and publish to the SNS topic (the function only receives the S3 event notification, so it does not need S3 read access):
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:*:*:new-image-uploaded"
    }
  ]
}
```
Create an IAM Role:
- Name: `LambdaJobPublisherExecutionRole`.
- Attach the `LambdaJobPublisherPolicy`.
Write the Lambda Code:
- Use Golang to write a Lambda function that:
  - Processes S3 events.
  - Publishes messages to the `new-image-uploaded` SNS topic.
```go
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sns"
)

var newImageUploadedTopicArn = "arn:aws:sns:us-east-1:116981782358:new-image-uploaded"

func handler(ctx context.Context, s3Event events.S3Event) error {
	sdkConfig, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("failed to load default config: %s\n", err)
		return err
	}
	snsClient := sns.NewFromConfig(sdkConfig)

	for _, record := range s3Event.Records {
		bucket := record.S3.Bucket.Name
		key := record.S3.Object.URLDecodedKey
		log.Printf("received s3 event trigger for bucket: %s and key: %s\n", bucket, key)

		// Build the payload consumed by the downstream queue workers.
		payload := struct {
			Bucket string `json:"bucket"`
			Key    string `json:"key"`
		}{
			Bucket: bucket,
			Key:    key,
		}
		m, err := json.Marshal(payload)
		if err != nil {
			log.Printf("failed to marshal payload: %s\n", err)
			return err
		}
		message := string(m)

		// Publish the payload to the fanout topic.
		resp, err := snsClient.Publish(ctx, &sns.PublishInput{
			Message:  &message,
			TopicArn: &newImageUploadedTopicArn,
		})
		if err != nil {
			log.Printf("failed to publish message to SNS: %s\n", err)
			return err
		}
		log.Printf("message sent to SNS with message ID: %s\n", *resp.MessageId)
	}
	return nil
}

func main() {
	lambda.Start(handler)
}
```
Add Trigger:
- Set the trigger to the `pixel-cascade` bucket.
- Prefix: `master/`.
Deploy the Lambda:
- Create the Lambda function.
- Package the code into a .zip file and upload it to the AWS Lambda Console:
```sh
$ cd job-publisher-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip job-publisher-function.zip bootstrap
```
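If you create the function from the CLI instead, a sketch looks like this (the custom runtime and role ARN are assumptions; substitute your account ID):

```sh
$ aws lambda create-function \
    --function-name job-publisher-function \
    --runtime provided.al2023 \
    --handler bootstrap \
    --role arn:aws:iam::<account-id>:role/LambdaJobPublisherExecutionRole \
    --zip-file fileb://job-publisher-function.zip
```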
4. Create SQS Queues
Create three SQS queues: `format-conversion-queue`, `metadata-extraction-queue`, and `resizing-queue`.
Steps:
- Navigate to SQS Console in AWS.
- Click Create Queue and choose Standard Queue.
- Enter the queue name and repeat for the other two queues.
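Or from the CLI:

```sh
$ aws sqs create-queue --queue-name format-conversion-queue
$ aws sqs create-queue --queue-name metadata-extraction-queue
$ aws sqs create-queue --queue-name resizing-queue
```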
5. Edit Access Policies for SQS Queues
Allow the `new-image-uploaded` SNS topic to send messages to the SQS queues.
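Each queue's access policy needs a statement along these lines (a sketch for the `format-conversion-queue`; region and account ID are placeholders, and the other two queues need the same statement with their own ARNs):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "sns.amazonaws.com" },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:<account-id>:format-conversion-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:us-east-1:<account-id>:new-image-uploaded"
        }
      }
    }
  ]
}
```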
6. Create SNS Subscriptions
Subscribe each SQS queue to the `new-image-uploaded` SNS topic.
Steps:
- Navigate to the SNS Console.
- Open the `new-image-uploaded` topic.
- Click Create Subscription and set the protocol to SQS.
- Provide the ARN of each SQS queue.
- Enable Raw Message Delivery so each queue receives the publisher's JSON payload directly rather than wrapped in the SNS envelope; the consumer Lambdas below unmarshal the message body as-is.
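The same subscription can be created from the CLI, including the raw delivery attribute (ARNs are placeholders):

```sh
$ aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:<account-id>:new-image-uploaded \
    --protocol sqs \
    --notification-endpoint arn:aws:sqs:us-east-1:<account-id>:format-conversion-queue \
    --attributes '{"RawMessageDelivery":"true"}'
```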
7. Deploy Format Converter Lambda
The Format Converter Lambda converts the master image to JPEG and stores it in the `converted` folder.
Steps:
Create an IAM Policy:
- Name: `LambdaFormatConverterPolicy`.
- Grant permissions to read from SQS and to read and write objects in S3:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:format-conversion-queue"
    }
  ]
}
```
Create an IAM Role:
- Name: `LambdaFormatConverterExecutionRole`.
- Attach the `LambdaFormatConverterPolicy`.
Write the Lambda Code:
- Use Golang to write a Lambda function that:
  - Reads messages from the `format-conversion-queue`.
  - Converts the image to JPEG format.
  - Stores the result in the `converted` folder.
```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"image"
	_ "image/gif" // register GIF decoder
	"image/jpeg"
	_ "image/png" // register PNG decoder
	"log"
	"path/filepath"
	"strings"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

var s3Client *s3.Client

const (
	ConvertedFolder = "converted/"
)

type messageBody struct {
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
}

func handler(ctx context.Context, event events.SQSEvent) error {
	sdkConfig, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("failed to load default config: %s\n", err)
		return err
	}
	s3Client = s3.NewFromConfig(sdkConfig)

	for _, record := range event.Records {
		log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

		var body messageBody
		err := json.Unmarshal([]byte(record.Body), &body)
		if err != nil {
			log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
			return err
		}

		objOutput, err := getObject(ctx, body.Bucket, body.Key)
		if err != nil {
			log.Printf("failed to get object from S3: %s\n", err)
			return err
		}
		defer objOutput.Body.Close()
		log.Printf("content type: %s\n", *objOutput.ContentType)
		log.Printf("content length: %d\n", *objOutput.ContentLength)

		// Decode the image (GIF/JPEG/PNG decoders are registered via the blank imports)
		img, format, err := image.Decode(objOutput.Body)
		if err != nil {
			log.Printf("failed to decode image: %s\n", err)
			return err
		}
		log.Printf("decoded image format: %s\n", format)

		jpegImg, err := convertToJPEG(img)
		if err != nil {
			log.Printf("failed to convert image: %s\n", err)
			return err
		}

		err = putObject(ctx, body.Bucket, ConvertedFolder+extractFilename(body.Key)+".jpg", jpegImg)
		if err != nil {
			log.Printf("failed to put object to S3: %s\n", err)
			return err
		}
	}
	return nil
}

// extractFilename extracts the filename (without extension) from the object key
func extractFilename(input string) string {
	// Extract the base name (e.g., aaa.jpg)
	base := filepath.Base(input)
	// Remove the file extension (e.g., aaa)
	name := strings.TrimSuffix(base, filepath.Ext(base))
	return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
	output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		log.Printf("failed to get object from S3: %s\n", err)
		return nil, err
	}
	log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)
	return output, nil
}

// putObject uploads an object to S3
func putObject(ctx context.Context, bucket string, key string, body []byte) error {
	_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   bytes.NewReader(body),
	})
	if err != nil {
		log.Printf("failed to put object to S3: %s\n", err)
		return err
	}
	log.Printf("successfully put object to S3 bucket: %s and key: %s\n", bucket, key)
	return nil
}

// convertToJPEG converts an image to JPEG format
func convertToJPEG(img image.Image) ([]byte, error) {
	var buf bytes.Buffer
	// Set JPEG options (quality 100)
	options := &jpeg.Options{Quality: 100}
	err := jpeg.Encode(&buf, img, options)
	if err != nil {
		log.Printf("failed to convert image to JPEG: %s\n", err)
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	lambda.Start(handler)
}
```
Add Trigger:
- Set the trigger to the `format-conversion-queue`.
Set Timeout:
- Edit the Lambda configuration to set the execution timeout to 30 seconds.
Deploy the Lambda:
- Create the Lambda function.
- Package the code into a .zip file and upload it to the AWS Lambda Console:
```sh
$ cd format-converter-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip format-converter-function.zip bootstrap
```
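Alternatively, the SQS trigger can be wired up from the CLI with an event source mapping (a sketch; the queue ARN is a placeholder and the batch size of 1 is a simplifying assumption):

```sh
$ aws lambda create-event-source-mapping \
    --function-name format-converter-function \
    --event-source-arn arn:aws:sqs:us-east-1:<account-id>:format-conversion-queue \
    --batch-size 1
```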
8. Deploy Pixel Resizer Lambda
The Pixel Resizer Lambda resizes the master image into three sizes (thumbnail, medium, and large) and stores them in the `resized` folder.
Steps:
Create an IAM Policy:
- Name: `LambdaPixelResizerPolicy`.
- Grant permissions to read from SQS and to read and write objects in S3:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:resizing-queue"
    }
  ]
}
```
Create an IAM Role:
- Name: `LambdaPixelResizerExecutionRole`.
- Attach the `LambdaPixelResizerPolicy`.
Write the Lambda Code:
- Use Golang to write a Lambda function that:
  - Reads messages from the `resizing-queue`.
  - Resizes the image into three sizes (thumbnail, medium, large).
  - Stores the results in the `resized` folder.
```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"image"
	_ "image/gif" // register GIF decoder
	"image/jpeg"
	_ "image/png" // register PNG decoder
	"log"
	"path/filepath"
	"strings"
	"sync"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/nfnt/resize"
)

var s3Client *s3.Client

const (
	ResizedFolder = "resized/"
)

type messageBody struct {
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
}

type resizeTask struct {
	SizeName string
	Width    uint
}

func handler(ctx context.Context, event events.SQSEvent) error {
	sdkConfig, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("failed to load default config: %s\n", err)
		return err
	}
	s3Client = s3.NewFromConfig(sdkConfig)

	for _, record := range event.Records {
		log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

		var body messageBody
		err := json.Unmarshal([]byte(record.Body), &body)
		if err != nil {
			log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
			return err
		}

		objOutput, err := getObject(ctx, body.Bucket, body.Key)
		if err != nil {
			log.Printf("failed to get object from S3: %s\n", err)
			return err
		}
		defer objOutput.Body.Close()
		log.Printf("content type: %s\n", *objOutput.ContentType)
		log.Printf("content length: %d\n", *objOutput.ContentLength)

		// Decode the image
		img, format, err := image.Decode(objOutput.Body)
		if err != nil {
			log.Printf("failed to decode image: %s\n", err)
			return err
		}
		log.Printf("decoded image format: %s\n", format)

		// Resize into the three target sizes concurrently
		resizeTasks := []resizeTask{
			{"thumbnail", 100},
			{"medium", 500},
			{"large", 1000},
		}
		var wg sync.WaitGroup
		for _, task := range resizeTasks {
			wg.Add(1)
			go func(task resizeTask) {
				defer wg.Done()
				// A height of 0 preserves the aspect ratio for the given width
				resizedImg := resize.Resize(task.Width, 0, img, resize.Lanczos3)
				jpegImg, err := convertToJPEG(resizedImg)
				if err != nil {
					log.Printf("failed to convert image: %s\n", err)
					return
				}
				err = putObject(ctx, body.Bucket, ResizedFolder+extractFilename(body.Key)+"/"+task.SizeName+".jpg", jpegImg)
				if err != nil {
					log.Printf("failed to put object to S3: %s\n", err)
					return
				}
			}(task)
		}
		wg.Wait()
		log.Printf("all resize tasks completed for image: %s\n", body.Key)
	}
	return nil
}

// extractFilename extracts the filename (without extension) from the object key
func extractFilename(input string) string {
	// Extract the base name (e.g., aaa.jpg)
	base := filepath.Base(input)
	// Remove the file extension (e.g., aaa)
	name := strings.TrimSuffix(base, filepath.Ext(base))
	return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
	output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		log.Printf("failed to get object from S3: %s\n", err)
		return nil, err
	}
	log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)
	return output, nil
}

// putObject uploads an object to S3
func putObject(ctx context.Context, bucket string, key string, body []byte) error {
	_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   bytes.NewReader(body),
	})
	if err != nil {
		log.Printf("failed to put object to S3: %s\n", err)
		return err
	}
	log.Printf("successfully put object to S3 bucket: %s and key: %s\n", bucket, key)
	return nil
}

// convertToJPEG converts an image to JPEG format
func convertToJPEG(img image.Image) ([]byte, error) {
	var buf bytes.Buffer
	// Set JPEG options (quality 100)
	options := &jpeg.Options{Quality: 100}
	err := jpeg.Encode(&buf, img, options)
	if err != nil {
		log.Printf("failed to convert image to JPEG: %s\n", err)
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	lambda.Start(handler)
}
```
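Note that unlike the other functions, the resizer depends on the third-party github.com/nfnt/resize package, so add it to the module before building:

```sh
$ go get github.com/nfnt/resize
```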
Add Trigger:
- Set the trigger to the `resizing-queue`.
Set Timeout:
- Edit the Lambda configuration to set the execution timeout to 30 seconds.
Deploy the Lambda:
- Create the Lambda function.
- Package the code into a .zip file and upload it to the AWS Lambda Console:
```sh
$ cd pixel-resizer-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip pixel-resizer-function.zip bootstrap
```
9. Create a DynamoDB Table
Create a DynamoDB table named `pixel-metadata` with `name` as the partition key.
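For example, via the CLI (on-demand billing is an assumption here; provisioned capacity works too):

```sh
$ aws dynamodb create-table \
    --table-name pixel-metadata \
    --attribute-definitions AttributeName=name,AttributeType=S \
    --key-schema AttributeName=name,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
```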
10. Deploy Metadata Extractor Lambda
The Metadata Extractor Lambda generates metadata from the master image and stores it in DynamoDB.
Steps:
Create an IAM Policy:
- Name: `LambdaMetadataExtractorPolicy`.
- Grant permissions to read from SQS, read objects from S3, and write to DynamoDB (the function downloads the master image in order to extract its metadata).
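The policy document isn't shown in the original walkthrough, but by analogy with the earlier policies it would look roughly like this (a sketch; resource ARNs follow the naming used above):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::pixel-cascade/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:*:*:metadata-extraction-queue"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem"],
      "Resource": "arn:aws:dynamodb:*:*:table/pixel-metadata"
    }
  ]
}
```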
Create an IAM Role:
- Name: `LambdaMetadataExtractorExecutionRole`.
- Attach the `LambdaMetadataExtractorPolicy`.
Write the Lambda Code:
- Use Golang to write a Lambda function that:
  - Reads messages from the `metadata-extraction-queue`.
  - Extracts metadata from the image.
  - Stores the metadata in the DynamoDB table.
```go
package main

import (
	"context"
	"encoding/json"
	"image"
	_ "image/gif"  // register GIF decoder
	_ "image/jpeg" // register JPEG decoder
	_ "image/png"  // register PNG decoder
	"io"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

var s3Client *s3.Client
var dynamoClient *dynamodb.Client
var tableName = "pixel-metadata"

type messageBody struct {
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
}

func handler(ctx context.Context, event events.SQSEvent) error {
	sdkConfig, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("failed to load default config: %s\n", err)
		return err
	}
	s3Client = s3.NewFromConfig(sdkConfig)
	dynamoClient = dynamodb.NewFromConfig(sdkConfig)

	for _, record := range event.Records {
		log.Printf("received message from SQS with messageID: %s, body: %s\n", record.MessageId, record.Body)

		var body messageBody
		err := json.Unmarshal([]byte(record.Body), &body)
		if err != nil {
			log.Printf("failed to unmarshal this messageID %s: %s\n", record.MessageId, err)
			return err
		}

		metadata, err := generateMetadata(ctx, body.Bucket, body.Key)
		if err != nil {
			log.Printf("failed to extract metadata: %s\n", err)
			return err
		}

		// Create the DynamoDB item keyed by the image name
		err = createDynamoDBItem(ctx, map[string]types.AttributeValue{
			"name":     &types.AttributeValueMemberS{Value: extractFilename(body.Key)},
			"bucket":   &types.AttributeValueMemberS{Value: body.Bucket},
			"metadata": &types.AttributeValueMemberM{Value: convertToAttributeValue(metadata)},
		})
		if err != nil {
			log.Printf("failed to create DynamoDB item: %s\n", err)
			return err
		}
	}
	return nil
}

// generateMetadata generates metadata for the image file
func generateMetadata(ctx context.Context, bucket, key string) (map[string]string, error) {
	objOutput, err := getObject(ctx, bucket, key)
	if err != nil {
		log.Printf("failed to get object from S3: %s\n", err)
		return nil, err
	}
	defer objOutput.Body.Close()
	log.Printf("content type: %s\n", *objOutput.ContentType)
	log.Printf("content length: %d\n", *objOutput.ContentLength)

	// Read all bytes from the S3 object
	bodyBytes, err := io.ReadAll(objOutput.Body)
	if err != nil {
		log.Printf("failed to read object body: %s\n", err)
		return nil, err
	}

	// Write the bytes to a temp file under /tmp (the only writable path in Lambda)
	localFilePath := filepath.Join("/tmp", filepath.Base(key))
	err = os.WriteFile(localFilePath, bodyBytes, 0644)
	if err != nil {
		log.Printf("failed to write to local file: %s\n", err)
		return nil, err
	}
	defer os.Remove(localFilePath)

	// Open the file for reading
	file, err := os.Open(localFilePath)
	if err != nil {
		log.Printf("failed to open local file: %s\n", err)
		return nil, err
	}
	defer file.Close()

	// Decode the image header to get dimensions and format
	cfg, format, err := image.DecodeConfig(file)
	if err != nil {
		log.Printf("failed to decode image: %s\n", err)
		return nil, err
	}

	// Stat the file to get its size and modification time
	fileInfo, err := os.Stat(localFilePath)
	if err != nil {
		log.Printf("failed to get file info: %s\n", err)
		return nil, err
	}

	// Construct the metadata map
	metadata := map[string]string{
		"width":         strconv.Itoa(cfg.Width),
		"height":        strconv.Itoa(cfg.Height),
		"format":        format,
		"file_size":     strconv.FormatInt(fileInfo.Size(), 10), // file size in bytes
		"file_name":     fileInfo.Name(),
		"last_modified": fileInfo.ModTime().Format(time.RFC3339),
	}
	return metadata, nil
}

// convertToAttributeValue converts a map of strings to DynamoDB AttributeValues
func convertToAttributeValue(data map[string]string) map[string]types.AttributeValue {
	attributeValue := make(map[string]types.AttributeValue)
	for key, value := range data {
		attributeValue[key] = &types.AttributeValueMemberS{Value: value}
	}
	return attributeValue
}

// extractFilename extracts the filename (without extension) from the object key
func extractFilename(input string) string {
	// Extract the base name (e.g., aaa.jpg)
	base := filepath.Base(input)
	// Remove the file extension (e.g., aaa)
	name := strings.TrimSuffix(base, filepath.Ext(base))
	return name
}

// getObject retrieves an object from S3
func getObject(ctx context.Context, bucket string, key string) (*s3.GetObjectOutput, error) {
	output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		log.Printf("failed to get object from S3: %s\n", err)
		return nil, err
	}
	log.Printf("successfully retrieved object from S3 bucket: %s and key: %s\n", bucket, key)
	return output, nil
}

// createDynamoDBItem creates a new item in DynamoDB
func createDynamoDBItem(ctx context.Context, item map[string]types.AttributeValue) error {
	_, err := dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: &tableName,
		Item:      item,
	})
	if err != nil {
		log.Printf("failed to put item in DynamoDB: %s\n", err)
		return err
	}
	log.Printf("successfully put item in DynamoDB table: %s\n", tableName)
	return nil
}

func main() {
	lambda.Start(handler)
}
```
Add Trigger:
- Set the trigger to the `metadata-extraction-queue`.
Set Timeout:
- Edit the Lambda configuration to set the execution timeout to 30 seconds.
Deploy the Lambda:
- Create the Lambda function.
- Package the code into a .zip file and upload it to the AWS Lambda Console:
```sh
$ cd metadata-extractor-function
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc main.go
$ zip metadata-extractor-function.zip bootstrap
```
11. Test End-to-End
- Upload an image to the `master` folder.
- Monitor Lambda execution logs in CloudWatch.
- Verify the `converted` and `resized` folders in S3.
- Check the metadata in DynamoDB.
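A quick way to kick off the pipeline from the CLI (the filename here is just an example):

```sh
$ aws s3 cp ./sample.png s3://pixel-cascade/master/sample.png
```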
Conclusion
PixelCascade demonstrates how to build a scalable, serverless image processing pipeline using AWS services. The fanout architecture ensures scalability and decoupling, making the system robust and adaptable to high workloads. By following this guide, you can build and customize your own version of PixelCascade. Happy building!