java之Spring Kafka集成测试写入高水印文件时出错

bluestorm 阅读:14 2024-06-03 14:00:57 评论:0

我正在 Spring Boot 应用程序中使用 spring-kafka-2.2.0 编写集成测试,基本上已经成功:测试用例能够通过,但在测试结束之后仍然会看到多条错误信息。

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645 
 
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint 
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory) 

测试配置
/**
 * Test configuration that wires an embedded Kafka broker together with a JSON
 * producer, a batch consumer and a latch-backed listener for the
 * {@code test-events} topic.
 */
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

    /**
     * Creates the embedded broker: one broker, controlled shutdown disabled,
     * two partitions per topic, hosting the {@code test-events} topic.
     *
     * <p>NOTE(review): with the broker's default log directory, this setup logged
     * {@code FileNotFoundException}s for checkpoint and index files at shutdown.
     * Pointing {@code log.dir} at a directory under the build output (for example
     * {@code target/embedded-kafka}) is a reported workaround, but it keeps topic
     * data between runs, so confirm that is acceptable before enabling it.
     *
     * @return the embedded broker bean
     */
    @Bean
    public EmbeddedKafkaBroker embeddedKafkaBroker() {
        return new EmbeddedKafkaBroker(1, false, 2, "test-events");
    }

    /**
     * Builds a producer factory that targets the embedded broker, uses string keys
     * and JSON-serialized values, and does not retry failed sends.
     *
     * @return the producer factory bean
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Exposes a {@link KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return the template bean
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds the consumer factory for {@code Professor} payloads in consumer group
     * {@code group1}, with auto-commit enabled.
     *
     * @return the consumer factory bean, registered as {@code consumerFactory}
     */
    @Bean("consumerFactory")
    public ConsumerFactory<String, Professor> createConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Start from the beginning of the topic when the group has no committed offset.
        // With Kafka's default ("latest"), a record published before the consumer has
        // been assigned its partitions is never delivered and the test waits forever.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        JsonDeserializer<Professor> jsonDeserializer = new JsonDeserializer<>(Professor.class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
    }

    /**
     * Builds the listener container factory used by {@link Listener}: batch
     * listening enabled and {@code AckMode.BATCH} acknowledgement.
     *
     * @return the container factory bean, registered as {@code kafkaListenerContainerFactory}
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Professor> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Professor> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        return factory;
    }

    /**
     * Registers a {@link StringJsonMessageConverter} bean.
     *
     * @return the converter bean
     */
    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Registers the test listener so the test can wait on its latch.
     *
     * @return the listener bean
     */
    @Bean
    public Listener listener() {
        return new Listener();
    }

    /**
     * Kafka listener that stores the most recently received batch and releases a
     * latch on the first delivery.
     *
     * <p>Declared {@code static} because it never uses the enclosing configuration
     * instance.
     */
    public static class Listener {

        /** Counted down once, when the first batch arrives. */
        public final CountDownLatch latch = new CountDownLatch(1);

        /** The last batch handed to {@link #listen1(List)}; {@code null} until then. */
        @Getter
        public List<Professor> list;

        /**
         * Receives a batch from {@code test-events}, stores it and releases the latch.
         *
         * @param foo the batch of deserialized payloads
         */
        @KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
        public void listen1(List<Professor> foo) {
            // Store the batch before counting down so a thread released by the latch sees it.
            list = foo;
            this.latch.countDown();
        }
    }

}

测试类
/**
 * Integration test that hands a {@code Professor} to {@code KafkaConsumerService}
 * and checks that the test {@code Listener} receives it.
 */
@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    /** Upper bound, in seconds, on how long the test waits for the listener. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 30;

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Autowired
    private Listener listener;

    /**
     * Passes one professor to the service, waits for the listener to receive a
     * batch, and compares the name of the first received element with the one sent.
     *
     * @throws Exception if the service call fails or the wait is interrupted
     */
    @Test
    public void testReceive() throws Exception {
        Professor professor = new Professor("Ajay", new Department("social", 1234));
        List<Professor> pro = new ArrayList<>();
        pro.add(professor);
        System.out.println(pro);
        // Presumably publishes the list to Kafka; the service is defined elsewhere.
        kafkaConsumerService.professor(pro);
        System.out.println("The professor object is sent to kafka -----------------------------------");

        // Bounded wait: an unbounded await() would hang the build if the record never arrives.
        boolean received = listener.latch.await(
                RECEIVE_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
        if (!received) {
            throw new AssertionError(
                    "Listener did not receive a batch within " + RECEIVE_TIMEOUT_SECONDS + " seconds");
        }

        List<Professor> result = listener.getList();
        Professor resultPro = result.get(0);
        System.out.println(result);
        System.out.println(resultPro);

        assertEquals(pro.get(0).getName(), resultPro.getName());
    }

}

测试用例 testReceive() 可以通过,但仍然出现多条错误消息

堆栈跟踪错误 1
2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645 
 
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint 
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory) 

堆栈跟踪错误 2
2019-02-21 11:12:35.446  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory) 
 
java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory) 

堆栈跟踪错误 3
2019-02-21 11:12:35.451  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory) 
 
java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory) 
at java.io.RandomAccessFile.open0(Native Method) ~[na:1.8.0_191] 

请您参考如下方法:

我遇到了类似的问题。在 Gary Russell 的帮助下,我把 broker 的日志目录(log.dir)指向构建输出目录后解决了这个问题:使用 Gradle 时设置 log.dir=out/embedded-kafka,使用 Maven 时设置 log.dir=target/embedded-kafka。

下面的代码片段展示了如何使用 @EmbeddedKafka 来做到这一点。

/*
 * Example test skeleton: declares the embedded broker through @EmbeddedKafka
 * and sets the broker's log.dir to a fixed path. The class body is elided
 * in this snippet.
 */
@ExtendWith(SpringExtension.class) 
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {Application.class}) 
@EmbeddedKafka( 
        topics = "topic", 
        partitions = 1, 
        controlledShutdown = true, 
        brokerProperties={ 
                // Fixed log directory under the build output (Gradle layout here).
                "log.dir=out/embedded-kafka" 
        }) 
@TestPropertySource( 
        properties = { 
                // Resolves the bootstrap servers from the spring.embedded.kafka.brokers placeholder;
                // presumably set by the embedded broker - confirm against the spring-kafka version in use.
                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" 
        }) 
public class OutboxEventsTest { 
... 
} 


标签:java
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号