部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)

  • 部署ZK

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    
  • 部署Kafka

    	docker run -d --name xdclass_kafka \
    	-p 9092:9092 \
    	-e KAFKA_BROKER_ID=0 \
    	--env KAFKA_HEAP_OPTS=-Xmx256M \
    	--env KAFKA_HEAP_OPTS=-Xms128M \
    	-e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \
    	-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \
    	-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
    
  • 采用Slf4j采集日志(lombok)

  • 需求

    • 控制台输出访问日志,方便测试
    • 业务数据实际输出到kafka
    • 常用的框架 log4j、logback、self4j等
  • log4j、logback、self4j 之间有啥关系

    • SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式

      • 把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口

      • 具体的日志系统就有log4j,logback等;

      • logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现

      • log4j、logback可以单独的使用,也可以绑定slf4j一起使用

    • 编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低

  • 依赖引入

    <!-- 代码自动生成依赖 end-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--Springboot项目整合spring-kafka依赖包配置-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  • kafka配置application.properties

    #----------kafka配置--------------
    spring.kafka.bootstrap-servers=[外网ip]:9092
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
  • logback.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property name="LOG_HOME" value="./data/logs/link" />
    
        <!--采用打印到控制台,记录日志的方式-->
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- 采用保存到日志文件 记录日志的方式-->
        <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/link.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern>
            </rollingPolicy>
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
    
        <!-- 指定某个类单独打印日志 -->
        <logger name="net.xdclass.service.impl.LogServiceImpl"
                level="INFO" additivity="false">
            <appender-ref ref="rollingFile" />
            <appender-ref ref="console" />
        </logger>
    
        <root level="info" additivity="false">
            <appender-ref ref="console" />
        </root>
    
    </configuration>
    
  • LogServiceImpl

    @Service
    @Slf4j
    public class LogServiceImpl implements LogService {
    	
    	// Kafka:topic
        private static final String TOPIC_NAME = "ods_link_visit_topic";
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        /**
         * 记录日志
         *
         * @param request
         * @param shortLinkCode
         * @param accountNo
         * @return
         */
        @Override
        public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
            // ip、 浏览器信息
            String ip = CommonUtil.getIpAddr(request);
            // 全部请求头
            Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
    
            Map<String,String> availableMap = new HashMap<>();
            availableMap.put("user-agent",headerMap.get("user-agent"));
            availableMap.put("referer",headerMap.get("referer"));
            availableMap.put("accountNo",accountNo.toString());
    
            LogRecord logRecord = LogRecord.builder()
                    //日志类型
                    .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                    //日志内容
                    .data(availableMap)
                    //客户端ip
                    .ip(ip)
                    // 时间
                    .ts(CommonUtil.getCurrentTimestamp())
                    //业务唯一标识(短链码)
                    .bizId(shortLinkCode).build();
    
            String jsonLog = JsonUtil.obj2Json(logRecord);
    
            //打印日志 in 控制台
            log.info(jsonLog);
    
            // 发送kafka
            kafkaTemplate.send(TOPIC_NAME,jsonLog);
    
        }
    }
    
  • kafka命令

    ```
    创建topic
    ./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic
    
    查看topic
    ./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
    
    删除topic
    ./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic
    
    消费者消费消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic
    
    生产者发送消息
    ./kafka-console-producer.sh --broker-list 192.168.75.146:9092  --topic ods_link_visit_topic
    ```
    
    
  • 测试

    @Controller
    @Slf4j
    public class LinkApiController {
        @Autowired
        private ShortLinkService shortLinkService;
    
        @Autowired
        private LogService logService;
    
    
        /**
         *
         * @param shortLinkCode
         * @param request
         * @param response
         */
        @GetMapping(path = "/test")
        public void dispatch(HttpServletRequest request, HttpServletResponse response) {
    
            log.info("短链:{}", shortLinkCode);
            logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo());
            
        }
    
    

热门相关:恭喜你被逮捕了   未来兽世:买来的媳妇,不生崽   今天也没变成玩偶呢   拒嫁豪门,前妻太抢手   异世修真邪君