Production Ready Prebuilt Websocket Server – Centrifugo

As mentioned in my previous blog post, I am working on a mobile app that is essentially a social media application. As many of you know, a social media application needs live data, whether that is updating a post’s like/comment count or, more importantly, messaging people.

65% of DIY websocket servers experienced significant downtime in the last 12-18 months.

10.2 person-months is the average time to build basic WebSocket infrastructure, with limited scalability, in-house.

Half of all self-built WebSocket solutions require $100K-$200K a year in upkeep.

Ably

Keeping in mind the amount of tinkering, growing pains and infrastructure needed to deploy from scratch (or with a framework like socket.io, which still requires you to build the actual server and infrastructure), I opted for a pre-built, production-ready open source project to handle all of this. Enter Centrifugo: a flexible, scalable and maintainable real-time websocket server for your production environment.

Requirements

Before we jump into comparing alternatives and the implementation, let’s outline what we are looking for in a websocket framework/system.

  1. Easily scalable
  2. Quickly deployable
  3. Flexibility
  4. Avoid vendor lock-in
  5. Cheap
  6. Hardware conscious
  7. Preferably prebuilt

Considerations – Compare and Contrast

Socket.io

Coming from a javascript background, it was very tempting to utilize the socket.io framework on top of an express server which would give me complete flexibility and customization. Since this is just a framework it would require me to build everything including authentication, maintaining connections (and removing stale connections), creating a client-to-socket.io connection, a server-to-socket.io connection and more. This would truly be a large undertaking especially when considering how to scale this system assuming the concurrent connections surpass the server’s hard/ soft limits.

Addressing the elephant in the room, socket.io is most commonly used in a javascript ecosystem (there are exceptions and ports for other languages like golang and rust), which is great given my experience, but it also requires more memory and processing for the same performance as a server written in a lower level language like rust, C++ or golang. Even on a deployment with sufficient memory and processing, javascript’s garbage collection makes it difficult to determine a hard limit of concurrent connections, which may result in out-of-memory exceptions and, in turn, dropped or unreliable connections.

Pusher, Ably, Pubnub

Did I mention I’ve used pusher before? Pusher has been an amazing product to use and has always been reliable in my experience. So, what’s the drawback? It’s expensive. My application has built quite the social media following, with over 50,000 followers across all of its social media accounts and over 1,800 pre-orders. At the very minimum, I’d need the “pro” plan on pusher, which is $100 a month. Ably’s pricing is a bit better, with an estimated spend of $50-75 a month just to handle the pre-orders, but that would still put a serious strain on our bootstrap/shoe-string budget.

The draw of services like these is that they are already set up to scale with you (assuming you keep paying for more concurrent connections/channels). Each of these products also offers its own SDK, which is very useful but also backs us into a corner by relying on a single vendor, aka “vendor lock-in”.

Vendor Lock-in

Pusher’s original package pusher-js deprecated react native in favor of a new package just for react native. This would be good news, especially because pusher has been a pleasure to work with in the react ecosystem, but upon testing this package in an expo + react native project it does not build with Expo EAS, and when it does work it drops connections every 5 seconds. More information on this package in the next section: Soketi, a “pusher drop-in replacement”.

Soketi

Soketi is a product I want to love. This was actually the first production ready, open source websocket service I tried; all the articles, youtube tutorials, and repositories raved about soketi. But none of these articles and tutorials used expo and react native. It seems that Soketi is pretty popular in the laravel/PHP ecosystem.

Soketi is a pusher protocol drop-in replacement. It’s an open source project that can easily scale using redis and upon testing in next.js it worked beautifully!

This is where the good news ends though. As mentioned, pusher deprecated react native support in their original package in favor of a standalone react native package. The original package, pusher-js, is javascript based and builds with Expo EAS easily and reliably, but unfortunately the new react native package neither builds with Expo EAS nor supports custom hosts or ports. I even went as far as forking the package and adding support for custom hosts and ports, but that brought me full circle to the issues with building the application through EAS while maintaining reliable connections.

Overall I would recommend Soketi to anyone building web-apps using frameworks like next.js, nuxt, svelte, react, etc. If Soketi were to have their own package that plays nice with Expo I would love to revisit this in the future.

Centrifugo

How does centrifugo compare to the others? It took hours of digging to even find Centrifugo but I am glad I found it. For local development it’s as simple as creating a config.json and a 7 line docker-compose.yml file. For production environments Centrifugo offers a number of ways to deploy, one of which is using docker or kubernetes.

Centrifugo uses the conventional websocket protocol (bidirectional and unidirectional) which avoids any kind of vendor lock-in. Centrifugo does have a javascript based package that does actually build with Expo EAS. If using the regular websocket protocol is not suitable for you, it also offers sockjs, RPC using protobufs and HTTP long polling.

Scaling

Credit: https://centrifugal.dev

Disclaimer: I personally have not tested scalability across multiple nodes in a production environment but plan to write a follow up blog post when we start needing to use multiple nodes.

According to Centrifugo’s documentation, deploying with Redis is only a 3 line change in the config, assuming you already have a redis database deployed, and Centrifugo handles everything else. I recommend reading Centrifugo’s blog post on handling 1,000,000+ connections; it’s not react native based, but it gives you great context as to what Centrifugo is capable of.

Implementation

The implementation below does not take the conventional chat room approach that you may have seen in other tutorials. We are going to focus on receiving live information for a social media post.

Relevant Stack

  • Postgres
  • Express
  • React Native
  • Centrifugo

Setting Up Centrifugo

We are going to host Centrifugo in a local docker container using a docker-compose.yml file and configure it with a config.json file.

YAML
version: '3.8'

services:
  centrifugo:
    container_name: centrifugo
    image: centrifugo/centrifugo:v5
    volumes:
      - ./centrifugo/config.json:/centrifugo/config.json
    command: centrifugo -c config.json
    ports:
      - 8000:8000
    ulimits:
      nofile:
        soft: 65535
        hard: 65535

Before we can create a configuration file we need to generate a boilerplate config.json file.

Bash
docker compose up -d
docker exec -it centrifugo sh
centrifugo genconfig
cat config.json #Copy this file's content

Create a new directory in the root of your repository called “centrifugo” and create a config.json file inside it. Then paste in the contents of the config.json file that was generated in the docker container.

JSON
{
  "token_hmac_secret_key": "",
  "admin_password": "",
  "admin_secret": "",
  "api_key": "",
  "admin": true,
  "log_level": "debug",
  "client_channel_limit": 10,
  "client_user_connection_limit": 15,
  "allow_anonymous_connect_without_token": false,
  "disallow_anonymous_connection_tokens": true,
  "allowed_origins": [
    "*"
  ],
  "namespaces": [
    {
      "name": "post"
    },{
      "name": "personal"
    }
  ]
}

We then want to add a few more options.

admin – We want to enable the admin view while in development so we can debug and interact directly with the server.

log_level – For debugging we want to log to the console. This helps when determining why clients are disconnecting or not connecting at all.

client_channel_limit – Sets the maximum number of different channel subscriptions a single client connection can have.

client_user_connection_limit – Sets the maximum number of connections a single user can open to one Centrifugo node. The limit is applied per node, so with 5 nodes a user could theoretically hold up to 75 connections (15 on each node).

allow_anonymous_connect_without_token – We want to disallow users connecting to the websocket server without a signed, non-expired JWT authentication token.

disallow_anonymous_connection_tokens – We also do not want to allow connections from users whose identity we do not know. A signed JWT token that we generate will always contain the user’s UUID, so we reject tokens that are validly signed but do not contain a user UUID.

allowed_origins – Mobile applications do not send an origin, which means we need to allow all origins. This is why we are strict with authentication and user identity.

namespaces – This is where we define the channel namespaces that clients can listen to. A channel such as post:123 belongs to the post namespace. This list varies from application to application, so chances are your list will be different from mine.
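
To make the relationship between channels and namespaces concrete, here is a minimal sketch. It assumes the default `:` separator between namespace and the rest of the channel name; `buildChannel` and `namespaceOf` are hypothetical helper names, not part of Centrifugo or its SDKs.

```typescript
// Namespaces declared in config.json above.
const NAMESPACES = ['post', 'personal'] as const;
type Namespace = (typeof NAMESPACES)[number];

// Hypothetical helper: build a channel name such as "post:123".
const buildChannel = (namespace: Namespace, id: string): string => `${namespace}:${id}`;

// Hypothetical helper: return the namespace of a channel, or null when the
// channel has no namespace or uses one that is not configured.
const namespaceOf = (channel: string): Namespace | null => {
  const boundary = channel.indexOf(':');
  if (boundary <= 0) return null;
  const name = channel.slice(0, boundary);
  return (NAMESPACES as readonly string[]).includes(name) ? (name as Namespace) : null;
};

const postChannel = buildChannel('post', '123'); // "post:123"
const postNamespace = namespaceOf(postChannel); // "post"
```

Keeping channel construction in one place like this makes it harder for the server and the client to drift apart on naming.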

Now we can restart the docker container and it will find the config.json file and reconfigure itself!

Disclaimer: This is for a local development build – at scale we are going to want to define a way to increase the number of nodes which this current configuration does not support. I recommend reading these resources by centrifugo’s team: Engines, Scaling websocket in go and beyond

Setting up Express

We have 2 objectives in express to integrate websockets into our stack:

  1. Create a JWT Endpoint to allow the client to authenticate against centrifugo
  2. Broadcast a message to a post channel when a user likes or comments on the post.

Signing a JWT Token

Within the centrifugo/config.json we have a token_hmac_secret_key field that we need to copy into our env variables

Bash
WS_SERVER_SECRET=1hd86856-2df2-a7d0-9f49-jst5dd9e2ia71

With our new env variable we can create a helper class to generate and sign our tokens.

TypeScript
import axios, { Axios } from 'axios';
import jwt, { TAlgorithm } from 'jwt-simple';

/**
 * @description Websocket token class utilized for generating tokens for websocket server
 */
class WebsocketTokenServicesClass {
  private websocketJWTAlgorithm: TAlgorithm = 'HS256';
  private tokenExpirationMinutes: number = 2;
  private serverSecret: string = process.env.WS_SERVER_SECRET as string;

  /**
   * @description compute the expiration time for a newly issued token
   * @returns {number} unix timestamp
   * @link https://centrifugal.dev/docs/server/authentication#connection-expiration
   */
  private getTokenExpiration = (): number => {
    return Math.floor(Date.now() / 1000) + 60 * this.tokenExpirationMinutes;
  };

  /**
   * @description generate a websocket token for a user to connect to web socket server
   * @param {String} userId Authenticated user id
   * @returns {Promise<String>} JWT token
   * @link https://centrifugal.dev/docs/server/authentication#simplest-token
   */
  generateWebsocketConnectionToken = async (userId: string): Promise<string> => {
    return jwt.encode(
      {
        sub: userId,
        exp: this.getTokenExpiration(),
      },
      this.serverSecret,
      this.websocketJWTAlgorithm,
    );
  };

  /**
   * @description generate a websocket token for a user to subscribe to a private channel
   * @param {String} userId Authenticated user id
   * @param {String} channel Channel to subscribe to
   * @returns {Promise<String>} JWT token
   * @link https://centrifugal.dev/docs/server/authentication#simplest-token
   */
  generateWebsocketSubscriptionToken = async (userId: string, channel: string): Promise<string> => {
    return jwt.encode(
      {
        sub: userId,
        channel,
        exp: this.getTokenExpiration(),
      },
      this.serverSecret,
      this.websocketJWTAlgorithm,
    );
  };
}

export const WebsocketTokenService = new WebsocketTokenServicesClass();

In order for a client to connect to our websocket server we need two tokens:

  1. A token to authenticate the client’s connection to the websocket server
  2. A token to authenticate the client’s subscription to a channel in the websocket server once a connection has been established
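
To show what these two tokens actually contain, here is a sketch that signs HS256 tokens by hand with Node's built-in crypto module instead of jwt-simple. It is only meant for inspection: `signHs256`, `decodeClaims`, the `demo-secret` value and the example ids are all hypothetical, and the claims mirror the ones used in the class above (`sub`, `exp`, plus `channel` for subscriptions).

```typescript
import { createHmac } from 'node:crypto';

// Base64url-encode a JSON value without padding, as the JWT format requires.
const b64url = (value: object): string =>
  Buffer.from(JSON.stringify(value)).toString('base64url');

// Hypothetical helper: sign claims as an HS256 JWT (header.payload.signature).
const signHs256 = (claims: object, secret: string): string => {
  const signingInput = `${b64url({ alg: 'HS256', typ: 'JWT' })}.${b64url(claims)}`;
  const signature = createHmac('sha256', secret).update(signingInput).digest('base64url');
  return `${signingInput}.${signature}`;
};

// Hypothetical helper: read the claims WITHOUT verifying the signature.
const decodeClaims = (token: string): Record<string, unknown> =>
  JSON.parse(Buffer.from(token.split('.')[1], 'base64url').toString('utf8'));

const exp = Math.floor(Date.now() / 1000) + 60 * 2; // two minutes from now

// 1. Connection token: identifies the user.
const connectionToken = signHs256({ sub: 'user-42', exp }, 'demo-secret');

// 2. Subscription token: identifies the user and the channel.
const subscriptionToken = signHs256({ sub: 'user-42', channel: 'post:123', exp }, 'demo-secret');
```

The only difference between the two is the extra `channel` claim, which ties a subscription token to a single channel.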

Assuming you already have an express server with authentication middleware or other authentication methods, within the express server we now need to make two new endpoints.

TypeScript
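// `authedUser` is assumed to be provided by your existing authentication middleware.
// The token generators are async, so they must be awaited before sending the response.
app.get('/api/connectionToken', async (req, res) => {
  return res.json({
    token: await WebsocketTokenService.generateWebsocketConnectionToken(authedUser.userId),
  });
});

app.get('/api/subscriptionToken', async (req, res) => {
  const channel = req.query.channel as string;
  return res.json({
    token: await WebsocketTokenService.generateWebsocketSubscriptionToken(authedUser.userId, channel),
  });
});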
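
One thing these endpoints leave open is whether the authenticated user should be allowed to subscribe to the requested channel at all. Here is a minimal sketch of such a check, assuming purely for illustration that `post:<id>` channels are open to any signed-in user and `personal:<userId>` channels only to their owner. `canSubscribe` is a hypothetical helper and the policy is an assumption, so adjust it to your own rules.

```typescript
// Hypothetical policy check to run before signing a subscription token.
// Assumed rules: any signed-in user may read post channels, but a personal
// channel may only be read by the user it belongs to.
const canSubscribe = (userId: string, channel: string): boolean => {
  const boundary = channel.indexOf(':');
  if (boundary <= 0) return false; // reject channels without a namespace
  const namespace = channel.slice(0, boundary);
  const id = channel.slice(boundary + 1);
  if (id.length === 0) return false; // reject "post:" with nothing after it
  switch (namespace) {
    case 'post':
      return true;
    case 'personal':
      return id === userId;
    default:
      return false; // unknown namespace
  }
};
```

Inside the `/api/subscriptionToken` handler you could respond with a 403 whenever `canSubscribe(authedUser.userId, channel)` returns false, before any token is generated.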

Publishing to Centrifugo

Now that we have a class that is able to generate JWT tokens for a client to connect to Centrifugo, we need to actually publish to Centrifugo channels.

Before we can publish to Centrifugo we need to add a second env variable. In your centrifugo/config.json there is an api_key field; copy its value into a new env variable alongside the one we created previously.

Bash
WS_API_KEY=67fe1e56-765f-jdf8-jd72-jdf7a8e0a9k7

Within your express server create a new class for handling event publications for specific events.

TypeScript

/**
 * @description Websocket action class utilized for sending messages to websocket server
 */
class WebsocketActionClass {
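  // Fall back to an empty string so the missing-key check in publishMessage can fire;
  // wrapping an undefined env variable in a template literal would yield the string "undefined".
  private apiKey: string = process.env.WS_API_KEY ?? '';
  // WS_SERVER_PATH and WS_SERVER_PORT are env variables you need to define yourself,
  // pointing at Centrifugo's HTTP API (for example http://localhost and 8000 for the local setup above).
  private basePath: string = `${process.env.WS_SERVER_PATH}:${process.env.WS_SERVER_PORT}`;
  private apiPath: string = this.basePath + '/api';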

  /**
   * @description publish a payload to a single channel
   * @param channel {String} channel name
   * @param payload {Object} payload to send
   * @returns {Promise} axios promise
   * @link https://centrifugal.dev/docs/server/server_api#publish
   */
  private publishMessage = async (channel: string, payload: any): Promise<any> => {
    if (!this.apiKey) throw new Error('No API key found');
    if (!this.apiPath) throw new Error('No API path found');
    return axios.post(
      this.apiPath + '/publish',
      {
        channel: channel,
        data: payload,
      },
      {
        headers: {
          'X-API-Key': this.apiKey,
        },
      },
    );
  };

  /**
   * @description publish to websocket server that a user has liked a post
   * @param postId {String} post id
   * @param userId {String} user id of the user who liked or unliked
   * @param action {String} action to perform LIKE | UNLIKE
   * @returns {Promise} axios promise
   */
  publishLikePost = async (
    postId: string,
    userId: string,
    action: 'LIKE' | 'UNLIKE',
  ): Promise<any> => {
    return this.publishMessage(`post:${postId}`, {
      [action?.toLocaleLowerCase()]: {
        userId,
      },
    });
  };

  /**
   * @description publish to websocket server that a user has commented on a post
   * @param postId {String} post id
   * @param userId {String} user id of the user who commented
   * @returns {Promise} axios promise
   */
  publishCommentPost = async (postId: string, userId: string): Promise<any> => {
    return this.publishMessage(`post:${postId}`, {
      comment: {
        userId,
      },
    });
  };
}

export const WebsocketActionService = new WebsocketActionClass();
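
For readers who do not use axios, the request that `publishMessage` sends can be sketched as a plain object. This assumes the same `/api/publish` path and `X-API-Key` header used in the class above. `buildPublishRequest`, the example base URL and the `demo-key` value are hypothetical.

```typescript
interface PublishRequest {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Hypothetical helper: describe the HTTP call without sending it.
const buildPublishRequest = (
  apiBase: string, // e.g. "http://localhost:8000/api" for the local setup (assumption)
  apiKey: string,
  channel: string,
  data: unknown,
): PublishRequest => {
  if (!apiKey) throw new Error('No API key found');
  return {
    url: `${apiBase}/publish`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'X-API-Key': apiKey },
    body: JSON.stringify({ channel, data }),
  };
};

// The same payload publishLikePost would send for a like on post 123.
const likeRequest = buildPublishRequest('http://localhost:8000/api', 'demo-key', 'post:123', {
  like: { userId: 'user-42' },
});
```

Separating the construction of the request from sending it also makes the publishing logic easy to unit test without a running Centrifugo instance.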

Finally, within the functions that handle liking or commenting on posts, you need to call the respective publish function.

TypeScript
import { WebsocketActionService } from '@utils/websocket';

// the action argument must be the string literal 'LIKE' or 'UNLIKE'
WebsocketActionService.publishLikePost(postId, authed_user_id, 'UNLIKE');

And you’re done!

Setting up React Native Using Expo

My first reaction after understanding the connection and subscription logic was to create a provider and context that maintains the connection in react native. Keeping in mind the resources used to maintain concurrent connections on the server side, I opted instead to wrap everything in a custom hook that handles connecting and subscribing. That way a client only connects and subscribes while a page/component is rendered, which dramatically reduces the number of concurrent connections along with the length of each connection and subscription. This is why our JWT token expiration is only 2 minutes.

First we want to actually install Centrifugo’s javascript package

Bash
npx expo install centrifuge
# or
npm install centrifuge
# or
yarn add centrifuge

This package is nothing proprietary so if you want to use your own preferred websocket package for react native you are able to, just keep in mind you will need to implement your own JWT token refreshing logic for when tokens expire.

Time to make your own custom hook! You can take bits and pieces of this one and adapt them to your own app.

TypeScript
import {
  PublicationContext,
  SubscribedContext,
  UnsubscribedContext,
  SubscriptionStateContext,
  ErrorContext,
  LeaveContext,
  SubscribingContext,
  JoinContext,
  SubscriptionState,
  Centrifuge,
  StateContext,
  State,
} from 'centrifuge';
import { useEffect, useState } from 'react';
import { usePathname } from 'expo-router';
import useAppState from './useAppState';
import Constants from 'expo-constants';

interface useWebsocketProps {
  channel: string;
  onPublication?: (message: PublicationContext) => void;
  onState?: (state: SubscriptionStateContext) => void;
  onError?: (error: ErrorContext) => void;
  onSubscribed?: (message: SubscribedContext) => void;
  onUnsubscribed?: (message: UnsubscribedContext) => void;
  onSubscribing?: (message: SubscribingContext) => void;
  onJoin?: (message: JoinContext) => void;
  onLeave?: (message: LeaveContext) => void;
}

/**
 * @description hook to handle websocket subscription based on channel. Assumes that the channel being subscribed to is a protected channel
 * @link https://centrifugal.dev
 */
const useWebsocket = ({ channel }: useWebsocketProps) => {
  const pathname = usePathname();
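  const [connectionError, setConnectionError] = useState<ErrorContext | null>(null);
  const [subscriptionError, setSubscriptionError] = useState<ErrorContext | null>(null);
  const [websocketState, setWebsocketState] = useState<StateContext | null>(null);
  // state for the values set by the subscription listeners below
  const [subscriptionState, setSubscriptionState] = useState<SubscriptionStateContext | null>(null);
  const [publication, setPublication] = useState<PublicationContext | null>(null);
  const [join, setJoin] = useState<JoinContext | null>(null);
  const [leave, setLeave] = useState<LeaveContext | null>(null);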

  const { appStateVisible } = useAppState();
  const isAppActive = appStateVisible === 'active';

  const WS_PATH = `${Constants.expoConfig.extra.WS_BASE_PATH}:${Constants.expoConfig.extra.WS_PORT}/connection/websocket`;

  /**
   * @description fetches websocket connection token from the express endpoint
   * @returns {Promise<string>} websocket connection token
   */
  const refetchConnectionToken = async (): Promise<string> => {
    // Handle any authentication logic, cookie, header, api key etc...
    const results: any = await fetch(`${Constants.expoConfig.extra.BACKEND_ROUTE}:${Constants.expoConfig.extra.BACKEND_PORT}/api/connectionToken`);

    const response = await results.json()
    return response.token;
  };

  /**
   * @description fetches websocket subscription token
   * @param channel {String} channel to subscribe to
   * @returns {Promise<string>} websocket subscription token
   */
  const refetchSubscriptionToken = async (channel: string): Promise<string> => {
    // Handle any authentication logic, cookie, header, api key etc...
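    // The express endpoint is a GET route that reads the channel from the query string,
    // so pass it as a query parameter (a GET request cannot carry a body) and await the response.
    const results: any = await fetch(
      `${Constants.expoConfig.extra.BACKEND_ROUTE}:${Constants.expoConfig.extra.BACKEND_PORT}/api/subscriptionToken?channel=${encodeURIComponent(channel)}`,
    );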

    const response = await results.json()
    return response.token;
  };
  
  
  /**
   * @description initializes Centrifuge when channel is defined
   */
  useEffect(() => {
    // do not subscribe if on auth screen
    if (!pathname || pathname?.includes('/auth') || pathname === '/') return;

    if (!channel) return;

    const centrifuge = new Centrifuge(WS_PATH, {
      debug: true,
      getToken: refetchConnectionToken,
    });

    let sub = centrifuge.getSubscription(channel);
    // if subscription exists, re-subscribe with it
    if (sub) {
      // if subscription is unsubscribed, subscribe
      if (sub.state === SubscriptionState.Unsubscribed) {
        sub.subscribe();
      }
    } else if (!sub) {
      sub = centrifuge.newSubscription(channel, {
        getToken: () => refetchSubscriptionToken(channel),
      });
    }

    centrifuge.on('error', (err: ErrorContext) => {
      setConnectionError(err);
    });

    // set up listeners for state change
    centrifuge.on('state', (message: StateContext) => {
      setWebsocketState(message);
    });

    // set up listeners
    sub.on('state', (message: SubscriptionStateContext) => {
      setSubscriptionState(message);
    });

    sub.on('publication', (message: PublicationContext) => {
      setPublication(message);
    });

    sub.on('error', (error: ErrorContext) => {
      setSubscriptionError(error);
    });

    sub.on('join', (message: JoinContext) => {
      setJoin(message);
    });

    sub.on('leave', (message: LeaveContext) => {
      setLeave(message);
    });

    // only connect to a client that is disconnected and app is active
    if (centrifuge.state === State.Disconnected && isAppActive) {
      centrifuge.connect();
    }

    // only subscribe if subscription is not subscribed and app is active
    if (sub.state === SubscriptionState.Unsubscribed) {
      if (isAppActive) {
        sub.subscribe();
      }
    }

    // if the app is in the background or inactive, disconnect the websocket
    if (!isAppActive) {
      centrifuge.disconnect();
      centrifuge.removeAllListeners();

      sub.unsubscribe();
      sub.removeAllListeners();
    }

    return () => {
      if (centrifuge) {
        centrifuge.disconnect();
        centrifuge.removeAllListeners();
      }
      if (sub) {
        sub.unsubscribe();
        sub.removeAllListeners();
      }
    };
  }, [channel, pathname, appStateVisible]);

  return { 
    connectionError, 
    subscriptionError, 
    websocketState, 
    subscriptionState, 
    publication, 
    join, 
    leave 
  };
};

export default useWebsocket;

This hook is far from a minimal example. It is, however, an example of a working hook in an Expo EAS build that is flexible, modular, and understandable.

Notice how we remove listeners, disconnect, and unsubscribe when the app is in the background, and re-connect and re-subscribe when the app is brought back into the foreground. This is because we do not want to maintain a connection to the websocket server while the app is in the background. If you are like me, you never close any of your background apps.
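
The connect/disconnect rules the hook follows can be condensed into a single pure function, which is easier to unit test than the effect itself. In this sketch `shouldBeConnected` is a hypothetical helper, and the state strings mirror the `active`, `background` and `inactive` values that React Native's `AppState` reports.

```typescript
type AppStateValue = 'active' | 'background' | 'inactive';

interface ConnectionInput {
  appState: AppStateValue;
  pathname: string | null;
  channel: string | null;
}

// Hypothetical helper mirroring the guards inside the hook's effect:
// stay offline on auth screens, on the root route, without a channel,
// or whenever the app is not in the foreground.
const shouldBeConnected = ({ appState, pathname, channel }: ConnectionInput): boolean => {
  if (!pathname || pathname === '/' || pathname.includes('/auth')) return false;
  if (!channel) return false;
  return appState === 'active';
};

// A foregrounded post screen should hold a connection...
const onPostScreen = shouldBeConnected({ appState: 'active', pathname: '/post/123', channel: 'post:123' });
// ...but the same screen in the background should not.
const inBackground = shouldBeConnected({ appState: 'background', pathname: '/post/123', channel: 'post:123' });
```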

To use this hook in your pages or components, it’s as simple as:

TypeScript

  const { publication } = useWebsocket({ channel: `post:${postId}` });
  

  useEffect(() => {
    if (publication) {
      handlePublication(publication);
    }
  }, [publication]);
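
`handlePublication` is left for you to implement. One possible shape is a small reducer that takes the `data` field of a publication. This sketch assumes the payloads published by `publishLikePost` and `publishCommentPost` (`like`, `unlike` or `comment`, each carrying a `userId`). `PostCounts`, `PostEvent` and `applyPostEvent` are hypothetical names.

```typescript
interface PostCounts {
  likes: number;
  comments: number;
}

// Shape of the data published by the express service in this article.
type PostEvent =
  | { like: { userId: string } }
  | { unlike: { userId: string } }
  | { comment: { userId: string } };

// Hypothetical reducer: returns new counts for a given publication payload.
const applyPostEvent = (counts: PostCounts, event: PostEvent): PostCounts => {
  if ('like' in event) return { ...counts, likes: counts.likes + 1 };
  if ('unlike' in event) return { ...counts, likes: Math.max(0, counts.likes - 1) };
  if ('comment' in event) return { ...counts, comments: counts.comments + 1 };
  return counts; // unknown event: leave the state untouched
};

const initialCounts: PostCounts = { likes: 2, comments: 0 };
const afterLike = applyPostEvent(initialCounts, { like: { userId: 'user-42' } });
```

In a component this could be wired up as `setCounts((current) => applyPostEvent(current, publication.data))`, keeping the counting logic out of the effect.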

As a last step, we need to add the environment variables and configure EAS.

Bash
EXPO_PUBLIC_WS_BASE_PATH=ws://192.168.1.42
EXPO_PUBLIC_WS_PORT=8000
BACKEND_ROUTE=http://192.168.1.42
BACKEND_PORT=3000

Within your app.config.ts file you need to pass in the environment variables through the extra field.

TypeScript
extra: {
  WS_BASE_PATH: process.env.EXPO_PUBLIC_WS_BASE_PATH,
  WS_PORT: process.env.EXPO_PUBLIC_WS_PORT,
  BACKEND_ROUTE: process.env.BACKEND_ROUTE,
  BACKEND_PORT: process.env.BACKEND_PORT,
},
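
Because a missing variable silently turns into the string `undefined` inside a template literal, it may be worth validating these values before building the connection URL. In this sketch `buildWebsocketUrl` and `WebsocketExtra` are hypothetical names, and the `/connection/websocket` path is the one the hook uses.

```typescript
interface WebsocketExtra {
  WS_BASE_PATH?: string;
  WS_PORT?: string;
}

// Hypothetical helper: fail loudly instead of connecting to "undefined:undefined".
const buildWebsocketUrl = (extra: WebsocketExtra | undefined): string => {
  const base = extra?.WS_BASE_PATH;
  const port = extra?.WS_PORT;
  if (!base || !port) {
    throw new Error('WS_BASE_PATH and WS_PORT must be set in the extra field of app.config.ts');
  }
  return `${base}:${port}/connection/websocket`;
};

// With the example environment variables from this article:
const wsUrl = buildWebsocketUrl({ WS_BASE_PATH: 'ws://192.168.1.42', WS_PORT: '8000' });
// wsUrl is "ws://192.168.1.42:8000/connection/websocket"
```

In the hook, the `WS_PATH` constant could then be replaced with `buildWebsocketUrl(Constants.expoConfig?.extra)`.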

Now you’re done! You can build your app normally through EAS.
