MASOCON 2019 데모 자료 입니다.
발표 내용을 참고해주시면 됩니다.
- 사이트 : https://www.imaso.co.kr/masocon2019
- 발표자료 : https://www.slideshare.net/MinJunKim5/masocon-2019-serverless-kimminjun (슬라이드쉐어)
https://speakerdeck.com/microsoftware/masokon-2019-seobeoriseureul-hwalyonghan-bunsan-ceori-gimminjun (스피커덱 - 마소콘공식)
데모를 무엇으로 준비할지 고민이 엄청 많았습니다.
공공기관의 API는 대부분 한도가 있다보니 적절한 예시를 찾아야 했는데...
관세청에서 찾았습니다.
관세청API에서 '관세환율'을 이용했습니다.
먼저 관세청API에 대해 설명을 하자면, 1:N 의 데이터 입니다. XML로 리턴받으며 GET방식으로 1개씩 받아올 수 있습니다.
아래 이미지는 관세청 API 문서의 일부입니다.
(출처 : https://unipass.customs.go.kr)
형태는 일반적인 공공기관의 API와 크게 다르지는 않습니다.
요청메시지는 단순하게 날짜와 수출입 구분만 넣습니다.
응답 메시지는 국가부호/화폐단위명/환율/통화부호/적용개시일자/수출입구분 총 6개의 필드로 구성되어 있습니다.
요청/응답에 대한 예시입니다.
인증키는 개별적으로 신청하여 받으시면 됩니다. 과정에 대해서는 생략합니다.
구조도입니다.
1. AWS Console을 이용하여 실행.
2~5. API Gateway로 처리, Lambda에서 데이터(날짜범위)값을 갖고 옴.
6~8. SNS를 통해 Invoke된 Lambda는 각각 할당 받은 만큼 처리한다.
9. 처리가 완료된 데이터는 DynamoDB로 저장한다.
Lambda에 접근을 하면 좌측 메뉴중에 가장 하단에 '계층'이 존재합니다.
이미지는 제가 이미 등록해둔 라이브러리 패키징이네요. 등록한 'python36-lib'를 계속해서 재사용하여, Lambda를 실행합니다.
우선 위의 API에 필요한 날짜값을 갖고오는 Lambda를 준비합니다.
Designer에 Lambda이름 밑에 Layers를 등록되어 있는모습이 보입니다.
Layers는 최대 5개를 등록할수 있어요. (라이브러리가 중복되거나 하는 문제가 발생할 수 있습니다.)
그리고 API Gateway도 확인할 수 있어요. API Gateway를 생성하여 Lambda와 연결한 상태입니다.
API Gateway를 이용하여 처음 시작하는 Lambda가 위의 이미지의 Lambda를 호출해서 날짜값을 갖고 올꺼예요.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | #-*- coding:utf-8 -*- import datetime from datetime import date import json def lambda_handler(event, context): strdate = '' for i in range(1, 3660): # for i in range(1, 31): datedata = datetime.date.today() + datetime.timedelta(-i) if (i != 1): strdate = strdate + ',' + datedata.strftime('%Y%m%d') else : strdate = datedata.strftime('%Y%m%d') # print(datedata.strftime('%Y%m%d')) return strdate # if __name__ == '__main__': # lambda_handler('',''); | cs |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | #-*- coding:utf-8 -*- import json import zeep import urllib3 import xml.etree.ElementTree as ET import time import ast import boto3 import requests import os import re from urllib.request import urlopen from requests.auth import HTTPBasicAuth from requests import Session from zeep import Client from zeep.transports import Transport from bs4 import BeautifulSoup def errorCheck(msg) : print(msg) def get_unipass_data(list): """ # get_unipass_data # (list) """ print('# [get_unipass_data] Count : ', len(list)) # print(list) print(os.environ['ARN_SCRAPING']) try: print('# [SNS 전송 시작]') client = boto3.client('sns') response = client.publish( TargetArn = os.environ['ARN_SCRAPING'], Message = json.dumps({'default': json.dumps(list)}), Subject = 'PY36_MASOCON_DEMO_UNIPASS_URL', MessageStructure='json' ) print('# [SNS 전송 끝]') except Exception as e: print('# [SNS 전송 오류]') print(e) return errorCheck(e,'') return 'OK' def url_list(): try: headers = { 'x-api-key': os.environ['xapikey'], 'Content-type':'application/json;charset=utf-8', 'Accept':'*/*' } data = {} resp = requests.post(os.environ['api_url'], headers=headers, data=json.dumps(data)) dict = resp.text.split(',') url_list = [] if len(dict) > 0 : for i in dict: url_list.append(os.environ['unipass_url'] + i.replace("\"","")) print('# 총 건수 : ', len(url_list)) data = [] if len(url_list) > 0 : count = 0 total_group_count = 0 for i in url_list: data.append(url_list[count]) count = count + 1 if (len(data)/36) == 1 : get_unipass_data(data) total_group_count = total_group_count + 1 data.clear() #break if len(data) > 0 : total_group_count = total_group_count + 1 get_unipass_data(data) data.clear() print('# 36개씩 나눈 작업 개수 : ', total_group_count) return url_list except Exception as e: print(e) return errorCheck(e) def lambda_handler(event, context): url_list() # if __name__ == '__main__': # lambda_handler('','') | cs |
그럼 Amazon SNS를 통해 전달받은 Lambda를 보여드릴게요.
SNS가 연결되어 있는것을 볼 수 있습니다. 활성화도 되어 있구요.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | #-*- coding:utf-8 -*- ###################### ############## lib ###################### import json import zeep import urllib3 import xml.etree.ElementTree as ET import time import ast import boto3 from urllib.request import urlopen from requests.auth import HTTPBasicAuth from requests import Session from zeep import Client from zeep.transports import Transport from bs4 import BeautifulSoup from multiprocessing import Process def dynamo_insert(list): """ # get_unipass_data # (list) """ print('# [DynamoDB] Count : ', len(list)) try: print('# [SNS 전송 시작]') client = boto3.client('sns') response = client.publish( TargetArn = 'ARN값을 넣습니다...', Message = json.dumps({'default': json.dumps(list)}), Subject = 'PY36_MASOCON_DEMO_UNIPASS_SCRAPING', MessageStructure='json' ) print('# [SNS 전송 끝]') except Exception as e: print('# [SNS 전송 오류]') print(e) return errorCheck(e,'') return 'OK' def errorCheck(msg) : print(msg) def change_date_format(content): if content != None: return content[0:4]+'-'+ content[4:6] + '-' + content[6:8] def Unipass_scraping(list): count = 0 print('# [Unipass_scraping] Count : ', len(list)) err_url = '' try: if len(list) > 0 : for url in list: err_url = url count = count + 1 tree = ET.ElementTree(file=urlopen(url)) tCnt = 0 tCnt = int(tree.find('tCnt').text) print(tCnt) list_trifFxrtInfoQryRsltVo = [] if tCnt > 0: for item in tree.findall('trifFxrtInfoQryRsltVo'): if item.find('cntySgn').text == "KR" : trifFxrtInfoQryRsltVo = { 'cntySgn' : item.find('cntySgn').text if item.find('cntySgn').text != None else None, 'mtryUtNm' : item.find('mtryUtNm').text if item.find('mtryUtNm').text != None else None, 'fxrt' :item.find('fxrt').text if item.find('fxrt').text != None else None, 'aplyBgnDt' : change_date_format(item.find('aplyBgnDt').text) if item.find('aplyBgnDt').text != None else None, 'currSgn':item.find('currSgn').text if item.find('currSgn').text != None else None, 'imexTp' : 2 } dynamo_insert(trifFxrtInfoQryRsltVo) err_url = '' print('success') except Exception as e: print('# [Unipass_scraping] Error : ', e) print('# [Unipass_scraping] err_url : ', err_url) # pass return errorCheck(e,err_url) def lambda_handler(event, context): if 'API' in event : print('# 작업명 (Event == API)') job_type = event['API'][0]['Type'] message = json.dumps(event['API'][0]['Data']) messageList = ast.literal_eval(message) Unipass_scraping(messageList) return job_type elif 'Records' in event : print('# 작업명 (Event == Records)') job_type = event['Records'][0]['Sns']['Subject'] message = event['Records'][0]['Sns']['Message'] messageList = ast.literal_eval(message) Unipass_scraping(messageList) return job_type else : return 'not working' | cs |
핸들러는 이벤트 값을 받고, 이벤트 값에 제가 API 인지 Records인지 구분을 합니다.
발표내용을 보면 'API Gateway를 통해서 직접 실행할수 있게 구현했다.' 라는 내용이 있는데요. 그 내용입니다.
간단하게 구분값을 API / Records 로 했어요. 그리고 Subject에 타입을 넣어서 던졌어요.
실제 구현한 소스에는 Subject를 기준으로 분기처리도 했었어요.
message에는 데이터를 담았어요. 그래서 message 를 list로 만들어서 Unipass_scraping으로 던집니다.
처음에 언급한바와 같이 관세청 API는 1:N 이예요. 1개씩 던져서 여러행을 받습니다.
list 를 반복하여 관세청 API를 통신하고, 결과 값을 받아서 DynamoDB에 만든 TEST테이블에 Insert합니다.
(저는 리턴받은 데이터중 KR만 뽑아서 담았어요)
DynamoDB를 Insert하는 Lambda에 SNS를 이용해서 던집니다.
DynamoDB에서 테이블은 간단하게 1개의 키로만 구성했어요. (DynamoDB는 NoSQL입니다.)
DynamoDB에 Insert하는 Lambda의 소스입니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | import json import ast import boto3 def dynamoDB_INSERT(data): dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2') dynamodb.Table('MASOCON_DEMO').put_item( Item={ 'DATA': data } ) return 'success' def lambda_handler(event, context): if 'API' in event : print('# 작업명 (Event == API)') job_type = event['API'][0]['Type'] message = json.dumps(event['API'][0]['Data']) messageList = ast.literal_eval(message) dynamoDB_INSERT(message) return 'success' elif 'Records' in event : print('# 작업명 (Event == Records)') job_type = event['Records'][0]['Sns']['Subject'] message = event['Records'][0]['Sns']['Message'] messageList = ast.literal_eval(message) print(message) dynamoDB_INSERT(message) return 'success' else : # data = 'aaaa' # dynamoDB_INSERT(data) return 'not working' | cs |
'CLOUD' 카테고리의 다른 글
Azure AD DS(Active Directory Domain Services) (0) | 2021.04.08 |
---|---|
Azure AD(Active Directory) 와 Windows AD(Active Directory) (0) | 2021.03.24 |
[MASOCON 2019] 컨퍼런스 트랙 - 서버리스를 활용한 분산 처리 DEMO (0) | 2019.11.28 |
Pyhton(v3.6)으로 AWS 를 활용한 분산 처리 #2 (0) | 2019.05.14 |
Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (0) | 2019.05.11 |
Visual Studio Code 로 .NET Core 2.1 사용하여 AWS Lambda 만들기 (0) | 2019.05.08 |