分散システムとポエム

WebサーバのログをS3→Lambda→AWS ElasticSearchで解析

S3にログを上げると自動でAmazon Elasticsearch Serviceへ投入するよう自動化に挑戦してみた。以下のページを読めば普通にできる。

Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service

アーキテクチャ

S3 -> Lambda -> Amazon Elasticsearch Service

いずれのサービスともリージョンが同一になるよう注意する。

S3のバケット作成

tfファイルを残すほどでもないのレベルではあるものの、一貫性を保つために残しておく。

provider "aws" {
  region = "ap-northeast-1"
}

resource "aws_s3_bucket" "b" {
  bucket = "log-store-base"
  acl    = "private"

  tags = {
    Name        = "My bucket"
    Environment = "Staging"
  }
}

Elasticsearch Serviceのドメイン作成

インスタンスは月750時間まで無料枠があるt2.micro.elasticsearchを選んだ。無料枠があるストレージはSSD 10GiBを選んだ。IP制限をしている箇所の 0.0.0.0/32 は適宜

provider "aws" {
  region = "ap-northeast-1"
}

variable "domain" {
  default = "reiwa0407"
}

data "aws_region" "current" {}

data "aws_caller_identity" "current" {}

resource "aws_elasticsearch_domain" "es" {
  domain_name = "${var.domain}"
  elasticsearch_version = "6.4"

  cluster_config {
    instance_type = "t2.small.elasticsearch"
  }

  ebs_options {
    ebs_enabled = true
    volume_size = 10
  }

  snapshot_options {
    automated_snapshot_start_hour = 23
  }

  access_policies = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "es:*",
      "Principal": "*",
      "Effect": "Allow",
      "Resource": "arn:aws:es:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:domain/${var.domain}/*",
      "Condition": {
        "IpAddress": {"aws:SourceIp": ["0.0.0.0/32"]}
      }
    }
  ]
}
POLICY
}

Lambdaの関数作成

GUI操作かtfで設定

プログラムの作成

nginxのログフォーマット(CentOS 7へパッケージ導入したnginxから抽出)

log_format main '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';

apacheのログフォーマット(CentOS 7へパッケージ導入したapacheから抽出)

LogFormat "%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"" combined

Pythonのサンプルコード

Amazon Elasticsearch Service にストリーミングデータをロードする – Amazon Elasticsearch Service

Node.jsのサンプルコード

amazon-elasticsearch-lambda-samples/s3_lambda_es.js at master · aws-samples/amazon-elasticsearch-lambda-samples

そのままだとApacheとNginxの両方に対応していないので修正する。また、エラー時にCloudWatchコンソールへメッセージを出力する処理を追加する。

import re
from datetime import datetime as dt

def parser(line):
    json_body = {}

    '''
    Pickup enclosed "item" in double quotes
    '''
    pattern_quote = re.compile(r'("[^"]+")')
    quote_items = pattern_quote.findall(line)

    # remove blank item in list and double-quote in string
    quote_items = [tmp.replace('"', '') for tmp in quote_items]
    if len(quote_items) < 1:
        raise ValueError

    try:
        request_line = quote_items[0].split()
        json_body['method'] = request_line[0].upper()
        json_body['request_uri'] = request_line[1]
        json_body['http_version'] = request_line[2].upper()
        json_body['referer'] = quote_items[1]
        json_body['user_agent'] = quote_items[2]
    except Exception as e:
        print(e)
        print("t", line)
        return {}

    # remove matched item in list
    line = re.sub(pattern_quote, '', line)

    '''
    Pickup item splited by space
    '''
    request_items = [l for l in line.split() if l]

    try:
        json_body['source_ip'] = request_items[0]
        json_body['remote_user'] = request_items[2]
        date_str = request_items[3].replace('[', '')
        date = dt.strptime(date_str, '%d/%b/%Y:%H:%M:%S')
        json_body['time_stamp'] = date.strftime('%Y-%m-%d %H:%M:%S')
        json_body['time_zone'] = request_items[4].replace(']', '')
        json_body['status_code'] = request_items[5]
        json_body['body_bytes'] = request_items[6]
    except Exception as e:
        print(e)
        print("t", line)
        return {}

    '''
    for p,q in json_body.items():
        print(p,q)
    '''

    return json_body


if __name__ == '__main__':
    '''
    res = parser('192.168.0.182 - - [07/Apr/2019:17:35:45 +0900] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.40 Safari/537.36" "-"')
    print(res)
    '''
    import sys
    with open(sys.argv[1]) as f:
        #print(f.read())
        for l in f.read().splitlines():
            r = parser(l)
            for k,v in r.items():
                # print(k, ":", v)
                pass

終わりに

AWS Kinesis Data Firehoseを使うアプローチのほうがよりリアルタイム性が高くなるらしいので、検証してみたいと思います。

Amazon Kinesis Data Firehose とは – Amazon Kinesis Data Firehose

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です