" />

警告:即将离开本站

点击"继续"将前往其他页面,确认后跳转。

侧边栏壁纸
  • 累计撰写 19 篇文章
  • 累计创建 2 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Building a Log Management and Analysis System with a Custom KafkaLogAppender

dengdz
2025-01-02

1. Background

In production, our Flink applications ran into the following log-management problems:

  1. Huge log volume

  • Long-running jobs produce massive amounts of log output

  • Individual log files grow too large to fetch and analyze

  • Storage and retrieval are inefficient

  2. Logs are hard to search

  • No effective search mechanism

  • Locating errors is time-consuming

  • System monitoring is not timely

2. Solution

To address these problems, we evaluated two approaches:

2.1. Option 1: HTTP API

  • Provide a unified log-storage API

  • Applications store logs through API calls

  • Centralized management of log data

2.2. Option 2: Kafka (adopted)

  • A custom Log4j2 appender

  • Logs are pushed to Kafka asynchronously

  • A Flink job processes and stores the logs

Option comparison

| Dimension | Option 1: HTTP API | Option 2: Kafka |
| --- | --- | --- |
| Code intrusiveness | Requires changes to existing code | Configuration change only |
| Performance impact | Synchronous calls can block the application | Asynchronous sends, low latency |
| Reliability | Depends on the API service staying up | Highly available, messages can be persisted |
| Scalability | Limited by the capacity of the API service | High throughput, easy to scale |
| Maintenance cost | A dedicated API service has to be maintained | Reuses the existing Kafka cluster |

2.3. Final choice

We went with Option 2, mainly for the following reasons:

  1. Low intrusiveness: existing logging habits stay unchanged; only configuration is added

  2. High performance: Kafka's throughput is a good fit for large log volumes

  3. Reliability: the message queue keeps logs from being lost

  4. Extensibility: other data-processing systems can be plugged in later

Building this logging system not only solved the immediate log-management problems but also laid the groundwork for system monitoring and troubleshooting.

3. Message format

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| timestamp | long | Time the log event was produced (epoch milliseconds) | 1635724800000 |
| level | String | Log level | "INFO" / "WARN" / "ERROR" |
| logger | String | Name of the class that emitted the log | "com.example.YourClass" |
| message | String | Main log content | "这是一条信息日志" |
| threadName | String | Name of the thread that produced the log | "main" |
| errorMessage | String | Exception message; present only when an exception occurred | "NullPointerException" |
| contextData | Map<String,String> | Context information carrying extra environment data | {"applicationName": "TestApp"} |

4. Maven dependencies

Add the Maven dependencies below:

        <!-- Log4j2 core -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>
        
        <!-- SLF4J binding -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.17.2</version>
        </dependency>
        
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        
        <!-- JSON handling -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>

5. KafkaLogAppender

Add the custom KafkaLogAppender class to the project.

package com.example.logging;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.util.Map;
import java.util.Properties;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Log4j2 appender that sends log messages to a Kafka topic.
 */
@Plugin(
        name = "KafkaLogger",
        category = Core.CATEGORY_NAME,
        elementType = Appender.ELEMENT_TYPE
)
public class KafkaLogAppender extends AbstractAppender {
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final ObjectMapper mapper;
    private volatile boolean initialized = false;

    protected KafkaLogAppender(String name, Filter filter, String topic, String bootstrapServers) {
        super(name, filter, null);
        Objects.requireNonNull(topic, "Topic must not be null");
        Objects.requireNonNull(bootstrapServers, "Bootstrap servers must not be null");

        this.topic = topic;
        this.producer = createProducer(bootstrapServers);
        this.mapper = new ObjectMapper()
                .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
    }

    /**
     * Create the KafkaProducer instance
     */
    private KafkaProducer<String, String> createProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 1024);
        props.put("linger.ms", 0);
        props.put("buffer.memory", 33554432);

        return new KafkaProducer<>(props);
    }

    @PluginFactory
    public static KafkaLogAppender createAppender(
            @PluginAttribute("name") @Required String name,
            @PluginAttribute("topic") @Required String topic,
            @PluginAttribute("bootstrapServers") @Required String bootstrapServers,
            @PluginElement("Filter") final Filter filter) {

        if (name == null) {
            LOGGER.error("No name provided for KafkaLogAppender");
            return null;
        }
        if (topic == null) {
            LOGGER.error("No topic provided for KafkaLogAppender");
            return null;
        }
        if (bootstrapServers == null) {
            LOGGER.error("No bootstrapServers provided for KafkaLogAppender");
            return null;
        }

        return new KafkaLogAppender(name, filter, topic, bootstrapServers);
    }

    @Override
    public void start() {
        super.start();
        initialized = true;
    }

    @Override
    public void append(LogEvent event) {
        if (!initialized) {
            return;
        }

        try {
            String jsonMessage = convertToJson(event);
            sendToKafka(jsonMessage);
        } catch (Exception e) {
            LOGGER.error("Failed to send log message to Kafka", e);
        }
    }

    /**
     * Convert a LogEvent to a JSON string
     */
    private String convertToJson(LogEvent event) throws Exception {
        LogMessage logMessage = new LogMessage(
                event.getTimeMillis(),
                event.getLevel().toString(),
                event.getLoggerName(),
                event.getMessage().getFormattedMessage(),
                event.getThreadName(),
                event.getThrown() != null ? event.getThrown().getMessage() : null,
                event.getContextData().toMap()
        );

        return mapper.writeValueAsString(logMessage);
    }

    /**
     * Send a message to Kafka
     */
    private void sendToKafka(String jsonMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonMessage);
        
        try {
            // Send synchronously to make sure the message is delivered
            Future<RecordMetadata> future = producer.send(record);
            producer.flush();
            
            // Wait for the send result
            RecordMetadata metadata = future.get();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message sent successfully to topic {} partition {} offset {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while sending message to Kafka", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to send message to Kafka", e.getCause());
        }
    }

    @Override
    public void stop() {
        if (initialized) {
            initialized = false;
            if (producer != null) {
                producer.flush();
                producer.close();
            }
        }
        super.stop();
    }

    static class LogMessage {
        @JsonProperty
        private long timestamp;
        @JsonProperty
        private String level;
        @JsonProperty
        private String logger;
        @JsonProperty
        private String message;
        @JsonProperty
        private String threadName;
        @JsonProperty
        private String errorMessage;
        @JsonProperty
        private Map<String, String> contextData;

        // Constructor
        public LogMessage(long timestamp, String level, String logger, String message,
                          String threadName, String errorMessage, Map<String, String> contextData) {
            this.timestamp = timestamp;
            this.level = level;
            this.logger = logger;
            this.message = message;
            this.threadName = threadName;
            this.errorMessage = errorMessage;
            this.contextData = contextData;
        }

        // Getters
        public long getTimestamp() {
            return timestamp;
        }

        public String getLevel() {
            return level;
        }

        public String getLogger() {
            return logger;
        }

        public String getMessage() {
            return message;
        }

        public String getThreadName() {
            return threadName;
        }

        public String getErrorMessage() {
            return errorMessage;
        }

        public Map<String, String> getContextData() {
            return contextData;
        }

        // Setters
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        public void setLevel(String level) {
            this.level = level;
        }

        public void setLogger(String logger) {
            this.logger = logger;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }

        public void setErrorMessage(String errorMessage) {
            this.errorMessage = errorMessage;
        }

        public void setContextData(Map<String, String> contextData) {
            this.contextData = contextData;
        }
    }

} 
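
A note on the send path: although section 2.2 frames the design as asynchronous, sendToKafka blocks on future.get() and flushes after every event, which trades throughput for a per-message delivery guarantee. If that overhead matters, the Callback that is already imported can be used for a non-blocking send instead; a minimal sketch (not the version used above):

    private void sendToKafkaAsync(String jsonMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonMessage);
        // Fire-and-forget with an error callback; no flush, no blocking on the result
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOGGER.error("Failed to send log message to Kafka", exception);
            }
        });
    }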

6. log4j2.xml

Add the log4j2.xml configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Appenders>
        <KafkaLogger name="KafkaLogger"
                  topic="test"
                  bootstrapServers="xxx.xxx.xxx.xxx:9092">
        </KafkaLogger>
        
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
        </Console>
    </Appenders>
    
    <Loggers>
        <Root level="info">
            <AppenderRef ref="KafkaLogger"/>
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration> 
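
Log4j2 discovers custom plugins through metadata that the log4j-core annotation processor generates at build time (META-INF/org/apache/logging/log4j/core/config/plugins/Log4j2Plugins.dat). If the KafkaLogger element above is reported as unresolved (for example, when annotation processing is disabled or the jar is shaded), one option is to point Log4j2 at the plugin package explicitly, assuming the package name used above:

<Configuration status="INFO" packages="com.example.logging">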

7. Demo

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class LogTest {
    private static final Logger logger = LoggerFactory.getLogger(LogTest.class);

    public static void main(String[] args) {
        // Add some context information
        MDC.put("applicationName", "TestApp");
        MDC.put("serverIp", "xx.xx.xx.xx");

        // Log at different levels
        logger.info("这是一条信息日志");
        logger.warn("这是一条警告日志");

        try {
            throw new RuntimeException("测试异常");
        } catch (Exception e) {
            logger.error("发生错误", e);
        }

        // Clear the MDC
        MDC.clear();
    }
} 

Kafka message

{
  "timestamp": 1735623272758,
  "level": "INFO",
  "logger": "com.example.logging.LogTest",
  "message": "这是一条信息日志",
  "threadName": "main",
  "errorMessage": null,
  "contextData": {
    "serverIp": "xx.xx.xx.xx",
    "applicationName": "TestApp"
  }
}
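
To confirm messages are arriving, the console consumer that ships with Kafka is the quickest check (run from the Kafka installation's bin directory, using the broker address and topic from the configuration above):

kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test --from-beginning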

8. Using the custom KafkaLogAppender in Flink

  1. Message format

In Flink's distributed setup, the logs need to identify which node a message came from, so the following fields are added to contextData (a sketch of how a job can populate them follows the table):

| Field | Description | Example |
| --- | --- | --- |
| nodeId | Unique node identifier | "tm-1", "jm-1" |
| nodeType | Node type | "TaskManager", "JobManager" |
| nodeHost | Node hostname / IP | "worker-1.example.com" |
| taskId | Unique task identifier | "source_1_0" |
| subtaskIndex | Subtask index | "0" |
| jobId | Flink job ID | "7b2aad32c2" |

  2. Add the Flink-related settings to log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Properties>
        <!-- Global properties -->
        <Property name="hostName">${sys:hostname:-unknown}</Property>
        <Property name="applicationName">FlinkLoggerDemo</Property>
    </Properties>

    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
        </Console>
        
        <!-- Kafka log appender -->
        <KafkaLogger name="KafkaLogger"
                    topic="t"
                    bootstrapServers="xx.xx.xx.xx:9092">
            <!-- Pre-configured base context data -->
            <ContextData>
                <!-- Only static, fixed values are configured here -->
                <applicationName>${applicationName}</applicationName>
                <hostName>${hostName}</hostName>
                <environment>${sys:env:-local}</environment>
            </ContextData>
        </KafkaLogger>
    </Appenders>
    
    <Loggers>
        <!-- Log level for a specific package -->
        <Logger name="com.example" level="INFO" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="KafkaLogger"/>
        </Logger>
        
        <Root level="info">
            <AppenderRef ref="Console"/>
            <!-- The root logger does not use KafkaLogger, to avoid shipping irrelevant logs -->
        </Root>
    </Loggers>
</Configuration> 

  3. Kafka message

{
  "timestamp": 1735638586447,
  "level": "INFO",
  "logger": "com.example.logging.FlinkLoggerDemo",
  "message": "Processing data batch...",
  "threadName": "Legacy Source Thread - Source: Custom Source -\u003e Map -\u003e Sink: Print to Std. Out (1/1)#0",
  "errorMessage": null,
  "contextData": {
    "jobName": "FlinkLoggerDemo",
    "sourceTask": "CustomSource"
  }
}

9. Using the custom KafkaLogAppender in Spark

  1. Message format

As in the Flink case, logs from a distributed Spark job need to identify which node they came from, so the same message structure is used; a small sketch of tagging messages from inside a Spark job follows.
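
For example, the componentType and partitionId values seen in the sample message below can be set from inside the transformation itself; a rough sketch (the real job will differ):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class SparkLoggerDemo {
    private static final Logger logger = LoggerFactory.getLogger(SparkLoggerDemo.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkLoggerDemo");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            sc.parallelize(Arrays.asList(1, 2, 3, 34))
              .map(value -> {
                  // TaskContext is available on the executor while the task runs
                  MDC.put("partitionId", String.valueOf(TaskContext.get().partitionId()));
                  MDC.put("componentType", "map");
                  logger.info("Processing value: {}", value);
                  return value;
              })
              .collect();
        }
    }
}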

  2. Add the Spark-related settings to log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Properties>
        <!-- Global properties -->
        <Property name="hostName">${sys:hostname:-unknown}</Property>
        <Property name="applicationName">SparkLoggerDemo</Property>
    </Properties>

    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
        </Console>
        
        <!-- Kafka log appender -->
        <KafkaLogger name="KafkaLogger"
                    topic="test"
                    bootstrapServers="xx.xx.xx.xx:9092">
            <!-- Pre-configured base context data -->
            <ContextData>
                <!-- Only static, fixed values are configured here -->
                <applicationName>${applicationName}</applicationName>
                <hostName>${hostName}</hostName>
                <environment>${sys:env:-local}</environment>
                <sparkAppId>${sys:spark.app.id:-unknown}</sparkAppId>
                <sparkExecutorId>${sys:spark.executor.id:-unknown}</sparkExecutorId>
            </ContextData>
        </KafkaLogger>
    </Appenders>
    
    <Loggers>
        <!-- Log level for a specific package -->
        <Logger name="com.example" level="INFO" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="KafkaLogger"/>
        </Logger>
        
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration> 

  3. Kafka message

{
  "timestamp": 1735779491711,
  "level": "INFO",
  "logger": "com.example.logging.SparkLoggerDemo",
  "message": "Processing value: 34",
  "threadName": "Executor task launch worker for task 6.0 in stage 0.0 (TID 6)",
  "errorMessage": null,
  "contextData": {
    "componentType": "map",
    "partitionId": "6",
    "jobName":"SparkLoggerDemo"
  }
}

10. Log storage

Once the logs are in Kafka, a Flink or Spark job can consume them and write them to whatever storage fits. In my project an Elasticsearch cluster with spare capacity was available, but other databases such as Doris or MySQL work just as well. The MySQL schema below is one option:

CREATE TABLE `application_logs` (
    `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
    
    `job_type` VARCHAR(10) NOT NULL COMMENT 'Job type: FLINK/SPARK',
    `job_name` VARCHAR(100) NOT NULL COMMENT 'Job name',
    
    -- Basic log information
    `timestamp` TIMESTAMP(3) NOT NULL COMMENT 'Log timestamp',
    `level` ENUM('TRACE','DEBUG','INFO','WARN','ERROR','FATAL') NOT NULL COMMENT 'Log level',
    `logger` VARCHAR(255) NOT NULL COMMENT 'Logger name',
    `message` TEXT NOT NULL COMMENT 'Log message',
    `thread_name` VARCHAR(100) NOT NULL COMMENT 'Thread name',
    `error_message` TEXT NULL COMMENT 'Error message',
    
    -- Context data
    `context_data` JSON NULL COMMENT 'Context data',
    
    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Record creation time',
    
    PRIMARY KEY (`id`),
    
    -- Query indexes
    INDEX `idx_job` (`job_type`, `job_name`, `timestamp`),
    INDEX `idx_level_time` (`level`, `timestamp`),
    INDEX `idx_timestamp` (`timestamp`)
) 
ENGINE = InnoDB 
DEFAULT CHARSET = utf8mb4 
COLLATE = utf8mb4_unicode_ci
COMMENT = 'Application log table';
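
In my setup the consume-and-store step runs as a Flink job, but the mapping from message to table is easier to see in a stripped-down standalone sketch (assuming the test topic, a local MySQL instance with the MySQL JDBC driver on the classpath, and jobName carried in contextData as in the samples above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LogStoreJob {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");
        props.put("group.id", "log-store");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String sql = "INSERT INTO application_logs "
                + "(job_type, job_name, timestamp, level, logger, message, thread_name, error_message, context_data) "
                + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/logs", "user", "password");
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // Each record value is one JSON log message in the format from section 3
                    JsonNode log = MAPPER.readTree(record.value());
                    JsonNode ctx = log.path("contextData");
                    stmt.setString(1, "FLINK"); // or derive the job type from the topic / contextData
                    stmt.setString(2, ctx.path("jobName").asText("unknown"));
                    stmt.setTimestamp(3, new Timestamp(log.path("timestamp").asLong()));
                    stmt.setString(4, log.path("level").asText());
                    stmt.setString(5, log.path("logger").asText());
                    stmt.setString(6, log.path("message").asText());
                    stmt.setString(7, log.path("threadName").asText());
                    stmt.setString(8, log.path("errorMessage").isNull() ? null : log.path("errorMessage").asText());
                    stmt.setString(9, ctx.toString());
                    stmt.executeUpdate();
                }
            }
        }
    }
}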

11. Querying and analysis

The stored logs can then be queried with plain SQL.

For example, to fetch all logs for job xxxx between 2024-01-01 and 2024-01-02:

SELECT
    timestamp,
    job_name,
    message,
    error_message,
    JSON_UNQUOTE(JSON_EXTRACT(context_data, '$.taskId')) as task_id
FROM application_logs
WHERE job_name = 'xxxx'
  AND timestamp BETWEEN '2024-01-01 00:00:00' AND '2024-01-02 00:00:00'
ORDER BY timestamp DESC;
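
Aggregations work the same way, for instance counting ERROR-level entries per job per hour over the same window:

SELECT
    job_name,
    DATE_FORMAT(timestamp, '%Y-%m-%d %H:00:00') AS hour_bucket,
    COUNT(*) AS error_count
FROM application_logs
WHERE level = 'ERROR'
  AND timestamp BETWEEN '2024-01-01 00:00:00' AND '2024-01-02 00:00:00'
GROUP BY job_name, hour_bucket
ORDER BY hour_bucket, error_count DESC;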
