Add more examples.

This commit is contained in:
Matthew Einhorn 2019-06-27 01:14:23 -04:00
parent 6c59fd612a
commit b25ad1f8d5
7 changed files with 366 additions and 15 deletions

View File

@ -0,0 +1,92 @@
'''Example shows the recommended way of how to run Kivy with the Python built
in asyncio event loop as just another async coroutine.
'''
import asyncio
import os
os.environ['KIVY_EVENTLOOP'] = 'async'
'''async needs to be set so that asyncio will be used for the event loop. '''
from kivy.app import App
from kivy.lang.builder import Builder
kv = '''
BoxLayout:
orientation: 'vertical'
BoxLayout:
ToggleButton:
id: btn1
group: 'a'
text: 'Sleeping'
allow_no_selection: False
on_state: if self.state == 'down': label.status = self.text
ToggleButton:
id: btn2
group: 'a'
text: 'Swimming'
allow_no_selection: False
on_state: if self.state == 'down': label.status = self.text
ToggleButton:
id: btn3
group: 'a'
text: 'Reading'
allow_no_selection: False
state: 'down'
on_state: if self.state == 'down': label.status = self.text
Label:
id: label
status: 'Reading'
text: 'Beach status is "{}"'.format(self.status)
'''
class AsyncApp(App):
other_task = None
def build(self):
return Builder.load_string(kv)
def app_func(self):
'''This will run both methods asynchronously and then block until they
are finished
'''
self.other_task = asyncio.ensure_future(self.waste_time_freely())
async def run_wrapper():
await self.async_run()
print('App done')
self.other_task.cancel()
return asyncio.gather(run_wrapper(), self.other_task)
async def waste_time_freely(self):
'''This method is also run by the asyncio loop and periodically prints
something.
'''
try:
i = 0
while True:
if self.root is not None:
status = self.root.ids.label.status
print('{} on the beach'.format(status))
# get some sleep
if self.root.ids.btn1.state != 'down' and i >= 2:
i = 0
print('Yawn, getting tired. Going to sleep')
self.root.ids.btn1.trigger_action()
i += 1
await asyncio.sleep(2)
except asyncio.CancelledError as e:
print('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
print('Done wasting time')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncApp().app_func())
loop.close()

View File

