MicroPython: running tasks concurrently, based on time ticks, uasyncio and _thread.
The exercises in the last post, "MicroPython/NodeMCU ESP-C3-32S-Kit control onboard LEDs", run their tasks one by one in sequence. They call the blocking function time.sleep() to delay, so the tasks cannot run in parallel. In this post, I run the tasks (adjusting the LEDs' PWM) concurrently, based on time ticks, uasyncio and _thread.
Based on time ticks
mpy_NodeMCU_ESPC3_Blink_Without_Sleep.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to Change RGB LED (PWM) without calling time.sleep().
ref:
MicroPython doc -time module
https://docs.micropython.org/en/latest/library/time.html
"""
import os
import sys
import time
from machine import Pin, PWM
# Identify the firmware and the board on the console, then pause for a second.
print()
print("====================================")
print(sys.implementation[0], os.uname()[3], "\nrun on", os.uname()[4])
print("====================================")
time.sleep(1)

# NodeMCU ESP-C3-32S-Kit onboard LEDs: one PWM object per GPIO pin.
pwmR, pwmG, pwmB = PWM(Pin(3)), PWM(Pin(4)), PWM(Pin(5))
pwmWarm, pwmCool = PWM(Pin(18)), PWM(Pin(19))

_all_leds = (pwmR, pwmG, pwmB, pwmWarm, pwmCool)

# Drive every channel at 1000 Hz ...
for _led in _all_leds:
    _led.freq(1000)

# ... and start with every LED off (duty 0).
for _led in _all_leds:
    _led.duty(0)
class taskPWM_class:
    """Non-blocking task that sweeps one PWM output up and down.

    The duty value climbs from 0 to 1023 one step per tick, then falls back
    to 0, and repeats.  Each end value (1023 and 0) is written on two
    consecutive ticks before the direction of travel reverses.
    """

    def __init__(self, dur, pwm):
        """Store the tick period `dur` (ms) and the `pwm` object to drive.

        The first tick is scheduled `dur` ms from now and the output is
        forced to duty 0.
        """
        self.Tick_Dur = dur
        self.pwm = pwm
        self.nextTick = time.ticks_add(time.ticks_ms(), self.Tick_Dur)
        self.dir = True  # True: duty is rising, False: duty is falling
        self.duty = 0
        self.pwm.duty(0)

    def update(self):
        """Write the current duty to the PWM, then compute the next one."""
        self.pwm.duty(self.duty)

        if self.dir:
            # Rising: step up; on running past the top, clamp and reverse.
            self.duty += 1
            if self.duty == 1024:
                self.duty, self.dir = 1023, False
            return

        # Falling: step down; on running past the bottom, clamp and reverse.
        self.duty -= 1
        if self.duty == -1:
            self.duty, self.dir = 0, True

    def prc(self):
        """Poll the task; call this repeatedly from the main loop.

        Does nothing until the current tick count has passed `nextTick`.
        Then it schedules the next tick `Tick_Dur` ms after the current time
        and runs one `update()` step.
        """
        now = time.ticks_ms()
        if time.ticks_diff(now, self.nextTick) <= 0:
            return  # not due yet
        self.nextTick = time.ticks_add(now, self.Tick_Dur)
        self.update()
class taskRGB_class:
    """Non-blocking task that fades the R, G and B LEDs one after another.

    The active colour's duty climbs from 0 to 1023 and falls back to 0, one
    step per tick; the task then moves on to the next colour, wrapping from
    the last entry of `pwmColor` back to the first.
    """

    # Colour order used by `self.color` as an index: 0 = red, 1 = green, 2 = blue.
    pwmColor = [pwmR, pwmG, pwmB]

    def __init__(self, dur):
        """Store the tick period `dur` (ms) and start on red with all off.

        The first tick is scheduled `dur` ms from now.
        """
        self.Tick_Dur = dur
        self.nextTick = time.ticks_add(time.ticks_ms(), self.Tick_Dur)
        self.color = 0  # index into pwmColor, starting with red
        self.dir = True  # True: duty is rising, False: duty is falling
        self.duty = 0
        for led in (pwmR, pwmG, pwmB):
            led.duty(0)

    def update(self):
        """Write the current duty to the active colour, then prepare the next."""
        self.pwmColor[self.color].duty(self.duty)

        if self.dir:
            # Rising: step up; on running past the top, clamp and reverse.
            self.duty += 1
            if self.duty == 1024:
                self.duty, self.dir = 1023, False
            return

        # Falling: step down until the duty runs past the bottom.
        self.duty -= 1
        if self.duty != -1:
            return

        # Fade finished: advance to the next colour (wrapping to red) and
        # start rising again from 0.
        self.color += 1
        if self.color == len(self.pwmColor):
            self.color = 0
        self.dir = True
        self.duty = 0

    def prc(self):
        """Poll the task; call this repeatedly from the main loop.

        Does nothing until the current tick count has passed `nextTick`.
        Then it schedules the next tick `Tick_Dur` ms after the current time
        and runs one `update()` step.
        """
        now = time.ticks_ms()
        if time.ticks_diff(now, self.nextTick) <= 0:
            return  # not due yet
        self.nextTick = time.ticks_add(now, self.Tick_Dur)
        self.update()
# One task per LED group, each with its own tick period in milliseconds.
myTaskRGB = taskRGB_class(5)
myTaskWarm = taskPWM_class(15, pwmWarm)
myTaskCool = taskPWM_class(20, pwmCool)

_tasks = (myTaskRGB, myTaskWarm, myTaskCool)

# Cooperative scheduler: poll every task in turn, forever.  Each prc() call
# returns immediately unless that task's tick is due.
while True:
    for _task in _tasks:
        _task.prc()
Conceptually, it's the same as the famous Arduino example "Blink Without Delay".
uasyncio
mpy_NodeMCU_ESPC3_Blink_uasyncio.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM in concurrent using uasyncio.
ref:
MicroPython doc - uasyncio
https://docs.micropython.org/en/latest/library/uasyncio.html
"""
import os
import sys
import time
from machine import Pin, PWM
import uasyncio
# NodeMCU ESP-C3-32S-Kit onboard LEDs: one PWM object per GPIO pin.
pwmR, pwmG, pwmB = PWM(Pin(3)), PWM(Pin(4)), PWM(Pin(5))
pwmWarm, pwmCool = PWM(Pin(18)), PWM(Pin(19))

_all_leds = (pwmR, pwmG, pwmB, pwmWarm, pwmCool)

# Drive every channel at 1000 Hz ...
for _led in _all_leds:
    _led.freq(1000)

# ... and start with every LED off (duty 0).
for _led in _all_leds:
    _led.duty(0)

# Identify the firmware and the board on the console, then pause for a second.
print()
print("====================================")
print(sys.implementation[0], os.uname()[3], "\nrun on", os.uname()[4])
print("====================================")
time.sleep(1)
async def PWMRGBTest(pwmpin, dur):
    """Fade `pwmpin` up and down forever, yielding `dur` ms after each step.

    The duty rises from 0 to 504 in steps of 8, then falls from 511 to 7 in
    steps of 8.  Awaiting the sleep lets the other tasks run in between.
    """
    rising = range(0, 512, 8)
    falling = range(511, -1, -8)
    while True:
        for sweep in (rising, falling):
            for level in sweep:
                pwmpin.duty(level)
                await uasyncio.sleep_ms(dur)
async def PWMTest(pwmpin, dur):
    """Fade `pwmpin` up and down once, then switch it off.

    The duty rises from 0 to 504 in steps of 8, then falls from 511 to 7 in
    steps of 8, sleeping `dur` ms after each step.  The final duty(0) call
    makes sure the LED ends fully off.
    """
    for sweep in (range(0, 512, 8), range(511, -1, -8)):
        for level in sweep:
            pwmpin.duty(level)
            await uasyncio.sleep_ms(dur)
    pwmpin.duty(0)
async def LEDTest():
    """Alternate forever between one fade of the cool LED and one of the warm LED."""
    while True:
        for led in (pwmCool, pwmWarm):
            # Each fade runs to completion before the other LED starts.
            await PWMTest(led, 100)
async def main():
    """Start all LED tasks, then idle forever so they keep running."""
    # R, G and B fade independently, each with its own step delay in ms.
    for led, step_ms in ((pwmR, 100), (pwmG, 200), (pwmB, 300)):
        uasyncio.create_task(PWMRGBTest(led, step_ms))
    uasyncio.create_task(LEDTest())

    # main() must not return, so just sleep in a loop.
    while True:
        await uasyncio.sleep_ms(500)


uasyncio.run(main())
_thread
mpy_NodeMCU_ESPC3_Blink_thread.py
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM in multithreading using _thread.
ref:
MicroPython doc - _thread
https://docs.micropython.org/en/latest/library/_thread.html
* This module is highly experimental and its API is not yet
fully settled and not yet described
"""
import os
import sys
import time
from machine import Pin, PWM
import _thread
# NodeMCU ESP-C3-32S-Kit onboard LEDs: one PWM object per GPIO pin.
pwmR, pwmG, pwmB = PWM(Pin(3)), PWM(Pin(4)), PWM(Pin(5))
pwmWarm, pwmCool = PWM(Pin(18)), PWM(Pin(19))

_all_leds = (pwmR, pwmG, pwmB, pwmWarm, pwmCool)

# Drive every channel at 1000 Hz ...
for _led in _all_leds:
    _led.freq(1000)

# ... and start with every LED off (duty 0).
for _led in _all_leds:
    _led.duty(0)

# Identify the firmware and the board on the console, then pause for a second.
print()
print("====================================")
print(sys.implementation[0], os.uname()[3], "\nrun on", os.uname()[4])
print("====================================")
time.sleep(1)
def thread_Pwm(pwm, dur):
    """Thread body: fade `pwm` up and down forever, `dur` ms per step.

    The duty rises from 0 to 504 in steps of 8, then falls from 511 to 7 in
    steps of 8.  This function never returns.
    """
    rising = range(0, 512, 8)
    falling = range(511, -1, -8)
    while True:
        for sweep in (rising, falling):
            for level in sweep:
                pwm.duty(level)
                time.sleep_ms(dur)
# One worker thread per LED; the second item of each pair is the delay
# between duty steps in milliseconds.
for _led, _step_ms in (
    (pwmR, 10),
    (pwmG, 20),
    (pwmB, 30),
    (pwmCool, 10),
    (pwmWarm, 15),
):
    _thread.start_new_thread(thread_Pwm, (_led, _step_ms))

# Keep the main thread alive while the workers run.  Sleep instead of
# spinning on `pass`: a busy loop keeps the CPU fully occupied for no
# benefit, whereas sleeping leaves that time to the worker threads.
while True:
    time.sleep(1)
mpy_NodeMCU_ESPC3_Blink_thread_input.py also gets user input() via the REPL console without blocking the LED threads.
"""
MicroPython/NodeMCU ESP-C3-32S-Kit exercise
to change LEDs PWM in multithreading using _thread.
also read user input in REPL consol.
ref:
MicroPython doc - _thread
https://docs.micropython.org/en/latest/library/_thread.html
* This module is highly experimental and its API is not yet
fully settled and not yet described
"""
import os
import sys
import time
from machine import Pin, PWM
import _thread
# NodeMCU ESP-C3-32S-Kit onboard LEDs: one PWM object per GPIO pin.
pwmR, pwmG, pwmB = PWM(Pin(3)), PWM(Pin(4)), PWM(Pin(5))
pwmWarm, pwmCool = PWM(Pin(18)), PWM(Pin(19))

_all_leds = (pwmR, pwmG, pwmB, pwmWarm, pwmCool)

# Drive every channel at 1000 Hz ...
for _led in _all_leds:
    _led.freq(1000)

# ... and start with every LED off (duty 0).
for _led in _all_leds:
    _led.duty(0)

# Identify the firmware and the board on the console, then pause for a second.
print()
print("====================================")
print(sys.implementation[0], os.uname()[3], "\nrun on", os.uname()[4])
print("====================================")
time.sleep(1)
def thread_Pwm(pwm, dur):
    """Thread body: fade `pwm` up and down forever, `dur` ms per step.

    The duty rises from 0 to 504 in steps of 8, then falls from 511 to 7 in
    steps of 8.  This function never returns.
    """
    rising = range(0, 512, 8)
    falling = range(511, -1, -8)
    while True:
        for sweep in (rising, falling):
            for level in sweep:
                pwm.duty(level)
                time.sleep_ms(dur)
def thread_input():
    """Thread body: prompt on the console forever and echo each line entered."""
    while True:
        entered = input("Enter something:")
        print(entered)
# One worker thread per LED; the second item of each pair is the delay
# between duty steps in milliseconds.
for _led, _step_ms in (
    (pwmR, 10),
    (pwmG, 20),
    (pwmB, 30),
    (pwmCool, 10),
    (pwmWarm, 15),
):
    _thread.start_new_thread(thread_Pwm, (_led, _step_ms))

# A further thread reads and echoes console input while the LEDs keep fading.
_thread.start_new_thread(thread_input, ())

# Keep the main thread alive while the workers run.  Sleep instead of
# spinning on `pass`: a busy loop keeps the CPU fully occupied for no
# benefit, whereas sleeping leaves that time to the worker threads.
while True:
    time.sleep(1)
Comments
Post a Comment