fbpx

Filebeat ModulesとLogstashの組み合わせ方 #Elastic #Elasticsearch

この記事は1年以上前に投稿されました。情報が古い可能性がありますので、ご注意ください。

Elastic Stackを使って日頃から様々なログを解析しているとFilebeatに頼ることが多くなってきます。

今回は、Filebeat Modulesを利用して、GSuiteやZoom、O365などのクラウドサービスのログを収集し
活用していく上でふと疑問に感じたことについてまとめてみました。

Filebeat Modulesとは


2017年3月28日にリリースされたバージョン5.3以降で登場した機能です。
Filebeat Modulesを用いることでサポートされるログの収集、加工、可視化を自動的に処理してくれます。
Filebeat Modulesが収集したログをElasticsearchのingest機能で加工してIndexに格納します。

スクリーンショット

ingestの設定はKibanaで確認することができます。
スクリーンショット

現最新バージョンの7.11では全部で65種類のModulesが用意されています。
全部のModuleではありませんが、Kibanaダッシュボードのテンプレートも付属しています。

【参考】
Filebeat modules

今回実施したこと


Auditd Modules1を利用して、Linux OSの監査ログをElasticsearchに取り込みます。
同時に同じログを長期保管目的でGoogle Cloud Storage(以降、GCS)2にも格納したいと思います。

しかし、Filebeatの出力先には、以下の7つしか利用できません。
また、現行バージョンでは複数の出力先に出すこともできません。

  • Elasticsearch Service
  • Elasticsearch
  • Logstash
  • Kafka
  • Redis
  • File (Local)
  • Console (Local)

複数の出力先に同時にログ出力したり、上記にない出力先にログを出したい場合は
FilebeatとElasticsearchの間にLogstash3を挟む必要があります。

しかし、Filebeat Modulesを利用する場合は加工処理はElasticsearchのingest機能がおこないます。
Modules利用時にLogstashを挟んでGCSに送るとどうなるのか、今回はそれを実際に試してみました^^

必要なサブスクリプション


  • Filebeat ModulesはOSSもしくはBASICで利用できるため全て無償です。
  • サブスクリプションページ4の記載内容とFilebeat Modulesページの記載内容に少し差異があります。
  • 下記図のX-Packと記載されているものはBASICで利用できるModulesになります。

【Filebeat Modulesページのサンプル】
スクリーンショット 2021-03-06 17.23.46.png

利用環境


項目 内容
Elasticsearch 7.11.1
Kibana 7.11.1
Logstash 7.11.1
Filebeat 7.11.1
JDK version "11.0.10" 2021-01-19 LTS
VM image centos-7-v20201112
Region us-central1 (アイオワ)

※投稿時点における最新版を採用しています。

【構成図】
・ FilebeatがインストールされているLinux OSの監査ログをAuditd Modulesを利用して取得します。
・ ログをLogstashに転送し、加工せずにElasticsearchとGCSに出力します。

スクリーンショット

【補足】
・ GCPのVPCネットワークは、defaultネットワークを利用しています。
・ 各VM間の通信は全て許可されています。
・ 本投稿では、各VMマシンの作成および各OSへのElasticプロダクトのインストール手順は省略しています。
・ KibanaからElasticsearchに接続できる設定となっています。

【参考】
Elasticsearchのインストール方法
Kibanaのインストール方法
Logstashのインストール方法
Filebeatのインストール方法

実施手順


  • 下記手順で設定を実施しました。
  1. GCSのバケット作成
  2. サービスアカウントの作成
  3. Filebeatの設定
  4. Auditd Modulesの導入
  5. Logstashの設定
  6. Kibanaでログ閲覧
  7. GCSのログ閲覧

1. GCSのバケット作成

  • あえてCLIで設定を作っていきます。
  • GCPコンソールにログインし、Cloud Shellを起動します。

スクリーンショット 2021-03-06 18.14.34.png

  • 下記コマンドを実行して、GCSのバケットを作成します。

$ gsutil mb -p unique-voyage-xxxxxx -c nearline -l us-central1 -b on gs://log-bucket-2021

  • 設定値は下記の通りです。GCPコンソールからGUIで設定してもOKです。
項目 パラメータ
バケット名 log-bucket-2021
ロケーションタイプ Region
ロケーション us-central1 (アイオワ)
ストレージクラス Nearline
アクセス制御 均一
暗号化 (オプション) Googleが管理する鍵 (デフォルト)
保持ポリシー (オプション) 未設定 (デフォルト)
ラベル (オプション) 未設定 (デフォルト)

