go语言连接阿里云mqtt 实现消息收发
连接阿里云mqtt服务连接package mqttimport ("crypto/tls""crypto/x509""fmt"mqtt "github.com/eclipse/paho.mqtt.golang""io/ioutil""log""src/goProject1/base_64""src/goProject1/config""strconv""time")var (broker
·
连接阿里云mqtt服务
连接
package mqtt import ( "crypto/tls" "crypto/x509" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "io/ioutil" "log" "src/goProject1/base_64" "src/goProject1/config" "strconv" "time" ) var ( broker string port int AccessKey_ID string AccessKey_Secret string InstanceId string client mqtt.Client groupId string clientId string connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Connected") } connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) } // 默认处理接收的消息 messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) } ) func init() { broker = config.ENV["mqtt"]["broker"] port, _ = strconv.Atoi(config.ENV["mqtt"]["port"]) AccessKey_ID = config.ENV["alibaba_mq"]["AccessKey_ID"] AccessKey_Secret = config.ENV["alibaba_mq"]["AccessKey_Secret"] InstanceId = config.ENV["alibaba_mq"]["InstanceId"] groupId = "GID_server" clientId = groupId + "@@@" + "maochang_test" conn() checkToken() } func GetClientID() string { return clientId } func checkToken() { if token := client.Connect(); token.Wait() && token.Error() != nil { fmt.Println(" -- ") fmt.Printf(token.Error().Error()) panic(token.Error()) } } func conn() { opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId password := base_64.HmacSha1(AccessKey_Secret, clientId) fmt.Println(userName) fmt.Println("password == ", password) opts.SetClientID(clientId) opts.SetUsername(userName) opts.SetPassword(string(base_64.Decode(password))) opts.SetDefaultPublishHandler(messagePubHandler) //opts.SetPingTimeout(5) //opts.SetKeepAlive(5) opts.SetAutoReconnect(true) opts.SetMaxReconnectInterval(3) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client = mqtt.NewClient(opts) } func Main() { //var broker = "broker.emqx.io" //broker = "127.0.0.1" //port = 1883 //password := base_64.b64encode(hmac.new(secretKey.encode(), client_id.encode(), sha1).digest()).decode() //client.username_pw_set(userName, password) //sub(client) //m := make(map[string]byte)z //m["mao"] = 1 //client.SubscribeMultiple(m, MessageHandler) //publish(client) // 订阅 //SubOne("mao/p2p/"+clientId, 1, nil) //SubOne("mao", 1, nil) //SubOne("mao/mao", 1, nil) //SubOne("mao/mao/"+clientId, 1, nil) //client.AddRoute("mao", nil) //发布消息 //PublishOne("mao/mao/"+clientId, 0, false, "dsggfj") //PublishOne("mao/p2p/"+clientId, 0, false, "dsggfj") //PublishOne("mao", 1, true, "dsggfj2") //PublishOne("mao", 1, true, "dsggfj3") //PublishOne("mao", 1, true, "dsggfj4") time.Sleep(time.Second) client.Disconnect(250) } func MessageHandler(client mqtt.Client, msg mqtt.Message) { //fmt.Printf("%v\n",msg.MessageID()) fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) } func NewTlsConfig() *tls.Config { certpool := x509.NewCertPool() ca, err := ioutil.ReadFile("ca.pem") if err != nil { log.Fatalln(err.Error()) } certpool.AppendCertsFromPEM(ca) // Import client certificate/key pair clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem") if err != nil { panic(err) } return &tls.Config{ RootCAs: certpool, ClientAuth: tls.NoClientCert, ClientCAs: nil, InsecureSkipVerify: true, Certificates: []tls.Certificate{clientKeyPair}, } }
订阅接收
package mqtt import ( "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "time" ) func sub(client mqtt.Client) { topic := "mao" token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic %s %v \n", topic, token) } func SubOne(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { token := client.Subscribe(topic, qos, callback) token.Wait() fmt.Printf("Subscribed to topic %s %v \n", topic, token) return token } func PublishOne(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { token := client.Publish(topic, qos, retained, payload) token.Wait() if token.Error() != nil { fmt.Println("token = ", token.Error()) } return token } func publish(client mqtt.Client) { num := 5 for i := 0; i < num; i++ { text := fmt.Sprintf("Message %d", i) token := PublishOne("mao1", 1, false, text) fmt.Println("token:", token) time.Sleep(time.Second) } }
base_64 编码
package base_64 import ( "crypto/hmac" "crypto/sha1" "encoding/base64" "fmt" "io" ) func Encode(data string) string { // Base64 Standard Encoding sEnc := base64.StdEncoding.EncodeToString([]byte(data)) fmt.Println(sEnc) // return sEnc } func Decode(data string) []byte { sEnc := base64.StdEncoding.EncodeToString([]byte(data)) // Base64 Standard Decoding sDec, err := base64.StdEncoding.DecodeString(sEnc) if err != nil { fmt.Printf("Error decoding string: %s ", err.Error()) return nil } fmt.Println(string(sDec)) //hello world12345!?$*&()'-@~ return sDec } func Sha1() { h := sha1.New() io.WriteString(h, "aaaaaa") fmt.Printf("%x\n", h.Sum(nil)) //hmac ,use sha1 key := []byte("123456") mac := hmac.New(sha1.New, key) mac.Write([]byte("aaaaaa")) fmt.Printf("%x\n", mac.Sum(nil)) } func HmacSha1(keyStr, value string) string { key := []byte(keyStr) mac := hmac.New(sha1.New, key) mac.Write([]byte(value)) //进行base64编码 res := base64.StdEncoding.EncodeToString(mac.Sum(nil)) return res }
config.ini
[mqtt] broker = port = 1883 [alibaba_mq] broker = port = 1883 addr = AccessKey_ID = AccessKey_Secret = InstanceId =
更多推荐
所有评论(0)