S3にログを上げると自動でAmazon Elasticsearch Serviceへ投入するよう自動化に挑戦してみた。以下のページを読めば普通にできる。
Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service
アーキテクチャ
S3 -> Lambda -> Amazon Elasticsearch Service
いずれのサービスともリージョンが同一になるよう注意する。
S3のバケット作成
tfファイルを残すほどでもないのレベルではあるものの、一貫性を保つために残しておく。
provider "aws" { region = "ap-northeast-1" } resource "aws_s3_bucket" "b" { bucket = "log-store-base" acl = "private" tags = { Name = "My bucket" Environment = "Staging" } }
Elasticsearch Serviceのドメイン作成
インスタンスは月750時間まで無料枠があるt2.micro.elasticsearchを選んだ。無料枠があるストレージはSSD 10GiBを選んだ。IP制限をしている箇所の 0.0.0.0/32 は適宜
provider "aws" { region = "ap-northeast-1" } variable "domain" { default = "reiwa0407" } data "aws_region" "current" {} data "aws_caller_identity" "current" {} resource "aws_elasticsearch_domain" "es" { domain_name = "${var.domain}" elasticsearch_version = "6.4" cluster_config { instance_type = "t2.small.elasticsearch" } ebs_options { ebs_enabled = true volume_size = 10 } snapshot_options { automated_snapshot_start_hour = 23 } access_policies = <<POLICY { "Version": "2012-10-17", "Statement": [ { "Action": "es:*", "Principal": "*", "Effect": "Allow", "Resource": "arn:aws:es:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:domain/${var.domain}/*", "Condition": { "IpAddress": {"aws:SourceIp": ["0.0.0.0/32"]} } } ] } POLICY }
Lambdaの関数作成
GUI操作かtfで設定
プログラムの作成
nginxのログフォーマット(CentOS 7へパッケージ導入したnginxから抽出)
log_format main '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';
apacheのログフォーマット(CentOS 7へパッケージ導入したapacheから抽出)
LogFormat "%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"" combined
Pythonのサンプルコード
Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service
Node.jsのサンプルコード
そのままだとApacheとNginxの両方に対応していないので修正する。また、エラー時にCloudWatchコンソールへメッセージを出力する処理を追加する。
import re from datetime import datetime as dt def parser(line): json_body = {} ''' Pickup enclosed "item" in double quotes ''' pattern_quote = re.compile(r'("[^"]+")') quote_items = pattern_quote.findall(line) # remove blank item in list and double-quote in string quote_items = [tmp.replace('"', '') for tmp in quote_items] if len(quote_items) < 1: raise ValueError try: request_line = quote_items[0].split() json_body['method'] = request_line[0].upper() json_body['request_uri'] = request_line[1] json_body['http_version'] = request_line[2].upper() json_body['referer'] = quote_items[1] json_body['user_agent'] = quote_items[2] except Exception as e: print(e) print("t", line) return {} # remove matched item in list line = re.sub(pattern_quote, '', line) ''' Pickup item splited by space ''' request_items = [l for l in line.split() if l] try: json_body['source_ip'] = request_items[0] json_body['remote_user'] = request_items[2] date_str = request_items[3].replace('[', '') date = dt.strptime(date_str, '%d/%b/%Y:%H:%M:%S') json_body['time_stamp'] = date.strftime('%Y-%m-%d %H:%M:%S') json_body['time_zone'] = request_items[4].replace(']', '') json_body['status_code'] = request_items[5] json_body['body_bytes'] = request_items[6] except Exception as e: print(e) print("t", line) return {} ''' for p,q in json_body.items(): print(p,q) ''' return json_body if __name__ == '__main__': ''' res = parser('192.168.0.182 - - [07/Apr/2019:17:35:45 +0900] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.40 Safari/537.36" "-"') print(res) ''' import sys with open(sys.argv[1]) as f: #print(f.read()) for l in f.read().splitlines(): r = parser(l) for k,v in r.items(): # print(k, ":", v) pass
終わりに
AWS Kinesis Data Firehoseを使うアプローチのほうがよりリアルタイム性が高くなるらしいので、検証してみたいと思います。
Amazon Kinesis Data Firehose とは – Amazon Kinesis Data Firehose
コメントを残す