public class RabbitMQTransactionDemo {
private static final String QUEUE_NAME = "transaction_queue";
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 开启事务
channel.txSelect();
// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事务
channel.txCommit();
} catch (Exception e) {
// 事务回滚
channel.txRollback();
e.printStackTrace();
}
// 关闭信道和连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class RocketMQTransactionDemo {
public static void main(String[] args) throws Exception {
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑,根据业务逻辑结果返回相应的状态
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事务提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事务回滚
// 返回 LocalTransactionState.UNKNOW 表示事务状态未知
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据消息的状态,来判断本地事务的最终状态
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事务提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事务回滚
// 返回 LocalTransactionState.UNKNOW 表示事务状态未知
}
});
// 启动事务消息生产者
producer.start();
// 构造消息
Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("Send Result: " + sendResult);
// 关闭事务消息生产者
producer.shutdown();
}
}
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
Producer producer = new KafkaProducer<>(props);
// 初始化事务
producer.initTransactions();
try {
// 开启事务
producer.beginTransaction();
// 发送消息
ProducerRecord record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
producer.send(record);
// 提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 处理异常情况
producer.close();
} finally {
producer.close();
}
}
}
RabbitMQ使用ACK(消息确认)机制来确保消息的可靠传递。消费者收到消息后,需要向RabbitMQ发送ACK来确认消息的处理状态。只有在收到ACK后,RabbitMQ才会将消息标记为已成功传递,否则会将消息重新投递给其他消费者或者保留在队列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
String queueName = "queue_name";
channel.queueDeclare(queueName, false, false, false, null);
// 创建消费者
String consumerTag = "consumer_tag";
boolean autoAck = false; // 关闭自动ACK
// 消费消息
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
// 模拟处理消息的业务逻辑
processMessage(message);
// 手动发送ACK确认消息
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 处理消息异常,可以选择重试或者记录日志等操作
System.out.println("Failed to process message: " + message);
e.printStackTrace();
// 手动发送NACK拒绝消息,并可选是否重新投递
long deliveryTag = envelope.getDeliveryTag();
boolean requeue = true; // 重新投递消息
channel.basicNack(deliveryTag, false, requeue);
}
}
});
}
private static void processMessage(String message) {
// 模拟处理消息的业务逻辑
}
}
RocketMQ的ACK机制由消费者控制,消费者从消息队列中消费消息后,可以手动发送ACK确认消息的处理状态。只有在收到ACK后,RocketMQ才会将消息标记为已成功消费,否则会将消息重新投递给其他消费者。
public class RocketMQAckDemo {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
// 订阅消息
consumer.subscribe("topic_name", "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt message : msgs) {
try {
// 消费消息
String msgBody = new String(message.getBody(), "UTF-8");
System.out.println("Received message: " + msgBody);
// 模拟处理消息的业务逻辑
processMessage(msgBody);
// 手动发送ACK确认消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 处理消息异常,可以选择重试或者记录日志等操作
System.out.println("Failed to process message: " + new String(message.getBody()));
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
private static void processMessage(String message) {
// 模拟处理消息的业务逻辑
}
}
Kafka的ACK机制用于控制生产者在发送消息后,需要等待多少个副本确认才视为消息发送成功。这个机制可以通过设置acks参数来进行配置。
下面是一个使用Java编写的Kafka生产者示例代码:
public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置Kafka生产者的参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群的地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
props.put("acks", "all"); // 设置ACK机制为所有副本都确认
// 创建生产者实例
KafkaProducer producer = new KafkaProducer<>(props);
// 构造消息
String topic = "my_topic";
String key = "my_key";
String value = "Hello, Kafka!";
// 创建消息记录
ProducerRecord record = new ProducerRecord<>(topic, key, value);
// 发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息出现异常:" + exception.getMessage());
} else {
System.out.println("消息发送成功!位于分区 " + metadata.partition() + ",偏移量 " + metadata.offset());
}
}
});
// 关闭生产者
producer.close();
}
}