본문 바로가기
DEV/AWS

[MASOCON 2019] 컨퍼런스 트랙 - 서버리스를 활용한 분산 처리 DEMO

by 김 민 준 2019. 11. 28.

 

MASOCON 2019 데모 자료 입니다.

 

 

 

발표 내용을 참고해주시면 됩니다. 

 

- 사이트 : https://www.imaso.co.kr/masocon2019

- 발표자료 : https://www.slideshare.net/MinJunKim5/masocon-2019-serverless-kimminjun (슬라이드쉐어)

https://speakerdeck.com/microsoftware/masokon-2019-seobeoriseureul-hwalyonghan-bunsan-ceori-gimminjun (스피커덱 - 마소콘공식)

 

 

 

 

데모를 무엇으로 준비할지 고민이 엄청 많았습니다.

공공기관의 API는 대부분 한도가 있다보니 적절한 예시를 찾아야 했는데...

관세청에서 찾았습니다.

 

관세청API에서 '관세환율'을 이용했습니다.

 

먼저 관세청API에 대해 설명을 하자면, 1:N 의 데이터 입니다. XML로 리턴받으며 GET방식으로 1개씩 받아올 수 있습니다.

 

아래 이미지는 관세청 API 문서의 일부입니다.

 

(출처 : https://unipass.customs.go.kr

 

형태는 일반적인 공공기관의 API와 크게 다르지는 않습니다. 

 

 

 

 

 

요청메시지는 단순하게 날짜와 수출입 구분만 넣습니다. 

 

 

 

응답 메시지는 국가부호/화폐단위명/환율/통화부호/적용개시일자/수출입구분 총 6개의 필드로 구성되어 있습니다. 

 

 

 

 

 

 

요청/응답에 대한 예시입니다. 

 

 

 

 

 

 

 

인증키는 개별적으로 신청하여 받으시면 됩니다. 과정에 대해서는 생략합니다. 

 

구조도입니다. 

 

1. AWS Console을 이용하여 실행.

2~5. API Gateway로 처리, Lambda에서 데이터(날짜범위)값을 갖고 옴.

6~8. SNS를 통해 Invoke된 Lambda는 각각 할당 받은 만큼 처리한다.

9. 처리가 완료된 데이터는 DynamoDB로 저장한다.

 
 
 
Lambda는 3개가 필요하고 API Gateway 1개 그리고 SNS 와 DynamoDB가 필요합니다.
언어는 Python(v3.6)을 사용했고, 발표내용에 나온것처럼 Layers를 이용하여 라이브러리를 패키징하여 재사용 하였습니다.
 
 
일단 Lambda에 있는 Layers부터 이미지를 통해 보여드리겠습니다.
 
 

 

Lambda에 접근을 하면 좌측 메뉴중에 가장 하단에 '계층'이 존재합니다. 

이미지는 제가 이미 등록해둔 라이브러리 패키징이네요. 등록한 'python36-lib'를 계속해서 재사용하여, Lambda를 실행합니다.

 

우선 위의 API에 필요한 날짜값을 갖고오는 Lambda를 준비합니다.

 

 

 

 

Designer에 Lambda이름 밑에 Layers를 등록되어 있는모습이 보입니다.

Layers는 최대 5개를 등록할수 있어요. (라이브러리가 중복되거나 하는 문제가 발생할 수 있습니다.)

 

 

 

 

그리고 API Gateway도 확인할 수 있어요. API Gateway를 생성하여 Lambda와 연결한 상태입니다.

API Gateway를 이용하여 처음 시작하는 Lambda가 위의 이미지의 Lambda를 호출해서 날짜값을 갖고 올꺼예요.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#-*- coding:utf-8 -*-
import datetime
from datetime import date
import json
 
def lambda_handler(event, context):
        
    strdate = ''
    for i in range(13660):
    # for i in range(1, 31):
        datedata = datetime.date.today() + datetime.timedelta(-i)
        if (i != 1):
            strdate = strdate + ',' + datedata.strftime('%Y%m%d')
        else :
            strdate = datedata.strftime('%Y%m%d')
        # print(datedata.strftime('%Y%m%d'))
 
    return strdate
 
# if __name__ == '__main__':
#     lambda_handler('','');
cs
 
 
yyyymmdd의 값을 ( , ) 로 붙여서 Text를 return 해주는 함수예요.
정말 심플하게 Source를 준비했어요. 31개로 돌려보고 365개로 돌려보고...
발표 당시에 데모를 돌리는데 양이 너무 적다보니 금방 끝나더라구요. 그래서 3660를 돌렸는데 그 상태로 남아 있어요.
(실제로 제가 작업할땐 다른 API를 호출하는데 약 15~20만건의 데이터를 돌리고 있어요)
 
 

 

이미지처럼 날짜값을 갖고 오고, 기준이 되는 Lambda에서 호출해서 받아온뒤 ( , ) 를 기준으로 잘라서 사용합니다. 
아래 이미지가 바로 기준이 되는 Lambda예요.
 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#-*- coding:utf-8 -*-
 
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
import requests
import os
import re
 
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
 
def errorCheck(msg) :
    print(msg)
 
def get_unipass_data(list):
 
    """
    # get_unipass_data
    # (list)  
    """
    
    print('# [get_unipass_data] Count : 'len(list))
    # print(list)
    print(os.environ['ARN_SCRAPING'])
    try:
 
        print('# [SNS 전송 시작]')
        client = boto3.client('sns')
        response = client.publish(
            TargetArn = os.environ['ARN_SCRAPING'],
            Message = json.dumps({'default': json.dumps(list)}),
            Subject = 'PY36_MASOCON_DEMO_UNIPASS_URL',
            MessageStructure='json'
        )
        print('# [SNS 전송 끝]')
 
 
    except Exception as e:
        print('# [SNS 전송 오류]')
        print(e)
 
        return errorCheck(e,'')
    
    return 'OK'
 
 
def url_list():
    try:
        
        headers = {
            'x-api-key': os.environ['xapikey'],
            'Content-type':'application/json;charset=utf-8',
            'Accept':'*/*'
        }
        
        data = {}
        resp = requests.post(os.environ['api_url'], headers=headers, data=json.dumps(data))
        dict = resp.text.split(',')
        url_list = []
        
        if len(dict) > 0 :
            for i in dict: 
                url_list.append(os.environ['unipass_url'+ i.replace("\"",""))
                
        print('# 총 건수 : 'len(url_list))
        data = []
        if len(url_list) > 0 :
            count = 0
            total_group_count = 0
            for i in url_list: 
                data.append(url_list[count])
                count = count + 1
                if (len(data)/36== 1 :
                    get_unipass_data(data)
                    total_group_count = total_group_count + 1 
                    data.clear()
                    #break
            if len(data) > 0 :
                total_group_count = total_group_count + 1 
                get_unipass_data(data)
                data.clear()
 
            print('# 36개씩 나눈 작업 개수 : ', total_group_count)
        return url_list 
        
    except Exception as e:
        print(e)
        return errorCheck(e)
 
def lambda_handler(event, context):
    url_list()
 
# if __name__ == '__main__':
#     lambda_handler('','')
    
cs
 
 
 
 
소스는 API Gateway 를 호출하고, 받아온 Text 값을 ( , ) 를 기준으로 split을 하고 dict에 넣습니다.
그런데 보시면 os.environ['변수명'] 이 보이실꺼예요. 이것은 Lambda에서 제공해주는 환경 변수를 이용했어요. 
(boto3를 이용해서 인코딩/디코딩을 해서 사용해도 좋습니다.)
위치는 Lambda Console화면에서 '함수코드' 밑에 '환경변수'가 있습니다. 
저는 key를 4개 사용했습니다. 
 

 

반복문을 이용하여 36개씩 나누어 get_unipass_data를 호출해요. 
get_unipass_data는 Amazon SNS 에 던지는 역할입니다.
데모에서 에러처리에 대해서는 별도로 구성하지는 않았어요.
(실제로 구현할때는 에러처리도 Amazon SNS에 에러내용을 전달하여 Lambda를 통해서 RDBMS에 넣었습니다.)
 
여기까지가 아래 구조도의 내용이예요.
 

 

 

그럼 Amazon SNS를 통해 전달받은 Lambda를 보여드릴게요. 

 

 

SNS가 연결되어 있는것을 볼 수 있습니다. 활성화도 되어 있구요. 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#-*- coding:utf-8 -*-
######################
############## lib 
######################  
 
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
from multiprocessing import Process
 
def dynamo_insert(list):
 
    """
    # get_unipass_data
    # (list)  
    """
    
    print('# [DynamoDB] Count : 'len(list))
 
    try:
 
        print('# [SNS 전송 시작]')
        client = boto3.client('sns')
        response = client.publish(
            TargetArn = 'ARN값을 넣습니다...',
            Message = json.dumps({'default': json.dumps(list)}),
            Subject = 'PY36_MASOCON_DEMO_UNIPASS_SCRAPING',
            MessageStructure='json'
        )
        print('# [SNS 전송 끝]')
 
 
    except Exception as e:
        print('# [SNS 전송 오류]')
        print(e)
 
        return errorCheck(e,'')
    
    return 'OK'
 
 
def errorCheck(msg) :
    print(msg)
    
def change_date_format(content):
    if content != None:
        return content[0:4]+'-'+ content[4:6+ '-' + content[6:8
 
def Unipass_scraping(list):
 
    count = 0
    
    print('# [Unipass_scraping] Count : 'len(list))
    err_url = ''
 
    try:
        if len(list) > 0 :
            for url in list: 
 
                err_url = url
                count = count + 1
 
                tree = ET.ElementTree(file=urlopen(url))
 
                tCnt = 0
                tCnt = int(tree.find('tCnt').text)
                
                print(tCnt)
                list_trifFxrtInfoQryRsltVo = []
                        
                if tCnt > 0:
                    for item in tree.findall('trifFxrtInfoQryRsltVo'):
                        if item.find('cntySgn').text == "KR" : 
                            trifFxrtInfoQryRsltVo = {
                                'cntySgn' : item.find('cntySgn').text if item.find('cntySgn').text != None else None,
                                'mtryUtNm' : item.find('mtryUtNm').text if item.find('mtryUtNm').text != None else None,
                                'fxrt' :item.find('fxrt').text if item.find('fxrt').text != None else None,
                                'aplyBgnDt' : change_date_format(item.find('aplyBgnDt').text) if item.find('aplyBgnDt').text != None else None,
                                'currSgn':item.find('currSgn').text if item.find('currSgn').text != None else None,
                                'imexTp' : 2
                            }
 
                    dynamo_insert(trifFxrtInfoQryRsltVo)
 
        err_url = ''
        
        print('success')
 
    except Exception as e:
        print('# [Unipass_scraping] Error : ', e)
        print('# [Unipass_scraping] err_url : ', err_url)
        # pass
        return errorCheck(e,err_url)
        
        
        
def lambda_handler(event, context):
    
    if 'API' in event :
 
        print('# 작업명 (Event == API)')
 
        job_type = event['API'][0]['Type']
        message = json.dumps(event['API'][0]['Data'])
        messageList = ast.literal_eval(message)
        
        Unipass_scraping(messageList)
        
        return job_type
 
    elif 'Records' in event : 
 
        print('# 작업명 (Event == Records)')
 
        job_type = event['Records'][0]['Sns']['Subject']
        message = event['Records'][0]['Sns']['Message']
        messageList = ast.literal_eval(message)
        
        Unipass_scraping(messageList)
        
        return job_type
 
    else : 
        return 'not working'
        
cs


 

 

 

 

 

 

 

핸들러는 이벤트 값을 받고, 이벤트 값에 제가 API 인지 Records인지 구분을 합니다. 

발표내용을 보면 'API Gateway를 통해서 직접 실행할수 있게 구현했다.' 라는 내용이 있는데요. 그 내용입니다.

간단하게 구분값을 API / Records 로 했어요. 그리고 Subject에 타입을 넣어서 던졌어요. 

실제 구현한 소스에는 Subject를 기준으로 분기처리도 했었어요. 

message에는 데이터를 담았어요. 그래서 message 를 list로 만들어서 Unipass_scraping으로 던집니다. 

 

처음에 언급한바와 같이 관세청 API는 1:N 이예요. 1개씩 던져서 여러행을 받습니다.

list 를 반복하여 관세청 API를 통신하고, 결과 값을 받아서 DynamoDB에 만든 TEST테이블에 Insert합니다. 

(저는 리턴받은 데이터중 KR만 뽑아서 담았어요) 

DynamoDB를 Insert하는 Lambda에 SNS를 이용해서 던집니다. 

DynamoDB에서 테이블은 간단하게 1개의 키로만 구성했어요. (DynamoDB는 NoSQL입니다.)

 

 

 

 

 

 

DynamoDB에 Insert하는 Lambda의 소스입니다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import json
import ast
import boto3
 
def dynamoDB_INSERT(data):
    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2')
    dynamodb.Table('MASOCON_DEMO').put_item(
        Item={
            'DATA': data
        }
    )
    
    return 'success'
 
def lambda_handler(event, context):
    
    if 'API' in event :
 
        print('# 작업명 (Event == API)')
 
        job_type = event['API'][0]['Type']
        message = json.dumps(event['API'][0]['Data'])
        messageList = ast.literal_eval(message)
        dynamoDB_INSERT(message)
        return 'success'
 
    elif 'Records' in event : 
 
        print('# 작업명 (Event == Records)')
 
        job_type = event['Records'][0]['Sns']['Subject']
        message = event['Records'][0]['Sns']['Message']
        messageList = ast.literal_eval(message)
        print(message)
        dynamoDB_INSERT(message)
        return 'success'
 
    else : 
            
        # data = 'aaaa'
        # dynamoDB_INSERT(data)
        
        return 'not working'
        
cs
 
 
 
 
Amazon SNS를 통해서 받아서 넣는건데요, 이전에 봤던 Lambda와 다른점은 DynamoDB에 Insert를 하는것 말고는 크게 다른것이 없습니다.
 
결과를 보여드리겠습니다.
 
 
실행 전 
 
 

 

 
실행 후 
 

 

참고로 호출하는 관세환율에 대한 정보는 7일을 기준으로 바뀌는 데이터였어요. 
즉, 7번의 중복되는 데이터를 제가 갖고오는거죠. 그렇지만 DynamoDB의 특성상 키값을 기준으로 동일한 값이 들어가면 UPDATE처리가 된다고해요.
(사실... 중복제거하고 DynamoDB에 넣는게 맞는것인데.. 일단 저는 이렇게 데모를 준비했네요ㅎㅎ) 
실행 후 테이블에 몇백개의 데이터가 들어가있는것을 확인해볼 수 있습니다.
 
데모를 준비하면서 IAM을 잡는 부분이 생각보다 어려웠어요. 
실무에서 구현할때는 사내에 있는 클라우드팀의 도움을 받았으나, 데모는 하나하나 제가 설정을 잡은거였거든요.
 
상세하게 서비스별로 설명드리기는 어렵고, 기회가 되면 IAM 설정을 포함한...
사용한 서비스들에 대해서 하나씩 글을 남겨보겠습니다.
(이글은 데모에 대한 내용일뿐이예요...)
 
마지막으로 다시 구조도를 보여드리겠습니다.