(2025-June-30) Abbreviations and acronyms can speed up communication, but they can also exclude those who are not familiar with them. Sometimes this exclusion happens without intent; the worst case is when a speaker knowingly acts as a gatekeeper, wielding specialized professional jargon as a display of personal pride, often with a hint of arrogance or disregard for others’ understanding.

I think there is real value in briefly explaining a shortened term whenever its expansion was unintentionally omitted; it’s a small gesture that goes a long way toward clarity and inclusion. As for the latter case, when jargon is used to exclude or impress: just don’t be that way! 😊

Image by Robert Owen-Wahl from Pixabay

There is a scene in the Shrek 2 animated movie where Donkey can’t help but annoy everyone with his endless question: “Are we there yet?” Shrek tries to keep his cool, but after hearing the same question over and over again, there is nothing left of his patience but a scream.

Imagine you’re working on a data integration project that aims to collect and store any new data changes from your Salesforce platform or any other source system. You want to retrieve those change events as soon as they become available, which instantly puts you in the position of asking: “Are there any updates?” And you may find yourself asking that question over and over.

Salesforce offers a robust Streaming API built on a publish-subscribe model. Instead of repeatedly querying Salesforce with “Are there any updates?”, you can rely on this API to receive notifications whenever new records appear. This approach enables you to create connected applications that achieve near real-time data synchronization without the overhead of constant polling.
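
For contrast, the polling alternative looks something like the sketch below, using the simple_salesforce library (the object, fields, and interval are placeholder assumptions). Every iteration costs an API call whether or not anything changed:

import time
from simple_salesforce import Salesforce

# Hypothetical polling loop: repeatedly asking "Are there any updates?"
sf = Salesforce(instance_url="https://yourorg.my.salesforce.com", session_id="<access token>")
last_check = "2025-06-30T00:00:00Z"

while True:
    # Fetch Accounts modified since the last check (placeholder SOQL)
    result = sf.query(
        "SELECT Id, Name, LastModifiedDate FROM Account "
        f"WHERE LastModifiedDate > {last_check} ORDER BY LastModifiedDate"
    )
    for record in result["records"]:
        print(record["Id"], record["Name"])
        last_check = record["LastModifiedDate"]
    time.sleep(60)  # wait, then ask again... "Are we there yet?"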

Here are several key stages to enable this streaming workflow:

1) Enable Change Data Capture for Objects

First, select the objects you want to track (e.g., Account, Contact, Opportunity) in Salesforce Setup. Once enabled, Salesforce automatically generates change events whenever records of those objects are created, updated, deleted, or undeleted.

2) Subscribe to Change Events

Each object has a dedicated event channel named /data/{SalesforceEntityName}ChangeEvent (custom objects get channels like /data/Employee__ChangeEvent for Employee__c).

For example: 

  • /data/AccountChangeEvent
  • /data/CaseChangeEvent

Your application subscribes to these channels using a CometD client with the Bayeux protocol over HTTPS long-polling.

3) Publish and Deliver Events

Whenever a tracked record changes, Salesforce emits a JSON Change Event; an abridged example follows this list. Each event contains:

  • The change type (CREATE, UPDATE, DELETE, UNDELETE)
  • Record IDs
  • Changed fields and their new values
  • Transaction and commit metadata
  • A Replay ID (a unique identifier for the event)
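
Parsed into Python, the way the client code later in this post receives it, an abridged event message looks roughly like this (all values are hypothetical):

# Abridged CDC event message as received over CometD (values are hypothetical)
event_message = {
    "channel": "/data/AccountChangeEvent",
    "data": {
        "event": {"replayId": 12345},
        "payload": {
            "ChangeEventHeader": {
                "entityName": "Account",
                "changeType": "UPDATE",
                "recordIds": ["001XXXXXXXXXXXXXXX"],
                "changedFields": ["Name", "Phone"],
                "transactionKey": "0001f0d2-...",  # transaction metadata (elided)
                "commitTimestamp": 1719772797000,
            },
            # New values of the changed fields
            "Name": "Acme Corp",
            "Phone": "555-0100",
        },
    },
}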

4) Receive and Process Events

Salesforce publishes each event to the corresponding channel, and your subscribed client receives the message within seconds of the change. You can then update your downstream systems (databases, analytics, caches) to stay in sync, or trigger additional workflows and notifications.

5) Replay Missed Events

Each event carries a Replay ID. If your client disconnects, you can reconnect and specify the last Replay ID you received. Salesforce will then replay all missed events (within a retention window of up to 3 days).
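
In the CometD subscribe message, the starting point is passed via the ext section. Three variants are worth knowing, sketched below with an example channel: a concrete Replay ID resumes after that event, -1 requests new events only, and -2 requests all events still within the retention window.

# Replay options inside a CometD subscribe payload (channel name is an example)
subscribe_ext = {
    "replay": {
        "/data/AccountChangeEvent": 12345,  # resume: everything after Replay ID 12345
        # "/data/AccountChangeEvent": -1,   # new events only
        # "/data/AccountChangeEvent": -2,   # all events in the retention window
    }
}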

How Salesforce CDC Streaming Works Behind the Scenes

[Diagram: end-to-end Salesforce CDC subscription workflow (created in Miro)]

This diagram shows the end-to-end workflow of subscribing to Salesforce CDC events. It begins with user authentication, where the web application requests an access token from Salesforce’s OAuth service. After successfully obtaining the token, the application uses it to initiate a CometD handshake. This handshake is a negotiation step that sets up the communication protocol and ensures the client is recognized and allowed to connect securely.

Once the handshake succeeds, the CometD client proceeds to subscribe to the relevant CDC channels. Each Salesforce object you want to track, like Account, Contact, or Opportunity, has its own dedicated channel. The client sends subscription requests for these channels, and the CometD server responds with confirmations to acknowledge that the client is now registered to receive change events for those objects.

After all subscriptions are confirmed, the client establishes a long-polling connection that stays open to listen for incoming notifications. As data changes occur in Salesforce, such as record creations, updates, deletions, or undeletions, the CDC channels emit events, which are pushed through the CometD server (Salesforce’s internal service bus) to the client in near real time.

Implementing Salesforce CDC Streaming in Azure Functions with Python

I’m going to describe a few key functions for implementing Salesforce CDC data integration using a Python Azure Function App. You can always extend this example and add more functional steps as needed.

Obtain a Salesforce session with an access token using OAuth 2.0

# Module-level dependencies assumed by the snippets in this post:
import json
import time
import logging

import requests
from simple_salesforce import Salesforce
from azure.core.exceptions import ResourceNotFoundError

logger = logging.getLogger(__name__)
# INSTANCE_URL, CLIENT_ID, CLIENT_SECRET, BLOB_SERVICE_CLIENT, and
# CHECKPOINT_CONTAINER come from the Function App configuration (not shown).

def get_salesforce_session(self):
    # Obtain a Salesforce session with an access token using OAuth 2.0
    # (client credentials flow, which requires a suitably configured Connected App).
    try:
        token_url = f'{INSTANCE_URL}/services/oauth2/token'
        payload = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        }
        response = requests.post(token_url, data=payload)
        response.raise_for_status()
        token_data = response.json()
        access_token = token_data.get("access_token")
        instance_url = token_data.get("instance_url")
        if not access_token or not instance_url:
            raise Exception("Failed to obtain access token or instance URL")
        self.sf = Salesforce(instance_url=instance_url, session_id=access_token)
        logger.info("Successfully authenticated with Salesforce")
        return self.sf
    except Exception as e:
        logger.error(f"Error obtaining Salesforce session: {e}")
        raise

Perform a single CometD handshake for all channels.

def cometd_handshake(self, cometd_url):
    # Perform a single CometD handshake covering all channels.
    self.session = self.get_cometd_session()  # requests.Session with the OAuth bearer header (sketched at the end of this post)
    handshake_payload = [{
        "version": "1.0",
        "minimumVersion": "0.9",
        "channel": "/meta/handshake",
        "supportedConnectionTypes": ["long-polling"],
        "advice": {"timeout": 60000, "interval": 0}
    }]

    response = self.session.post(cometd_url, json=handshake_payload)
    if response.status_code == 200:
        result = response.json()[0]
        if result.get('successful'):
            self.client_id = result['clientId']
            logger.info(f"CometD handshake successful. Client ID: {self.client_id}")
            return True
    logger.error(f"Handshake failed: {response.text}")
    return False

Subscribe to all CDC channels one by one, using replayIds from saved checkpoints

    def cometd_subscribe_all(self, cometd_url):
        # Subscribe to all CDC channels one by one, using replayIds from Azure Blob checkpoints.
        replay_ids = {}

        for channel in self.channels:
            # Construct Azure-safe blob name
            blob_name = f"Checkpoint{channel.replace('/', '_').replace(':', '')}.json"
            blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container=CHECKPOINT_CONTAINER, blob=blob_name)

            try:
                # Attempt to read the blob content
                blob_data = blob_client.download_blob().readall()
                replay_id = json.loads(blob_data).get("replayId")
                replay_ids[channel] = replay_id
            except ResourceNotFoundError:
                replay_ids[channel] = None
                logger.info(f"No checkpoint found for channel {channel}. Starting fresh.")
            except Exception as e:  # also covers json.JSONDecodeError
                replay_ids[channel] = None
                logger.error(f"Failed to load checkpoint for channel {channel}: {e}", exc_info=True)

        def build_payload(channel, replay_id):
            return {
                "channel": "/meta/subscribe",
                "clientId": self.client_id,
                "subscription": channel,
                "ext": {
                    "payload.format": "FULL",
                    "replay": {channel: replay_id}
                }
            }

        final_channels = []

        for channel in self.channels:
            current_replay_id = replay_ids[channel] if replay_ids[channel] is not None else -1
            payload = build_payload(channel, current_replay_id)

            response = self.session.post(cometd_url, json=[payload])
            if response.status_code != 200:
                logger.error(f"Failed to connect to channel {channel}. HTTP {response.status_code}: {response.text}")
                continue

            results = response.json()
            for result in results:
                result_channel = result.get('channel', '')

                if result_channel.startswith("/meta/"):  # Subscription confirmation
                    if result.get('successful'):
                        logger.info(f"Subscribed to channel: {channel} with replayId={current_replay_id}.")
                        final_channels.append(channel)
                    else:
                        error = result.get('error', '')
                        logger.warning(f"Subscription failed for {channel} with replayId={current_replay_id}: {error}")

                        if "invalid" in error.lower() and "replayid" in error.lower():
                            logger.info(f"Retrying subscription for {channel} with replayId = -1")
                            retry_payload = build_payload(channel, -1)
                            retry_response = self.session.post(cometd_url, json=[retry_payload])
                            if retry_response.status_code == 200 and retry_response.json()[0].get('successful'):
                                logger.info(f"Subscribed to channel: {channel} with replayId = -1 after retry")
                                final_channels.append(channel)
                                save_replay_id(channel, -1)
                            else:
                                logger.error(f"Retry failed for {channel}: {retry_response.text}")
                        elif "403::User not allowed" in error:
                            logger.error(f"Permission denied for channel {channel}. Skipping.")
                        else:
                            logger.error(f"Unknown error for channel {channel}: {error}")
                            logger.warning(f"Subscription result: {result}")

                elif result_channel.startswith("/data/"):  # Data message bundled in subscription response
                    logger.info(f"Received data event for channel {result_channel}: {json.dumps(result)}")
                    self.on_message(result)

        self.channels = final_channels
        return bool(final_channels)
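
Both the subscription handling above and the connect loop below pass incoming messages to self.on_message, which I haven’t shown in full. Here is a minimal sketch of what such a handler can do; the event container name and blob layout are illustrative assumptions:

def on_message(self, message):
    # Sketch of an event handler: log, persist the raw event, checkpoint the replayId.
    channel = message.get('channel')
    replay_id = message.get('data', {}).get('event', {}).get('replayId')
    header = message.get('data', {}).get('payload', {}).get('ChangeEventHeader', {})
    logger.info(f"{header.get('changeType')} on {header.get('entityName')} (replayId={replay_id})")

    # Persist the full JSON message for downstream processing ("cdc-events" is an assumed container)
    blob_name = f"Events{channel.replace('/', '_')}_{replay_id}.json"
    blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container="cdc-events", blob=blob_name)
    blob_client.upload_blob(json.dumps(message), overwrite=True)

    # Checkpoint the replayId so a reconnect resumes from this point
    save_replay_id(channel, replay_id)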

Maintain a single long-polling connection for all channels

def cometd_connect(self, cometd_url):
    # Maintain a single long-polling connection for all channels.
    MAX_RUNTIME = 260  # 4m20s for a 5m timeout (40s buffer)
    start_time = time.time()

    while True:
        # Timeout check at the start of each iteration
        elapsed = time.time() - start_time
        if elapsed > MAX_RUNTIME:
            logger.info("Approaching function timeout - exiting gracefully")
            return True  # Graceful exit, run() will treat it as successful

        try:
            connect_payload = [{
                "channel": "/meta/connect",
                "clientId": self.client_id,
                "connectionType": "long-polling"
            }]

            response = self.session.post(cometd_url, json=connect_payload, timeout=40)

            if response.status_code == 200:
                messages = response.json()
                for message in messages:
                    if message.get('channel') in self.channels:  # CDC event
                        self.on_message(message)
                    elif message.get('error') == '403::Unknown client':
                        logger.warning("Client expired, reconnecting...")
                        return False
            else:
                logger.error(f"Connect failed: {response.text}")
                time.sleep(10)

        except requests.exceptions.ReadTimeout:
            logger.info("Long-polling timeout reached, re-establishing connection...")
            continue  # Immediately try again

        except requests.exceptions.RequestException as e:
            logger.error(f"Connect error: {e}")
            time.sleep(10)

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            time.sleep(10)
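
The MAX_RUNTIME guard exists because the loop runs inside an Azure Function with a bounded execution window (five minutes on the Consumption plan). As one way to host the listener, here is a sketch using the v2 Python programming model; SalesforceCDCClient is a hypothetical wrapper class around the methods above:

import azure.functions as func

app = func.FunctionApp()

@app.timer_trigger(schedule="0 */5 * * * *", arg_name="timer", run_on_startup=True)
def cdc_listener(timer: func.TimerRequest) -> None:
    # Each invocation listens for up to ~4m20s, checkpointing as it goes,
    # then exits before the timeout; the next run resumes via the saved replayIds.
    client = SalesforceCDCClient()  # hypothetical wrapper around the methods shown above
    client.run()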

Since I didn’t post the entire code of my Azure Function App, you might have inferred some details from the code comments. All processed event JSON messages were saved to Azure Storage, along with checkpoint files that stored the last replayId of each successfully processed message. This allowed the application to reconnect safely to the channels after a connection disruption and retrieve all the “missed” event messages from the downtime.

Testing confirmed that this mechanism, built on the replayIds, was reliable and stable. Kudos to the Salesforce team for retaining all those messages in their internal Service Bus repository for the configured CDC entities!
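
For completeness, here are simplified sketches of the two helpers referenced earlier: save_replay_id, which writes the checkpoint blob that cometd_subscribe_all reads back, and get_cometd_session, which wraps the OAuth token in a requests.Session aimed at the CometD endpoint (API version 58.0 is just an example):

def save_replay_id(channel, replay_id):
    # Persist the latest replayId for a channel, mirroring the blob naming
    # used when loading checkpoints in cometd_subscribe_all
    blob_name = f"Checkpoint{channel.replace('/', '_').replace(':', '')}.json"
    blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container=CHECKPOINT_CONTAINER, blob=blob_name)
    blob_client.upload_blob(json.dumps({"replayId": replay_id}), overwrite=True)

def get_cometd_session(self):
    # Build a requests.Session carrying the OAuth bearer token; the CometD
    # endpoint itself is {instance_url}/cometd/{api_version}, e.g. .../cometd/58.0
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {self.sf.session_id}",
        "Content-Type": "application/json"
    })
    return session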
