티스토리 뷰
[ELK Stack] Logstash Oracle DB -> XML -> Elasticsearch에 Insert 구현
Arc Lab. 2016. 12. 13. 20:20[업데이트 2016.12.29 23:26]
이번 포스팅에서는 Logstash를 통해 RDB로 부터 데이터를 가져온 후, 데이터중 XML 데이터를 가지고 있는 field를 접근하여 XML 데이터 Parsing후 최종적으로 JSON으로 변환하여 Elasticsearch로 데이터가 insert 되도록 구현해보고자 합니다.
사용될 Logstash Plugin들은 다음과 같습니다.
- Input: jdbc
- Filter: xml, mutate
- output: stdout, elasticsearch
RDB 및 XML 데이터 구조는 다음과 같다고 가정합니다.
<RDB(Oracle)>
- User Name: admin
- Password: 1234
- IP: 192.168.0.1
- Port: 1500
- SID: MYDB
- Table Name: tbl_api
- Table Columns: idx, xml_data, create_date, company, license, reserved
<XML Data>
1.00 2014-01-04 test 1234 John
Input Plugin의 jdbc 통해 Oracle DB에서 데이터를 가져옵니다.
input { jdbc { jdbc_driver_library => "C:\oracle-jdbc-driver\ojdbc6.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@192.168.0.1:1500/MYDB" jdbc_user => "admin" jdbc_password => "1234" statement => "select * from tbl_api where idx > :sql_last_value" use_column_value => true tracking_column => idx schedule => "* * * * *" } }
Filter Plugin에 xml을 사용하며, XPath를 사용하여 최종적으로 JSON으로 변환할 field를 추가 합니다. 여기서 target을 지정하여 output으로 가져올 필요가 없으므로, store_xml => "false" 및 target을 미지정하였습니다. XML 데이터에서 필요한 값을 가져오기 위해 XPath를 사용하였습니다. Depth가 깊은 데이터를 가져올 때 편리한 것 같습니다.
* 참고: https://www.elastic.co/guide/en/logstash/current/plugins-filters-xml.html
* 참고: http://www.w3schools.com/xml/xml_xpath.asp
filter { xml { source => "xml_data" store_xml => "false" xpath => [ "/API/APIInfo/Version/text()", "api_version" ] xpath => [ "/API/APIDetails/UserInfo/ID/text()", "api_id" ] xpath => [ "/API/APIDetails/UserInfo/PINCode/text()", "api_pincode" ] remove_field => [ "xml_data", "company", "license", "reserved" ] } mutate { replace => { "api_version" => "%{[api_version][0]}" "api_id" => "%{[api_id][0]}" "api_pincode" => "%{[api_pincode][0]}" } } }
위와 같이 XML 데이터내에 APIInfo > Version과 APIDetails > UserInfo > ID, PINCode만 가져와서 최종 output으로 지정합니다. 그 외의 field는 불필요 하다고 판단하여 remove_field를 통해 삭제합니다. 또한 source 지정되었던 xml_data도 원하는 field값을 가져왔으므로 삭제 합니다.
그런후 mutate plugin을 통해 최종 output으로 지정할 field를 replace를 통해 실제 값을 지정합니다. %{[api_version][0]} 0번 index를 참조하여 실제 값(여기서는 string)을 가져옵니다. 만약 force_array => "false"를 지정하면 XML이 array로 만들어지지 않아서 %{[api_version][0]} 와 같은 접근 없이 사용 가능합니다.
마지막으로 Output Plugin에 stdout(디버그용), elasticsearch를 설정합니다.
아래와 같이 hosts(elasticsearch)로 index pattern은 logstash-api-2016.01.01로 지정 및 document_type은 "api", 그리고 중복된 document가 insert되지 않도록 "idx" field를 지정합니다.
output { stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.0.2:9200"] index => ["logstash-api-%{+YYYY.MM.dd}"] document_type => ["api"] document_id => ["%{idx}"] # prevent from dulicating records from Oracle DB } }
Elasticsearch에서 logstash-api-* index pattern에 대한 index template을 정의합니다. _default_ mapping을 지정하며, string중에 number로 판단되는 것들은 자동으로 변경 되도록 numeric_detection를 true로 설정합니다. shards 및 replicas도 적절히 설정합니다.
{ "logstash-api": { "order": 0, "template": "logstash-api-*", "settings": { "index.mapping.total_fields.limit": 2000, "index": { "number_of_shards": "3", "number_of_replicas": "1" } }, "mappings": { "_default_": { "numeric_detection": true, "_all": { "enabled": false } } } } }
Kibana Console에서 아래와 같이 REST API를 이용하여 업데이트합니다.
PUT _template/logstash_api_template { "logstash-api": { "order": 0, "template": "logstash-api-*", "settings": { "index.mapping.total_fields.limit": 2000, "index": { "number_of_shards": "3", "number_of_replicas": "1" } }, "mappings": { "_default_": { "numeric_detection": true, "_all": { "enabled": false } } } } }
최종 Logstash Config 및 Index Template 파일은 아래의 GitHub를 참조 하시기 바랍니다.
** 추가 업데이트
추가로 Elasticsearch에 데이터 insert중에 아래와 같은 오류가 발생시 다음과 같이 total field limit을 변경하면 됩니다.
"limit of total fields [1000] in index" has been exceeded. "
=> "index.mapping.total_fields.limit": 2000
* 참고: https://discuss.elastic.co/t/total-fields-limit-setting/53004/2
** 추가 업데이트
XML에서 특정 XPath데이터를 가져와서 최종 output을 만드는 config 파일입니다.
또한 최종 output plugin에서 특정 field값이 없을때는 Elasticsearch로 insert하지 않도록 if문을 추가하였습니다.
input { jdbc { jdbc_driver_library => "C:\oracle-jdbc-driver\ojdbc6.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@192.168.0.1:1500/MYDB" jdbc_user => "admin" jdbc_password => "1234" statement => "select * from tbl_users where idx > :sql_last_value" use_column_value => true tracking_column => idx schedule => "* * * * *" } } filter { xml { source => "xml" force_array => "false" store_xml => "false" xpath => [ "/Users/Account/UserName", "UserName", "/Users/Account/Password", "Password", "/Users/Account/OtherInfo", "OtherInfo", ] remove_field => [ "xml" ] } xml { source => "UserName" force_array => "false" target => "UserName" } xml { source => "Password" force_array => "false" target => "Password" } xml { source => "OtherInfo" force_array => "false" target => "OtherInfo" } } output { stdout { codec => rubydebug } # Insert if OtherInfo XML data exists if [OtherInfo] { elasticsearch { hosts => ["192.168.0.2:9200"] index => ["logstash-customer-%{+YYYY.MM.dd}"] document_type => ["user"] document_id => ["%{idx}"] # prevent from dulicating records from Oracle DB } } }
<GitHub>
https://github.com/asyncbridge/analytics/blob/master/ELK/LogstashFilter/lostash-filter.conf
https://github.com/asyncbridge/analytics/blob/master/ELK/LogstashFilter/lostash-filter-xml.conf
https://github.com/asyncbridge/analytics/blob/master/ELK/LogstashFilter/default_index_template.txt
- Total
- Today
- Yesterday
- Library
- #TensorFlow
- 도커
- ILoop Engine
- ate
- Physical Simulation
- Mask R-CNN
- belief
- #ApacheSpark
- Game Engine
- docker
- Meow
- Jekyll and Hyde
- SSM
- Memorize
- English
- OST
- Ragdoll
- some time ago
- sentence test
- GOD
- project
- #REST API
- Badge
- 2D Game
- Worry
- #ApacheZeppelin
- aws #cloudfront
- #ELK Stack
- Sea Bottom
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |