import {concatMap, EMPTY, from, Subject, Subscription, throwError} from 'rxjs';
import {bufferCount, catchError, map, mapTo, mergeMap, take, tap, timeout} from 'rxjs/operators';
import {roomGlobalRef} from '../roomGlobalRef';
import {webSocketOutput$$} from '../../../../services/ws/webSocket';
import {WsMessageType} from '../../../../services/ws/incomingMessagesTypes.ws';
import {updateSDP} from '../updaters/updateSdp';
import {findSfuSenderWS$, sendICECandidatesWS$} from '../../../../services/ws/senders.ws';
import {SfuType} from '../../../../services/gRPC/sfu/enums_pb';
import pako from 'pako';
import {channelMessageDeserializer, SfuMessageType} from '../../../../services/sfu/incomingMessagesTypes.sfu';
import {PingPacket} from '../../../../services/gRPC/sfu/packets_pb';
import {SfuOutputOverload, sfuSorter$} from '../../../../services/sfu/incomingMessagesSorter.sfu';
import {pingSenderSFU} from '../../../../services/sfu/senders.sfu';
import {stringToByteArray} from '../../../../utils/utils';
import {reconnectPcReceiver} from './reconnectPcReceiver';

//
// Channels outputs
//

/**
 * Raw `MessageEvent`s pushed from a data channel's `onmessage` handler.
 * Declared `const`: the binding is never reassigned, and exporting a mutable
 * binding would let the stream below silently switch to a different subject.
 */
export const sfuReceiverChannelCore$$ = new Subject<MessageEvent>();

/**
 * Typed view over `sfuReceiverChannelCore$$`.
 *
 * Every raw event is deserialized with `channelMessageDeserializer` and the
 * resulting packet is handed to `sfuSorter$` together with the requested
 * `messageType`; whatever `sfuSorter$` emits is merged into the output.
 * The cast exposes the overloads declared by `SfuOutputOverload`.
 */
export const sfuReceiverChannelOutput$$ = ((messageType: SfuMessageType) => sfuReceiverChannelCore$$.pipe(
	map((messageEvent) => channelMessageDeserializer(messageEvent)),
	mergeMap((serverPacket) => sfuSorter$(serverPacket, messageType))
)) as SfuOutputOverload;

/** How long (ms) to wait for both readiness signals before the attempt is treated as failed. */
const RECEIVER_READY_TIMEOUT_MS = 30000;

/**
 * Marks the router offer as applied and drains the ICE candidates that were
 * queued while the offer was still pending.
 */
const flushQueuedRouterCandidates = (): void => {
	const queue = roomGlobalRef.queuedCandidates;
	queue.setIsOfferReady(SfuType.STYPE_ROUTER, true);

	// Walk snapshots, not the queue's own arrays: shiftUnsent/shiftNotAdded run once per
	// candidate, and if they mutate the array being iterated, forEach skips every other entry.
	const unsentCandidates = [...queue[SfuType.STYPE_ROUTER].unsentCandidates];
	const notAddedCandidates = [...queue[SfuType.STYPE_ROUTER].notAddedCandidates];

	unsentCandidates.forEach((candidate: any) => {
		queue.shiftUnsent(SfuType.STYPE_ROUTER);
		const sendSub = sendICECandidatesWS$(stringToByteArray(candidate), SfuType.STYPE_ROUTER).subscribe();
		queue.addSub(SfuType.STYPE_ROUTER, sendSub);
	});

	notAddedCandidates.forEach((candidate: any) => {
		queue.shiftNotAdded(SfuType.STYPE_ROUTER);
		// The peer connection is re-read for every candidate; it may have been cleared meanwhile.
		if (roomGlobalRef.pcMediaReceiver) {
			const addSub = from(roomGlobalRef.pcMediaReceiver.addIceCandidate(candidate)).subscribe({
				error: (err) => {
					console.error('Failed to add receiver ice candidate', err);
				}
			});
			queue.addSub(SfuType.STYPE_ROUTER, addSub);
		}
	});
};

/**
 * Wires up a data channel opened by the remote side, based on its label.
 *
 * @param channel the incoming data channel
 * @param onPingPongReady invoked once the `pingpong` channel has its listener installed
 */
