NIOSingleStepByteToMessageProcessor

public final class NIOSingleStepByteToMessageProcessor<Decoder> where Decoder : NIOSingleStepByteToMessageDecoder

NIOSingleStepByteToMessageProcessor uses a NIOSingleStepByteToMessageDecoder to produce messages from a stream of incoming bytes. It works like ByteToMessageHandler but may be used outside of the channel pipeline. This allows processing of wrapped protocols in a general way.

First initialize a NIOSingleStepByteToMessageProcessor with a NIOSingleStepByteToMessageDecoder. Then call process with each ByteBuffer as it is received from the stream, passing a closure to receive the decoded messages. The closure is called once for each message produced by the decoder.

When your stream ends, call finishProcessing to ensure all buffered data is passed to your decoder. This will call decodeLast one or more times with any remaining data.

Example

Below is an example of a protocol decoded by TwoByteStringCodec that is sent over HTTP. RawBodyMessageHandler forwards the headers and trailers directly and uses NIOSingleStepByteToMessageProcessor to send whole decoded messages.

/// A single-step decoder whose messages are `String`s read two bytes at a time.
class TwoByteStringCodec: NIOSingleStepByteToMessageDecoder {
    typealias InboundOut = String

    /// Number of bytes requested from the buffer for every message.
    private static let messageLength = 2

    /// Tries to read one two-byte string from `buffer`.
    ///
    /// - Parameter buffer: The bytes accumulated so far.
    /// - Returns: The decoded string, or `nil` when `readString` could not produce one.
    public func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
        guard let message = buffer.readString(length: TwoByteStringCodec.messageLength) else {
            return nil
        }
        return message
    }

    /// Decodes the tail of the stream exactly like `decode(buffer:)`; `seenEOF` is not consulted.
    ///
    /// - Parameters:
    ///   - buffer: The bytes still buffered when the stream finished.
    ///   - seenEOF: Ignored by this codec.
    /// - Returns: Whatever `decode(buffer:)` returns for the same buffer.
    public func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
        let lastMessage = try decode(buffer: &buffer)
        return lastMessage
    }
}

/// Re-frames HTTP request bodies into whole decoded messages.
///
/// Heads and trailers are forwarded unchanged. Body bytes are fed through a
/// `NIOSingleStepByteToMessageProcessor` and every message it produces is
/// forwarded as its own `.body` part.
class RawBodyMessageHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart // alias for HTTPPart<HTTPRequestHead, ByteBuffer>
    // This converts the body from ByteBuffer to String, our message type
    typealias InboundOut = HTTPPart<HTTPRequestHead, String>

    // Created on the first body part of a request and dropped again when that request ends.
    private var messageProcessor: NIOSingleStepByteToMessageProcessor<TwoByteStringCodec>? = nil

    /// Handles one inbound HTTP request part.
    ///
    /// Any error thrown while processing body bytes or finishing the request is
    /// reported through `fireErrorCaught` instead of being propagated.
    ///
    /// - Parameters:
    ///   - context: The handler context used to forward parts and errors.
    ///   - data: The wrapped `HTTPServerRequestPart`.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let req = self.unwrapInboundIn(data)
        do {
            switch req {
            case .head(let head):
                // simply forward on the head
                context.fireChannelRead(self.wrapInboundOut(.head(head)))
            case .body(let body):
                let processor = self.currentMessageProcessor()
                try processor.process(buffer: body) { message in
                    self.channelReadMessage(context: context, message: message)
                }
            case .end(let trailers):
                // Detach the processor before finishing it: the next request on this
                // connection then gets a fresh processor, even if finishing throws.
                let processor = self.messageProcessor
                self.messageProcessor = nil
                // Forward on any remaining messages and the trailers
                try processor?.finishProcessing(seenEOF: false) { message in
                    self.channelReadMessage(context: context, message: message)
                }
                context.fireChannelRead(self.wrapInboundOut(.end(trailers)))
            }
        } catch {
            context.fireErrorCaught(error)
        }
    }

    // Forward on the body messages as whole messages
    func channelReadMessage(context: ChannelHandlerContext, message: String) {
        context.fireChannelRead(self.wrapInboundOut(.body(message)))
    }

    /// Returns the processor for the request in flight, creating and storing one if needed.
    private func currentMessageProcessor() -> NIOSingleStepByteToMessageProcessor<TwoByteStringCodec> {
        if let existing = self.messageProcessor {
            return existing
        }
        let created = NIOSingleStepByteToMessageProcessor(TwoByteStringCodec())
        self.messageProcessor = created
        return created
    }
}

