more G-Labs products

Author Topic: MQTT example for subscribe and receiving messages  (Read 1034 times)

September 29, 2016, 10:38:28 AM
Read 1034 times

wolfgang928

  • *
  • Information
  • Newbie
  • Posts: 10
Hello,

i'm just playing around to add a MQTT publish and subscribe mechanism to my existing Rfm69 HGX program.
The aim should be to send a publish MQTT when i switch on a module with the widget and..
also get an updated slider if someone sends an MQTT publish from a i.e. Android MQTT dashboard.


I think that the pure 'publish' is easy to do, but i have problems to understand how to integrate the 'receiving' of  subscribed messages.

Inside the program 'MQTT Network' there is a part ModuleParameterChanged, which is easy to use and works great.

Code: [Select]
// We want to do further processing whenever a module changes
When.ModuleParameterChanged( (module, property) => {
  if (module.HasFeature("MQTT.SensorPublish") && (property.Name.StartsWith("Sensor.") || property.Name.StartsWith("Status.") || property.Name.StartsWith("Meter.")) && !module.Instance.Domain.StartsWith("MQTT:"))
  {
    try{
      MqttClient.Publish(clientid + "/" + module.Instance.Domain + "/" + module.Instance.Address + "/event", Newtonsoft.Json.JsonConvert.SerializeObject(property));
    } catch  (Exception e){
      Program.Notify("MQTT ERROR!", e.Message);
      MqttIsConnected = false;
    }
  }
  // returning true, will route event to other listeners
  return true;
});

 
But it took a while to understand this lambda delegate callback thing MqttClient.Subscribe(string topic, Action<string,string> callback), but at the end i found the proper way
 
Code: [Select]

Action<string,string> ProcessReturns = (string topic, string msg) => {
  Log("topic = " + topic + "; msg = " + msg);
};

//
// Subscribe for MQTT messages
//
var mqtt = new MqttClientHelper();
var MqttIsConnected = false;
mqtt.WithCredentials(username, password);
mqtt.Service(server).Connect(port, clientid);
MqttIsConnected = true;

