Integrating the Salesforce Streaming API into a Scala App

October 05, 2017
Introduction to Force.com Streaming API
A while ago, I wrote an article about integrating the SFDC REST API using Scala. This article shows a way to keep your Salesforce data in sync with the database of your server-based application.
The Force.com Streaming API sends a notification to your app whenever a change to Salesforce data matches a defined SOQL query.
The Salesforce Streaming API is based on PushTopics. A PushTopic defines a channel to which your Scala app must subscribe. A notification is generated when a record is created, updated, deleted, or undeleted and matches the criteria of the PushTopic query.
Creating a new PushTopic in Salesforce
Creating a new PushTopic is a simple process: open the Developer Console and choose Debug | Open Execute Anonymous Window (CTRL + E). A new window appears where you write the code that defines your PushTopic. Click the Execute button to complete the action.
The following Apex code defines the PushTopic I created for the Contact object:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'ContactPushTopic';
pushTopic.Query = 'SELECT id, Email, accountId FROM Contact where my_external_key__c != null';
pushTopic.ApiVersion = 39.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = false;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

Whenever a change occurs in the Contact table and matches the PushTopic query, a new notification is generated for the channel ContactPushTopic. This means that the PushTopic fires only for Contact records that have a value in my_external_key__c. Because NotifyForFields is set to 'Referenced', an update fires it only when one of the fields referenced in the query (here Email, accountId, or my_external_key__c) changes. As you can see, in my case the notification is generated for create, update, and delete events, but not for undelete. The WHERE clause is essential in my case, because it restricts notifications to the records I want to sync. my_external_key__c is a custom field that I created, so replace it with a field that is representative of your use case.
Scala implementation
My application is written in Scala using the Play Framework, with sbt for dependency management, so the following code samples carry marks of the framework I use. The concept is based on push technology: our Scala app subscribes to a channel and listens for notifications sent by the Salesforce server. The Streaming API uses the Bayeux protocol and CometD to maintain the connection and for long polling. More information is available in the Salesforce Streaming API documentation.
To achieve that, the Scala app must follow these steps:
  • Connect to the server
  • Handshake (shake hands with the server and create a long-polling connection)
  • Subscribe to the channel
For subscribing to a channel, I used Akka actors. For connecting to the server there are two ways to authenticate: using a username and password to obtain an auth token, or using the refresh token method. However, that is a separate discussion and outside the scope of this article.
The first step is to add some dependencies to the build.sbt file:

libraryDependencies ++= Seq(
  "com.force.api" % "force-wsc" % "39.0.5",
  "com.force.api" % "force-partner-api" % "39.0.0",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.14",
  "com.typesafe.akka" %% "akka-slf4j" % "2.4.0"
)
I used one actor for subscribing to the channel and a second one for handling the notifications received. The implementation of the first actor is simple:

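import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer

class PushTopicSubscriber extends Actor {

 // Reuse the actor system this actor runs in instead of creating a second one
 implicit val system: ActorSystem = context.system

 override def preStart(): Unit = {
   val subscribeTopic = "ContactPushTopic"
   TopicListener.subscribe(subscribeTopic) { source =>
     implicit val materializer = ActorMaterializer()(system)
     // Block here: once this callback returns, subscribe disconnects the client.
     // Sleeping inside the loop avoids spinning a CPU core at 100%.
     while (true) { Thread.sleep(1000) }
   }
 }

 override def postStop(): Unit = {
  //do Something
 }

 def receive = {
   case message =>
    //do Something
 }

}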
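The loop in preStart exists only to keep the callback from returning, because TopicListener.subscribe disconnects as soon as it does. As a possible alternative, here is a minimal sketch that blocks on a latch instead of looping. KeepAlive is a hypothetical helper of mine, not part of the original application or of any library; it uses only java.util.concurrent.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper (not part of the original app): parks a thread until released.
final class KeepAlive {
  private val latch = new CountDownLatch(1)

  // Blocks the caller without spinning; returns once release() has been called.
  def awaitRelease(): Unit = latch.await()

  // Same as above, but gives up after the timeout.
  // Returns true if release() was called in time, false otherwise.
  def awaitRelease(timeoutMillis: Long): Boolean =
    latch.await(timeoutMillis, TimeUnit.MILLISECONDS)

  // Lets every waiting thread continue; safe to call from any thread.
  def release(): Unit = latch.countDown()
}
```

Under this assumption, the subscribe callback would call awaitRelease() where the loop is now, and some other thread (a shutdown hook, for example) would call release() when the application stops, so that the client can disconnect cleanly.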
The subscribe method is responsible for connecting to the server, performing the handshake, and subscribing to the channel. A second actor handles the results received from the PushTopic: whenever a new notification is received, TopicListener sends the message to the actor SysActor, which updates the records in my app's database. The TopicListener implementation looks like this:

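import akka.NotUsed
import akka.actor.{ActorSystem, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import org.cometd.bayeux.Message
import org.cometd.bayeux.client.ClientSessionChannel
import org.cometd.bayeux.client.ClientSessionChannel.MessageListener
import org.cometd.client.BayeuxClient
import org.cometd.client.transport.LongPollingTransport
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.Request
import org.eclipse.jetty.util.ssl.SslContextFactory
import scala.util.Try

object TopicListener extends SalesforceAuth {

 val connectionTimeout = 20 * 1000 // milliseconds
 val readTimeout = 120 * 1000      // milliseconds

 def subscribe[T](topic: String)(f: Source[Message, NotUsed] => T)(implicit actorSystem: ActorSystem): Try[T] = {
   getAuthToken().flatMap { accessToken =>
     val httpClient = new HttpClient(new SslContextFactory())
     httpClient.setConnectTimeout(connectionTimeout)
     httpClient.setIdleTimeout(readTimeout)
     httpClient.start()

     // Long-polling transport that adds the OAuth token to every request
     val transport = new LongPollingTransport(null, httpClient) {
       override def customize(request: Request): Unit = {
         super.customize(request)
         request.header("Authorization", "Bearer " + accessToken.accessToken)
       }
     }

     val url = accessToken.instanceURL + "/cometd/36.0"
     val bayeuxClient = new BayeuxClient(url, transport)
     val tryResult = Try {
       // SysActor receives every notification and publishes it to the stream
       val actorRef = actorSystem.actorOf(Props(new SysActor))
       val actorPublisher = ActorPublisher[Message](actorRef)
       val source = Source.fromPublisher(actorPublisher)

       bayeuxClient.handshake()

       val handshaken = bayeuxClient.waitFor(connectionTimeout, BayeuxClient.State.CONNECTED)
       if (!handshaken) {
         throw new RuntimeException("Failed to handshake: " + bayeuxClient.getURL)
       } else {
         // Forward each message received on the topic channel to SysActor
         bayeuxClient.getChannel(s"/topic/$topic").subscribe {
           new MessageListener() {
             override def onMessage(channel: ClientSessionChannel, message: Message): Unit = {
               actorRef ! message
             }
           }
         }
       }

       // f is expected to block for as long as the subscription should stay open
       f(source)
     }
     // Reached only after f returns or fails: close the connection
     bayeuxClient.disconnect()
     httpClient.stop()
     tryResult
   }
 }
}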
TopicListener extends a trait called SalesforceAuth, which is responsible for getting the authentication token needed to connect to the right channel. The getAuthToken function returns a case class:

case class SalesforceAuth (
 exp: DateTime,
 accessToken: String,
 instanceURL: String
)
As you can see, we need only the token and the instanceURL. I encountered some issues when I subscribed to "{instanceURL}/cometd/39.0", which is why I used version 36.0. All received messages are sent to the actor called SysActor.

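import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import org.cometd.bayeux.Message

class SysActor extends ActorPublisher[Message] {

 def receive = {
   case message: Message =>
     //do something BIG, e.g. update the application database
   case Cancel =>
     // The downstream subscriber cancelled, so stop this actor
     context.stop(self)
 }

}
For SysActor it is enough to implement just the receive method. Whenever this actor receives a new message, it updates my app's database. With this implementation you can keep your Salesforce data in sync with your server-based application's database.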
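What happens inside the "do something BIG" branch depends on your database, but the first step is usually to read the notification payload. The following is a minimal sketch of that step, not the code from my application. It assumes the usual PushTopic payload layout, where the data map (which CometD exposes through message.getDataAsMap as a java.util.Map) contains an "event" map with a "type" entry and an "sobject" map with the selected fields. ContactChange, Upsert, Delete, and NotificationParser are hypothetical names introduced only for this example, and the field keys "Id" and "Email" are assumptions based on the query above.

```scala
import java.util.{Map => JMap}

// Hypothetical result types for this sketch; not part of the original app.
sealed trait ContactChange
final case class Upsert(id: String, email: Option[String]) extends ContactChange
final case class Delete(id: String) extends ContactChange

object NotificationParser {

  // Reads a nested map entry; None when the key is missing or not a map.
  private def nested(data: JMap[String, AnyRef], key: String): Option[JMap[String, AnyRef]] =
    Option(data.get(key)).collect {
      case m: JMap[_, _] => m.asInstanceOf[JMap[String, AnyRef]]
    }

  // Reads an entry as text; None when the key is missing or null.
  private def text(data: JMap[String, AnyRef], key: String): Option[String] =
    Option(data.get(key)).map(_.toString)

  // Maps the event type to a change; unknown types (e.g. "undeleted") are ignored.
  private def toChange(kind: String, id: String, email: Option[String]): Option[ContactChange] =
    kind match {
      case "created" | "updated" => Some(Upsert(id, email))
      case "deleted"             => Some(Delete(id))
      case _                     => None
    }

  // Assumed layout: data("event")("type") and data("sobject")("Id").
  def parse(data: JMap[String, AnyRef]): Option[ContactChange] =
    for {
      event   <- nested(data, "event")
      sobject <- nested(data, "sobject")
      kind    <- text(event, "type")
      id      <- text(sobject, "Id")
      change  <- toChange(kind, id, text(sobject, "Email"))
    } yield change
}
```

Inside SysActor, the message branch could then call NotificationParser.parse(message.getDataAsMap) and pattern match on the result to decide whether to insert, update, or delete the local row.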
Tips
Some tips for better efficiency:
  • You can send a list of PushTopic names to subscribe to multiple channels.
  • To listen continuously for notifications from a PushTopic, note that the preStart method of PushTopicSubscriber needs a piece of code like "while(true) {}". Otherwise, subscribe returns and disconnects the client, and the actor stops listening.
  • You can filter the results of a PushTopic to match a criterion (e.g. ContactPushTopic?Email='myemail@email.com'). So instead of sending only the name of your PushTopic to TopicListener.subscribe, send a string like the one in the example to filter the results of your PushTopic.
  • You cannot have a field from a lookup table in your PushTopic query (e.g. account.name is not allowed). You can use only the id (e.g. accountId).
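The first and third tips can be combined in a small helper that builds the strings passed to TopicListener.subscribe. This is only a sketch: Topics and withFilter are hypothetical names, AccountPushTopic is a made-up second topic, and joining several conditions with "&" is an assumption you should check against the Salesforce documentation on filtered subscriptions.

```scala
object Topics {

  // Builds "<name>" or "<name>?Field='value'&Other='value'".
  // TopicListener.subscribe prepends "/topic/" itself, so it is not added here.
  def withFilter(name: String, filters: Seq[(String, String)] = Seq.empty): String =
    if (filters.isEmpty) name
    else {
      val conditions = filters.map { case (field, value) => s"$field='$value'" }
      name + conditions.mkString("?", "&", "")
    }

  // One subscription string per topic, all without filters.
  def all(names: Seq[String]): Seq[String] = names.map(withFilter(_))
}
```

For example, Topics.withFilter("ContactPushTopic", Seq("Email" -> "myemail@email.com")) yields the filtered string from the tip above, and Topics.all(Seq("ContactPushTopic", "AccountPushTopic")) gives one entry per topic to subscribe to in turn.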
Conclusions
The Salesforce Streaming API is intended for applications that need to stay in sync with data changes in an organization, and it should be considered for apps that poll frequently. It sends notifications from the server to your client application whenever a change matches the criteria you defined.
Integrating the Streaming API into your application will improve the performance of your product and reduce the number of API calls.
