Develope kafka rest api with nodejs to make it callable from CPI through sap cloud connector Part II

In previous blog Part I, we have prepared the on premise  kafka envirement . Today we will try build nodejs Kafka rest api proxy and use cpi to produce and consume kafka message by deployed rest api .

We can follow the following steps :

Step 1 :  deploy  CPI iflow to consume  kafka topic’s msessage

Step 2 : deploy  nodejs rest api  to produce message to Kafka  and  consume kafka message and forward the message to cpi  .

code snippy :

Server.js

const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender'; const getClientCredentials = oauth.client(axios.create(), { url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token', grant_type: 'client_credentials', client_id: client_id_from_cpi_runtime_service_key, client_secret: client_secret_from_cpi_runtime_service_key, scope: '' }); const app = express(); const kafka = new Kafka({ clientId: 'my-app', requestTimeout: 25000, connectionTimeout: 30000, authenticationTimeout:30000, retry: { initialRetryTime: 3000, retries: 0 }, logLevel: logLevel.INFO, brokers: ['localhost:9092'] }); app.use(express.text())
app.use(bodyParser.json()); app.listen(4004, () => { console.log('===> Server started') }) // depoly api to produce message to kafka
app.post('/kafka/:topic', (req, res) => { const topicv = req.params.topic; console.log(topicv);
const kafka = new Kafka({ clientId: 'my-app', requestTimeout: 25000, connectionTimeout: 30000, authenticationTimeout:30000, retry: { initialRetryTime: 3000, retries: 0 }, brokers: ['localhost:9092'] }) const producer = kafka.producer(); const run = async () => { await producer.connect(); await producer.send({ topic: topicv, messages: [ { value: req.body.msg.toString() }, ], }); await producer.disconnect(); res.status(200).send('message send');
}
run().catch((e)=>{ console.log(e); res.status(500).send('error');
}) }) ; // consume kafka message and forward the message to CPI const consumer = kafka.consumer({ groupId: 'test-group' }); const run = async () => { // Consuming await consumer.connect() await consumer.subscribe({ topic: 'dblab01', fromBeginning: true }) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); getClientCredentials().then((token1)=>{ // console.log(token1.access_token); const config = { headers:{ "Authorization": `Bearer ${token1.access_token}`, "Content-Type": "application/json" } }; const data ={ msg: message.value.toString() }; axios.post(cpiurl,data,config).then(res=>{ console.log(res); }).catch(e=>{ console.log(e); }); }).catch(e=>{ console.log(e); }) }, })
} run().catch((e)=>{ console.log(e);
}) 

package.json

{ "name": "capkafka", "version": "1.0.0", "description": "capkafka", "main": "server.js", "dependencies": { "axios": "^0.27.2", "axios-oauth-client": "^1.4.4", "body-parser": "latest", "express": "^4.17.1", "kafkajs": "latest", "passport": "^0.4.0" }, "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "author": "jacky liu", "license": "ISC"
}

Start the nodejs application :

Step 3 : test the deployed rest api with postman locally .

Step 4 : check the message in CPI  .

Step 4 :  Deploy  a iflow to call the nodejs rest api to producet messge in Kafka . .I will skip this . Cpi  http or https adapter support  calling op rest api with the help of  sap cloud connector .

The end

Best regards!

Jacky Liu