Introduction to Force.com Streaming API

A while ago, I wrote an article about integrating the SFDC REST API using Scala. This article shows one way to keep your Salesforce data in sync with the database of your server-based application.
The Force.com Streaming API sends a notification to your app whenever a change to Salesforce data matches a SOQL query that you define.
 
The Salesforce Streaming API is based on PushTopics. Each PushTopic defines a channel that your Scala app subscribes to. A notification is generated when a record is created, updated, deleted, or undeleted, provided it matches the criteria of the PushTopic query.
 

Creating a new PushTopic in Salesforce

Creating a new PushTopic is a simple process. Open the Developer Console and choose Debug | Open Execute Anonymous Window (CTRL + E). In the window that appears, enter the code that defines your PushTopic, then click the Execute button to complete the action.
 
The following Apex code defines the PushTopic I created for the Contact object:
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'ContactPushTopic';
pushTopic.Query = 'SELECT id, Email, accountId FROM Contact where my_external_key__c != null';
pushTopic.ApiVersion = 39.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = false;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

Whenever a change to the Contact table matches the PushTopic query, a new notification is generated on the channel ContactPushTopic. In other words, the PushTopic fires only for Contact records that have a value in my_external_key__c and, for updates, only when one of the fields referenced in the query changes (Email, AccountId, or my_external_key__c itself, because NotifyForFields is set to 'Referenced'). As you can see, in my case the notification is generated for create, update, and delete events, but not for undelete. The WHERE clause is essential for this use case, although Streaming API also accepts queries without one. The field my_external_key__c is a custom field that I created, so replace it with a field that is representative of your use case; my setup depends on having such a field.
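To make the rules above easier to check, here is a minimal sketch of them as a pure Scala function. It is only a simplified model of how a PushTopic with NotifyForFields = 'Referenced' behaves, not Salesforce code. The names PushTopicModel, Record, and shouldNotify are illustrative assumptions, and for deletes the model simply looks at the last known state of the record.

```scala
// Simplified, illustrative model of the ContactPushTopic defined above.
object PushTopicModel {
  sealed trait Operation
  case object Create extends Operation
  case object Update extends Operation
  case object Delete extends Operation
  case object Undelete extends Operation

  // A record is modelled as field name -> optional value (None means null).
  type Record = Map[String, Option[String]]

  // Mirrors the NotifyForOperation* flags set in the Apex snippet.
  val enabledOperations: Set[Operation] = Set(Create, Update, Delete)

  // Fields referenced in the SELECT and WHERE clauses of the query.
  val referencedFields: Set[String] =
    Set("Id", "Email", "AccountId", "my_external_key__c")

  // WHERE my_external_key__c != null
  def matchesWhere(record: Record): Boolean =
    record.get("my_external_key__c").flatten.isDefined

  def referencedFieldChanged(before: Record, after: Record): Boolean =
    referencedFields.exists(field => before.get(field).flatten != after.get(field).flatten)

  // `before` is the previous state for updates; `current` is the state to evaluate.
  def shouldNotify(op: Operation, before: Option[Record], current: Record): Boolean =
    enabledOperations.contains(op) && matchesWhere(current) && (op match {
      case Update => before.forall(referencedFieldChanged(_, current))
      case _      => true
    })
}
```

Under this model, changing Email on a contact that has an external key produces a notification, while changing a field outside the query (Phone, for example) does not.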
 

Scala implementation

My application is written in Scala, using the Play Framework and sbt for dependency management, so the following code samples bear the marks of that framework. The concept is based on push technology: our Scala app subscribes to a channel and listens for notifications sent by the Salesforce server. Streaming API uses the Bayeux protocol and CometD to maintain the connection and to do long polling. More info can be found here.
To achieve that, the Scala app must follow these steps:
  • Connect to the server
  • Handshake (handshake with the server and create a long-polling connection)
  • Subscribe to the channel
For subscribing to a channel, I used Akka actors. There are two ways to authenticate when connecting to the server: using a username and password to obtain an auth token, or using the refresh token method. However, that is a separate discussion and outside the scope of this article.
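For orientation, the steps above correspond roughly to three Bayeux meta messages. The sketch below builds simplified versions of them as JSON strings. In practice the CometD BayeuxClient creates and sends these for you, so the BayeuxMessages object and its method names are purely illustrative, and real messages carry additional fields such as an id. In Bayeux terms the handshake comes first, and the /meta/connect message then opens the long poll.

```scala
// Illustrative only: simplified Bayeux meta messages as JSON strings.
object BayeuxMessages {
  // Step 1: negotiate a session; the server answers with a clientId.
  def handshake: String =
    """{"channel":"/meta/handshake","version":"1.0","supportedConnectionTypes":["long-polling"]}"""

  // Step 2: open the long-polling connection for that session.
  def connect(clientId: String): String =
    s"""{"channel":"/meta/connect","clientId":"$clientId","connectionType":"long-polling"}"""

  // Step 3: subscribe to the PushTopic channel.
  def subscribe(clientId: String, topic: String): String =
    s"""{"channel":"/meta/subscribe","clientId":"$clientId","subscription":"/topic/$topic"}"""
}
```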
 
The first step is to add some dependencies to the build.sbt file:
"com.force.api" % "force-wsc" % "39.0.5",
"com.force.api" % "force-partner-api" % "39.0.0",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.14",
"com.typesafe.akka" %% "akka-slf4j" % "2.4.0"
I used one actor for subscribing to the channel and a second one for handling the notifications received. The implementation of the first actor is simple:
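import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer

class PushTopicSubscriber extends Actor {

  // Reuse the system this actor runs in instead of starting a second one.
  implicit val system: ActorSystem = context.system

  override def preStart(): Unit = {
    val subscribeTopic = "ContactPushTopic"
    TopicListener.subscribe(subscribeTopic) { source =>
      implicit val materializer = ActorMaterializer()(system)
      // Block so that subscribe does not disconnect; sleeping avoids a busy loop.
      while (true) Thread.sleep(1000)
    }
  }

  override def postStop(): Unit = {
    // do something
  }

  def receive: Receive = {
    case message =>
      // do something
  }

}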
The subscribe method is responsible for connecting to the server, making the handshake, and subscribing to the channel. A second actor handles the results received from the PushTopic: whenever a new notification arrives, TopicListener sends the message to the actor SysActor, which updates the records in my app database. The TopicListener implementation looks like this:
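// Imports assume the CometD 3.x and Jetty 9.x package layout.
import akka.NotUsed
import akka.actor.{ActorSystem, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import org.cometd.bayeux.Message
import org.cometd.bayeux.client.ClientSessionChannel
import org.cometd.bayeux.client.ClientSessionChannel.MessageListener
import org.cometd.client.BayeuxClient
import org.cometd.client.transport.LongPollingTransport
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.Request
import org.eclipse.jetty.util.ssl.SslContextFactory
import scala.util.Try

object TopicListener extends SalesforceAuth {

  val connectionTimeout = 20 * 1000 // milliseconds
  val readTimeout = 120 * 1000      // milliseconds

  def subscribe[T](topic: String)(f: Source[Message, NotUsed] => T)(implicit actorSystem: ActorSystem): Try[T] = {
    getAuthToken().flatMap { accessToken =>
      val httpClient = new HttpClient(new SslContextFactory())
      httpClient.setConnectTimeout(connectionTimeout)
      httpClient.setIdleTimeout(readTimeout)
      httpClient.start()

      // Add the OAuth token to every long-polling request.
      val transport = new LongPollingTransport(null, httpClient) {
        override def customize(request: Request): Unit = {
          super.customize(request)
          request.header("Authorization", "Bearer " + accessToken.accessToken)
        }
      }

      val url = accessToken.instanceURL + "/cometd/36.0"
      val bayeuxClient = new BayeuxClient(url, transport)
      val tryResult = Try {
        // SysActor receives every notification and publishes it to the source.
        val actorRef = actorSystem.actorOf(Props(new SysActor))
        val actorPublisher = ActorPublisher[Message](actorRef)
        val source = Source.fromPublisher(actorPublisher)

        bayeuxClient.handshake()

        val handshaken = bayeuxClient.waitFor(connectionTimeout, BayeuxClient.State.CONNECTED)
        if (!handshaken) {
          throw new IllegalStateException("Failed to handshake: " + bayeuxClient.getURL)
        }

        bayeuxClient.getChannel(s"/topic/$topic").subscribe {
          new MessageListener() {
            override def onMessage(channel: ClientSessionChannel, message: Message): Unit = {
              actorRef ! message
            }
          }
        }

        // The connection stays open only while f is running.
        f(source)
      }
      bayeuxClient.disconnect()
      httpClient.stop()
      tryResult
    }
  }
}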
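One detail of subscribe is worth spelling out: the connection is closed as soon as the function passed to it returns. The sketch below reproduces that control flow with a stand-in connection and shows one possible way to block without a busy loop, using a CountDownLatch. FakeConnection, withConnection, and listenUntilShutdown are hypothetical names for illustration and are not part of the application code.

```scala
import java.util.concurrent.CountDownLatch
import scala.util.Try

object LoanPatternSketch {
  // Stand-in for the BayeuxClient/HttpClient pair; purely illustrative.
  final class FakeConnection {
    @volatile var open: Boolean = true
    def close(): Unit = { open = false }
  }

  // Same shape as TopicListener.subscribe: the connection only lives while `f` runs.
  def withConnection[T](f: FakeConnection => T): Try[T] = {
    val connection = new FakeConnection
    val result = Try(f(connection))
    connection.close() // runs as soon as `f` returns
    result
  }

  // A latch lets the callback block without spinning the CPU and gives
  // other code a way to end the subscription by calling countDown().
  val shutdown = new CountDownLatch(1)

  def listenUntilShutdown(): Try[Boolean] =
    withConnection { connection =>
      shutdown.await() // notifications would arrive while this blocks
      connection.open  // close() has not run yet at this point
    }
}
```

Calling LoanPatternSketch.shutdown.countDown() from another thread releases the callback, after which the connection is closed in an orderly way.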
TopicListener extends a trait called SalesforceAuth, which is responsible for getting the authentication token needed to connect to the right channel. The getAuthToken function returns a case class:
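// Named SalesforceToken here (a hypothetical name) so that it does not
// clash with the SalesforceAuth trait that TopicListener extends.
case class SalesforceToken(
  exp: DateTime,
  accessToken: String,
  instanceURL: String
)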
As you can see, we need only the token and instanceURL. I ran into some issues when I subscribed to “{instanceURL}/cometd/39.0”, which is why I used version 36.0. All received messages are sent to the actor called SysActor:
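import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import org.cometd.bayeux.Message

class SysActor extends ActorPublisher[Message] {

  def receive: Receive = {
    case message: Message =>
      // do something BIG, e.g. update the application database
    case Cancel =>
      // the stream subscriber cancelled, so stop the actor
      context.stop(self)
  }

}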
For SysActor, it is enough to implement the receive method. Whenever this actor receives a new message, it updates my app database. With this implementation in place, your Salesforce data stays in sync with the database of your server-based application.
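As a rough idea of what the "do something BIG" step might start with, the sketch below decodes the data part of a notification into a small case class. It assumes the usual PushTopic payload shape, with an "event" map holding the event type and an "sobject" map holding the selected fields under the keys Id, Email, and AccountId. The CometD Message exposes this data as a java.util.Map, so a real implementation would convert it first. NotificationDecoder and ContactChange are illustrative names.

```scala
// Illustrative decoder for the data part of a ContactPushTopic notification.
object NotificationDecoder {
  final case class ContactChange(
    eventType: String, // e.g. "created", "updated", "deleted"
    id: String,
    email: Option[String],
    accountId: Option[String]
  )

  private def nested(data: Map[String, Any], key: String): Option[Map[String, Any]] =
    data.get(key).collect { case m: Map[_, _] => m.asInstanceOf[Map[String, Any]] }

  // Null or non-string values are treated as missing.
  private def stringField(data: Map[String, Any], key: String): Option[String] =
    data.get(key).collect { case s: String => s }

  // Returns None when the payload does not have the expected shape.
  def decode(data: Map[String, Any]): Option[ContactChange] =
    for {
      event     <- nested(data, "event")
      sobject   <- nested(data, "sobject")
      eventType <- stringField(event, "type")
      id        <- stringField(sobject, "Id")
    } yield ContactChange(
      eventType,
      id,
      stringField(sobject, "Email"),
      stringField(sobject, "AccountId")
    )
}
```

Returning an Option keeps malformed or unexpected payloads from throwing inside the actor, so the database update can simply be skipped for them.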
 

Tips

Some tips for better efficiency:
  • You can pass a list of PushTopic names to subscribe to multiple channels.
  • To listen continuously for notifications from a PushTopic, the preStart method of PushTopicSubscriber needs a piece of blocking code such as “while(true) {}”. TopicListener.subscribe disconnects as soon as the function passed to it returns, so without the blocking code the actor stops listening.
  • You can filter the results of a PushTopic so that they match a criterion (e.g. ContactPushTopic?Email='myemail@email.com'). Instead of sending only the name of your PushTopic to TopicListener.subscribe, send a string like the example to filter the results.
  • You cannot use a field from a lookup table in your PushTopic query (e.g. account.name is not allowed). You can use only the id (e.g. accountId).
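The first and third tips can be combined in a small helper that builds channel names. This is a sketch under two assumptions: that filter values are strings wrapped in single quotes, and that several conditions are joined with "&". The Channels object and topicChannel function are hypothetical names. Note that the helper returns the full channel path, whereas TopicListener.subscribe as written above adds the "/topic/" prefix itself.

```scala
// Illustrative helper for building (optionally filtered) PushTopic channels.
object Channels {
  // Builds "/topic/<name>", followed by "?field='value'&..." when filters are given.
  def topicChannel(topic: String, filters: Seq[(String, String)] = Nil): String = {
    val expression = filters
      .map { case (field, value) => s"$field='$value'" }
      .mkString("&")
    if (expression.isEmpty) s"/topic/$topic" else s"/topic/$topic?$expression"
  }

  // One channel per PushTopic name, for subscribing to several topics.
  def topicChannels(topics: Seq[String]): Seq[String] =
    topics.map(topicChannel(_))
}
```

For example, Channels.topicChannel("ContactPushTopic", Seq("Email" -> "myemail@email.com")) yields the filtered channel from the third tip.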
 

Conclusions

The Salesforce Streaming API is meant for applications that need to stay in sync with data changes in an organization, and it should be considered for any app that currently polls Salesforce frequently. It sends notifications from the server to your client application whenever a change matches the criteria you defined.
Integrating the Streaming API into your application can improve the performance of your product and reduce the number of API calls.
