然后注意开启防火墙端口。

mqtt 的命令和 Java 端的 SSL 配置都必须同时带上 ca.crt、client.crt、client.key 三个文件,即 CA 证书、客户端证书、客户端私钥。

由于 Java 端不能直接读取 client.key 的格式,需要用下面的命令把它转换成 PKCS#8 格式的 client.pem:

openssl pkcs8 -topk8 -in client.key -out client.pem -nocrypt

另外:

不知为何 Ubuntu 下关闭防火墙后仍然握手失败,CentOS 下正常;抓包后已经看不到明文了。

Java部分:

1.核心部分只需要设置SSLSocketFactory

// Core of the setup: build an SSLSocketFactory from the CA certificate, the client
// certificate and the converted client key (client.pem), then hand it to the
// connect options. "youpath" and "password" are placeholders to be replaced.
MqttConnectOptions options = new MqttConnectOptions();

SSLSocketFactory factory=getSSLSocktet("youpath/ca.crt","youpath/client.crt","youpath/client.pem","password");

options.setSocketFactory(factory);

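此处的密码应为生成证书时输入的密码(此说法未经验证)。从下面的代码看,私钥文件已用 -nocrypt 转成未加密的 client.pem,读取时并不需要密码;这个密码只用于保护内存中 KeyStore 里的私钥条目,setKeyEntry 与 kmf.init 使用同一个值即可。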

/**
 * Builds an {@link SSLSocketFactory} for mutually authenticated TLS.
 *
 * <p>The CA certificate becomes the only trust anchor used to verify the server; the
 * client certificate and private key are presented to the server so that it can
 * authenticate this client.
 *
 * @param caPath   path of the CA certificate file (X.509)
 * @param crtPath  path of the client certificate file (X.509)
 * @param keyPath  path of the client private key, read through {@code getPrivateKey}
 * @param password password protecting the private-key entry of the in-memory key store
 * @return a socket factory configured with the key and trust material above
 * @throws Exception if a file cannot be read or parsed, or the TLS context cannot be set up
 */
private SSLSocketFactory getSSLSocktet(String caPath,String crtPath, String keyPath, String password) throws Exception {
    char[] keyPassword = password.toCharArray();

    // CA certificate is used to authenticate the server.
    KeyStore caKs = KeyStore.getInstance("JKS");
    caKs.load(null, null);
    caKs.setCertificateEntry("ca-certificate", readCertificate(caPath));
    TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
    tmf.init(caKs);

    // Client key and certificate are sent to the server so it can authenticate us.
    X509Certificate clientCert = readCertificate(crtPath);
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(null, null);
    ks.setCertificateEntry("certificate", clientCert);
    ks.setKeyEntry("private-key", getPrivateKey(keyPath), keyPassword,
            new java.security.cert.Certificate[]{clientCert});
    KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
    kmf.init(ks, keyPassword);

    // A "TLSv1" context only offers TLS 1.0, which current JDKs disable by default,
    // so the handshake would fail; request TLS 1.2 instead.
    SSLContext context = SSLContext.getInstance("TLSv1.2");
    context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
    return context.getSocketFactory();
}

/** Reads one X.509 certificate from {@code path}, always closing the file afterwards. */
private X509Certificate readCertificate(String path) throws Exception {
    FileInputStream in = new FileInputStream(path);
    try {
        return (X509Certificate) CertificateFactory.getInstance("X.509").generateCertificate(in);
    } finally {
        in.close();
    }
}

Android 上会报错(例如 Android 不提供 "JKS" 类型的 KeyStore),因此改用平台默认的类型和算法,改进如下:

/**
 * Android-friendly variant: builds an {@link SSLSocketFactory} for mutually
 * authenticated TLS using the platform's default key-store type and default
 * key/trust manager algorithms instead of hard-coded names.
 *
 * @param caPath   path of the CA certificate file (X.509), used to verify the server
 * @param crtPath  path of the client certificate file (X.509)
 * @param keyPath  path of the client private key, read through {@code getPrivateKey}
 * @param password password protecting the private-key entry of the in-memory key store
 * @return a socket factory configured with the key and trust material above
 * @throws Exception if a file cannot be read or parsed, or the TLS context cannot be set up
 */
private SSLSocketFactory getSSLSocktet(String caPath,String crtPath, String keyPath, String password) throws Exception {
    CertificateFactory certFactory = CertificateFactory.getInstance("X.509");

    // CA certificate is used to authenticate the server.
    X509Certificate ca;
    FileInputStream caIn = new FileInputStream(caPath);
    try {
        ca = (X509Certificate) certFactory.generateCertificate(caIn);
    } finally {
        // close even when parsing fails, so the file handle is never leaked
        caIn.close();
    }
    KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
    caKs.load(null, null);
    caKs.setCertificateEntry("ca-certificate", ca);
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(caKs);

    // Client key and certificate are sent to the server so it can authenticate us.
    X509Certificate clientCert;
    FileInputStream crtIn = new FileInputStream(crtPath);
    try {
        clientCert = (X509Certificate) certFactory.generateCertificate(crtIn);
    } finally {
        crtIn.close();
    }
    char[] keyPassword = password.toCharArray();
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(null, null);
    ks.setCertificateEntry("certificate", clientCert);
    ks.setKeyEntry("private-key", getPrivateKey(keyPath), keyPassword,
            new java.security.cert.Certificate[]{clientCert});
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(ks, keyPassword);

    // Finally, create the SSL socket factory. TLS 1.0 ("TLSv1") is obsolete and
    // commonly disabled, so request TLS 1.2.
    SSLContext context = SSLContext.getInstance("TLSv1.2");
    context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
    return context.getSocketFactory();
}

3.获取私钥代码部分

由于只能读取PKCS8的格式,所以需要转成pem

/**
 * Loads an RSA private key from a PKCS#8 PEM file.
 *
 * @param path path of the PEM file; its Base64 body is obtained through {@code getPem}
 * @return the decoded RSA private key
 * @throws Exception if the file cannot be read or does not contain a PKCS#8 RSA key
 */
public PrivateKey getPrivateKey(String path) throws Exception{
    // Base64 text -> DER bytes
    byte[] der = new Base64().decode(getPem(path));
    PKCS8EncodedKeySpec pkcs8 = new PKCS8EncodedKeySpec(der);
    KeyFactory rsaFactory = KeyFactory.getInstance("RSA");
    PrivateKey key = rsaFactory.generatePrivate(pkcs8);
    return (RSAPrivateKey) key;
}

附录:

package com;

import java.awt.BorderLayout;

import java.awt.Container;

import java.awt.event.ActionEvent;

import java.awt.event.ActionListener;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.security.KeyFactory;

import java.security.KeyStore;

import java.security.PrivateKey;

import java.security.SecureRandom;

import java.security.cert.CertificateException;

import java.security.cert.CertificateFactory;

import java.security.cert.X509Certificate;

import java.security.interfaces.RSAPrivateKey;

import java.security.spec.PKCS8EncodedKeySpec;

import javax.net.ssl.KeyManagerFactory;

import javax.net.ssl.SSLContext;

import javax.net.ssl.SSLSocketFactory;

import javax.net.ssl.TrustManager;

import javax.net.ssl.TrustManagerFactory;

import javax.net.ssl.X509TrustManager;

import javax.swing.JButton;

import javax.swing.JFrame;

import javax.swing.JLabel;

import javax.swing.JOptionPane;

import javax.swing.JPanel;

import javax.swing.JTextArea;

import org.apache.commons.codec.binary.Base64;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Server extends JFrame {

// Serialization version id.
private static final long serialVersionUID = 1L;

// Top panel: holds the publish and subscribe buttons.
private JPanel panel;

// Centre panel: holds the reconnect button and the host / client id / topic inputs.
private JPanel panelText;

// Bottom panel: holds the message input.
private JPanel panelText2;

// Publishes the message text to the topic text.
private JButton button;

// Recreates the client from the host and client id inputs and reconnects.
private JButton button2;

// Subscribes to the topic typed into textTopic.
private JButton subscribeButton;

// Input for the broker URI.
private JTextArea textHost;

// Input for the MQTT client id.
private JTextArea textClientID;

// Input for the message payload to publish.
private JTextArea textPublishMsg;

// Input for the topic to publish or subscribe to.
private JTextArea textTopic;

// MQTT client; created lazily by the button handlers, null until then.
private MqttClient client;

// Broker URI; the initial value is shown in textHost and is overwritten from it by the handlers.
private String host = "ssl://192.168.10.233:1883";

// Set in connect() to client.getTopic(myTopicRoot + userToken).
private MqttTopic topic;

// Not referenced in the visible part of this file.
private MqttMessage message;

// Token embedded in the default message text and appended to myTopicRoot in connect().
private String userToken = "999999";

// Default topic text shown in textTopic; also the prefix of the topic built in connect().
private String myTopicRoot = "test";

// Topic most recently subscribed through subscribeButton; null before the first subscription.
private String myTopic = null;

// MQTT client id; the initial value is shown in textClientID.
private String clienID = "test1234567";

/**
 * Builds the window: a button row on top, the connection and topic inputs in the
 * centre and the message input at the bottom, and wires the three buttons.
 *
 * <p>No connection is opened here; the MQTT client is created and connected lazily
 * by the button handlers.
 */
public Server() {
    Container container = this.getContentPane();
    panel = new JPanel();
    panelText = new JPanel();
    panelText2 = new JPanel();
    button = new JButton("发布主题消息");
    button2 = new JButton("更换客户机地址和IP");

    // Publish: pick up host/client id from the inputs, connect if needed, then publish.
    button.addActionListener(new ActionListener() {
        @Override
        public void actionPerformed(ActionEvent ae) {
            try {
                host = textHost.getText();
                clienID = textClientID.getText();
                ensureConnected();
                publishMsg(textTopic.getText(), textPublishMsg.getText());
            } catch (Exception e) {
                e.printStackTrace();
                showErrorMsg(e.toString());
            }
        }
    });

    // Reconnect: drop the current client (if any) and create a new one from the inputs.
    button2.addActionListener(new ActionListener() {
        @Override
        public void actionPerformed(ActionEvent arg0) {
            host = textHost.getText();
            clienID = textClientID.getText();
            try {
                if (client != null) {
                    client.disconnectForcibly();
                }
                client = new MqttClient(host, clienID, new MemoryPersistence());
                connect();
            } catch (Exception e) {
                e.printStackTrace();
                showErrorMsg(e.toString());
            }
        }
    });

    // Subscribe: switch the subscription to the topic typed into the topic input.
    subscribeButton = new JButton("订阅主题");
    subscribeButton.addActionListener(new ActionListener() {
        @Override
        public void actionPerformed(ActionEvent arg0) {
            try {
                ensureConnected();
                String newTopic = textTopic.getText();
                // When the topic changes, leave the previous one; the original code
                // subscribed to it a second time instead of unsubscribing.
                if (myTopic != null && !myTopic.equals(newTopic)) {
                    client.unsubscribe(myTopic);
                }
                client.subscribe(newTopic);
                myTopic = newTopic;
            } catch (Exception e) {
                e.printStackTrace();
                showErrorMsg(e.toString());
            }
        }
    });

    textHost = new JTextArea();
    textHost.setText(host);
    textClientID = new JTextArea();
    textClientID.setText(clienID);
    textTopic = new JTextArea();
    textTopic.setText(myTopicRoot);
    textPublishMsg = new JTextArea();
    textPublishMsg.setText("@" + userToken + "@E@5@" + userToken + "@");

    panel.add(button);
    panel.add(subscribeButton);

    panelText.add(button2);
    panelText.add(new JLabel("mqtt地址"));
    panelText.add(textHost);
    panelText.add(new JLabel("ClienId"));
    panelText.add(textClientID);
    panelText.add(new JLabel("主题"));
    panelText.add(textTopic);

    panelText2.add(new JLabel("mqtt消息"));
    panelText2.add(textPublishMsg);

    container.add(panel, BorderLayout.NORTH);
    container.add(panelText, BorderLayout.CENTER);
    container.add(panelText2, BorderLayout.SOUTH);
}

/** Creates the MQTT client on first use and connects it if it is not connected. */
private void ensureConnected() throws Exception {
    if (client == null) {
        client = new MqttClient(host, clienID, new MemoryPersistence());
    }
    if (!client.isConnected()) {
        connect();
    }
}

/**
 * Builds an {@link SSLSocketFactory} for mutually authenticated TLS.
 *
 * <p>The CA certificate is the only trust anchor used to verify the broker; the client
 * certificate and private key are presented to the broker so it can authenticate us.
 *
 * @param caPath   path of the CA certificate file (X.509)
 * @param crtPath  path of the client certificate file (X.509)
 * @param keyPath  path of the client private key in PKCS#8 PEM form
 * @param password password protecting the private-key entry of the in-memory key store
 * @return a socket factory configured with the key and trust material above
 * @throws Exception if a file cannot be read or parsed, or the TLS context cannot be set up
 */
private SSLSocketFactory getSSLSocktet(String caPath,String crtPath, String keyPath, String password) throws Exception {
    // Trust side: the CA certificate is used to authenticate the server.
    X509Certificate ca = loadCertificate(caPath);
    KeyStore caKs = KeyStore.getInstance("JKS");
    caKs.load(null, null);
    caKs.setCertificateEntry("ca-certificate", ca);
    TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
    tmf.init(caKs);

    // Key side: client key and certificate are sent to the server so it can authenticate us.
    X509Certificate clientCert = loadCertificate(crtPath);
    char[] keyPassword = password.toCharArray();
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(null, null);
    ks.setCertificateEntry("certificate", clientCert);
    ks.setKeyEntry("private-key", getPrivateKey(keyPath), keyPassword,
            new java.security.cert.Certificate[]{clientCert});
    KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
    kmf.init(ks, keyPassword);

    // A "TLSv1" context only offers TLS 1.0, which current JDKs disable by default,
    // so the handshake would fail; request TLS 1.2 instead.
    SSLContext context = SSLContext.getInstance("TLSv1.2");
    context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
    return context.getSocketFactory();
}

/** Parses the X.509 certificate stored at {@code path}; the file is closed even on failure. */
private X509Certificate loadCertificate(String path) throws Exception {
    CertificateFactory factory = CertificateFactory.getInstance("X.509");
    FileInputStream in = new FileInputStream(path);
    try {
        return (X509Certificate) factory.generateCertificate(in);
    } finally {
        in.close();
    }
}

/**
 * Reads the Base64 body of a PEM file.
 *
 * <p>Lines starting with {@code '-'} (the BEGIN/END markers) and empty lines are
 * skipped; every remaining line is appended followed by a {@code '\r'}.
 *
 * @param path path of the PEM file
 * @return the concatenated body lines, each terminated by {@code '\r'}
 * @throws Exception if the file cannot be opened or read
 */
private String getPem(String path) throws Exception{
    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
    try {
        StringBuilder sb = new StringBuilder();
        String readLine;
        while ((readLine = br.readLine()) != null) {
            // An empty line (e.g. a trailing blank line) has no charAt(0); skip it
            // together with the "-----BEGIN/END ...-----" marker lines.
            if (readLine.length() == 0 || readLine.charAt(0) == '-') {
                continue;
            }
            sb.append(readLine);
            sb.append('\r');
        }
        return sb.toString();
    } finally {
        // closes the underlying file stream as well, also when reading fails
        br.close();
    }
}

/**
 * Reads the PKCS#8 PEM file at {@code path} and turns it into an RSA private key.
 *
 * @param path path of the PEM file holding the key
 * @return the RSA private key decoded from the file
 * @throws Exception if the file cannot be read or its content is not a PKCS#8 RSA key
 */
public PrivateKey getPrivateKey(String path) throws Exception{
    final String pemBody = getPem(path);
    final byte[] encodedKey = new Base64().decode(pemBody);
    final PKCS8EncodedKeySpec pkcs8Spec = new PKCS8EncodedKeySpec(encodedKey);
    return (RSAPrivateKey) KeyFactory.getInstance("RSA").generatePrivate(pkcs8Spec);
}

/**
 * Connects {@code client} to the broker over SSL with client-certificate authentication.
 *
 * <p>Installs the SSL socket factory and the message callback, resolves {@code topic}
 * and then connects. A failure is printed and also reported through
 * {@code showErrorMsg}, so the user sees why the connection could not be opened;
 * no exception is propagated to the caller.
 */
private void connect() {
    MqttConnectOptions options = new MqttConnectOptions();
    // request a non-clean session
    options.setCleanSession(false);
    // Optional settings, left disabled here: setUserName / setPassword,
    // setConnectionTimeout(10), setKeepAliveInterval(20),
    // setWill("willtest", "SENDgpslost".getBytes(), 1, false).

    try {
        if (!SSLSocketFactoryFactory.isSupportedOnJVM()) {
            System.out.print("isSupportedOnJVM=false");
        }

        SSLSocketFactory factory=getSSLSocktet("F:/ssl/ca.crt","F:/ssl/client.crt","F:/ssl/client.pem","brt123");
        options.setSocketFactory(factory);

        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost-----------");
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("deliveryComplete---------" + token.isComplete());
            }

            @Override
            public void messageArrived(String topic, MqttMessage arg1) throws Exception {
                System.out.println("messageArrived----------");
                String msg = new String(arg1.getPayload());
                // incoming messages are shown with the same dialog helper used for errors
                showErrorMsg("主题:" + topic + "\r\n消息:" + msg);
            }
        });

        topic = client.getTopic(myTopicRoot + userToken);
        client.connect(options);
    } catch (Exception e) {
        e.printStackTrace();
        // Surface the root cause in the GUI as the button handlers do, instead of
        // leaving it only on the console.
        showErrorMsg(e.toString());
    }
}

public void

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