1. Project Background
In production, our long-running Flink applications ran into the following log management problems:
Large log volume:
Long-running jobs generate massive amounts of logs.
Individual log files grow too large to retrieve and analyze.
Storage and retrieval are inefficient.
Difficult log retrieval:
There is no effective search mechanism.
Locating errors is time-consuming.
System monitoring is not timely.
2. Solution
To address these problems, we considered two solutions:
2.1. Option 1: HTTP API
Provide a unified log storage API.
Applications store logs through API calls.
Log data is managed centrally.
2.2. Option 2: Kafka (adopted)
Implement a custom Log4j2 appender.
Push logs to Kafka asynchronously.
Use a Flink job to process and store the logs.
2.3. Final Choice
Comparing the two options, we chose Option 2, mainly for the following reasons:
Low intrusiveness: existing logging habits are kept; only configuration needs to be added.
High performance: Kafka's high throughput suits large-scale log traffic.
Reliability: the message queue helps ensure logs are not lost.
Extensibility: other data processing systems can easily be attached later.
Building this logging system not only solves the current log management problems, but also lays the groundwork for system monitoring and problem diagnosis.
3. Message Format Design
Each log record is serialized into a JSON object with the following fields: timestamp (epoch milliseconds), level, logger (logger name), message (the formatted message), threadName, errorMessage (the exception message, or null when no throwable is attached), and contextData (the MDC key/value map). A complete example message is shown in the Demo section below.
4. Maven Dependencies
Add the Maven dependencies below. (When the job runs on a Flink cluster, log4j-api and log4j-core are typically already provided by the Flink distribution, so marking them as provided scope may be appropriate there.)
<!-- Log4j2 core dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
<!-- SLF4J binding for Log4j2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
5. KafkaLogAppender
Add the custom KafkaLogAppender class to the project.
package com.example.logging;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.Map;
import java.util.Properties;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Log4j2 Appender for Kafka
* Sends log messages to a Kafka topic
*/
@Plugin(
name = "KafkaLogger",
category = Core.CATEGORY_NAME,
elementType = Appender.ELEMENT_TYPE
)
public class KafkaLogAppender extends AbstractAppender {
private final KafkaProducer<String, String> producer;
private final String topic;
private final ObjectMapper mapper;
private volatile boolean initialized = false;
protected KafkaLogAppender(String name, Filter filter, String topic, String bootstrapServers) {
super(name, filter, null);
Objects.requireNonNull(topic, "Topic must not be null");
Objects.requireNonNull(bootstrapServers, "Bootstrap servers must not be null");
this.topic = topic;
this.producer = createProducer(bootstrapServers);
this.mapper = new ObjectMapper()
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
}
/**
* Creates the Kafka producer instance
*/
private KafkaProducer<String, String> createProducer(String bootstrapServers) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");
props.put("retries", 3);
props.put("batch.size", 1024);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
return new KafkaProducer<>(props);
}
@PluginFactory
public static KafkaLogAppender createAppender(
@PluginAttribute("name") @Required String name,
@PluginAttribute("topic") @Required String topic,
@PluginAttribute("bootstrapServers") @Required String bootstrapServers,
@PluginElement("Filter") final Filter filter) {
if (name == null) {
LOGGER.error("No name provided for KafkaLogAppender");
return null;
}
if (topic == null) {
LOGGER.error("No topic provided for KafkaLogAppender");
return null;
}
if (bootstrapServers == null) {
LOGGER.error("No bootstrapServers provided for KafkaLogAppender");
return null;
}
return new KafkaLogAppender(name, filter, topic, bootstrapServers);
}
@Override
public void start() {
super.start();
initialized = true;
}
@Override
public void append(LogEvent event) {
if (!initialized) {
return;
}
try {
String jsonMessage = convertToJson(event);
sendToKafka(jsonMessage);
} catch (Exception e) {
LOGGER.error("Failed to send log message to Kafka", e);
}
}
/**
* Converts a LogEvent into a JSON string
*/
private String convertToJson(LogEvent event) throws Exception {
LogMessage logMessage = new LogMessage(
event.getTimeMillis(),
event.getLevel().toString(),
event.getLoggerName(),
event.getMessage().getFormattedMessage(),
event.getThreadName(),
event.getThrown() != null ? event.getThrown().getMessage() : null,
event.getContextData().toMap()
);
return mapper.writeValueAsString(logMessage);
}
/**
* Sends a message to Kafka
*/
private void sendToKafka(String jsonMessage) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonMessage);
try {
// Send synchronously to make sure the message is delivered
Future<RecordMetadata> future = producer.send(record);
producer.flush();
// Wait for the send result
RecordMetadata metadata = future.get();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Message sent successfully to topic {} partition {} offset {}",
metadata.topic(), metadata.partition(), metadata.offset());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while sending message to Kafka", e);
} catch (ExecutionException e) {
throw new RuntimeException("Failed to send message to Kafka", e.getCause());
}
}
@Override
public void stop() {
if (initialized) {
initialized = false;
if (producer != null) {
producer.flush();
producer.close();
}
}
super.stop();
}
static class LogMessage {
@JsonProperty
private long timestamp;
@JsonProperty
private String level;
@JsonProperty
private String logger;
@JsonProperty
private String message;
@JsonProperty
private String threadName;
@JsonProperty
private String errorMessage;
@JsonProperty
private Map<String, String> contextData;
// Constructor
public LogMessage(long timestamp, String level, String logger, String message,
String threadName, String errorMessage, Map<String, String> contextData) {
this.timestamp = timestamp;
this.level = level;
this.logger = logger;
this.message = message;
this.threadName = threadName;
this.errorMessage = errorMessage;
this.contextData = contextData;
}
// Getters
public long getTimestamp() {
return timestamp;
}
public String getLevel() {
return level;
}
public String getLogger() {
return logger;
}
public String getMessage() {
return message;
}
public String getThreadName() {
return threadName;
}
public String getErrorMessage() {
return errorMessage;
}
public Map<String, String> getContextData() {
return contextData;
}
// Setters
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public void setLevel(String level) {
this.level = level;
}
public void setLogger(String logger) {
this.logger = logger;
}
public void setMessage(String message) {
this.message = message;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public void setContextData(Map<String, String> contextData) {
this.contextData = contextData;
}
}
}
6. log4j2.xml
Add the log4j2.xml configuration file. If Log4j2 does not pick up the custom appender automatically through its plugin annotation processor, adding packages="com.example.logging" to the <Configuration> element tells it where to scan for plugins. Also note that with the Root logger referencing KafkaLogger, log output from org.apache.kafka itself is routed to Kafka as well; the configurations in sections 8 and 9 attach KafkaLogger only to the com.example logger, which avoids sending such unnecessary logs.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<KafkaLogger name="KafkaLogger"
topic="test"
bootstrapServers="xxx.xxx.xxx.xxx:9092">
</KafkaLogger>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="KafkaLogger"/>
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
7. Demo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class LogTest {
private static final Logger logger = LoggerFactory.getLogger(LogTest.class);
public static void main(String[] args) {
// Add some context information
MDC.put("applicationName", "TestApp");
MDC.put("serverIp", "xx.xx.xx.xx");
// Log at different levels
logger.info("这是一条信息日志");
logger.warn("这是一条警告日志");
try {
throw new RuntimeException("测试异常");
} catch (Exception e) {
logger.error("发生错误", e);
}
// Clear the MDC
MDC.clear();
}
}
Kafka message:
{
"timestamp": 1735623272758,
"level": "INFO",
"logger": "com.example.logging.LogTest",
"message": "这是一条信息日志",
"threadName": "main",
"errorMessage": null,
"contextData": {
"serverIp": "xx.xx.xx.xx",
"applicationName": "TestApp"
}
}
8. Using the Custom KafkaLogAppender in Flink
Message format
In a distributed Flink environment, the logs need to identify which node and task a message came from, so extra entries such as the job name and source task are added to contextData, as in the sketch below.
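A minimal sketch of how a Flink operator might put these entries into the SLF4J MDC so that they end up in contextData (the class name, key names, and values here are illustrative assumptions, not part of the original project):
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
 * Illustrative Flink operator: tags every log line produced on this task
 * thread with job-level context before processing records.
 */
public class TaggedMapFunction extends RichMapFunction<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(TaggedMapFunction.class);
    @Override
    public void open(Configuration parameters) {
        // These values end up in the contextData field of the Kafka log message.
        MDC.put("jobName", "FlinkLoggerDemo");
        MDC.put("sourceTask", "CustomSource");
    }
    @Override
    public String map(String value) {
        LOG.info("Processing data batch...");
        return value;
    }
    @Override
    public void close() {
        MDC.clear();
    }
}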
Add the Flink-related settings to log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Properties>
<!-- Global properties -->
<Property name="hostName">${sys:hostname:-unknown}</Property>
<Property name="applicationName">FlinkLoggerDemo</Property>
</Properties>
<Appenders>
<!-- Console output -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
</Console>
<!-- Kafka log appender -->
<KafkaLogger name="KafkaLogger"
topic="t"
bootstrapServers="xx.xx.xx.xx:9092">
<!-- Pre-configured base context data (requires extending the appender to accept these elements; see the sketch after this configuration) -->
<ContextData>
<!-- Only basic, fixed information is configured here -->
<applicationName>${applicationName}</applicationName>
<hostName>${hostName}</hostName>
<environment>${sys:env:-local}</environment>
</ContextData>
</KafkaLogger>
</Appenders>
<Loggers>
<!-- Log level for a specific package -->
<Logger name="com.example" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="KafkaLogger"/>
</Logger>
<Root level="info">
<AppenderRef ref="Console"/>
<!-- The Root logger does not use KafkaLogger, to avoid sending unnecessary logs -->
</Root>
</Loggers>
</Configuration>
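Note that the <ContextData> element above is not parsed by the appender shown in section 5; it assumes the appender is extended to accept pre-configured key/value entries and merge them into contextData. One possible direction, sketched below, is to use Log4j2's built-in Property child elements (which would mean writing <Property name="applicationName">...</Property> children instead of the custom tags); this is an assumption about how such support could look, not the original implementation:
// Sketch only: a factory method extended to accept <Property> child elements.
// Additionally requires: import org.apache.logging.log4j.core.config.Property;
//                        import java.util.HashMap;
@PluginFactory
public static KafkaLogAppender createAppender(
        @PluginAttribute("name") @Required String name,
        @PluginAttribute("topic") @Required String topic,
        @PluginAttribute("bootstrapServers") @Required String bootstrapServers,
        @PluginElement("Filter") final Filter filter,
        @PluginElement("Properties") final Property[] properties) {
    Map<String, String> staticContext = new HashMap<>();
    if (properties != null) {
        for (Property property : properties) {
            staticContext.put(property.getName(), property.getValue());
        }
    }
    // staticContext would then be handed to the constructor and merged with
    // event.getContextData().toMap() in convertToJson().
    return new KafkaLogAppender(name, filter, topic, bootstrapServers);
}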
Kafka message:
{
"timestamp": 1735638586447,
"level": "INFO",
"logger": "com.example.logging.FlinkLoggerDemo",
"message": "Processing data batch...",
"threadName": "Legacy Source Thread - Source: Custom Source -\u003e Map -\u003e Sink: Print to Std. Out (1/1)#0",
"errorMessage": null,
"contextData": {
"jobName": "FlinkLoggerDemo",
"sourceTask": "CustomSource"
}
}
9. Using the Custom KafkaLogAppender in Spark
Message format
In a distributed Spark environment, as with Flink jobs, the logs need to identify the node a message came from, so the same message structure is used; a sketch of tagging the MDC inside a Spark task follows.
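A minimal sketch of setting the MDC inside a Spark map task so that the partition id and component type show up in contextData (the class name and sample values are illustrative assumptions):
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Arrays;
/**
 * Illustrative snippet: logs emitted inside the map task carry the
 * partition id and component type via the MDC.
 */
public class SparkLoggerDemoSketch {
    private static final Logger LOG = LoggerFactory.getLogger(SparkLoggerDemoSketch.class);
    public static void run(JavaSparkContext sc) {
        JavaRDD<Integer> values = sc.parallelize(Arrays.asList(12, 34, 56));
        values.map(v -> {
            MDC.put("jobName", "SparkLoggerDemo");
            MDC.put("componentType", "map");
            MDC.put("partitionId", String.valueOf(TaskContext.getPartitionId()));
            LOG.info("Processing value: {}", v);
            return v * 2;
        }).count();
    }
}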
Add the Spark-related settings to log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Properties>
<!-- Global properties -->
<Property name="hostName">${sys:hostname:-unknown}</Property>
<Property name="applicationName">SparkLoggerDemo</Property>
</Properties>
<Appenders>
<!-- Console output -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%highlight{[%level]} %date{yyyy-MM-dd HH:mm:ss.SSS} %style{%class{1}}{cyan}:[%line] - %msg%n"/>
</Console>
<!-- Kafka log appender -->
<KafkaLogger name="KafkaLogger"
topic="test"
bootstrapServers="xx.xx.xx.xx:9092">
<!-- Pre-configured base context data (requires extending the appender; see the sketch in section 8) -->
<ContextData>
<!-- Only basic, fixed information is configured here -->
<applicationName>${applicationName}</applicationName>
<hostName>${hostName}</hostName>
<environment>${sys:env:-local}</environment>
<sparkAppId>${sys:spark.app.id:-unknown}</sparkAppId>
<sparkExecutorId>${sys:spark.executor.id:-unknown}</sparkExecutorId>
</ContextData>
</KafkaLogger>
</Appenders>
<Loggers>
<!-- Log level for a specific package -->
<Logger name="com.example" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="KafkaLogger"/>
</Logger>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
Kafka message:
{
"timestamp": 1735779491711,
"level": "INFO",
"logger": "com.example.logging.SparkLoggerDemo",
"message": "Processing value: 34",
"threadName": "Executor task launch worker for task 6.0 in stage 0.0 (TID 6)",
"errorMessage": null,
"contextData": {
"componentType": "map",
"partitionId": "6",
"jobName":"SparkLoggerDemo"
}
}
10. Log Storage
Once the logs are in Kafka, a Flink or Spark job can consume them and write them to a suitable store. In my project an Elasticsearch cluster with spare capacity was available; other databases such as Doris or MySQL also work. Below is an example MySQL table, followed by a small consumer sketch.
CREATE TABLE `application_logs` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
`job_type` VARCHAR(10) NOT NULL COMMENT 'Job type: FLINK/SPARK',
`job_name` VARCHAR(100) NOT NULL COMMENT 'Job name',
-- Basic log fields
`timestamp` TIMESTAMP(3) NOT NULL COMMENT 'Log timestamp',
`level` ENUM('TRACE','DEBUG','INFO','WARN','ERROR','FATAL') NOT NULL COMMENT 'Log level',
`logger` VARCHAR(255) NOT NULL COMMENT 'Logger name',
`message` TEXT NOT NULL COMMENT 'Log message',
`thread_name` VARCHAR(100) NOT NULL COMMENT 'Thread name',
`error_message` TEXT NULL COMMENT 'Error message',
-- Context data
`context_data` JSON NULL COMMENT 'Context data',
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Record creation time',
PRIMARY KEY (`id`),
-- Query indexes
INDEX `idx_job` (`job_type`, `job_name`, `timestamp`),
INDEX `idx_level_time` (`level`, `timestamp`),
INDEX `idx_timestamp` (`timestamp`)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci
COMMENT = 'Application log table';
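To illustrate the consumption side, here is a minimal standalone sketch using the kafka-clients consumer and plain JDBC (a production setup would more likely be a Flink or Spark streaming job; the broker address, topic, and MySQL connection settings are placeholders, and a MySQL JDBC driver is assumed to be on the classpath):
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
// Sketch: consume log messages from Kafka and write them into application_logs.
public class LogSinkJob {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); // placeholder
        props.put("group.id", "log-sink");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        String sql = "INSERT INTO application_logs "
                + "(job_type, job_name, `timestamp`, level, logger, message, thread_name, error_message, context_data) "
                + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://xx.xx.xx.xx:3306/logs", "user", "password"); // placeholders
             PreparedStatement ps = conn.prepareStatement(sql)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode log = MAPPER.readTree(record.value());
                    JsonNode ctx = log.path("contextData");
                    ps.setString(1, "FLINK"); // or derive the job type from contextData
                    ps.setString(2, ctx.path("jobName").asText("unknown"));
                    ps.setTimestamp(3, new Timestamp(log.path("timestamp").asLong()));
                    ps.setString(4, log.path("level").asText());
                    ps.setString(5, log.path("logger").asText());
                    ps.setString(6, log.path("message").asText());
                    ps.setString(7, log.path("threadName").asText());
                    ps.setString(8, log.path("errorMessage").isNull() ? null : log.path("errorMessage").asText());
                    ps.setString(9, ctx.isMissingNode() ? null : ctx.toString());
                    ps.executeUpdate();
                }
            }
        }
    }
}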
11. Querying and Analysis
The stored data can then be queried with SQL.
For example, to query all logs of job xxxx between 2024-01-01 and 2024-01-02:
SELECT
timestamp,
job_name,
message,
error_message,
JSON_UNQUOTE(JSON_EXTRACT(context_data, '$.taskId')) as task_id
FROM application_logs
WHERE job_name = 'xxxx'
AND timestamp BETWEEN '2024-01-01 00:00:00' AND '2024-01-02 00:00:00'
ORDER BY timestamp DESC;