21. 키워드 전용 인수로 명료성을 강요하자
- 키워드 인수는 함수 호출의 의도를 더 명확하게 해준다
- 특히 불 플래그를 여러 개 받는 함수처럼 헷갈리기 쉬운 함수를 호출할 때 키워드 인수를 넘기게 하려면 키워드 전용 인수를 사용하자
- 파이썬3는 함수의 키워드 전용 인수 문법을 명시적으로 지원한다
- 리스트에 있는 * 기호는 위치 인수의 끝과 키워드 전용 인수의 시작을 기리킨다
def safe_division_c(number, diviser, *, ignore_overflow=False, ignore_zero_division=False): # ...
safe_division_c(1, 10**500, True, False) # TypeError
safe_division_c(1, 0, ignore_zero_division=True) # 문제없음
- 파이썬2에서는 **kwargs를 사용하고 TypeError 예외를 직접 일으키는 방법으로 함수의 키워드 전용 인수를 흉내 낼 수 있다
```python
# 파이썬2
def print_args(*args, **kwargs):
print 'Positional: ', args
print 'Keyword: ', kwargs
print_args(1, 2, foo='bar', stuff='meep')
Positional: (1,2)
Keyword: {'foo': 'bar', 'stuff': 'meep'}
# 파이썬2
def safe_division_d(number, divisor, **kwargs):
ignore_overflow = kwargs.pop('ignore_overflow', False)
ignore_zero_div = kwargs.pop('ignore_zero_division', False)
if kwargs:
raise TypeError('Unexpected **kwargs: %r' % kwargs)
safe_division_d(1,10)
safe_division_d(1,0, ignore_zero_division=True)
safe_division_d(1,10**500, ignore_overflow=True)
safe_division_d(1,10, False, True) # TypeError
safe_division_d(1,10, unexpected=True) # TypeError
Chapter5. 병행성과 병렬성
36. 자식 프로세스를 관리하려면 subprocess를 사용하자
- 자식 프로세스를 실행하고 자식 프로세스의 입출력 스트림을 관리하려면 subprocess 모듈을 사용하자
- 자식 프로세스는 파이썬 인터프리터에서 병렬로 실행되어 CPU 사용을 극대화하게 해준다
- communicate에 timeout 파라미터를 사용하여 자식 프로세스들이 교착 상태(deadlock)에 빠지거나 멈추는 상황을 막자
proc = subprocess.Popen(
['echo', 'Hello from the child!'],
stdout=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode('utf-8'))
- Popen 생성자가 프로세스를 시작
- communicate 메서드는 자식 프로세스의 출력을 읽어오고 자식 프로세스가 종료할 때까지 대기
proc = subprocess.Popen(['sleep', '0.3'])
while proc.poll() is None:
print('Working...')
# 시간이 걸리는 작업 수행
print('Exit status', proc.poll())
- 자식 프로세스의 상태는 파이썬이 다르 작업을 하는 동안 주기적으로 폴링된다
def run_sleep(period):
proc = subprocess.Popen(['sleep', str(period)])
return proc
start = time()
procs = []
for _ in range(10):
proc = run_sleep(0.1)
procs.append(proc)
for proc in procs:
proc.communicate()
end = time()
print('Finished in %.3f seconds' % (end-start))
- 여러 자식 프로세스를 병렬로 실행
def run_openssl(data):
env = os.environ.copy()
env['password'] = b'\xe24U\n\xd0Ql35\x11'
proc = subprocess.Popen(
['openssl', 'enc', '-des3', '-pass', 'env:password'],
env = env,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE)
proc.stdin.write(data)
proc.stdin.flush() # 자식 프로세스가 입력을 반드시 받게 함
return proc
procs = []
for _ in range(3):
data = os.urandom(10)
proc = run_openssl(data)
procs.append(proc)
for proc in procs:
out, err = proc.communicate()
print(out[-10:])
- 파이프를 이용해 데이터를 서브프로세스로 보낸 다음 서브프로세스의 결과를 받아옴
def run_md5(input_stdin):
proc = subprocess.Popen(
['md5'],
stdin=input_stdin,
stdout=subprocess.PIPE)
return proc
input_procs = []
hash_procs = []
for _ in range(3):
data = os.urandom(10)
proc = run_openssl(data)
procs.append(proc)
hash_proc = run_md5(proc.stdout)
hash_proc.append(hash_proc)
for proc in input_procs:
proc.communicate()
for proc in hash_procs:
out, err = proc.communicate()
print(out.strip())
- 한 자식 프로세스의 결과를 다른 프로세스의 입력으로 연결하여 병렬 프로세스의 체인을 생성
proc = run_sleep(10)
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()
print('Exit status', proc.poll())
- python 3.3 이후 버전에서 사용 가능
- 자식 프로세스가 종료되지 않거나 입력 또는 출력 파이프에서 블록될 염려가 있다면 comminicate 메서드에 timeout 파라미터를 넘겨야 한다
37. 스레드를 블로킹 I/O 용으로 사용하고 병렬화용으로는 사용하지 말자
- 파이썬 스레드는 전역 인터프리터 잠금(GIL) 때문에 여러 CPU 코어에서 병렬로 바이트코드를 실행할 수 없다
- GIL에도 불구하고 파이썬 스레드는 동시에 여러 작업을 하는 것처럼 보여주기 쉽게 해주므로 여전히 유용하다
- 여러 시스템 호출을 병렬로 수행하려면 파이썬 스레드를 사용하자. 이렇게 하면 계산을 하면서도 블로킹 I/O를 수행할 수 있다
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i
numbers = [132123, 1243, 1532 ,1234]
start = time()
for number in numbers():
lsit(factorize(number))
end = time()
print('Took %.3f seconds' % (end - start))
from threading import Thread
class FactorizeThread(Thread):
def __init__(self, number):
super().__init__()
self.number = number
def run(self):
self.factors = list(factorize(self.number))
start = time()
threads = []
for number in numbers:
thread = FactorizeThread(number)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time()
print('Took %.3f seconds' % (end - start))
import select
def slow_systemcall():
select.select([], [], [], 0.1)
start = time()
for _ in range(5):
slow_systemcall()
end = time()
print('Took %.3f seconds' % (end - start))
start = time()
threads = []
for _ in range(5)
thread = Thread(target=slow_systemcall)
thread.start()
threads.append(thread)
def compute_helicopter_location(index):
# ....
for i in range(5):
compute_helicopter_location(i)
for thread in threads:
thread.join()
end = time()
print('Took %.3f seconds' % (end - start))
38. 스레드에서 데이터 경쟁을 막으려면 Lock을 사용하자
- 파이썬에 전역 인터프리터 잠금이 있다고 해도 프로그램 안에서 실행되는 스레드 간의 데이터 경쟁으로부터 보호할 책임은 프로그래머에게 있다
- 여러 스레드가 잠금 없이 같은 객체를 수정하면 프로그램의 자료 구조가 오염된다
- 내장 모듈 threading의 Lock 클래스는 파이썬의 표준 상호 배재 잠금 구현이다
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# 센서에서 읽어옴
# ...
counter.increment(1)
def run_threads(func, how_many, counter):
threads = []
for i in range(5):
args = (i, how_many, counter)
thread = Thread(target=func, args=args)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
class LockingCounter(object):
def __init__(self):
self.lock = Lock()
self.count = 0
def increment(self, offset):
with self.lock:
self.count += offset
how_many = 10**5
counter = LockingCounter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
(5 * how_many, counter.count))
39. 스레드 간의 작업을 조율하려면 Queue를 사용하자
- 파이프라인은 여러 파이썬 스레드를 사용하여 동시에 실행하는 작업의 순서를 구성하기에 아주 좋은 방법이다
- 병행 파이프라인을 구축할 때는 많은 문제(바쁜 대기, 작업자 중단, 메모리 부족)가 일어날 수 있다는 점을 주의하자
- Queue 클래스는 연산 블로킹, 버퍼 크기, 조인 등 견고한 파이프라인을 만드는 데 필요한 기능을 모두 갖췄다
class MyQueue(object):
def __init__(self):
self.items = deque()
self.lock = Lock()
def put(self, item):
with self.lock
self.items.append(item)
def get(self):
with self.lock:
return self.items.popleft()
class Worker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
self.polled_count = 0
self.work_done = 0
def run(self):
while True:
self.polled_count += 1
try:
item = self.in_queue.get()
except IndexError:
sleep(0.01) # 처리할 아이템이 없음
else:
result = self.func(item)
self.out_queue.put(result)
self.work_done += 1
download_queue = MyQueue()
resuze_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
Worker(download, download_queue, resize_queue),
Worker(resize, resize_queue, upload_queue),
Worker(upload, upload_queue, done_queue),
]
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
while len(done_queue.items) < 1000:
# 기다리는 동안 작업 수행
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
polled, 'times')
- run 메서드에서 IndexError 예외를 잡는 부분이 매우 많이 실행됨
- CPU 시간 낭비
from queue import Queue
queue = Queue()
def consumer():
print('Comsumer waiting') # 1
queue.get()
print('Comsumer done') # 3
thread = Thread(target=consumer)
thread.start()
print('Producer putting') # 2
queue.put(object())
thread.join()
print('Producer done') # 4
queue = Queue(1)
def consumer():
time.sleep(0.1)
queue.get() # 2
print('Comsumer got 1')
queue.get() # 4
print('Comsumer go 2')
thread = Thread(target=consumer)
thread.start()
queue.put(object()) # 1
print('Producer put 1')
queue.put(object()) # 3
print('Producer put 2')
thread.join()
print('Printer done')
in_queue = Queue()
def consumer():
print('Comsumer waiting')
work = in_queue.get() # 2
print('Comsumer working')
# 작업 수행
# ...
print('Comsumer done')
in_queue.task_done() # 3
Thread(target=consumer).start()
in_queue.put(object()) # 1
print('Producer waiting')
in_queue.join() # 4
print('Producer done')
class ClosableQueue(Queue)
SENTINEL = object()
def close(self):
self.put(self.SENTINE
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
# ...
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
download_queue = ClosableQueue()
# ...
threads = [
StoppableWorker(download, download_queue, resize_queue),
# ...
]
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
40. 많은 함수를 동시에 실행하려면 코루틴을 고려하자
- 코루틴은 함수 수만 개를 마치 동시에 실행하는 것처럼 실행하는 효과적인 방법을 제공한다
- 제너레이터 안에서 yield 표현식의 값은 외부 코드에서 제너레이터의 send 메서드에 전달한 값이다
- 코루틴은 프로그램의 핵심 로직을 주변 환경과 상호 작용하는 코드로부터 분리할 수 있는 강력한 도구다
- 파이썬 2는 yield from 문법과 제너레이터에서 값을 반환하는 기능을 지원하지 않는다
def my_coroutine():
while True:
received = yield
print('Received:', received)
it = my_coroutine()
next(it)
it.send('First')
it.send('Second')
def minimize():
current = yield
while True:
value = yield current
current = min(value, current)
it = minimize()
next(it)
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))
Query = namedtuple('Query', ('y', 'x'))
def count_neighbors(y, x):
n_ = yield Query(y+1, x+0)
ne = yield Query(y+1, x+1)
# ...
neighbor_states = [n_, ne, e_, ..., nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
it = count_neighbors(10, 5)
q1 = next(it)
print('First yield: ', q1)
q2 = it.send(ALIVE)
print('Second yield: ', q2)
q3 = it.send(ALIVE)
#...
try:
count = it.send(EMPTY)
except StopIteration as e:
print('Count: ', e.value)
Transition = namedtuple('Transitioin', ('y', 'x', 'state'))
def game_logic(state, neighbors):
# ...
def step_cell(y, x):
state = yield Query(y, x)
neighbors = yield from count_neighbors(y, x)
next_state = game_logic(state, neighbors)
yield Transition(y, x, next_state)
TICK = object()
def simulate(height, width):
while True:
for y in range(height):
for x in range(width):
yield from step_cell(y, x)
yield TICK
class Grid(object):
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def __str__(self):
# ...
def query(self, y, x):
return self.rows[y % self.height][x % self.width]
def assign(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def live_a_generation(grid, sim):
progeny = Grid(grid.height, grid.width)
item = next(sim)
while item is not TICK:
if isinstance(item, Query):
state = grid.query(item.y, item.x)
item = sim.send(state)
else:
progeny.assign(item.y, item.x, item.state)
item = next(sim)
return progeny
class ColumnPrinter(object):
# ...
columns = ColumnPrinter()
sim = simulate(grid.height, grid.width)
for i in range(5):
columns.append(str(grid))
grid = live_a_generation(grid, sim)
print(columns)
41. 진정한 병렬성을 실현하려면 concurrent.futures를 고려하자
- CPU 병목점을 C 확장 모듈로 옮기는 방법은 파이썬 코드에 최대한 투자하면서 성능을 개선할 수 있는 효과적인 방법이다. 하지만 이렇게 하면 비용이 많이 들어가고 버그가 생길 수도 있다
- multiprocessing 모듈은 파이썬에서 특정 유형의 계산을 최소한의 노력으로 병렬화 할 수 있는 강력한 도구를 제공
- multiprocessing의 강력한 기능은 concurrent.futures와 그 안에 들어 있는 간단한 ProcessPoolExecutor 클래스로 접근하는 게 가장 좋다
- multiprocessing 모듈의 고급 기능은 너무 복잡하므로 피하는 것이 좋다
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
number = [(2143, 12351) ...]
start = time()
results = list(map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start)) # Took 1.170 seconds
start = time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start)) # Took 1.119 seconds
start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('Took $.3f seconds' % (end - start)) # Took 0.663 seconds
'프로그래머 > Python' 카테고리의 다른 글
[널널한 교수의 고급 파이썬] 01-2 리스트 요소는 참조형이다 (0) | 2020.12.11 |
---|---|
[널널한 교수의 고급 파이썬] 01-1 파이썬 자료형과 참조 변수 (0) | 2020.12.11 |
[Effective Python 복습] Chapter 4. 메타클래스와 속성 (0) | 2020.12.03 |
[Effective Python 복습] Chapter 3. 클래스와 상속 (0) | 2020.12.02 |
[파이썬] 파이썬 정리 for me (feat. FastCampus) (0) | 2020.05.30 |