blog

JUCE image

An RPC Framework for JUCE

by

Recently, I’ve been working on a project for a long-time client who came to us with a request that we haven’t seen in a while–we were tasked with taking their existing application written in C++ using the JUCE application framework into two separate applications:

  • A Windows Service Application that will start itself automatically at boot time to continually be processing streams of audio and network data using their proprietary hardware
  • A GUI application that connects to the service app doing the processing for monitoring and control.

Since the application was already cleanly written using the Model/View/Controller architecture, what I really wanted to have was a Remote Procedure Call layer that would let me chop the existing Controller object into two pieces:

  • an API half that is called by the UI code exactly as it’s already being called
  • an “engine”/server half that actually houses the logic to perform the work of the application.

…that would be connected together over some sort of an interprocess link, most likely sockets.

In a perfect world (where networks never fail and processes never exit without warning), the code in the GUI app portion of the project would have no reason to know that it was doing anything differently than it had been doing when all of the logic was contained in its process space.

All that I’d need to have to make that work is some magic in the middle that would perform the tasks of

  • Serializing/deserializing function calls across that boundary
  • Handling thread blocking issues sensibly (we want a function call made over the socket connection to behave as if it was a regular function call)
  • The application makes heavy use of JUCE’s ChangeBroadcaster/ChangeListener and ValueTree facilities; those need to be supported sensibly as well.

Fortunately, JUCE comes with a bunch of classes standard that make building that layer of secret sauce in the middle not too much work to get right. It’s easy to think of JUCE as just a framework to build music/audio apps, but there’s really an entire metric ton of useful and well-designed functionality hiding in there.

InterprocessConnection

https://www.juce.com/doc/classInterprocessConnection

The InterprocessConnection class provides a simple read/write interface to send data between processes, using either sockets or named pipes.

InterprocessConnectionServer

https://www.juce.com/doc/classInterprocessConnectionServer

When you’re using sockets for interprocess communications, you need to have code that listens on a socket for incoming connections. When a connection is made from a client, this class creates and returns a new InterprocessConnection object that can be used by the server code.

MemoryBlock

https://www.juce.com/doc/classMemoryBlock

A handy class that provides a sensible interface to a resizable block of raw bytes. Essentially, it’s a very smart void*.

ValueTreeSynchronizer

https://www.juce.com/doc/classValueTreeSynchroniser

https://www.juce.com/doc/classValueTree

A ValueTree is a tree structure that can hold many different types of data inside it on a key/value basis. Working with one should be straightforward to anyone who’s done work with XML files or Python (and conversion to/from XML representation is built into the ValueTree class).

The ValueTreeSynchronizer class is used to create an object that receives callbacks whenever the contents of a ValueTree are changed. When notified of changes, this class builds very compact representations of the delta introduced by the change that can be sent across the InterprocessCommunication channel and applied to a ValueTree inside the client’s process space.

What We Need To Build

At a high-level, the pieces that we need to build to make this work are:

  • Serialization We need to bundle up information on what function we’re trying to call on the server API, its arguments, and when the server replies we need to be able to pull out that function’s return value as well as dealing with anything that needs to be returned by reference or through a pointer.

  • Blocking When we send a block of data over the socket connection, that call returns immediately. We’re trying to emulate calling a function so the behavior of the application remains unchanged. UI code calls an API method and then blocks until that function call is complete (or fails because of a timeout or other exception case).

  • Dispatch The serialized function call packets need to be dispatched to the correct function on the server side, and the return values need to be sent back to the correct blocked call.

  • Controller Classes In this example, there are a pair of matching ServerController and ClientController classes, communicating over our RPC system.

The API that’s supported by our Controller classes is simple, but shows how a more useful API might be implemented:

/**
 * Need to be able to call fn returning void
 */
virtual void VoidFn() = 0;
/**
 * fn returning int
 */
virtual int IntFn(int val) = 0;
/**
 * fn taking/returning strings.
 */