@ -0,0 +1,61 @@
'''Example shows the recommended way of how to run Kivy with the Python built
in asyncio event loop as just another async coroutine.
'''
import asyncio
import os
os.environ['KIVY_EVENTLOOP'] = 'async'
'''async needs to be set so that asyncio will be used for the event loop. '''
from kivy.app import async_runTouchApp
from kivy.lang.builder import Builder
kv = '''
BoxLayout:
orientation: 'vertical'
Button:
id: btn
text: 'Press me'
BoxLayout:
Label:
id: label
text: 'Button is "{}"'.format(btn.state)
'''
async def run_app_happily(root, other_task):
'''This method, which runs Kivy, is run by the asyncio loop as one of the
coroutines.
'''
await async_runTouchApp(root) # run Kivy
print('App done')
# now cancel all the other tasks that may be running
other_task.cancel()
async def waste_time_freely():
'''This method is also run by the asyncio loop and periodically prints
something.
'''
try:
while True:
print('Sitting on the beach')
await asyncio.sleep(2)
except asyncio.CancelledError as e:
print('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
print('Done wasting time')
if __name__ == '__main__':
def root_func():
'''This will run both methods asynchronously and then block until they
are finished
'''
root = Builder.load_string(kv) # root widget
other_task = asyncio.ensure_future(waste_time_freely())
return asyncio.gather(run_app_happily(root, other_task), other_task)
loop = asyncio.get_event_loop()
loop.run_until_complete(root_func())
loop.close()

View File

@ -0,0 +1,95 @@
'''Example shows the recommended way of how to run Kivy with a trio
event loop as just another async coroutine.
'''
import trio
import os
os.environ['KIVY_EVENTLOOP'] = 'trio'
'''trio needs to be set so that it'll be used for the event loop. '''
from kivy.app import App
from kivy.lang.builder import Builder
kv = '''
BoxLayout:
orientation: 'vertical'
BoxLayout:
ToggleButton:
id: btn1
group: 'a'
text: 'Sleeping'
allow_no_selection: False
on_state: if self.state == 'down': label.status = self.text
ToggleButton:
id: btn2
group: 'a'
text: 'Swimming'
allow_no_selection: False
on_state: if self.state == 'down': label.status = self.text
ToggleButton:
id: btn3
group: 'a'
text: 'Reading'
allow_no_selection: False
state: 'down'
on_state: if self.state == 'down': label.status = self.text
Label:
id: label
status: 'Reading'
text: 'Beach status is "{}"'.format(self.status)
'''
class AsyncApp(App):
nursery = None
def build(self):
return Builder.load_string(kv)
async def app_func(self):
'''trio needs to run a function, so this is it. '''
async with trio.open_nursery() as nursery:
'''In trio you create a nursery, in which you schedule async
functions to be run by the nursery simultaneously as tasks.
This will run all two methods starting in random order
asynchronously and then block until they are finished or canceled
at the `with` level. '''
self.nursery = nursery
async def run_wrapper():
await self.async_run()
print('App done')
nursery.cancel_scope.cancel()
nursery.start_soon(run_wrapper)
nursery.start_soon(self.waste_time_freely)
async def waste_time_freely(self):
'''This method is also run by trio and periodically prints something.'''
try:
i = 0
while True:
if self.root is not None:
status = self.root.ids.label.status
print('{} on the beach'.format(status))
# get some sleep
if self.root.ids.btn1.state != 'down' and i >= 2:
i = 0
print('Yawn, getting tired. Going to sleep')
self.root.ids.btn1.trigger_action()
i += 1
await trio.sleep(2)
except trio.Cancelled as e:
print('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
print('Done wasting time')
if __name__ == '__main__':
trio.run(AsyncApp().app_func)

View File

@ -19,6 +19,7 @@ BoxLayout:
BoxLayout:
Label:
id: label
text: 'Button is "{}"'.format(btn.state)
'''
@ -37,6 +38,8 @@ async def waste_time_freely():
while True:
print('Sitting on the beach')
await trio.sleep(2)
except trio.Cancelled as e:
print('Wasting time was canceled', e)
finally:
# when canceled, print that it finished
print('Done wasting time')

View File

@ -327,7 +327,7 @@ async event loop to run it asynchronously.
'''
__all__ = ('App', )
__all__ = ('App', 'runTouchApp', 'async_runTouchApp', 'stopTouchApp')
import os
from inspect import getfile
@ -879,7 +879,7 @@ Context.html#getFilesDir()>`_ is returned.
'''Identical to :meth:`run`, but is a coroutine and can be
scheduled in a running async event loop.
.. versionadded:: 1.10.1
.. versionadded:: 2.0.0
'''
self._run_prepare()
await async_runTouchApp()

View File

@ -136,7 +136,7 @@ class UnitKivyApp(object):
return values
async def do_touch_down_up(
self, pos=None, widget=None, duration=.5, pos_jitter=None,
self, pos=None, widget=None, duration=.2, pos_jitter=None,
widget_jitter=False, jitter_dt=1 / 15., end_on_pos=False):
x, y = pos or widget.center
touch = AsyncUnitTestTouch(x, y)
@ -144,13 +144,13 @@ class UnitKivyApp(object):
ts = time.perf_counter()
touch.touch_down()
await self.wait_clock_frames(1)
yield touch.pos, 'down'
yield 'down', touch.pos
if not pos_jitter and not widget_jitter:
await async_sleep(duration)
touch.touch_up()
await self.wait_clock_frames(1)
yield touch.pos, 'move'
yield 'move', touch.pos
return
@ -170,27 +170,27 @@ class UnitKivyApp(object):
y + (random.random() * 2 - 1) * dy
)
await self.wait_clock_frames(1)
yield touch.pos, 'move'
yield 'move', touch.pos
if end_on_pos and moved:
touch.touch_move(x, y)
await self.wait_clock_frames(1)
yield touch.pos, 'move'
yield 'move', touch.pos
touch.touch_up()
await self.wait_clock_frames(1)
yield touch.pos, 'up'
yield 'up', touch.pos
async def do_touch_drag(
self, pos=None, widget=None, target_pos=None, target_widget=None,
duration=.5, drag_n=10):
duration=.2, drag_n=10):
x, y = pos or widget.center
tx, ty = target_pos or target_widget.center
touch = AsyncUnitTestTouch(x, y)
touch.touch_down()
await self.wait_clock_frames(1)
yield touch.pos, 'down'
yield 'down', touch.pos
dx = (tx - x) / drag_n
dy = (ty - y) / drag_n
@ -201,13 +201,57 @@ class UnitKivyApp(object):
touch.touch_move(x + (i + 1) * dx, y + (i + 1) * dy)
await self.wait_clock_frames(1)
yield touch.pos, 'move'
yield 'move', touch.pos
if touch.pos != target_pos:
touch.touch_move(*target_pos)
await self.wait_clock_frames(1)
yield touch.pos, 'move'
yield 'move', touch.pos
touch.touch_up()
await self.wait_clock_frames(1)
yield touch.pos, 'up'
yield 'up', touch.pos
async def do_keyboard_key(
self, key, modifiers=(), duration=.2, num_press=1):
from kivy.core.window import Window
key_code = Window._system_keyboard.string_to_keycode(key.lower())
known_modifiers = (
'shift', 'alt', 'ctrl', 'meta', 'numlock', 'capslock')
special_keys = {
27: 'escape',
9: 'tab',
8: 'backspace',
13: 'enter',
127: 'del',
271: 'enter',
273: 'up',
274: 'down',
275: 'right',
276: 'left',
278: 'home',
279: 'end',
280: 'pgup',
281: 'pgdown'}
text = None
if (key not in modifiers and key not in known_modifiers and
key_code not in special_keys):
try:
text = chr(key_code)
except ValueError:
pass
dt = duration / num_press
for i in range(num_press):
await async_sleep(dt)
Window.dispatch('on_key_down', key_code, 0, text, modifiers)
Window.dispatch('on_textinput', key)
await self.wait_clock_frames(1)
yield 'down', (key, key_code, 0, text, modifiers)
Window.dispatch('on_key_up', key_code, 0)
await self.wait_clock_frames(1)
yield 'up', (key, key_code, 0, text, modifiers)

