部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)
-
部署ZK
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
-
部署Kafka
docker run -d --name xdclass_kafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ --env KAFKA_HEAP_OPTS=-Xmx256M \ --env KAFKA_HEAP_OPTS=-Xms128M \ -e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
-
采用Slf4j采集日志(lombok)
-
需求
- 控制台输出访问日志,方便测试
- 业务数据实际输出到kafka
- 常用的框架 log4j、logback、self4j等
-
log4j、logback、self4j 之间有啥关系
-
SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式
-
把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口
-
具体的日志系统就有log4j,logback等;
-
logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现
-
log4j、logback可以单独的使用,也可以绑定slf4j一起使用
-
-
编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低
-
-
依赖引入
<!-- 代码自动生成依赖 end--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--Springboot项目整合spring-kafka依赖包配置--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
kafka配置application.properties
#----------kafka配置-------------- spring.kafka.bootstrap-servers=[外网ip]:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
-
logback.xml配置
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="./data/logs/link" /> <!--采用打印到控制台,记录日志的方式--> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern> </encoder> </appender> <!-- 采用保存到日志文件 记录日志的方式--> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/link.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern> </encoder> </appender> <!-- 指定某个类单独打印日志 --> <logger name="net.xdclass.service.impl.LogServiceImpl" level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="info" additivity="false"> <appender-ref ref="console" /> </root> </configuration>
-
LogServiceImpl
@Service @Slf4j public class LogServiceImpl implements LogService { // Kafka:topic private static final String TOPIC_NAME = "ods_link_visit_topic"; @Autowired private KafkaTemplate kafkaTemplate; /** * 记录日志 * * @param request * @param shortLinkCode * @param accountNo * @return */ @Override public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) { // ip、 浏览器信息 String ip = CommonUtil.getIpAddr(request); // 全部请求头 Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request); Map<String,String> availableMap = new HashMap<>(); availableMap.put("user-agent",headerMap.get("user-agent")); availableMap.put("referer",headerMap.get("referer")); availableMap.put("accountNo",accountNo.toString()); LogRecord logRecord = LogRecord.builder() //日志类型 .event(LogTypeEnum.SHORT_LINK_TYPE.name()) //日志内容 .data(availableMap) //客户端ip .ip(ip) // 时间 .ts(CommonUtil.getCurrentTimestamp()) //业务唯一标识(短链码) .bizId(shortLinkCode).build(); String jsonLog = JsonUtil.obj2Json(logRecord); //打印日志 in 控制台 log.info(jsonLog); // 发送kafka kafkaTemplate.send(TOPIC_NAME,jsonLog); } }
-
kafka命令
``` 创建topic ./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic 查看topic ./kafka-topics.sh --list --zookeeper 172.17.0.1:2181 删除topic ./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic 消费者消费消息 ./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic 生产者发送消息 ./kafka-console-producer.sh --broker-list 192.168.75.146:9092 --topic ods_link_visit_topic ```
-
测试
@Controller @Slf4j public class LinkApiController { @Autowired private ShortLinkService shortLinkService; @Autowired private LogService logService; /** * * @param shortLinkCode * @param request * @param response */ @GetMapping(path = "/test") public void dispatch(HttpServletRequest request, HttpServletResponse response) { log.info("短链:{}", shortLinkCode); logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo()); }