Publishing Avro to Kafka in R

I've written a lot about Kafka and how easy it is to use, but it isn't always easy. The generally recommended way to interact with Kafka is through the `librdkafka` C/C++ library, and whilst many popular programming languages have a wrapper around it, not all of them do. R is one of the languages without a modern implementation: the only package dates from 2015 and targets Kafka 0.8. There is another option, though: Confluent's REST Proxy [1] provides an HTTP interface for producing and consuming messages.

So this weekend, inspired by discussions at Berlin Buzzwords, I wrote some code to use the Kafka REST API in R. My R skills aren't particularly good, but it works for me.

Setup

This time I'm developing against the Landoop Kafka Development Environment [2] which provides all of the community Confluent packages in one Docker image as well as some nice web interfaces. I'm also using mitmproxy [3] which makes it a bit easier to debug bad client requests.

I'm a big fan of tmux, so each of the following commands runs in a separate pane.

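```sh
# Kafka, Schema Registry, REST Proxy (port 8082) and web UIs (port 3030) in one container
docker run --rm --net=host landoop/fast-data-dev
# Reverse proxy on port 8080 in front of the REST Proxy
# (newer mitmproxy releases use --mode reverse:http://localhost:8082 instead of -R)
mitmproxy -R http://localhost:8082
```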

It takes a minute or so for the processes in the Docker container to settle, and longer the first time while the image downloads. After that you'll be able to open http://localhost:3030 [4] and see the services running and their health checks passing.

The Code

First we need a couple of libraries: httr [5] to make the HTTP requests and rjson [6] to encode and decode JSON.

```r
library(httr)
library(rjson)
```

It's now quite simple to ask the Kafka REST Proxy for a list of the available topics, which also shows us that everything is working.

```r
# The REST Proxy is ordinarily on port 8082, but we're going through mitmproxy (port 8080) here for debugging
proxy_uri <- "http://localhost:8080"

# Get a list of available topics
get_topics <- function() {
  uri <- paste(proxy_uri, "topics", sep="/")
  http_topics <- GET(uri)
  stop_for_status(http_topics) # raise an R error on any non-2xx response
  return(fromJSON(content(http_topics, "text", encoding="UTF-8")))
}
topics <- get_topics()
print(paste("There are", length(topics), "topics"))
print(topics)
```

You should see a list of topics and, if you're using mitmproxy, the HTTP request going through it.

Producing messages to Kafka is a little harder: we need to build up some JSON that defines both the Avro schema and the record.

```r
# Produce one Avro message
# This looks complicated, but really the hardest part was working out that you need to do
# toJSON on value_schema: the REST Proxy expects the schema as a JSON-encoded string
generatedbody <- list(
  value_schema=toJSON(list(
    type="record",
    name="User",
    fields=list(
      list(
        name="name",
        type="string"
      )
    )
  )),
  records=list(
    list(
      value=list(
        name="testUser"
      )
    )
  )
)
```

Passing generatedbody through toJSON produces the following JSON (note that the schema is embedded as an escaped string):

```json
{"value_schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}","records":[{"value":{"name":"testUser"}}]}
```

As a side note, records is a JSON array, which suggests you can send multiple records in a single HTTP request. This does work for a few small records; however, it makes debugging a lot more complicated and means that a single bad value will fail the whole request.
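If you do want several records per request while keeping each request small, a minimal sketch in base R is to split the record values into fixed-size batches first. `batch_records()` and its default `batch_size` of 10 are hypothetical choices, not part of httr, rjson or the REST Proxy. Each batch has the same shape as the `records` list above, so it could go into a body such as `list(value_schema=toJSON(schema), records=batch)`.

```r
# Hypothetical helper: group record values into batches of at most batch_size,
# each batch shaped like the "records" array the REST Proxy expects
batch_records <- function(values, batch_size = 10) {
  stopifnot(batch_size >= 1)
  n <- length(values)
  if (n == 0) return(list())
  # First index of each batch: 1, 1 + batch_size, 1 + 2 * batch_size, ...
  starts <- seq(1, n, by = batch_size)
  lapply(starts, function(s) {
    chunk <- values[s:min(s + batch_size - 1, n)]
    # Wrap each value as list(value = ...), matching the records format above
    lapply(chunk, function(v) list(value = v))
  })
}

values <- list(list(name = "testUser"), list(name = "anotherTestUser"),
               list(name = "thirdUser"))
batches <- batch_records(values, batch_size = 2)
length(batches) # 2: the first batch holds two records, the second holds one
```

Smaller batches mean more HTTP requests, but a single bad value then fails fewer records.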

Now let's actually send generatedbody to Kafka:

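```r
# actually send the request, using the Avro-specific content type of the v2 API
response <- POST(url=paste(proxy_uri, "topics", "avrotest", sep="/"),
                 content_type("application/vnd.kafka.avro.v2+json"),
                 accept("application/vnd.kafka.v2+json"),
                 body=toJSON(generatedbody), # already a JSON string, so httr sends it as-is
                 encode='json'
)
stop_for_status(response)
response_text <- content(response, "text", encoding="UTF-8")
print(response_text)
# the response has one "offsets" entry per record; its error field is null on success
if (is.null(fromJSON(response_text)$offsets[[1]]$error)){
  print("OK")
} else {
  print(fromJSON(response_text)$offsets[[1]]$error)
}
```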

You should get back a successful response and be able to see it in mitmproxy if you're using it. You should also be able to see the message in the web interface [7].
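The check above only looks at the first entry in `offsets`. When a request carries more than one record, a small sketch like the following could report every failed record instead. It assumes the response has been parsed with rjson's `fromJSON` and that each `offsets` entry has an `error` field that is null on success, as the REST Proxy v2 produce response is documented; `failed_offsets()` is a hypothetical helper, and the example response is hand-written rather than captured from a real proxy.

```r
# Hypothetical helper: return the positions of records whose "error" is not null
failed_offsets <- function(parsed) {
  offsets <- parsed$offsets
  failed <- vapply(offsets, function(o) !is.null(o$error), logical(1))
  which(failed)
}

# Hand-written response shaped like a reply for two records,
# where the second record was rejected (other fields omitted)
example <- list(
  offsets = list(
    list(partition = 0, offset = 10, error_code = NULL, error = NULL),
    list(error = "simulated failure")
  )
)
failed_offsets(example) # 2
```

Against a real request this would be `failed_offsets(fromJSON(content(response, "text")))`, where an empty result means every record was accepted.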

Now that wasn't too bad for a single static message. But it's more useful to be able to send records from a data.frame. This is where my lack of R skills causes me a problem. The following code works, but it isn't terribly elegant.

First of all, let's build up an Avro schema (as an R list, ready for toJSON) to describe the cars data.frame:

```r
# publish a data.frame to Kafka
# cars is a built-in data.frame to test with
df <- head(cars, n=2) # limit to just two records to make testing easier
fields <- list() # start with an empty list of fields
# work out the type of each field: typeof() returns R storage types, and "double"
# (the type of both cars columns) happens to be a valid Avro type too, but
# "integer" or "character" columns would need mapping to "int" or "string"
field_types <- lapply(df, typeof)
# make an array of fields like [{name="name", type="type"}]
for(f in colnames(df)) {
  fields <- append(fields, list(list(name=f, type=field_types[[f]])))
}
value_schema <-
  list(
    type="record",
    name="Car", # @TODO turn this into a function and accept a name for the record
    fields=fields
  )
```
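Using `typeof()` directly works for cars because both of its columns are doubles. Other data.frames will contain R types such as "integer", "character" or "logical", which Avro spells "int", "string" and "boolean". Below is a minimal sketch of a mapping, assuming only those atomic column types (plus factors, sent as their labels) and no NA values, which would need an Avro union with "null". `avro_type_for()` and `avro_schema_for()` are hypothetical names, and the second one also takes the record name that the TODO asks for. Column names must be valid Avro names too (letters, digits and underscores, not starting with a digit), so a column like iris's `Sepal.Length` would need renaming first.

```r
# Hypothetical helper: map one R column to an Avro primitive type name
avro_type_for <- function(column) {
  if (is.factor(column)) return("string") # send factor labels, not integer codes
  switch(typeof(column),
         double    = "double",
         integer   = "int",
         character = "string",
         logical   = "boolean",
         stop("no Avro mapping for R type: ", typeof(column)))
}

# Hypothetical helper: build an Avro record schema (as an R list) for a data.frame
avro_schema_for <- function(df, record_name) {
  fields <- lapply(colnames(df), function(col) {
    list(name = col, type = avro_type_for(df[[col]]))
  })
  list(type = "record", name = record_name, fields = fields)
}

# Illustrative data.frame with one column of each supported type
example_df <- data.frame(id = 1:2, model = c("a", "b"), ok = c(TRUE, FALSE),
                         price = c(1.5, 2), stringsAsFactors = FALSE)
str(avro_schema_for(example_df, "Example"))
```

The result would replace the hand-built `value_schema`, for example `value_schema <- avro_schema_for(df, "Car")`, and still goes through toJSON before being sent.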

And then let's loop through that data.frame, sending an HTTP request for each row:

```r
# For each car, generate and send an HTTP request to the REST API
for(car in seq_len(nrow(df))) {
  dfbody <- list(
    value_schema=toJSON(value_schema),
    records=list(list(value=list(speed=df[car, "speed"], dist=df[car, "dist"]))) #@TODO working out how to dynamically generate the column list here is breaking my head
  )
  print(toJSON(dfbody))
  dfresponse <- POST(url=paste(proxy_uri, "topics", "avrocars", sep="/"),
                     content_type("application/vnd.kafka.avro.v2+json"),
                     accept("application/vnd.kafka.v2+json"),
                     body=toJSON(dfbody),
                     encode='json'
  )
  stop_for_status(dfresponse)
  # nothing is auto-printed inside a loop, so print the response explicitly
  dfresponse_text <- content(dfresponse, "text", encoding="UTF-8")
  print(dfresponse_text)
  if (is.null(fromJSON(dfresponse_text)$offsets[[1]]$error)){
    print("OK")
  }
}
```

Again, we can see multiple requests going through mitmproxy (hopefully without errors) and view the data in the web UI [8].
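For the TODO in the loop, one option is to turn each row into a named list with `as.list()`, which picks up whatever columns the data.frame has instead of naming speed and dist by hand. This is a sketch assuming the column values need no conversion beyond turning factors into their labels; `row_record()` is a hypothetical helper. Inside the loop it would replace the hand-written list, as in `records=list(list(value=row_record(df, car)))`.

```r
# Hypothetical helper: build the record value for row i from all columns of df
row_record <- function(df, i) {
  # A one-row data.frame is a list of length-1 columns; as.list() turns it
  # into a named list such as list(speed = 4, dist = 2)
  row <- as.list(df[i, , drop = FALSE])
  # Convert factors to their labels so they match a "string" field in the schema
  lapply(row, function(v) if (is.factor(v)) as.character(v) else v)
}

row_record(head(cars, n = 2), 1) # same value as list(speed = 4, dist = 2)
```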

This code [9] is by no means a finished product, but it is something I can take to our data science team and say "Look what I did!"

---

Written by WheresAlice on 25 June 2017.

References

[1] Confluent's REST Proxy (http://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages)

[2] Landoop Kafka Development Environment (https://github.com/Landoop/fast-data-dev)

[3] mitmproxy (https://mitmproxy.org/)

[4] http://localhost:3030 (http://localhost:3030)

[5] httr (https://cran.r-project.org/web/packages/httr/vignettes/quickstart.html)

[6] rjson (https://www.rdocumentation.org/packages/rjson/versions/0.2.15)

[7] web interface (http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/avrotest/)

[8] the webui (http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/avrocars/)

[9] This code (https://gist.github.com/WheresAlice/e4b0281222b5b9fa6642703daf7d06f2)
