import amqp from "../../amqp.app.mjs";
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
export default {
  key: "amqp-receive-message",
  name: "New Message",
  description: "Emit new event for each new message in an [AMQP 1.0](https://www.amqp.org/sites/amqp.org/files/amqp.pdf) queue. [See the library example here](https://github.com/amqp/rhea-promise#receiving-a-message).",
  type: "source",
  version: "0.0.2",
  dedupe: "unique",
  props: {
    amqp,
    timer: {
      type: "$.interface.timer",
      label: "Timer",
      description: "The timer to use to schedule the next poll.",
      default: {
        intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
      },
    },
    receiverName: {
      type: "string",
      label: "Receiver Name",
      description: "The name of the receiver. e.g. (`my-receiver`)",
    },
    queueName: {
      propDefinition: [
        amqp,
        "queueName",
      ],
    },
  },
  async run(event) {
    const { timestamp } = event;
    const {
      host,
      port,
      username,
      password,
    } = this.amqp.$auth;
    const {
      receiverName,
      queueName,
    } = this;
    const connection = await this.amqp.openConnection({
      host,
      port,
      username,
      password,
    });
    console.log("Create receiver");
    const receiver = await this.amqp.createReceiver({
      connection,
      name: receiverName,
      source: {
        address: queueName,
      },
      onSessionError: (context) => {
        throw context.session.error ?? "Unknown Session Error";
      },
    });
    try {
      const messages = await this.amqp.onMessageReceiver(receiver);
      messages.forEach((message, idx) => {
        const id = timestamp + idx;
        this.$emit(message, {
          id,
          ts: id,
          summary: `New Message ${message.message_id}`,
        });
      });
    } catch (error) {
      if (error.innerError) {
        console.log("Inner error", JSON.stringify(error.innerError));
      }
      throw error;
    } finally {
      await this.amqp.close(receiver);
      await this.amqp.close(connection);
    }
  },
};