作为公司数据资产的重要组成部分,日志在系统的可观察性、网络安全和数据分析方面扮演着关键角色。日志记录是故障排除的首选工具,也是提升系统安全性的重要参考。日志还是一个宝贵的数据源,通过对其进行分析,可以获取指导业务增长的有价值信息。
日志是计算机系统中事件的顺序记录。一个理想的日志分析系统应该是:
数据行业内常用的日志处理解决方案是ELK技术栈:Elasticsearch、Logstash和Kibana。该流程可分为五个模块:
图片
ELK堆栈具有优秀的实时处理能力,但也存在一些问题。
Elasticsearch中的索引映射定义了表的结构,包括字段名称、数据类型以及是否启用索引创建。
图片
Elasticsearch还拥有自动根据输入的JSON数据添加字段到映射的动态映射机制。这提供了某种程度的无模式支持,但这还不够,因为:
Elasticsearch拥有独特的领域特定语言(DSL),与大多数数据工程师和分析师熟悉的技术栈非常不同,所以存在陡峭的学习曲线。此外,Elasticsearch相对封闭的生态系统,在与BI工具集成方面会遇到一些阻力。最重要的是,Elasticsearch仅支持单表分析,滞后于现代OLAP对多表连接、子查询和视图的需求。
图片
Elasticsearch用户一直在抱怨计算和存储成本。根本原因在于Elasticsearch的工作方式。
随着数据量和集群规模的增长,保持稳定性会成为另一个问题:
在反思基于Elasticsearch的解决方案的优点和局限性后,Apache Doris开发人员对Apache Doris进行了日志处理的优化。
Elasticsearch的官方测试工具ES Rally进行的基准测试显示,Apache Doris在数据写入方面比Elasticsearch快约5倍,在查询方面快约2.3倍,并且仅消耗Elasticsearch使用存储空间的1/5。在HTTP日志的测试数据集上,它实现了550 MB/s的写入速度和10:1的压缩比。
图片
下图显示了一个典型的基于Doris的日志处理系统的样貌。它更加全面,从数据摄取、分析到应用,都可以更灵活地使用:
图片
此外,Apache Doris具有更好的无模式支持和更用户友好的分析引擎。
首先,在数据类型上进行优化。通过矢量化优化了字符串搜索和正则表达式匹配的文本性能,性能提升了2~10倍。对于JSON字符串,Apache Doris将其解析并存储为更紧凑和高效的二进制格式,可以加快查询速度4倍。还为复杂数据添加了一种新的数据类型:Array Map。它可以将连接的字符串进行结构化,以实现更高的压缩率和更快的查询速度。
其次,Apache Doris支持模式演化。这意味着可以根据业务变化调整模式。可以添加或删除字段和索引,并更改字段的数据类型。
Apache Doris提供了轻量级的模式更改功能,因此开发人员可以在几毫秒内添加或删除字段:
-- 添加列。结果会在毫秒级返回。
ALTER TABLE lineitem ADD COLUMN l_new_column INT;
还可以仅为目标字段添加索引,以避免不必要的索引创建带来的开销。在添加索引后,默认情况下,系统将为所有增量数据生成索引,并且可以指定需要索引的历史数据分区。
-- 添加倒排索引。Doris会为以后的所有新数据生成倒排索引。
ALTER TABLE table_name ADD INDEX index_name(column_name) USING INVERTED;
-- 为指定的历史数据分区构建索引。
BUILD INDEX index_name ON table_name PARTITIONS(partition_name1, partition_name2);
基于SQL的分析引擎确保数据工程师和分析师能够在短时间内轻松掌握Apache Doris,并将其在SQL方面的经验应用到这个OLAP引擎中。借助SQL的丰富功能,用户可以执行数据检索、聚合、多表连接、子查询、UDF、逻辑视图和物化视图,以满足自身需求。
Apache Doris具备MySQL兼容性,可以与大数据生态系统中的大多数GUI和BI工具集成,使用户能够实现更复杂和多样化的数据分析。
一家游戏公司已经从ELK技术栈转向了Apache Doris解决方案。他们基于Doris的日志系统所需的存储空间只有之前的1/6。
一家网络安全公司利用Apache Doris中的倒排索引构建了他们的日志分析系统,支持每秒写入30万行数据,仅使用以前所需的1/5服务器资源。
现在按照以下三个步骤来构建一个基于Apache Doris的日志分析系统。
在开始之前,从官方网站下载Apache Doris 2.0或更新版本,并部署集群。
这是一个表格创建的示例。
对配置的解释:
CREATE DATABASE log_db;
USE log_db;
CREATE RESOURCE "log_s3"
PROPERTIES
(
"type" = "s3",
"s3.endpoint" = "your_endpoint_url",
"s3.region" = "your_region",
"s3.bucket" = "your_bucket",
"s3.root.path" = "your_path",
"s3.access_key" = "your_ak",
"s3.secret_key" = "your_sk"
);
CREATE STORAGE POLICY log_policy_1day
PROPERTIES(
"storage_resource" = "log_s3",
"cooldown_ttl" = "86400"
);
CREATE TABLE log_table
(
``ts` DATETIMEV2,
``clientip` VARCHAR(20),
``request` TEXT,
``status` INT,
``size` INT,
INDEX idx_size (`size`) USING INVERTED,
INDEX idx_status (`status`) USING INVERTED,
INDEX idx_clientip (`clientip`) USING INVERTED,
INDEX idx_request (`request`) USING INVERTED PROPERTIES("parser" = "english")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_num" = "1",
"storage_policy" = "log_policy_1day",
"deprecated_dynamic_schema" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "AUTO",
"dynamic_partition.replication_num" = "1"
);
Apache Doris支持多种数据导入方法。对于实时日志,推荐以下三种方法:
使用Kafka进行数据摄取
对于写入Kafka消息队列的JSON日志,创建常规加载(Routine Load),以便Doris从Kafka中拉取数据。以下是示例。property.*
配置为可选配置:
-- 准备Kafka集群和主题("log_topic")
-- 创建常规加载,从Kafka的 log_topic 加载数据到 "log_table"
CREATE ROUTINE LOAD load_log_kafka ON log_db.log_table
COLUMNS(ts, clientip, request, status, size)
PROPERTIES (
"max_batch_interval" = "10",
"max_batch_rows" = "1000000",
"max_batch_size" = "109715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA (
"kafka_broker_list" = "host:port",
"kafka_topic" = "log_topic",
"property.group.id" = "your_group_id",
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="GSSAPI",
"property.sasl.kerberos.service.name"="kafka",
"property.sasl.kerberos.keytab"="/path/to/xxx.keytab",
"property.sasl.kerberos.principal"="xxx@yyy.com"
);
可以通过SHOW ROUTINE LOAD
命令查看常规加载的运行情况。
通过Logstash进行数据导入
配置Logstash的HTTP输出,然后通过HTTP Stream Load将数据发送到Doris。
1) 在logstash.yml
中指定批量大小和批量延迟,以提高数据写入性能。
pipeline.batch.size: 100000
pipeline.batch.delay: 10000
2) 在日志收集配置文件testlog.conf
中添加HTTP输出,URL为Doris中的Stream Load地址。
http basic auth
,使用echo -n 'username:password' | base64
进行计算。load_to_single_tablet
可以减少数据摄取中的小文件数量。output {
http {
follow_redirects => true
keepalive => false
http_method => "put"
url => "http://172.21.0.5:8640/api/logdb/logtable/_stream_load"
headers => [
"format", "json",
"strip_outer_array", "true",
"load_to_single_tablet", "true",
"Authorization", "Basic cm9vdDo=",
"Expect", "100-continue"
]
format => "json_batch"
}
}
通过自定义程序进行数据摄取
以下是通过HTTP Stream Load将数据摄取到Doris的示例。
注意:
curl \
--location-trusted \
-u username:password \
-H "format:json" \
-H "read_json_by_line:true" \
-H "load_to_single_tablet:true" \
-T logfile.json \
http://fe_host:fe_http_port/api/log_db/log_table/_stream_load
Apache Doris支持标准SQL,因此可以通过MySQL客户端或JDBC连接到Doris,然后执行SQL查询。
mysql -h fe_host -P fe_mysql_port -u root -Dlog_db
一些常见的日志分析查询:
SELECT * FROM log_table ORDER BY ts DESC LIMIT 10;
SELECT * FROM log_table WHERE clientip = '8.8.8.8' ORDER BY ts DESC LIMIT 10;
SELECT * FROM log_table WHERE request MATCH_ANY 'error 404' ORDER BY ts DESC LIMIT 10;
SELECT * FROM log_table WHERE request MATCH_ALL 'image faq' ORDER BY ts DESC LIMIT 10;
如果需要一种高效的日志分析解决方案,Apache Doris是非常友好的选择,尤其适合那些具备SQL知识的读者。相比ELK堆栈,使用Apache Doris可以获得更好的无模式支持,实现更快的数据写入和查询,并且减少存储负担。