/// Collects the decoded body messages of a request and echoes them back,
/// one per line, in a `200 OK` response once the request ends.
private class DecodedBodyHTTPHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPPart<HTTPRequestHead, String>
    typealias OutboundOut = HTTPServerResponsePart

    // Messages received for the request currently in flight.
    var msgs: [String] = []

    /// Handles one decoded request part.
    ///
    /// - Parameters:
    ///   - context: The handler context used to write the response.
    ///   - data: The wrapped `HTTPPart<HTTPRequestHead, String>`.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let message = self.unwrapInboundIn(data)

        switch message {
        case .head(let head):
            print("head: \(head)")
        case .body(let msg):
            self.msgs.append(msg)
        case .end(let trailers):
            print("trailers: \(trailers)")
            var responseBuffer = context.channel.allocator.buffer(capacity: 32)
            for msg in self.msgs {
                responseBuffer.writeString(msg)
                responseBuffer.writeStaticString("\n")
            }
            // Start the next request on this connection with an empty list, so its
            // response does not repeat the messages of this one.
            self.msgs.removeAll()

            var headers = HTTPHeaders()
            headers.add(name: "content-length", value: String(responseBuffer.readableBytes))

            context.write(self.wrapOutboundOut(HTTPServerResponsePart.head(
                HTTPResponseHead(version: .init(major: 1, minor: 1),
                                 status: .ok, headers: headers))), promise: nil)

            context.write(self.wrapOutboundOut(HTTPServerResponsePart.body(
                .byteBuffer(responseBuffer))), promise: nil)
            context.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)), promise: nil)
        }
    }
}

// One event loop thread backs the example server.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

// Each accepted connection gets the HTTP server handlers first, then the two example handlers.
let bootstrap = ServerBootstrap(group: group)
    .childChannelInitializer { channel in
        let httpPipelineConfigured = channel.pipeline.configureHTTPServerPipeline(
            withPipeliningAssistance: true,
            withErrorHandling: true
        )
        return httpPipelineConfigured.flatMap { _ in
            channel.pipeline.addHandlers([RawBodyMessageHandler(), DecodedBodyHTTPHandler()])
        }
    }

// Bind on the loopback interface, passing 0 as the port.
let channelFuture = bootstrap.bind(host: "127.0.0.1", port: 0)

NIOSingleStepByteToMessageProcessor Public API

  • Feed data into the NIOSingleStepByteToMessageProcessor

    Declaration

    Swift

    public func process(buffer: ByteBuffer, _ messageReceiver: (Decoder.InboundOut) throws -> Void) throws

    Parameters

    buffer

    The ByteBuffer containing the next data in the stream.

    messageReceiver

    A closure called for each message produced by the Decoder.

  • Call this when the stream has ended and no more data will arrive. It calls Decoder.decodeLast one or more times with any data that is still buffered. If no buffered data remains, decodeLast is called exactly once with an empty ByteBuffer.

    Declaration

    Swift

    public func finishProcessing(seenEOF: Bool, _ messageReceiver: (Decoder.InboundOut) throws -> Void) throws

    Parameters

    seenEOF

    Whether an EOF was seen on the stream.

    messageReceiver

    A closure called for each message produced by the Decoder.