View File

@ -61,7 +61,7 @@ async def test_button_app(kivy_app):
assert kivy_app.root.text == 'Hello, World!'
assert kivy_app.root.state == 'normal'
async for touch_pos, state in kivy_app.do_touch_down_up(
async for state, touch_pos in kivy_app.do_touch_down_up(
widget=kivy_app.root, widget_jitter=True):
pass
@ -88,8 +88,64 @@ async def test_drag_app(kivy_app):
scatter = kivy_app.root
assert tuple(scatter.pos) == (0, 0)
async for touch_pos, state in kivy_app.do_touch_drag(
async for state, touch_pos in kivy_app.do_touch_drag(
pos=(100, 100), target_pos=(200, 200)):
pass
assert tuple(scatter.pos) == (100, 100)
def text_app():
from kivy.app import App
from kivy.uix.textinput import TextInput
class TestApp(UnitKivyApp, App):
def build(self):
return TextInput()
return TestApp()
@async_run(app_cls_func=text_app)
async def test_text_app(kivy_app):
text = kivy_app.root
assert text.text == ''
# activate widget
async for state, touch_pos in kivy_app.do_touch_down_up(widget=text):
pass
async for state, value in kivy_app.do_keyboard_key(key='A', num_press=4):
pass
async for state, value in kivy_app.do_keyboard_key(key='q', num_press=3):
pass
assert text.text == 'AAAAqqq'
def graphics_app():
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.graphics import Color, Rectangle
class TestApp(UnitKivyApp, App):
def build(self):
widget = Widget()
with widget.canvas:
Color(1, 0, 0, 1)
Rectangle(pos=(0, 0), size=(100, 100))
Color(0, 1, 0, 1)
Rectangle(pos=(100, 0), size=(100, 100))
return widget
return TestApp()
@async_run(app_cls_func=graphics_app)
async def test_graphics_app(kivy_app):
widget = kivy_app.root
(r1, g1, b1, a1), (r2, g2, b2, a2) = kivy_app.get_widget_pos_pixel(
widget, [(50, 50), (150, 50)])
assert not g1 and not b1 and not r2 and not b2
assert r1 > 50 and a1 > 50 and g2 > 50 and a2 > 50