import {StreamType} from '../../../services/gRPC/sfu/enums_pb';
import {SubscriptionState} from './SubscriptionState';
import {roomGlobalRef} from './roomGlobalRef';
import {setTransceiversAndMid} from './updaters/useSetTransceiversAndSubscribeStreams';
import {createAndSetOffer$} from './updaters/useSetOfferAndSubscribeStreams';
import {streamsSenderSFU} from '../../../services/sfu/senders.sfu';
import pako from 'pako';
import {race, Subscription} from 'rxjs';
import {webSocketOutput$$} from '../../../services/ws/webSocket';
import {WsMessageType} from '../../../services/ws/incomingMessagesTypes.ws';
import {take} from 'rxjs/operators';
import {selectIsMcuVideoStream} from '../../../store/slices/room';
import {store} from '../../../store/store';
import {roomUsersActions, selectScreenSharingUserId} from '../../../store/slices/roomUsers';
import {RoomUserFlag} from '../../../services/gRPC/rooms/enums_pb';
import {sfuReceiverChannelOutput$$} from './listeners/pcReceiverOfferAndChannelReady';
import {SfuMessageType} from '../../../services/sfu/incomingMessagesTypes.sfu';

export type StreamData = {
	type: StreamType,
	userId: string,
	mid?: string,
}

export class SubscriptionManager {
	static applyNextImmediately = false

	private readonly debounceTime = 1000;
	private isOpen = false;
	private receivedCallWhileBlocked = false;
	private timer: NodeJS.Timeout | undefined;
	private sub1: Subscription | undefined;
	private sub2: Subscription | undefined;

	private debounce = (fn: () => any) => {
		if (this.timer) {
			clearTimeout(this.timer);
		}
		if (SubscriptionManager.applyNextImmediately) {
			this.timer = setTimeout(fn, 0);
			SubscriptionManager.applyNextImmediately = false
		} else {
			this.timer = setTimeout(fn, this.debounceTime);
		}
	};

	private process() {
		console.log('start updating subscriptions');
		this.debounce(() => {
			console.log('send subscriptions');
			// compare states - currentState shows what should clientState have already subscribed/unsubscribed.
			const results = SubscriptionState.compare(roomGlobalRef.currentSubscriptionState, roomGlobalRef.clientSubscriptionState);
			// if there are no differences it means that states are equal and no changes are needed
			if (results.extraItems.length === 0 && results.missingItems.length === 0) return;
			// changes are needed - block the process
			this.block();
			// based on differences setup transceivers, and create StreamInfo with correct mid - it will be sent to achieve subscription/un-subscriptions.
			const {streamsToAdd, streamsToRemove, usedTransceivers} = setTransceiversAndMid([...results.missingItems], [...results.extraItems]);
			// create new offer and send it via RTC channel with subscription and un-subscriptions request.
			this.sub1 = createAndSetOffer$().subscribe((offer) => {
				streamsSenderSFU(pako.deflate(JSON.stringify(offer)), streamsToAdd, streamsToRemove);
			});
			// wait for ws StreamsAnswerResponse
			this.sub2 = race(sfuReceiverChannelOutput$$(SfuMessageType.STREAMS_ANSWER_PACKET), webSocketOutput$$(WsMessageType.STREAMS_ANSWER)).pipe(
				take(1)
			).subscribe((packet) => {
				const addedStreams = packet.addRequestsList;
				const removedStreams = packet.removalRequestsList;

				//todo move this setTransceiversAndMid Manager
				usedTransceivers.forEach(({transceiver, kind}) => {
					if(!addedStreams.find(s => s.mid === transceiver.mid)) {
						console.log('unused transceiver:', transceiver.mid, kind);
						transceiver.direction = 'inactive';
						if(kind === 'audio') {
							roomGlobalRef.unusedAudioTransceivers.push(transceiver)
						} else {
							roomGlobalRef.unusedVideoTransceivers.push(transceiver);
						}

					}
				})
				// update clientSubscriptionState state and stores based on packet
				addedStreams.forEach(streamInfo => {
					roomGlobalRef.clientSubscriptionState.sub([streamInfo.type], streamInfo.userId);
					const flag = SubscriptionManager.streamTypeToFlag(streamInfo.type);
					store.dispatch(roomUsersActions.applyUserFlags({userId: streamInfo.userId, flags: [flag], timestamp: 1}));
				});
				removedStreams.forEach(streamInfo => {
					roomGlobalRef.clientSubscriptionState.unsub([streamInfo.type], streamInfo.userId);
					const flag = SubscriptionManager.streamTypeToFlag(streamInfo.type);
					store.dispatch(roomUsersActions.rejectUserFlags({userId: streamInfo.userId, flags: [flag]}));
				});
				// compare again currentSubscriptionState with clientSubscriptionState
				const results = SubscriptionState.compare(roomGlobalRef.currentSubscriptionState, roomGlobalRef.clientSubscriptionState);
				console.log(results);
				// if the results shows differences it means that currentSubscriptionState already changed or error occurred while
				// subscribing streams - in that case we need to try the process again
				if (results.extraItems.length > 0 || results.missingItems.length > 0) {
					// we need to again try the process
					this.applySubs();
				}
				// add new camera streams to muteManger
				roomGlobalRef.videoStreamMuteManager.addStreams(packet.addRequestsList);
				// depending on case mute or unmute them all
				if (selectIsMcuVideoStream(store.getState()) && !selectScreenSharingUserId(store.getState())) {
					roomGlobalRef.videoStreamMuteManager.muteAllStreams();
				} else {
					roomGlobalRef.videoStreamMuteManager.unmuteAllStreams();
				}
				// unblock the process and run again if needed
				this.open();
			});
		});
	};

	open() {
		this.isOpen = true;
		if (this.receivedCallWhileBlocked) {
			this.process();
		}
	}

	block() {
		this.receivedCallWhileBlocked = false;
		this.isOpen = false;
	};

	applySubs() {
		if (this.isOpen) {
			this.process();
		} else {
			this.receivedCallWhileBlocked = true;
		}
	}

	destroy() {
		this.isOpen = false;
		this.receivedCallWhileBlocked = false;
		this.sub1?.unsubscribe();
		this.sub2?.unsubscribe();
	}

	private static streamTypeToFlag(type: StreamType) {
		switch (type) {
			case StreamType.AUDIO:
				return RoomUserFlag.RUFLAG_SHARING_AUDIO;
			case StreamType.CAMERA:
				return RoomUserFlag.RUFLAG_SHARING_VIDEO;
			case StreamType.SCREEN:
				return RoomUserFlag.RUFLAG_SHARING_SCREEN_VIDEO;
			case StreamType.SCREEN_AUDIO:
				return RoomUserFlag.RUFLAG_SHARING_SCREEN_AUDIO;
			default:
				throw new Error('streamTypeToFlag doesn\'t support type ' + type + '. This type should not appear here.');
		}
	}
}