【補足】
・ 「unique-voyage-xxxxxx」は、GCPプロジェクト名になります。
・ バケット名は好きな名前で良いですが、全世界で一意である必要があります。
・ 長期保管のアーカイブ用途なので、Nearlineとしています。
・ VMと同じリージョンで作成しています。

【参考】
ストレージバケットの作成

2. サービスアカウントの作成

  • Cloud Shellで下記コマンドを実行して、サービスアカウントを作成します。

$ gcloud iam service-accounts create for-logstash \
    --description="for_logstash_output" \
    --display-name="for-logstash"

  • 上記で作成したサービスアカウントに対して、ストレージ管理者権限を付与します。

$ gcloud projects add-iam-policy-binding unique-voyage-xxxxxx \
    --member="serviceAccount:for-logstash@unique-voyage-xxxxxx.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"

  • 設定値は下記の通りです。GCPコンソールからGUIで設定してもOKです。
項目 パラメータ
サービスアカウント名 for-logstash
説明 for_logstash_output
表示名 for-logstash
ロール名 Storage オブジェクト管理者
  • 作成したサービスアカウントのキーを発行します。
    スクリーンショット 2021-03-06 21.36.10.png

  • JSON形式で「作成」します。
    スクリーンショット

  • キーファイル(.json)がローカルPCにダウンロードされます。あとでlogstash-vmに配置します。

  • 下記のようにキーのステータスが有効となっていればOKです。

スクリーンショット

【補足】
・キーファイルの中身は下記のようになっています。

{
  "type": "service_account",
  "project_id": "unique-voyage-xxxxxx",
  "private_key_id": "xxxx61dfxxxx3a21xxxx8a8bxxxxfb70xxxx9b49",
  "private_key": "-----BEGIN PRIVATE KEY-----\n\n-----END PRIVATE KEY-----\n",
  "client_email": "for-logstash@unique-voyage-xxxxxx.iam.gserviceaccount.com",
  "client_id": "xxxxxxxxxxxxxxxxxxxxx",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/es-repo%40unique-voyage-xxxxxx.iam.gserviceaccount.com"
}

【参考】
サービスアカウントの作成と管理
サービスアカウントキーの作成と管理

3. Filebeatの設定

  • GCPコンソールからfilebeat-vmにブラウザウィンドウでログインします。
  • filebeat.ymlの設定をおこないます。

$ sudo vi /etc/filebeat/filebeat.yml

  • Kibanaにダッシュボードのテンプレートをロードする設定をおこないます。
  • setup.dashboards.enabledtureとします。

 # ================================= Dashboards =================================
 # These settings control loading the sample dashboards to the Kibana index. Loading
 # the dashboards is disabled by default and can be enabled either by setting the
 # options here or by using the <code>setup</code> command.
setup.dashboards.enabled: true

  • setup.kibana配下のhostにKibanaのIPアドレスを記載します。

setup.kibana:
host: "10.128.0.13:5601"

  • Elasticsearchへの出力設定をおこないます。
  • hostsにElasticsearchのIPアドレスを記載します。

 # ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.128.0.14:9200"]

4. Auditd Modulesの導入

  • filebeat-vmでAuditd Moduleを有効化します。
  • 下記コマンドを実行し、Enabled auditdと表示することを確認します。

$ sudo filebeat modules enable auditd

  • 下記コマンドで有効化されていることを確認します。

$ sudo filebeat modules list

  • Auditd Module用のダッシュボードをロードします。
  • このコマンドの実行条件は「出力先がELasticsearch」かつ「FilebeatがKibanaと通信できる」ことです。

$ sudo filebeat setup –e
Overwriting ILM policy is disabled. Set <code>setup.ilm.overwrite: true</code> for enabling.
Index setup finished.
Loading dashboards (Kibana must be running and reachable)
Loaded dashboards
Setting up ML using setup --machine-learning is going to be removed in 8.0.0. Please use the ML app instead.
See more: https://www.elastic.co/guide/en/machine-learning/current/index.html
Loaded machine learning job configurations
Loaded Ingest pipelines

  • Elasticsearchへの出力設定を無効化します。
  • output.elasticsearchhostsをコメントアウトします。

 # ---------------------------- Elasticsearch Output ----------------------------
 # output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  • Logstashへの出力設定をおこないます。
  • output.logstash配下のenabledtrueとします。
  • hostsにLogstashのIPアドレスを記載します。

 # ------------------------------ Logstash Output -------------------------------
