import {
	ActiveStreamsRequestPacket,
	DisconnectPacket,
	Packet,
	PingPacket,
	StreamInfo,
	StreamsPacket,
	StreamsTogglePacket
} from '../gRPC/sfu/packets_pb';
import {Action, StreamType} from '../gRPC/sfu/enums_pb';
import {roomGlobalRef} from '../../pages/Room/utils/roomGlobalRef';
import {StreamData} from '../../pages/Room/utils/SubscriptionManager';
import {sfuReceiverChannelOutput$$} from '../../pages/Room/utils/listeners/pcReceiverOfferAndChannelReady';
import {SfuMessageType} from './incomingMessagesTypes.sfu';
import {subscribeThenDo} from '../../utils/rxjsOperators';
import {EMIT} from '../../utils/utils';

export const pingSenderSFU = (timestamp: number, channel?: RTCDataChannel) => {

	const packet = new Packet();
	packet.setAction(Action.PONG);

	const pingPacket = new PingPacket();
	pingPacket.setTimestamp(timestamp);

	const serializedPingPacket = pingPacket.serializeBinary();
	packet.setData(serializedPingPacket);

	const serializedPacket = packet.serializeBinary();

	//ignore if channel is not available already.
	if (channel?.readyState === 'open') {
		channel.send(serializedPacket);
	}
};

interface streamsToggleSenderSfuArgs {
	channel?: RTCDataChannel,
	muteStreams?: StreamInfo.AsObject[],
	unmuteStreams?: StreamInfo.AsObject[]
}

let isMcuMuted: boolean | undefined;
export const mcuMuteToggleSenderSFU = (mute: boolean, channel?: RTCDataChannel) => {
	if (isMcuMuted !== undefined && isMcuMuted === mute) {
		return;
	}
	const packet = new Packet();
	const streamsTogglePacket = new StreamsTogglePacket();
	const streamInfo = new StreamInfo();
	streamInfo.setType(StreamType.MCU_VIDEO);
	streamInfo.setUserId('mcu');
	if (mute) {
		streamsTogglePacket.setDisableStreamsList([streamInfo]);
	} else {
		streamsTogglePacket.setEnableStreamsList([streamInfo]);
	}
	const serializedStreamsTogglePacket = streamsTogglePacket.serializeBinary();
	packet.setAction(Action.TOGGLE_STREAMS);
	packet.setData(serializedStreamsTogglePacket);
	const serializedPacket = packet.serializeBinary();

	//ignore if channel is not available already.
	if (channel?.readyState === 'open') {
		channel.send(serializedPacket);
	}
};

export const streamsToggleSenderSFU = ({channel, muteStreams, unmuteStreams}: streamsToggleSenderSfuArgs) => {
	const packet = new Packet();
	const streamsTogglePacket = new StreamsTogglePacket();
	console.log('unmute');
	console.log(unmuteStreams);
	console.log('mute');
	console.log(muteStreams);
	const muteStreamsObj = muteStreams?.map(stream => {
		const a = new StreamInfo();
		a.setMid(stream.mid);
		a.setType(stream.type);
		a.setUserId(stream.userId);
		return a;
	});
	const unmuteStreamsObj = unmuteStreams?.map(stream => {
		const a = new StreamInfo();
		a.setMid(stream.mid);
		a.setType(stream.type);
		a.setUserId(stream.userId);
		return a;
	});

	muteStreamsObj && streamsTogglePacket.setDisableStreamsList(muteStreamsObj);
	unmuteStreamsObj && streamsTogglePacket.setEnableStreamsList(unmuteStreamsObj);
	const serializedStreamsTogglePacket = streamsTogglePacket.serializeBinary();

	packet.setAction(Action.TOGGLE_STREAMS);
	packet.setData(serializedStreamsTogglePacket);

	const serializedPacket = packet.serializeBinary();

	//ignore if channel is not available already.
	if (channel?.readyState === 'open') {
		channel.send(serializedPacket);
	}
};

export const disconnectSFU = (channel?: RTCDataChannel) => {

	const packet = new Packet();
	const disconnectPacket = new DisconnectPacket();

	const serializedDisconnectPacket = disconnectPacket.serializeBinary();

	packet.setAction(Action.DISCONNECT);
	packet.setData(serializedDisconnectPacket);

	const serializedPacket = packet.serializeBinary();

	//ignore if channel is not available already.
	if (channel?.readyState === 'open') {
		channel.send(serializedPacket);
	}
};

export const streamsSenderSFU = (
	sdpOffer: Uint8Array,
	streamsToAdd: StreamInfo.AsObject[],
	streamsToRemove: StreamData[],
	manualRenegotiation?: boolean
) => {
	const packet = new Packet();
	packet.setAction(Action.NEGOTIATE_STREAMS);
	const streamsPacket = new StreamsPacket();
	console.log('#### streamsSenderSFU #### (add, remove)');
	console.log(streamsToAdd);
	console.log(streamsToRemove);

	const addStreamList: StreamInfo[] = [];
	const removeStreamList: StreamInfo[] = [];

	streamsToAdd.forEach((el) => {
		const streamInfo = new StreamInfo();
		streamInfo.setMid(el.mid!);
		streamInfo.setUserId(el.userId);
		streamInfo.setType(el.type);
		addStreamList.push(streamInfo);
	});

	streamsToRemove.forEach((el) => {
		const streamInfo = new StreamInfo();
		streamInfo.setUserId(el.userId);
		streamInfo.setType(el.type);
		removeStreamList.push(streamInfo);
	});

	streamsPacket.setAddRequestsList(addStreamList);
	streamsPacket.setRemovalRequestsList(removeStreamList);
	streamsPacket.setSdpOffer(sdpOffer);

	const serializedStreamsPacket = streamsPacket.serializeBinary();

	packet.setData(serializedStreamsPacket);

	const serializedPacket = packet.serializeBinary();

	//ignore if channel is not available already.
	if (manualRenegotiation) {
		const channel = roomGlobalRef.mediaSenderServerChannel;
		if (channel?.readyState === 'open') {
			channel.send(serializedPacket);
		}
	} else {
		const channel = roomGlobalRef.mediaReceiverServerChannel;
		if (channel?.readyState === 'open') {
			channel.send(serializedPacket);
		}
	}
};

export const sendActiveStreamsRequest = () => {
	const packet = new Packet();
	const activeStreamsPacket = new ActiveStreamsRequestPacket();

	const serializedActiveStreamsPacket = activeStreamsPacket.serializeBinary();

	packet.setAction(Action.GET_ACTIVE_STREAMS);
	packet.setData(serializedActiveStreamsPacket);

	const serializedPacket = packet.serializeBinary();

	const channel = roomGlobalRef.mediaReceiverServerChannel;
	if (channel?.readyState === 'open') {
		channel.send(serializedPacket);
	}
};

export const getActiveStreams$ = () => {
	return EMIT.pipe(subscribeThenDo(sfuReceiverChannelOutput$$(SfuMessageType.ACTIVE_STREAMS), () => sendActiveStreamsRequest()));
}
