개발/Python

Concurrency

Louisus 2020. 7. 31. 22:25
728x90

Python 코루틴

  • 비동기 I/O 코루틴 개념

파일을 쓰거나 읽거나 또는 네트워크의 웹 사이트에 요청하고 응답이 오는 타이밍에 모든 워커들이 멈추는 현상이 발생하므로 바로 제어권을 요청하면 다음 워커가 일하고 또 다음 워커가 일하는 것을 yield를 통해 제어권을 메인 함수에 주고 일을 여러 명이서 동시성을 가지고 할 수 있게 함. 단일 스레드.

  • Block I/O 순차 실행
  • ThreadPool 사용 우회
  • Asyncio

순차 실행

# Asyncio
# 동기 - 기다림
# 비동기 I/O - 기다리지 않고 바로 실행
# Generator -> 반복적인 객체 Return(yield)
# 즉, 실행 Stop -> 다른 작업 위임 -> Stop 지점 부터 재실행 원리
# Non-Blocking 비동기 처리 적합

# BlockIO

import timeit
# from urllib.request import urlopen
import requests


urls = ['http://daum.net', 'https://google.com', 'https://apple.com', 'https://tistory.com', 'https://github.com', 'https://gmarket.co.kr']

start = timeit.default_timer()

# 순차 실행부
for url in urls:
    print('Start ', url)
    # 실제 요청
    # 요청하면 응답이 올 때 까지, 즉 urlopen 함수 끝날 때까지 아래부분이 실행 안됨
    text = requests.get(url)
    # text = urlopen(url)
    # 실제 내용
    print('Contents', text)
    print('Done ', url)

# 완료시간 - 시작시간
duration = timeit.default_timer() - start

# 총 실행 시간
print('Total Time: ', duration)
# Asyncio
# 동기 - 기다림
# 비동기 I/O - 기다리지 않고 바로 실행
# Generator -> 반복적인 객체 Return(yield)
# 즉, 실행 Stop -> 다른 작업 위임 -> Stop 지점 부터 재실행 원리
# Non-Blocking 비동기 처리 적합

# BlockIO -> Thread 사용
# 스레드 개수 및 GIL 문제 염두, 공유 메모리 문제 해결

import timeit
# from urllib.request import urlopen
import requests
from concurrent.futures import ThreadPoolExecutor
import threading


urls = ['http://daum.net', 'https://google.com', 'https://apple.com',
        'https://tistory.com', 'https://github.com', 'https://gmarket.co.kr']

start = timeit.default_timer()


def fetch(url):
    print('Thread Name : ', threading.current_thread().getName(), 'Start', url)
    requests.get(url)
    print('Thread Name : ', threading.current_thread().getName(), 'Done', url)


def main():
    # max_worker=10 넣으면 에러
    with ThreadPoolExecutor() as executor:
        for url in urls:
            # map으로 할 경우 순서대로 하지만 for 문에서 사용불가
            # 따로 매핑해줘야됨
            executor.submit(fetch, url)


# 스레드 할 때는 메인 함수 영역을 만들어 줘야됨
# 아니면 진입점이 없으면 에러 발생
if __name__ == '__main__':
    main()
    # 완료시간 - 시작시간
    duration = timeit.default_timer() - start

    # 총 실행 시간
    print('Total Time: ', duration)
# Asyncio
# 동기 - 기다림
# 비동기 I/O - 기다리지 않고 바로 실행
# Generator -> 반복적인 객체 Return(yield)
# 즉, 실행 Stop -> 다른 작업 위임 -> Stop 지점 부터 재실행 원리
# Non-Blocking 비동기 처리 적합

# BlockIO -> Thread 사용
# 스레드 개수 및 GIL 문제 염두, 공유 메모리 문제 해결

import timeit
from urllib.request import urlopen
import requests
from concurrent.futures import ThreadPoolExecutor
import threading
import asyncio



urls = ['http://daum.net', 'https://google.com', 'https://apple.com',
        'https://tistory.com', 'https://github.com', 'https://gmarket.co.kr']

start = timeit.default_timer()


async def fetch(url):
    print('Thread Name : ', threading.current_thread().getName(), 'Start', url)
    # TypeError: object Response can't be used in 'await' expression
    # request 가 blockio이기 때문
    # 이 부분만 스레딩으로 만들어서 해결
    # aiohttp 사용 가능(Asyncio 적용)

    res = await requests.get(url)
    print('Thread Name : ', threading.current_thread().getName(), 'Done', url)
    return res

# 여러개 동시에 제너레이터 함수 앞에 async 붙임
async def main():
    # yield from -> await
    # asyncio.ensure_future
    futures = [asyncio.ensure_future(fetch(url)) for url in urls]

    rst = await asyncio.gather(*futures)

    print()
    print('Result: ', rst)


# 스레드 할 때는 메인 함수 영역을 만들어 줘야됨
# 아니면 진입점이 없으면 에러 발생
if __name__ == '__main__':
    # 루프 생성 (여러 개의 제너레이터 함수로 부터 중앙에서 관리)
    loop = asyncio.get_event_loop()

    # 루프 대기
    loop.run_until_complete(main())

    # 함수 실행
    main()
    # 완료시간 - 시작시간
    duration = timeit.default_timer() - start

    # 총 실행 시간
    print('Total Time: ', duration)
# Asyncio
# 동기 - 기다림
# 비동기 I/O - 기다리지 않고 바로 실행
# Generator -> 반복적인 객체 Return(yield)
# 즉, 실행 Stop -> 다른 작업 위임 -> Stop 지점 부터 재실행 원리
# Non-Blocking 비동기 처리 적합

# BlockIO -> Thread 사용
# 스레드 개수 및 GIL 문제 염두, 공유 메모리 문제 해결

import timeit
from urllib.request import urlopen
import requests
from concurrent.futures import ThreadPoolExecutor
import threading
import asyncio


urls = ['http://daum.net', 'https://google.com', 'https://apple.com',
        'https://tistory.com', 'https://github.com', 'https://gmarket.co.kr']

start = timeit.default_timer()


async def fetch(url, executor):
    print('Thread Name : ', threading.current_thread().getName(), 'Start', url)
    # TypeError: object Response can't be used in 'await' expression
    # request 가 blockio이기 때문
    # 이 부분만 스레딩으로 만들어서 해결
    # aiohttp 사용 가능(Asyncio 적용)

    res = await loop.run_in_executor(executor, urlopen, url)
    print('Thread Name : ', threading.current_thread().getName(), 'Done', url)
    return res.read[0:5]

# 여러개 동시에 제너레이터 함수 앞에 async 붙임


async def main():
    # 스레드 풀 생성
    executor = ThreadPoolExecutor(max_workers=10)
    # yield from -> await
    # asyncio.ensure_future
    futures = [asyncio.ensure_future(fetch(url, executor)) for url in urls]

    rst = await asyncio.gather(*futures)

    print()
    print('Result: ', rst)


# 스레드 할 때는 메인 함수 영역을 만들어 줘야됨
# 아니면 진입점이 없으면 에러 발생
if __name__ == '__main__':
    # 루프 생성 (여러 개의 제너레이터 함수로 부터 중앙에서 관리)
    loop = asyncio.get_event_loop()

    # 루프 대기
    loop.run_until_complete(main())

    # 함수 실행
    main()
    # 완료시간 - 시작시간
    duration = timeit.default_timer() - start

    # 총 실행 시간
    print('Total Time: ', duration)