Filebeat ModulesとLogstashの組み合わせ方 #Elastic #Elasticsearch

この記事は1年以上前に投稿されました。情報が古い可能性がありますので、ご注意ください。
Elastic Stackを使って日頃から様々なログを解析しているとFilebeatに頼ることが多くなってきます。
今回は、Filebeat Modulesを利用して、GSuiteやZoom、O365などのクラウドサービスのログを収集し
活用していく上でふと疑問に感じたことについてまとめてみました。
Filebeat Modulesとは
2017年3月28日にリリースされたバージョン5.3以降で登場した機能です。
Filebeat Modulesを用いることでサポートされるログの収集、加工、可視化を自動的に処理してくれます。
Filebeat Modulesが収集したログをElasticsearchのingest機能で加工してIndexに格納します。

ingestの設定はKibanaで確認することができます。

現最新バージョンの7.11では全部で65種類のModulesが用意されています。
全部のModuleではありませんが、Kibanaダッシュボードのテンプレートも付属しています。
【参考】
・ Filebeat modules
今回実施したこと
Auditd Modules1を利用して、Linux OSの監査ログをElasticsearchに取り込みます。
同時に同じログを長期保管目的でGoogle Cloud Storage(以降、GCS)2にも格納したいと思います。
しかし、Filebeatの出力先には、以下の7つしか利用できません。
また、現行バージョンでは複数の出力先に出すこともできません。
- Elasticsearch Service
 - Elasticsearch
 - Logstash
 - Kafka
 - Redis
 - File (Local)
 - Console (Local)
 
複数の出力先に同時にログ出力したり、上記にない出力先にログを出したい場合は
FilebeatとElasticsearchの間にLogstash3を挟む必要があります。
しかし、Filebeat Modulesを利用する場合は加工処理はElasticsearchのingest機能がおこないます。
Modules利用時にLogstashを挟んでGCSに送るとどうなるのか、今回はそれを実際に試してみました^^
必要なサブスクリプション
- Filebeat ModulesはOSSもしくはBASICで利用できるため全て無償です。
 - サブスクリプションページ4の記載内容とFilebeat Modulesページの記載内容に少し差異があります。
 - 下記図のX-Packと記載されているものはBASICで利用できるModulesになります。
 
【Filebeat Modulesページのサンプル】

利用環境
| 項目 | 内容 | 
|---|---|
| Elasticsearch | 7.11.1 | 
| Kibana | 7.11.1 | 
| Logstash | 7.11.1 | 
| Filebeat | 7.11.1 | 
| JDK version | "11.0.10" 2021-01-19 LTS | 
| VM image | centos-7-v20201112 | 
| Region | us-central1 (アイオワ) | 
※投稿時点における最新版を採用しています。
【構成図】
・ FilebeatがインストールされているLinux OSの監査ログをAuditd Modulesを利用して取得します。
・ ログをLogstashに転送し、加工せずにElasticsearchとGCSに出力します。

【補足】
・ GCPのVPCネットワークは、defaultネットワークを利用しています。
・ 各VM間の通信は全て許可されています。
・ 本投稿では、各VMマシンの作成および各OSへのElasticプロダクトのインストール手順は省略しています。
・ KibanaからElasticsearchに接続できる設定となっています。
【参考】
・Elasticsearchのインストール方法
・Kibanaのインストール方法
・Logstashのインストール方法
・Filebeatのインストール方法
実施手順
- 下記手順で設定を実施しました。
 
- GCSのバケット作成
 - サービスアカウントの作成
 - Filebeatの設定
 - Auditd Modulesの導入
 - Logstashの設定
 - Kibanaでログ閲覧
 - GCSのログ閲覧
 
1. GCSのバケット作成
- あえてCLIで設定を作っていきます。
 - GCPコンソールにログインし、Cloud Shellを起動します。
 

- 下記コマンドを実行して、GCSのバケットを作成します。
 
$ gsutil mb -p unique-voyage-xxxxxx -c nearline -l us-central1 -b on gs://log-bucket-2021
- 設定値は下記の通りです。GCPコンソールからGUIで設定してもOKです。
 
