import { Injectable } from '@angular/core';
import { ReplaySubject, Subject } from 'rxjs';
import { createConsumer, Consumer, Subscription as WebsocketSubscription } from '@rails/actioncable';
import { environment } from '../../../environments/environment';
import { WebsocketChannelHandle } from '../models/websocket-channel-handle.model';

@Injectable()
export class WebsocketService {
  constructor() {}

  consumer: Consumer;

  connect(identity) {
    if (!this.consumer) {
      this.consumer = createConsumer(`${environment.captureApi.url}/websocket?identity=${identity}`);
    }
  }

  disconnect() {
    if (this.consumer) {
      this.consumer.connection.close();
    }
  }

  channelHandle(channelName, params?: Record<string, any>): WebsocketChannelHandle {
    const connected: Subject<any> = new ReplaySubject();
    const rejected: Subject<any> = new Subject();
    const disconnected: Subject<any> = new Subject();
    const messageReceived: Subject<any> = new ReplaySubject(1);

    const subscription: WebsocketSubscription = this.consumer.subscriptions.create(
      { channel: channelName, ...params },
      {
        connected: () => connected.next(),
        rejected: () => rejected.next(),
        disconnected: () => disconnected.next(),
        received: (data) => messageReceived.next(data)
      }
    );

    return {
      connected: connected.asObservable(),
      rejected: rejected.asObservable(),
      disconnected: disconnected.asObservable(),
      messageReceived: messageReceived.asObservable(),
      send: (data: any) => subscription.send(data),
      unsubscribe: () => subscription.unsubscribe(),
      perform: (action: string, data?: any) => subscription.perform(action, data)
    };
  }
}