output.logstash:
  enabled: true
  # The Logstash hosts
  hosts: ["10.128.0.12:5044"]

5. Logstashの設定

  • GCPコンソールからlogstash-vmにブラウザウィンドウでログインします。
  • 手順2でダウンロードしたキーファイルをアップロードします。

スクリーンショット

  • ユーザのホームディレクトリにアップロードされます。

$ ll
total 4
-rw-rw-r--. 1 user01 user01 2329 Mar 06 13:49 unique-voyage-xxxxxx-d095b0237324.json

  • キーファイル(.json)を/etc/logstash配下に移動させます。

$ sudo mv unique-voyage-xxxxxx-d095b0237324.json /etc/logtash/

  • logstash-output-google_cloud_storageプラグインをインストールします。
  • Installation successfulと表示されればOKです。

$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-google_cloud_storage
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Validating logstash-output-google_cloud_storage
Installing logstash-output-google_cloud_storage
Installation successful

  • logstash-plugin listコマンドで導入したプラグイン名が表示されればOKです。

$ sudo /usr/share/logstash/bin/logstash-plugin list 'logstash-output-google_cloud_storage'
logstash-output-google_cloud_storage

  • logstash.confの設定をおこないます。

$ sudo vi /etc/logstash/conf.d/logstash.conf

  • ingest機能で加工できるようにindexpipelineを下記のように設定します。
  • デフォルトでは1時間単位でファイルがGCSに配置されますが、今回は分単位で配置します。

input {
  ### Filebeatから受信
  beats {
    port => "5044"
  }
}
output {
  ### Elasticsearchに出力
  elasticsearch {
    hosts => "10.128.0.14:9200"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    pipeline => "%{[@metadata][pipeline]}"
  }
  ### GCSに出力
  google_cloud_storage { 
    bucket => "log-bucket-2021"
    json_key_file => "/etc/logstash/unique-voyage-xxxxxx-d095b0237324.json"
    date_patterns => "%Y-%m-%dT%H:%M:00"
  }
}

  • logstash-vmのLogstashのプロセスを起動します。
  • 起動後、正常にプロセス起動できていることを確認します。

$ sudo systemctl start logstash
$ sudo systemctl status logstash

  • 次にfilebeat-vmのFilebeatのプロセスを起動します。
  • 起動後、正常にプロセス起動できていることを確認します。

$ sudo systemctl start filebeat
$ sudo systemctl status filebeat

【参考】
Google Cloud Storage output plugin
Configure the Logstash output
Elasticsearch output plugin

6. Kibanaでログ閲覧

  • ブラウザでhttp://:5601にアクセスします。
    スクリーンショット

  • [Management] > [Dev Tools]で GET _cat/indices/filebeat-*?v&s を実施するとIndexが生成されています。
    スクリーンショット

  • [Management] > [Stack Management] > [Index Patterns]でCreate index patternをクリックします。
    image.png

  • [Index pattern name]に[filebeat-*]と入力し、[Next step]をクリックします。
    image.png

  • [Time field]に@timestampを指定し、[Create index pattern]をクリックします。
    image.png

  • [Dashboard]で[Filebeat Auditd] Audit Events ECSを選択します。

  • 下記のようなAuditd Moduleによってロードされたテンプレートで監査ログが可視化されます。

スクリーンショット

  • [Discover]でログを見ると下記内容で取り込まれています。(直接Filebeatから来たものと同じです)

