본문 바로가기

프로그래머/Python

[Effective Python 복습] Chapter 5. 병행성과 병렬성

21. 키워드 전용 인수로 명료성을 강요하자

  • 키워드 인수는 함수 호출의 의도를 더 명확하게 해준다
  • 특히 불 플래그를 여러 개 받는 함수처럼 헷갈리기 쉬운 함수를 호출할 때 키워드 인수를 넘기게 하려면 키워드 전용 인수를 사용하자
  • 파이썬3는 함수의 키워드 전용 인수 문법을 명시적으로 지원한다
  • 리스트에 있는 * 기호는 위치 인수의 끝과 키워드 전용 인수의 시작을 기리킨다
    def safe_division_c(number, diviser, *, 
                      ignore_overflow=False, 
                      ignore_zero_division=False):
      # ...
    

safe_division_c(1, 10**500, True, False) # TypeError
safe_division_c(1, 0, ignore_zero_division=True) # 문제없음

- 파이썬2에서는 **kwargs를 사용하고 TypeError 예외를 직접 일으키는 방법으로 함수의 키워드 전용 인수를 흉내 낼 수 있다
```python
# 파이썬2
def print_args(*args, **kwargs):
    print 'Positional:    ', args
    print 'Keyword:        ', kwargs

print_args(1, 2, foo='bar', stuff='meep')

Positional: (1,2)

Keyword: {'foo': 'bar', 'stuff': 'meep'}

# 파이썬2
def safe_division_d(number, divisor, **kwargs):
    ignore_overflow = kwargs.pop('ignore_overflow', False)
    ignore_zero_div = kwargs.pop('ignore_zero_division', False)
    if kwargs:
        raise TypeError('Unexpected **kwargs: %r' % kwargs)

safe_division_d(1,10)
safe_division_d(1,0, ignore_zero_division=True)
safe_division_d(1,10**500, ignore_overflow=True)
safe_division_d(1,10, False, True)         # TypeError
safe_division_d(1,10, unexpected=True)    # TypeError

Chapter5. 병행성과 병렬성

36. 자식 프로세스를 관리하려면 subprocess를 사용하자

  • 자식 프로세스를 실행하고 자식 프로세스의 입출력 스트림을 관리하려면 subprocess 모듈을 사용하자
  • 자식 프로세스는 파이썬 인터프리터에서 병렬로 실행되어 CPU 사용을 극대화하게 해준다
  • communicate에 timeout 파라미터를 사용하여 자식 프로세스들이 교착 상태(deadlock)에 빠지거나 멈추는 상황을 막자
proc = subprocess.Popen(
    ['echo', 'Hello from the child!'],
    stdout=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode('utf-8'))
  • Popen 생성자가 프로세스를 시작
  • communicate 메서드는 자식 프로세스의 출력을 읽어오고 자식 프로세스가 종료할 때까지 대기
proc = subprocess.Popen(['sleep', '0.3'])
while proc.poll() is None:
    print('Working...')
    # 시간이 걸리는 작업 수행
print('Exit status', proc.poll())
  • 자식 프로세스의 상태는 파이썬이 다르 작업을 하는 동안 주기적으로 폴링된다
def run_sleep(period):
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

start = time()
procs = []
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)

for proc in procs:
    proc.communicate()
end = time()
print('Finished in %.3f seconds' % (end-start))
  • 여러 자식 프로세스를 병렬로 실행
def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'\xe24U\n\xd0Ql35\x11'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env = env,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()    # 자식 프로세스가 입력을 반드시 받게 함
    return proc

procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)

for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])
  • 파이프를 이용해 데이터를 서브프로세스로 보낸 다음 서브프로세스의 결과를 받아옴
def run_md5(input_stdin):
    proc = subprocess.Popen(
        ['md5'],
        stdin=input_stdin,
        stdout=subprocess.PIPE)
    return proc

input_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)
    hash_proc = run_md5(proc.stdout)
    hash_proc.append(hash_proc)

for proc in input_procs:
    proc.communicate()

for proc in hash_procs:
    out, err = proc.communicate()
    print(out.strip())
  • 한 자식 프로세스의 결과를 다른 프로세스의 입력으로 연결하여 병렬 프로세스의 체인을 생성
proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()

print('Exit status', proc.poll())
  • python 3.3 이후 버전에서 사용 가능
  • 자식 프로세스가 종료되지 않거나 입력 또는 출력 파이프에서 블록될 염려가 있다면 comminicate 메서드에 timeout 파라미터를 넘겨야 한다

37. 스레드를 블로킹 I/O 용으로 사용하고 병렬화용으로는 사용하지 말자

  • 파이썬 스레드는 전역 인터프리터 잠금(GIL) 때문에 여러 CPU 코어에서 병렬로 바이트코드를 실행할 수 없다
  • GIL에도 불구하고 파이썬 스레드는 동시에 여러 작업을 하는 것처럼 보여주기 쉽게 해주므로 여전히 유용하다
  • 여러 시스템 호출을 병렬로 수행하려면 파이썬 스레드를 사용하자. 이렇게 하면 계산을 하면서도 블로킹 I/O를 수행할 수 있다
def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i

numbers = [132123, 1243, 1532 ,1234]
start = time()
for number in numbers():
    lsit(factorize(number))
end = time()
print('Took %.3f seconds' % (end - start))

from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))

start = time()
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
end = time()
print('Took %.3f seconds' % (end - start))
import select

def slow_systemcall():
    select.select([], [], [], 0.1)

start = time()
for _ in range(5):
    slow_systemcall()
end = time()
print('Took %.3f seconds' % (end - start))

start = time()
threads = []
for _ in range(5)
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

def compute_helicopter_location(index):
    # ....

for i in range(5):
    compute_helicopter_location(i)
for thread in threads:
    thread.join()
end = time()
print('Took %.3f seconds' % (end - start))

38. 스레드에서 데이터 경쟁을 막으려면 Lock을 사용하자

  • 파이썬에 전역 인터프리터 잠금이 있다고 해도 프로그램 안에서 실행되는 스레드 간의 데이터 경쟁으로부터 보호할 책임은 프로그래머에게 있다
  • 여러 스레드가 잠금 없이 같은 객체를 수정하면 프로그램의 자료 구조가 오염된다
  • 내장 모듈 threading의 Lock 클래스는 파이썬의 표준 상호 배재 잠금 구현이다
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # 센서에서 읽어옴
        # ...
        counter.increment(1)

def run_threads(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

class LockingCounter(object):
    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        with self.lock:
            self.count += offset

how_many = 10**5
counter = LockingCounter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
    (5 * how_many, counter.count))

39. 스레드 간의 작업을 조율하려면 Queue를 사용하자

  • 파이프라인은 여러 파이썬 스레드를 사용하여 동시에 실행하는 작업의 순서를 구성하기에 아주 좋은 방법이다
  • 병행 파이프라인을 구축할 때는 많은 문제(바쁜 대기, 작업자 중단, 메모리 부족)가 일어날 수 있다는 점을 주의하자
  • Queue 클래스는 연산 블로킹, 버퍼 크기, 조인 등 견고한 파이프라인을 만드는 데 필요한 기능을 모두 갖췄다
class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)        # 처리할 아이템이 없음
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

download_queue = MyQueue()
resuze_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())

while len(done_queue.items) < 1000:
    # 기다리는 동안 작업 수행

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
    polled, 'times')
  • run 메서드에서 IndexError 예외를 잡는 부분이 매우 많이 실행됨
    • CPU 시간 낭비
from queue import Queue
queue = Queue()

def consumer():
    print('Comsumer waiting')    # 1
    queue.get()                    
    print('Comsumer done')        # 3

thread = Thread(target=consumer)
thread.start()

print('Producer putting')        # 2
queue.put(object())
thread.join()
print('Producer done')            # 4


queue = Queue(1)

def consumer():
    time.sleep(0.1)
    queue.get()                    # 2
    print('Comsumer got 1')        
    queue.get()                    # 4
    print('Comsumer go 2')        


thread = Thread(target=consumer)
thread.start()

queue.put(object())            # 1
print('Producer put 1')
queue.put(object())            # 3
print('Producer put 2')
thread.join()
print('Printer done')


in_queue = Queue()

def consumer():
    print('Comsumer waiting')
    work = in_queue.get()        # 2
    print('Comsumer working')
    # 작업 수행
    # ...        
    print('Comsumer done')
    in_queue.task_done()        # 3

Thread(target=consumer).start()

in_queue.put(object())            # 1
print('Producer waiting')
in_queue.join()                    # 4
print('Producer done')

class ClosableQueue(Queue)
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINE

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        # ...

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

download_queue = ClosableQueue()
# ...
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    # ...
]

for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()

download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

40. 많은 함수를 동시에 실행하려면 코루틴을 고려하자

  • 코루틴은 함수 수만 개를 마치 동시에 실행하는 것처럼 실행하는 효과적인 방법을 제공한다
  • 제너레이터 안에서 yield 표현식의 값은 외부 코드에서 제너레이터의 send 메서드에 전달한 값이다
  • 코루틴은 프로그램의 핵심 로직을 주변 환경과 상호 작용하는 코드로부터 분리할 수 있는 강력한 도구다
  • 파이썬 2는 yield from 문법과 제너레이터에서 값을 반환하는 기능을 지원하지 않는다
def my_coroutine():
    while True:
        received = yield
        print('Received:', received)

it = my_coroutine()
next(it)
it.send('First')
it.send('Second')


def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

it = minimize()
next(it)
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))


Query = namedtuple('Query', ('y', 'x'))

def count_neighbors(y, x):
    n_ = yield Query(y+1, x+0)
    ne = yield Query(y+1, x+1)
    # ...
    neighbor_states = [n_, ne, e_, ..., nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

it = count_neighbors(10, 5)
q1 = next(it)
print('First yield: ', q1)
q2 = it.send(ALIVE)
print('Second yield: ', q2)
q3 = it.send(ALIVE)
#...
try:
    count = it.send(EMPTY)
except StopIteration as e:
    print('Count: ', e.value)


Transition = namedtuple('Transitioin', ('y', 'x', 'state'))

def game_logic(state, neighbors):
    # ...

def step_cell(y, x):
    state = yield Query(y, x)
    neighbors = yield from count_neighbors(y, x)
    next_state = game_logic(state, neighbors)
    yield Transition(y, x, next_state)


TICK = object()

def simulate(height, width):
    while True:
        for y in range(height):
            for x in range(width):
                yield from step_cell(y, x)
        yield TICK

class Grid(object):
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def __str__(self):
        # ...

    def query(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def assign(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

def live_a_generation(grid, sim):
    progeny = Grid(grid.height, grid.width)
    item = next(sim)
    while item is not TICK:
        if isinstance(item, Query):
            state = grid.query(item.y, item.x)
            item = sim.send(state)
        else:
            progeny.assign(item.y, item.x, item.state)
            item = next(sim)
    return progeny

class ColumnPrinter(object):
    # ...

columns = ColumnPrinter()
sim = simulate(grid.height, grid.width)
for i in range(5):
    columns.append(str(grid))
    grid = live_a_generation(grid, sim)

print(columns)

41. 진정한 병렬성을 실현하려면 concurrent.futures를 고려하자

  • CPU 병목점을 C 확장 모듈로 옮기는 방법은 파이썬 코드에 최대한 투자하면서 성능을 개선할 수 있는 효과적인 방법이다. 하지만 이렇게 하면 비용이 많이 들어가고 버그가 생길 수도 있다
  • multiprocessing 모듈은 파이썬에서 특정 유형의 계산을 최소한의 노력으로 병렬화 할 수 있는 강력한 도구를 제공
  • multiprocessing의 강력한 기능은 concurrent.futures와 그 안에 들어 있는 간단한 ProcessPoolExecutor 클래스로 접근하는 게 가장 좋다
  • multiprocessing 모듈의 고급 기능은 너무 복잡하므로 피하는 것이 좋다
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

number = [(2143, 12351) ...]
start = time()
results = list(map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start))     # Took 1.170 seconds

start = time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start))     # Took 1.119 seconds

start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start))     # Took 0.663 seconds