MicroPython bluetooth example using aioble library, between Raspberry Pi Pico W and ESP32-C3. (Both run MicroPython 1.21.0)

With recent update of MicroPython, bluetooth is supported in Raspberry Pi Pico W build.

The official guide "Connecting to the Internet with Raspberry Pi Pico W book (2023-06-14)" updated with example using bluetooth.
Read my previous post "Prepare MicroPython Bluetooth service example on Raspberry Pi Pico W".

Refer to MicroPython libraries document - bluetooth, higher-level aioble library is recommended. aioble provides an object-oriented, asyncio-based wrapper for MicroPython's bluetooth API.
(* This module is still under development and its classes, functions, methods and constants are subject to change.)

This video introduce the alternative using aioble library, run on Raspberry Pi Pico W and Espressif ESP32-C3-DevKitM-1, both run MicroPython v1.21.0.



Notice that aioble is included in Raspberry Pi Pico W MicroPython v1.21.0, but not in v1.20.0.





Fail in await connection.disconnected() using aioble library, and solution.

In above MicroPython bluetooth example using aioble library, temp_sensor.py crash in the following line after connected for a while.
            await connection.disconnected()
To fix it, I replace it with checking connection.is_connected():
            while connection.is_connected():
                await asyncio.sleep_ms(500)


As record, both py code listed here.

temp_sensor.py
import sys

sys.path.append("")

from micropython import const

import uasyncio as asyncio
import aioble
import bluetooth

import random
import struct

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000


# Register GATT server.
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_characteristic = aioble.Characteristic(
    temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)
aioble.register_services(temp_service)


# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
def _encode_temperature(temp_deg_c):
    return struct.pack("<h", int(temp_deg_c * 100))


# This would be periodically polling a hardware sensor.
async def sensor_task():
    t = 24.5
    while True:
        temp_characteristic.write(_encode_temperature(t))
        t += random.uniform(-0.5, 0.5)
        await asyncio.sleep_ms(1000)


# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name="mpy-temp",
            services=[_ENV_SENSE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            print("Connection from", connection.device)
            
            #await connection.disconnected()
            while connection.is_connected():
                await asyncio.sleep_ms(500)
                
            print("Dis-connected")

# Run both tasks.
async def main():
    t1 = asyncio.create_task(sensor_task())
    t2 = asyncio.create_task(peripheral_task())
    await asyncio.gather(t1, t2)


asyncio.run(main())

temp_client.py
import sys

sys.path.append("")

from micropython import const

import uasyncio as asyncio
import aioble
import bluetooth

import random
import struct

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)


# Helper to decode the temperature characteristic encoding (sint16, hundredths of a degree).
def _decode_temperature(data):
    return struct.unpack("<h", data)[0] / 100


async def find_temp_sensor():
    # Scan for 5 seconds, in active mode, with very low interval/window (to
    # maximise detection rate).
    async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            # See if it matches our name and the environmental sensing service.
            if result.name() == "mpy-temp" and _ENV_SENSE_UUID in result.services():
                return result.device
    return None


async def main():
    device = await find_temp_sensor()
    if not device:
        print("Temperature sensor not found")
        return

    try:
        print("Connecting to", device)
        connection = await device.connect()
    except asyncio.TimeoutError:
        print("Timeout during connection")
        return

    async with connection:
        try:
            temp_service = await connection.service(_ENV_SENSE_UUID)
            temp_characteristic = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
        except asyncio.TimeoutError:
            print("Timeout discovering services/characteristics")
            return

        while True:
            temp_deg_c = _decode_temperature(await temp_characteristic.read())
            print("Temperature: {:.2f}".format(temp_deg_c))
            await asyncio.sleep_ms(1000)


asyncio.run(main())




Next:
MicroPython bluetooth remote control LED between Raspberry Pi Pico W and ESP32-C3, using aioble library.


Comments

Popular posts from this blog

MicroPython/ESP32-C3 + 1.8" 128x160 TFT ST7735 SPI, using boochow/MicroPython-ST7735 library.

CameraWebServe: ESP32-S3 (arduino-esp32) + OV5640 camera module