Azure Cosmos DB Change Feed

About

This article is about an unsung feature of Azure Cosmos DB: the change feed. It walks through a quick integration with Azure Functions that reads the change feed from Cosmos DB and moves the documents to a different storage service.

Prerequisites

Basic understanding of Azure Cosmos DB

What is a Change Feed?

The change feed in Azure Cosmos DB is a persistent record of changes to a container, in the order in which they occur.

The Microsoft documentation describes the change feed as follows:

With change feed support, Azure Cosmos DB provides a sorted list of documents within an Azure Cosmos DB collection in the order in which they were modified. This feed can be used to listen for modifications to data within the collection and perform actions.

Some important points to keep in mind when architecting a solution with the change feed:

  • Changes are ordered per logical partition key: within a partition key, changes appear in the order in which they were made. There is no guaranteed order across different partition key values.

  • The change feed does not indicate the type of database operation (insert or update). The recommendation is to add a soft field to the document that records the operation type, and to use it in the logic that consumes the feed.

  • Deletes are currently not captured: when a document is deleted from the container, it does not appear in the change feed. The recommendation, again, is to add a soft marker to the document indicating that the item is deleted. The consumer watches for the marked item in the feed, and a TTL set on the item deletes it from the container later.
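The soft-field recommendations above can be sketched as follows. This is a minimal illustration using System.Text.Json.Nodes (.NET 6 or later), not a Cosmos DB API: the property names operationType and isDeleted are hypothetical application conventions. ttl is the per-item time-to-live property (in seconds) that Cosmos DB honors when TTL is enabled on the container.

```csharp
using System.Text.Json.Nodes;

// Hypothetical conventions: "operationType" and "isDeleted" are fields the
// application writes itself; Cosmos DB does not add them.
public static class SoftFields
{
    // Writer side: record the kind of operation before writing the document.
    public static JsonObject StampOperation(JsonObject doc, string operationType)
    {
        doc["operationType"] = operationType; // e.g. "insert" or "update"
        return doc;
    }

    // Writer side: instead of deleting, mark the item. This is an update, so it
    // shows up in the change feed. "ttl" (seconds) then lets Cosmos DB remove the
    // item later; it only applies if TTL is enabled on the container.
    public static JsonObject MarkDeleted(JsonObject doc, int ttlSeconds)
    {
        doc["isDeleted"] = true;
        doc["ttl"] = ttlSeconds;
        return doc;
    }

    // Consumer side: decide what a change feed item represents.
    public static string Classify(JsonObject change)
    {
        if (change["isDeleted"]?.GetValue<bool>() == true)
        {
            return "delete";
        }
        return change["operationType"]?.GetValue<string>() ?? "unknown";
    }
}
```

A consumer would then branch on the result of Classify, for example removing an archived copy when it returns "delete".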

Use Cases

The change feed is often used to react to data changes. It provides a native way to respond to changes in documents and to build sophisticated solutions, including but not limited to:

  • Aggregating events across containers and building materialized views

  • Sending transactional data to an analytics pipeline for real-time analytics

  • Moving data to a different storage medium for retention
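As an illustration of the first use case, here is a minimal sketch that folds a batch of changes into a materialized view holding the latest price per item. It is plain C# with no Cosmos DB dependency; the PriceChange shape (ItemId, Price) is a hypothetical model of documents in a price container, and Ts stands in for the _ts system property (last-modified time in epoch seconds).

```csharp
using System.Collections.Generic;

// Hypothetical shape of a change: ItemId and Price are assumed fields;
// Ts mirrors the Cosmos DB "_ts" system property (epoch seconds).
public record PriceChange(string ItemId, decimal Price, long Ts);

// One row of the materialized view: the latest known price for an item.
public record ItemPriceView(decimal LatestPrice, long LastTs);

public static class PriceViewBuilder
{
    // Fold a batch of changes into the view, keyed by item id.
    public static void Apply(Dictionary<string, ItemPriceView> view, IEnumerable<PriceChange> changes)
    {
        foreach (var change in changes)
        {
            // Keep the existing row if it already reflects a newer change, so a
            // replayed or older change does not roll the view back. On a tie the
            // later-arriving change wins.
            if (view.TryGetValue(change.ItemId, out var current) && current.LastTs > change.Ts)
            {
                continue;
            }
            view[change.ItemId] = new ItemPriceView(change.Price, change.Ts);
        }
    }
}
```

Comparing timestamps rather than trusting arrival order keeps the view correct if the same batch is delivered twice.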

[Figure: Change feed overview]

Example

Let us pick a simple use case: read documents from the change feed as items are inserted, and move them to another store, Azure Blob Storage.

[Figure: Cosmos DB change feed]

Setup

  • Azure Cosmos DB account: this sample uses the SQL (Core) API, but the change feed is also supported in the Cosmos DB APIs for MongoDB, Cassandra, and Gremlin.

I created a sample Cosmos DB account with the database items-db and the container item-price-container (the names the function uses), as shown below:

[Figure: Sample Cosmos DB container]

  • Azure Storage account: create a sample account; the quickest way is through the Azure portal.

I created a sample account and a blob container, as shown below:

[Figure: Sample storage container]

  • Function App


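Let's write a simple Azure Function that receives the change feed and uploads the documents to the storage account. The function reads the Cosmos DB connection string from the cosmos-connection-string app setting and the storage connection string from the CloudStorageAccount app setting.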

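    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class ConsumeChangeFeed
    {
        [FunctionName("ConsumeChangeFeed")]
        public static async Task Run([CosmosDBTrigger(
            databaseName: "items-db",
            collectionName: "item-price-container",
            ConnectionStringSetting = "cosmos-connection-string",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true,
            StartFromBeginning = true)]IReadOnlyList<Document> input, ILogger log)
        {
            if (input != null && input.Count > 0)
            {
                log.LogInformation("Documents modified: " + input.Count);
                // Await the upload so the function does not report completion
                // before the blobs have actually been written.
                await UploadDataAsync(input);
            }
        }

        /// <summary>
        /// Upload each received document to blob storage as a blob named after its id.
        /// </summary>
        /// <param name="inputDocuments">The batch of changed documents from the change feed.</param>
        private static async Task UploadDataAsync(IReadOnlyList<Document> inputDocuments)
        {
            BlobContainerClient container = new BlobContainerClient(
                Environment.GetEnvironmentVariable("CloudStorageAccount"), "itemscontainer");
            await container.CreateIfNotExistsAsync();
            foreach (var document in inputDocuments)
            {
                // A fresh stream per document, so a blob never contains bytes of another document.
                using (var memoryStream = LoadStreamWithJson(document))
                {
                    // overwrite: true so an update to an existing document replaces its
                    // earlier copy instead of failing because the blob already exists.
                    await container.GetBlobClient(document.Id).UploadAsync(memoryStream, overwrite: true);
                }
            }
        }

        /// <summary>
        /// Serialize a document to JSON in a new MemoryStream positioned at the start.
        /// </summary>
        /// <param name="document">The document to serialize.</param>
        private static MemoryStream LoadStreamWithJson(Document document)
        {
            // Document.ToString() returns the document's JSON representation.
            return new MemoryStream(Encoding.UTF8.GetBytes(document.ToString()));
        }
    }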
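Because ordering is only guaranteed within a logical partition key, a handler that depends on order (unlike a plain blob copy, where each blob is independent) could process each batch per partition key. The following is a minimal, BCL-only sketch; ProcessByPartitionAsync is a hypothetical helper, not part of the Functions or Cosmos DB SDKs. It relies on LINQ GroupBy, which keeps the source order of elements inside each group.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedProcessing
{
    // Process a change feed batch so that changes sharing a partition key are
    // handled one after another in feed order, while different partition keys,
    // which have no ordering guarantee between them, run concurrently.
    public static Task ProcessByPartitionAsync<T>(
        IReadOnlyList<T> batch,
        Func<T, string> partitionKeyOf,
        Func<T, Task> handle)
    {
        var perPartition = batch
            .GroupBy(partitionKeyOf) // keeps feed order inside each group
            .Select(async group =>
            {
                foreach (var item in group)
                {
                    await handle(item); // sequential within one partition key
                }
            });
        return Task.WhenAll(perPartition);
    }
}
```

With the trigger above, one could pass input together with a selector such as d => d.GetPropertyValue<string>("category"), where category is a hypothetical stand-in for the container's actual partition key property.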

Output

Once the setup is done, any inserts or updates to documents in the Cosmos DB container appear in the change feed, and the function is triggered to copy the documents to the storage account.

The trigger, which is built on the change feed processor, keeps its checkpoints in the leases container (created here because CreateLeaseCollectionIfNotExists is true). You can think of this as an offset or ledger that records how far through the change feed the function has processed. If the Azure Function restarts, it resumes from the last checkpoint instead of reprocessing everything. StartFromBeginning = true makes the trigger read the feed from the beginning of the container's history, but only when no checkpoints exist yet; once leases are stored, processing resumes from them.
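To make the checkpoint idea concrete, here is a deliberately simplified in-memory model. It is not how the change feed processor is implemented: the real processor keeps one lease per feed range with a continuation token, while this sketch uses a single lease and an integer offset. InMemoryLeaseStore and CheckpointedConsumer are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the leases container: lease id -> position of the
// last change that was fully processed.
public class InMemoryLeaseStore
{
    private readonly Dictionary<string, int> _checkpoints = new();

    // -1 means "no checkpoint yet". In this model the consumer then starts at
    // the first change, as StartFromBeginning = true does for the real trigger.
    public int GetCheckpoint(string leaseId) =>
        _checkpoints.TryGetValue(leaseId, out var pos) ? pos : -1;

    public void SaveCheckpoint(string leaseId, int position) => _checkpoints[leaseId] = position;
}

public static class CheckpointedConsumer
{
    // Read up to maxBatch changes after the checkpoint, hand them to the handler,
    // and only then advance the checkpoint. In this model, if the handler throws
    // or the process stops before SaveCheckpoint, the same batch is read again.
    public static int ProcessNextBatch(
        IReadOnlyList<string> feed, InMemoryLeaseStore leases, string leaseId,
        int maxBatch, Action<IReadOnlyList<string>> handle)
    {
        int start = leases.GetCheckpoint(leaseId) + 1;
        int count = Math.Min(maxBatch, feed.Count - start);
        if (count <= 0)
        {
            return 0; // nothing new since the last checkpoint
        }

        var batch = new List<string>();
        for (int i = start; i < start + count; i++)
        {
            batch.Add(feed[i]);
        }

        handle(batch);
        leases.SaveCheckpoint(leaseId, start + count - 1);
        return count;
    }
}
```

Calling ProcessNextBatch again with the same lease store, as a restarted function would, continues from the stored position rather than from the start of the feed.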

I added a few sample items to the Cosmos DB container through the portal.

[Figure: Sample documents added]

The Azure Function was triggered, read the feed, and uploaded the documents to the storage account, as shown below:

[Figure: Sample documents archived]