spring boot에서 tcp server를 구현하는 방법중에서 spring-integration를 사용해서 tcp server를 구현해보도록 하겠습니다.
spring-integration은 메시지 기반의 통합을 지원하며, TCP와 같은 다양한 프로토콜을 통한 통신을 쉽게 구현할 수 있도록 도와줍니다.
아래의 과정을 통해 Spring Boot에서 TCP 서버를 만들 수 있습니다.
1. 의존성 추가하기
spring-integration 의존성을 추가해줍니다.
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-ip:6.2.1'
2. TCP 서버 설정 클래스 생성
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
@Configuration
@EnableIntegration
public class TcpServerConfig {
@Value("${event.port}")
private int port;
@Bean
public CustomTcpSerializer serializer() {
return new CustomTcpSerializer();
}
@Bean
public AbstractServerConnectionFactory connectionFactory(CustomTcpSerializer serializer) {
TcpNioServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(port);
connectionFactory.setSerializer(serializer);
connectionFactory.setDeserializer(serializer);
connectionFactory.setSingleUse(true);
return connectionFactory;
}
@Bean
public MessageChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory, MessageChannel inboundChannel, MessageChannel replyChannel) {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(connectionFactory);
tcpInboundGateway.setRequestChannel(inboundChannel);
tcpInboundGateway.setReplyChannel(replyChannel);
return tcpInboundGateway;
}
}
아래의 어노테이션을 통해 Spring Integration 설정 파일을 bean으로 등록합니다.
@Configuration : 설정 파일을 bean으로 등록하기 위한 어노테이션
@EnableIntegration: Spring Integration의 기능을 활성화하는 어노테이션
connectionFactory 메서드는 TcpNioServerConnectionFactory를 생성하고 구성합니다. 이 클래스는 TCP 서버와의 연결을 설정합니다.
setSerializer, setDeserializer에 CustomTcpSerializer 클래스를 인자로 넘겨서 직렬화 및 역직렬화에 사용하도록 설정합니다.
setSingleUse(true)는 서버가 한 번의 메시지를 처리한 후에 연결을 닫아야 함을 나타냅니다.
inboundChannel 및 replyChannel 메서드는 각각 DirectChannel 빈을 정의합니다. DirectChannel는 서로 다른 컴포넌트 간 통신에 사용되는 메시지 채널입니다.
inboundGateway 메서드는 TcpInboundGateway를 구성합니다. 이는 TCP 클라이언트로부터 수신된 메시지를 처리하는 데 사용됩니다. 해당 메서드에서는 연결 팩토리, 인바운드 및 리플라이 채널을 설정합니다.
setConnectionFactory 함수의 인자로는 connectionFactory 함수에서 생성한 TcpNioServerConnectionFactory 객체를 넘겨줍니다.
setRequestChannel, setReplyChannel 함수의 인자로는 inboundChannel, replyChannel 함수에서 생성한 DirectChannel 객체를 넘겨줍니다.
3. TCP 직렬화 및 역직렬화를 담당하는 클래스 작성
import org.springframework.integration.ip.tcp.serializer.AbstractPooledBufferByteArraySerializer;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.mapping.MessageMappingException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CustomTcpSerializer extends AbstractPooledBufferByteArraySerializer {
public static final int STX = 0x02;
public static final int ETX = 0x03;
@Override
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
int bite = inputStream.read();
if (bite < 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
buffer[0] = (byte) bite;
int length = inputStream.read();
if (length < 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
buffer[1] = (byte) length;
int n = 2;
try {
if (bite != STX) {
throw new MessageMappingException("Expected STX to begin message");
}
while (length != n) {
bite = inputStream.read();
checkClosure(bite);
buffer[n++] = (byte) bite;
if (n >= getMaxMessageSize()) {
throw new IOException("ETX not found before max message length: " + getMaxMessageSize());
}
}
if (bite != ETX) {
throw new MessageMappingException("Expected ETX to end message");
}
return copyToSizedArray(buffer, n);
} catch (IOException e) {
publishEvent(e, buffer, n);
throw e;
} catch (RuntimeException e) {
publishEvent(e, buffer, n);
throw e;
}
}
@Override
public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
outputStream.write(bytes);
}
}
STX(0x02, Start of Text), ETX(0x03, End of Text)는 일반적으로 데이터의 시작과 끝을 의미하는 16진수 값입니다.
doDeserialize 메서드는 입력 스트림에서 바이트 배열로 메시지를 역직렬화합니다.
STX로 시작해서 ETX로 끝나는 메시지를 읽어옵니다. inputStream.read() 에서 1byte씩 읽어서 STX, ETX를 체크하고 있습니다.
메시지의 길이와 내용을 읽어와서 바이트 배열에 저장하고, 메시지가 예상과 다르게 구성되었을 경우 예외를 던집니다.
메시지를 역직렬화한 후에는 해당 메시지를 반환합니다.
serialize 메서드는 주어진 바이트 배열을 출력 스트림에 직렬화합니다.
간단하게 바이트 배열을 출력 스트림에 쓰는 것으로 구현되어 있습니다.
4. TCP 서버의 엔드포인트를 정의하는 클래스 작성
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
@MessageEndpoint
public class TcpServerEndpoint {
private final MessageService messageService;
public TcpServerEndpoint(MessageService messageService) {
this.messageService = messageService;
}
@ServiceActivator(inputChannel = "inboundChannel", async = "true")
public void process(byte[] message) {
messageService.processMessage(message);
}
}
@MessageEndpoint 어노테이션은 해당 클래스가 메시지 엔드포인트임을 나타내는 어노테이션입니다. 메시지 엔드포인트는 메시지를 수신하고 처리하는 역할을 합니다.
@ServiceActivator 어노테이션은 메서드가 특정 채널(inboundChannel)에서 메시지를 수신하여 활성화되도록 지정합니다. async = "true" 옵션은 이 메서드가 비동기적으로 동작함을 나타냅니다.
process 메서드는 inboundChannel 채널에서 들어오는 메시지를 처리하는데 사용됩니다.
메시지는 바이트 배열로 전달되며, 이를 messageService를 통해 처리한 뒤 그 결과를 반환합니다.
5. 메세지를 처리할 클래스 작성
import org.springframework.stereotype.Service;
@Service
public class MessageService {
public void processMessage(byte[] message) {
System.out.println("Receive message : {}" + message);
}
}
processMessage 함수에서 전달 받은 메세지를 처리하는 로직을 작성해줍니다.
그리고 서버를 실행하고 TCP 클라이언트에서 메세지가 송신되면 processMessage 함수에서 메세지가 수신되는것을 확인할 수 있습니다.
참고자료
https://huskdoll.tistory.com/988#google_vignette
https://jakupsil.tistory.com/11
'개발 > spring, spring boot' 카테고리의 다른 글
[WebSocket][STOMP] Whoops! Lost connection to ws 에러 해결 (0) | 2024.01.30 |
---|---|
Spring Boot Cache 사용 방법 (0) | 2024.01.28 |
[mybatis] Cannot resolve reference to bean 'sqlSessionTemplate' while setting bean property 'sqlSessionTemplate' 에러 (2) | 2024.01.17 |
[Spring boot] Springdoc-openapi를 통한 Swagger UI 설정 (1) | 2024.01.11 |
[Spring boot] WebSocket + STOMP (0) | 2023.12.14 |