#include "ginlong_includes.h"

// #if  //def CM_MQTT_SUPPORT

// #define CMMQTT_MAX_TIMES 100
// Upper bound on polling iterations (each followed by osDelay(50)) used by the demo functions while waiting for an acknowledgement.
#define CMMQTT_CHECK_NORMAL_TIMES 100

// Shared status record; the demo functions below reset and poll its fields.
CMQTT_ERR_STAT cmmqtt_err_stat;
// CMMQTT_Mutex *g_pubMutex = NULL;


static int connflag = 0;
// volatile u32_t cmmqtt_max_connecttimes = 3;

static char *cmmqtt_glob_subtopic = "888";  // fill in the subscribe topic here
static int cmmqtt_glob_subqos = 1;

// static char *cmmqtt_glob_pubtopic = "$sys/389725/mqtts_test/dp/post/json";  // fill in the publish topic here
static char *cmmqtt_glob_pubtopic = "";  // fill in the publish topic here
static int cmmqtt_glob_pubqos = 0;          // QoS level used when publishing
static int cmmqtt_glob_pubretain = 0;       // retain flag: when 1 the server must keep this topic and QoS so the message can be delivered to future subscribers whose subscription matches the topic name
static int cmmqtt_glob_pubdup = 0;          // DUP flag: 0 means this PUBLISH packet is sent for the first time, 1 means it is a retransmission of an earlier packet
// When QoS is 0, DUP must be 0, otherwise an error is reported.

// static char *cmmqtt_glob_pubmessage = " !?#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvw xyz{|}~end1234";
static char *cmmqtt_glob_pubmessage = "{'hello':'world'  我是一只😁😂}";// message to publish
static char cmqtt_glob_cfgserver[129] = "";  // fill in the server address here
static int cmqtt_glob_cfgport = ;   // fill in the port number here (non-encrypted); NOTE: the initializer is missing and must be supplied before this file compiles
static int cmqtt_glob_tls_cfgport = ;   // fill in the port number here (encrypted); NOTE: the initializer is missing and must be supplied before this file compiles
static char cmqtt_glob_cfgclient_id[129] = "mqtts_test";   // fill in the client_id here
static int cmqtt_glob_cfgkeepAlive = 60;            // keep-alive / connection check time, 60 (multiplied by 1000 in mqtt_con_demo())

static char cmqtt_glob_cfguser[129] = "";    // fill in the user name here
// static char cmqtt_glob_cfgpasswd[129] = "version=2018-10-31&res=products%2F389725%2Fdevices%2Fmqtts_test&et=4102416000&method=md5&sign=A11aTcdn0%2Bz4jl%2FGGxfNyg%3D%3D";	   // fill in the password here
static char cmqtt_glob_cfgpasswd[129] = "";	   // fill in the password here
static int cmqtt_glob_cfgclean = 1;                    // 0 enables session reuse; 1 disables it, so every connection is a new session

static char *cmqtt_glob_cfgversion = "2021_4_27";   // current version
static int cmqtt_glob_cfgencrypt = 0;               // encryption: 0 selects cmqtt_glob_cfgport, anything else cmqtt_glob_tls_cfgport (see mqtt_con_demo())
static int cmqtt_glob_willflag = 0;                 // whether the will (LWT) mechanism is used; off by default; 0 = off, 1 = on
static char *cmqtt_glob_willtopic = "testabcdefg";  // topic the will message is sent to when the connection terminates abnormally
static char *cmqtt_glob_willmsg = "00001_off";      // message sent when the connection terminates abnormally

// Parameters and result of a publish attempt; filled in by mqtt_pub_demo().
typedef struct mqtt_CPUB
{
	unsigned int qos;		// QoS level, copied from cmmqtt_glob_pubqos
	unsigned int retain;	// retain flag, copied from cmmqtt_glob_pubretain
	unsigned int dup;		// DUP flag, copied from cmmqtt_glob_pubdup
	unsigned int state;		// not referenced by the code visible in this file
	int mqtt_error;			// CMMQTT_AT_* result of the publish attempt
	char *topic;			// publish topic; points at cmmqtt_glob_pubtopic (not owned)
}mqtt_CPUB;

// Single shared instance used by mqtt_pub_demo().
static mqtt_CPUB mqtt_cpub_tmp;

// Callback table defined elsewhere; mqtt_sub_demo() installs entry 0 as the topic-match callback.
extern CMMQTT_PUBCB cmmqtt_recpubcb[CMMQTT_PUBCB_NUM];

// Helpers defined elsewhere; cm_printf is used for the demo's log output.
extern void cm_printf(char *format,...);
extern int char_to_int(unsigned char *s);
// Forward declarations of the demo steps defined later in this file.
int mqtt_state_demo();
int mqtt_con_demo();
int mqtt_pub_demo();
int mqtt_sub_demo();
int mqtt_unsub_demo();
int mqtt_disc_demo();
int mqtt_del_demo();


// Buffer for storing a received MQTT message; allocated (MQTT_MSG_RECV_SIZE bytes) in mqtt_recv_msg_thread_creat().
char *mqtt_msg_recv = NULL;
// Handle of the "mqttrecv_task" thread created in mqtt_recv_msg_thread_creat().
osiThread_t *onenet_mqttrecvTask = NULL;

/*
 * Thread entry point. Loops forever calling osiEventWait() on the receive
 * task; each time it returns 1 the event id is logged, the full demo
 * sequence (cm_test_mqtt_json) is run and osDelay(1000) is called.
 * The argument is not used.
 */
void mqtt_event_thread(void *p)
{
	osiEvent_t evt;

	while (1)
	{
		if (osiEventWait(onenet_mqttrecvTask, &evt) != 1)
		{
			continue;
		}

		sys_arch_printf("[%s] event.id %d \r\n", __func__, evt.id);
		cm_test_mqtt_json();
		osDelay(1000);

		/* No per-event handling yet; every id takes the default path. */
		switch (evt.id)
		{
			default:
				break;
		}
	}
	osiThreadExit();
}

/*
 * Creates, if they do not exist yet, the "mqttrecv_task" thread (running
 * mqtt_event_thread) and the zero-filled receive buffer mqtt_msg_recv of
 * MQTT_MSG_RECV_SIZE bytes. Either failure is logged and followed by
 * osiShutdown(0).
 */
void mqtt_recv_msg_thread_creat()
{

	if(onenet_mqttrecvTask == NULL)
	{
		onenet_mqttrecvTask = osiThreadCreate("mqttrecv_task",mqtt_event_thread, NULL, OSI_PRIORITY_BELOW_NORMAL, (8192 * 2), 64);
		if(onenet_mqttrecvTask == NULL)
		{
			sys_arch_printf("[%s][%d] mqttrecv_task create thread error\r\n",__func__,__LINE__);
			osiShutdown(0);
		}
	}

	if(mqtt_msg_recv == NULL)
	{
		mqtt_msg_recv = (char*)malloc(MQTT_MSG_RECV_SIZE);
		if(NULL == mqtt_msg_recv)
		{
			sys_arch_printf("[%s][%d] mqtt_msg_recv malloc error\r\n",__func__,__LINE__);
			osiShutdown(0);
			/* Never fall through to memset() with a NULL buffer, even if
			 * osiShutdown() happens to return. */
			return;
		}
		memset(mqtt_msg_recv,0,MQTT_MSG_RECV_SIZE);
	}

}

/*+MQTTCFG="183.230.40.39",6002,"541100010",100,"269374","dOELqt6s35CMKigZg117glPO=kU=",0*/
//测试连接
void cm_test_mqtt_json() 
{
   	CMMQTTClient *myclient = cmmqtt_getclient();
    int cmmqtt_check_times = 0;
    int cmmqtt_check_times_1 = 0;
	int i = 0;
		cmqtt_glob_cfgencrypt = 0;


    int bufsize = 512;
	int connect_flag = -1;

	cm_printf("\r\n[MQTT]CM MQTT TEST BEGAIN!!!\r\n");

	// mqtt_recv_msg_thread_creat();

	if(mqtt_con_demo() == 0)
	{
		cm_printf("[MQTT]MQTTCONNECT: OK\r\n");
		connect_flag = 1;
	}
	else
	{
		cm_printf("[MQTT]MQTTCONNECT: ERROR\r\n");
		connect_flag = -1;
	}
	osDelay(1000);

	while((mqtt_state_demo()!=CONNECTED)&&(i<50))//wait for module connecting to mqtt platform
	{
		i++;
		osDelay(1000);
	}
	if((mqtt_state_demo()!=CONNECTED)&&(i >= 50))
	{
		cm_printf("[MQTT]CONNECT ERROR\r\n");
		return;
	}
	
	if(connect_flag == 1)
	{
		if(mqtt_sub_demo() == 0)
		{
			cm_printf("[MQTT]MQTTSUB: OK\r\n");
		}
		else
		{
			cm_printf("[MQTT]MQTTSUB: ERROR\r\n");
		}
		osDelay(1000);
		
		if(mqtt_pub_demo() == 0)
		{
			cm_printf("[MQTT]MQTT PUB: OK\r\n");
		}
		else
		{
			cm_printf("[MQTT]MQTT PUB: ERROR\r\n");
		}
		osDelay(5000);
	
		if(mqtt_unsub_demo() == 0)
		{
			cm_printf("[MQTT]MQTTUNSUB TOPIC: OK\r\n");
		}
		else
		{
			cm_printf("[MQTT]MQTTUNSUB TOPIC: ERROR\r\n");
		}
		
		osDelay(1000);
		if(0 == mqtt_disc_demo())
		{
			cm_printf("[MQTT]MQTTDISC: OK\r\n");
		}
		else
		{
			cm_printf("[MQTT]MQTTDISC: ERROR\r\n");
		}

	}
	
	if(mqtt_del_demo() == 0)
	{	
		cm_printf("[MQTT]MQTTDEL: OK\r\n");
	}
	else
	{
		cm_printf("[MQTT]MQTTDEL: ERROR\r\n");
	}

	cm_printf("\r\n[MQTT]CM MQTT TEST END!!!\r\n");

}


/*
 * Logs the name of the client's current state and returns the state value.
 * Values that match none of the known states are logged as "STATE ERROR".
 */
int mqtt_state_demo()
{
	CMMQTTClient *client = cmmqtt_getclient();
	char *line;

	//TODO:Defined by the macro of different module version,such as the M5311
	//To adapt to the M8321 state value.
	switch (client->state)
	{
		case UNINITIALED:
			line = "[MQTT]MQTTSTAT: UNINITIALED\r\n";
			break;
		case INITIALED:
			line = "[MQTT]MQTTSTAT: INITIALED\r\n";
			break;
		case DISCONNECTED:
			line = "[MQTT]MQTTSTAT: DISCONNECTED\r\n";
			break;
		case CONNECTED:
			line = "[MQTT]MQTTSTAT: CONNECTED\r\n";
			break;
		case NET_CONNECTING:
			line = "[MQTT]MQTTSTAT: NET_CONNECTING\r\n";
			break;
		case RECONNECTING:
			line = "[MQTT]MQTTSTAT: RECONNECTING\r\n";
			break;
		case NET_CONNECTED:
			line = "[MQTT]MQTTSTAT: NET_CONNECTED\r\n";
			break;
		case CONNECTING:
			line = "[MQTT]MQTTSTAT: CONNECTING\r\n";
			break;
		default:
			line = "[MQTT]MQTTSTAT: STATE ERROR\r\n";
			break;
	}
	cm_printf(line);

	return (client->state);
}


int mqtt_con_demo()
{
	CMMQTTClient *myclient = cmmqtt_getclient();
	int cmmqtt_check_times = 0;
	int cmmqtt_check_times_1 = 0;
	int i = 0;

	char *client_id = cmqtt_glob_cfgclient_id;

	char *user = cmqtt_glob_cfguser;

	char *passwd = cmqtt_glob_cfgpasswd;


	char *version = cmqtt_glob_cfgversion;

	char *ca = "";

	char *server = cmqtt_glob_cfgserver;
	
	
	int keepAlive = cmqtt_glob_cfgkeepAlive;
	
	int clean = cmqtt_glob_cfgclean;
	
	int bufsize = 512;
	
	int encrypt = cmqtt_glob_cfgencrypt;


    int64_t expire_time = 4102416000;
    
    int mqtt_error = CMMQTT_AT_OK;

    
    int mqtt_error1;
	int port;
	if(cmqtt_glob_cfgencrypt ==0)
		port = cmqtt_glob_cfgport;
	else
		port = cmqtt_glob_tls_cfgport;
    
    if(myclient->state == CONNECTED)
    {
        cm_printf("[MQTT][%s][%d] ERROR OneNET MQTT ALREADY CONNECTED\r\n",__func__,__LINE__);
		return 0;
    }
    
	if(myclient->state == UNINITIALED)
	{
    mqtt_error1 = cmmqtt_init_cfg(server, port, client_id, keepAlive * 1000, user, passwd, clean, bufsize, encrypt);
    if(0 > mqtt_error1)
    {
        cm_printf("[MQTT][%s][%d] ERROR cmmqtt_init_cfg error!\r\n",__func__,__LINE__);
        return 1;
    }
	}
    
    
    int user_flag = 1;
    
    int pwd_flag = 1;
    
    int will_flag = cmqtt_glob_willflag,retain_flag = MQTT_DEFAULT_RETAIN, willQos = MQTT_DEFAULT_QOS;

	char *will_topic = cmqtt_glob_willtopic, *will_msg = cmqtt_glob_willmsg;
    
    if(myclient->state == INITIALED)
    {
        CMMQTTConmsg *conmsg = (CMMQTTConmsg *)cmmqtt_malloc(sizeof(CMMQTTConmsg));
        memset(conmsg,0,sizeof(CMMQTTConmsg));
        if(user_flag == 1)
        {
            if(strlen((char*)myclient->usr) > 0)
                conmsg->connect_data.username.cstring = myclient->usr;
            else
                mqtt_error = CMMQTT_AT_PARAMETER_ERROR;
            
        }
        if(pwd_flag == 1)
        {
            if(strlen((char*)myclient->password) > 0)
                conmsg->connect_data.password.cstring = myclient->password;
            else
                mqtt_error = CMMQTT_AT_PARAMETER_ERROR;
        }
        conmsg->connect_data.MQTTVersion = 4;
        conmsg->connect_data.clientID.cstring = myclient->client_id;
        conmsg->connect_data.keepAliveInterval = myclient->keepAliveInterval/1000;
        conmsg->connect_data.cleansession = myclient->cleansession;
        conmsg->connect_data.willFlag = will_flag;
		if(will_flag == 1)
		{
			conmsg->connect_data.will.retained = retain_flag;
			conmsg->connect_data.will.qos = willQos;
			conmsg->connect_data.will.topicName.lenstring.data = NULL;
			conmsg->connect_data.will.topicName.lenstring.len = 0;
			conmsg->connect_data.will.topicName.cstring = (char*)cmmqtt_malloc(strlen(will_topic)+1);
			strcpy(conmsg->connect_data.will.topicName.cstring,will_topic);
			conmsg->connect_data.will.message.lenstring.data = NULL;
			conmsg->connect_data.will.message.lenstring.len = 0;
			conmsg->connect_data.will.message.cstring = (char*)cmmqtt_malloc(strlen(will_msg)+1);
			strcpy(conmsg->connect_data.will.message.cstring,will_msg);
		}

        conmsg->queueout_ackcb = cmmqtt_conack_cb;
        if(myclient->cmmqtt_conmsg != NULL)
        {
            cmmqtt_free_conmsg(myclient->cmmqtt_conmsg);
            myclient->cmmqtt_conmsg = NULL;
        }
        myclient->cmmqtt_conmsg = conmsg;
        myclient->connectnum = 1;

        
        if(mqtt_error == CMMQTT_AT_OK)
        {
            cmmqtt_err_stat.cmmqtt_connect_error = 0;
            myclient->state = NET_CONNECTING;
#if 0
            while(cmmqtt_check_times < CMMQTT_MAX_TIMES)
            {
                if(CMMQTT_CONMQTTSERVER_OK == cmmqtt_err_stat.cmmqtt_connect_error)
                {
                    while(cmmqtt_check_times_1 < CMMQTT_MAX_TIMES)
                    {
                        if(cmmqtt_err_stat.cmmqtt_response_error == CMMQTT_CMD_RESPONSE_FAIL)
                        {	
                            cm_printf("[MQTT][%s][%d] ERR_MQTT_UNKNOWN_DATA\r\n",__func__,__LINE__);
							return -1;
                        }
                        
                        switch(cmmqtt_err_stat.cmmqtt_conpasswd_error)
                        {
                            case CMMQTT_CONPASSWD_OK:
                                return 0;

                            case CMMQTT_CONPASSWD_FAIL:
                                cm_printf("[MQTT][%s][%d] ERR_MQTT_USERPASSWORD_ERR\r\n",__func__,__LINE__);
								return -1;

                            case -1:
                                cm_printf("[MQTT][%s][%d] ERR_MQTT_CONNECT_FAIL\r\n",__func__,__LINE__);
								return -1;
                        }

                        cmmqtt_check_times_1++;
                        osDelay(50);
                    }
                }
                else if(CMMQTT_CONMQTTSERVER_FAIL == cmmqtt_err_stat.cmmqtt_connect_error)
                {	
                    cm_printf("[MQTT][%s][%d] ERR_MQTT_CONNECT_FAIL\r\n",__func__,__LINE__);
					return -1;
                }
                else if(CMMQTT_CONMQTTSERVER_TIMEOUT == cmmqtt_err_stat.cmmqtt_connect_error)
                {	
                    cm_printf("[MQTT][%s][%d] ERR_MQTT_CONNECT_TIMEOUT\r\n",__func__,__LINE__);
					return -1;
                }
                else if(CMMQTT_SENDPACK_ERROR == cmmqtt_err_stat.cmmqtt_connect_error)
                {
                    cm_printf("[MQTT][%s][%d] ERR_MQTT_SENDPACK_FAIL\r\n",__func__,__LINE__);
					return -1;
                }
                cmmqtt_check_times++;
                osDelay(50);
            }
#endif
        }
    
        return 0;
    }
}


int mqtt_pub_demo()
{
	int cmmqtt_check_times = 0;
	unsigned int msg_len = 0;
	CMMQTTClient *myclient = cmmqtt_getclient();

	char *msg = malloc(strlen(cmmqtt_glob_pubmessage));
	unsigned int pub__str_size = strlen(cmmqtt_glob_pubmessage);
	MEMCPY(msg, cmmqtt_glob_pubmessage, strlen(cmmqtt_glob_pubmessage));
	
	mqtt_cpub_tmp.qos = cmmqtt_glob_pubqos;
	mqtt_cpub_tmp.mqtt_error = CMMQTT_AT_OK;
	mqtt_cpub_tmp.dup = cmmqtt_glob_pubdup;
	mqtt_cpub_tmp.retain = cmmqtt_glob_pubretain;
	mqtt_cpub_tmp.topic = cmmqtt_glob_pubtopic;
		
	if(myclient->state == CONNECTED)
	{
		CMMQTTPubmsg *pubmsg = (CMMQTTPubmsg *)cmmqtt_malloc(sizeof(CMMQTTPubmsg));
		pubmsg->topicName.lenstring.len = 0;
		pubmsg->topicName.lenstring.data = NULL;
		pubmsg->topicName.cstring = (char*)cmmqtt_malloc(strlen(mqtt_cpub_tmp.topic) + 1);
		if(NULL == pubmsg->topicName.cstring)
		{
			if(NULL != msg)
			{
				free(msg);
				msg = NULL;
			}
			cm_printf("[MQTT][%s][%d] ERR_AT_CME_NO_MEMORY\r\n",__func__,__LINE__);
			return -1;
		}
		memset(pubmsg->topicName.cstring,0,strlen(mqtt_cpub_tmp.topic) + 1);
		strcpy(pubmsg->topicName.cstring,mqtt_cpub_tmp.topic);
		pubmsg->dup = mqtt_cpub_tmp.dup;
		pubmsg->qos = mqtt_cpub_tmp.qos;
		pubmsg->retained = mqtt_cpub_tmp.retain;
		if(msg_len == 0)// text mode
		{
			pubmsg->payloadlen = pub__str_size;
			pubmsg->payload = (unsigned char *)cmmqtt_malloc(pubmsg->payloadlen+1);
			memset(pubmsg->payload,0,pubmsg->payloadlen+1);
			memcpy(pubmsg->payload,msg,pubmsg->payloadlen);
			if(NULL != msg)
			{
				free(msg);
				msg = NULL;
			}
		}
		else // hex mode
		{
            //长度必须匹配
            if(msg_len * 2 != strlen(msg))
            {
				cmmqtt_free_pubmsg(pubmsg);
				return -1;
            }
			pubmsg->payloadlen = msg_len;
			pubmsg->payload = (unsigned char *)cmmqtt_malloc(pubmsg->payloadlen + 1);
			cmmqtt_hex_to_bin(pubmsg->payload,msg,pubmsg->payloadlen);
			if(NULL != msg)
			{
				free(msg);
				msg = NULL;
			}
		}
		int i = 0;
		for(i = 0;i < 3;i++)
		{
			cmmqtt_err_stat.cmmqtt_pub_error[i] = 0;
		}
		pubmsg->queueout_ackcb = cmmqtt_puback_cb;
      	mqtt_cpub_tmp.mqtt_error = cmmqtt_publish(pubmsg);
		if(mqtt_cpub_tmp.qos == 0 || mqtt_cpub_tmp.mqtt_error != CMMQTT_AT_OK)
		{
			cmmqtt_free_pubmsg(pubmsg);
		}
		
		if(NULL != msg)
		{
			free(msg);
			msg = NULL;
		}

		if(mqtt_cpub_tmp.mqtt_error != CMMQTT_AT_OK)
		{
			cm_printf("[MQTT][%s][%d] ERR_MQTT_SENDPACK_FAIL\r\n",__func__,__LINE__);				
			return -1;
		}
		
		if(mqtt_cpub_tmp.qos == QOS1)//QoS 1
		{
		
			if(mqtt_cpub_tmp.mqtt_error == CMMQTT_AT_OK)
			{
				while(cmmqtt_check_times < CMMQTT_CHECK_NORMAL_TIMES)
				{
					if(cmmqtt_err_stat.cmmqtt_response_error == CMMQTT_CMD_RESPONSE_FAIL)
					{	
						cm_printf("[MQTT][%s][%d] ERR_MQTT_UNKNOWN_DATA\r\n",__func__,__LINE__);				
						return -1;
					}
					if(cmmqtt_err_stat.cmmqtt_pub_error[0] == CMMQTT_PUBACK_FAIL)
					{
						cm_printf("[MQTT][%s][%d] ERR_MQTT_PUBACK_FAIL\r\n",__func__,__LINE__);				
						return -1;
					}
					else if(cmmqtt_err_stat.cmmqtt_pub_error[0] == CMMQTT_PUB_OK)
					{
						//cm_printf("[%s][%d] pub OK\r\n",__func__,__LINE__);				
						return 0;
					}
				
					cmmqtt_check_times++;
					osDelay(50);
				}
			}
		}

		if(mqtt_cpub_tmp.qos == QOS2)
		{
			if(mqtt_cpub_tmp.mqtt_error == CMMQTT_AT_OK)
			{
					while(cmmqtt_check_times < CMMQTT_CHECK_NORMAL_TIMES)
					{
				
						if(cmmqtt_err_stat.cmmqtt_response_error == CMMQTT_CMD_RESPONSE_FAIL)
						{	
							cm_printf("[MQTT][%s][%d] ERR_MQTT_UNKNOWN_DATA\r\n",__func__,__LINE__);				
							return -1;
						}

						if(((cmmqtt_err_stat.cmmqtt_pub_error[0]) == CMMQTT_PUBACK_FAIL ||
							(cmmqtt_err_stat.cmmqtt_pub_error[1]) == CMMQTT_PUBACK_FAIL ||
							(cmmqtt_err_stat.cmmqtt_pub_error[2]) == CMMQTT_PUBACK_FAIL)||
							((cmmqtt_err_stat.cmmqtt_pub_error[0]) == CMMQTT_PUB_OK &&
							(cmmqtt_err_stat.cmmqtt_pub_error[1]) == CMMQTT_PUB_OK &&
							(cmmqtt_err_stat.cmmqtt_pub_error[2]) == CMMQTT_PUB_OK))
						{
							break;
						}

						cmmqtt_check_times++;
						osDelay(50);
					}

					if(cmmqtt_err_stat.cmmqtt_pub_error[0] == CMMQTT_PUBACK_FAIL)
					{
 						cm_printf("[MQTT][%s][%d] ERR_MQTT_PUBREC_FAIL\r\n",__func__,__LINE__);				
						return -1;
					}
					
					if(cmmqtt_err_stat.cmmqtt_pub_error[1] == CMMQTT_PUBACK_FAIL)
					{
  						cm_printf("[MQTT][%s][%d] ERR_MQTT_SENDPACK_FAIL\r\n",__func__,__LINE__);				
						return -1;
					}

					if(cmmqtt_err_stat.cmmqtt_pub_error[2] == CMMQTT_PUBACK_FAIL)
					{
   						cm_printf("[MQTT][%s][%d] ERR_MQTT_PUBCOMP_FAIL\r\n",__func__,__LINE__);				
						return -1;
					}

				}

		}	
	}
	else
		mqtt_cpub_tmp.mqtt_error = CMMQTT_AT_NOTCONN;

	if(mqtt_cpub_tmp.mqtt_error == CMMQTT_AT_OK)
	{
		return 0;
	}
	else
	{
		return -1;
	}
}

/*
 * Subscribes to cmmqtt_glob_subtopic with QoS cmmqtt_glob_subqos and waits
 * for the result recorded in cmmqtt_err_stat.cmmqtt_sub_error, polling for
 * at most CMMQTT_CHECK_NORMAL_TIMES iterations with osDelay(50) between
 * them.
 *
 * Returns 0 once CMMQTT_SUB_OK is recorded. Returns -1 when the client is
 * not connected, an allocation fails, cmmqtt_subscribe() reports an error,
 * a failure is recorded, or polling ends without a result.
 */
int mqtt_sub_demo()
{
	CMMQTTClient *myclient = cmmqtt_getclient();
	int mqtt_error = CMMQTT_AT_OK;
	volatile int cmmqtt_check_times = 0;

	if(myclient->state == CONNECTED)
	{
		CMMQTTSubmsg *submsg = (CMMQTTSubmsg *)cmmqtt_malloc(sizeof(CMMQTTSubmsg));
		if(NULL == submsg)
		{
			cm_printf("[MQTT][%s][%d] ERR_AT_CME_NO_MEMORY\r\n",__func__,__LINE__);
			return -1;
		}
		/* Zero the message so a partially built one can be released safely.
		 * NOTE(review): assumes cmmqtt_free_submsg() accepts a NULL topic
		 * string - confirm. */
		memset(submsg,0,sizeof(CMMQTTSubmsg));

		cmmqtt_err_stat.cmmqtt_sub_error = 0;
		submsg->topicName.lenstring.len = 0;
		submsg->topicName.lenstring.data = NULL;
		submsg->topicName.cstring = (char*)cmmqtt_malloc(strlen(cmmqtt_glob_subtopic)+1);
		if(NULL == submsg->topicName.cstring)
		{
			/* Do not leak the message structure itself. */
			cmmqtt_free_submsg(submsg);
			cm_printf("[MQTT][%s][%d] ERR_AT_CME_NO_MEMORY\r\n",__func__,__LINE__);
			return -1;
		}

		strcpy(submsg->topicName.cstring, cmmqtt_glob_subtopic);
		submsg->dup = 0;
		submsg->qos = cmmqtt_glob_subqos;
		submsg->queueout_ackcb = cmmqtt_suback_cb;
		submsg->topic_matchcb = cmmqtt_recpubcb[0];
		mqtt_error = cmmqtt_subscribe(submsg);

		/* The message is released here only when sending failed. */
		if(mqtt_error != CMMQTT_AT_OK)
			cmmqtt_free_submsg(submsg);
	}
	else
		mqtt_error = CMMQTT_AT_NOTCONN;

	if(mqtt_error == CMMQTT_AT_TOPIC_SUB_DONE)
	{
		cm_printf("[MQTT][%s][%d] ERR_MQTT_TOPIC_SUB_DONE\r\n",__func__,__LINE__);
		return -1;
	}
	else if(mqtt_error == CMMQTT_AT_NOTCONN)
	{
		cm_printf("[MQTT][%s][%d] ERR_MQTT_GPRS_NOTACT\r\n",__func__,__LINE__);
		return -1;
	}
	else if(mqtt_error == CMMQTT_AT_OK)
	{
		while(cmmqtt_check_times < CMMQTT_CHECK_NORMAL_TIMES)
		{

			if(cmmqtt_err_stat.cmmqtt_response_error == CMMQTT_CMD_RESPONSE_FAIL)
			{
				cm_printf("[MQTT][%s][%d] ERR_MQTT_UNKNOWN_DATA\r\n",__func__,__LINE__);
				return -1;
			}
			if(cmmqtt_err_stat.cmmqtt_sub_error == CMMQTT_SUBACK_FAIL)
			{
				cm_printf("[MQTT][%s][%d] ERR_MQTT_SUBACK_FAIL\r\n",__func__,__LINE__);
				return -1;
			}
			else if(cmmqtt_err_stat.cmmqtt_sub_error == CMMQTT_SUB_OK)
			{
				return 0;
			}

			cmmqtt_check_times++;
			osDelay(50);
		}

	}

	/* Any other send error, or no result within the polling window. */
	return -1;

}

/*
 * Unsubscribes from cmmqtt_glob_subtopic and waits for the result recorded
 * in cmmqtt_err_stat.cmmqtt_unsub_error, polling for at most
 * CMMQTT_CHECK_NORMAL_TIMES iterations with osDelay(50) between them.
 *
 * Returns:
 *    0  CMMQTT_UNSUB_OK was recorded, or polling ended without any result
 *   -1  client not connected, out of memory, cmmqtt_unsubscribe() failed,
 *       or CMMQTT_UNSUBACK_FAIL was recorded
 *   -2  the topic is not in the client's topic table
 *
 * A CMMQTT_CMD_RESPONSE_FAIL status is only logged; polling continues.
 */
int mqtt_unsub_demo()
{
	 CMMQTTClient *myclient = cmmqtt_getclient();
	 int mqtt_error = CMMQTT_AT_OK;
	 volatile int cmmqtt_check_times = 0;

	 if(myclient->state == CONNECTED)
	 {
		CMMQTTUnsubmsg *unsubmsg = (CMMQTTUnsubmsg *)cmmqtt_malloc(sizeof(CMMQTTUnsubmsg));
		if(NULL == unsubmsg)
		{
			cm_printf("[MQTT][%s][%d] ERR_AT_CME_NO_MEMORY\r\n",__func__,__LINE__);
			return -1;
		}
		/* Zero the message so a partially built one can be released safely.
		 * NOTE(review): assumes cmmqtt_free_unsubmsg() accepts a NULL topic
		 * string - confirm. */
		memset(unsubmsg,0,sizeof(CMMQTTUnsubmsg));

		unsubmsg->topicName.lenstring.len = 0;
		unsubmsg->topicName.lenstring.data = NULL;
		unsubmsg->topicName.cstring = (char*)cmmqtt_malloc(strlen(cmmqtt_glob_subtopic)+1);
		if(NULL == unsubmsg->topicName.cstring)
		{
			cmmqtt_free_unsubmsg(unsubmsg);
			cm_printf("[MQTT][%s][%d] ERR_AT_CME_NO_MEMORY\r\n",__func__,__LINE__);
			return -1;
		}
		strcpy(unsubmsg->topicName.cstring,cmmqtt_glob_subtopic);
		unsubmsg->dup = 0;
		unsubmsg->queueout_ackcb = cmmqtt_unsuback_cb;
		cmmqtt_err_stat.cmmqtt_unsub_error = 0;

		cmmqtt_entry *topic = cmmqtt_find_topic(&myclient->topictable,unsubmsg->topicName.cstring,cmmqtt_entry,hash_head);
		if(NULL == topic)
		{
			/* Nothing will be sent, so release the message built above. */
			cmmqtt_free_unsubmsg(unsubmsg);
			cm_printf("[MQTT][%s][%d] TOPIC IS NOT SUBBED\r\n",__func__,__LINE__);
			return -2;
		}
		mqtt_error = cmmqtt_unsubscribe(unsubmsg);

		if (mqtt_error != CMMQTT_AT_OK)
		{
			cmmqtt_free_unsubmsg(unsubmsg);
			cm_printf("[MQTT][%s][%d] SEND UNSUB PACKET ERROR\r\n",__func__,__LINE__);
			return -1;
		}
	 }
	 else
		 mqtt_error = CMMQTT_AT_NOTCONN;

	if(mqtt_error == CMMQTT_AT_OK)
	{
		while(cmmqtt_check_times < CMMQTT_CHECK_NORMAL_TIMES)
		{

			if(cmmqtt_err_stat.cmmqtt_response_error == CMMQTT_CMD_RESPONSE_FAIL)
			{
				cm_printf("[MQTT][%s][%d] MQTT RECEIVED ERROR PACKET\r\n",__func__,__LINE__);
			}

			switch(cmmqtt_err_stat.cmmqtt_unsub_error)
			{
				case CMMQTT_UNSUBACK_FAIL:
					cm_printf("[MQTT][%s][%d] MQTT UNSUB PACKET ERROR\r\n",__func__,__LINE__);
					return -1;
				case CMMQTT_UNSUB_OK:
					return 0;
				default:
					break;
			}

			cmmqtt_check_times++;
			osDelay(50);
		}
	}

	 if(mqtt_error == CMMQTT_AT_OK)
	 {
		 return 0;
	 }
	 else
	 {
		 return -1;
	 }

}

/*
 * Disconnects a CONNECTED client: calls cmmqtt_disc(), releases the stored
 * CONNECT message and marks the client DISCONNECTED. When the disconnect
 * was sent successfully it then polls (at most CMMQTT_CHECK_NORMAL_TIMES
 * iterations with osDelay(50) between them) for the state to become
 * INITIALED.
 *
 * Returns 0 when cmmqtt_disc() succeeded, whether or not INITIALED was
 * reached within the polling window; -1 when the client was not CONNECTED
 * or cmmqtt_disc() failed.
 */
int mqtt_disc_demo()
{
	CMMQTTClient *client = cmmqtt_getclient();
	volatile int polls = 0;
	int rc = 0;

	if (client->state != CONNECTED)
	{
		return -1;
	}

	rc = cmmqtt_disc();

	if (client->cmmqtt_conmsg != NULL)
	{
		cmmqtt_free_conmsg(client->cmmqtt_conmsg);
		client->cmmqtt_conmsg = NULL;
	}
	client->state = DISCONNECTED;

	if (rc != CMMQTT_AT_OK)
	{
		cm_printf("[MQTT][%s][%d] ERR_MQTT_SENDPACK_FAIL\r\n",__func__,__LINE__);
		return -1;
	}

	for (; (client->state != INITIALED) && (polls < CMMQTT_CHECK_NORMAL_TIMES); polls++)
	{
		osDelay(50);
	}

	return 0;
}


/*
 * Deletes the client by calling cmmqtt_deinit(), but only when it is in
 * the INITIALED state. Returns 0 when deinit was called, -1 otherwise.
 */
int mqtt_del_demo()
{
	CMMQTTClient *client = cmmqtt_getclient();

	if (client->state != INITIALED)
	{
		return -1;
	}

	cmmqtt_deinit();
	return 0;
}


// #endif
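// NOTE: the text that followed here ("Logo", an iFLYTEK AI developer community banner, "more recommendations") was web page residue, not part of the source.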