virtual String StringFn(const String& inString) = 0;
/**
 * The Controller has two ValueTree objects; request one by index, and
 * operate on it directly.
 * @param index Index (0/1) of the tree you'd like to work with
 * @return ValueTree object.
 */
ValueTree GetTree(int index);

Another important idiom used in JUCE programming is the use of ChangeBroadcaster and ChangeListener classes. These classes can be used to let Broadcaster objects asynchronously notify interested Listener objects that something about them has changed. Our RPC design also accomodates these messages — in our demo, we have a timer that elapses once per second, sending a change message to the client. Every 15 seconds, the timer also changes some values in a ValueTree that we’re keeping in sync between the two processes.

void ServerController::timerCallback()
{
   ++fTimerCount;
   if (0 == fTimerCount % 15)
   {
      var lastVal = fTree1.getProperty("count");
      int newVal = (int) lastVal + 1;
      ValueTree sub = fTree1.getChildWithName("sub");
      Random r;
      sub.setProperty("text", String("*** ") + String(r.nextInt()) + \
       " ***", nullptr);
      fTree1.setProperty("count", newVal, nullptr);
      fTree1.setProperty("even", (0 == newVal % 2), nullptr);
   }
   // Notify listeners that we've changed.
   this->sendChangeMessage();
}

Shared Code

Before walking through the circuitous path taken by a function call across the process boundary, it’s useful to look at a few classes that are used on both sides of that transaction:

RpcMessage

All procedure calls made between processes use a simple serial data structure to hold data. Each RpcMessage object contains:

  • Message Code (uint32)
  • Message Sequence number (uint32)
  • Data payload (0..n bytes long)

The Message Code is used by the server to dispatch incoming messages to the correct API endpoint, and the sequence number is used on the client-side to reconnect server responses with the originating function call.
Change broadcast messages always use a sequence number of 0, because by definition there’s no pending call that needs to be replied to; change messages are always a surprise to the client.
Internally, the RpcMessage class uses a JUCE MemoryBlock to hold the data, and we add new data to the end of the buffer with a simple AppendData() member function:

/**
 * Add some data bytes to the end of our block.
 * @param data Pointer to the new data
 * @param numBytes Length of data to be added.
 */
void AppendData(const void* data, size_t numBytes);

To avoid dealing with potential errors from using incorrect byte counts, a C++ template function lets us add values of any POD-type easily and correctly:

template <typename T>
void AppendData(T val)
{
   this->AppendData(&val, sizeof(T));
}

