logstash.conf configuration: input kafka, filter, output elasticsearch/mysql
Published: 2019-06-06


# Tuning and Profiling Logstash Performance: pipeline.workers, pipeline.batch.size, pipeline.batch.delay
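These three settings live in `logstash.yml` (or can be passed on the command line as `-w`, `-b`, `-u`). A minimal sketch with illustrative values, not recommendations; defaults vary by Logstash version:

```yaml
# logstash.yml -- pipeline tuning knobs (illustrative values)
pipeline.workers: 4        # workers running the filter+output stages in parallel; defaults to the number of CPU cores
pipeline.batch.size: 125   # events a worker collects from inputs before running filters
pipeline.batch.delay: 50   # ms to wait for a batch to fill before flushing it anyway
```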

logstash-input:

```conf
input {
  stdin { codec => plain { charset => "GBK" } }

  # metadata (available when decorate_events => true):
  # [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
  # [@metadata][kafka][consumer_group]: Consumer group.
  # [@metadata][kafka][partition]: Partition info for this message.
  # [@metadata][kafka][offset]: Original record offset for this message.
  # [@metadata][kafka][key]: Record key, if any.
  # [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.
  #
  # common options: add_field, codec, enable_metric, id, tags, type
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "192.168.117.191:9092"  # comma-separated host:port string
    topics => ["topic_name"]        # kafka topic(s) to consume
    #group_id => "logstash"         # default: logstash
    client_id => "cmd"              # default: logstash
    consumer_threads => 5
    auto_offset_reset => "latest"   # earliest | latest
    decorate_events => true         # adds the [@metadata][kafka] fields above
    enable_auto_commit => true      # commit offsets periodically; on failure, consumption resumes from the last committed offset
    #codec => "json"
    # also available: request_timeout_ms, retry_backoff_ms
  }
}
```
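Because `decorate_events => true` is set, the `[@metadata][kafka][*]` fields listed in the comments above are attached to each event. A minimal sketch of using one of them downstream (the target field name `kafka_topic` is chosen here for illustration):

```conf
# Sketch: copy the source topic out of @metadata so it survives into the output,
# since @metadata itself is never serialized by outputs.
filter {
  mutate {
    add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
  }
}
```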

logstash-filter:

```conf
filter {
  # common options: add_field, add_tag, id, remove_field, remove_tag

  # alter is provided by the logstash-filter-alter plugin
  alter {
    # set "field_name" to the first non-null value in the list
    coalesce => [
      "field_name", "value1", "value2", "value3"
    ]
    # if "field_name" equals "expected_value", rewrite it to "new_value"
    condrewrite => [
      "field_name", "expected_value", "new_value",
      "field_name2", "expected_value2", "new_value2"
    ]
    # if "field_name" equals "expected_value", set "field_name_to_change" to "value"
    condrewriteother => [
      "field_name", "expected_value", "field_name_to_change", "value",
      "field_name2", "expected_value2", "field_name_to_change2", "value2"
    ]
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
  }

  ### ## ### ## ### ## Date formats ## ### ## ### ## ###
  #
  # "Apr 17 09:32:01"             MMM dd HH:mm:ss
  # 1366125117000                 UNIX_MS
  # 1326149001.132                UNIX
  # "2011-04-19T03:44:01.103Z"    ISO8601
  date {
    # match => [ field, format1, format2, ... ]
    # Example: match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
    match => [ "logdate", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "ISO8601" ]
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
    remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
  }
  date {
    # match => [ field, format1, format2, ... ]; the parsed result is written to target
    match => [ "create_at", "yyyy-MM-dd HH:mm:ss,SSS", "UNIX" ]
    target => "@timestamp"
    locale => "cn"
  }

  ### ## ### ## ### ## Dissect ## ### ## ### ## ###
  #
  # unstructured log lines
  dissect {
    mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
  }

  ### ## ### ## ### ## Grok ## ### ## ### ## ###
  #
  # HTTP log line
  # syntax: %{SYNTAX:SEMANTIC}  (pattern name : field name)
  # 55.3.244.1 GET /index.html 15824 0.043
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }

  ### ## ### ## ### ## JSON ## ### ## ### ## ###
  #
  json {
    source => "message"   # required: field to parse
    target => "kfk"       # parsed keys/values are nested under "kfk"
  }

  ### ## ### ## ### ## Mutate ## ### ## ### ## ###
  # Operations within a single mutate block are executed in a fixed order:
  #   coerce > rename > update > replace > ... > remove > split > join > merge > copy
  # To control the order yourself, use separate mutate blocks: mutate-1 > mutate-2.
  #
  mutate {
    split => ["hostname", "."]
    add_field => { "shortHostname" => "%{[hostname][0]}" }
  }
  mutate {
    rename => ["shortHostname", "hostname"]
  }

  mutate {
    # data types: integer, float, string, boolean
    convert => {
      "fieldname" => "integer"
      "booleanfield" => "boolean"
    }
    copy => { "source_field" => "dest_field" }
    rename => { "HOSTORIP" => "client_ip" }
    join => { "fieldname" => "," }
    split => { "fieldname" => "," }
    replace => { "message" => "%{source_host}: My new message" }
    update => { "sample" => "My new message" }  # if the field does not exist, no action is taken
  }
}
```
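To make the grok stanza concrete: applied to the sample line from the comments, `55.3.244.1 GET /index.html 15824 0.043`, the pattern should yield roughly the fields sketched below. Note that `%{NUMBER:...}` captures strings; the `:int` / `:float` suffix is grok's built-in way to cast during capture, if numeric types are wanted:

```conf
# Expected extraction (illustrative):
#   client   => "55.3.244.1"
#   method   => "GET"
#   request  => "/index.html"
#   bytes    => "15824"
#   duration => "0.043"
# Variant that casts during capture:
grok {
  match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:float}" }
}
```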

logstash-output:

```conf
output {
  # common options: codec, enable_metric, id
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash_output-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }

  jdbc {
    driver_jar_path => "D:/Program Files/Maven/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar"
    driver_class => "com.mysql.jdbc.Driver"
    connection_string => "jdbc:mysql://localhost:3306/test?user=root&password=root"
    statement => [ "insert into logstash_stdout (TEST_TIME, TEST_HOST, MESSAGES) values (?, ?, ?)", "%{@timestamp}", "%{host}", "%{message}" ]
  }

  stdout {}
}
```
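Note that `elasticsearch` and `stdout` ship with Logstash, but the `jdbc` output comes from the community `logstash-output-jdbc` plugin and must be installed first. A sketch of installing it and then validating/running the config (paths are illustrative):

```sh
# Install the community JDBC output plugin
bin/logstash-plugin install logstash-output-jdbc

# Check the config file for syntax errors, then run with auto-reload
bin/logstash -f logstash.conf --config.test_and_exit
bin/logstash -f logstash.conf --config.reload.automatic
```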

mutate event sample:

```conf
input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  # @metadata.test = "Hello"
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug { metadata => true } }
  }
}
```
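Fields under `@metadata` are visible to filters and to output conditionals, but are never serialized by outputs; the `rubydebug` codec only prints them because of `metadata => true`. For a stdin line like `hello`, the printed event should look roughly like this (illustrative; field order, host, and `@timestamp` will differ):

```
{
       "message" => "hello",
          "show" => "This data will be in the output",
      "@version" => "1",
    "@timestamp" => 2019-06-06T00:00:00.000Z,
          "host" => "my-host",
     "@metadata" => {
           "test" => "Hello",
        "no_show" => "This data will not be in the output"
    }
}
```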


Reposted from: https://www.cnblogs.com/andea/p/11183304.html