| 項目 | パラメータ | 
|---|---|
| バケット名 | log-bucket-2021 | 
| ロケーションタイプ | Region | 
| ロケーション | us-central1 (アイオワ) | 
| ストレージクラス | Nearline | 
| アクセス制御 | 均一 | 
| 暗号化 (オプション) | Googleが管理する鍵 (デフォルト) | 
| 保持ポリシー (オプション) | 未設定 (デフォルト) | 
| ラベル (オプション) | 未設定 (デフォルト) | 
【補足】
・ 「unique-voyage-xxxxxx」は、GCPプロジェクト名になります。
・ バケット名は好きな名前で良いですが、全世界で一意である必要があります。
・ 長期保管のアーカイブ用途なので、Nearlineとしています。
・ VMと同じリージョンで作成しています。
【参考】
・ストレージバケットの作成
2. サービスアカウントの作成
- Cloud Shellで下記コマンドを実行して、サービスアカウントを作成します。
 
$ gcloud iam service-accounts create for-logstash \
    --description="for_logstash_output" \
    --display-name="for-logstash"
- 上記で作成したサービスアカウントに対して、ストレージ管理者権限を付与します。
 
$ gcloud projects add-iam-policy-binding unique-voyage-xxxxxx \
    --member="serviceAccount:for-logstash@unique-voyage-xxxxxx.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
- 設定値は下記の通りです。GCPコンソールからGUIで設定してもOKです。
 
| 項目 | パラメータ | 
|---|---|
| サービスアカウント名 | for-logstash | 
| 説明 | for_logstash_output | 
| 表示名 | for-logstash | 
| ロール名 | Storage オブジェクト管理者 | 
- 
作成したサービスアカウントのキーを発行します。

 - 
JSON形式で「作成」します。

 - 
キーファイル(.json)がローカルPCにダウンロードされます。あとでlogstash-vmに配置します。
 - 下記のようにキーのステータスが有効となっていればOKです。
 

【補足】
・キーファイルの中身は下記のようになっています。
{
  "type": "service_account",
  "project_id": "unique-voyage-xxxxxx",
  "private_key_id": "xxxx61dfxxxx3a21xxxx8a8bxxxxfb70xxxx9b49",
  "private_key": "-----BEGIN PRIVATE KEY-----\n\n-----END PRIVATE KEY-----\n",
  "client_email": "for-logstash@unique-voyage-xxxxxx.iam.gserviceaccount.com",
  "client_id": "xxxxxxxxxxxxxxxxxxxxx",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/es-repo%40unique-voyage-xxxxxx.iam.gserviceaccount.com"
}
【参考】
・サービスアカウントの作成と管理
・サービスアカウントキーの作成と管理
3. Filebeatの設定
- GCPコンソールからfilebeat-vmにブラウザウィンドウでログインします。
 - filebeat.ymlの設定をおこないます。
 
$ sudo vi /etc/filebeat/filebeat.yml
- Kibanaにダッシュボードのテンプレートをロードする設定をおこないます。
 - setup.dashboards.enabledをtureとします。
 
# ================================= Dashboards ================================= # These settings control loading the sample dashboards to the Kibana index. Loading # the dashboards is disabled by default and can be enabled either by setting the # options here or by using the <code>setup</code> command. setup.dashboards.enabled: true
- setup.kibana配下のhostにKibanaのIPアドレスを記載します。
 
setup.kibana: host: "10.128.0.13:5601"
- Elasticsearchへの出力設定をおこないます。
 - hostsにElasticsearchのIPアドレスを記載します。
 
# ---------------------------- Elasticsearch Output ---------------------------- output.elasticsearch: # Array of hosts to connect to. hosts: ["10.128.0.14:9200"]
4. Auditd Modulesの導入
- filebeat-vmでAuditd Moduleを有効化します。
 - 下記コマンドを実行し、Enabled auditdと表示することを確認します。
 
$ sudo filebeat modules enable auditd
- 下記コマンドで有効化されていることを確認します。
 
$ sudo filebeat modules list
- Auditd Module用のダッシュボードをロードします。
 - このコマンドの実行条件は「出力先がELasticsearch」かつ「FilebeatがKibanaと通信できる」ことです。
 
$ sudo filebeat setup –e Overwriting ILM policy is disabled. Set <code>setup.ilm.overwrite: true</code> for enabling. Index setup finished. Loading dashboards (Kibana must be running and reachable) Loaded dashboards Setting up ML using setup --machine-learning is going to be removed in 8.0.0. Please use the ML app instead. See more: https://www.elastic.co/guide/en/machine-learning/current/index.html Loaded machine learning job configurations Loaded Ingest pipelines
- Elasticsearchへの出力設定を無効化します。
 - output.elasticsearchとhostsをコメントアウトします。
 
