Salesforce Streaming API MuleSoft Integration

Author: Shekh Muenuddeen

Streaming API enables streaming of events using push technology. Clients receive those events in near real time through a subscription mechanism.

 

Why do we need Streaming API?

 

In our day-to-day work, we often need to send data from one system to another in order to keep them synchronized.

If we use a polling mechanism, we get data only when the poller executes, so the data is not real time.

Streaming API gives us data in near real time, with certain features like reliability and durability.

 

The Streaming API subscription mechanism supports multiple types of events:

 

  1. Push Topic event
  2. Generic event
  3. Platform event
  4. Change data capture event

Event

 

An event is the creation, update, deletion, or undeletion of a record. Events can trigger notifications.

 

Notification

 

A message sent in response to an event. The notification is sent to the channel to which one or more clients are subscribed.

 

PushTopic

 

A PushTopic triggers notifications for changes to Salesforce records that match SOQL criteria. Each notification carries a payload with the record that changed, along with attributes that hold event information such as the event type, which tells you whether the operation was a create, update, delete, or undelete.

PushTopic notifications are based on the criteria that you define in the SOQL query. Only the fields specified in the query are included in the notification.
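To make the "only the fields specified in the query" rule concrete, here is a minimal Python sketch. It is not Salesforce code: the function `build_notification`, the sample record, and the field list are all hypothetical and only model how a notification payload is limited to the selected fields.

```python
def build_notification(record, selected_fields, event_type):
    """Model a PushTopic notification: keep only the fields named in the SELECT list."""
    payload = {field: record.get(field) for field in selected_fields}
    return {"payload": payload, "type": event_type}


# Hypothetical Account record; Website is NOT part of the selected fields.
account = {
    "Id": "001xx0000000001",
    "Name": "Acme",
    "Phone": None,
    "Website": "acme.example",
}

# Assumed SELECT list for illustration: Id, Name, Phone, Fax.
notification = build_notification(account, ["Id", "Name", "Phone", "Fax"], "created")
print(notification["payload"])
```

The Website value never reaches the subscriber because it is not part of the selected fields, while Fax appears with a null value because it is selected but empty on the record.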

The PushTopic defines a subscription channel. 

 

Channel

 

A stream of events to which a client can subscribe to receive event notifications.

 

Applications That Poll Frequently

 

An application that polls Salesforce frequently consumes unnecessary API calls and processing time. In that case, Streaming API is a good option for receiving events, because it reduces the number of requests that return no data.

Push Technology

 

Push technology, also called the publish/subscribe model, is a style of communication in which the transfer of information is initiated by the server and sent to the client. This differs from pull technology, in which the transfer of information is initiated by the client.

In push technology, the server pushes out information to the client after the client has subscribed to a channel of information. For the client to receive the information, the client must maintain a connection to the server (Salesforce). 
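The publish/subscribe idea can be sketched in a few lines of Python. This is an in-memory illustration only, not how Salesforce implements it; the `Broker` class and the channel names are hypothetical.

```python
from collections import defaultdict


class Broker:
    """A tiny in-memory publish/subscribe broker (illustration only)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        # The client registers interest in a channel once.
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        # The server pushes the message to every subscriber of that channel.
        for callback in self._subscribers[channel]:
            callback(message)
        return len(self._subscribers[channel])


received = []
broker = Broker()
broker.subscribe("/topic/demo", received.append)

delivered = broker.publish("/topic/demo", {"type": "created"})
ignored = broker.publish("/topic/other", {"type": "updated"})
print(received, delivered, ignored)
```

The client never asks "is there anything new?". It subscribes once and the broker calls it back, which is the difference from pull technology described above.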

Streaming API uses the Bayeux protocol and CometD, so the client-to-server connection is maintained through long polling.

 

Bayeux protocol

 

A protocol for transporting asynchronous messages, primarily over HTTP.

 

CometD

 

HTTP event routing bus that uses an AJAX push technology pattern known as Comet. It implements the Bayeux protocol.

 

Long polling

 

Long polling pushes information from the server to the client. As with normal polling, the client connects via a handshake and requests information from the server. However, if no data is available, the server does not send an empty response. Instead, it holds the request and waits until data is available. The server then sends the response, and the client immediately sends a new request.
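The long-polling loop described above can be simulated locally with a queue and a thread. This is a sketch under simplifying assumptions: there is no HTTP involved, and `long_poll`, `client_loop`, and `produce` are hypothetical names standing in for the server and the client.

```python
import queue
import threading

events = queue.Queue()


def long_poll(timeout_seconds):
    """Server side: hold the request until an event exists or the timeout expires."""
    try:
        return events.get(timeout=timeout_seconds)
    except queue.Empty:
        return None  # Timed out with no data; the client simply asks again.


def client_loop(max_events, timeout_seconds=2.0):
    """Client side: re-issue the request as soon as the previous one returns."""
    received = []
    while len(received) < max_events:
        response = long_poll(timeout_seconds)
        if response is not None:
            received.append(response)
    return received


def produce():
    # Stand-in for record changes happening on the server.
    for replay_id in (1, 2, 3):
        events.put({"replayId": replay_id})


producer = threading.Thread(target=produce)
producer.start()
result = client_loop(max_events=3)
producer.join()
print(result)
```

The blocking `events.get(timeout=...)` call plays the role of the held request: it returns as soon as data exists instead of returning an empty response right away.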

Salesforce Streaming API communication is easy to understand in terms of event-driven channels.

Event-driven is a pattern in which an event is triggered when something happens. For example:

You have booked an Ola cab; when it arrives, you get a notification.

You have ordered something on Amazon; when it’s shipped, you get a notification.

 

Good and Bad of an Event-Driven Architecture

 

Good – Real time, one-way/broadcasting, decoupling, scaling, replay (24 hours), audit logs

Bad – Storage, external systems that do not support event-driven communication, distributed systems

 

Channel

 

You have subscribed to a YouTube channel; when a new video is uploaded, you get a notification.

All of these notifications are made possible by event-driven communication over channels.

The process works as follows.

Streaming API uses the HTTP/1.1 request-response model:

1. CometD sends a handshake request.

2. After a successful handshake, your custom listener on the /meta/handshake channel sends a subscription request to a channel.

3. CometD maintains the connection by using long polling.
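The three steps above correspond to three Bayeux meta messages. The Python sketch below only builds the message bodies and prints them as JSON; it does not talk to Salesforce. The client ID and the topic name `/topic/exampleTopic` are hypothetical placeholders (in a real session the server returns the client ID in the handshake response).

```python
import json


def handshake_message():
    # Step 1: the client announces the protocol version and transports it supports.
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_message(client_id, subscription):
    # Step 2: after the handshake, the client subscribes to a channel.
    return {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": subscription,
    }


def connect_message(client_id):
    # Step 3: the connect request is the one the server holds open (long polling).
    return {
        "channel": "/meta/connect",
        "clientId": client_id,
        "connectionType": "long-polling",
    }


client_id = "example-client-id"  # hypothetical placeholder
sequence = [
    handshake_message(),
    subscribe_message(client_id, "/topic/exampleTopic"),  # hypothetical topic
    connect_message(client_id),
]
for message in sequence:
    print(json.dumps(message))
```

A CometD client library normally builds and sends these messages for you; the sketch only shows what travels over the HTTP connection.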

Now it’s time to create a Mule project. We are going to do the following:

  1. Publish PushTopic
  2. Subscribe PushTopic
  3. Subscribe with the Replay topic listener
  4. Insert/update/delete Salesforce Object via Salesforce create/update/delete connector
  5. Publish Streaming Channel
  6. Subscribe Channel
  7. Push Salesforce event data

PushTopic Creation

 

 

Salesforce connector configuration

 

<salesforce:sfdc-config name="Salesforce_Config" doc:name="Salesforce Config" doc:id="89a14788-17e5-42aa-b87a-ec3dd5686696" >
    <salesforce:basic-connection username="${secure::sf.username}" password="${secure::sf.password}" securityToken="${secure::sf.token}" />
</salesforce:sfdc-config>

We are going to use the Publish topic operation of the Salesforce connector.

Below is the configuration, for which I have provided the topic name and SOQL query:

PushTopic Name- accountInsertUpdateTopic

Query- SELECT Id,Name,Phone,Fax FROM Account

Note- you are notified only about changes to the fields that you have defined in the SOQL query.

<flow name="salesforce-streaming-api-demo-1Flow" doc:id="68ede2a3-9518-4980-9f1f-4ba5853a7a76" >
        <http:listener doc:name="PushTopic Creation Request" doc:id="2999f676-a117-4020-af62-61fc317b3467" config-ref="HTTP_Listener_config" path="/publish/topic"/>
        <salesforce:publish-topic doc:name="accountInsertUpdateTopic" doc:id="3d07eff6-04b8-4084-96f8-ec1b3f5dd308" config-ref="Salesforce_Config" topicName="#[payload.topic]" query="#[payload.query]"/>
        <ee:transform doc:name="To JSON" doc:id="8cf73bdc-2eca-43ba-a789-47669bd8d9a5" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
</flow>

Below is the curl command (exported from Postman) for the HTTP request that creates the PushTopic:

curl --location --request POST 'localhost:8081/publish/topic' \
--header 'Content-Type: application/json' \
--data '{
    "topic" : "accountInsertUpdateTopic",
    "query" : "SELECT Id,Name,Phone,Fax FROM Account"
}'

Response

Below is the response of the PushTopic creation, which contains the topic ID:

{
  "success": true,
  "id": "0IF2w000000bpMEGAY",
  "errors": []
}
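For readers who prefer a script over curl, the same request can be prepared in Python with the standard library. This is a sketch, not part of the Mule project: `build_publish_topic_request` and `topic_id_from_response` are hypothetical helper names, and the base URL assumes the Mule application is running locally on port 8081 as in the curl command.

```python
import json
import urllib.request


def build_publish_topic_request(base_url, topic, query):
    """Prepare the POST request for the /publish/topic endpoint of the Mule flow."""
    body = json.dumps({"topic": topic, "query": query}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/publish/topic",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def topic_id_from_response(response_text):
    """Extract the PushTopic ID, or fail loudly if Salesforce reported errors."""
    result = json.loads(response_text)
    if not result.get("success"):
        raise RuntimeError(f"PushTopic creation failed: {result.get('errors')}")
    return result["id"]


request = build_publish_topic_request(
    "http://localhost:8081",
    "accountInsertUpdateTopic",
    "SELECT Id,Name,Phone,Fax FROM Account",
)

# To actually send it (requires the Mule application to be running):
# with urllib.request.urlopen(request) as response:
#     print(topic_id_from_response(response.read().decode("utf-8")))

# Parsing the sample response shown in this article:
sample = '{"success": true, "id": "0IF2w000000bpMEGAY", "errors": []}'
print(topic_id_from_response(sample))
```

Checking the success flag before reading the ID avoids silently continuing with a topic that was never created.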

So far, we have successfully created the PushTopic. Now it’s time to give the topic name to the clients, who need to subscribe to that topic. Whenever a change such as an insert, update, or delete is made, subscribers get an event notification.

 

Subscribe PushTopic

 

We are going to use the Subscribe topic listener operation of the Salesforce connector.

Topic- accountInsertUpdateTopic

<flow name="salesforce-streaming-api-demo-1Flow1" doc:id="35dc47d4-e364-4a1b-953a-e4f8d031f094" >
    <salesforce:subscribe-topic-listener topic="accountInsertUpdateTopic" doc:name="accountInsertUpdateTopic" doc:id="f0e36222-0303-4223-b1e0-7015fc691e7a" config-ref="Salesforce_Config"/>
    <logger level="INFO" doc:name="Salesforce Message Event" doc:id="a9ee67f6-3a10-4a98-b138-7ff28482eec8" message='Salesforce Message Event Push Topic #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
</flow>

Here, I have specified the name of the topic that we created with the Publish topic operation of the Salesforce connector.

When the application is deployed successfully, you will see the following logs in the console:

org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Handshake successful, subscribing.
org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Subscribing to: /topic/accountInsertUpdateTopic from replyId: -1
org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Successfully subscribed to /topic/accountInsertUpdateTopic

Now it’s time to create/update/delete Salesforce objects. Here I am going to use the Account object.

Why? Can you guess?

Because I defined the SOQL query on Account when I created the PushTopic.

Let me remind you- SELECT Id, Name, Phone, Fax FROM Account

Below is the account creation input I provided:

{
      "Name" : "Shekh Muenuddeen Account"
}

When I created the account, I received one event. Let me paste the event here:

Payload
{  
  "Phone": null,
  "Id": "0012w00000FvxyjAAB",
  "Fax": null,
  "Name": "Shekh Muenuddeen Account"
}  
Attributes {
  "createdDate": "2020-07-12T06:00:41.716Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 35,
  "type": "created"
}

As you can see, the type attribute has the value "created" because I have just created an account.

Update

Based on the Salesforce account ID, I updated the fields below:

{
      "Id": "0012w00000FvxyjAAB",
      "Phone": "1234567890",
      "Fax": "0987654321"
}

I received the event. Let me paste it here:

Payload 
{
  "Phone": "1234567890",
  "Id": "0012w00000FvxyjAAB",
  "Fax": "0987654321",
  "Name": "Shekh Muenuddeen Account"
}
 Attributes {
  "createdDate": "2020-07-12T06:03:46.153Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 36,
  "type": "updated"
}

As you can see, the type value is "updated" because I updated the Phone and Fax fields.

If I update fields that I have not defined in the SOQL query, I will not receive any events.

Delete

Now let’s delete the Salesforce account which I created and updated.

I deleted the Salesforce account based on its Salesforce ID.

I received the following event:

Payload
{
  "Id": "0012w00000FvxyjAAB"
} 
Attributes {
  "createdDate": "2020-07-12T06:07:30.048Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 37,
  "type": "deleted"
}

As you can see, when we delete the account, the payload contains only the Id and the type value is "deleted".
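A subscriber usually branches on the type attribute, because the payload shape differs between events (a deleted event carries only the Id). The Python sketch below shows one way to do that, using the event data from this article. The function `describe_event` is hypothetical and is not part of the Mule connector.

```python
def describe_event(payload, attributes):
    """Turn a PushTopic event into a short log line, based on attributes['type']."""
    event_type = attributes["type"]
    record_id = payload["Id"]
    if event_type == "deleted":
        # Deleted events carry only the Id, so no other field is read here.
        return f"Account {record_id} was deleted"
    if event_type in ("created", "updated", "undeleted"):
        return f"Account {record_id} ({payload.get('Name')}) was {event_type}"
    raise ValueError(f"Unexpected event type: {event_type}")


created = describe_event(
    {"Phone": None, "Id": "0012w00000FvxyjAAB", "Fax": None, "Name": "Shekh Muenuddeen Account"},
    {"channel": "/topic/accountInsertUpdateTopic", "replayId": 35, "type": "created"},
)
deleted = describe_event(
    {"Id": "0012w00000FvxyjAAB"},
    {"channel": "/topic/accountInsertUpdateTopic", "replayId": 37, "type": "deleted"},
)
print(created)
print(deleted)
```

Reading optional fields with `payload.get(...)` keeps the handler from failing when a field is missing from the payload.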

Replay Topic Listener

You can also use the Replay topic listener operation of the Salesforce connector to receive events. So why would we use the Replay topic listener?

Here are some of its advantages over the Subscribe topic listener:

It can receive events that were published while our listener server was offline.

It can receive events published after a specific event by using that event’s replay ID.

It supports the following replay options:

ALL- all retained events are received

ONLY_NEW- only new events published after subscribing are received

FROM_REPLAY_ID- only events with a replay ID greater than the specified replay ID are received
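The three options can be modeled with a small Python function. This is a simplified simulation of the selection rule only, not the connector's implementation; `events_to_deliver` is a hypothetical name, and the replay IDs are sample values in the same range as the events in this article.

```python
def events_to_deliver(retained, incoming, replay_option, replay_id=None):
    """Pick the events a subscriber receives.

    retained: events stored before the subscription started
    incoming: events published after the subscription started
    """
    if replay_option == "ALL":
        return retained + incoming
    if replay_option == "ONLY_NEW":
        return list(incoming)
    if replay_option == "FROM_REPLAY_ID":
        if replay_id is None:
            raise ValueError("FROM_REPLAY_ID needs a replay ID")
        return [e for e in retained + incoming if e["replayId"] > replay_id]
    raise ValueError(f"Unknown replay option: {replay_option}")


retained = [{"replayId": 35}, {"replayId": 36}, {"replayId": 37}]
incoming = [{"replayId": 38}]

all_ids = [e["replayId"] for e in events_to_deliver(retained, incoming, "ALL")]
new_ids = [e["replayId"] for e in events_to_deliver(retained, incoming, "ONLY_NEW")]
from_ids = [e["replayId"] for e in events_to_deliver(retained, incoming, "FROM_REPLAY_ID", 36)]
print(all_ids, new_ids, from_ids)
```

With FROM_REPLAY_ID, a listener that remembers the last replay ID it processed can resume after a restart without missing or reprocessing events, as long as the events are still retained.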

<flow name="topicFlow" doc:id="e2f23c4e-0574-4ee6-b1e4-a6c4bf64cd4a" >
    <salesforce:replay-topic-listener topic="accountInsertUpdateTopic" replayOption="FROM_REPLAY_ID" doc:name="accountInsertUpdateTopic" doc:id="edb7e630-8225-4291-b330-81ab0ef9211f" config-ref="Salesforce_Config"/>
    <logger level="INFO" doc:name="Salesforce Message Event Reply Topic" doc:id="4a6ab022-e4b7-4be7-94e6-6a2dcc2efe8b" message='Salesforce Message Event Reply Topic #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
</flow>

Here, I created and updated an Account object while no listener was subscribed to the PushTopic. The events were not lost, because the Replay topic listener delivers them once it connects.

When my listener server came back online, I received the events below:

Salesforce Message Event Reply Topic 
 {
  "Phone": null,
  "Id": "0012w00000FvxzNAAR",
  "Fax": null,
  "Name": "Shekh Muenuddeen Account"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:19:38.801Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 38,
  "type": "created"
}
Salesforce Message Event Reply Topic 
 {
  "Phone": "1234567890",
  "Id": "0012w00000FvxzNAAR",
  "Fax": "0987654321",
  "Name": "Shekh Muenuddeen Account"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:19:45.298Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 39,
  "type": "updated"
}

 

Publish Channel and Subscription

 

Publish streaming channel 

I have defined the input, in which we need to provide the channel name and the owner’s user ID.

{
    "channel" : "/u/mychannel/accountInsertUpdateTopic",
    "owner" : "0052w000003BUKFAA4"
}
<flow name="channelFlow" doc:id="482d2b57-d6f7-45e5-a5fd-f2bc61214b08" >
        <http:listener doc:name="Listener" doc:id="6e463df7-f142-4d2f-ba9e-f8c59ced9d99" config-ref="HTTP_Listener_config" path="/publish/channel">
            <http:error-response >
                <http:body ><![CDATA[#[output application/json --- payload]]]></http:body>
            </http:error-response>
        </http:listener>
        <logger level="INFO" doc:name="Input Request for creating channel" doc:id="497b702a-3636-4309-854a-9c978039e6c2" message="Input Request for creating channel #[payload]"/>
        <salesforce:publish-streaming-channel doc:name="Publish streaming channel" doc:id="e808a91a-177c-4432-91fa-74250a315ad7" config-ref="Salesforce_Config" channelName="#[payload.channel]" ownerId="#[payload.owner]"/>
        <ee:transform doc:name="Transform Message" doc:id="71d99a90-cec5-42a7-9c33-20da62d9bed4" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </flow>

The channel is created successfully, and the response of the Publish streaming channel operation contains the channel ID. We will use this channel ID when we send notifications to the channel with the Push generic event operation of the Salesforce connector.

{
  "success": true,
  "id": "0M62w000000bxF8CAI",
  "errors": []
}

Subscription Channel

<flow name="channelFlow1" doc:id="31a4e997-4ce5-42d8-8511-61d4d5117fa0" >
        <salesforce:subscribe-channel-listener streamingChannel="/u/mychannel/accountInsertUpdateTopic" doc:name="Subscribe channel listener" doc:id="b6a10989-d63c-4128-8f31-d649c240c554" config-ref="Salesforce_Config"/>
        <logger level="INFO" doc:name="Salesforce Message Event Channel" doc:id="47cbd28c-e0fd-4304-96ef-11a8a3f21669" message='Salesforce Message Event Channel #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
    </flow>

Channel Name – /u/mychannel/accountInsertUpdateTopic

We are going to use the ID of this channel in order to push event data.

I use the channel ID that I got back from the Publish streaming channel operation of the Salesforce connector.

Below is the request for the Push generic event operation:

[
    {
        "payload": "private message",
        "userIds": [
            "0052w000003BUKFAA4",
            "0052w000004Mie2AAC"
        ]
    },
    {
        "payload": "Broadcast message",
        "userIds": []
    }
]

Payload- the message we want to send; it can be any text.

UserIds- if we specify user IDs, only those users receive the message, which makes it a kind of private message.

If we don’t specify any user IDs, all users subscribed to the channel receive the message, which makes it a kind of broadcast message.
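The private versus broadcast rule can be expressed as a small Python function. This is only a model of the rule described above, not Salesforce code: `recipients` is a hypothetical name, and the second subscriber ID is made up for illustration.

```python
def recipients(event, subscribers):
    """Return the subscribed users who receive a generic event.

    An empty or missing userIds list means broadcast to every subscriber.
    """
    user_ids = event.get("userIds") or []
    if not user_ids:
        return sorted(subscribers)
    return sorted(set(user_ids) & set(subscribers))


# First ID is from this article; the second is a hypothetical extra subscriber.
subscribers = {"0052w000003BUKFAA4", "0052w000009ZZZZAAA"}

private_event = {
    "payload": "private message",
    "userIds": ["0052w000003BUKFAA4", "0052w000004Mie2AAC"],
}
broadcast_event = {"payload": "Broadcast message", "userIds": []}

private_to = recipients(private_event, subscribers)
broadcast_to = recipients(broadcast_event, subscribers)
print(private_to)
print(broadcast_to)
```

A user listed in userIds who is not subscribed to the channel receives nothing, which is why the result is the intersection of the two sets.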

Here, I have successfully received both events: one private message and one broadcast message, because we sent two generic events, one to specific users (including my user) and one to all users.

Salesforce Message Event Channel 
 {
  "data": {
    "payload": "Broadcast message",
    "event": {
      "createdDate": "2020-07-12T06:36:09.229Z",
      "replayId": 19
    }
  },
  "channel": "/u/mychannel/accountInsertUpdateTopic"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:36:09.229Z",
  "channel": "/u/mychannel/accountInsertUpdateTopic",
  "replayId": 19
}
Salesforce Message Event Channel 
 {
  "data": {
    "payload": "private message",
    "event": {
      "createdDate": "2020-07-12T06:36:09.229Z",
      "replayId": 18
    }
  },
  "channel": "/u/mychannel/accountInsertUpdateTopic"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:36:09.229Z",
  "channel": "/u/mychannel/accountInsertUpdateTopic",
  "replayId": 18
}

So finally, we have published the PushTopic and we receive events whenever an Account changes according to the SOQL criteria. We have also published a streaming channel in order to receive events that are pushed to users with the Push generic event connector operation.

Difference between Publish PushTopic and Publish Streaming channel:

PushTopic events are tied to SOQL query criteria. A streaming channel can carry any event message, which is delivered only to the specified users if user IDs are defined and is broadcast to all subscribers otherwise.