{
  "_index": "filebeat-7.11.1-2021.03.06",
  "_type": "_doc",
  "_id": "zPwcCHgBQ64eSJLnd42x",
  "_version": 1,
  "_score": null,
  "_source": {
    "agent": {
      "hostname": "filebeat-vm",
      "name": "filebeat-vm",
      "id": "03947b5a-2cca-4d7b-b1af-8d5b375ae235",
      "ephemeral_id": "3cd60301-4e40-4b8b-b2ee-cc21c4715b97",
      "type": "filebeat",
      "version": "7.11.1"
    },
    "process": {
      "pid": 2704,
      "executable": "/usr/sbin/groupadd"
    },
    "log": {
      "file": {
        "path": "/var/log/audit/audit.log"
      },
      "offset": 3013195
    },
    "fileset": {
      "name": "log"
    },
    "auditd": {
      "log": {
        "ses": "4294967295",
        "op": "add-group",
        "sequence": 4179,
        "subj": "system_u:system_r:unconfined_service_t:s0"
      }
    },
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "cloud": {
      "availability_zone": "us-central1-a",
      "instance": {
        "name": "filebeat-vm",
        "id": "7439420779510628078"
      },
      "provider": "gcp",
      "machine": {
        "type": "e2-small"
      },
      "project": {
        "id": "unique-voyage-xxxxxx"
      },
      "account": {
        "id": "unique-voyage-xxxxxx"
      }
    },
    "input": {
      "type": "log"
    },
    "@timestamp": "2021-03-06T15:16:18.606Z",
    "ecs": {
      "version": "1.7.0"
    },
    "service": {
      "type": "auditd"
    },
    "@version": "1",
    "host": {
      "hostname": "filebeat-vm",
      "os": {
        "kernel": "3.10.0-1127.19.1.el7.x86_64",
        "codename": "Core",
        "name": "CentOS Linux",
        "family": "redhat",
        "version": "7 (Core)",
        "platform": "centos"
      },
      "ip": [
        "10.128.0.11",
        "fe80::4001:aff:fe80:b"
      ],
      "containerized": false,
      "name": "filebeat-vm",
      "id": "cd7c85585b483d2656ce72740e5b59f7",
      "mac": [
        "42:01:0a:80:00:0b"
      ],
      "architecture": "x86_64"
    },
    "event": {
      "ingested": "2021-03-06T15:16:26.671257065Z",
      "kind": "event",
      "module": "auditd",
      "action": "add_group",
      "dataset": "auditd.log",
      "outcome": "failed"
    },
    "user": {
      "audit": {
        "id": "4294967295"
      },
      "name": "google-sudoers",
      "id": "0"
    }
  },
  "fields": {
    "event.ingested": [
      "2021-03-06T15:16:26.671Z"
    ],
    "@timestamp": [
      "2021-03-06T15:16:18.606Z"
    ]
  },
  "sort": [
    1615043778606
  ]
}

7. GCSのログ閲覧

  • GCPコンソールから [Storage] > [ブラウザ]を開き、log-bucket-2021をクリックします。
  • 下記のように分単位でファイルが生成されています。

スクリーンショット 2021-03-06 23.55.10.png

  • 下記はログの中身(audit.logのサンプル)になります。

2021-03-06T14:48:14.829Z {hostname=filebeat-vm, os={name=CentOS Linux, family=redhat, version=7 (Core), platform=centos, kernel=3.10.0-1127.19.1.el7.x86_64, codename=Core}, containerized=false, ip=[10.128.0.11, fe80::4001:aff:fe80:b], name=filebeat-vm, id=cd7c85585b483d2656ce72740e5b59f7, mac=[42:01:0a:80:00:0b], architecture=x86_64} type=CRYPTO_KEY_USER msg=audit(1615042093.428:2033): pid=2234 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:sshd_t:s0-s0:c0.c1023 msg='op=destroy kind=server fp=SHA256:18:2c:cd:0d:8c:b5:ea:17:f2:33:89:03:e0:bc:d0:df:a0:22:fc:30:c2:b7:84:27:db:f0:c1:30:ed:17:91:1a direction=? spid=2234 suid=0  exe="/usr/sbin/sshd" hostname=? addr=? terminal=? res=success'```

まとめ


さて、いかがでしたでしょうか?

意外と地味に手間が多い手順ではありましたが
無事にElasticsearchとGCSにFilebeat Modulesが取得したログを格納することができました。

Filebeat Modulesで必要なデータを事前にKibanaにロードするために
Filebeatの出力先を一度Elasticsearchに向けてから、Logstash向けに直すのはひと手間入ります。

検証後に気づいたんですが、Load ingest pipelinesに出力先がLogstashの場合のロード手順が書かれてました。
これで導入できれば、今回のような切り替えの手間がなくて良さそうです。

あとはlogstash.confのoutput句の書き方さえ理解できれば難しくはないと思います。
ぜひ、皆様も必要になったら、この手順を参考にしてみてはいかがでしょうか^^

Author

ログ分析基盤のアーキテクチャデザインやクラウドにおけるセキュリティ実装方式を研究するElasticエバンジェリスト。
国内エンジニアの貴重なリソースを効率良く活用できるよう、技術ナレッジを惜しみなく発信するよう心がけている。

日比野恒の記事一覧

新規CTA