Logstash

Make a Data Pipeline with Aurora DB, Logstash and AWS Opensearch

kahnco 2023. 1. 25. 16:19

데이터 파이프라인을 짤 일이 있어서 Logstash를 활용해서 구현해보겠습니다.

Logstash 공식사이트 (https://www.elastic.co/kr/logstash/)에서 설치 관련 내용을 확인할 수 있지만, 제대로 실행되지 않아서 제 나름대로의 방법으로 진행해보겠습니다.

또한, 여러 개의 테이블을 동시에 Consume 하기 위해서 pipeline을 활용해서 여러 개의 Logstash를 돌렸으니 이 점 참고 바랍니다.

 


요구 조건

 

  1. DB에 데이터를 넣으면 그 넣어진 데이터를 OpenSearch에서 실시간으로 확인 가능해야합니다.
  2. 여러 개의 테이블을 동시에 Consume 해야합니다.
  3. sql_last_value 값을 써서 중복값이 OpenSearch에 쌓이지 않도록 해야합니다.

Aurora DB 세팅

 

  1. RDS에서 Aurora DB 타입으로 데이터베이스를 생성합니다. (DB 자체가 중요한게 아니라서 AZ 등등은 마음대로 설정하셔도 됩니다)
  2. 설정된 DB Instance에서, 라이터 인스턴스 (Writer Instance)의 엔드포인트를 복사해둡니다.

그림 1

 


AWS OpenSearch 세팅

 

  1. AWS OpenSearch를 네트워크 -> 퍼블릭 액세스 버전으로 생성해줍니다. (나머지 부분들은 모두 디폴트 값으로 생성해도 좋습니다.)
  2. 생성이 완료되면 도메인 엔드포인트를 저장해놓습니다.

그림 2


Logstash Instance 세팅

 

  1. EC2 인스턴스를 생성합니다.
  2. 인스턴스를 Ubuntu 인스턴스로 생성해줍니다.
  3. 1번에서 생성한 인스턴스의 보안 그룹을 Aurora DB의 보안그룹에 인바운드 추가해줍니다. (그림 3 참조)

그림 3

 


Logstash Config 세팅

 

1. 먼저, mysql-connector-java를 설치해줍니다. Logstash에서 Aurora DB와 통신시켜줄 플러그인입니다.

wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java_8.0.27-1debian10_all.deb # download
dpkg -x mysql-connector-java_8.0.28-1debian10_all.deb mysql_jdbc # 압축 해제

2. Logstash 또한 설치해줍니다. 한창 Log4j의 보안 이슈가 터졌을 때가 7.16.0 버전이었으니, 해당 버전 이상의 버전으로 활용해줍니다. 또한, 8 버전부터는 input, output 세팅 방법이 조금 달라졌으므로 이 게시글에서는 7.16.2 버전으로 진행하겠습니다.

curl https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz -o logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

3. 이렇게 진행하면, /home/ubuntu/logstash-7.16.2 폴더가 생성되어있습니다.

 

4. 1번 과정에서 mysql_jdbc 라는 이름의 폴더에 압축을 해제했을텐데, 그 안에서 mysql-connector-java-8.0.27.jar 파일을 찾아서 /home/ubuntu/logstash-7.16.2/bin 폴더에 이동시켜줍니다.

 

5. /home/ubuntu/logstash-7.16.2/config 폴더로 이동합니다.

 

6. Logstash 인스턴스를 생성할 config 파일을 만들어줄 텐데, 다음과 같은 템플렛으로 작성해줍니다.

// filename: test.config

input{
  jdbc {
    jdbc_driver_library => "/home/ubuntu/logstash-7.16.2/bin/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{아까 복사해두었던 RDS Writer Instance 주소}:3306/{테이블 명}?useSSL=false"
    jdbc_user => "{유저 이름}"
    jdbc_password => "{유저 비밀번호}"
    jdbc_paging_enabled => true
    jdbc_default_timezone => "Asia/Seoul"
    schedule => "*/10 * * * * *"
    last_run_metadata_path => "/home/ubuntu/logstash-7.16.2/meta/.logstash_jdbc_last_{테이블에 맞는 이름}”
    statement => "SELECT *, UNIX_TIMESTAMP(updated_at) AS unix_ts_in_secs FROM {내가 원하는 테이블 명} WHERE (UNIX_TIMESTAMP(updated_at) > :sql_last_value AND updated_at < NOW()) ORDER BY updated_at ASC"
    record_last_run => true
    clean_run => true
    tracking_column_type => "numeric"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
  }
}

filter {
  mutate{
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  opensearch{
    hosts => "{OpenSearch 생성 후에 복사해둔 엔드 포인트 주소}"
    user => "{Opensearch 아이디}"
    password => "{Opensearch 비밀번호}"
    index => "{Opensearch에서 보여질 데이터 그룹 이름}”
    ecs_compatibility => disabled
    ssl_certificate_verification => false
    doc_as_upsert => true
    action => "update"
    document_id => "%{[@metadata][_id]}"
  }
}

7. pipelines.yml 파일을 약간 수정해줍니다. (* yaml 파일이라서 인덴트가 굉장히 중요합니다. 처음의 대시(-)는 띄어쓰기 없이 쓰고, 한 칸을 띄우고 시작해야합니다. 그리고 workers 부터는 두 칸을 띄우고 작성해야합니다.)

- pipeline.id: test
  pipeline.workers: 1
  pipeline.batch.size: 10
  path.config: "config/test.conf"
  
~~~~~
// 주석 제거
  pipeline.workers: 1 # CPU 코어 수에 맞게 조정
  pipeline.batch.size: 125 # Worker Thread가 한번에 처리하기 위한 이벤트의 크기 (알아서 적당히 조절해야합니다)

8. 다음 명령어를 통해서 Logstash를 백그라운드에서 실행시켜줍니다. (아래 명령어는 config 폴더에 있는 pipelines.yml를 기준으로 Logstash를 실행시키는 명령어입니다)

/home/ubuntu/logstash-7.16.2/bin/logstash --path.settings ./config &

 

반응형