Examples

This section provides practical examples of common AWS architectures using core-aws-cdk.

Complete SNS → SQS → Lambda → S3 Integration

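This example demonstrates a complete event-driven pipeline: an SNS topic delivers messages to an SQS queue (backed by a dead-letter queue), the queue triggers a Lambda function, and the function is granted write access to an S3 bucket: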

from aws_cdk import App, Environment, Duration, CfnOutput
from aws_cdk.aws_lambda import Code, Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_sns_subscriptions import SqsSubscription
from core_aws_cdk.stacks.lambdas import BaseLambdaStack
from core_aws_cdk.stacks.s3 import BaseS3Stack
from core_aws_cdk.stacks.sns import BaseSnsStack
from core_aws_cdk.stacks.sqs import BaseSqsStack

class IntegratedStack(
    BaseSnsStack,
    BaseSqsStack,
    BaseLambdaStack,
    BaseS3Stack,
):
    """Combine the SNS, SQS, Lambda, and S3 base stacks into a single stack.

    Adds no behavior of its own; it exists so one stack instance exposes
    ``create_sns_topic``, ``create_sqs_queue``, ``create_lambda``, and
    ``create_bucket`` together.
    """

app = App()
stack = IntegratedStack(
    app,
    "IntegratedStack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
)

# The Lambda timeout also drives the queue's visibility timeout below, so both
# are derived from this single value to keep them from drifting apart.
PROCESSOR_TIMEOUT_MINUTES = 5

# Create S3 bucket
bucket = stack.create_bucket(
    bucket_id="DataBucket",
    bucket_name=None,  # Auto-generate
)

# Create SQS queue with DLQ
queue = stack.create_sqs_queue(
    queue_id="ProcessQueue",
    queue_name="process-queue",
    with_dlq=True,
    dlq_id="ProcessQueueDLQ",
    max_receive_count=3,
    # Lambda rejects an SQS event source whose visibility timeout is shorter
    # than the function timeout (the SQS default is only 30 seconds). AWS
    # recommends at least six times the function timeout.
    visibility_timeout=Duration.minutes(6 * PROCESSOR_TIMEOUT_MINUTES),
)

# Create SNS topic
topic = stack.create_sns_topic(
    topic_id="EventTopic",
    topic_name="event-topic",
)

# Subscribe queue to topic
topic.add_subscription(SqsSubscription(queue))

# Create Lambda processor
lambda_function = stack.create_lambda(
    function_id="Processor",
    handler="handler.lambda_handler",
    code=Code.from_asset("./lambda"),
    runtime=Runtime.PYTHON_3_12,
    timeout=Duration.minutes(PROCESSOR_TIMEOUT_MINUTES),
    # The handler reads the target bucket from its environment.
    environment={"BUCKET_NAME": bucket.bucket_name},
)

# Configure SQS as Lambda trigger
lambda_function.add_event_source(SqsEventSource(queue))

# Grant permissions: the processor only needs to write objects
bucket.grant_write(lambda_function)

# Export outputs
CfnOutput(stack, "TopicArn", value=topic.topic_arn)
CfnOutput(stack, "BucketName", value=bucket.bucket_name)

app.synth()

Lambda with Custom Dependencies

Create a Lambda function with custom Python dependencies:

from aws_cdk import App, Environment
from aws_cdk.aws_lambda import Runtime
from core_aws_cdk.stacks.lambdas import BaseLambdaStack, ZipAssetCode
import pathlib

app = App()
stack = BaseLambdaStack(
    app,
    "CustomLambdaStack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
)

# Create Lambda with custom packaging
# NOTE(review): the paths below are placeholders; point them at your project
# root and at the directory that contains the handler.
code = ZipAssetCode(
    project_directory=pathlib.Path("/path/to/project"),
    work_dir=pathlib.Path("/path/to/lambda"),
    includes=["handler.py", "__init__.py", "config.json"],
    include_project_folders=["commons", "utils"],
    debug=True,
)

lambda_function = stack.create_lambda(
    function_id="CustomFunction",
    handler="handler.lambda_handler",
    code=code,
    runtime=Runtime.PYTHON_3_12,
    function_name="custom-processor",
    environment={
        "ENV": "production",
        "LOG_LEVEL": "INFO",
    },
)

app.synth()

VPC with Private Subnets

Create a VPC with custom subnet configuration:

from aws_cdk import App, Environment
from aws_cdk.aws_ec2 import SubnetConfiguration, SubnetType
from core_aws_cdk.stacks.network import NetworkStack

app = App()
stack = NetworkStack(
    app,
    "NetworkStack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
)

# One subnet group of each type: public, private with egress, and isolated
vpc = stack.create_vpc(
    vpc_id="MainVPC",
    cidr="10.0.0.0/16",
    max_azs=3,
    subnet_configuration=[
        SubnetConfiguration(
            name="Public",
            subnet_type=SubnetType.PUBLIC,
            cidr_mask=24,
        ),
        SubnetConfiguration(
            name="Private",
            subnet_type=SubnetType.PRIVATE_WITH_EGRESS,
            cidr_mask=24,
        ),
        SubnetConfiguration(
            name="Isolated",
            subnet_type=SubnetType.PRIVATE_ISOLATED,
            cidr_mask=28,  # Smaller range than the /24 groups above
        ),
    ],
)

app.synth()

S3 Bucket with Versioning and Lifecycle

Create an S3 bucket with advanced configuration:

from aws_cdk import App, Environment, Duration, RemovalPolicy
from aws_cdk.aws_s3 import LifecycleRule, StorageClass
from core_aws_cdk.stacks.s3 import BaseS3Stack

app = App()
stack = BaseS3Stack(
    app,
    "S3Stack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
)

bucket = stack.create_bucket(
    bucket_id="ArchiveBucket",
    bucket_name="my-archive-bucket",
    versioned=True,
    # Keep the bucket (and its data) when the stack is deleted
    removal_policy=RemovalPolicy.RETAIN,
    lifecycle_rules=[
        # After 30 days, move objects to Infrequent Access
        LifecycleRule(
            id="TransitionToIA",
            transitions=[{
                "storage_class": StorageClass.INFREQUENT_ACCESS,
                "transition_after": Duration.days(30),
            }],
        ),
        # After 90 days, move objects to Glacier
        LifecycleRule(
            id="TransitionToGlacier",
            transitions=[{
                "storage_class": StorageClass.GLACIER,
                "transition_after": Duration.days(90),
            }],
        ),
    ],
)

app.synth()

FIFO Queue with Dead Letter Queue

Create a FIFO SQS queue with a FIFO dead-letter queue (DLQ):

from aws_cdk import App, Environment, Duration
from core_aws_cdk.stacks.sqs import BaseSqsStack

app = App()
stack = BaseSqsStack(
    app,
    "SQSStack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
)

# Create FIFO queue with DLQ
# FIFO queue names must end in ".fifo", and a FIFO queue's DLQ must itself be
# FIFO, hence the suffix on both names.
queue = stack.create_sqs_queue(
    queue_id="OrderQueue",
    queue_name="order-queue.fifo",
    with_dlq=True,
    dlq_id="OrderQueueDLQ",
    dlq_name="order-queue-dlq.fifo",
    max_receive_count=3,
    visibility_timeout=Duration.minutes(5),
    fifo=True,
    content_based_deduplication=True,
)

app.synth()

Multi-Stack Application

Create a multi-stack application with dependencies:

from aws_cdk import App, Environment
from aws_cdk.aws_lambda import Code, Runtime
from core_aws_cdk.stacks.lambdas import BaseLambdaStack
from core_aws_cdk.stacks.s3 import BaseS3Stack
from core_aws_cdk.stacks.sqs import BaseSqsStack

app = App()
# AWS account IDs are always 12 digits. All three stacks share this environment.
env = Environment(account="123456789012", region="us-east-1")

# Create storage stack
storage_stack = BaseS3Stack(app, "StorageStack", env=env)
bucket = storage_stack.create_bucket(
    bucket_id="DataBucket",
    bucket_name=None,  # Auto-generate
)

# Create queue stack
queue_stack = BaseSqsStack(app, "QueueStack", env=env)
queue = queue_stack.create_sqs_queue(
    queue_id="ProcessQueue",
    queue_name="process-queue",
    with_dlq=True,
    dlq_id="ProcessQueueDLQ",
)

# Create Lambda stack that depends on both
lambda_stack = BaseLambdaStack(app, "LambdaStack", env=env)
# Explicit dependencies ensure the storage and queue stacks deploy first
lambda_stack.add_dependency(storage_stack)
lambda_stack.add_dependency(queue_stack)

lambda_function = lambda_stack.create_lambda(
    function_id="Processor",
    handler="handler.lambda_handler",
    code=Code.from_asset("./lambda"),
    runtime=Runtime.PYTHON_3_12,
    # Resources from the other stacks are passed to the handler by name/URL
    environment={
        "BUCKET_NAME": bucket.bucket_name,
        "QUEUE_URL": queue.queue_url,
    },
)

# Grant permissions
bucket.grant_read_write(lambda_function)
queue.grant_consume_messages(lambda_function)

app.synth()

Tagged Resources

Apply tags to all resources in a stack:

from aws_cdk import App, Environment
from core_aws_cdk.stacks.lambdas import BaseLambdaStack

app = App()
stack = BaseLambdaStack(
    app,
    "TaggedStack",
    # AWS account IDs are always 12 digits.
    env=Environment(account="123456789012", region="us-east-1"),
    tags={
        "Environment": "Production",
        "Project": "MyProject",
        "Team": "Platform",
        "CostCenter": "Engineering",
    },
)

# All resources created in this stack will inherit these tags
# ...

app.synth()

Programmatic Stack Deployment

Deploy and manage CDK stacks programmatically using the Deployer utility:

from aws_cdk import App, Environment
from core_aws_cdk.utils.deployment import Deployer
from core_aws_cdk.stacks.ecs.cluster.base import ECSClusterStack
from core_aws_cdk.stacks.network.base import NetworkStack

# Deployer is used as a context manager so its cleanup runs automatically
with Deployer(folder_name="production_deployment", region="us-east-1") as deployer:
    # The app writes its synthesized output into the deployer's directory
    app = App(outdir=str(deployer.cdk_out_dir))
    target_env = Environment(account="123456789012", region="us-east-1")

    # --- Define the stacks -------------------------------------------------

    # Network stack and its VPC
    network_stack = NetworkStack(
        app,
        "ProductionNetwork",
        env=target_env,
        tags={"Environment": "Production"},
    )
    main_vpc = network_stack.create_vpc(
        vpc_id="MainVPC",
        cidr="10.0.0.0/16",
        max_azs=3,
        nat_gateways=2,
    )

    # ECS cluster stack placed in that VPC
    cluster_stack = ECSClusterStack(
        app,
        "ProductionCluster",
        cluster_name="prod-cluster",
        vpc=main_vpc,
        enable_fargate=True,
        enable_container_insights=True,
        fargate_capacity_providers=["FARGATE", "FARGATE_SPOT"],
        env=target_env,
        tags={"Environment": "Production"},
    )

    # --- Synthesize, then deploy in dependency order -------------------------

    deployer.synthesize(app)

    # 1. Network stack goes first
    print("Deploying network stack...")
    network_deploy = deployer.deploy_stack("ProductionNetwork", timeout_seconds=600)
    if not network_deploy.success:
        raise RuntimeError(f"Network deployment failed: {network_deploy.error}")
    network_elapsed = network_deploy.duration_seconds
    print(f"Network deployed in {network_elapsed:.2f}s")

    # 2. Cluster stack, with a longer timeout (ECS clusters take longer)
    print("Deploying ECS cluster...")
    cluster_deploy = deployer.deploy_stack("ProductionCluster", timeout_seconds=1200)
    if not cluster_deploy.success:
        raise RuntimeError(f"Cluster deployment failed: {cluster_deploy.error}")
    cluster_elapsed = cluster_deploy.duration_seconds
    print(f"Cluster deployed in {cluster_elapsed:.2f}s")
    print(f"Stack outputs: {cluster_deploy.outputs}")

    # Report the final state of the cluster stack
    cluster_status = deployer.get_stack_status("ProductionCluster")
    print(f"Stack has {cluster_status.resources_count} resources")

# Leaving the with-block above triggers the deployer's automatic cleanup

ECS Cluster with Fargate

Create a production-ready ECS cluster with Fargate capacity providers:

from aws_cdk import App, Environment
from core_aws_cdk.stacks.ecs.cluster.base import ECSClusterStack

app = App()

# Deployment target and tags are named up front to keep the stack call short
target_env = Environment(account="123456789012", region="us-east-1")
cluster_tags = {
    "Environment": "Production",
    "Team": "Platform",
}

# Fargate-only cluster using both the on-demand and Spot capacity providers
fargate_cluster = ECSClusterStack(
    app,
    "FargateCluster",
    cluster_name="production-fargate-cluster",
    enable_container_insights=True,
    enable_fargate=True,
    fargate_capacity_providers=["FARGATE", "FARGATE_SPOT"],
    execute_command_logging=True,
    env=target_env,
    tags=cluster_tags,
)

app.synth()

ECS Cluster with EC2 Capacity Provider

Create an ECS cluster backed by an EC2 Auto Scaling group capacity provider:

from aws_cdk import App, Environment
from core_aws_cdk.stacks.ecs.cluster.base import (
    ECSClusterStack,
    EC2CapacityProviderConfig,
)
from core_aws_cdk.stacks.network.base import NetworkStack

app = App()
target_env = Environment(account="123456789012", region="us-east-1")

# The VPC must exist before the cluster, so build the network stack first
network = NetworkStack(app, "NetworkStack", env=target_env)
cluster_vpc = network.create_vpc(
    vpc_id="ClusterVPC",
    cidr="10.0.0.0/16",
    max_azs=3,
)

# EC2 capacity settings, kept separate from the stack call for readability
ec2_capacity = EC2CapacityProviderConfig(
    enabled=True,
    # Instance sizing and scaling bounds
    instance_type="t3.medium",
    min_size=2,
    max_size=10,
    desired_capacity=3,
    # Spot settings
    spot=True,
    spot_instance_types=["t3.medium", "t3a.medium"],
    spot_allocation_strategy="price-capacity-optimized",
    # EBS volume settings
    ebs_volume_type="gp3",
    ebs_volume_size=100,
    ebs_iops=3000,
    ebs_throughput=125,
)

# ECS cluster using the EC2 capacity above (Fargate stays enabled as well)
ec2_cluster = ECSClusterStack(
    app,
    "EC2Cluster",
    cluster_name="production-ec2-cluster",
    vpc=cluster_vpc,
    enable_container_insights=True,
    enable_fargate=True,
    ec2_config=ec2_capacity,
    env=target_env,
    tags={
        "Environment": "Production",
        "CostCenter": "Engineering",
    },
)

app.synth()