const attachReceiverDataChannel = (channel: RTCDataChannel, onPingPongReady: () => void): void => {
	switch (channel.label) {
		case 'pingpong': {
			// Answer every server ping with its own timestamp on the same channel.
			channel.onmessage = (messageEvent) => {
				const pingFromServer = channelMessageDeserializer(messageEvent);
				if (pingFromServer.type === 'sfu.api.v1.PingPacket') {
					const pingDataFromServer = PingPacket.deserializeBinary(pingFromServer.data as Uint8Array);
					const timestamp = pingDataFromServer.toObject().timestamp;
					pingSenderSFU(timestamp, channel);
				}
			};
			onPingPongReady();
			return;
		}
		case 'server': {
			// we don't need to wait to this channel and message
			roomGlobalRef.mediaReceiverServerChannel = channel;
			roomGlobalRef.mediaReceiverServerChannel.onmessage = (messageEvent) => {
				sfuReceiverChannelCore$$.next(messageEvent);
			};
			return;
		}
		default: {
			console.error('channel with this label is unsupported');
			return;
		}
	}
};

/**
 * Starts the receiver peer-connection handshake and returns an observable that
 * emits `undefined` once and completes when BOTH of these have happened:
 *  - the local offer was created, set as local description and sent over the web socket;
 *  - the remote side opened the `pingpong` data channel.
 *
 * All side effects (data channel creation, offer sending, listeners) start when
 * this function is CALLED, not when the result is subscribed. The readiness
 * signals travel through a plain `Subject`, so a signal fired before the result
 * is subscribed is not replayed — subscribe to the returned observable right away.
 *
 * On any failure (timeout, offer creation/sending error, failure to apply the
 * remote offer, offer with an unknown SFU type) the error is logged,
 * `reconnectPcReceiver(true)` is called and the observable completes without emitting.
 *
 * Expects `roomGlobalRef.pcMediaReceiver` to be set; throws synchronously otherwise.
 */
export const pcReceiverOfferAndChannelReady$ = () => {
	roomGlobalRef.pcMediaReceiver!.createDataChannel('dummyChannel');

	const innerSubs: Subscription[] = [];
	const subject = new Subject<void>();
	const buffer = subject.pipe(
		// Two signals are expected: "offer sent" and "pingpong channel ready".
		bufferCount(2),
		take(1),
		tap({
			// Runs on completion, error and unsubscribe (including the timeout below).
			finalize: () => {
				innerSubs.forEach(sub => {
					sub.unsubscribe();
				});
			}
		}),
		timeout(RECEIVER_READY_TIMEOUT_MS),
		catchError((err) => {
			console.error(err);
			reconnectPcReceiver(true);
			return EMPTY;
		}),
		map(() => undefined)
	);

	/// Apply incoming offers. Handled inside the pipeline so that failures are real
	/// error notifications instead of values returned from (and dropped by) a subscriber.
	const offerWsSub = webSocketOutput$$(WsMessageType.OFFER).pipe(
		mergeMap((offer) => {
			switch (offer.sfuType) {
				case SfuType.STYPE_ROUTER: {
					// Payload is base64 -> deflate -> JSON session description.
					const uint8Array = Uint8Array.from(atob(offer.sdpAnswer as string), c => c.charCodeAt(0));
					const offerSdpAnswer = JSON.parse(pako.inflate(uint8Array, {to: 'string'}));

					console.log(`%c offer for pcMediaReceiver received`, 'color: #524DDA; font-weight: 900; background: black');
					return from(roomGlobalRef.pcMediaReceiver!.setRemoteDescription(offerSdpAnswer)).pipe(
						tap(() => flushQueuedRouterCandidates())
					);
				}
				case SfuType.STYPE_GATEWAY: {
					// Gateway offers are not handled by the receiver connection.
					return EMPTY;
				}
				case SfuType.STYPE_UNKNOWN:
				default: {
					return throwError(() => new Error('unknown connection type'));
				}
			}
		})
	).subscribe({
		error: (err) => {
			console.error('error while applying offer for receiver');
			// Same failure path as the offer-sending pipeline below.
			subject.error(err);
		}
	});
	innerSubs.push(offerWsSub);

	/// Listen to channels, init ping-pong channel listeners, init sfuReceiverChannelOutput$$
	roomGlobalRef.pcMediaReceiver!.ondatachannel = (event) => {
		attachReceiverDataChannel(event.channel, () => subject.next());
	};

	/// Send offers
	const offerSendSub = from(roomGlobalRef.pcMediaReceiver!.createOffer()).pipe(
		concatMap((offer) => {
			offer.sdp = updateSDP(offer.sdp, 300, 600, 1200, 500, 1000, 2000);
			return from(roomGlobalRef.pcMediaReceiver!.setLocalDescription(offer)).pipe(mapTo(offer));
		}),
		concatMap((offer) => {
			return findSfuSenderWS$(pako.deflate(JSON.stringify(offer)), SfuType.STYPE_ROUTER);
		})
	).subscribe({
		error: (err) => {
			console.error('error while creating offer for receiver');
			subject.error(err);
		},
		complete: () => {
			// First of the two readiness signals: the offer has been sent.
			subject.next();
		}
	});

	innerSubs.push(offerSendSub);

	return buffer;
};
