with Kafka and MinIO?
import { Kafka } from "kafkajs"
export default defineComponent({
  props: {
    kafka: {
      type: "app",
      app: "kafka",
    },
  },
  /**
   * Round-trips one message through Kafka: starts a consumer on
   * `SampleTopic`, publishes a greeting to the same topic, waits briefly for
   * the consumer to receive something, and returns what it saw together with
   * the consumer group's description.
   *
   * Both clients are always disconnected before the step finishes, including
   * when any Kafka call rejects.
   *
   * @param {object} context - Step context; `steps` and `$` are accepted but
   *   not used by this step.
   * @returns {Promise<{consumedMessage: string, groupDescription: object}>}
   *   `consumedMessage` is the text of the most recently consumed message, or
   *   `""` if nothing arrived within the wait window.
   */
  async run({ steps, $ }) {
    const topic = "SampleTopic";
    // Upper bound on how long to wait for the consumer to deliver a message.
    const messageWaitMs = 5000;

    const kafka = new Kafka({
      brokers: [`${this.kafka.$auth.host}:${this.kafka.$auth.port}`],
    });
    const consumer = kafka.consumer({ groupId: "TestGroup" });
    const producer = kafka.producer();

    let consumedMessage = "";
    let signalMessageSeen;
    const messageSeen = new Promise((resolve) => {
      signalMessageSeen = resolve;
    });

    const eachMessage = async ({ message }) => {
      // A record's value may be null (e.g. tombstones); don't crash on it.
      consumedMessage = message.value != null ? message.value.toString() : "";
      signalMessageSeen();
    };

    try {
      await consumer.connect();
      await consumer.subscribe({ topic, fromBeginning: true });
      await consumer.run({ eachMessage });

      await producer.connect();
      await producer.send({
        topic,
        messages: [
          { value: `Welcome KafkaJS + Pipedream users! ${new Date().toISOString()}` },
        ],
      });

      // consumer.run() resolves once consumption has started, not once a
      // message has been handled, so wait (bounded) for the first delivery.
      let timer;
      const timedOut = new Promise((resolve) => {
        timer = setTimeout(resolve, messageWaitMs);
      });
      try {
        await Promise.race([messageSeen, timedOut]);
      } finally {
        clearTimeout(timer);
      }

      // Must run while the consumer is still connected.
      const groupDescription = await consumer.describeGroup();
      return { consumedMessage, groupDescription };
    } finally {
      // Release both connections even if something above threw; the nested
      // finally guarantees the consumer is closed even if the producer's
      // disconnect rejects.
      try {
        await producer.disconnect();
      } finally {
        await consumer.disconnect();
      }
    }
  },
})
import * as Minio from 'minio'
export default defineComponent({
  props: {
    minio: { type: "app", app: "minio" },
  },
  /**
   * Lists the buckets visible to the connected MinIO account.
   *
   * Builds a MinIO client from the connected account's `$auth` fields and
   * returns whatever `listBuckets()` resolves with.
   *
   * @param {object} context - Step context; `steps` and `$` are accepted but
   *   not used by this step.
   * @returns {Promise<*>} The result of `listBuckets()`.
   */
  async run({ steps, $ }) {
    // Pull the credentials out once instead of repeating the full path.
    const {
      endpoint,
      port,
      use_ssl: useSSL,
      access_key: accessKey,
      access_secret: secretKey,
    } = this.minio.$auth;

    const clientOptions = {
      endPoint: endpoint,
      port,
      useSSL,
      accessKey,
      secretKey,
    };
    const client = new Minio.Client(clientOptions);

    return await client.listBuckets();
  },
})