init:对接Demo

This commit is contained in:
yanqs 2023-08-29 00:22:06 +08:00
parent daa7776f74
commit 0f7a34839e
6 changed files with 225 additions and 0 deletions

View File

@ -1,2 +1,3 @@
# huawei-mrs-kafka-demo
华为大数据平台 kerberos 认证 kafka对接示例

88
pom.xml Normal file
View File

@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bcrjl.mrs</groupId>
<artifactId>huawei-mrs-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0-hw-ei-312005</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>huawei-cloud-sdk</id>
<name>HuaWei Cloud Mirrors</name>
<url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>huawei-mirrors</id>
<name>HuaWei Mirrors</name>
<url>https://repo.huaweicloud.com/repository/maven/</url>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,17 @@
package com.bcrjl.mrs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 功能描述项目启动类
*
* @author yanqs
* @since 2023-08-28
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

View File

@ -0,0 +1,82 @@
package com.bcrjl.mrs.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import java.util.HashMap;
import java.util.Map;
/**
* 功能描述华为Mrs kafka配置
*
* @author yanqs
* @since 2023-08-28
*/
@Configuration
public class HuaWeiMrsKafkaConfiguration {
@Value("${huawei.mrs.kafka.enable}")
public Boolean enable;
@Value("${huawei.mrs.kafka.bootstrap-servers}")
public String boostrapServers;
@Value("${huawei.mrs.kafka.security.protocol}")
public String securityProtocol;
@Value("${huawei.mrs.kafka.kerberos.domain.name}")
public String kerberosDomainName;
@Value("${huawei.mrs.kafka.sasl.kerberos.service.name}")
public String kerberosServiceName;
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory
= new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//禁止消费者自启动达到动态启动消费者的目的
factory.setAutoStartup(enable);
return factory;
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put("security.protocol", securityProtocol);
configs.put("kerberos.domain.name", kerberosDomainName);
configs.put("bootstrap.servers", boostrapServers);
configs.put("sasl.kerberos.service.name", kerberosServiceName);
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> configs = new HashMap<>();
configs.put("security.protocol", securityProtocol);
configs.put("kerberos.domain.name", kerberosDomainName);
configs.put("bootstrap.servers", boostrapServers);
configs.put("sasl.kerberos.service.name", kerberosServiceName);
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
return new KafkaTemplate<>(producerFactory);
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
}

View File

@ -0,0 +1,23 @@
package com.bcrjl.mrs.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 功能描述kafka消费订阅
*
* @author yanqs
* @since 2023-08-28
*/
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic1")
public void topicMessage(ConsumerRecord<?, ?> record) {
log.info("Kafka->topic:topic1-->{}", record.value());
//TODO:业务逻辑
}
}

View File

@ -0,0 +1,14 @@
huawei:
mrs:
kafka:
enable: false
bootstrap-servers: 10.244.231.2:21007,10.244.230.202:21007,10.244.230.125:21007
security:
protocol: SASL_PLAINTEXT
kerberos:
domain:
name: hadoop.hadoop_651_arm.com
sasl:
kerberos:
service:
name: kafka