1、直接使用 Python 库(dht11)

代码如下

import RPi.GPIO as GPIO
import dht11
import time
import datetime

# Demo script: poll a DHT11 sensor through the `dht11` helper library and
# print every valid reading until the user presses Ctrl+C.
GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)

# Pin 14 is interpreted in BCM numbering because of the setmode() call above.
instance = dht11.DHT11(pin=14)

try:
    while True:
        result = instance.read()
        if result.is_valid():
            print('ok')
            print("Temperature: %-3.1f C" % result.temperature)
            print("Humidity: %-3.1f " % result.humidity)
        # Pause after every attempt, valid or not, so a failed read does not
        # turn the loop into a tight retry that polls the sensor back-to-back.
        time.sleep(3)
except KeyboardInterrupt:
    print("Cleanup")
finally:
    # Release the GPIO pins on every exit path, not only on Ctrl+C.
    GPIO.cleanup()

结果如下

2、使用 GPIO 引脚直接驱动(手动解析 DHT11 时序)

代码如下

import RPi.GPIO as GPIO
from pin_dic import pin_dic
import time

class DHT11(object):
    """Bit-banged DHT11 reader that samples one GPIO pin via RPi.GPIO.

    A read sends a start pulse, records the raw pin levels, measures the
    length (in samples) of each high pulse, classifies the pulses as bits,
    packs the bits into bytes and checks the fifth byte against a checksum.
    """

    def __init__(self, pin_D):
        """Remember the GPIO pin number the sensor's data line is wired to."""
        self._pin = pin_D

    def collect_input(self):
        """Poll the pin and return the list of every level that was read.

        Sampling stops once the level has stayed the same for more than
        100 consecutive reads.
        """
        idle_limit = 100
        idle_reads = 0
        previous = -1  # never equals a real level, so the first read counts as a change
        samples = []
        # Deliberately a plain polling loop: later stages measure pulse
        # widths as a number of samples taken here.
        while True:
            level = GPIO.input(self._pin)
            samples.append(level)
            if level == previous:
                idle_reads += 1
                if idle_reads > idle_limit:
                    return samples
            else:
                idle_reads = 0
                previous = level

    def get_high_state_lengths_data(self, data):
        """Return the length, in samples, of each high pulse in the data phase.

        The samples are walked with a five-state machine: the initial high,
        the first low, the following high, and then alternating low/high
        phases. Only the high phases of that alternating part are measured.
        """
        INITIAL_HIGH = 1
        FIRST_LOW = 2
        FIRST_HIGH = 3
        BIT_LOW = 4
        BIT_HIGH = 5

        phase = INITIAL_HIGH
        pulse_lengths = []
        run = 0

        for level in data:
            run += 1
            if phase == INITIAL_HIGH:
                if level == GPIO.LOW:
                    phase = FIRST_LOW
            elif phase == FIRST_LOW:
                if level == GPIO.HIGH:
                    phase = FIRST_HIGH
            elif phase == FIRST_HIGH:
                if level == GPIO.LOW:
                    phase = BIT_LOW
            elif phase == BIT_LOW:
                if level == GPIO.HIGH:
                    # A high pulse starts here; restart the sample counter.
                    run = 0
                    phase = BIT_HIGH
            elif phase == BIT_HIGH:
                if level == GPIO.LOW:
                    # The high pulse just ended; record how long it lasted.
                    pulse_lengths.append(run)
                    phase = BIT_LOW
        return pulse_lengths

    def calculate_bits(self, high_state_lengths_data):
        """Classify each pulse length as a bit.

        A pulse is True when it is longer than the midpoint between the
        shortest and longest pulse, otherwise False. The shortest value is
        seeded with 1000 and the longest with 0, so those seeds take part
        in the midpoint when no pulse beats them.
        """
        shortest = min([1000, *high_state_lengths_data])
        longest = max([0, *high_state_lengths_data])
        threshold = (longest + shortest) / 2
        return [length > threshold for length in high_state_lengths_data]

    def bits_to_bytes(self, bits):
        """Pack bits, most significant first, into a list of byte values.

        Bits left over after the last full group of eight are discarded.
        """
        packed = []
        accumulator = 0
        for position, bit in enumerate(bits, start=1):
            accumulator = (accumulator << 1) | (1 if bit else 0)
            if position % 8 == 0:
                packed.append(accumulator)
                accumulator = 0
        return packed

    def calculate_checksum(self, the_bytes):
        """Return the low 8 bits of the sum of the first four bytes."""
        return sum(the_bytes[index] for index in range(4)) & 0xFF

    def read_DHT(self):
        """Trigger one measurement and decode it.

        Returns:
            (True, [temperature, humidity]) when exactly 40 pulses were
            captured and the checksum matches; (False, 0) otherwise.
        """
        # Start signal: drive the line high for 50 ms, then low for 20 ms.
        GPIO.setup(self._pin, GPIO.OUT)
        GPIO.output(self._pin, GPIO.HIGH)
        time.sleep(0.05)
        GPIO.output(self._pin, GPIO.LOW)
        time.sleep(0.02)

        # Switch to input with the pull-up enabled and record the levels.
        GPIO.setup(self._pin, GPIO.IN, GPIO.PUD_UP)
        samples = self.collect_input()

        pulse_lengths = self.get_high_state_lengths_data(samples)
        if len(pulse_lengths) != 40:
            return False, 0

        frame = self.bits_to_bytes(self.calculate_bits(pulse_lengths))
        if frame[4] != self.calculate_checksum(frame):
            return False, 0

        # Bytes 0/1 carry humidity, bytes 2/3 temperature; the second byte
        # of each pair is added as tenths.
        humidity = frame[0] + float(frame[1]) / 10
        temperature = frame[2] + float(frame[3]) / 10
        return True, [temperature, humidity]

if __name__ == "__main__":
    # Demo: read the sensor every 3 seconds and print the result until Ctrl+C.
    sensor_pin = pin_dic['G16']
    GPIO.setmode(GPIO.BOARD)

    sensor = DHT11(sensor_pin)

    try:
        while True:
            ok, reading = sensor.read_DHT()
            if not ok:
                print("ERROR")
            else:
                # reading is [temperature, humidity] on success.
                print("temp: %-3.1f,humi:%-3.1f\n\r" % (reading[0], reading[1]))
            time.sleep(3)
    except KeyboardInterrupt:
        print('cleanup')
    finally:
        # Always release the GPIO pins, whatever ended the loop.
        GPIO.cleanup()

结果如下 

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