import { Injectable } from '@angular/core';
import { lastValueFrom, Observable, ReplaySubject } from 'rxjs';
import { distinctUntilChanged, filter, map, take, tap } from 'rxjs/operators';
import { NGXLogger } from 'ngx-logger';
import { environment } from '@env/environment';
import { Centrifuge, Subscription } from 'centrifuge';
import { Nullable } from '@core/interfaces/nullable';

/**
 * Owns a single Centrifuge WebSocket client and exposes the publications
 * received on the per-user channel (`user#<userId>`) as RxJS streams.
 *
 * Publications are expected to be envelopes of the form `{ event, data }`;
 * `events()` and `notifications()` split them by the `event` field.
 */
@Injectable()
export class SocketsService {
  /** Active Centrifuge client; `null` before `connect()` and after `disconnect()`. */
  private server: Nullable<Centrifuge> = null;
  /** Subscription to the per-user channel; recreated on every `connected` event. */
  private eventsSub: Nullable<Subscription> = null;
  /** Replays the most recent "connected" signal to late subscribers. */
  private connected$ = new ReplaySubject<void>(1);

  /** Raw publication envelopes as delivered by the per-user channel (last one is replayed). */
  private events$: ReplaySubject<{ event?: string; data?: any }> =
    new ReplaySubject(1);
  // NOTE(review): nothing in this class reads from or writes to this subject.
  private notifications$: ReplaySubject<{
    severity: string | 'info' | 'warning' | 'success' | 'error';
    message: string;
    category: string | 'main';
    payload: any;
  }> = new ReplaySubject(1);

  /** Token source handed to `connect()`; unset until `connect()` has been called. */
  private accessToken!: Observable<string>;
  /** Token refresh trigger handed to `connect()`; unset until `connect()` has been called. */
  private refreshToken!: () => Observable<string>;

  constructor(private logger: NGXLogger) {}

  /** Emits each time the client reports `connected` (the last signal is replayed). */
  onConnected(): Observable<void> {
    return this.connected$.asObservable();
  }

  /** Returns the access-token observable supplied to the latest `connect()` call. */
  getAccessToken() {
    return this.accessToken;
  }

  /**
   * Builds a cold observable that, on subscription, replaces any existing
   * client with a new one, wires its event handlers and starts connecting.
   *
   * @param accessToken  Observable yielding the token used to authenticate.
   * @param refreshToken Factory invoked when the server rejects the connection
   *                     with error code 109.
   * @param userId       Used to build the per-user channel name `user#<userId>`.
   * @returns Observable that emits the created client once and errors when the
   *          client reports an `error` event. It never completes, and
   *          unsubscribing does not close the connection; use `disconnect()`.
   */
  connect(
    accessToken: Observable<string>,
    refreshToken: () => Observable<string>,
    userId: string,
  ): Observable<Centrifuge> {
    this.accessToken = accessToken;
    this.refreshToken = refreshToken;

    return new Observable<Centrifuge>((subscriber) => {
      if (this.server) {
        this.server.disconnect();
      }

      // Handlers below close over this constant instead of re-reading
      // `this.server`, so an event from a replaced client can never operate
      // on the client that superseded it.
      const server = new Centrifuge(environment.WS_URL, {
        // NOTE(review): debug output is enabled unconditionally — confirm this
        // is intended for production builds.
        debug: true,
        getToken: async (ctx) => {
          this.logger.info(ctx);
          // take(1) makes the promise settle on the first token even when the
          // token stream is long-lived; without it lastValueFrom() would wait
          // for a completion that may never happen.
          return await lastValueFrom(this.getAccessToken().pipe(take(1)));
        },
      });
      this.server = server;

      server.on('connected', () => {
        if (this.server !== server) {
          // This client was replaced or disconnected in the meantime.
          return;
        }
        // Drop the previous channel subscription before creating a fresh one.
        this.releaseEventsSub(server);

        const channelSub = server.newSubscription(`user#${userId}`);
        channelSub.on('publication', (ctx) => {
          this.logger.info('ws:events', ctx.data);
          this.events$.next(ctx.data);
        });
        channelSub.subscribe();
        this.eventsSub = channelSub;

        this.connected$.next();
      });

      server.on('disconnected', (ctx) => {
        if (this.server !== server) {
          return;
        }
        this.releaseEventsSub(server);

        if (ctx?.code) {
          this.logger.error(`[ws disconnected] ${ctx.code}: ${ctx.reason}`);
        }
      });

      server.on('error', (err) => {
        this.logger.warn('ws:error', err);
        // NOTE(review): 109 is presumably Centrifugo's "token expired" code —
        // confirm against the server's error-code table.
        if (err?.type === 'connect' && err?.error?.code === 109) {
          this.refreshToken().pipe(take(1)).subscribe();
        }
        subscriber.error(err);
      });

      server.connect();
      subscriber.next(server);
    });
  }

  /** Closes the active client, if any, and forgets it. */
  disconnect(): void {
    if (this.server) {
      this.server.disconnect();
      this.server = null;
    }
  }

  /**
   * Stream of every publication whose `event` is not `'notification'`.
   *
   * A missing/empty `event` is reported as `'system'`; a missing (`null` or
   * `undefined`) `data` is reported as `null`, while other falsy payloads such
   * as `0`, `''` or `false` are passed through unchanged.
   */
  events(): Observable<{ event: string; payload: any }> {
    return this.events$.pipe(
      filter((envelope) => envelope.event !== 'notification'),
      // Default comparator is reference equality, so this only drops the same
      // envelope object emitted twice in a row.
      distinctUntilChanged(),
      map(({ event, data }) => ({
        event: event || 'system',
        payload: data ?? null,
      })),
    );
  }

  /**
   * Stream of publications whose `event` is `'notification'`, normalised so
   * that `severity` defaults to `'info'`, `message` to `''`, `payload` to `{}`
   * and `category` is always `'main'`.
   */
  notifications(): Observable<{
    severity: string | 'info' | 'warning' | 'success' | 'error';
    message: string;
    category: string | 'main';
    payload: any;
    title?: string;
  }> {
    return this.events$.pipe(
      filter((envelope) => envelope.event === 'notification'),
      // Reference equality, see events().
      distinctUntilChanged(),
      map(({ data }) => ({
        severity: data?.severity || 'info',
        message: data?.message || '',
        payload: data?.payload || {},
        category: 'main',
        title: data?.title,
      })),
    );
  }

  /** Unsubscribes the current channel subscription from `server` and clears it. */
  private releaseEventsSub(server: Centrifuge): void {
    if (this.eventsSub) {
      this.eventsSub.unsubscribe();
      server.removeSubscription(this.eventsSub);
    }
    this.eventsSub = null;
  }
}
