From 898f4467d4e8c6a368d96488eff1d39a21459e2e Mon Sep 17 00:00:00 2001 From: Vladimir Magamedov Date: Wed, 22 May 2019 21:31:56 +0300 Subject: [PATCH] Added example of using ProcessPoolExecutor for CPU-intensive tasks --- Makefile | 4 + examples/multiproc/__init__.py | 0 examples/multiproc/client.py | 45 ++++++++++ examples/multiproc/primes.proto | 17 ++++ examples/multiproc/primes_grpc.py | 41 +++++++++ examples/multiproc/primes_pb2.py | 136 ++++++++++++++++++++++++++++++ examples/multiproc/server.py | 55 ++++++++++++ 7 files changed, 298 insertions(+) create mode 100644 examples/multiproc/__init__.py create mode 100644 examples/multiproc/client.py create mode 100644 examples/multiproc/primes.proto create mode 100644 examples/multiproc/primes_grpc.py create mode 100644 examples/multiproc/primes_pb2.py create mode 100644 examples/multiproc/server.py diff --git a/Makefile b/Makefile index a7348be..2786e44 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ clean: rm -f ./examples/streaming/*_pb2.py rm -f ./examples/streaming/*_grpc.py rm -f ./examples/streaming/*.pyi + rm -f ./examples/multiproc/*_pb2.py + rm -f ./examples/multiproc/*_grpc.py + rm -f ./examples/multiproc/*.pyi rm -f ./tests/*_pb2.py rm -f ./tests/*_grpc.py rm -f ./tests/*.pyi @@ -27,6 +30,7 @@ proto: clean python3 -m grpc_tools.protoc -I. --python_out=. --python_grpc_out=. --mypy_out=. grpclib/reflection/v1alpha/reflection.proto python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --grpc_python_out=examples --mypy_out=examples examples/helloworld/helloworld.proto python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --mypy_out=examples examples/streaming/helloworld.proto + python3 -m grpc_tools.protoc -Iexamples --python_out=examples --python_grpc_out=examples --mypy_out=examples examples/multiproc/primes.proto cd tests; python3 -m grpc_tools.protoc -I. --python_out=. --python_grpc_out=. --mypy_out=. dummy.proto release: proto diff --git a/examples/multiproc/__init__.py b/examples/multiproc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/multiproc/client.py b/examples/multiproc/client.py new file mode 100644 index 0000000..ac4a88a --- /dev/null +++ b/examples/multiproc/client.py @@ -0,0 +1,45 @@ +import asyncio + +from typing import Tuple + +from grpclib.client import Channel + +# generated by protoc +from .primes_pb2 import Request +from .primes_grpc import PrimesStub + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, +] + + +async def main() -> None: + channel = Channel('127.0.0.1', 50051) + primes = PrimesStub(channel) + + async def check(n: int) -> Tuple[int, bool]: + reply = await primes.Check(Request(number=n)) + return n, reply.is_prime.value + + for f in asyncio.as_completed([check(n) for n in PRIMES]): + number, is_prime = await f + print(f'Number {number} {"is" if is_prime else "is not"} prime') + + channel.close() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/multiproc/primes.proto b/examples/multiproc/primes.proto new file mode 100644 index 0000000..9132b7c --- /dev/null +++ b/examples/multiproc/primes.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package primes; + +import "google/protobuf/wrappers.proto"; + +message Request { + int64 number = 1; +} + +message Reply { + google.protobuf.BoolValue is_prime = 1; +} + +service Primes { + rpc Check (Request) returns (Reply) {} +} diff --git a/examples/multiproc/primes_grpc.py b/examples/multiproc/primes_grpc.py new file mode 100644 index 0000000..ed61f9f --- /dev/null +++ b/examples/multiproc/primes_grpc.py @@ -0,0 +1,41 @@ +# Generated by the Protocol Buffers compiler. DO NOT EDIT! +# source: multiproc/primes.proto +# plugin: grpclib.plugin.main +import abc +import typing + +import grpclib.const +import grpclib.client +if typing.TYPE_CHECKING: + import grpclib.server + +import google.protobuf.wrappers_pb2 +import multiproc.primes_pb2 + + +class PrimesBase(abc.ABC): + + @abc.abstractmethod + async def Check(self, stream: 'grpclib.server.Stream[multiproc.primes_pb2.Request, multiproc.primes_pb2.Reply]') -> None: + pass + + def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: + return { + '/primes.Primes/Check': grpclib.const.Handler( + self.Check, + grpclib.const.Cardinality.UNARY_UNARY, + multiproc.primes_pb2.Request, + multiproc.primes_pb2.Reply, + ), + } + + +class PrimesStub: + + def __init__(self, channel: grpclib.client.Channel) -> None: + self.Check = grpclib.client.UnaryUnaryMethod( + channel, + '/primes.Primes/Check', + multiproc.primes_pb2.Request, + multiproc.primes_pb2.Reply, + ) diff --git a/examples/multiproc/primes_pb2.py b/examples/multiproc/primes_pb2.py new file mode 100644 index 0000000..3ca7ff4 --- /dev/null +++ b/examples/multiproc/primes_pb2.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: multiproc/primes.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='multiproc/primes.proto', + package='primes', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\x16multiproc/primes.proto\x12\x06primes\x1a\x1egoogle/protobuf/wrappers.proto\"\x19\n\x07Request\x12\x0e\n\x06number\x18\x01 \x01(\x03\"5\n\x05Reply\x12,\n\x08is_prime\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.BoolValue23\n\x06Primes\x12)\n\x05\x43heck\x12\x0f.primes.Request\x1a\r.primes.Reply\"\x00\x62\x06proto3') + , + dependencies=[google_dot_protobuf_dot_wrappers__pb2.DESCRIPTOR,]) + + + + +_REQUEST = _descriptor.Descriptor( + name='Request', + full_name='primes.Request', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='number', full_name='primes.Request.number', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=66, + serialized_end=91, +) + + +_REPLY = _descriptor.Descriptor( + name='Reply', + full_name='primes.Reply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='is_prime', full_name='primes.Reply.is_prime', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=93, + serialized_end=146, +) + +_REPLY.fields_by_name['is_prime'].message_type = google_dot_protobuf_dot_wrappers__pb2._BOOLVALUE +DESCRIPTOR.message_types_by_name['Request'] = _REQUEST +DESCRIPTOR.message_types_by_name['Reply'] = _REPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Request = _reflection.GeneratedProtocolMessageType('Request', (_message.Message,), dict( + DESCRIPTOR = _REQUEST, + __module__ = 'multiproc.primes_pb2' + # @@protoc_insertion_point(class_scope:primes.Request) + )) +_sym_db.RegisterMessage(Request) + +Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict( + DESCRIPTOR = _REPLY, + __module__ = 'multiproc.primes_pb2' + # @@protoc_insertion_point(class_scope:primes.Reply) + )) +_sym_db.RegisterMessage(Reply) + + + +_PRIMES = _descriptor.ServiceDescriptor( + name='Primes', + full_name='primes.Primes', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=148, + serialized_end=199, + methods=[ + _descriptor.MethodDescriptor( + name='Check', + full_name='primes.Primes.Check', + index=0, + containing_service=None, + input_type=_REQUEST, + output_type=_REPLY, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_PRIMES) + +DESCRIPTOR.services_by_name['Primes'] = _PRIMES + +# @@protoc_insertion_point(module_scope) diff --git a/examples/multiproc/server.py b/examples/multiproc/server.py new file mode 100644 index 0000000..386f4ac --- /dev/null +++ b/examples/multiproc/server.py @@ -0,0 +1,55 @@ +import os +import math +import asyncio + +from concurrent.futures.process import ProcessPoolExecutor + +from grpclib.utils import graceful_exit +from grpclib.server import Stream, Server +from google.protobuf.wrappers_pb2 import BoolValue + +# generated by protoc +from .primes_pb2 import Request, Reply +from .primes_grpc import PrimesBase + + +def is_prime(n: int) -> bool: + print(f'{os.getpid()}: Started to check {n} number') + + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +class Primes(PrimesBase): + + def __init__(self, executor: ProcessPoolExecutor): + self._executor = executor + self._loop = asyncio.get_event_loop() + + async def Check(self, stream: Stream[Request, Reply]) -> None: + request = await stream.recv_message() + assert request is not None + number_is_prime = await self._loop.run_in_executor( + self._executor, is_prime, request.number + ) + reply = Reply(is_prime=BoolValue(value=number_is_prime)) + await stream.send_message(reply) + + +async def main(*, host: str = '127.0.0.1', port: int = 50051) -> None: + with ProcessPoolExecutor(max_workers=4) as executor: + server = Server([Primes(executor)]) + with graceful_exit([server]): + await server.start(host, port) + print(f'Serving on {host}:{port}') + await server.wait_closed() + + +if __name__ == '__main__': + asyncio.run(main())