连接阿里云 MQTT 服务

连接

package mqtt

import (
   "crypto/tls"
   "crypto/x509"
   "fmt"
   mqtt "github.com/eclipse/paho.mqtt.golang"
   "io/ioutil"
   "log"
   "src/goProject1/base_64"
   "src/goProject1/config"
   "strconv"
   "time"
)

var (
   broker           string
   port             int
   AccessKey_ID     string
   AccessKey_Secret string
   InstanceId       string
   client           mqtt.Client
   groupId          string

   clientId string

   connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
      fmt.Println("Connected")
   }
   connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
      fmt.Printf("Connect lost: %v", err)
   }
   // 默认处理接收的消息
   messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
      fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
   }
)

// init loads the broker address and the Alibaba Cloud credentials from
// config.ENV, derives the client ID, builds the client with conn and connects
// it with checkToken.
//
// It panics when the configured port is not a valid integer. A discarded
// parse error would leave port at 0 and only surface later as an opaque
// connection failure.
func init() {
	broker = config.ENV["mqtt"]["broker"]

	rawPort := config.ENV["mqtt"]["port"]
	parsedPort, err := strconv.Atoi(rawPort)
	if err != nil {
		panic(fmt.Errorf("mqtt: invalid port %q in config: %w", rawPort, err))
	}
	port = parsedPort

	AccessKey_ID = config.ENV["alibaba_mq"]["AccessKey_ID"]
	AccessKey_Secret = config.ENV["alibaba_mq"]["AccessKey_Secret"]
	InstanceId = config.ENV["alibaba_mq"]["InstanceId"]

	// The client ID is the group ID and a device name joined by "@@@".
	groupId = "GID_server"
	clientId = groupId + "@@@" + "maochang_test"

	conn()
	checkToken()
}

// GetClientID returns the package-level clientId, which init builds as
// groupId + "@@@" + a fixed device name.
func GetClientID() string {
   return clientId
}

// checkToken connects the shared client and waits for the attempt to finish.
// If the connect token reports an error, the error is printed and the
// function panics with it.
func checkToken() {
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		err := token.Error()
		fmt.Println("  -- ")
		// Print, not Printf: the error text is data and must not be
		// interpreted as a format string (a '%' in it would garble the output).
		fmt.Print(err.Error())
		panic(err)
	}
}

// conn builds the shared MQTT client from the package-level settings and
// stores it in client. It does not open the connection; checkToken does that.
//
// The user name is "Signature|<AccessKey_ID>|<InstanceId>" and the password is
// the base64-encoded HMAC-SHA1 of clientId keyed with AccessKey_Secret
// (presumably Alibaba Cloud's signature authentication mode; confirm against
// the service documentation).
func conn() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))

	userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId
	password := base_64.HmacSha1(AccessKey_Secret, clientId)
	// Only the user name is logged; the password is a credential and must
	// never be written to stdout.
	fmt.Println(userName)

	opts.SetClientID(clientId)
	opts.SetUsername(userName)
	// HmacSha1 already returns the base64 text that is sent as the password,
	// so it is used as-is without any further encode/decode step.
	opts.SetPassword(password)
	opts.SetDefaultPublishHandler(messagePubHandler)
	//opts.SetPingTimeout(5)
	//opts.SetKeepAlive(5)
	opts.SetAutoReconnect(true)
	// The argument is a time.Duration: a bare 3 would mean 3 nanoseconds.
	opts.SetMaxReconnectInterval(3 * time.Second)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client = mqtt.NewClient(opts)
}

// Main is a manual smoke-test entry point. With every experiment below
// commented out, it only sleeps for one second and then disconnects the
// shared client, passing 250 to Disconnect (the quiesce time in milliseconds
// according to the paho client API).
func Main() {
   //var broker = "broker.emqx.io"
   //broker = "127.0.0.1"
   //port = 1883

   //password := base_64.b64encode(hmac.new(secretKey.encode(), client_id.encode(), sha1).digest()).decode()
   //client.username_pw_set(userName, password)

   //sub(client)
   //m := make(map[string]byte)z
   //m["mao"] = 1
   //client.SubscribeMultiple(m, MessageHandler)
   //publish(client)

   // Subscribe
   //SubOne("mao/p2p/"+clientId, 1, nil)
   //SubOne("mao", 1, nil)
   //SubOne("mao/mao", 1, nil)
   //SubOne("mao/mao/"+clientId, 1, nil)

   //client.AddRoute("mao", nil)

   // Publish messages
   //PublishOne("mao/mao/"+clientId, 0, false, "dsggfj")
   //PublishOne("mao/p2p/"+clientId, 0, false, "dsggfj")
   //PublishOne("mao", 1, true, "dsggfj2")
   //PublishOne("mao", 1, true, "dsggfj3")
   //PublishOne("mao", 1, true, "dsggfj4")

   time.Sleep(time.Second)
   client.Disconnect(250)

}

// MessageHandler is an exported message callback that prints the payload of
// msg together with the topic it was received on. The client argument is not
// used.
func MessageHandler(client mqtt.Client, msg mqtt.Message) {
	// The message ID can be printed as well when debugging:
	//fmt.Printf("%v\n",msg.MessageID())
	payload, topic := msg.Payload(), msg.Topic()
	fmt.Printf("Received message: %s from topic: %s\n", payload, topic)
}