if (MqttIsConnected) {
// register to receive all messages from defined topic
// i.e. topic = "Rfm/Switches"
mqtt.Subscribe(topic, ProcessReturns);       

Inside the Action<string,string> ProcessReturns = (string topic, string msg) => {
  Log("topic = " + topic + "; msg = " + msg);
};

you could act to switch modules on or off
That's all to establish a MQTT Subscribe routine for your program.

To detect when a connection is closed i.e. by broker, i have added an overload method in MqttHelper class
Code: [Select]
public MqttClientHelper Connect(int port, string clientId, Action callback) which uses a callback to react on this event, like reconnect again.
(Currently this is only available on homegenie github package, not in the r525 release)

But keep in mind that after each connect you have to subscribe again, because any subscription is removed by disconnect.

Best regards Wolfgang
« Last Edit: October 13, 2016, 07:06:04 PM by wolfgang928 »

January 23, 2017, 11:28:54 PM
Reply #1

djatie

  • **
  • Information
  • Jr. Member
  • Posts: 34
I have edit some script on switch (type) with my own code
Adding new control case
Code: [Select]

string server = Program.Option("ServerAddress").Value.Trim();
int port = 1883; int.TryParse(Program.Option("ServerPort").Value, out port);
string topic = Program.Option("ServerTopic").Value.Trim();
string clientid = Program.Option("ClientId").Value.Trim();
string username = Program.Option("Username").Value.Trim();
string password = Program.Option("Password").Value.Trim();

var MqttIsConnected = false;

if (server == "")
{
  Program.Notify("MQTT", "Please configure server address");
  Pause(5);
  return;
}
else
{
  Program.Notify("MQTT", "Connecting to " + server + "...");
  try
  {
    if (username != "")
    {
      MqttClient.WithCredentials(username, password);
    }
    MqttClient
      .Service(server)
      .Connect(port, clientid);
    Program.Notify("MQTT", "Connected!");
    MqttIsConnected = true;
  }
  catch (Exception e)
  {
    Program.Notify("MQTT", e.Message);
    Pause(5);
    return;
  }
}

MqttClient.Subscribe(topic, (mtopic, mpayload) => {
  var parts = mtopic.Split('/');
  var cid = parts[0];
  //
  if (parts.Length == 4)
  {
    try
    {
      var domain = parts[1];
      var address = parts[2];
      var type = parts[3];

      var module = Modules.InDomain("MQTT:" + cid + "." + domain).WithAddress(address).Get();

      switch (type)
      {
        case "command":
        if (domain == "MQTT.Listeners" && address == clientid)
        {
          var cmdobj = Newtonsoft.Json.JsonConvert.DeserializeObject<dynamic>(mpayload);
          var targetmodule = Modules.InDomain(cmdobj.Domain.ToString()).WithAddress(cmdobj.Address.ToString());
          if (targetmodule.Get().Instance != null)
          {
            string command = cmdobj.Command.ToString();
            switch (command)
            {
              case "Module.Describe":
              var modinstance = targetmodule.Get().Instance;
              MqttClient.Publish(clientid + "/" + modinstance.Domain + "/" + modinstance.Address + "/description", Utility.Module2Json(modinstance, false));
              Program.Notify("Module.Describe" + clientid + "/" + modinstance.Domain + "/" + modinstance.Address + "/description", Utility.Module2Json(modinstance, false));
              break;
              default:
              targetmodule.Command(command).Execute();
              break;
            }
          }
        }
        break;
        case "description":
        if (cid != clientid && module.Instance != null)
        {
          var modobj = Newtonsoft.Json.JsonConvert.DeserializeObject<Module>(mpayload);
          module.Instance.Name = modobj.Name;
          module.Instance.Description = modobj.Description;
          module.Instance.DeviceType = modobj.DeviceType;
          var parentid = module.Parameter("VirtualModule.ParentId").Value;
          module.Instance.Properties = modobj.Properties;
          module.Parameter("VirtualModule.ParentId").Value = parentid;
          module.Parameter("MQTT.SourceNode").Value = cid;
          Program.Notify("MQTT", "Created remote module " + module.Instance.Domain + " " + module.Instance.Address);
        }
        break;
        case "event":
        if (cid != clientid)
        {
          var property = Newtonsoft.Json.JsonConvert.DeserializeObject<ModuleParameter>(mpayload);
          if (module.Instance == null)
          {
            Program.AddVirtualModule("MQTT:" + cid + "." + domain, address, "Sensor", "");
            module = Modules.InDomain("MQTT:" + cid + "." + domain).WithAddress(address).Get();
          }
          else if (!module.HasParameter("MQTT.SourceNode"))
          {
            MqttClient.Publish(clientid + "/MQTT.Listeners/" + cid + "/command", "{ 'Domain': " + "'" + domain + "', 'Address' : '" + address + "', 'Command' : 'Module.Describe' }");
          }
          Program.RaiseEvent(module, property.Name, property.Value, "");
        }
        break;
    //===========================================================================//   
        case "control":   // CONTROLING GPIO INDOMAIN ADRESS         
        if (cid == clientid && module.Instance == null)
        {
            if (mpayload == address+"/ON")
            {
            Modules.InDomain(domain).WithAddress(address).Command("Control.On").Set();
             
            return;
            }
            if (mpayload == address+"/OFF")
            {
            Modules.InDomain(domain).WithAddress(address).Command("Control.Off").Set();
             
            return;
        }
          if (mpayload == address+"/1")
            {
            Modules.InDomain(domain).WithAddress(address).Command("Control.On").Set();
             
            return;
            }
            if (mpayload == address+"/0")
            {
            Modules.InDomain(domain).WithAddress(address).Command("Control.Off").Set();
             
            return;
        }
        }
        break;
      }
    } catch (Exception e) {
      Program.Notify("MQTT ERROR!", e.Message);
      MqttIsConnected = false;
    }
  }
  //Console.WriteLine("MQTT {0} : {1}", mtopic, mpayload);
  Program.Notify(mtopic, mpayload);
});

When.WebServiceCallReceived("MQTT:", ( args ) => {
  string[] reqs = ((string)args).Split('/');
  string domain = reqs[0];
  string address = reqs[1];
  var commands = new List<string>(reqs);
  // remove domain and address to obtain the command parts only
  commands.RemoveAt(0); commands.RemoveAt(0);
  string command = String.Join("/", commands.ToArray());
  try
  {
    int mqttend = domain.IndexOf(".");
    int mqttdel = domain.IndexOf(":");
    var mqttdest  = domain.Substring(mqttdel + 1, mqttend - mqttdel - 1);
    domain = domain.Substring(mqttend + 1);
    MqttClient.Publish(clientid + "/MQTT.Listeners/" + mqttdest + "/command", "{ 'Domain': " + "'" + domain + "', 'Address' : '" + address + "', 'Command' : '" + command + "' }");         
    return "{ 'ResponseValue' : 'OK' }";   
  } catch (Exception e) {
    Program.Notify("MQTT ERROR!", e.Message);
    MqttIsConnected = false;
  }
  return "{ 'ResponseValue' : 'ERROR' }";
});

// We want to do further processing whenever a module changes
When.ModuleParameterChanged( (module, property) => {
 
  if (module.HasFeature("MQTT.SensorPublish") && (property.Name.StartsWith("Sensor.") || property.Name.StartsWith("Status.") || property.Name.StartsWith("Meter.")) && !module.Instance.Domain.StartsWith("MQTT:"))
  {
    try{
      MqttClient.Publish(clientid + "/" + module.Instance.Domain + "/" + module.Instance.Address + "/event", Newtonsoft.Json.JsonConvert.SerializeObject(property));
   
    } catch  (Exception e){
      Program.Notify("MQTT ERROR!", e.Message);
      MqttIsConnected = false;
    }
  }

  // returning true, will route event to other listeners
  return true;
 
  //MqttClient.Publish(clientid + "/" + module.Instance.Domain + "/" + module.Instance.Address + "/control", mpayload);
});

while (Program.IsRunning && MqttIsConnected) Pause(1);


i use mqtt client on android. setting is like attach.
the problem is, were i change the gpio from hg web widget.
state on android not change. any sugestion to fix? or may have another client for android? or maybe have any working script?

thank you
« Last Edit: January 23, 2017, 11:36:40 PM by djatie »