Amazon EventBridge Pipes: a worked example in Python and the AWS CDK
Preface
✔️ We discuss Amazon EventBridge Pipes and what they are.
✔️ We walk through an example in Python and the AWS CDK.
Introduction 👋🏽
In this article, we cover Amazon EventBridge Pipes by building a fictitious dental app using the AWS CDK and Python.
Our fictitious company ‘LJ Dentists’ that we are using for this article
The ‘LJ Dentists’ app lets people book an appointment with their closest dentist.
A person booking a dental appointment with their closest dentist for their son
If they have been before, we check their preferred contact method before sending an email, an SMS or no correspondence, all using Amazon EventBridge Pipes.
The son receives an SMS as he has been to this dentist before and has preferred contact details
You can find the full code example for the article here:
What are we building? 🛠️
We are going to allow patients to book appointments online using the following architecture:
We can see that:
- Patients book appointments online using our Amazon API Gateway REST API.
- A Lambda function writes the appointment details to a DynamoDB table.
- DynamoDB Streams captures changes to the table, and Amazon EventBridge Pipes uses the stream as its source, filtering so that only newly created records pass through.
- The pipe hydrates each stream record by invoking a Lambda function, which checks whether a matching record exists in the contact preferences table and, if it does, adds the preference to the stream record.
- Finally, the pipe has an SQS queue as its target, where we store our appointments ready for processing.
👇 Before we go any further — please connect with me on LinkedIn for future blog posts and Serverless news https://www.linkedin.com/in/lee-james-gilmore/
What are Amazon EventBridge Pipes?
Amazon EventBridge Pipes helps you create point-to-point integrations between event producers and consumers with optional transform, filter and enrich steps. EventBridge Pipes reduces the amount of integration code you need to write and maintain when building event-driven applications. This is shown in the diagram below for our example:
We can see from the above diagram that we have:
- DynamoDB Streams as the source.
- A filter which ensures we only receive newly inserted records (i.e. not deletions or updates).
- An enrichment Lambda function which reads contact information from a separate database and adds it to the appointment data from the stream.
- An SQS queue as the target, so we can further process the appointment records.
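To make the flow concrete, here is a minimal local sketch of the same filter, enrich and target steps in plain Python. Nothing here calls AWS: the record shape is simplified, and the names `CONTACT_PREFERENCES`, `is_insert`, `enrich` and `run_pipe` are hypothetical stand-ins for the contact preferences table, the pipe filter, the enrichment Lambda function and the pipe itself.

```python
# Hypothetical in-memory stand-in for the contact preferences table.
CONTACT_PREFERENCES = {"john.doe@example.com": "email"}


def is_insert(record: dict) -> bool:
    """Filter step: keep only records that describe a newly created item."""
    return record.get("eventName") == "INSERT"


def enrich(record: dict) -> dict:
    """Enrichment step: attach the preferred contact method, or 'none'."""
    appointment = record["appointment"]
    email = appointment["patient"]["email"]
    return {
        "appointment": appointment,
        "preferredMethod": CONTACT_PREFERENCES.get(email, "none"),
    }


def run_pipe(records: list, queue: list) -> list:
    """Source -> filter -> enrich -> target, with a list standing in for SQS."""
    for record in records:
        if is_insert(record):
            queue.append(enrich(record))
    return queue


if __name__ == "__main__":
    # Simplified stream records (an assumption, not the real DynamoDB format).
    stream = [
        {"eventName": "INSERT",
         "appointment": {"patient": {"email": "john.doe@example.com"}}},
        {"eventName": "MODIFY",
         "appointment": {"patient": {"email": "john.doe@example.com"}}},
        {"eventName": "INSERT",
         "appointment": {"patient": {"email": "new.patient@example.com"}}},
    ]
    for message in run_pipe(stream, []):
        print(message)
```

In the real pipe these steps are configuration, so we do not have to write or maintain any of this glue code ourselves.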
It is worth noting that there are many other services and configurations we can use, which are shown below:
https://aws.amazon.com/eventbridge/pipes/
Talking through key code 👨💻
Now let’s talk through some of the key code.
We start by splitting up our CDK application into Stateful and Stateless resources (stacks), as we can see below:
```python
import os

import aws_cdk as cdk
from stateful.stateful import DentistsStatefulStack
from stateless.stateless import DentistsStatelessStack

app = cdk.App()

DentistsStatefulStack(app, "DentistsStatefulStack")
DentistsStatelessStack(app, "DentistsStatelessStack")

app.synth()
```
If we first look at our Stateful stack, we can see that it sets up our two DynamoDB tables, one for the appointments and one for the preferred contact details:
Note: On our main appointments table we enable DynamoDB Streams so we can listen to changes (i.e. change data capture).
```python
from aws_cdk import CfnOutput, RemovalPolicy, Stack
from aws_cdk import aws_dynamodb as dynamodb
from constructs import Construct


class DentistsStatefulStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Appointments table, with a stream that emits the new item image
        table = dynamodb.Table(
            self, 'DentistTable',
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            table_name='DentistTable',
            stream=dynamodb.StreamViewType.NEW_IMAGE,
            removal_policy=RemovalPolicy.DESTROY,
            partition_key=dynamodb.Attribute(
                name='id',
                type=dynamodb.AttributeType.STRING
            )
        )

        # Contact preferences table
        contact_table = dynamodb.Table(
            self, 'DentistContactsTable',
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            table_name='DentistContactsTable',
            removal_policy=RemovalPolicy.DESTROY,
            partition_key=dynamodb.Attribute(
                name='id',
                type=dynamodb.AttributeType.STRING
            )
        )

        # Output the appointments table name
        CfnOutput(
            self, 'DentistDynamoDBTableName',
            value=table.table_name,
            description='Name of the DynamoDB table',
            export_name='DentistDynamoDBTableName'
        )

        # Output the contact preferences table name
        CfnOutput(
            self, 'DentistContactDynamoDBTableName',
            value=contact_table.table_name,
            description='Name of the Contact Preferences DynamoDB table',
            export_name='DentistContactDynamoDBTableName'
        )

        # Output the stream ARN of the appointments table
        CfnOutput(
            self, 'DentistDynamoDBTableStreamArn',
            value=table.table_stream_arn,
            description='DynamoDB table stream ARN',
            export_name='DentistDynamoDBTableStreamArn'
        )
```
Now that we have our Stateful resources set up, in our Stateless stack, we first create our Lambda function for creating an appointment:
```python
create_appointment_lambda = aws_lambda.Function(
    self, 'CreateAppointment',
    runtime=aws_lambda.Runtime.PYTHON_3_12,
    handler='create_appointment.handler',
    code=aws_lambda.Code.from_asset(os.path.join(DIRNAME, 'src')),
    function_name='CreateAppointment',
    environment={
        'dynamodb_table': dynamodb_table_name,
    },
)
```
We also add a second Lambda function which will be used in our pipe to enrich the data it gets from DynamoDB streams:
```python
get_contact_details = aws_lambda.Function(
    self, 'GetContactDetails',
    runtime=aws_lambda.Runtime.PYTHON_3_12,
    handler='get_contact_details.handler',
    code=aws_lambda.Code.from_asset(os.path.join(DIRNAME, 'src')),
    function_name='GetContactDetails',
    environment={
        'contacts_dynamodb_table': contacts_dynamodb_table_name,
    },
)
```
We next add an Amazon SQS queue which will be our target for our pipe:
```python
sqs_queue = sqs.Queue(
    self, 'AppointmentsQueue',
    queue_name='AppointmentsQueue',
    removal_policy=RemovalPolicy.DESTROY
)
```
We then create the IAM policy for the Amazon EventBridge pipe itself which allows it to connect to the other resources for source, enrichment and target:
```python
# Source: allow the pipe to read from the DynamoDB stream
pipe_source_policy = iam.PolicyStatement(
    actions=[
        'dynamodb:DescribeStream',
        'dynamodb:GetRecords',
        'dynamodb:GetShardIterator',
        'dynamodb:ListStreams'
    ],
    resources=[stream_arn],
    effect=iam.Effect.ALLOW
)

# Target: allow the pipe to send messages to the SQS queue
pipe_target_policy = iam.PolicyStatement(
    actions=['sqs:SendMessage'],
    resources=[sqs_queue.queue_arn],
    effect=iam.Effect.ALLOW
)

# Enrichment: allow the pipe to invoke the enrichment Lambda function
pipe_enrichment_policy = iam.PolicyStatement(
    actions=['lambda:InvokeFunction'],
    resources=[get_contact_details.function_arn],
    effect=iam.Effect.ALLOW
)

# Role assumed by the EventBridge Pipes service
pipe_role = iam.Role(self, 'PipeRole',
    assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
)

# Attach the three statements to the role
pipe_role.add_to_policy(pipe_source_policy)
pipe_role.add_to_policy(pipe_target_policy)
pipe_role.add_to_policy(pipe_enrichment_policy)
```
We can now create our pipe using the following code:
```python
pipe = pipes.CfnPipe(self, 'Pipe',
    role_arn=pipe_role.role_arn,
    source=stream_arn,
    log_configuration=pipes.CfnPipe.PipeLogConfigurationProperty(
        cloudwatch_logs_log_destination=pipes.CfnPipe.CloudwatchLogsLogDestinationProperty(
            log_group_arn=log_group.log_group_arn
        ),
        level='INFO',
    ),
    name='DentistPipe',
    source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
        dynamo_db_stream_parameters=pipes.CfnPipe.PipeSourceDynamoDBStreamParametersProperty(
            starting_position='LATEST',
        ),
        filter_criteria=pipes.CfnPipe.FilterCriteriaProperty(
            filters=[pipes.CfnPipe.FilterProperty(
                pattern=json.dumps({'eventName': [{'prefix': 'INSERT'}]})
            )]
        ),
    ),
    enrichment=get_contact_details.function_arn,
    target=sqs_queue.queue_arn,
)
pipe.apply_removal_policy(RemovalPolicy.DESTROY)
```
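The filter pattern above is what keeps updates and deletions out of the pipe. As a rough illustration of how it behaves, the sketch below evaluates that same pattern against the three `eventName` values a DynamoDB stream emits (`INSERT`, `MODIFY` and `REMOVE`). The `matches` function is a hypothetical, heavily simplified matcher written for this article; it only understands top-level string fields with exact values or a `prefix` condition, and is not how EventBridge implements pattern matching.

```python
import json


def _condition_matches(condition, value: str) -> bool:
    """One condition: either {'prefix': '...'} or an exact string value."""
    if isinstance(condition, dict) and "prefix" in condition:
        return value.startswith(condition["prefix"])
    return condition == value


def matches(pattern: dict, event: dict) -> bool:
    """Simplified matcher: every field must satisfy at least one condition."""
    for field, conditions in pattern.items():
        value = event.get(field)
        if not isinstance(value, str):
            return False
        if not any(_condition_matches(c, value) for c in conditions):
            return False
    return True


# The same pattern string we pass to the pipe's filter criteria.
PATTERN = json.loads(json.dumps({"eventName": [{"prefix": "INSERT"}]}))

if __name__ == "__main__":
    for event_name in ("INSERT", "MODIFY", "REMOVE"):
        print(event_name, matches(PATTERN, {"eventName": event_name}))
```

For these three values, an exact match on `INSERT` would select the same records as the prefix condition.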
Note: We also add the code for our Amazon API Gateway and Lambda function integration which you can see in the repo.
At this point, we have all of our infrastructure set up, but let’s have a quick look at the Lambda function code, starting with the ‘create_appointment.py’ file:
```python
import json
import os
import uuid
from http import HTTPStatus

import boto3
from boto3.dynamodb.types import TypeSerializer

dynamodb_table = os.getenv('dynamodb_table')
dynamodb_client = boto3.client('dynamodb')
serializer = TypeSerializer()

def handler(event, context):
    try:
        # Parse the appointment from the API Gateway request body
        request_data = json.loads(event['body'])

        # Add a new unique ID to the appointment
        request_data['id'] = str(uuid.uuid4())

        # Serialize the appointment into DynamoDB attribute format
        appointment_data = {k: serializer.serialize(v) for k, v in request_data.items()}

        # Write the appointment to the appointments table
        dynamodb_client.put_item(TableName=dynamodb_table, Item=appointment_data)

        body = {
            'message': request_data,
            'statusCode': HTTPStatus.CREATED.value,
        }

        # Build the API Gateway response
        response = {
            'statusCode': HTTPStatus.CREATED.value,
            'body': json.dumps(body, indent=2),
            'headers': {
                'content-type': 'application/json',
            },
        }

    except Exception as e:
        response = {
            'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR.value,
            'body': f'Exception={e}',
        }

    return response
```
We can see from the code above that we create a basic function which takes the appointment from the API Gateway event, adds a new unique ID (uuid), and writes the record to the appointments table.
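As a rough illustration of the first two steps of that handler, the sketch below builds a hypothetical API Gateway event and prepares the item, stopping short of the DynamoDB write so it runs locally. The only field taken from the article is `appointment.patient.email`, which the enrichment function reads later; the `dentist` and `date` fields and the `build_event` and `prepare_item` helpers are assumptions for illustration, and the real request body is the one in the Postman file in the repo.

```python
import json
import uuid


def build_event(email: str) -> dict:
    """Build a minimal, hypothetical API Gateway proxy event."""
    body = {
        "appointment": {
            "patient": {"email": email},
            # The two fields below are illustrative assumptions only.
            "dentist": "LJ Dentists",
            "date": "2024-01-15T09:30:00Z",
        }
    }
    # API Gateway passes the request body to Lambda as a JSON string.
    return {"body": json.dumps(body)}


def prepare_item(event: dict) -> dict:
    """Mirror the handler's first steps: parse the body and add a uuid."""
    item = json.loads(event["body"])
    item["id"] = str(uuid.uuid4())
    return item


if __name__ == "__main__":
    print(json.dumps(prepare_item(build_event("john.doe@example.com")), indent=2))
```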
When records are added to the table, the stream captures the change, and our pipe picks it up as its source.
Next, let’s look at the enrichment Lambda function which is invoked as part of the pipe:
```python
import os

import boto3
from boto3.dynamodb.types import TypeDeserializer

dynamodb_table = os.getenv('contacts_dynamodb_table')
dynamodb_client = boto3.client('dynamodb')
deserializer = TypeDeserializer()

def handler(event, context):
    # Initialised up front so the except block can always reference it
    response = {}
    try:
        # Take the new item image from the first stream record
        new_image_data = event[0]['dynamodb']['NewImage']

        # Deserialize the DynamoDB attribute format into plain Python values
        response = {k: deserializer.deserialize(v) for k, v in new_image_data.items()}

        # Get the patient's email address from the appointment
        email_address = response['appointment']['patient']['email']
        print('Email: ', email_address)

        # Look up the contact preferences by email address
        account = dynamodb_client.get_item(
            TableName=dynamodb_table,
            Key={'id': {'S': email_address}}
        )

        if 'Item' in account:
            item = account['Item']

            # Deserialize the contact preferences record
            account_data = {k: deserializer.deserialize(v) for k, v in item.items()}
            print('Contact information found: ', account_data)

            # Add the preferred method to the appointment
            response['preferredMethod'] = account_data['preferredMethod']
        else:
            # No contact preferences found, so default to 'none'
            response['preferredMethod'] = 'none'
            print('Contact information not found.')

    except Exception as e:
        print(e)
        response = {
            'error': f'Exception={e}',
            'body': response,
        }

    print('response: ', response)
    return response
```
We can see from the code above that we take the patient's email address from the stream record, and do a lookup on our preferred contact DynamoDB table. If a record exists, we take the preferred method and add it to the appointment record. If a matching record doesn’t exist, we add a preferredMethod of ‘none’.
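It can help to see what the function is actually unpacking. The sketch below shows a trimmed, hypothetical stream record in DynamoDB's typed attribute format and walks it down to the email address. To keep it runnable without boto3, the `deserialize` function is a minimal stand-in for `TypeDeserializer` that only handles the `S`, `N`, `BOOL`, `NULL`, `L` and `M` types; `SAMPLE_EVENT` and `extract_email` are likewise illustrative names rather than code from the repo.

```python
from decimal import Decimal


def deserialize(attribute: dict):
    """Minimal stand-in for TypeDeserializer (S, N, BOOL, NULL, L, M only)."""
    type_tag, value = next(iter(attribute.items()))
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # boto3 also returns Decimal for numbers
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [deserialize(v) for v in value]
    if type_tag == "M":
        return {k: deserialize(v) for k, v in value.items()}
    raise ValueError(f"Unsupported attribute type: {type_tag}")


# Hypothetical, trimmed batch of one stream record as passed to the enrichment.
SAMPLE_EVENT = [{
    "eventName": "INSERT",
    "dynamodb": {
        "NewImage": {
            "id": {"S": "hypothetical-appointment-id"},
            "appointment": {"M": {
                "patient": {"M": {"email": {"S": "john.doe@example.com"}}},
            }},
        }
    },
}]


def extract_email(event: list) -> str:
    """Follow the same path as the enrichment function to the email address."""
    new_image = event[0]["dynamodb"]["NewImage"]
    appointment = {k: deserialize(v) for k, v in new_image.items()}
    return appointment["appointment"]["patient"]["email"]


if __name__ == "__main__":
    print(extract_email(SAMPLE_EVENT))
```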
Note: There is a Postman file in the repo which is all set up to allow you to test the functionality through the REST API.
If you want to test this end to end, you just need to deploy the application and, if you want a preferredMethod to be added, put the following item into our contacts DynamoDB table:
```json
{
  "id": "john.doe@example.com",
  "preferredMethod": "email"
}
```
If a record doesn’t exist, it will default to ‘none’ as discussed above.
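Once the enriched appointments land on the queue, a consumer can decide how to contact the patient. The article does not include that consumer, so the following is only a sketch of one possible shape: a Lambda-style handler for an SQS event that routes on `preferredMethod`. It assumes each message body is the JSON returned by the enrichment function, and that `'sms'` is the value stored for SMS preferences; the `route_appointment` function and the returned action names are hypothetical.

```python
import json


def route_appointment(appointment: dict) -> str:
    """Pick an action from the preferredMethod added by the enrichment step."""
    method = str(appointment.get("preferredMethod", "none")).lower()
    if method == "email":
        return "send_email"
    if method == "sms":  # assumed value; the article only shows 'email' and 'none'
        return "send_sms"
    return "no_correspondence"


def handler(event, context=None):
    """Route every message in an SQS event; each body is a JSON string."""
    return [
        route_appointment(json.loads(record["body"]))
        for record in event.get("Records", [])
    ]


if __name__ == "__main__":
    sample_event = {"Records": [
        {"body": json.dumps({"id": "1", "preferredMethod": "email"})},
        {"body": json.dumps({"id": "2", "preferredMethod": "none"})},
    ]}
    print(handler(sample_event))
```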
Conclusion
I hope you found that useful as a practical example of using Amazon EventBridge Pipes in your Serverless solutions. Pipes allow us to remove the Lambda glue code we had to use in the past and give us a fantastic way of enriching data from a source before pushing that data to a target.
Wrapping up 👋🏽
I hope you enjoyed this article, and if you did then please feel free to share it and leave feedback!