# ---------------------------- Elasticsearch Output ---------------------------- # output.elasticsearch: # Array of hosts to connect to. # hosts: ["localhost:9200"]
- Logstashへの出力設定をおこないます。
 - output.logstash配下のenabledをtrueとします。
 - hostsにLogstashのIPアドレスを記載します。
 
# ------------------------------ Logstash Output ------------------------------- output.logstash: enabled: true # The Logstash hosts hosts: ["10.128.0.12:5044"]
5. Logstashの設定
- GCPコンソールからlogstash-vmにブラウザウィンドウでログインします。
 - 手順2でダウンロードしたキーファイルをアップロードします。
 

- ユーザのホームディレクトリにアップロードされます。
 
$ ll total 4 -rw-rw-r--. 1 user01 user01 2329 Mar 06 13:49 unique-voyage-xxxxxx-d095b0237324.json
- キーファイル(.json)を/etc/logstash配下に移動させます。
 
$ sudo mv unique-voyage-xxxxxx-d095b0237324.json /etc/logtash/
- logstash-output-google_cloud_storageプラグインをインストールします。
 - Installation successfulと表示されればOKです。
 
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-google_cloud_storage Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Validating logstash-output-google_cloud_storage Installing logstash-output-google_cloud_storage Installation successful
- logstash-plugin listコマンドで導入したプラグイン名が表示されればOKです。
 
$ sudo /usr/share/logstash/bin/logstash-plugin list 'logstash-output-google_cloud_storage' logstash-output-google_cloud_storage
- logstash.confの設定をおこないます。
 
$ sudo vi /etc/logstash/conf.d/logstash.conf
- ingest機能で加工できるようにindexとpipelineを下記のように設定します。
 - デフォルトでは1時間単位でファイルがGCSに配置されますが、今回は分単位で配置します。
 
input {
  ### Filebeatから受信
  beats {
    port => "5044"
  }
}
output {
  ### Elasticsearchに出力
  elasticsearch {
    hosts => "10.128.0.14:9200"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    pipeline => "%{[@metadata][pipeline]}"
  }
  ### GCSに出力
  google_cloud_storage { 
    bucket => "log-bucket-2021"
    json_key_file => "/etc/logstash/unique-voyage-xxxxxx-d095b0237324.json"
    date_patterns => "%Y-%m-%dT%H:%M:00"
  }
}
- logstash-vmのLogstashのプロセスを起動します。
 - 起動後、正常にプロセス起動できていることを確認します。
 
$ sudo systemctl start logstash $ sudo systemctl status logstash
- 次にfilebeat-vmのFilebeatのプロセスを起動します。
 - 起動後、正常にプロセス起動できていることを確認します。
 
$ sudo systemctl start filebeat $ sudo systemctl status filebeat
【参考】
・ Google Cloud Storage output plugin
・ Configure the Logstash output
・ Elasticsearch output plugin
6. Kibanaでログ閲覧
- 
ブラウザでhttp://:5601にアクセスします。

 - 
[Management] > [Dev Tools]で GET _cat/indices/filebeat-*?v&s を実施するとIndexが生成されています。

 - 
[Management] > [Stack Management] > [Index Patterns]でCreate index patternをクリックします。

 - 
[Index pattern name]に[filebeat-*]と入力し、[Next step]をクリックします。

 - 
[Time field]に@timestampを指定し、[Create index pattern]をクリックします。

 - 
[Dashboard]で[Filebeat Auditd] Audit Events ECSを選択します。
 - 下記のようなAuditd Moduleによってロードされたテンプレートで監査ログが可視化されます。
 

- [Discover]でログを見ると下記内容で取り込まれています。(直接Filebeatから来たものと同じです)
 
