MASOCON 2019 데모 자료 입니다.


발표 내용을 참고해주시면 됩니다. 


- 사이트 : https://www.imaso.co.kr/masocon2019

- 발표자료 : https://www.slideshare.net/MinJunKim5/masocon-2019-serverless-kimminjun (슬라이드쉐어)

https://speakerdeck.com/microsoftware/masokon-2019-seobeoriseureul-hwalyonghan-bunsan-ceori-gimminjun (스피커덱 - 마소콘공식)




데모를 무엇으로 준비할지 고민이 엄청 많았습니다.

공공기관의 API는 대부분 한도가 있다보니 적절한 예시를 찾아야 했는데...

관세청에서 찾았습니다.


관세청API에서 '관세환율'을 이용했습니다.


먼저 관세청API에 대해 설명을 하자면, 1:N 의 데이터 입니다. XML로 리턴받으며 GET방식으로 1개씩 받아올 수 있습니다.


아래 이미지는 관세청 API 문서의 일부입니다.


(출처 : https://unipass.customs.go.kr


형태는 일반적인 공공기관의 API와 크게 다르지는 않습니다. 






요청메시지는 단순하게 날짜와 수출입 구분만 넣습니다. 




응답 메시지는 국가부호/화폐단위명/환율/통화부호/적용개시일자/수출입구분 총 6개의 필드로 구성되어 있습니다. 







요청/응답에 대한 예시입니다. 








인증키는 개별적으로 신청하여 받으시면 됩니다. 과정에 대해서는 생략합니다. 


구조도입니다. 


1. AWS Console을 이용하여 실행.

2~5. API Gateway로 처리, Lambda에서 데이터(날짜범위)값을 갖고 옴.

6~8. SNS를 통해 Invoke된 Lambda는 각각 할당 받은 만큼 처리한다.

9. 처리가 완료된 데이터는 DynamoDB로 저장한다.




Lambda는 3개가 필요하고 API Gateway 1개 그리고 SNS 와 DynamoDB가 필요합니다.
언어는 Python(v3.6)을 사용했고, 발표내용에 나온것처럼 Layers를 이용하여 라이브러리를 패키징하여 재사용 하였습니다.


일단 Lambda에 있는 Layers부터 이미지를 통해 보여드리겠습니다.



Lambda에 접근을 하면 좌측 메뉴중에 가장 하단에 '계층'이 존재합니다. 

이미지는 제가 이미 등록해둔 라이브러리 패키징이네요. 등록한 'python36-lib'를 계속해서 재사용하여, Lambda를 실행합니다.


우선 위의 API에 필요한 날짜값을 갖고오는 Lambda를 준비합니다.





Designer에 Lambda이름 밑에 Layers를 등록되어 있는모습이 보입니다.

Layers는 최대 5개를 등록할수 있어요. (라이브러리가 중복되거나 하는 문제가 발생할 수 있습니다.)





그리고 API Gateway도 확인할 수 있어요. API Gateway를 생성하여 Lambda와 연결한 상태입니다.

API Gateway를 이용하여 처음 시작하는 Lambda가 위의 이미지의 Lambda를 호출해서 날짜값을 갖고 올꺼예요.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#-*- coding:utf-8 -*-
import datetime
from datetime import date
import json
 
def lambda_handler(event, context):
        
    strdate = ''
    for i in range(13660):
    # for i in range(1, 31):
        datedata = datetime.date.today() + datetime.timedelta(-i)
        if (i != 1):
            strdate = strdate + ',' + datedata.strftime('%Y%m%d')
        else :
            strdate = datedata.strftime('%Y%m%d')
        # print(datedata.strftime('%Y%m%d'))
 
    return strdate
 
# if __name__ == '__main__':
#     lambda_handler('','');
cs



yyyymmdd의 값을 ( , ) 로 붙여서 Text를 return 해주는 함수예요.
정말 심플하게 Source를 준비했어요. 31개로 돌려보고 365개로 돌려보고...
발표 당시에 데모를 돌리는데 양이 너무 적다보니 금방 끝나더라구요. 그래서 3660를 돌렸는데 그 상태로 남아 있어요.
(실제로 제가 작업할땐 다른 API를 호출하는데 약 15~20만건의 데이터를 돌리고 있어요)




이미지처럼 날짜값을 갖고 오고, 기준이 되는 Lambda에서 호출해서 받아온뒤 ( , ) 를 기준으로 잘라서 사용합니다. 
아래 이미지가 바로 기준이 되는 Lambda예요.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#-*- coding:utf-8 -*-
 
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
import requests
import os
import re
 
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
 
def errorCheck(msg) :
    print(msg)
 
def get_unipass_data(list):
 
    """
    # get_unipass_data
    # (list)  
    """
    
    print('# [get_unipass_data] Count : 'len(list))
    # print(list)
    print(os.environ['ARN_SCRAPING'])
    try:
 
        print('# [SNS 전송 시작]')
        client = boto3.client('sns')
        response = client.publish(
            TargetArn = os.environ['ARN_SCRAPING'],
            Message = json.dumps({'default': json.dumps(list)}),
            Subject = 'PY36_MASOCON_DEMO_UNIPASS_URL',
            MessageStructure='json'
        )
        print('# [SNS 전송 끝]')
 
 
    except Exception as e:
        print('# [SNS 전송 오류]')
        print(e)
 
        return errorCheck(e,'')
    
    return 'OK'
 
 
def url_list():
    try:
        
        headers = {
            'x-api-key': os.environ['xapikey'],
            'Content-type':'application/json;charset=utf-8',
            'Accept':'*/*'
        }
        
        data = {}
        resp = requests.post(os.environ['api_url'], headers=headers, data=json.dumps(data))
        dict = resp.text.split(',')
        url_list = []
        
        if len(dict) > 0 :
            for i in dict: 
                url_list.append(os.environ['unipass_url'+ i.replace("\"",""))
                
        print('# 총 건수 : 'len(url_list))
        data = []
        if len(url_list) > 0 :
            count = 0
            total_group_count = 0
            for i in url_list: 
                data.append(url_list[count])
                count = count + 1
                if (len(data)/36== 1 :
                    get_unipass_data(data)
                    total_group_count = total_group_count + 1 
                    data.clear()
                    #break
            if len(data) > 0 :
                total_group_count = total_group_count + 1 
                get_unipass_data(data)
                data.clear()
 
            print('# 36개씩 나눈 작업 개수 : ', total_group_count)
        return url_list 
        
    except Exception as e:
        print(e)
        return errorCheck(e)
 
def lambda_handler(event, context):
    url_list()
 
# if __name__ == '__main__':
#     lambda_handler('','')
    
cs




소스는 API Gateway 를 호출하고, 받아온 Text 값을 ( , ) 를 기준으로 split을 하고 dict에 넣습니다.
그런데 보시면 os.environ['변수명'] 이 보이실꺼예요. 이것은 Lambda에서 제공해주는 환경 변수를 이용했어요. 
(boto3를 이용해서 인코딩/디코딩을 해서 사용해도 좋습니다.)
위치는 Lambda Console화면에서 '함수코드' 밑에 '환경변수'가 있습니다. 
저는 key를 4개 사용했습니다. 



반복문을 이용하여 36개씩 나누어 get_unipass_data를 호출해요. 
get_unipass_data는 Amazon SNS 에 던지는 역할입니다.
데모에서 에러처리에 대해서는 별도로 구성하지는 않았어요.
(실제로 구현할때는 에러처리도 Amazon SNS에 에러내용을 전달하여 Lambda를 통해서 RDBMS에 넣었습니다.)

여기까지가 아래 구조도의 내용이예요.



그럼 Amazon SNS를 통해 전달받은 Lambda를 보여드릴게요. 



SNS가 연결되어 있는것을 볼 수 있습니다. 활성화도 되어 있구요. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#-*- coding:utf-8 -*-
######################
############## lib 
######################  
 
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
from multiprocessing import Process
 
def dynamo_insert(list):
 
    """
    # get_unipass_data
    # (list)  
    """
    
    print('# [DynamoDB] Count : 'len(list))
 
    try:
 
        print('# [SNS 전송 시작]')
        client = boto3.client('sns')
        response = client.publish(
            TargetArn = 'ARN값을 넣습니다...',
            Message = json.dumps({'default': json.dumps(list)}),
            Subject = 'PY36_MASOCON_DEMO_UNIPASS_SCRAPING',
            MessageStructure='json'
        )
        print('# [SNS 전송 끝]')
 
 
    except Exception as e:
        print('# [SNS 전송 오류]')
        print(e)
 
        return errorCheck(e,'')
    
    return 'OK'
 
 
def errorCheck(msg) :
    print(msg)
    
def change_date_format(content):
    if content != None:
        return content[0:4]+'-'+ content[4:6+ '-' + content[6:8
 
def Unipass_scraping(list):
 
    count = 0
    
    print('# [Unipass_scraping] Count : 'len(list))
    err_url = ''
 
    try:
        if len(list) > 0 :
            for url in list: 
 
                err_url = url
                count = count + 1
 
                tree = ET.ElementTree(file=urlopen(url))
 
                tCnt = 0
                tCnt = int(tree.find('tCnt').text)
                
                print(tCnt)
                list_trifFxrtInfoQryRsltVo = []
                        
                if tCnt > 0:
                    for item in tree.findall('trifFxrtInfoQryRsltVo'):
                        if item.find('cntySgn').text == "KR" : 
                            trifFxrtInfoQryRsltVo = {
                                'cntySgn' : item.find('cntySgn').text if item.find('cntySgn').text != None else None,
                                'mtryUtNm' : item.find('mtryUtNm').text if item.find('mtryUtNm').text != None else None,
                                'fxrt' :item.find('fxrt').text if item.find('fxrt').text != None else None,
                                'aplyBgnDt' : change_date_format(item.find('aplyBgnDt').text) if item.find('aplyBgnDt').text != None else None,
                                'currSgn':item.find('currSgn').text if item.find('currSgn').text != None else None,
                                'imexTp' : 2
                            }
 
                    dynamo_insert(trifFxrtInfoQryRsltVo)
 
        err_url = ''
        
        print('success')
 
    except Exception as e:
        print('# [Unipass_scraping] Error : ', e)
        print('# [Unipass_scraping] err_url : ', err_url)
        # pass
        return errorCheck(e,err_url)
        
        
        
def lambda_handler(event, context):
    
    if 'API' in event :
 
        print('# 작업명 (Event == API)')
 
        job_type = event['API'][0]['Type']
        message = json.dumps(event['API'][0]['Data'])
        messageList = ast.literal_eval(message)
        
        Unipass_scraping(messageList)
        
        return job_type
 
    elif 'Records' in event : 
 
        print('# 작업명 (Event == Records)')
 
        job_type = event['Records'][0]['Sns']['Subject']
        message = event['Records'][0]['Sns']['Message']
        messageList = ast.literal_eval(message)
        
        Unipass_scraping(messageList)
        
        return job_type
 
    else : 
        return 'not working'
        
cs





핸들러는 이벤트 값을 받고, 이벤트 값에 제가 API 인지 Records인지 구분을 합니다. 

발표내용을 보면 'API Gateway를 통해서 직접 실행할수 있게 구현했다.' 라는 내용이 있는데요. 그 내용입니다.

간단하게 구분값을 API / Records 로 했어요. 그리고 Subject에 타입을 넣어서 던졌어요. 

실제 구현한 소스에는 Subject를 기준으로 분기처리도 했었어요. 

message에는 데이터를 담았어요. 그래서 message 를 list로 만들어서 Unipass_scraping으로 던집니다. 


처음에 언급한바와 같이 관세청 API는 1:N 이예요. 1개씩 던져서 여러행을 받습니다.

list 를 반복하여 관세청 API를 통신하고, 결과 값을 받아서 DynamoDB에 만든 TEST테이블에 Insert합니다. 

(저는 리턴받은 데이터중 KR만 뽑아서 담았어요) 

DynamoDB를 Insert하는 Lambda에 SNS를 이용해서 던집니다. 

DynamoDB에서 테이블은 간단하게 1개의 키로만 구성했어요. (DynamoDB는 NoSQL입니다.)







DynamoDB에 Insert하는 Lambda의 소스입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import json
import ast
import boto3
 
def dynamoDB_INSERT(data):
    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2')
    dynamodb.Table('MASOCON_DEMO').put_item(
        Item={
            'DATA': data
        }
    )
    
    return 'success'
 
def lambda_handler(event, context):
    
    if 'API' in event :
 
        print('# 작업명 (Event == API)')
 
        job_type = event['API'][0]['Type']
        message = json.dumps(event['API'][0]['Data'])
        messageList = ast.literal_eval(message)
        dynamoDB_INSERT(message)
        return 'success'
 
    elif 'Records' in event : 
 
        print('# 작업명 (Event == Records)')
 
        job_type = event['Records'][0]['Sns']['Subject']
        message = event['Records'][0]['Sns']['Message']
        messageList = ast.literal_eval(message)
        print(message)
        dynamoDB_INSERT(message)
        return 'success'
 
    else : 
            
        # data = 'aaaa'
        # dynamoDB_INSERT(data)
        
        return 'not working'
        
cs





Amazon SNS를 통해서 받아서 넣는건데요, 이전에 봤던 Lambda와 다른점은 DynamoDB에 Insert를 하는것 말고는 크게 다른것이 없습니다.

결과를 보여드리겠습니다.


실행 전 





실행 후 



참고로 호출하는 관세환율에 대한 정보는 7일을 기준으로 바뀌는 데이터였어요. 
즉, 7번의 중복되는 데이터를 제가 갖고오는거죠. 그렇지만 DynamoDB의 특성상 키값을 기준으로 동일한 값이 들어가면 UPDATE처리가 된다고해요.
(사실... 중복제거하고 DynamoDB에 넣는게 맞는것인데.. 일단 저는 이렇게 데모를 준비했네요ㅎㅎ) 
실행 후 테이블에 몇백개의 데이터가 들어가있는것을 확인해볼 수 있습니다.

데모를 준비하면서 IAM을 잡는 부분이 생각보다 어려웠어요. 
실무에서 구현할때는 사내에 있는 클라우드팀의 도움을 받았으나, 데모는 하나하나 제가 설정을 잡은거였거든요.

상세하게 서비스별로 설명드리기는 어렵고, 기회가 되면 IAM 설정을 포함한...
사용한 서비스들에 대해서 하나씩 글을 남겨보겠습니다.
(이글은 데모에 대한 내용일뿐이예요...)

마지막으로 다시 구조도를 보여드리겠습니다.







Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (이전글)

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #2 (현재글)

 

구조도


프로젝트 내용 


1. SAP 의 데이터를 취합하여 SAOP로 AWS Lambda 에 I/F 를 진행 
2. AWS Lambda 에서 취합한 데이터를 나누어 Amazon SNS(Simple Notification Service) 를 이용함. 

3. Amazon SNS에서 또 다른 AWS Lambda 로 분산하여 데이터를 던지고, AWS Lambda 에서는 관세청 (UNIPASS API)를 SAOP 로 호출하여 XML 기반 데이터를 리턴 받음.

4. 리턴 받은 데이터는 SAP 로 I/F 진행함.

5. 동일하게 Web 에서도 조금 더 빨리 확인할 수 있기를 원하는 데이터가 존재할 수 있기에 기능을 넣음.

6. 기능은 위에서 사용한 로직을 재활용하여 API Gateway 로 값을 던질 수 있도록함.

5. 진행 중에 발생한 오류 역시 Amazon SNS 를 이용하여 오류처리용 AWS Lambda 로 던짐.

6. 오류처리용 AWS Lambda 에서는 사내 RDBMS에 로그를 남김. (에이전트를 통해 운영담당자에게 메일 발송)

 

 

5. Amazon SNS 를 맛보다. 

 

Amazon SNS 를 이용하여 관세청 데이터를 처리하는 1개의 Lambda 에 큐를 쌓고 분산처리를 하니 가능해졌다.

 

'그래서 동시에 몇개까지 처리가 가능할까 ?'

 

우선 SNS 로 던질때 첫 Lambda의 동작은

 

  1) 500개씩 관세청API에 호출할 목록을 나누어 배열에 담는다.

  2) 배열은 루프를 통해 반복하여 SNS 를 호출하여 두번쨰 Lambda에 전달한다.

  3) 두번째 Lambda에서는 관세청 API를 호출하고 처리결과는 곧 바로 SAP로 I/F(인터페이스)해준다. 

 

데이터 처리 순서가 중요하지 않았기 때문에 먼저 끝나는놈은 먼저 끝내고 퇴근하면 된다.

(눈치보지 말고, 할거 다 했으면 칼퇴하자)

 

Amazon SNS 를 처리하려면 boto3라는 라이브러리를 사용해야한다. 

#Pyhton 으로 Amazon SNS 호출하는 예제 

client = boto3.client('sns')
response = client.publish(
            TargetArn = 'arn 값',
            Message = 'json 형식으로 가공한 데이터'
            Subject = '제목',
            MessageStructure='json'
        )

 

그리고 받는쪽에서는 아래 그림과 같이 받는다.

 

수신자가 받는 json 형식의 값

[출처] https://docs.aws.amazon.com/ko_kr/sns/latest/dg/sns-message-and-json-formats.html

 

 

여러 값을 활용할 수 있는데 나는 제목과 메시지를 활용하기로 했다.

Subject은 3개의 Lamba 중에 어디서 날린 놈인가, Message는 Lambda 의 결과 값을 넣었다.

 

이제 큐를 쌓기만 하지말고 동시성 작업에 대해 확인을 해야하는데, 부하를 줄이기 위해 10개씩만 띄우기로 했다. 

 

 

AWS Lambda 의 동시성 예약 설정

 

 람다1부터 람다3까지 한번에 테스트를 해보니 잘 되는것을 확인하였다. 

이거 의외로 너무 쉽게 끝났다. 

 

SNS가 추가된 구조도

 

6. 방심은 금물 웹에서도 호출을 ? API Gateway

 

아무리 주기적으로 데이터를 끌어온다 한들 실시간하고는 차이가 있을 수 밖에 없다. 

결과적으로 수동으로 동작하는것도 필요로 하게 되었고, 이에 대한 고민을 하기 시작했다. 

한 번의 액션으로 동일하게 처리하는것은 사실상 불필요하다. 원하는 데이터만 다시 가져오도록 구조를 만들어보자.

다행스럽게도 관세청 API를 호출하는 목록을 처리하는 부분에서 목록이 아닌 단일 데이터를 처리할 수 있는 로직만 추가하면 끝이였다.

 

그런데 이것을 어떻게 호출하지 ? 

 

사용하고 있는 웹에 기능을 넣는게 가장 알맞겠다. 웹에서 람다를 호출하기 위해서는 API Gateway 라는놈을 만져 줘야 한다.

 

API Gateway 가 궁금하면 링크로 가서 구경 

https://docs.aws.amazon.com/ko_kr/apigateway/latest/developerguide/api-gateway-basic-concept.html

 

 

API Gateway 를 만드는 과정은 의외로 간단했다.

메서드 실행에는 API 를 필요하게 설정을 잡는다. 

 

Amazon API Gateway 메서드 실행 설정

 

 

메서드 실행의 통합요청은 유형에 Lambda 를 체크하고, 매핑 템플릿은 json 으로 했다. 

 

그리고 호출받는 Lambda 에서는 성공시 일단은 "job success" 라는 간단한 문구로 성공했음을 알리도록 했다. 

(나중에 이건 변경할 예정이다.) 

API Gateway를 배포 진행하고, Postman을 사용해서 테스트를 해봤다.

 

 

헤더값에는 x-api-key 값을 넣고..

 

Postman 으로 테스트 진행
리턴 값 확인

 

본문에는 위에 Amazon SNS 호출할때 Message 의 형식을 최대한 맞추어 진행한다. Type 을 API 직접 호출과 Cron으로 나누어 해당 Lambda 에서는 어디서 호출되었냐에 따라 로그를 남기도록 수정하였다. 

요거로 이제 웹에서 호출하게 하고~ 마무리 해야겠다 했다.

 

API Gateway 까지 추가된 구조도

 

7. 준비는 끝났다.

 

생각을 해보니 주기적으로 실행을 어떻게 시키나 궁금했다.

클라우드 팀에서 안내해주더라... 클라우드워치로 크론탭기능이 가능하다고...

곧 바로 CloudWatch 통해서 원하는 시간 단위로 호출하는것을 찾아봤다.

규칙이란놈을 넣으면 끝나는군.. 

 

 

규칙 생성을 진행하는데, 이벤트 패턴과 일정 두 가지가 가능하더라. 

나는 일정으로 Cron식을 선택하여 매 20분마다 실행하도록 하였다. 그리고 대상 Lambda를 추가했다. 

 

 

 

표현식이 궁금하다면 링크를 참고하자.

https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html

 

CloudWatch가 추가된 구조도

 

8. 끝날때까지 끝난게 아니다.. 에러처리!

 

모든 구성을 마치고 테스트를 진행하는 과정에 에러가 발생했다. 잘돌아가는가 싶더니 에러가 발생했더라.

에러처리가 빠졌다.

에러를 감지해서 어떻게 저장하고 어떻게 알려줘야할까 ?


처음에는 CloudWatch 를 활용해보려고 했다.
우선 클라우드팀의 도움으로 CloudWatch 로그로 에러를 보는법부터 찾는방법까지 안내받았다.
하지만 운영자가 보기에는 너무 어려운 방법이였다. 
(본인의 기준으로 사용자를 생각하면 가장 나쁜 개발자!)

CloudWatch 에 가장 깔끔하게 log 를 남기고 그내용을 한번씩 탐지하여 메일발송과 슬랙으로 푸쉬하는것이였다.
너무 기대한걸까?
CloudWatch 의 에러감지는 별로 좋지 않았다. 정확한 에러문구는 날려주지 않는것이었다.
(결국 운영자가 보기에 어쩌구 저쩌구한건 개소리였던걸로...)

'어쩔 수 없이 DB를 사용해야 한다라면 AWS의 DB들중에 골라서 사용할까?' 했지만 

내가 지갑을 쥐고 있는것도 아니고... 
그냥 사내에 있는 RDBMS를 이용하고 메일 발송으로 안내 하는걸로 했다.

모든 로직에 try catch 를 넣고 exception 내용을 하나하나 잘 담가둔다..
그리고 장애처리 Lambda 를 신규 생성하고 장애내역을 Lambda 로 쏜다..
거기에서는 RDBMS 에 Table에 Insert 한다.

장애는 3가지 유형이 있을것으로 예상된다.


- SAP의 통신 오류
- 관세청 API의 오류
- AWS의 오류

각 Lambda 에 오류처리하는 로직을 넣는다. 
이때 SNS를 통해 오류처리용 Lambda 로 전송한다. 
오류 처리용 Lambda 는 RDBMS에 데이터를 INSERT 해준다.

그런데...
pymssql 이 동작하질 않는다? 왜지 ?


거의 급발진 마냥 돌진하던 손이 키보드에서 떨어지는 순간이였다.
차분히 찾아보니 이것역시 lxml라이브러리에서 겪은 문제였다.

 

'분명히... 어느 슈퍼맨이 github에 올려뒀을꺼야..' 

없더라~~ 아니면 내가 못 찾은거던가...
결국 EC2에 접속...
그리고 pip3 pymssql install 진행하고 내려받은 놈의 경로로 접근해서 폴더를 몽땅 다운받았다.

SELECT 1 을 날리니 1이 돌아오길래 다 끝났다고 생각했다.
'오늘은 장사접고 위키나 정리해야겠다.' 하고 돌려두는데 이런 python에서 한글이 깨진다.

확인해보니 장애문구를 SNS로 전달하는 과정에 Message 를 json데이터로 변환시켜 값을 넘기는데,
json으로 변환하면 한글이 깨지더라.
(사실 이유를 알아가는 과정속에서 일반적인 프로그래밍언어의 글씨 깨짐 현상을 대처하는 여러삽질을 똑같이 했었다...)

json.dump를 쓸때 ensure_ascii=False 를 넣으면 끝..

 

9. 에필로그

 

결국 아래와 같은 구조도로 작업은 종료되었다.

간간히 발생하는 에러는 대부분 관세청API가 잘못된 리턴값을 던지는 경우이고, 현재는 10분 단위로 돌아가고 있다.

 

우선 작업을 내 멋대로 점점 크게 만드는데도 별 말없이 가만히 방치해둔 현재의 회사 갑님에게 감사합니다.

(물론 처음에는 이랬다 저랬다 어려웠지만..ㅠㅠ)

 

그리고 서포터를 해준 AWSKRUG의 운영자이자, 클라우드팀 류한진 님에게 감사합니다.

(사실 나는 끌려만 갔다... 이분이 자꾸 '요거를 저거를... 사용하면 된다.' 하면서 나를 살랑살랑 꼬셨다.. )

 

간간히 기만하게 나의 짜증을 받아준 강대명 님과 가만히 들어준 김대희 님에게 감사합니다.

 

구조도

 

 

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (이전글)

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #2 (현재글)

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (현재글)

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #2

 

구조도

 

프로젝트 내용 

 


1. SAP 의 데이터를 취합하여 SAOP로 AWS Lambda 에 I/F 를 진행 
2. AWS Lambda 에서 취합한 데이터를 나누어 Amazon SNS(Simple Notification Service) 를 이용함. 

3. Amazon SNS에서 또 다른 AWS Lambda 로 분산하여 데이터를 던지고, AWS Lambda 에서는 관세청 (UNIPASS API)를 SAOP 로 호출하여 XML 기반 데이터를 리턴 받음.

4. 리턴 받은 데이터는 SAP 로 I/F 진행함.

5. 동일하게 Web 에서도 조금 더 빨리 확인할수 있기를 원하는 데이터가 존재할수 있기에 기능을 넣음.

6. 기능은 위에서 사용한 로직을 재활용하여 API Gateway 로 값을 던질수 있도록함.

5. 진행중에 발생한 오류역시 Amazon SNS 를 이용하여 오류처리용 AWS Lambda 로 던짐.

6. 오류처리용 AWS Lambda 에서는 사내 RDBMS에 로그를 남김. (에이전트를 통해 운영담당자에게 메일 발송)

 

 


파이썬을 접하게 된 계기

 

2018년에 알고리즘 문제풀기 스터디그룹을 참여하고 있었는데,  
'Python 으로 한번 해볼까?' 라는 생각을 갖게되었다.

 

'책장에 파이썬책이 한 권 있긴한데..' 하며 책을 찾았고, 슬슬 보기 시작했다.

 

이 책은 2017년 12월중 한빛미디어의 송경석님 FB의 '파이썬 코리아' 그룹에서 
출간 이벤트를 진행하였고 당첨되어 '헤드 퍼스트 파이썬 (Python3)개정판'을 받았다.
(하지만 제대로 본적은 없었음...죄송)

 

Facebook Python Korea Group Event

 

 

Head First Python




그래서 2018년 12월에 처음 책을 펴보았다.
12월 중순경부터 해커랭크의 문제들을 python 으로 풀이하기 시작했다.

 


파이썬의 기본 학습에 도움 받은 사이트


점프 투 파이썬 https://wikidocs.net
파이썬 자습서 https://docs.python.org/ko/3/tutorial/index.html 

 

 



 

 

프롤로그 

 

업무중에 도와달라는 요청이 왔다.

종량제서버(윈도우서버)에 배치에서 오류가 발생한것 같은데 업체에서 대응이 느리니 좀 봐줄수 있냐고..

일단 끄덕끄덕하고 서버를 접속해보니 대충봤을때 java로 만든 수 많은 agent 가 실행되고 있었다.

신세계였다. 이게 관리가 될까? 심지어 서비스로 올리거나한것도 아닌 정말 창 하나하나 떠있다는...

도대체 이녀석들이 무엇을 위해 존재하는 애들인지 궁금해서 확인해보니.. 

관세청에 데이터를 SAP에 I/F 해주는 프로그램이라고 하더라..

관리자용 프로그램도 없고 그냥 agent 를 수십개를 띄워놓고 돌리고 있다니..

충격에 관세청 사이트를 접속해보았다. API를 제공하더라.. 그때부터 짐작이 가기 시작했다.

그래서 이거 얼마주고 쓰고 있는거냐 하니 XXXX만원이라고 하더라..

와... 세상에는 많은 사기꾼이 있다. 

그때부터였다. 내가 이걸 손대기 시작한것은...

 

 

 

 

1. 시작

 

 간단했다. 그냥 공공기관의 API를 호출해서 리턴받아 넣기만 하면 끝.

문서를 살펴보고 키값을 할당받아서 혼자서 값을 던져보고 리턴데이터를 확인해보니 예상대로 별거 없었다.

갑님에게 살포시 메일을 던졌다. '아마도 그 업체의 방식은 이러할것입니다.' 하고...

그러더니 각도기로 각을 재기 시작하더니 몇 가지 물어보더라.

결국 현업에게 전달되고, 계산기를 두드리기 시작하더라.. 그쯤에 느꼈다. 조만간 오겠구나 하고..

 예상대로 요청이 왔다. 처음에는 종량제 서버에 그냥 배치파일을 만들어서 돌리자고 하더라.

그런데 클라우드 팀에서 Serverless를 영업하기 시작했다.

그거에 또 혹하고 넘어가고선 배의 방향이 틀어지기 시작했다.

 

아래 구조도 처럼 처음에는 아주 심플하게  설계를 했다. 

 

SAP에서 데이터를 받아 관세청API 호출 목록을 만들고, 그 목록을 호출 한다.

1차 구조도

더 이상의 설명이 필요할까? 간단한거다. 

 

도구는 정해졌고, 이제 언어를 골라보자~~

우선시하는 언어는 역시 C#이였다. 하지만 Python을 맛보기 시작한 입장에서는 다른 욕구가 생기기 시작하더라...

 

'Python 이 크롤링에 많이 쓰이던데... '

 

'어차피 스크래핑이나 크롤링이나...' 하면서 Python 으로 찔끔찔끔 예제를 만들어보기 시작했다.

물론 .NET Core 2.1로 진행하는것에 대해서는 다른 개발자에게 맡기고서...

혼자 조용히 몰래 몰래 Python 으로 코딩을 틈틈히 하였는데, 나는 다 했는데 .NET Core 2.1로 진행하고 있는 개발자가 느리다. 이때다 싶어 내가 Python 으로 이미 다 만들어서 테스트도 해봤는데 잘 돌더라 Python 으로 가는것이 어떠냐 하고선 제의를 해보았다. 

 

처음에는 유지보수하는 개발자들도 C#이 주언어이기에 심한 반대를 하였으나, 결국 이미 Python으로 코딩이 종료되었음을 인정하고 힘이 빠졌나보다. Python으로 가는것에 수긍하기 시작했다.

 

 

AWS Lambda란 무엇입니까?

AWS Lambda는 서버를 프로비저닝하거나 관리하지 않고도 코드를 실행할 수 있게 해주는 컴퓨팅 서비스입니다. AWS Lambda는 필요 시에만 코드를 실행하며, 하루에 몇 개의 요청에서 초당 수천 개의 요청까지 자동으로 확장이 가능합니다. 사용한 컴퓨팅 시간에 대해서만 요금을 지불하면 되고 코드가 실행되지 않을 때는 요금이 부과되지 않습니다. AWS Lambda에서는 사실상 모든 유형의 애플리케이션이나 백엔드 서비스에 대한 코드를 별도의 관리 없이 실행할 수 있습니다. 

 

[출처] https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/welcome.html

 

2. AWS 에 올려보자

 

 여러개의 람다를 한번에 올려보니 '공통된 라이브러리로 동작을 하는데 뭉탱이로 이렇게 무식하게 하나씩 올려야하나?' 라는 생각을 갖게 되었다. 

클라우드팀에서 공통소스는 '레이어를 사용하세요' 라고 안내를 해주었다. 

 

Layers 이라고 존재하는것을 확인 했다. 요놈의 특징은 버전별로 엎어치기 개념이다. (하지만 과거 버전도 Lambda 에서는 사용이 가능하다.)

우선 pip list 로 사용한 라이브러리들을 확인하고 경로에 들어가 복사하여 하나의 폴더에 몰아 넣었다. 

 

그런데 레이어를 올리는 폴더구조에도 규칙이 있더라.

(https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/configuration-layers.html)

 

 

파이썬의 폴더 구조

Python 의 경우는 위의 사진처럼  [폴더명/python/라이브러리] 의 구조로 만들어야 한다.

폴더명은 아무거나해도 상관없다. 대표적인 라이브러리명이나 해당 레이어의 특성에 맞추어 네이밍을 하면 될 것같다.

 

 

AWS Lambda 계층

Lambda 함수는 추가 코드와 콘텐츠를 계층의 형태로 가져오도록 구성할 수 있습니다. 하나의 계층은 라이브러리, 사용자 지정 런타임 또는 그 외 종속성을 포함하는 ZIP 아카이브입니다. 배포 패키지에 라이브러리를 포함시킬 필요 없이 계층을 통해 함수에서 라이브러리를 사용할 수 있습니다.

 

[출처] https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/configuration-layers.html

 

3. 환경의 문제 

 

 처음부터 문제가 발생했다. 나의 작업환경은 윈도우 AWS Lambda 는 리눅스....

lxml 라이브러리를 사용하였을때 AWS Lambda 에서 동작이 불가능했다. 
파이썬 코리아 그룹에 질문글을 작성하고, Zeep 의 github에서 동일한 이슈가 존재하는지 확인해봤다.

(참고로 SAOP 통신은 Zeep 라는 라이브러리를 사용했고, 라이브러리 내부에 lxml이 존재한다.)


확인해보니 동일한 증상을 겪는사람이 있었음!!!

 

이슈 발생


https://github.com/JFox/aws-lambda-lxml 에 AWS Lambda 환경과 동일한 환경에서 
Build 를 진행한 사람이 올려둔 소스를 발견 하였다.


lxml 라이브러리에는 헤더파일이 존재하였고 Build 를 해야 처리가 가능하단점을 알게 되었다.

슈퍼맨이 나타났다.


동시에 파이썬 코리아 그룹에서도 알려주시는 고마운분이 있었다.

 

파이썬 코리아 그룹의 도움

 

그리고 아는 사람들중 가장 기만하신분이 따로 또 알려주시기도 했다. ㅋ

 

기만자

 

덕분에 github 에 조용히 접속 하여 클론하였고, 실행해보니 성공.

 

 

4. 첫 번째 벽을 느끼다. 

 

 어찌어찌 돌아가는 모습을 볼 수 있었다. 그런데 생각보다 많은 양을 처리해야 하기 때문에 러닝타임이 길더라...

(역시나 예상대로 뻗어버리기 시작한다. )

 

 원래는 심플하게 AWS Lambda 한 개의 함수로 모두 처리를 하고자 하였으나, 데이터의 양에 따라 처리하는 시간이 오래 걸리더라. 그래서 병렬처리를 진행해야봐야겠다 싶었는데 먼저 Python으로 해결을 해보고자 하는 마음에 병렬처리를 멀티스레드나 멀티프로세싱으로 해보자 했다. 

로컬에서는 성공적이였으나 처리 시간이 20분이 넘는 상태였다. (물론 직렬로 진행하면 로컬에서도 뻗어버림)

그래서 AWS Lambda의 메모리를 올리면서 계속 테스트를 진행을 해봤는데... 하다 하다가 너무 많은 메모리에 느린 처리 시간에 이거는 cron 기준 타임보다 오래 걸린다. 이렇게 진행할 바에는 그냥 서버 하나에 agent 만들어서 24시간 돌리는 게 나을 거 같다라는 생각까지 하게 되었다...

 

- AWS Lambda의 사양에 맞추어 봤을 때 상당히 비효율적이었음. (돌다가 뻗어버림)

 

AWS Lambda를 SAP에서 I/F 해서 데이터를 가공하는 놈

가공한데이터를 받아서 관세청에 던지고 리턴받은 데이터를 가공하여 SAP로 I/F 처리하는 놈 

 

총 2개로 나누어봤다. 

 

2차 구조도

 

하지만 여전히... 뻗음..

 

 

 

AWS Lambda 는 러닝타임과 메모리사용량에 비례하여 청구되는데, 해당 프로젝트는 실시간에 가까운 데이터 폴링 작업이라 비용에 대한 이슈보다는 안전한 데이터 폴링이 주된 목표였다. 원래 세 개의 기능정도가 있는데 워크 로드를 알기 어려워서 각각의 영향도를 낮추기 위해서 세 개로 분리했다. 어느정도 도움은 된듯하나 역시 원활하지 않았다.

 

 

 

광고 아니고 캡처해서 올린 사진 SNS

 

결국엔 AWS 서비스를 활용해서 처리하는 방법을 고안하는데 SQS, SNS 두 가지에서 많은 고민을 했었다.

Amazon SNS 를 사용하니 원하던 분산처리가 자연스럽게 처리가 되었다.

 

 

Amazon Simple Notification Service란 무엇입니까?

Amazon Simple Notification Service(Amazon SNS)는 구독 중인 endpoint 또는 클라이언트에 메시지 전달 또는 전송을 조정 및 관리하는 웹 서비스입니다. 

 

[출처] https://docs.aws.amazon.com/ko_kr/sns/latest/dg/welcome.html

 

 

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (현재글)

Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #2 (다음글)

 

 

+ Recent posts