So, the most recent project I am wrapping up is an IoT project, and we were in need of an MQTT broker.
The first choice, so we could easily package and ship our solution, being we are working in dotnet core, was to grab MQTTnet to quickly build an MQTT server and ship that with our product. MQTTnet is the library to use if you are developing with dotnet or dotnet core, and it is MIT licensed. Christian and Jan are both rock star dudes, and they’ve been instrumental in quickly understanding the MQTT protocol, along with the library itself.
Ok, so the build-out of a super-simple MQTT Server using MQTTnet should have been ~100-150 lines in a console application wtihin dotnet core, and everything’s ace. Here’s the code we came up with:
namespace {Your namespace of choice}
{
public class Startup
{
private static string idSrvPass = "";
private static string idSrvApiName = "";
private static string idSrvAuthority = "";
private static string storageCS = "UseDevelopmentStorage=true";
private static string mongodbCS = "";
private static string mongodbDatabaseName = "";
private static bool _isRestarting = false;
static IConfigurationRoot Configuration { get; set; }
static IServiceProvider ApplicationServices { get; set; }
public Startup(Microsoft.AspNetCore.Hosting.IHostingEnvironment env)
{
var builder = new ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
.AddEnvironmentVariables();
Configuration = builder.Build();
MongoDBSettings.Configure();
}
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
var settings = Configuration.AsEnumerable().ToDictionary(kv => kv.Key, kv => kv.Value);
if (!settings.TryGetValue("CS", out storageCS)) { throw new ArgumentNullException(nameof(storageCS)); }
if (!settings.TryGetValue("IDSRV", out idSrvAuthority)) { throw new ArgumentNullException(nameof(idSrvAuthority)); }
if (!settings.TryGetValue("API_IDSRV_PASS", out idSrvPass)) { throw new ArgumentNullException(nameof(idSrvPass)); }
if (!settings.TryGetValue("API_IDSRV_APINAME", out idSrvApiName)) { throw new ArgumentNullException(nameof(idSrvApiName)); }
if (!settings.TryGetValue("MONGO_DB", out mongodbCS)) { throw new ArgumentNullException(nameof(mongodbCS)); }
if (!settings.TryGetValue("MONGO_DATABASE", out mongodbDatabaseName)) { throw new ArgumentNullException(nameof(mongodbDatabaseName)); }
var storage = CloudStorageAccount.Parse(storageCS);
var storageClient = storage.CreateCloudBlobClient();
services.AddSingleton(storageClient);
var keyContainer = storageClient.GetContainerReference("protect-keys");
keyContainer.CreateIfNotExistsAsync().GetAwaiter().GetResult();
services.AddDataProtection().PersistKeysToAzureBlobStorage(keyContainer, "mqtt-keys.xml");
// Mongodb connection.
var mClient = new MongoClient(mongodbCS);
var mDatabase = mClient.GetDatabase(mongodbDatabaseName);
services.AddSingleton(mDatabase);
services.AddMediatR(typeof(Adapt.Hegemon.TMS.Core.Administration.Users.GetUserQueryHandler).GetTypeInfo().Assembly);
services.AddSingleton<IClusterClient>(CreateClusterClient);
services.AddHostedOrleansMqttServer((builder) =>
{
builder
.WithDefaultEndpoint()
.WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(15))
.WithConnectionBacklog(4000)
.WithConnectionValidator((ctx) =>
{
AsyncHelpers.RunSync(async () =>
{
var log = ApplicationServices.GetService<ILogger<IMqttServer>>();
try
{
if (ApplicationServices == null)
{
ctx.ReturnCode = MqttConnectReturnCode.ConnectionRefusedServerUnavailable;
return;
}
log.LogInformation("Client {0} is attempting to connect.", ctx.ClientId);
if (_isRestarting)
{
log.LogWarning("Server is shutting down. Cannot accept new connections.");
ctx.ReturnCode = MqttConnectReturnCode.ConnectionRefusedServerUnavailable;
return;
}
using (var scope = ApplicationServices.CreateScope())
{
var mediator = scope.ServiceProvider.GetService<IMediator>();
var isAuthenticated = await mediator.Send(new AuthenticateTerminalForMqttCommand(ctx.Username, ctx.Password));
log.LogDebug("Authentication was successful? {isAuthenticated}", isAuthenticated);
if (isAuthenticated)
{
log.LogInformation("Client {ClientId} has connected.", ctx.ClientId);
ctx.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
}
else
{
log.LogWarning("Client {ClientId} has failed to authenticate.", ctx.ClientId);
ctx.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}
}
catch (Exception exc)
{
log.LogError(exc, "Authentication has not completed.");
ctx.ReturnCode = MqttConnectReturnCode.ConnectionRefusedServerUnavailable;
}
});
});
})
.AddMqttWebSocketServerAdapter()
.AddMqttConnectionHandler();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, Microsoft.AspNetCore.Hosting.IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime applicationLifetime)
{
ApplicationServices = app.ApplicationServices;
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseForwardedHeaders(new ForwardedHeadersOptions
{
ForwardedHeaders = ForwardedHeaders.All
});
app.UseMqttEndpoint("/")
.UseOrleansMqttServer((baseServer) =>
{
var log = ApplicationServices.GetRequiredService<ILogger<IMqttServer>>();
var server = baseServer as OrleansManagedMqttServer;
if (server == null)
{
log.LogWarning("MQTT Server is of type {type} and not OrleansManagedMqttServer.", baseServer.GetType().Name);
return;
}
server.Started += (s, e) => log.LogInformation("MQTT Server started.");
server.ClientConnected += (s, e) => log.LogInformation("{0} has connected.", e.ClientId);
server.ClientDisconnected += (s, e) => log.LogInformation("{0} has disconnected.", e.ClientId);
server.ClientSubscribedTopic += (s, e) => log.LogInformation("{0} has subscribed to {1}.", e.ClientId, e.TopicFilter.ToString());
server.ClientUnsubscribedTopic += (s, e) => log.LogInformation("{0} has unsubscribed from {1}.", e.ClientId, e.TopicFilter);
server.ApplicationMessageReceived += async (s, e) =>
{
log.LogInformation("Received message {message} for client {clientId}", e.ApplicationMessage.Topic, e.ClientId);
if (string.IsNullOrWhiteSpace(e.ClientId)) { return; }
try
{
using (var scope = ApplicationServices.CreateScope())
{
var mediator = scope.ServiceProvider.GetService<IMediator>();
var terminalInfo = await mediator.Send(new FindTerminalCriteria(e.ClientId));
var clusterClient = scope.ServiceProvider.GetService<IClusterClient>();
var terminal = clusterClient.GetGrain<ITerminalGrain>(terminalInfo.GlobalID);
var payload = e.ApplicationMessage.ConvertPayloadToString();
log.LogDebug("Received {@payload} for {@topic}", payload, e.ApplicationMessage.Topic);
switch (e.ApplicationMessage.Topic)
{
// ... code to handle published topics from clients.
}
};
}
catch (Exception exc)
{
log.LogCritical(exc, "Failed completing and event from a terminal.");
}
};
applicationLifetime.ApplicationStopping.Register(async () =>
{
log.LogWarning("Server is shutting down. Setting vars to refuse further connections.");
try
{
_isRestarting = true;
await server.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("reconnect").WithExactlyOnceQoS().Build()).ConfigureAwait(false);
}
catch (Exception exc)
{
log.LogCritical(exc, "Could not send reconnect signal to terminals.");
}
});
});
}
static IClusterClient CreateClusterClient(IServiceProvider serviceProvider) => AsyncHelpers.RunSync(async () =>
{
var log = serviceProvider.GetRequiredService<ILogger<IClusterClient>>();
var client = default(IClusterClient);
client = new ClientBuilder()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = HegemonConstants.Silo.ClusterId;
options.ServiceId = HegemonConstants.Silo.ServiceId;
})
.ConfigureServices((services) =>
{
})
.UseKubeGatewayListProvider()
//.UseMqtt()
.UseMqttWithAzureStorage(storageCS, Enumerable.Range(1, 4).Select(x => $"devices-{x}").ToList())
.ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(ITerminalGrain).Assembly).WithReferences())
.Build();
await client.Connect(RetryFilter);
return client;
async Task<bool> RetryFilter(Exception exc)
{
log.LogWarning("Exception while attempting to connect to Orleans cluster: {0}", exc);
await Task.Delay(TimeSpan.FromSeconds(2));
return true;
}
});
}
}
Now, with that in-place, this was cool (and notice the config for UseMqttWithAzureStorage line(s) throughout the code)… but we needed a way to provide High Availability for when we have 20k+ devices connecting to our cloud environment for communication. Luckily enough, our platform is built using the Orleans framework. Perusing on the web, we ran across a project to allow communication between the Silo and SignalR clients through Orleans Streams. After pondering this for a few days, we built a streams provider for MQTTnet to allow “load balancing” the same. After much debugging, etc., this portion of the system worked :thumbsup:.
Next step was using the Last Will and Testament (LWT) feature of MQTT so that we could report on the web console whether a device was connected or not, based on how MQTT determines a “connected” client. This is where things went haywire. We tested and tested and tested, and we kept coming up that MQTTnet server libs did not properly issue a LWT message when the client disconnects, and quickly re-connects. In reality, the LWT should not be sent, but the actuality was that it was still being sent.
Ok, so after much grievances, we decided to purge MQTTnet server from our solution and seek an alternative that would be more robust. We looked at mosquitto, a few node.js versions, etc. and then came upon a YouTube video of a Proof-of-Concept deployment of VerneMQ for Bose. The video discussed how the research team was able to get a stable load of five million (5,000,000) mqtt connections on VerneMQ within a cloud cluster. Ding, ding, ding… I think we have a winner folks!
Before completely “drinking the kool-aide”, we read into the documentation on VerneMQ’s site. We had a few wants coming out of this:
- A MQTT broker which can scale
- Ability to perform database authentication
- Something out-of-the-box with minimal work.
VerneMQ’s site seemed to have told us that they’d meet all known requirements as they can scale up/down, use database authentication, and since it’s within a Docker image on docker hub, it should be minimal work to get something live.
Going into this without any knowledge of how VerneMQ works, how to configure it, etc. (the usual way we all work in development), I pulled the VerneMQ docker image, and tried to make it work. No dice out-of-the-box, and no real guidance from the articles located on the VerneMQ site, etc. VerneMQ can be deployed into a kubernetes cluster using a helm script, which is how I chose to use it, but even the docs for the helm script were a bit lacking in nature.
Ok, so I am comfortable in Windows/Linux/MacOS, and I know that VerneMQ is being hosted within Linux, so I can grab a shell from the cluster, pull the vernemq.conf file, edit that, and “mount” the file as part of the pod deployment. Let’s just say, getting this in place was no small feat. You jump through a lot within kubernetest to mount a config file onto the filesystem, but after about three days of experimentation, I was able to achieve that.
Next step was configuring the database connection.. Should be super-simple, right? Wrong. Our database of choice was MongoDB at the time, as this was a NoSQL store, and was offered by MongoDB themselves (the hosted product is called MongoDB Atlas). What’s cool about Atlas is that it, by default, has TLS connections REQUIRED without any additional configuration. So, we grab the connection information such as the server name, username, password, ports, etc. (the usual pieces) and begin configuration. We hit a few roadblocks immediately, as you typically provide a connection string with all the nodes in your cluster, and VerneMQ only accepts a single node. No worries, we provided the master node’s hostname, the username, password, default document store, etc. and thought we were done. Boot VerneMQ and the diversity plugin keeps balking it can’t make a connection. After much trial and error, the final verdict is that the erlang driver for VerneMQ is not configure for TLS connections, and again, we were not going to build a custom anything to support this.
How are we going to solve this issue?
Before we moved on, we kept reading the docs, and two relational databases were available for ACL lists:
- MySQL (MariaDB too)
- PostgreSQL
Between the two, our preference is with PostgreSQL. We checked Azure’s site, and sure enough, PostgreSQL is now a first class citizen, so we are still able to focus on business and not on infrastructure. Good stuff, still all green to go.
Before any additional changes, we did a quick mock-up of the isolated pieces that would need to be adjusted to support Postgres + VerneMQ, which was to create the vmq_auth_acl table in a Postgres database, populate it with a user (and password) and try a connection. VerneMQ has it noted on their docs site that the passwords had to be encoded using BCrypt 2a for use with securing passwords… so how are we going to do this…?
There is a library on nuget.org BCrypt.Net-Core that can be used, and it supports the proper encoding that was required for the passwords. So, we bring this in, mock-up creating a client (with the encoded password) and attempted a connection… Failed! After a little research, we determined that the dotnet BCrypt protocol was not compatible with the postgres and erlang versions of the protocol, but luckily enough, VerneMQ guys had a sample of how to store a user in the vmq_auth_acl table, including encoding the password using postgres’ crypt() function. Here is that sql statement, for reference:
WITH x AS (
SELECT
@mountpoint::text AS mountpoint,
@client::text AS client_id,
@user::text AS username,
@pass::text AS password,
gen_salt('bf')::text AS salt,
'[{""pattern"": ""#""}]'::json AS publish_acl,
'[{""pattern"": ""#""}]'::json AS subscribe_acl
)
INSERT INTO vmq_auth_acl (mountpoint, client_id, username, password, publish_acl, subscribe_acl)
SELECT
x.mountpoint,
x.client_id,
x.username,
crypt(x.password, x.salt),
publish_acl,
subscribe_acl
FROM x;
With that statement in place, we were finally able to create a basic connection from our test client to VerneMQ, which means we can do final code changes to our system and publish. Code changes done, and we were doing a final review of our deployment scripts, and felt uncomfortable with using something in files\ folder of a helm script, as key/value pairs cannot come from values.yml during “installation”, so back to learning more about Helm, VerneMQ, and re-doing our deployment to better support deployment of VerneMQ in a logical way.
Long story short, and after a bit of experimentation, we were able to get the environmetn variables (VerneMQ’s preferred way of adjusting the vernemq.conf file) working.
This long story was to provide the following piece of information, in case someone else runs into this in the future, as the key/value pairs are not completely apparent from docs.
To enable the database authentication, a few things must happen:
- You must turn off anonymous authentication
- You must turn off file-based authentication
- You must activate the diversity plugin
- You must activate the Postgres plugin
- You must provide the connection information for your Postgres database.
All of this made sense, however, the tricky part was figuring out what the environment variables should be withn the system.
After much ado, it was figured out, and we have since stored this for long-term use.
Below is what our configuration became (sans passwords, of course). I am hoping this helps readers to configure their own VerneMQ cluster in the future.
replicaCount: 1
image:
repository: erlio/docker-vernemq
tag: 1.7.1-2-alpine
pullPolicy: IfNotPresent
nameOverride: ""
fullnameOverride: ""
service:
type: ClusterIP
mqtt:
enabled: true
port: 1883
nodePort: 1883
mqtts:
enabled: false
port: 8883
nodePort: 8883
ws:
enabled: false
port: 8080
nodePort: 8080
annotations: {}
labels: {}
resources: {}
nodeSelector: {}
tolerations: []
podAntiAffinity: soft
securityContext:
runAsUser: 10000
runAsGroup: 10000
fsGroup: 10000
rbac:
create: true
serviceAccount:
create: true
persistentVolume:
enabled: false
accessModes:
- ReadWriteOnce
size: 5Gi
annotations: {}
statefulset:
podManagementPolicy: OrderedReady
updateStrategy: RollingUpdate
terminationGracePeriodSeconds: 60
livenessProbe:
initialDelaySeconds: 90
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
readinessProbe:
initialDelaySeconds: 90
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
podAnnotations: {}
annotations: {}
labels: {}
additionalEnv:
- name: DOCKER_VERNEMQ_ALLOW_ANONYMOUS
value: "off"
- name: DOCKER_VERNEMQ_ALLOW_REGISTER_DURING_NETSPLIT
value: "on"
- name: DOCKER_VERNEMQ_ALLOW_PUBLISH_DURING_NETSPLIT
value: "on"
- name: DOCKER_VERNEMQ_ALLOW_SUBSCRIBE_DURING_NETSPLIT
value: "on"
- name: DOCKER_VERNEMQ_ALLOW_UNSUBSCRIBE_DURING_NETSPLIT
value: "on"
- name: DOCKER_VERNEMQ_plugins.vmq_passwd
value: "off"
- name: DOCKER_VERNEMQ_plugins.vmq_acl
value: "off"
- name: DOCKER_VERNEMQ_plugins.vmq_diversity
value: "on"
- name: DOCKER_VERNEMQ_vmq_diversity.auth_postgres.enabled
value: "on"
- name: DOCKER_VERNEMQ_vmq_diversity.postgres.host
value: "{the database hostname or ip address}"
- name: DOCKER_VERNEMQ_vmq_diversity.postgres.port
value: "{the database connection port}"
- name: DOCKER_VERNEMQ_vmq_diversity.postgres.user
value: "{your user's name}"
- name: DOCKER_VERNEMQ_vmq_diversity.postgres.password
value: "{your user's password}"
- name: DOCKER_VERNEMQ_vmq_diversity.postgres.database
value: "{your database name}"
HTH
Until next time…