{
  "_index": "filebeat-7.11.1-2021.03.06",
  "_type": "_doc",
  "_id": "zPwcCHgBQ64eSJLnd42x",
  "_version": 1,
  "_score": null,
  "_source": {
    "agent": {
      "hostname": "filebeat-vm",
      "name": "filebeat-vm",
      "id": "03947b5a-2cca-4d7b-b1af-8d5b375ae235",
      "ephemeral_id": "3cd60301-4e40-4b8b-b2ee-cc21c4715b97",
      "type": "filebeat",
      "version": "7.11.1"
    },
    "process": {
      "pid": 2704,
      "executable": "/usr/sbin/groupadd"
    },
    "log": {
      "file": {
        "path": "/var/log/audit/audit.log"
      },
      "offset": 3013195
    },
    "fileset": {
      "name": "log"
    },
    "auditd": {
      "log": {
        "ses": "4294967295",
        "op": "add-group",
        "sequence": 4179,
        "subj": "system_u:system_r:unconfined_service_t:s0"
      }
    },
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "cloud": {
      "availability_zone": "us-central1-a",
      "instance": {
        "name": "filebeat-vm",
        "id": "7439420779510628078"
      },
      "provider": "gcp",
      "machine": {
        "type": "e2-small"
      },
      "project": {
        "id": "unique-voyage-xxxxxx"
      },
      "account": {
        "id": "unique-voyage-xxxxxx"
      }
    },
    "input": {
      "type": "log"
    },
    "@timestamp": "2021-03-06T15:16:18.606Z",
    "ecs": {
      "version": "1.7.0"
    },
    "service": {
      "type": "auditd"
    },
    "@version": "1",
    "host": {
      "hostname": "filebeat-vm",
      "os": {
        "kernel": "3.10.0-1127.19.1.el7.x86_64",
        "codename": "Core",
        "name": "CentOS Linux",
        "family": "redhat",
        "version": "7 (Core)",
        "platform": "centos"
      },
      "ip": [
        "10.128.0.11",
        "fe80::4001:aff:fe80:b"
      ],
      "containerized": false,
      "name": "filebeat-vm",
      "id": "cd7c85585b483d2656ce72740e5b59f7",
      "mac": [
        "42:01:0a:80:00:0b"
      ],
      "architecture": "x86_64"
    },
    "event": {
      "ingested": "2021-03-06T15:16:26.671257065Z",
      "kind": "event",
      "module": "auditd",
      "action": "add_group",
      "dataset": "auditd.log",
      "outcome": "failed"
    },
    "user": {
      "audit": {
        "id": "4294967295"
      },
      "name": "google-sudoers",
      "id": "0"
    }
  },
  "fields": {
    "event.ingested": [
      "2021-03-06T15:16:26.671Z"
    ],
    "@timestamp": [
      "2021-03-06T15:16:18.606Z"
    ]
  },
  "sort": [
    1615043778606
  ]
}
7. GCSのログ閲覧
- GCPコンソールから [Storage] > [ブラウザ]を開き、log-bucket-2021をクリックします。
 - 下記のように分単位でファイルが生成されています。
 

- 下記はログの中身(audit.logのサンプル)になります。
 
2021-03-06T14:48:14.829Z {hostname=filebeat-vm, os={name=CentOS Linux, family=redhat, version=7 (Core), platform=centos, kernel=3.10.0-1127.19.1.el7.x86_64, codename=Core}, containerized=false, ip=[10.128.0.11, fe80::4001:aff:fe80:b], name=filebeat-vm, id=cd7c85585b483d2656ce72740e5b59f7, mac=[42:01:0a:80:00:0b], architecture=x86_64} type=CRYPTO_KEY_USER msg=audit(1615042093.428:2033): pid=2234 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:sshd_t:s0-s0:c0.c1023 msg='op=destroy kind=server fp=SHA256:18:2c:cd:0d:8c:b5:ea:17:f2:33:89:03:e0:bc:d0:df:a0:22:fc:30:c2:b7:84:27:db:f0:c1:30:ed:17:91:1a direction=? spid=2234 suid=0  exe="/usr/sbin/sshd" hostname=? addr=? terminal=? res=success'```
まとめ
さて、いかがでしたでしょうか?
意外と地味に手間が多い手順ではありましたが
無事にElasticsearchとGCSにFilebeat Modulesが取得したログを格納することができました。
Filebeat Modulesで必要なデータを事前にKibanaにロードするために
Filebeatの出力先を一度Elasticsearchに向けてから、Logstash向けに直すのはひと手間入ります。
検証後に気づいたんですが、Load ingest pipelinesに出力先がLogstashの場合のロード手順が書かれてました。
これで導入できれば、今回のような切り替えの手間がなくて良さそうです。
あとはlogstash.confのoutput句の書き方さえ理解できれば難しくはないと思います。
ぜひ、皆様も必要になったら、この手順を参考にしてみてはいかがでしょうか^^
