Examples#
This section provides practical examples of common AWS architectures using core-aws-cdk.
Complete SNS → SQS → Lambda → S3 Integration#
This example demonstrates a complete event-driven pipeline:
from aws_cdk import App, Environment, Duration, CfnOutput
from aws_cdk.aws_lambda import Code, Runtime
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_sns_subscriptions import SqsSubscription
from core_aws_cdk.stacks.lambdas import BaseLambdaStack
from core_aws_cdk.stacks.s3 import BaseS3Stack
from core_aws_cdk.stacks.sns import BaseSnsStack
from core_aws_cdk.stacks.sqs import BaseSqsStack
class IntegratedStack(BaseSnsStack, BaseSqsStack,
BaseLambdaStack, BaseS3Stack):
pass
app = App()
stack = IntegratedStack(
app,
"IntegratedStack",
env=Environment(account="123456789", region="us-east-1")
)
# Create S3 bucket
bucket = stack.create_bucket(
bucket_id="DataBucket",
bucket_name=None # Auto-generate
)
# Create SQS queue with DLQ
queue = stack.create_sqs_queue(
queue_id="ProcessQueue",
queue_name="process-queue",
with_dlq=True,
dlq_id="ProcessQueueDLQ",
max_receive_count=3
)
# Create SNS topic
topic = stack.create_sns_topic(
topic_id="EventTopic",
topic_name="event-topic"
)
# Subscribe queue to topic
topic.add_subscription(SqsSubscription(queue))
# Create Lambda processor
lambda_function = stack.create_lambda(
function_id="Processor",
handler="handler.lambda_handler",
code=Code.from_asset("./lambda"),
runtime=Runtime.PYTHON_3_12,
timeout=Duration.minutes(5),
environment={"BUCKET_NAME": bucket.bucket_name}
)
# Configure SQS as Lambda trigger
lambda_function.add_event_source(SqsEventSource(queue))
# Grant permissions
bucket.grant_write(lambda_function)
# Export outputs
CfnOutput(stack, "TopicArn", value=topic.topic_arn)
CfnOutput(stack, "BucketName", value=bucket.bucket_name)
app.synth()
Lambda with Custom Dependencies#
Create a Lambda function with custom Python dependencies:
from aws_cdk import App, Environment
from aws_cdk.aws_lambda import Runtime
from core_aws_cdk.stacks.lambdas import BaseLambdaStack, ZipAssetCode
import pathlib
app = App()
stack = BaseLambdaStack(
app,
"CustomLambdaStack",
env=Environment(account="123456789", region="us-east-1")
)
# Create Lambda with custom packaging
code = ZipAssetCode(
project_directory=pathlib.Path("/path/to/project"),
work_dir=pathlib.Path("/path/to/lambda"),
includes=["handler.py", "__init__.py", "config.json"],
include_project_folders=["commons", "utils"],
debug=True
)
lambda_function = stack.create_lambda(
function_id="CustomFunction",
handler="handler.lambda_handler",
code=code,
runtime=Runtime.PYTHON_3_12,
function_name="custom-processor",
environment={
"ENV": "production",
"LOG_LEVEL": "INFO"
}
)
app.synth()
VPC with Private Subnets#
Create a VPC with custom subnet configuration:
from aws_cdk import App, Environment
from aws_cdk.aws_ec2 import SubnetConfiguration, SubnetType
from core_aws_cdk.stacks.network import NetworkStack
app = App()
stack = NetworkStack(
app,
"NetworkStack",
env=Environment(account="123456789", region="us-east-1")
)
vpc = stack.create_vpc(
vpc_id="MainVPC",
cidr="10.0.0.0/16",
max_azs=3,
subnet_configuration=[
SubnetConfiguration(
name="Public",
subnet_type=SubnetType.PUBLIC,
cidr_mask=24
),
SubnetConfiguration(
name="Private",
subnet_type=SubnetType.PRIVATE_WITH_EGRESS,
cidr_mask=24
),
SubnetConfiguration(
name="Isolated",
subnet_type=SubnetType.PRIVATE_ISOLATED,
cidr_mask=28
)
]
)
app.synth()
S3 Bucket with Versioning and Lifecycle#
Create an S3 bucket with advanced configuration:
from aws_cdk import App, Environment, Duration, RemovalPolicy
from aws_cdk.aws_s3 import LifecycleRule, StorageClass
from core_aws_cdk.stacks.s3 import BaseS3Stack
app = App()
stack = BaseS3Stack(
app,
"S3Stack",
env=Environment(account="123456789", region="us-east-1")
)
bucket = stack.create_bucket(
bucket_id="ArchiveBucket",
bucket_name="my-archive-bucket",
versioned=True,
removal_policy=RemovalPolicy.RETAIN,
lifecycle_rules=[
LifecycleRule(
id="TransitionToIA",
transitions=[{
"storage_class": StorageClass.INFREQUENT_ACCESS,
"transition_after": Duration.days(30)
}]
),
LifecycleRule(
id="TransitionToGlacier",
transitions=[{
"storage_class": StorageClass.GLACIER,
"transition_after": Duration.days(90)
}]
)
]
)
app.synth()
FIFO Queue with Dead Letter Queue#
Create a FIFO SQS queue with DLQ:
from aws_cdk import App, Environment, Duration
from core_aws_cdk.stacks.sqs import BaseSqsStack
app = App()
stack = BaseSqsStack(
app,
"SQSStack",
env=Environment(account="123456789", region="us-east-1")
)
# Create FIFO queue with DLQ
queue = stack.create_sqs_queue(
queue_id="OrderQueue",
queue_name="order-queue.fifo",
with_dlq=True,
dlq_id="OrderQueueDLQ",
dlq_name="order-queue-dlq.fifo",
max_receive_count=3,
visibility_timeout=Duration.minutes(5),
fifo=True,
content_based_deduplication=True
)
app.synth()
Multi-Stack Application#
Create a multi-stack application with dependencies:
from aws_cdk import App, Environment
from aws_cdk.aws_lambda import Code, Runtime
from core_aws_cdk.stacks.lambdas import BaseLambdaStack
from core_aws_cdk.stacks.s3 import BaseS3Stack
from core_aws_cdk.stacks.sqs import BaseSqsStack
app = App()
env = Environment(account="123456789", region="us-east-1")
# Create storage stack
storage_stack = BaseS3Stack(app, "StorageStack", env=env)
bucket = storage_stack.create_bucket(
bucket_id="DataBucket",
bucket_name=None
)
# Create queue stack
queue_stack = BaseSqsStack(app, "QueueStack", env=env)
queue = queue_stack.create_sqs_queue(
queue_id="ProcessQueue",
queue_name="process-queue",
with_dlq=True,
dlq_id="ProcessQueueDLQ"
)
# Create Lambda stack that depends on both
lambda_stack = BaseLambdaStack(app, "LambdaStack", env=env)
lambda_stack.add_dependency(storage_stack)
lambda_stack.add_dependency(queue_stack)
lambda_function = lambda_stack.create_lambda(
function_id="Processor",
handler="handler.lambda_handler",
code=Code.from_asset("./lambda"),
runtime=Runtime.PYTHON_3_12,
environment={
"BUCKET_NAME": bucket.bucket_name,
"QUEUE_URL": queue.queue_url
}
)
# Grant permissions
bucket.grant_read_write(lambda_function)
queue.grant_consume_messages(lambda_function)
app.synth()
Tagged Resources#
Apply tags to all resources in a stack:
from aws_cdk import App, Environment
from core_aws_cdk.stacks.lambdas import BaseLambdaStack
app = App()
stack = BaseLambdaStack(
app,
"TaggedStack",
env=Environment(account="123456789", region="us-east-1"),
tags={
"Environment": "Production",
"Project": "MyProject",
"Team": "Platform",
"CostCenter": "Engineering"
}
)
# All resources created in this stack will inherit these tags
# ...
app.synth()
Programmatic Stack Deployment#
Deploy and manage CDK stacks programmatically using the Deployer utility:
from aws_cdk import App, Environment
from core_aws_cdk.utils.deployment import Deployer
from core_aws_cdk.stacks.ecs.cluster.base import ECSClusterStack
from core_aws_cdk.stacks.network.base import NetworkStack
# Use context manager for automatic cleanup
with Deployer(folder_name="production_deployment", region="us-east-1") as deployer:
# Create app with deployer's output directory
app = App(outdir=str(deployer.cdk_out_dir))
env = Environment(account="123456789012", region="us-east-1")
# Create network stack
network_stack = NetworkStack(
app,
"ProductionNetwork",
env=env,
tags={"Environment": "Production"}
)
vpc = network_stack.create_vpc(
vpc_id="MainVPC",
cidr="10.0.0.0/16",
max_azs=3,
nat_gateways=2
)
# Create ECS cluster stack
cluster_stack = ECSClusterStack(
app,
"ProductionCluster",
cluster_name="prod-cluster",
vpc=vpc,
enable_fargate=True,
enable_container_insights=True,
fargate_capacity_providers=["FARGATE", "FARGATE_SPOT"],
env=env,
tags={"Environment": "Production"}
)
# Synthesize stacks
deployer.synthesize(app)
# Deploy network stack first
print("Deploying network stack...")
network_result = deployer.deploy_stack(
"ProductionNetwork",
timeout_seconds=600
)
if not network_result.success:
raise RuntimeError(f"Network deployment failed: {network_result.error}")
print(f"Network deployed in {network_result.duration_seconds:.2f}s")
# Deploy cluster stack
print("Deploying ECS cluster...")
cluster_result = deployer.deploy_stack(
"ProductionCluster",
timeout_seconds=1200 # ECS clusters take longer
)
if not cluster_result.success:
raise RuntimeError(f"Cluster deployment failed: {cluster_result.error}")
print(f"Cluster deployed in {cluster_result.duration_seconds:.2f}s")
print(f"Stack outputs: {cluster_result.outputs}")
# Get final status
status = deployer.get_stack_status("ProductionCluster")
print(f"Stack has {status.resources_count} resources")
# Automatic cleanup happens here when exiting context manager
ECS Cluster with Fargate#
Create a production-ready ECS cluster with Fargate capacity providers:
from aws_cdk import App, Environment
from core_aws_cdk.stacks.ecs.cluster.base import ECSClusterStack
app = App()
cluster_stack = ECSClusterStack(
app,
"FargateCluster",
cluster_name="production-fargate-cluster",
enable_container_insights=True,
enable_fargate=True,
fargate_capacity_providers=["FARGATE", "FARGATE_SPOT"],
execute_command_logging=True,
env=Environment(account="123456789012", region="us-east-1"),
tags={
"Environment": "Production",
"Team": "Platform"
}
)
app.synth()
ECS Cluster with EC2 Capacity Provider#
Create an ECS cluster with EC2 Auto Scaling Group:
from aws_cdk import App, Environment
from core_aws_cdk.stacks.ecs.cluster.base import (
ECSClusterStack,
EC2CapacityProviderConfig,
)
from core_aws_cdk.stacks.network.base import NetworkStack
app = App()
env = Environment(account="123456789012", region="us-east-1")
# Create VPC first
network_stack = NetworkStack(app, "NetworkStack", env=env)
vpc = network_stack.create_vpc(
vpc_id="ClusterVPC",
cidr="10.0.0.0/16",
max_azs=3
)
# Create ECS cluster with EC2 capacity
cluster_stack = ECSClusterStack(
app,
"EC2Cluster",
cluster_name="production-ec2-cluster",
vpc=vpc,
enable_container_insights=True,
enable_fargate=True,
ec2_config=EC2CapacityProviderConfig(
enabled=True,
instance_type="t3.medium",
min_size=2,
max_size=10,
desired_capacity=3,
spot=True,
spot_instance_types=["t3.medium", "t3a.medium"],
spot_allocation_strategy="price-capacity-optimized",
ebs_volume_type="gp3",
ebs_volume_size=100,
ebs_iops=3000,
ebs_throughput=125,
),
env=env,
tags={
"Environment": "Production",
"CostCenter": "Engineering"
}
)
app.synth()