I've written a lot about Kafka and how easy it is to use, but that's not always the case. The generally recommended method for interacting with Kafka is via the `librdkafka` C/C++ library, and whilst some popular programming languages have a wrapper around it, not all of them do. R is one of the languages without a modern implementation: the one package dates from 2015 and targets Kafka 0.8. There is another option though: Confluent's REST Proxy [1] provides an HTTP interface for producing and consuming messages.
So this weekend, inspired by discussions at Berlin Buzzwords, I wrote some code to use the Kafka REST API in R. My R skills aren't particularly good, but it works for me.
This time I'm developing against the Landoop Kafka Development Environment [2] which provides all of the community Confluent packages in one Docker image as well as some nice web interfaces. I'm also using mitmproxy [3] which makes it a bit easier to debug bad client requests.
I'm a big fan of tmux, so each of the following commands is running in a separate pane.

docker run --rm --net=host landoop/fast-data-dev
mitmproxy -R http://localhost:8082
It takes a minute or so for the processes in the docker container to settle, and longer the first time for the download. After that you'll be able to open http://localhost:3030 [4] and see things running and healthchecks passing.
First we need a couple of libraries: httr [5] to make the HTTP requests and rjson [6] to encode to JSON.

library(httr)
library(rjson)
It's now quite simple to make an HTTP request to the Kafka REST Proxy in order to get a list of topics that are available. This should show us that everything is working.
# This would ordinarily be on port 8082, but we're going through mitmproxy here for debugging
proxy_uri = "http://localhost:8080"

# Get a list of available topics
get_topics <- function() {
  uri <- paste(proxy_uri, "topics", sep="/")
  http_topics <- GET(uri)
  stop_for_status(http_topics)
  return(fromJSON(content(http_topics, "text")))
}

topics <- get_topics()
print(paste("There are", length(topics), "topics"))
print(topics)
You should see a list of topics, and you should also see the HTTP request going through mitmproxy if you are using that.
Producing messages to Kafka is a little harder: we need to build up some JSON to define the schema and a record.
# Produce one Avro message
# This looks complicated, but really the hardest part was working out that you need to do toJSON on value_schema
generatedbody <- list(
  value_schema=toJSON(list(
    type="record",
    name="User",
    fields=list(
      list(
        name="name",
        type="string"
      )
    )
  )),
  records=list(
    list(
      value=list(
        name="testUser"
      )
    )
  )
)
With a second record (`anotherTestUser`) added to `records`, `toJSON(generatedbody)` generates the following JSON:
"{\"value_schema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"User\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\"}]}\",\"records\":[{\"value\":{\"name\":\"testUser\"}},{\"value\":{\"name\":\"anotherTestUser\"}}]}"
As a side note, this suggests you can send multiple records in a single HTTP request. This does work for a few small records, however it makes debugging a lot more complicated and means that a single bad value will fail the whole request.
Now let's actually send this to Kafka:

# actually send the request
response <- POST(url=paste(proxy_uri, "topics", "avrotest", sep="/"),
  content_type("application/vnd.kafka.avro.v2+json"),
  accept("application/vnd.kafka.v2+json"),
  body=toJSON(generatedbody),
  encode='json'
)
stop_for_status(response)
content(response, "text")
if (is.null(fromJSON(content(response, "text"))$offsets[[1]]$error)){
  print("OK")
}
You should get back a good response and be able to see this in mitmproxy if you're using it. You should also be able to see the message in the web interface [7].
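The check above only looks at the first entry in `offsets`. If you do send several records in one request, it may be worth looking at every entry. This is only a sketch: `failed_records` is a made-up name, and it assumes the parsed response has the shape the code above relies on, namely a list with an `offsets` element whose entries each have an `error` field that is `NULL` on success. The example input is hand-built, with made-up error text, so it runs without Kafka.

```r
# Hypothetical helper: return the positions of records whose offset entry
# carries a non-NULL error. `parsed` is assumed to be the result of
# fromJSON() on the response body: list(offsets = list(list(error = ...))).
failed_records <- function(parsed) {
  has_error <- vapply(
    parsed$offsets,
    function(o) !is.null(o$error),
    logical(1)
  )
  which(has_error)
}

# Hand-built stand-in for a parsed response (the error text is invented).
example <- list(offsets = list(
  list(partition = 0, offset = 1, error = NULL),
  list(partition = NULL, offset = NULL, error = "made-up error text")
))
print(failed_records(example))
```

With a real response you would call it as `failed_records(fromJSON(content(response, "text")))` and treat a non-empty result as a failure.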
Now that wasn't too bad for a single static message. But it's more useful to be able to send records from a data.frame. This is where my lack of R skills causes me a problem. The following code works, but it isn't terribly elegant.
First of all let's build up some JSON to describe the cars data.frame:

# publish a data.frame to Kafka
# cars is an inbuilt data.frame to test with
df <- head(cars, n=2) # limit to just two records to make testing easier
fields <- list() # start with an empty list of fields
field_types <- lapply(df, typeof) # work out the type of the fields

# make an array of fields like [{name="name", type="type"}]
for(f in colnames(df)) {
  fields <- append(fields, list(list(name=f, type=field_types[[f]])))
}

value_schema <- list(
  type="record",
  name="Car", # @TODO turn this into a function and accept a name for the record
  fields=fields
)
And then let's loop through that data.frame, sending an HTTP request for each row:

# For each car, generate and send an HTTP request to the REST api
for(car in seq_len(nrow(df))) {
  dfbody <- list(
    value_schema=toJSON(value_schema),
    # @TODO working out how to dynamically generate column list here is breaking my head
    records=list(list(value=list(speed=df[car, "speed"], dist=df[car, "dist"])))
  )
  print(toJSON(dfbody))
  dfresponse <- POST(url=paste(proxy_uri, "topics", "avrocars", sep="/"),
    content_type("application/vnd.kafka.avro.v2+json"),
    accept("application/vnd.kafka.v2+json"),
    body=toJSON(dfbody),
    encode='json'
  )
  stop_for_status(dfresponse)
  content(dfresponse, "text")
  if (is.null(fromJSON(content(dfresponse, "text"))$offsets[[1]]$error)){
    print("OK")
  }
}
Again, we can see multiple requests going through mitmproxy (hopefully without errors) and view the data in the webui [8].
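On the @TODO about generating the column list dynamically: one possible approach is to lean on `as.list`, which turns a one-row data.frame into a named list with one element per column. This is a sketch rather than something I've run against Kafka; `row_value` and `df_records` are hypothetical names, and factor columns would come through as factors rather than strings.

```r
# Hypothetical helper: turn row i of a data.frame into the named list that
# goes under `value` in a record, using whatever columns the data.frame has.
row_value <- function(df, i) {
  as.list(df[i, , drop = FALSE])
}

# Hypothetical helper: one record per row, in the shape used for `records`.
df_records <- function(df) {
  lapply(seq_len(nrow(df)), function(i) list(value = row_value(df, i)))
}

df <- head(cars, n = 2)
records <- df_records(df)
str(records[[1]])
```

Inside the loop, `records=list(list(value=row_value(df, car)))` would then replace the hand-written `speed` and `dist` entries, and `df_records(df)` gives the full list if you would rather send every row in one request.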
This code [9] is by no means a finished product, but it is something I can take to our data science team and say "Look what I did!"
---
Written by WheresAlice on 25 June 2017.
[2] Landoop Kafka Development Environment (https://github.com/Landoop/fast-data-dev)
[3] mitmproxy (https://mitmproxy.org/)
[4] http://localhost:3030 (http://localhost:3030)
[5] httr (https://cran.r-project.org/web/packages/httr/vignettes/quickstart.html)
[6] rjson (https://www.rdocumentation.org/packages/rjson/versions/0.2.15)
[7] web interface (http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/avrotest/)
[8] the webui (http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/avrocars/)
[9] This code (https://gist.github.com/WheresAlice/e4b0281222b5b9fa6642703daf7d06f2)
---