使用 Spring Boot 和 Kafka Streams 进行实时数据处理
创始人
2025-07-03 14:31:42
0

Spring Boot 和 Apache Kafka Streams 是两个强大的工具,它们使开发人员能够创建可靠且可扩展的实时数据处理应用程序。在这篇文章中,我们将了解 Spring Boot 和 Kafka Streams 如何协同工作,如何利用流处理来发挥应用程序的优势。还将探索交互式查询,这是一个相对较新且有趣的功能,为实时数据分析提供了新的机会。

安装Kafka

Kafka可以从官方网站https://kafka.apache.org/downloads下载。一旦 Kafka 启动并运行,就创建一个主题。

创建Spring Boot项目

创建一个新的 Spring Boot 项目,并且引入“Spring Web”和“Spring for Apache Kafka”两个依赖项。

@SpringBootApplication
public class KafkaStreamsDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsDemoApplication.class, args);
    }
}

配置Kafka

接下来,在应用程序的 application.properties 文件中配置 Kafka 创建的主题和代理地址。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

创建 Kafka 流处理器

下一步是构建一个 Kafka Streams 处理器,从“my-topic”读取消息并处理,然后将结果输出到另一个主题。使用 KStream API 来处理逻辑,如下:

@Bean
public Function, KStream> process() {
    return input -> input
            .mapValues(value -> value.toUpperCase())
            .to("output-topic");
}

交互式查询

交互式查询是 Kafka Streams 的创新新功能之一。借助此功能,可以立即查询 Kafka Streams 应用程序的状态存储。让我们看看如何使用交互式查询检索存储在状态存储中的大写消息的数量。

@Autowiredprivate 
InteractiveQueryService interactiveQueryService;

@GetMapping("/messageCount")
public long getMessageCount() {
  ReadOnlyKeyValueStore store = interactiveQueryService.getQueryableStore("message-count-store", QueryableStoreTypes.keyValueStore());   
  return store.get("uppercase-message-count");
}

在此代码中,我们使用 InteractiveQueryService 来获取“message-count-store”的状态存储的句柄,然以查询该存储来获取大写消息的计数。

发送数据到Kafka

在实际应用程序中,数据将从多个源发送到 Kafka。在本示例中,我们将使用一个简单的 Kafka 生产者来与“my-topic”进行通信。

@Autowired
private KafkaTemplate kafkaTemplate;

public void produceMessage(String message) {
    kafkaTemplate.send("my-topic", message);
}

使用处理后的数据

使用 Kafka 消费者最终从“output-topic”接收编辑后的数据,如下:

@KafkaListener(topics = "output-topic", groupId = "my-group")
public void consume(String message) {
    System.out.println("Received: " + message);
}

总结

在本文中,我们了解了如何使用 Spring Boot 和 Kafka Streams 创建用于实时数据处理的应用程序,并且引入了交互式查询这一有趣的新功能。借助交互式查询,可以通过处理实时数据以及实时查询 Kafka Streams 应用程序的状态来创建交互式动态应用程序。

相关内容

热门资讯

如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
Windows恶意软件20年“... 在Windows的早期年代,病毒游走于系统之间,偶尔删除文件(但被删除的文件几乎都是可恢复的),并弹...
20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
着眼MAC地址,解救无法享受D... 在安装了DHCP服务器的局域网环境中,每一台工作站在上网之前,都要先从DHCP服务器那里享受到地址动...
为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...