S3にログを上げると自動でAmazon Elasticsearch Serviceへ投入するよう自動化に挑戦してみた。以下のページを読めば普通にできる。
Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service
アーキテクチャ
S3 -> Lambda -> Amazon Elasticsearch Service
いずれのサービスともリージョンが同一になるよう注意する。
S3のバケット作成
tfファイルを残すほどでもないのレベルではあるものの、一貫性を保つために残しておく。
provider "aws" {
region = "ap-northeast-1"
}
resource "aws_s3_bucket" "b" {
bucket = "log-store-base"
acl = "private"
tags = {
Name = "My bucket"
Environment = "Staging"
}
}
Elasticsearch Serviceのドメイン作成
インスタンスは月750時間まで無料枠があるt2.micro.elasticsearchを選んだ。無料枠があるストレージはSSD 10GiBを選んだ。IP制限をしている箇所の 0.0.0.0/32 は適宜
provider "aws" {
region = "ap-northeast-1"
}
variable "domain" {
default = "reiwa0407"
}
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}
resource "aws_elasticsearch_domain" "es" {
domain_name = "${var.domain}"
elasticsearch_version = "6.4"
cluster_config {
instance_type = "t2.small.elasticsearch"
}
ebs_options {
ebs_enabled = true
volume_size = 10
}
snapshot_options {
automated_snapshot_start_hour = 23
}
access_policies = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "es:*",
"Principal": "*",
"Effect": "Allow",
"Resource": "arn:aws:es:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:domain/${var.domain}/*",
"Condition": {
"IpAddress": {"aws:SourceIp": ["0.0.0.0/32"]}
}
}
]
}
POLICY
}
Lambdaの関数作成
GUI操作かtfで設定
プログラムの作成
nginxのログフォーマット(CentOS 7へパッケージ導入したnginxから抽出)
log_format main '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';
apacheのログフォーマット(CentOS 7へパッケージ導入したapacheから抽出)
LogFormat "%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"" combined
Pythonのサンプルコード
Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service
Node.jsのサンプルコード
そのままだとApacheとNginxの両方に対応していないので修正する。また、エラー時にCloudWatchコンソールへメッセージを出力する処理を追加する。
import re
from datetime import datetime as dt
def parser(line):
json_body = {}
'''
Pickup enclosed "item" in double quotes
'''
pattern_quote = re.compile(r'("[^"]+")')
quote_items = pattern_quote.findall(line)
# remove blank item in list and double-quote in string
quote_items = [tmp.replace('"', '') for tmp in quote_items]
if len(quote_items) < 1:
raise ValueError
try:
request_line = quote_items[0].split()
json_body['method'] = request_line[0].upper()
json_body['request_uri'] = request_line[1]
json_body['http_version'] = request_line[2].upper()
json_body['referer'] = quote_items[1]
json_body['user_agent'] = quote_items[2]
except Exception as e:
print(e)
print("t", line)
return {}
# remove matched item in list
line = re.sub(pattern_quote, '', line)
'''
Pickup item splited by space
'''
request_items = [l for l in line.split() if l]
try:
json_body['source_ip'] = request_items[0]
json_body['remote_user'] = request_items[2]
date_str = request_items[3].replace('[', '')
date = dt.strptime(date_str, '%d/%b/%Y:%H:%M:%S')
json_body['time_stamp'] = date.strftime('%Y-%m-%d %H:%M:%S')
json_body['time_zone'] = request_items[4].replace(']', '')
json_body['status_code'] = request_items[5]
json_body['body_bytes'] = request_items[6]
except Exception as e:
print(e)
print("t", line)
return {}
'''
for p,q in json_body.items():
print(p,q)
'''
return json_body
if __name__ == '__main__':
'''
res = parser('192.168.0.182 - - [07/Apr/2019:17:35:45 +0900] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.40 Safari/537.36" "-"')
print(res)
'''
import sys
with open(sys.argv[1]) as f:
#print(f.read())
for l in f.read().splitlines():
r = parser(l)
for k,v in r.items():
# print(k, ":", v)
pass
終わりに
AWS Kinesis Data Firehoseを使うアプローチのほうがよりリアルタイム性が高くなるらしいので、検証してみたいと思います。
Amazon Kinesis Data Firehose とは – Amazon Kinesis Data Firehose
コメントを残す