.title-slide

# Concurrency in Python and other languages

.right[Andrey Vlasovskikh]
.right[JetBrains]
.right[PyCon Russia 2014]

---

.center[![PyCharm](media/pycharm-logo.png)]

* I'm [@vlasovskikh](http://twitter.com/vlasovskikh) and [pirx.ru](http://pirx.ru/)
* From St. Petersburg, Russia
* I work for JetBrains
    * PyCharm: IDE for Python and Web development
    * IdeaVim: Vim emulation plug-in for the IntelliJ platform
* I do open-source
    * Work: PyCharm Community Edition, IdeaVim
    * Personal: funcparserlib, Obraz, etc.

---

# Concurrency in Python is hard

* Hard to utilize resources
    * CPU cores
    * Memory usage
    * Network throughput
    * Simultaneous I/O connections
* Hard to write
    * Concurrent code is more complex and less readable
    * Error-prone: race conditions, heisenbugs

---

# Ways to do concurrency

* OS processes
    * `os.fork()` from POSIX, `multiprocessing` after Java
    * N cores, ~10MB/process, context switching, slow communication
* OS threads
    * `thread` from POSIX, `threading` after Java
    * 1 core due to GIL, ~20KB/thread, 2048 threads, parallel blocking I/O, context switching, race conditions
* Non-blocking I/O and event loop
    * `select`, `asyncore`, `asynchat`
    * 1 core, ~1KB/dispatcher, non-blocking I/O, callback hell

---

# New concurrency modules

* `concurrent.futures` in Python 3.2 after Java
    * Process and thread pools
    * Future objects represent async computations
* `asyncio` in Python 3.4 similar to C#
    * After Twisted framework
    * I/O and event loop
    * Lightweight tasks
    * No callbacks thanks to `yield from`

---

# Erlang, Go and others

* Best known for their concurrency
    * 1986: Erlang, 2009: Go
    * Akka framework for Scala
* Actors and message passing
    * Concurrent tasks that communicate only via messaging
* Reactive programming
    * Futures and observables: visible effects of time
* Only toy actor examples for Python
    * Until `asyncio`?

---

# Actor-based concurrency using asyncio?

* `asyncio` compared to event loops
    * Unlike `asyncore`, it provides tasks and message passing
* `asyncio` compared to threads
    * Faster than processes/threads: 1M tasks, 1 msg = 100 calls

---

.center.middle

# Example

---

# Example: CLI-based Twitter client

* Initially based on `threading`
    * It was a Convore client, but Twitter is similar
* We will develop it using `asyncio`

    .xml
    +---------+ stdin +-----------+          +---------+
    |         +------->           |   HTTP   |         |
    | Console |stdout |  Twitter  +----------> Twitter |
    |         <-------+  Client   |          | Server  |
    |         |       |           |          |         |
    +---------+       +----+------+          +---------+
                           | popen
                      +----v---+
                      | Notify |
                      +--------+

---

# Why CLI Twitter client as an example?

* You can obviously achieve 10K I/O connections in `asyncio`
    * Also known as C10K
* We'll focus on how to work with communicating tasks
    * How to apply actor-based design
* Only high-level overview of `asyncio`
    * Focus on architecture and design
    * Visit Andrew Svetlov's talk for the technical "meat"

---

# Structure of threaded client

    import threading

    class Console:
        def loop(self):
            ...  # Input cmd parsing loop
        def on_event(self, e): ...

    class Client:
        def get_home_timeline(self):
            ...  # *Blocking* HTTP GET
        def get_replies_timeline(self):
            ...  # *Blocking* HTTP GET
        def post(self, tweet):
            ...  # *Blocking* HTTP POST
        def add_event_listener(self, callback): ...

    class Stream(threading.Thread):
        def run(self):
            # HTTP GET polling loop
            ...  # Invoke callbacks

    class Notification:
        def on_event(self, e):
            ...  # Launch desktop-notify

---

# Cache streamed tweets

    class Client:
        def __init__(self, ...):
            self._timeline_cache = []
            ...
        def get_home_timeline(self):
            ...  # Get cached or HTTP GET
        def on_event(self, e):
            # Update cache
            ...  # In streaming thread (!)

    class Stream(threading.Thread):
        def run(self):
            # HTTP GET polling loop
            ...  # Invoke callbacks

* Problems
    * Heisenbugs in updating `self._timeline_cache`
    * First HTTP GET still blocks

---

.center.middle

# Actor-based Twitter client

---

# Reactive function calls

* Futures and coroutines: async computations, ability not to block/wait

    class Client:
        def get_home_timeline(self) -> asyncio.Future: ...

        @asyncio.coroutine
        def get_replies_timeline(self): ...

    client = Client()
    future = client.get_home_timeline()
    # Wait or not?
    yield from future  # Or yield from coroutine

---

# Reactive interface

    class Client:
        @asyncio.coroutine
        def get_home_timeline(self): ...

        @asyncio.coroutine
        def get_replies_timeline(self): ...

        @asyncio.coroutine
        def post(self, tweet): ...

        @asyncio.coroutine
        def subscribe(self, queue): ...

* Time effects made explicit

---

# Actor model

* Thread of control, behavior, private state, message passing, new actors

[Diagram: an actor receives messages into its queue, keeps private state, sends messages to other actors, and starts (*) new actors]

---

# Actors and asyncio

    .xml
    +-------------------+---------------------------+
    | Actor model       | asyncio                   |
    +-------------------+---------------------------+
    | Actor             | Task                      |
    | Actor's behavior  | Coroutine                 |
    | Send message      | Send to network /         |
    |                   | Put into future or queue  |
    | Receive message   | Read from network /       |
    |                   | Get from future or queue  |
    +-------------------+---------------------------+

---

# First look at asyncio actor/task

    import asyncio
    import json
    from contextlib import suppress

    import aiohttp

    @asyncio.coroutine
    def stream_events(self, queue):
        while True:
            yield from asyncio.sleep(self._STREAM_RETRY_TIMEOUT)
            try:
                response = yield from self._http_request(
                    'GET', self._STREAM_URL)
                try:
                    while True:
                        chunk = yield from response.content.read()
                        # Skip chunks that are not valid UTF-8 JSON
                        with suppress(UnicodeDecodeError, ValueError):
                            msg = json.loads(chunk.decode('utf-8'))
                            yield from queue.put(msg)
                except aiohttp.EofStream:
                    pass
            except Exception as e:
                self._log.error(e)

    q = asyncio.Queue()
    loop = asyncio.get_event_loop()
    asyncio.async(client.stream_events(q))  # client: a Client instance
    loop.run_forever()

---

# Main actors diagram

[Diagram: Stdin feeds Console (which blocks on input) and Console writes to Stdout; Console calls Client, which talks HTTP to Server and subscribes (sub) to Stream; Stream reads HTTP from Server; Notification subscribes (sub) to Stream and drives Notify]

* Based on I/O connections, remove input blocking

---

# Actor one-way messaging

    @asyncio.coroutine
    def foo(queue):
        yield from queue.put(42)

    @asyncio.coroutine
    def bar(queue):
        return (yield from queue.get())

    q = asyncio.Queue()
    asyncio.async(foo(q))
    asyncio.async(bar(q))

* Note that `queue.put()` has a time effect: it is a coroutine that may suspend

---

# Actor request-response messaging

    class ConnectRequest(asyncio.Future): ...

    @asyncio.coroutine
    def foo(queue):
        request = ConnectRequest(...)
        yield from queue.put(request)
        return (yield from request)

    @asyncio.coroutine
    def bar(queue):
        while True:
            msg = yield from queue.get()
            if isinstance(msg, ConnectRequest):
                msg.set_result('ack')

---

# Split actors

[Diagram: the same actors as in the main diagram, except that Console starts (*) a helper actor H, which writes to Stdout; Client, Stream, Notification, Server and Notify are connected as before]

* One input per actor is good

---

# One input per actor

[Diagram: the client split into actors Input, Client, Subscription, Output and Notification, each with a single input queue, connected to Stdin, Stdout, Server (HTTP and Stream) and Notify]

---

# Organic multi-cellular structure

[Diagram: the same actors grouped into larger cells: Input and Output in one cell, Client and Subscription in another, with Notification alongside]

---

# Cache tweets

[Diagram: nodes Client (holding the cache), Server, Stream and Subscription; edge labels `get_timeline`, `GET /timeline`, `GET /stream`, `subscribe`, `publish` and `tweet`]

* Easy: `Client` is a sequential task

---

# Sync I/O or CPU-heavy call

* Suppose synchronous `get_gravatar(email)`
    * Provided by a third-party library, cannot rewrite
* Off-load I/O to thread pool, CPU to process pool

    import asyncio
    import concurrent.futures

    def get_gravatar(email):
        ...  # Do *blocking* I/O

    executor = concurrent.futures.ThreadPoolExecutor(4)

    @asyncio.coroutine
    def get_gravatar_async(email):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(
            executor, get_gravatar, email))

---

.center.middle

# Analysis

---

# Actors compared to OOP

* Similar
    * Encapsulation of state
    * Message passing
* Different
    * OOP: synchronous stack-based method calls
    * Actors: concurrent execution
* Actors are OOP done right
    * According to Alan Kay, who coined the term "OOP"

---

# Upsides of actors in asyncio

* Scaling up
    * Utilize network bandwidth, C10K problem solved
    * Low memory usage
    * Low response time (if you are reactive)
* Scaling out
    * Actors look like services, move them to remote machines
* Actor-based and reactive programming paradigm
    * Guidelines for structuring your code

---

# Downsides of actors in asyncio

* New asyncio versions of network libraries required
    * And the libraries that use them, and so on
* `yield from` is easy to forget in a call
* Tasks and queues are harder than in Erlang/Go
    * Silent death is a bad strategy for error handling
    * It's inconvenient to pass exceptions among tasks
    * No pattern matching for queue polling
* CPU usage remains generally problematic
    * Off-load to process pool, external queues

---

# Wrap up

* `asyncio` looks promising
    * Actor-based concurrency and reactive behavior
    * We've got standard "lightweight threads"
* Does `asyncio` fit into the context of your work?
    * Give it a try!
* Source code for the `asyncio` Twitter client example
    * [http://tinyurl.com/twitter-asyncio](https://gist.github.com/vlasovskikh/e2c325f9211f37e53d29)
* Feel free to ask any questions
    * [@vlasovskikh](http://twitter.com/vlasovskikh) and [pirx.ru](http://pirx.ru/)
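
---

# Appendix: request-response messaging, runnable sketch

The request-response pattern from the slides (a request that carries a future, sent through the receiver's single input queue) can be tried end to end with the sketch below. It is not taken from the talk's source code and rests on several assumptions: it uses `async`/`await` (Python 3.7+) instead of `@asyncio.coroutine` and `yield from`, it stores the future in a hypothetical `reply` attribute instead of subclassing `asyncio.Future`, and the names `server`, `client`, `main` and the `None` shutdown sentinel are made up for illustration.

```python
import asyncio


class ConnectRequest:
    """Hypothetical request message that carries a future for the reply."""

    def __init__(self, name, reply):
        self.name = name
        self.reply = reply  # asyncio.Future completed by the receiving actor


async def server(inbox):
    # Actor behavior: handle one message at a time from a single input queue.
    while True:
        msg = await inbox.get()
        if msg is None:  # hypothetical shutdown sentinel, not from the talk
            break
        if isinstance(msg, ConnectRequest):
            msg.reply.set_result('ack ' + msg.name)


async def client(inbox):
    # Send a request, then suspend until the other actor completes the future.
    reply = asyncio.get_running_loop().create_future()
    await inbox.put(ConnectRequest('alice', reply))
    return await reply


async def main():
    inbox = asyncio.Queue()
    server_task = asyncio.create_task(server(inbox))  # start the actor
    answer = await client(inbox)
    await inbox.put(None)  # ask the server actor to stop
    await server_task
    return answer


result = asyncio.run(main())
```

Both waiting points (`inbox.put()` and the reply future) are explicit `await` expressions, which is the "time effects made explicit" idea in its current syntax.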