// NewTlsConfig builds a client TLS configuration from three PEM files in the
// working directory: "ca.pem" (root CA pool) and the key pair
// "client-crt.pem" / "client-key.pem" (client certificate).
//
// The process exits via log.Fatalln when the CA file cannot be read or holds
// no usable certificate, and the function panics when the key pair cannot be
// loaded.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports whether at least one certificate was parsed;
	// ignoring it would silently leave the pool empty.
	if ok := certpool.AppendCertsFromPEM(ca); !ok {
		log.Fatalln("ca.pem: no valid PEM certificate found")
	}

	// Import client certificate/key pair
	clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
	if err != nil {
		panic(err)
	}

	return &tls.Config{
		RootCAs: certpool,
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server certificate, so RootCAs is effectively unused and the
		// connection is open to man-in-the-middle attacks. It is kept as-is
		// for compatibility; set it to false once the broker certificate
		// validates against ca.pem.
		InsecureSkipVerify: true,
		Certificates:       []tls.Certificate{clientKeyPair},
	}
}

 

订阅与发布

package mqtt

import (
   "fmt"
   mqtt "github.com/eclipse/paho.mqtt.golang"
   "time"
)

// sub subscribes the given client to the fixed topic "mao" at QoS 1 with no
// per-subscription callback, waits for the subscription to complete and
// prints the outcome.
//
// A failed subscription is reported as a failure rather than printed as
// "Subscribed".
func sub(client mqtt.Client) {
	const topic = "mao"

	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v \n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic %s  %v \n", topic, token)
}

// SubOne subscribes the shared client to topic at the given QoS, using
// callback as the message handler, and waits for the subscription to
// complete.
//
// The completed token is always returned so callers can inspect it. A failed
// subscription is printed as a failure instead of "Subscribed", matching how
// PublishOne reports token errors.
func SubOne(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
	token := client.Subscribe(topic, qos, callback)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v \n", topic, err)
		return token
	}
	fmt.Printf("Subscribed to topic %s  %v \n", topic, token)
	return token
}

// PublishOne publishes payload to topic on the shared client with the given
// QoS and retained flag, and blocks until the publish token completes. A token
// error is printed; the token is returned in every case.
func PublishOne(topic string, qos byte, retained bool, payload interface{}) mqtt.Token {
	result := client.Publish(topic, qos, retained, payload)
	result.Wait()

	if err := result.Error(); err != nil {
		fmt.Println("token = ", err)
	}
	return result
}

// publish sends five numbered test messages ("Message 0" .. "Message 4") to
// topic "mao1" at QoS 1 through PublishOne, printing each returned token and
// pausing one second after every message. The client argument is not used;
// PublishOne works on the shared package-level client.
func publish(client mqtt.Client) {
	const messageCount = 5

	for seq := 0; seq < messageCount; seq++ {
		body := fmt.Sprintf("Message %d", seq)
		fmt.Println("token:", PublishOne("mao1", 1, false, body))
		time.Sleep(time.Second)
	}
}

base_64 编码

package base_64

import (
   "crypto/hmac"
   "crypto/sha1"
   "encoding/base64"
   "fmt"
   "io"
)

// Encode returns the standard base64 encoding of data. The encoded text is
// also printed to stdout on its own line.
func Encode(data string) string {
	raw := []byte(data)

	// Encode into a buffer sized exactly for the padded output.
	buf := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
	base64.StdEncoding.Encode(buf, raw)

	encoded := string(buf)
	fmt.Println(encoded)
	return encoded
}

// Decode base64-encodes data and immediately decodes that encoding again, so
// the returned bytes are the bytes of data itself. The result is also printed
// to stdout as a string. If decoding failed, the error would be printed and
// nil returned.
//
// NOTE(review): despite its name this function does not base64-decode its
// argument. Callers in this file rely on the input coming back unchanged, so
// the round trip is kept as-is; confirm the intent before changing it.
func Decode(data string) []byte {
	roundTrip := []byte(base64.StdEncoding.EncodeToString([]byte(data)))

	// Standard base64 decoding of the freshly encoded text.
	out := make([]byte, base64.StdEncoding.DecodedLen(len(roundTrip)))
	n, err := base64.StdEncoding.Decode(out, roundTrip)
	if err == nil {
		out = out[:n]
		fmt.Println(string(out))
		return out
	}

	fmt.Printf("Error decoding string: %s ", err.Error())
	return nil
}

// Sha1 is a small demonstration: it prints, in hex, the SHA-1 digest of the
// fixed text "aaaaaa" and then the HMAC-SHA1 of the same text keyed with
// "123456".
func Sha1() {
	const sample = "aaaaaa"

	plain := sha1.New()
	io.WriteString(plain, sample)
	plainDigest := plain.Sum(nil)
	fmt.Printf("%x\n", plainDigest)

	// Keyed variant: HMAC built on SHA-1.
	keyed := hmac.New(sha1.New, []byte("123456"))
	io.WriteString(keyed, sample)
	keyedDigest := keyed.Sum(nil)
	fmt.Printf("%x\n", keyedDigest)
}

// HmacSha1 computes the HMAC-SHA1 of value using keyStr as the key and
// returns the digest in standard base64 encoding.
func HmacSha1(keyStr, value string) string {
	signer := hmac.New(sha1.New, []byte(keyStr))
	signer.Write([]byte(value))
	digest := signer.Sum(nil)

	// Base64-encode the raw digest.
	return base64.StdEncoding.EncodeToString(digest)
}

config.ini

 

[mqtt]
broker =
port = 1883

[alibaba_mq]
broker = 
port = 1883

addr = 
AccessKey_ID = 
AccessKey_Secret = 
InstanceId = 
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