MicroPython: run tasks concurrently, based on time ticks, uasyncio and _thread.

The exercises in the last post, "MicroPython/NodeMCU ESP-C3-32S-Kit control onboard LEDs", run tasks one by one in sequence. They call the blocking function time.sleep() to delay, so the tasks cannot run in parallel. In this post, I run tasks (to adjust the LEDs' PWM) concurrently, based on time ticks, uasyncio and _thread.


Based on time ticks


mpy_NodeMCU_ESPC3_Blink_Without_Sleep.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to Change RGB LED (PWM) without calling time.sleep().

ref:
MicroPython doc -time module                                                                                                                              
https://docs.micropython.org/en/latest/library/time.html
"""
import os
import sys
import time
from machine import Pin, PWM

print()

# Start-up banner built from sys.implementation and os.uname()
print("====================================")
print("{} {} \nrun on {}".format(sys.implementation[0],
                                 os.uname()[3], os.uname()[4]))
print("====================================")
time.sleep(1)

# NodeMCU ESP-C3-32S-Kit onboard LEDs assignment
pwmR = PWM(Pin(3))
pwmG = PWM(Pin(4))
pwmB = PWM(Pin(5))
pwmWarm = PWM(Pin(18))
pwmCool = PWM(Pin(19))

# Run every channel at 1000 Hz (original note: PWM frequency from 1Hz to 40MHz)
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.freq(1000)

# Start with every LED at zero duty
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.duty(0)

class taskPWM_class:
    """Non-blocking task that fades one PWM channel up and down.

    prc() is meant to be polled from the main loop.  Once per Tick_Dur
    milliseconds it calls update(), which writes the current duty to the
    PWM and then steps it by one.  The duty sweeps from 0 up to 1023 and
    back down to 0, repeating the end value once at each turn-around.
    """

    def __init__(self, dur, pwm):
        """Create the task and switch the channel off.

        dur: tick period in milliseconds between two duty steps.
        pwm: PWM object whose duty() method this task drives.
        """
        self.Tick_Dur = dur
        self.pwm = pwm

        self.nextTick = time.ticks_add(time.ticks_ms(), self.Tick_Dur)
        self.dir = True  # True = increasing, False = decreasing
        self.duty = 0

        self.pwm.duty(0)

    def update(self):
        """Write the current duty, then prepare the value for the next tick."""
        self.pwm.duty(self.duty)

        if self.dir:
            self.duty += 1
            if self.duty == 1024:
                # Stepped past the top of the 0..1023 range: turn around.
                self.dir = False
                self.duty = 1023
        else:
            self.duty -= 1
            if self.duty == -1:
                # Stepped past the bottom: turn around.
                self.dir = True
                self.duty = 0

    def prc(self):
        """Call update() if the tick deadline has been reached.

        Returns immediately otherwise, so it never blocks the caller.
        """
        curTime = time.ticks_ms()
        # ">= 0" fires as soon as the deadline is reached.  A strict "> 0"
        # would only fire one millisecond later, stretching every period
        # to Tick_Dur + 1 ms.
        if time.ticks_diff(curTime, self.nextTick) >= 0:
            # Schedule from "now" so a late poll does not cause a burst of
            # catch-up updates.
            self.nextTick = time.ticks_add(curTime, self.Tick_Dur)
            self.update()

class taskRGB_class:
    """Non-blocking task that fades the RGB channels one after another.

    prc() is meant to be polled from the main loop.  Once per Tick_Dur
    milliseconds it calls update(), which ramps the currently selected
    channel of pwmColor from 0 up to 1023 and back down to 0, then moves
    on to the next channel, wrapping around after the last one.
    """

    # Channels in the order they are cycled through: red, green, blue.
    pwmColor = [pwmR, pwmG, pwmB]

    def __init__(self, dur):
        """Create the task and switch all three channels off.

        dur: tick period in milliseconds between two duty steps.
        """
        self.Tick_Dur = dur
        self.nextTick = time.ticks_add(time.ticks_ms(), self.Tick_Dur)
        self.color = 0     # index into pwmColor, 0 = RED
        self.dir = True    # True = increasing, False = decreasing
        self.duty = 0

        pwmR.duty(0)
        pwmG.duty(0)
        pwmB.duty(0)

    def update(self):
        """Write the current duty to the active channel, then prepare the next."""
        self.pwmColor[self.color].duty(self.duty)

        if self.dir:
            self.duty += 1
            if self.duty == 1024:
                # Stepped past the top of the 0..1023 range: turn around.
                self.dir = False
                self.duty = 1023
        else:
            self.duty -= 1
            if self.duty == -1:
                # This channel is back at zero: advance to the next one.
                self.color += 1
                if self.color == len(self.pwmColor):  # past BLUE?
                    self.color = 0   # cycle to RED
                self.dir = True
                self.duty = 0

    def prc(self):
        """Call update() if the tick deadline has been reached.

        Returns immediately otherwise, so it never blocks the caller.
        """
        curTime = time.ticks_ms()
        # ">= 0" fires as soon as the deadline is reached.  A strict "> 0"
        # would only fire one millisecond later, stretching every period
        # to Tick_Dur + 1 ms.
        if time.ticks_diff(curTime, self.nextTick) >= 0:
            # Schedule from "now" so a late poll does not cause a burst of
            # catch-up updates.
            self.nextTick = time.ticks_add(curTime, self.Tick_Dur)
            self.update()
         
# One task per LED group; the number is the tick period in milliseconds.
myTaskRGB = taskRGB_class(5)
myTaskWarm = taskPWM_class(15, pwmWarm)
myTaskCool = taskPWM_class(20, pwmCool)

# Poll every task in turn, forever; each prc() returns at once when its
# tick is not due yet.
tasks = (myTaskRGB, myTaskWarm, myTaskCool)
while True:
    for task in tasks:
        task.prc()

Conceptually, it's the same as the famous Arduino example "Blink Without Delay".

uasyncio



mpy_NodeMCU_ESPC3_Blink_uasyncio.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM concurrently using uasyncio.

ref:
MicroPython doc - uasyncio
https://docs.micropython.org/en/latest/library/uasyncio.html
"""
import os
import sys
import time
from machine import Pin, PWM
import uasyncio

# NodeMCU ESP-C3-32S-Kit onboard LEDs assignment
pwmR = PWM(Pin(3))
pwmG = PWM(Pin(4))
pwmB = PWM(Pin(5))
pwmWarm = PWM(Pin(18))
pwmCool = PWM(Pin(19))

# Run every channel at 1000 Hz (original note: PWM frequency from 1Hz to 40MHz)
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.freq(1000)

# All LEDs OFF
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.duty(0)

# Start-up banner built from sys.implementation and os.uname()
print()
print("====================================")
print("{} {} \nrun on {}".format(sys.implementation[0],
                                 os.uname()[3], os.uname()[4]))
print("====================================")
time.sleep(1)

async def PWMRGBTest(pwmpin, dur):
    """Fade pwmpin up and down forever, yielding to other tasks between steps.

    pwmpin: PWM object whose duty() is set.
    dur: pause in milliseconds after each duty step.
    """
    while True:
        # Up-ramp 0, 8, ... 504, then down-ramp 511, 503, ... 7.
        for sweep in (range(0, 512, 8), range(511, -1, -8)):
            for level in sweep:
                pwmpin.duty(level)
                await uasyncio.sleep_ms(dur)
            
async def PWMTest(pwmpin,dur):
    """Fade pwmpin up and down once, then switch it off.

    pwmpin: PWM object whose duty() is set.
    dur: pause in milliseconds after each duty step.
    """
    # Up-ramp 0, 8, ... 504, then down-ramp 511, 503, ... 7.
    for sweep in (range(0, 512, 8), range(511, -1, -8)):
        for level in sweep:
            pwmpin.duty(level)
            await uasyncio.sleep_ms(dur)
    # The down-ramp stops at 7, so turn the LED fully off explicitly.
    pwmpin.duty(0)
        
async def LEDTest():
    """Alternate forever between one fade of the cool LED and one of the warm LED."""
    while True:
        for led in (pwmCool, pwmWarm):
            await PWMTest(led, 100)

async def main():
    """Start the four LED tasks, then idle so the scheduler keeps running."""
    # Each RGB channel gets its own task with its own step delay (ms).
    for led, step_ms in ((pwmR, 100), (pwmG, 200), (pwmB, 300)):
        uasyncio.create_task(PWMRGBTest(led, step_ms))
    uasyncio.create_task(LEDTest())

    # main() must not return, otherwise uasyncio.run() would finish.
    while True:
        await uasyncio.sleep_ms(500)

# Start the uasyncio scheduler with main() as the entry coroutine.
uasyncio.run(main())


_thread



mpy_NodeMCU_ESPC3_Blink_thread.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM in multithreading using _thread.

ref:
MicroPython doc - _thread
https://docs.micropython.org/en/latest/library/_thread.html
* This module is highly experimental and its API is not yet
  fully settled and not yet described
"""
import os
import sys
import time
from machine import Pin, PWM
import _thread

# NodeMCU ESP-C3-32S-Kit onboard LEDs assignment
pwmR = PWM(Pin(3))
pwmG = PWM(Pin(4))
pwmB = PWM(Pin(5))
pwmWarm = PWM(Pin(18))
pwmCool = PWM(Pin(19))

# Run every channel at 1000 Hz (original note: PWM frequency from 1Hz to 40MHz)
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.freq(1000)

# All LEDs OFF
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.duty(0)

# Start-up banner built from sys.implementation and os.uname()
print()
print("====================================")
print("{} {} \nrun on {}".format(sys.implementation[0],
                                 os.uname()[3], os.uname()[4]))
print("====================================")
time.sleep(1)

def thread_Pwm(pwm, dur):
    """Thread body: fade pwm up and down forever.

    pwm: PWM object whose duty() is set.
    dur: delay in milliseconds after each duty step.
    """
    while True:
        # Up-ramp 0, 8, ... 504, then down-ramp 511, 503, ... 7.
        for sweep in (range(0, 512, 8), range(511, -1, -8)):
            for level in sweep:
                pwm.duty(level)
                time.sleep_ms(dur)

# One thread per LED; the second tuple item is the per-step delay in
# milliseconds handed to thread_Pwm.
_thread.start_new_thread(thread_Pwm, (pwmR, 10))
_thread.start_new_thread(thread_Pwm, (pwmG, 20))
_thread.start_new_thread(thread_Pwm, (pwmB, 30))
_thread.start_new_thread(thread_Pwm, (pwmCool, 10))
_thread.start_new_thread(thread_Pwm, (pwmWarm, 15))

# Keep the main thread alive without spinning: a bare "pass" loop burns
# CPU and competes with the PWM threads, while sleeping leaves the
# processor to them.
while True:
    time.sleep_ms(1000)




mpy_NodeMCU_ESPC3_Blink_thread_input.py, which also gets user input() via the REPL console without blocking.
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM in multithreading using _thread.
also read user input in REPL console.

ref:
MicroPython doc - _thread
https://docs.micropython.org/en/latest/library/_thread.html
* This module is highly experimental and its API is not yet
  fully settled and not yet described
"""
import os
import sys
import time
from machine import Pin, PWM
import _thread

# NodeMCU ESP-C3-32S-Kit onboard LEDs assignment
pwmR = PWM(Pin(3))
pwmG = PWM(Pin(4))
pwmB = PWM(Pin(5))
pwmWarm = PWM(Pin(18))
pwmCool = PWM(Pin(19))

# Run every channel at 1000 Hz (original note: PWM frequency from 1Hz to 40MHz)
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.freq(1000)

# All LEDs OFF
for led in (pwmR, pwmG, pwmB, pwmWarm, pwmCool):
    led.duty(0)

# Start-up banner built from sys.implementation and os.uname()
print()
print("====================================")
print("{} {} \nrun on {}".format(sys.implementation[0],
                                 os.uname()[3], os.uname()[4]))
print("====================================")
time.sleep(1)

def thread_Pwm(pwm, dur):
    """Thread body: fade pwm up and down forever.

    pwm: PWM object whose duty() is set.
    dur: delay in milliseconds after each duty step.
    """
    rising = True
    while True:
        # Alternate between the up-ramp (0..504) and the down-ramp (511..7).
        steps = range(0, 512, 8) if rising else range(511, -1, -8)
        for level in steps:
            pwm.duty(level)
            time.sleep_ms(dur)
        rising = not rising
            
def thread_input():
    """Thread body: prompt for a line of input and echo it back, forever."""
    while True:
        text = input("Enter something:")
        print(text)

# One thread per LED; the second tuple item is the per-step delay in
# milliseconds handed to thread_Pwm.
_thread.start_new_thread(thread_Pwm, (pwmR, 10))
_thread.start_new_thread(thread_Pwm, (pwmG, 20))
_thread.start_new_thread(thread_Pwm, (pwmB, 30))
_thread.start_new_thread(thread_Pwm, (pwmCool, 10))
_thread.start_new_thread(thread_Pwm, (pwmWarm, 15))

# Separate thread that reads and echoes user input.
_thread.start_new_thread(thread_input, ())

# Keep the main thread alive without spinning: a bare "pass" loop burns
# CPU and competes with the worker threads, while sleeping leaves the
# processor to them.
while True:
    time.sleep_ms(1000)




Comments

Popular posts from this blog

MicroPython/ESP32-C3 + 1.8" 128x160 TFT ST7735 SPI, using boochow/MicroPython-ST7735 library.

CameraWebServe: ESP32-S3 (arduino-esp32) + OV5640 camera module