Added example of using ProcessPoolExecutor for CPU-intensive tasks

This commit is contained in:
Vladimir Magamedov 2019-05-22 21:31:56 +03:00
parent 52a9db8a83
commit 898f4467d4
7 changed files with 298 additions and 0 deletions

View File

@ -17,6 +17,9 @@ clean:
rm -f ./examples/streaming/*_pb2.py
rm -f ./examples/streaming/*_grpc.py
rm -f ./examples/streaming/*.pyi
rm -f ./examples/multiproc/*_pb2.py
rm -f ./examples/multiproc/*_grpc.py
rm -f ./examples/multiproc/*.pyi
rm -f ./tests/*_pb2.py
rm -f ./tests/*_grpc.py
rm -f ./tests/*.pyi
@ -27,6 +30,7 @@ proto: clean
python3 -m grpc_tools.protoc -I. --python_out=. --python_grpc_out=. --mypy_out=. grpclib/reflection/v1alpha/reflection.proto
python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --grpc_python_out=examples --mypy_out=examples examples/helloworld/helloworld.proto
python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --mypy_out=examples examples/streaming/helloworld.proto
python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --mypy_out=examples examples/multiproc/primes.proto
cd tests; python3 -m grpc_tools.protoc -I. --python_out=. --python_grpc_out=. --mypy_out=. dummy.proto
release: proto

View File

View File

@ -0,0 +1,45 @@
import asyncio
from typing import Tuple
from grpclib.client import Channel
# generated by protoc
from .primes_pb2 import Request
from .primes_grpc import PrimesStub
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
0,
1,
2,
3,
4,
5,
6,
7,
]
async def main() -> None:
channel = Channel('127.0.0.1', 50051)
primes = PrimesStub(channel)
async def check(n: int) -> Tuple[int, bool]:
reply = await primes.Check(Request(number=n))
return n, reply.is_prime.value
for f in asyncio.as_completed([check(n) for n in PRIMES]):
number, is_prime = await f
print(f'Number {number} {"is" if is_prime else "is not"} prime')
channel.close()
if __name__ == '__main__':
asyncio.run(main())

View File

@ -0,0 +1,17 @@
syntax = "proto3";
package primes;
import "google/protobuf/wrappers.proto";
message Request {
int64 number = 1;
}
message Reply {
google.protobuf.BoolValue is_prime = 1;
}
service Primes {
rpc Check (Request) returns (Reply) {}
}

View File

@ -0,0 +1,41 @@
# Generated by the Protocol Buffers compiler. DO NOT EDIT!
# source: multiproc/primes.proto
# plugin: grpclib.plugin.main
import abc
import typing
import grpclib.const
import grpclib.client
if typing.TYPE_CHECKING:
import grpclib.server
import google.protobuf.wrappers_pb2
import multiproc.primes_pb2
class PrimesBase(abc.ABC):
@abc.abstractmethod
async def Check(self, stream: 'grpclib.server.Stream[multiproc.primes_pb2.Request, multiproc.primes_pb2.Reply]') -> None:
pass
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/primes.Primes/Check': grpclib.const.Handler(
self.Check,
grpclib.const.Cardinality.UNARY_UNARY,
multiproc.primes_pb2.Request,
multiproc.primes_pb2.Reply,
),
}
class PrimesStub:
def __init__(self, channel: grpclib.client.Channel) -> None:
self.Check = grpclib.client.UnaryUnaryMethod(
channel,
'/primes.Primes/Check',
multiproc.primes_pb2.Request,
multiproc.primes_pb2.Reply,
)

View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: multiproc/primes.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='multiproc/primes.proto',
package='primes',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x16multiproc/primes.proto\x12\x06primes\x1a\x1egoogle/protobuf/wrappers.proto\"\x19\n\x07Request\x12\x0e\n\x06number\x18\x01 \x01(\x03\"5\n\x05Reply\x12,\n\x08is_prime\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.BoolValue23\n\x06Primes\x12)\n\x05\x43heck\x12\x0f.primes.Request\x1a\r.primes.Reply\"\x00\x62\x06proto3')
,
dependencies=[google_dot_protobuf_dot_wrappers__pb2.DESCRIPTOR,])
_REQUEST = _descriptor.Descriptor(
name='Request',
full_name='primes.Request',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='number', full_name='primes.Request.number', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=66,
serialized_end=91,
)
_REPLY = _descriptor.Descriptor(
name='Reply',
full_name='primes.Reply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='is_prime', full_name='primes.Reply.is_prime', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=93,
serialized_end=146,
)
_REPLY.fields_by_name['is_prime'].message_type = google_dot_protobuf_dot_wrappers__pb2._BOOLVALUE
DESCRIPTOR.message_types_by_name['Request'] = _REQUEST
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Request = _reflection.GeneratedProtocolMessageType('Request', (_message.Message,), dict(
DESCRIPTOR = _REQUEST,
__module__ = 'multiproc.primes_pb2'
# @@protoc_insertion_point(class_scope:primes.Request)
))
_sym_db.RegisterMessage(Request)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
DESCRIPTOR = _REPLY,
__module__ = 'multiproc.primes_pb2'
# @@protoc_insertion_point(class_scope:primes.Reply)
))
_sym_db.RegisterMessage(Reply)
_PRIMES = _descriptor.ServiceDescriptor(
name='Primes',
full_name='primes.Primes',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=148,
serialized_end=199,
methods=[
_descriptor.MethodDescriptor(
name='Check',
full_name='primes.Primes.Check',
index=0,
containing_service=None,
input_type=_REQUEST,
output_type=_REPLY,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_PRIMES)
DESCRIPTOR.services_by_name['Primes'] = _PRIMES
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,55 @@
import os
import math
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from grpclib.utils import graceful_exit
from grpclib.server import Stream, Server
from google.protobuf.wrappers_pb2 import BoolValue
# generated by protoc
from .primes_pb2 import Request, Reply
from .primes_grpc import PrimesBase
def is_prime(n: int) -> bool:
print(f'{os.getpid()}: Started to check {n} number')
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
class Primes(PrimesBase):
def __init__(self, executor: ProcessPoolExecutor):
self._executor = executor
self._loop = asyncio.get_event_loop()
async def Check(self, stream: Stream[Request, Reply]) -> None:
request = await stream.recv_message()
assert request is not None
number_is_prime = await self._loop.run_in_executor(
self._executor, is_prime, request.number
)
reply = Reply(is_prime=BoolValue(value=number_is_prime))
await stream.send_message(reply)
async def main(*, host: str = '127.0.0.1', port: int = 50051) -> None:
with ProcessPoolExecutor(max_workers=4) as executor:
server = Server([Primes(executor)])
with graceful_exit([server]):
await server.start(host, port)
print(f'Serving on {host}:{port}')
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())