데이터 파이프라인을 짤 일이 있어서 Logstash를 활용해서 구현해보겠습니다.
Logstash 공식사이트 (https://www.elastic.co/kr/logstash/)에서 설치 관련 내용을 확인할 수 있지만, 제대로 실행되지 않아서 제 나름대로의 방법으로 진행해보겠습니다.
또한, 여러 개의 테이블을 동시에 Consume 하기 위해서 pipeline을 활용해서 여러 개의 Logstash를 돌렸으니 이 점 참고 바랍니다.
요구 조건
- DB에 데이터를 넣으면 그 넣어진 데이터를 OpenSearch에서 실시간으로 확인 가능해야합니다.
- 여러 개의 테이블을 동시에 Consume 해야합니다.
- sql_last_value 값을 써서 중복값이 OpenSearch에 쌓이지 않도록 해야합니다.
Aurora DB 세팅
- RDS에서 Aurora DB 타입으로 데이터베이스를 생성합니다. (DB 자체가 중요한게 아니라서 AZ 등등은 마음대로 설정하셔도 됩니다)
- 설정된 DB Instance에서, 라이터 인스턴스 (Writer Instance)의 엔드포인트를 복사해둡니다.
AWS OpenSearch 세팅
- AWS OpenSearch를 네트워크 -> 퍼블릭 액세스 버전으로 생성해줍니다. (나머지 부분들은 모두 디폴트 값으로 생성해도 좋습니다.)
- 생성이 완료되면 도메인 엔드포인트를 저장해놓습니다.
Logstash Instance 세팅
- EC2 인스턴스를 생성합니다.
- 인스턴스를 Ubuntu 인스턴스로 생성해줍니다.
- 1번에서 생성한 인스턴스의 보안 그룹을 Aurora DB의 보안그룹에 인바운드 추가해줍니다. (그림 3 참조)
Logstash Config 세팅
1. 먼저, mysql-connector-java를 설치해줍니다. Logstash에서 Aurora DB와 통신시켜줄 플러그인입니다.
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java_8.0.27-1debian10_all.deb # download
dpkg -x mysql-connector-java_8.0.28-1debian10_all.deb mysql_jdbc # 압축 해제
2. Logstash 또한 설치해줍니다. 한창 Log4j의 보안 이슈가 터졌을 때가 7.16.0 버전이었으니, 해당 버전 이상의 버전으로 활용해줍니다. 또한, 8 버전부터는 input, output 세팅 방법이 조금 달라졌으므로 이 게시글에서는 7.16.2 버전으로 진행하겠습니다.
curl https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz -o logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz
3. 이렇게 진행하면, /home/ubuntu/logstash-7.16.2 폴더가 생성되어있습니다.
4. 1번 과정에서 mysql_jdbc 라는 이름의 폴더에 압축을 해제했을텐데, 그 안에서 mysql-connector-java-8.0.27.jar 파일을 찾아서 /home/ubuntu/logstash-7.16.2/bin 폴더에 이동시켜줍니다.
5. /home/ubuntu/logstash-7.16.2/config 폴더로 이동합니다.
6. Logstash 인스턴스를 생성할 config 파일을 만들어줄 텐데, 다음과 같은 템플렛으로 작성해줍니다.
// filename: test.config
input{
jdbc {
jdbc_driver_library => "/home/ubuntu/logstash-7.16.2/bin/mysql-connector-java-8.0.27.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://{아까 복사해두었던 RDS Writer Instance 주소}:3306/{테이블 명}?useSSL=false"
jdbc_user => "{유저 이름}"
jdbc_password => "{유저 비밀번호}"
jdbc_paging_enabled => true
jdbc_default_timezone => "Asia/Seoul"
schedule => "*/10 * * * * *"
last_run_metadata_path => "/home/ubuntu/logstash-7.16.2/meta/.logstash_jdbc_last_{테이블에 맞는 이름}”
statement => "SELECT *, UNIX_TIMESTAMP(updated_at) AS unix_ts_in_secs FROM {내가 원하는 테이블 명} WHERE (UNIX_TIMESTAMP(updated_at) > :sql_last_value AND updated_at < NOW()) ORDER BY updated_at ASC"
record_last_run => true
clean_run => true
tracking_column_type => "numeric"
tracking_column => "unix_ts_in_secs"
use_column_value => true
}
}
filter {
mutate{
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
opensearch{
hosts => "{OpenSearch 생성 후에 복사해둔 엔드 포인트 주소}"
user => "{Opensearch 아이디}"
password => "{Opensearch 비밀번호}"
index => "{Opensearch에서 보여질 데이터 그룹 이름}”
ecs_compatibility => disabled
ssl_certificate_verification => false
doc_as_upsert => true
action => "update"
document_id => "%{[@metadata][_id]}"
}
}
7. pipelines.yml 파일을 약간 수정해줍니다. (* yaml 파일이라서 인덴트가 굉장히 중요합니다. 처음의 대시(-)는 띄어쓰기 없이 쓰고, 한 칸을 띄우고 시작해야합니다. 그리고 workers 부터는 두 칸을 띄우고 작성해야합니다.)
- pipeline.id: test
pipeline.workers: 1
pipeline.batch.size: 10
path.config: "config/test.conf"
~~~~~
// 주석 제거
pipeline.workers: 1 # CPU 코어 수에 맞게 조정
pipeline.batch.size: 125 # Worker Thread가 한번에 처리하기 위한 이벤트의 크기 (알아서 적당히 조절해야합니다)
8. 다음 명령어를 통해서 Logstash를 백그라운드에서 실행시켜줍니다. (아래 명령어는 config 폴더에 있는 pipelines.yml를 기준으로 Logstash를 실행시키는 명령어입니다)
/home/ubuntu/logstash-7.16.2/bin/logstash --path.settings ./config &
끝