Author: Shekh Muenuddeen

Streaming API enables streaming of events using push technology. Clients receive those events in near real time through a subscription mechanism.

Why do we need Streaming API?

In our day-to-day work, we often need to send data from one system to another in order to keep them synchronized.

If we use a polling mechanism, we get data only when the poller executes, so that data is not real time.

Streaming API gives us data in near real time, with certain features like reliability and durability.

The Streaming API subscription mechanism supports multiple types of events:

  1. Push Topic event
  2. Generic event
  3. Platform event
  4. Change data capture event

Event

An event is the creation, update, deletion, or undeletion of a record. An event can also trigger a notification.

Notification

A notification is a message sent in response to an event. It is sent to a channel to which one or more clients subscribe.

PushTopic

A PushTopic triggers notifications for changes to Salesforce records that match the criteria of a SOQL query. Each notification carries a payload with the fields of the affected record, along with attributes that hold event information such as the event type: created, updated, deleted, or undeleted.

A PushTopic notification is based on the criteria that you have defined in the SOQL query. Only the fields specified in the query are included in the notification.

The PushTopic defines a subscription channel. 

Channel

A stream of events to which a client can subscribe to receive event notifications.

Applications That Poll Frequently

An application that polls Salesforce frequently consumes unnecessary API calls and processing time. In that case, Streaming API is a good option, because it delivers events as they occur and reduces the number of requests that return no data.

Push Technology

Push technology, also called the publish/subscribe model, transfers information that is initiated from the server to the client. This type of communication differs from pull technology, where the transfer is initiated from the client to the server.

In push technology, the server pushes out information to the client after the client has subscribed to a channel of information. For the client to receive the information, the client must maintain a connection to the server (Salesforce). 
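As a rough illustration of the publish/subscribe idea described above, here is a minimal in-memory sketch. The `Broker` class and its methods are hypothetical names made up for this example; this is not how Salesforce implements it, only a toy model of a server pushing to subscribed clients.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class Broker:
    """Toy broker (hypothetical): clients subscribe to a channel, the server pushes to it."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[dict], None]) -> None:
        # The client registers interest once; it does not ask again for each message.
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: dict) -> int:
        # The server initiates the transfer to every subscriber of the channel.
        for callback in self._subscribers[channel]:
            callback(message)
        return len(self._subscribers[channel])


received: List[dict] = []
broker = Broker()
broker.subscribe("/topic/accountInsertUpdateTopic", received.append)

delivered = broker.publish("/topic/accountInsertUpdateTopic", {"type": "created"})
ignored = broker.publish("/topic/other", {"type": "created"})  # nobody subscribed here
```

The subscriber receives only the message published on the channel it subscribed to.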

Streaming API uses the Bayeux protocol and CometD, so the client-to-server connection is maintained through long polling.

Bayeux protocol

A protocol for transporting asynchronous messages, primarily over HTTP.

CometD

HTTP event routing bus that uses an AJAX push technology pattern known as Comet. It implements the Bayeux protocol.

Long polling

Long polling pushes information from server to client. It is similar to normal polling: the client connects via a handshake and requests information from the server. However, instead of sending an empty response when no information is available, the server holds the request and waits until information becomes available. The server then sends the response, and the client immediately sends a new request.
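The long polling behaviour described above can be simulated locally. The sketch below is only a model under simple assumptions: a `queue.Queue` stands in for the server's pending events, and the hypothetical `long_poll` function plays the role of one held request. No real HTTP or Salesforce call is made.

```python
import queue
import threading


def long_poll(events: "queue.Queue[dict]", timeout: float):
    """Model of one long-poll request: hold it until an event exists or the timeout passes."""
    try:
        return events.get(timeout=timeout)
    except queue.Empty:
        return None  # nothing happened in time; the client would simply reconnect


events: "queue.Queue[dict]" = queue.Queue()

# Simulate the server producing an event shortly after the client started waiting.
threading.Timer(0.1, events.put, args=({"type": "updated"},)).start()

first = long_poll(events, timeout=2.0)   # returns as soon as the event arrives
second = long_poll(events, timeout=0.2)  # no further event, so the request times out
```

The first request returns once the event is available, well before its timeout; the second one returns empty because nothing else was published.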

Salesforce Streaming API communication is easy to understand in terms of event-driven channels.

Event driven is a pattern in which an event is triggered when something happens. For example:

You have booked an Ola cab. When it arrives, you get a notification.

You have placed an order on Amazon. When it is shipped, you get a notification.

Good and Bad of an Event-Driven Architecture

Good – Real time, One-way/broadcasting, Decoupling, Scaling, Replay (24 hours), Audit logs

Bad – Storage, External systems that do not support event-driven communication, Distributed systems

Channel

You have subscribed to a YouTube channel. When a new video is uploaded, you get a notification.

All these notifications are possible through the event-driven pattern combined with channels.

We can also understand this process through the following steps.

Streaming API uses the HTTP/1.1 request-response model.

1. CometD sends a handshake request.

2. After a successful handshake, your custom listener on the /meta/handshake channel sends a subscription request to a channel.

3. CometD maintains the connection by using long polling.
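To make the three steps more concrete, the sketch below builds the Bayeux messages a client would send for the handshake, the subscription, and the long-polling connect. The field names follow the Bayeux protocol; the helper function names and the client id value are hypothetical, and in a real exchange the client id is assigned by the server in the handshake response.

```python
import json


def handshake_message() -> dict:
    # Step 1: the client announces itself and the transports it supports.
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_message(client_id: str, channel: str) -> dict:
    # Step 2: after a successful handshake, subscribe to a channel.
    return {"channel": "/meta/subscribe", "clientId": client_id, "subscription": channel}


def connect_message(client_id: str) -> dict:
    # Step 3: the connect request is the one the server holds open (long polling).
    return {"channel": "/meta/connect", "clientId": client_id, "connectionType": "long-polling"}


client_id = "hypothetical-client-id"  # assumption: normally returned by the handshake response
messages = [
    handshake_message(),
    subscribe_message(client_id, "/topic/accountInsertUpdateTopic"),
    connect_message(client_id),
]
wire = json.dumps(messages[1])  # messages travel as JSON over HTTP
```

In the Mule application you never build these messages yourself; the Salesforce connector's CometD client handles them.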

Now it's time to create a Mule project. We are going to do the following:

  1. Publish PushTopic
  2. Subscribe PushTopic
  3. Subscribe ReplayTopic
  4. Insert/update/delete Salesforce Object via Salesforce create/update/delete connector
  5. Publish Streaming Channel
  6. Subscribe Channel
  7. Push Salesforce event data

PushTopic Creation

Salesforce connector configuration

<salesforce:sfdc-config name="Salesforce_Config" doc:name="Salesforce Config" doc:id="89a14788-17e5-42aa-b87a-ec3dd5686696" >
    <salesforce:basic-connection username="${secure::sf.username}" password="${secure::sf.password}" securityToken="${secure::sf.token}" />
</salesforce:sfdc-config>

We are going to use the Publish topic Salesforce connector operation.

Below is the configuration, for which I have provided the topic name and the SOQL query:

PushTopic Name- accountInsertUpdateTopic

Query- SELECT Id,Name,Phone,Fax FROM Account

Note- you receive update notifications only when one of the fields defined in the SOQL query changes. Inserts and deletes of matching records also raise notifications.

<flow name="salesforce-streaming-api-demo-1Flow" doc:id="68ede2a3-9518-4980-9f1f-4ba5853a7a76" >
        <http:listener doc:name="PushTopic Creation Request" doc:id="2999f676-a117-4020-af62-61fc317b3467" config-ref="HTTP_Listener_config" path="/publish/topic"/>
        <salesforce:publish-topic doc:name="accountInsertUpdateTopic" doc:id="3d07eff6-04b8-4084-96f8-ec1b3f5dd308" config-ref="Salesforce_Config" topicName="#[payload.topic]" query="#[payload.query]"/>
        <ee:transform doc:name="To JSON" doc:id="8cf73bdc-2eca-43ba-a789-47669bd8d9a5" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
</flow>

Below is the curl command, exported from Postman, for the HTTP request that creates the PushTopic:

curl --location --request POST 'localhost:8081/publish/topic' \
--header 'Content-Type: application/json' \
--data '{
    "topic" : "accountInsertUpdateTopic",
    "query" : "SELECT Id,Name,Phone,Fax FROM Account"
}'

Response

Below is the response of the PushTopic creation, which contains the topic id:

{
  "success": true,
  "id": "0IF2w000000bpMEGAY",
  "errors": []
}

We have now successfully created the PushTopic. The next step is to give the topic name to the clients so that they can subscribe to the topic. Whenever a change such as an insert, update, or delete is made, subscribers get an event notification.

Subscribe PushTopic

We are going to use the Subscribe topic listener Salesforce connector operation.

Topic- accountInsertUpdateTopic

<flow name="salesforce-streaming-api-demo-1Flow1" doc:id="35dc47d4-e364-4a1b-953a-e4f8d031f094" >
    <salesforce:subscribe-topic-listener topic="accountInsertUpdateTopic" doc:name="accountInsertUpdateTopic" doc:id="f0e36222-0303-4223-b1e0-7015fc691e7a" config-ref="Salesforce_Config"/>
    <logger level="INFO" doc:name="Salesforce Message Event" doc:id="a9ee67f6-3a10-4a98-b138-7ff28482eec8" message='Salesforce Message Event Push Topic #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
</flow>

Here, I have defined the name of the topic that we created using the Publish topic Salesforce connector operation.

When the application is successfully deployed, you will see the logs below on the console:

org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Handshake successful, subscribing.
org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Subscribing to: /topic/accountInsertUpdateTopic from replyId: -1
org.mule.extension.salesforce.internal.service.streaming.StreamingClient: Successfully subscribed to /topic/accountInsertUpdateTopic

Now it's time to create/update/delete Salesforce objects. Here I am going to use the Account object.

Why? Can you guess?

Because I defined the SOQL query on Account when I created the PushTopic.

Let me remind you- SELECT Id,Name,Phone,Fax FROM Account

Below is the account creation input I have given:

{
      "Name" : "Shekh Muenuddeen Account"
}

When I created the account, I received one event. Let me paste the event here:

Payload
{  
  "Phone": null,
  "Id": "0012w00000FvxyjAAB",
  "Fax": null,
  "Name": "Shekh Muenuddeen Account"
}  
Attributes {
  "createdDate": "2020-07-12T06:00:41.716Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 35,
  "type": "created"
}

As you can see, the type value in the attributes is created, because I have just created an account.

Update

Based on the Salesforce account id, I have updated the fields below:

{
      "Id": "0012w00000FvxyjAAB",
      "Phone": "1234567890",
      "Fax": "0987654321"
}

I received the event. Let me paste the event here:

Payload 
{
  "Phone": "1234567890",
  "Id": "0012w00000FvxyjAAB",
  "Fax": "0987654321",
  "Name": "Shekh Muenuddeen Account"
}
 Attributes {
  "createdDate": "2020-07-12T06:03:46.153Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 36,
  "type": "updated"
}

As you can see, the type field value is updated, because I have updated the Phone and Fax fields.

If I update fields that I have not defined in the SOQL query, I will not receive any events.
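The rule above can be sketched as a small check. This is only an illustration under assumptions: the two helper functions are hypothetical, the parsing handles only a simple SELECT list, and it models the behaviour described in this article (an update notifies only when a field from the query changes), not every PushTopic configuration.

```python
import re
from typing import Iterable, Set

QUERY = "SELECT Id,Name,Phone,Fax FROM Account"


def selected_fields(soql: str) -> Set[str]:
    """Rough extraction of the field names in a simple SELECT list (illustration only)."""
    match = re.match(r"\s*SELECT\s+(.+?)\s+FROM\s", soql, flags=re.IGNORECASE)
    if match is None:
        raise ValueError("not a simple SELECT query")
    return {field.strip() for field in match.group(1).split(",")}


def update_triggers_notification(soql: str, changed_fields: Iterable[str]) -> bool:
    # An update raises an event only if at least one changed field appears in the query.
    return bool(selected_fields(soql) & set(changed_fields))


phone_update = update_triggers_notification(QUERY, ["Phone", "Fax"])
website_update = update_triggers_notification(QUERY, ["Website"])
```

With the query used in this article, updating Phone and Fax qualifies, while updating only a field such as Website does not.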

Delete

Now let's delete the Salesforce account that I created and updated.

I have deleted the Salesforce account based on its Salesforce id.

I received an event. Let me paste it here:

Payload
{
  "Id": "0012w00000FvxyjAAB"
} 
Attributes {
  "createdDate": "2020-07-12T06:07:30.048Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 37,
  "type": "deleted"
}

As you can see, when we delete the account, only the id is received in the payload, and the type field value is deleted.
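Since the goal stated at the beginning is to keep systems synchronized, a subscriber typically branches on the type attribute. The sketch below applies the three events shown above to a local dictionary. The `apply_event` function and the dictionary cache are hypothetical and stand in for whatever target system you synchronize.

```python
from typing import Dict


def apply_event(cache: Dict[str, dict], payload: dict, attributes: dict) -> None:
    """Apply one PushTopic event to a local copy of the records (illustrative only)."""
    event_type = attributes["type"]
    record_id = payload["Id"]
    if event_type in ("created", "updated", "undeleted"):
        cache[record_id] = dict(payload)  # the payload carries the fields from the query
    elif event_type == "deleted":
        cache.pop(record_id, None)        # a delete event carries only the Id
    else:
        raise ValueError(f"unexpected event type: {event_type}")


accounts: Dict[str, dict] = {}
account_id = "0012w00000FvxyjAAB"

apply_event(accounts, {"Phone": None, "Id": account_id, "Fax": None,
                       "Name": "Shekh Muenuddeen Account"}, {"type": "created"})
apply_event(accounts, {"Phone": "1234567890", "Id": account_id, "Fax": "0987654321",
                       "Name": "Shekh Muenuddeen Account"}, {"type": "updated"})
after_update = dict(accounts[account_id])

apply_event(accounts, {"Id": account_id}, {"type": "deleted"})
```

After the created and updated events the local copy holds the new Phone and Fax values, and after the deleted event the record is gone.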

Replay Topic Listener

You can also use the Replay topic listener Salesforce connector operation to receive events. So the question is, why do we need the replay topic listener?

Here are some advantages over the Subscribe topic listener:

It can receive events that were raised while our listener server was offline.

It can receive events that occurred after a specific event by using the replay id – you need to define the replay id of that event.

It supports the following replay options:

ALL- all retained events are received

ONLY_NEW- only new events raised after the subscription are received

FROM_REPLAY_ID- only events whose replay id is greater than the defined replay id are received.
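The three replay options can be pictured as a filter over the events that are still retained when the listener connects. The sketch below is a simplified model: the `replayed_events` function is hypothetical, and the replay ids are the ones that appear in the sample events of this article.

```python
from typing import List, Optional

# Replay option names as used by the replayOption attribute in this article.
ALL = "ALL"
ONLY_NEW = "ONLY_NEW"
FROM_REPLAY_ID = "FROM_REPLAY_ID"


def replayed_events(retained: List[dict], option: str,
                    replay_id: Optional[int] = None) -> List[dict]:
    """Pick which retained events are delivered when a subscriber (re)connects."""
    if option == ALL:
        return list(retained)
    if option == ONLY_NEW:
        return []  # nothing from the past; only events raised after subscribing arrive
    if option == FROM_REPLAY_ID:
        if replay_id is None:
            raise ValueError("FROM_REPLAY_ID needs a replay id")
        return [event for event in retained if event["replayId"] > replay_id]
    raise ValueError(f"unknown replay option: {option}")


retained = [{"replayId": n} for n in (35, 36, 37, 38, 39)]
missed = replayed_events(retained, FROM_REPLAY_ID, replay_id=37)
```

If the last event the listener processed had replay id 37, only the events with replay ids 38 and 39 are delivered on reconnect.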

<flow name="topicFlow" doc:id="e2f23c4e-0574-4ee6-b1e4-a6c4bf64cd4a" >
    <salesforce:replay-topic-listener topic="accountInsertUpdateTopic" replayOption="FROM_REPLAY_ID" doc:name="accountInsertUpdateTopic" doc:id="edb7e630-8225-4291-b330-81ab0ef9211f" config-ref="Salesforce_Config"/>
    <logger level="INFO" doc:name="Salesforce Message Event Reply Topic" doc:id="4a6ab022-e4b7-4be7-94e6-6a2dcc2efe8b" message='Salesforce Message Event Reply Topic #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
</flow>

Here, I created and updated an account object while I was not listening to the PushTopic. The events did not get lost, because I receive them via the replay topic listener.

When my listener server comes back online, I receive the events below:

Salesforce Message Event Reply Topic 
 {
  "Phone": null,
  "Id": "0012w00000FvxzNAAR",
  "Fax": null,
  "Name": "Shekh Muenuddeen Account"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:19:38.801Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 38,
  "type": "created"
}
Salesforce Message Event Reply Topic 
 {
  "Phone": "1234567890",
  "Id": "0012w00000FvxzNAAR",
  "Fax": "0987654321",
  "Name": "Shekh Muenuddeen Account"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:19:45.298Z",
  "channel": "/topic/accountInsertUpdateTopic",
  "replayId": 39,
  "type": "updated"
}

Publish Channel and Subscription

Publish streaming channel 

I have defined the input below, in which we need to give the channel name and the owner's user id.

{
    "channel" : "/u/mychannel/accountInsertUpdateTopic",
    "owner" : "0052w000003BUKFAA4"
}

Below is the flow that publishes the streaming channel:

<flow name="channelFlow" doc:id="482d2b57-d6f7-45e5-a5fd-f2bc61214b08" >
        <http:listener doc:name="Listener" doc:id="6e463df7-f142-4d2f-ba9e-f8c59ced9d99" config-ref="HTTP_Listener_config" path="/publish/channel">
            <http:error-response >
                <http:body ><![CDATA[#[output application/json --- payload]]]></http:body>
            </http:error-response>
        </http:listener>
        <logger level="INFO" doc:name="Input Request for creating channel" doc:id="497b702a-3636-4309-854a-9c978039e6c2" message="Input Request for creating channel #[payload]"/>
        <salesforce:publish-streaming-channel doc:name="Publish streaming channel" doc:id="e808a91a-177c-4432-91fa-74250a315ad7" config-ref="Salesforce_Config" channelName="#[payload.channel]" ownerId="#[payload.owner]"/>
        <ee:transform doc:name="Transform Message" doc:id="71d99a90-cec5-42a7-9c33-20da62d9bed4" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </flow>

The channel is successfully created. We receive the channel id in the response of the Publish streaming channel operation. We will use this channel id when we need to send a notification to that channel via the Push generic event Salesforce connector operation.

{
  "success": true,
  "id": "0M62w000000bxF8CAI",
  "errors": []
}

Subscription Channel

<flow name="channelFlow1" doc:id="31a4e997-4ce5-42d8-8511-61d4d5117fa0" >
        <salesforce:subscribe-channel-listener streamingChannel="/u/mychannel/accountInsertUpdateTopic" doc:name="Subscribe channel listener" doc:id="b6a10989-d63c-4128-8f31-d649c240c554" config-ref="Salesforce_Config"/>
        <logger level="INFO" doc:name="Salesforce Message Event Channel" doc:id="47cbd28c-e0fd-4304-96ef-11a8a3f21669" message='Salesforce Message Event Channel #["\n"] #[output application/json --- payload] and #["\n"] Attributes #[output application/json --- attributes]'/>
    </flow>

Channel Name – /u/mychannel/accountInsertUpdateTopic

We are going to use the channel id in order to push event data.

I have used the channel id that was returned by the Publish streaming channel Salesforce connector operation.

Below is the request for the Push generic event operation:

[
    {
        "payload": "private message",
        "userIds": [
            "0052w000003BUKFAA4",
            "0052w000004Mie2AAC"
        ]
    },
    {
        "payload": "Broadcast message",
        "userIds": []
    }
]

Payload- we can send whatever message we want.

UserIds- if we define the user ids, only those users get the message, so it is a kind of private message.

If we don't define the user ids, all users who subscribe to that channel get the message, so it is a kind of broadcast message.

Here, I have successfully received the events: one private message and one broadcast message, because we sent two generic events, one addressed to specific users and one to all users.
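The private versus broadcast rule can be expressed as a small function. This is a sketch of the rule as described here, not connector code: the `recipients` function is hypothetical, and the third user id is a made-up placeholder for a subscriber who is not listed in the private message.

```python
from typing import Iterable, List


def recipients(subscribers: Iterable[str], user_ids: List[str]) -> List[str]:
    """Return the subscribers who receive a generic event."""
    if not user_ids:
        return list(subscribers)  # empty list: broadcast to every subscriber
    allowed = set(user_ids)
    return [user for user in subscribers if user in allowed]  # private message


subscribers = [
    "0052w000003BUKFAA4",
    "0052w000004Mie2AAC",
    "0050000000000ZZAAA",  # hypothetical extra subscriber, not listed in the private message
]

private = recipients(subscribers, ["0052w000003BUKFAA4", "0052w000004Mie2AAC"])
broadcast = recipients(subscribers, [])
```

The private message reaches only the two listed users, while the broadcast message reaches all three subscribers.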

Salesforce Message Event Channel 
 {
  "data": {
    "payload": "Broadcast message",
    "event": {
      "createdDate": "2020-07-12T06:36:09.229Z",
      "replayId": 19
    }
  },
  "channel": "/u/mychannel/accountInsertUpdateTopic"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:36:09.229Z",
  "channel": "/u/mychannel/accountInsertUpdateTopic",
  "replayId": 19
}
Salesforce Message Event Channel 
 {
  "data": {
    "payload": "private message",
    "event": {
      "createdDate": "2020-07-12T06:36:09.229Z",
      "replayId": 18
    }
  },
  "channel": "/u/mychannel/accountInsertUpdateTopic"
} and 
 Attributes {
  "createdDate": "2020-07-12T06:36:09.229Z",
  "channel": "/u/mychannel/accountInsertUpdateTopic",
  "replayId": 18
}

So finally, we have published the PushTopic and receive events when an account changes according to the SOQL criteria. We have also published the streaming channel in order to receive the events that the Push generic event connector operation sends to users.

Difference between Publish PushTopic and Publish Streaming channel:

PushTopic events are tied to SOQL query criteria, whereas a streaming channel can carry any event message. A channel message is delivered only to the listed users if user ids are defined; otherwise it is broadcast to all subscribers.
