Every time I need to process a stream of data, two things are usually true. First, I need to break the stream up into smaller chunks I can then process. Second, I have to code the buffering of the stream manually: read a chunk from the stream, search the chunk for the data I want, fetch another chunk, and so on. In most languages this results in a non-trivial bit of code.
At mailgun.com we use Kafka for our event processing, so I wrote a CLI tool to send a piped stream of events to Kafka. Kafka can accept any number of serialization protocols in the event payload, which means it could be anything from JSON to Protobuf. Because of that, I chose to delimit events with a carriage return (CR, \r). The program should read chunks of data into a buffer until it finds a CR or EOF, then post the payload to Kafka on a topic indicated by a command line option.
Because this is a CLI and I want to pipe events from a file, or from another program that generates events, our program will read from os.Stdin. Normally we would have to write the buffering and searching portions ourselves, except golang has a surprise called bufio.Scanner.
Of course you can browse the documentation, but let's look at an implementation I wrote called EventReader. For simplicity's sake, ReadEvent() will return a new []byte for each event it reads from the stream.
Here is how the main() in our CLI will use it:
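(A minimal sketch of that main(): the -topic flag and the postToKafka helper are hypothetical stand-ins for the real Kafka producer code, and NewEventReader comes from the next listing.)

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	// The -topic flag is an assumption; the real CLI may name it differently.
	topic := flag.String("topic", "events", "Kafka topic to post events to")
	flag.Parse()

	reader := NewEventReader(os.Stdin)
	for {
		event, err := reader.ReadEvent()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		postToKafka(*topic, event)
	}
}

// postToKafka is a hypothetical stand-in for the real Kafka producer call.
func postToKafka(topic string, payload []byte) {
	fmt.Printf("posting %d bytes to topic %q\n", len(payload), topic)
}
```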
Here is our implementation of EventReader:
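(Again, a sketch of how EventReader can be built: the details of the original may differ, but the wiring of a bufio.Scanner to a custom split function is the standard library API.)

```go
import (
	"bufio"
	"bytes"
	"io"
)

// EventReader wraps a bufio.Scanner configured to split the stream on
// carriage returns instead of newlines.
type EventReader struct {
	scanner *bufio.Scanner
}

// NewEventReader returns an EventReader that reads CR-delimited events
// from the given stream.
func NewEventReader(r io.Reader) *EventReader {
	scanner := bufio.NewScanner(r)
	scanner.Split(split)
	return &EventReader{scanner: scanner}
}

// split is a bufio.SplitFunc: it receives the currently buffered data and
// reports how far to advance and what token (if any) to hand to the caller.
func split(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		// Nothing left; the scanner will report end of input.
		return 0, nil, nil
	}
	if i := bytes.IndexByte(data, '\r'); i >= 0 {
		// Found a CR: advance past it and return the bytes before it.
		return i + 1, data[:i], nil
	}
	if atEOF {
		// The stream ended without a trailing CR; return what's left as
		// the final event.
		return len(data), data, nil
	}
	// No delimiter yet: ask the scanner to buffer more data and call us again.
	return 0, nil, nil
}

// ReadEvent returns a new []byte for each event read from the stream, and
// io.EOF once the stream is exhausted.
func (e *EventReader) ReadEvent() ([]byte, error) {
	if !e.scanner.Scan() {
		if err := e.scanner.Err(); err != nil {
			return nil, err
		}
		return nil, io.EOF
	}
	// The scanner reuses its internal buffer, so copy the token into a
	// fresh slice before returning it.
	event := make([]byte, len(e.scanner.Bytes()))
	copy(event, e.scanner.Bytes())
	return event, nil
}
```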
The cool part is the split function. This function gets passed the contents of the currently buffered read, which allows us to search the current buffer for our delimiter (in this case \r). If we find it, we return how many bytes the buffer should advance before giving us data again, along with the data the scanner should return to the caller. If we don't find our delimiter, we return 0, nil, nil, which tells the scanner to read more data into the buffer and call us again.
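To see that behavior in isolation, you can point EventReader at an in-memory stream instead of os.Stdin. This hypothetical demo (assuming the EventReader above, plus the fmt, io, log, and strings imports) also shows the final, un-terminated event being returned at EOF:

```go
// demo is a quick check of the splitting behavior. Note the last event has
// no trailing CR, yet is still returned once the stream hits EOF.
func demo() {
	events := NewEventReader(strings.NewReader("{\"a\":1}\r{\"b\":2}\r{\"c\":3}"))
	for {
		event, err := events.ReadEvent()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("event: %s\n", event)
	}
	// Prints:
	//   event: {"a":1}
	//   event: {"b":2}
	//   event: {"c":3}
}
```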
I for one was very happy to find this in the golang standard library. Thank you, golang authors.