(and on the other end of the connection, there’s a matching GetData() member function:

/**
* Return a primitive POD object from the message object,
* by default at the next offset.
*/
template <typename T>
T GetData(size_t offset=kUseNextOffset)
{
   void* p = this->GetDataPointer(offset);
   fNextOffset += sizeof(T);
   return * (static_cast<T*>(p));
}

We can pull out the message code and sequence number with the GetMetadata() member function:

/**
 * Retrieve the message's function code and sequence number, leaving the
 * offset index set to get the first real piece of data in the message.
 * @param code Message function code
 * @param sequence
 * @return true if the code/sequence are valid.
 */
bool GetMetadata(uint32& code, uint32& sequence);

Controller abstract base class

We define an abstract base class Controller that describes the API that’s made available. The actual solution will require separate ClientController and ServerController classes to be derived from this base class. We also define all of the message codes here in the Controller base class:

class Controller : public ChangeBroadcaster
{
public:
   enum FunctionCodes
   {
      /**
       * Codes below 1000 are function calls.
       */
      kVoidFn = 1,
      kIntFn,
      kStringFn,
      /**
       * A range of codes to alter value trees
       */
      kValueTree1SetProp = 1000,
      kValueTree2SetProp,
      /**
       * A range of codes that are only sent from the server
       * to the client as one-way messages.
       */
      kTimerAlert = 10000,
      kValueTree1Update,
      kValueTree2Update
   };
   Controller();
   virtual ~Controller()
   {
   }
   /**
    * Need to be able to call fn returning void
    */
   virtual void VoidFn() = 0;
   /**
    * fn returning int
    */
   virtual int IntFn(int val) = 0;
   /**
    * fn taking/returning strings.
    */
   virtual String StringFn(const String&amp; inString) = 0;
   /**
    * The Controller has two ValueTree objects; request one by index, and
    * operate on it directly.
    * @param index Index (0/1) of the tree you&#039;d like to work with
    * @return ValueTree object.
    */
   ValueTree GetTree(int index);
protected:
   ValueTree fTree1;
   ValueTree fTree2;
};

Walking Through a Function Call

To illustrate, we’ll follow a single function call being made from the client code, down into the server process, and back up to return a value to the original caller. To be interesting but not too interesting, we’ll use the simple Controller::IntFn(), which takes a single int argument and returns an int (in this case, the input value * 2).

Client-side: Call The Function

On the client side of things, we derive a class ClientController from the abstract base Controller class, above.

int ClientController::IntFn(int val)
{
   // #1
   RpcMessage msg(Controller::kIntFn);
   // #2
   RpcMessage response;
   int retval = 0;
   msg.AppendData(val);
   // #3
   if (this->CallFunction(msg, response))
   {
      // ...tune in later for this part.
      retval = response.GetData();
   }
   else
   {
      DBG("ERROR calling IntFn();");
   }
   return retval;
}

The implementation of the IntFn() member needs to do the following things:

  1. Create an instance of the RpcMessage class that will hold the enum code identifying the function being called, a message sequence number (automatically generated by the RpcMessage constructor), and all arguments to the function (in this case, a single int).

  2. Create an empty instance of the RpcMessage class that will be filled with the result of the function call.

  3. Pass those two RpcMessage objects to the ClientController::CallFunction(), which contains the logic to send the function call message to the server, wait for a response, and populate the return RpcMessage object with the result of the function call.

Let’s burrow down a level here, into the ClientController::CallFunction() member:


bool ClientController::CallFunction(RpcMessage& call, RpcMessage& response) { bool retval = false; uint32 messageCode; uint32 sequence; // #1 call.GetMetadata(messageCode, sequence); // #2: Remember which call we're wawiting for. PendingCall pc(sequence); fPending.Append(&pc); // #3 if (fRpc->sendMessage(call.GetMemoryBlock())) { // #4: wait for a response if (pc.Wait(50000)) { // // ...come back later for this part... // if we get here, the pending call object has a memoryBlock // that we can use to populate our response. response.FromMemoryBlock(pc.GetMemoryBlock()); // advance past the header bits... response.GetMetadata(messageCode, sequence); retval = true; } else { DBG("ERROR (timeout?) calling function code = " + String(messageCode)); } } else { DBG("ERROR sending call message."); } // remove the pending call object from the list fPending.Remove(&pc); return retval; }

Here, we:

  1. Pull the message code and sequence number out of the RpcMessage that contains the function call. Each function call made by our system has a non-zero sequence ID that we’ll use later to match the response message to the correct originating call.

  2. Create an instance of the PendingCall class that contains the sequence ID, a JUCE WaitableEvent object, and an empty MemoryBlock object that will eventually be filled with the result of the function call. That PendingCall object is appended to a list of pending function calls. In practice, with a single-threaded client making calls, there will be at most one call waiting for a response, but it made sense to anticipate multi-threaded use cases as well.

  3. Pull out the MemoryBlock that contains the payload we need to send across the socket, and pass that to our instance of an RpcClient object (a relatively thin wrapper of the JUCE InterprocessConnection class).

  4. Call the PendingCall::Wait() member function to make this thread block until we either receive a response from the server or timeout with no response.

Server-side: Calling the Actual Function

At this point, the calling thread is completely blocked, and the action moves over to the server side, where he have an instance of our RpcServerConnection class (another wrapper of the JUCE InterprocessConnection class) ready to handle incoming traffic over the socket connection.


void RpcServerConnection::messageReceived(const MemoryBlock& message) { // a received message from a client needs to be decoded and // converted into a function call that results in us sending // a message back over this connection. // #1 RpcMessage ipcMessage(message); // #2 uint32 messageCode; uint32 sequence; ipcMessage.GetMetadata(messageCode, sequence); // #3 RpcMessage response(messageCode, sequence); switch (messageCode) { case Controller::kIntFn: { // #4 int arg = ipcMessage.GetData(); int retval = fController->IntFn(arg); response.AppendData(retval); } break; // additional cases deleted for clarity... default: { DBG("Received unknown message code" + String(messageCode)); } break; } // #5 this->SendRpcMessage(response); // ...more code deleted for clarity }
  1. We create an RpcMessage object, intializing it with the contents of the message we just received.

  2. We pull out the message code and sequence ID so we can build…

  3. A response RpcMessage, initialized with the matching message code and sequence.

  4. A handler for the IntFn function code pulls out the int argument, calls the ServerController::IntFn() member function, and appends its return value to the response RpcMessage object.

  5. We send the response message back to the client side.

Client-side: Handle the Response

Back on the client-side, a separate thread receives the reply message from the server, and our RpcClient::messageReceived() function passes the response message to the ClientController::HandleReceivedMessage() function:

void ClientController::HandleReceivedMessage(const MemoryBlock& message)
{
   // #1
   RpcMessage ipc(message);
   uint32 code;
   uint32 sequence;
   ipc.GetMetadata(code, sequence);
   // #2
   if (sequence != 0)
   {
      // #3
      PendingCall* pc = fPending.FindCallBySequence(sequence);
      if (pc)
      {
         // #4
         pc->SetMemoryBlock(message);
         // #5
         pc->Signal();
      }
      else
      {
         DBG("ERROR: Unexpected reply to call sequence #" +
          String(sequence));
      }
   }
   else
   {
      // Unexpected message -- ChangeBroadcast or a ValueTree
      // sync message.
      // (deleted for clarity, see below for more)
   }
}
  1. As before — create an RpcMessage object with the message payload, and extract the function code and sequence number.

  2. We check for a non-zero sequence, which identifies this message as a response to a function call that we made.

  3. We look through the list of PendingCall objects for the one associated with this sequence (if there’s no match in the list, it’s an error case…).

  4. The message payload is copied into the PendingCall object.

  5. Finally, we signal the blocked thread that issued the function call, waking it up.

Back in that original thread:


int ClientController::IntFn(int val) { RpcMessage msg(Controller::kIntFn); RpcMessage response; int retval = 0; msg.AppendData(val); if (this->CallFunction(msg, response)) { // #1 retval = response.GetData(); } else { DBG("ERROR calling IntFn();"); } // #2 return retval; }
  1. We wake up by returning from the call to ClientController::CallFunction() that was blocked, and in the good case that call returns true, indicating a successful call. We extract the return value of the function call from the response message, and…

  2. …return that value to whoever called this function in the first place.

Responding to a ChangeBroadcast Message

The ChangeBroadcaster/ChangeListener set of base classes in JUCE is a great and widely used mechanism for sending asynchronous contentless change messages inside a process. To add this functionality in our RPC framework, all that we need to do is:

Server-side

All of the socket communications are being handled by the RpcServerConnection class, which is derived from the JUCE base classes InterprocessConnection and ChangeListener:


class RpcServerConnection : public InterprocessConnection , public ChangeListener { public: RpcServerConnection(ServerController* controller); // remaining code...

In the constructor code, we immediately subscribe to the Controller object’s change updates ((#1, below:


RpcServerConnection::RpcServerConnection(ServerController* controller) : InterprocessConnection(false, 0xf2b49e2c) , fController(controller) , fConnected(RpcServerConnection::kConnecting) { DBG("RpcServerConnection created." ); MessageManagerLock mmLock; // #1 fController->addChangeListener(this); }

When the ServerController object updates its listeners, we execute this code:


void RpcServerConnection::changeListenerCallback(ChangeBroadcaster* source) { if (RpcServerConnection::kConnected == fConnected) { // #1 RpcMessage notify(Controller::kTimerAlert, 0); // #2 this->SendRpcMessage(notify); } }

In this case, the only change that we’ll be getting notifications for is a 1 Hertz timer tick.

  1. We create an empty RpcMessage object that has the message code for this timer update, and a sequence number of zero (which will tell the client code that there’s no need to look for an entry in the PendingCallList).

  2. We send the message to our client.

Client-side

On the client side, things are simple:


void ClientController::HandleReceivedMessage(const MemoryBlock& message) { RpcMessage ipc(message); uint32 code; uint32 sequence; ipc.GetMetadata(code, sequence); if (sequence != 0) { // discussed above -- find the pending call & dispatch } else { // this is a change notification and we're not expecting it. DBG("Change notification, code " + String(code)); // #1 this->sendChangeMessage(); } }
  1. In the ClientController (which is also derived from ChangeBroadcaster), we just call sendChangeMessage() to alert any listeners who have subscribed.

The Demo App

IpcTest_and_IpcTest
The demo app can run as either a server or client, selected at runtime by clicking on one of a pair of buttons.
The server exposes this functionality:

  • VoidFn() — does nothing (except print a debug message), returns nothing.
  • IntFn() — accepts an int, returns an int that’s its argument * 2.
  • StringFn() — Accepts a string, returns a string; its argument is printed as a debug message, and the current time is returned to the caller. This function is called by the client every time the server sends a change notification.
  • Every 15 seconds, a ValueTree in the server is updated, causing a matching ValueTree in the client to be updated in sync, and the updated ValueTree is displayed as XML.

What’s Missing

The demo code that’s on Github also includes facilities to keep JUCE ValueTree objects in the client synchronized with matching ValueTrees in the server, and an API to push new values down into the server’s ValueTrees; discussion of those has been left out of this post, and the design and use of these should be fairly obvious from reading the source.
Since this is primarily intended as a demo or proof of concept, not a drop-in solution, there are some obvious things that need to be addressed before using this in any kind of a production environment:

  • Authentication — in this version, if your client app can connect to the server, it has full access to it.
  • Permissions — if you add authentication and accounts, the next obvious use case would be to restrict some parts of the server app’s functionality to admin or superusers.
  • Better Error Reporting/Handling — more than a little spartan in this demo. In particular, we don’t have a clean way to indicate the outright failure of an attempt to make a call over this connection; I’d expect a production solution to this to define exceptions that can be thrown in this event.
  • Endian-ness — in the case I developed this for, the client and server will always be running as separate processes on the same physical host, so endianness is not an issue.

…and there are some additions that would improve things:

  • IDL — Manually writing the serialization/de-serialization code to pack function calls, arguments, and return values for transmission is tedious and probably error-prone. Developing a little utility to let us define the way things are packed and generate the code for this automatically would be a great advance.

In the grand tradition of computer science, these topics are left as exercises for the reader.

Get The Code

You can get the code for this project at
https://github.com/bgporter/JuceRpc
Hope that you find it interesting and possibly useful.

Related

Check out the series of posts I wrote about developing audio applications using JUCE. They could stand some updating to reflect the changes in JUCE 4, but are still a pretty solid overview of the big pieces that you need to understand. Start here.

+ more

Accurate Timing

Accurate Timing

In many tasks we need to do something at given intervals of time. The most obvious ways may not give you the best results. Time? Meh. The most basic tasks that don't have what you might call CPU-scale time requirements can be handled with the usual language and...

read more