import {EMPTY, forkJoin, merge, Observable, of, Subject, Subscription, throwError, timer} from 'rxjs';
import {catchError, map, mapTo, mergeMap, share, switchMap, take, tap} from 'rxjs/operators';
import {heartbeatPacketWsGuard, WebSocketOutputOverload, WsMessageType} from './incomingMessagesTypes.ws';
import {authMessageSenderWS$, authToken, heartbeatMessageSenderWS$, packetIndex, userJoinRoomSenderWS$} from './senders.ws';
import {blobToArray$, EMIT, emit} from '../../utils/utils';
import {webSocketSorter$} from './incomingMessagesSorter.ws';
import {envVars} from '../../environments/env';
import {WebSocketError, WebSocketErrorType} from './webSocketError.ws';
import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
import {handleUnusedTransceivers, roomGlobalRef} from '../../pages/Room/utils/roomGlobalRef';
import {createRoomUserTokenService$, getRoomStateService$, listEventsService$} from '../roomServices';
import {ModalType} from '../../singleComponents/modals/modal.types';
import {modalsSystem} from '../../singleComponents/modals/modalSystem';
import {AuthenticationPacket, HeartbeatPacket, Packet} from '../gRPC/wsserver/packets.public_pb';
import {subscribeThenDo} from '../../utils/rxjsOperators';
import {reconnectPcSender} from '../../pages/Room/utils/listeners/reconnectPcSender';
import {reconnectPcReceiver} from '../../pages/Room/utils/listeners/reconnectPcReceiver';
import {listUsersService$} from '../userServices';
import {RoomStreamType, RoomUserReceiverConnectionStatus, RoomUserSenderConnectionStatus} from '../gRPC/rooms/enums_pb';
import {EventInfo, RoomUserInfo} from '../gRPC/rooms/models_pb';
import {roomActions} from '../../store/slices/room';
import {roomUsersActions, selectUserFlags} from '../../store/slices/roomUsers';
import {roomsActions} from '../../store/slices/rooms';
import {selectClientId, selectClientUserInfo, userActions} from '../../store/slices/user';
import {store} from '../../store/store';
import {chatsActions} from '../../store/slices/chats';
import {RoomStreamNotEvent, roomStreamsActions} from '../../store/slices/roomStreams';
import {resendClientFlags$} from '../../pages/Room/utils/resendClientFlags';
import {
	checkIsActiveNotification,
	closeNotification,
	Notification,
	showNotification,
	showPermanentNotification
} from '../../utils/showNotification';
import {isiOS} from '../../utils/browserTypeDetection';
import {getActiveStreams$} from '../sfu/senders.sfu';

// TODO: Cannot apply constraints. No camera stream available

///
/// == Main variables ==
/// webSocketCore$$ - use it to send messages to WS
/// webSocketOutput$$ - use it to listen to incoming WS messages
///
let isWebSocketOpening = false;
export let isWebSocketOpen = false;
export let isRoomDataSynchronized = true;
export let webSocketCore$$: WebSocketSubject<any>;
export let webSocketOutput$$: WebSocketOutputOverload;
let lastWsConnectionUpdate = '';

const reconnectionsCounter = {
	number: 0,
	add: function () {
		this.number++;
	},
	reset: function () {
		this.number = 0;
	}
};

let visibleAt = Date.now();
document.addEventListener('visibilitychange', function () {
	if (!document.hidden) {
		visibleAt = Date.now();
	}
});


///
/// == noPingTimeout ==
///
/// starts when subscribed and after specified time throws error
/// unsubscribe it to cancel error throwing
/// in switchMap it is auto-unsubscribed
/// you can unsubscribe (complete it without error throwing) using noPingTimeoutCompleteCaller
///
const pingTime = 8000;

const noPingTimeoutCompleteCaller = new Subject<void>();

const noPingTimeout = new Observable(obs => {
	const sub = timer(pingTime).subscribe(() => {
		if (obs.closed) {
			obs.complete();
		} else {
			obs.error(new WebSocketError(WebSocketErrorType.NO_PING));
		}
	});
	noPingTimeoutCompleteCaller.pipe(
		take(1)
	).subscribe(() => {
		sub.unsubscribe();
		obs.complete();
	});
}).pipe(
	catchError(err => {
		if (err?.type === WebSocketErrorType.NO_PING) {
			webSocketCore$$.complete();
			return EMPTY;
		}
		return throwError(err);
	})
);

///
/// == Web Socket Core ==
///
/// responsible to send messages to ws
/// it connects to webSocketMessagePasser and send there incoming messages
///

//@ts-ignore
window.wsReconnection = true;

//@ts-ignore
window.wsPing = true;
let canWebSocketCore = true;
let shouldWebSocketCoreAgain = false;
let webSocketCoreSubscription: Subscription | undefined;
const nextReconnectionsDelays = [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 15000, 20000]; // first reconnect is instant
const webSocketMessagePasser = new Subject<void>();
const webSocketIsOpen = new Subject<AuthenticationPacket.AsObject>();
let authMessageSenderSubscription: Subscription | undefined;

//this is called every time ws closeObserver runs (expect the case of wsReconnection turned off)
const webSocketCoreCaller = () => {
	if (canWebSocketCore) {
		console.log('webSocketCoreCaller has been called');
		canWebSocketCore = false;
		const delay = nextReconnectionsDelays.length > reconnectionsCounter.number ?
			nextReconnectionsDelays[reconnectionsCounter.number] :
			nextReconnectionsDelays[nextReconnectionsDelays.length - 1];
		console.log('DELAY:', delay);
		reconnectionsCounter.add();
		//@ts-ignore
		if (window.wsReconnection) {
			webSocketCore();
		} else {
			shouldWebSocketCoreAgain = true;
		}
		timer(delay).subscribe(() => {
			canWebSocketCore = true;
			if (shouldWebSocketCoreAgain) {
				shouldWebSocketCoreAgain = false;
				webSocketCoreCaller();
			}
		});
	} else {
		shouldWebSocketCoreAgain = true;
	}

};

const webSocketCore = () => {
	webSocketCore$$ = webSocket({
		url: envVars.apiWsUrl,
		serializer: (outgoingMsg: any) => outgoingMsg,
		deserializer: (incomingMsg) => incomingMsg.data,
		closeObserver: {
			next: () => {
				console.log('closeObserver');
				lastWsConnectionUpdate = 'closeObserver';
				store.dispatch(userActions.setIsConnectedWithWs(false));
				roomGlobalRef.flagsQueue?.close();
				roomGlobalRef.statusesQueue?.close();
				roomGlobalRef.roomUsersStatusesQueue.close();
				packetIndex.resetValue();
				isWebSocketOpening = false;
				isWebSocketOpen = false;
				closeNotification('connected');
				if (visibleAt && ((Date.now() - visibleAt)) > 5000) {
					if(!checkIsActiveNotification('reconnection')) showPermanentNotification(Notification.WARNING, 'We detected some connection issues, reconnecting...', 'reconnection');
				}
				noPingTimeoutCompleteCaller.next();
				webSocketCore$$.complete();
				if (lastWsConnectionUpdate !== 'closingObserver') {
					// unsubscribe previous sub to not send many room joins
					if (webSocketCoreSubscription && !webSocketCoreSubscription.closed) {
						webSocketCoreSubscription.unsubscribe();
					}
					webSocketCoreSubscription = setWebSocket$().subscribe();
				}
				if (authMessageSenderSubscription && !authMessageSenderSubscription.closed) {
					authMessageSenderSubscription.unsubscribe();
				}
			}
		},
		closingObserver: {
			next: () => {
				lastWsConnectionUpdate = 'closingObserver';
				store.dispatch(userActions.setIsConnectedWithWs(false));
				console.log('closingObserver');
				isWebSocketOpening = false;
				isWebSocketOpen = false;
				roomGlobalRef.flagsQueue?.close();
				roomGlobalRef.statusesQueue?.close();
				roomGlobalRef.roomUsersStatusesQueue.close();
				packetIndex.resetValue();
				closeNotification('connected');
				if (visibleAt && ((Date.now() - visibleAt)) > 5000) {
					if(!checkIsActiveNotification('reconnection')) showPermanentNotification(Notification.WARNING, 'We detected some connection issues, reconnecting...', 'reconnection');
				}
				noPingTimeoutCompleteCaller.next();
				webSocketCore$$.unsubscribe();
				console.log(webSocketCore$$.closed, 'isClosed');
				// unsubscribe previous sub to not send many room joins
				if (webSocketCoreSubscription && !webSocketCoreSubscription.closed) {
					webSocketCoreSubscription.unsubscribe();
				}
				webSocketCore$$.complete();
				webSocketCoreSubscription = setWebSocket$().subscribe();
				if (authMessageSenderSubscription && !authMessageSenderSubscription.closed) {
					authMessageSenderSubscription.unsubscribe();
				}
			}
		},
		openObserver: {
			next: () => {
				lastWsConnectionUpdate = 'openObserver';
				reconnectionsCounter.reset();
				console.log('openObserver');
				packetIndex.resetValue();
				isWebSocketOpen = true;
				authMessageSenderSubscription = authMessageSenderWS$().pipe(
					take(1)
				).subscribe({
					next: (packet) => {
						console.log(packet);
						//@ts-ignore
						window.lastAuthPacket = packet;
						webSocketIsOpen.next(packet);
						console.log('CONNECTED WITH WS');
						store.dispatch(userActions.setIsConnectedWithWs(true));
						authToken.current = packet.token;
					},
					error: (err) => {
						console.log(err);
					}
				});
			}
		}
	});
	//! added webSocketCore$$ to window - remove it in future
	console.log('### window.webSocketCore$$ is set');
	//@ts-ignore
	window.webSocketCore$$ = webSocketCore$$;
	webSocketCore$$.pipe(
		catchError((err) => {
			console.log(err);
			return EMPTY;
		})
	).subscribe((msg) => {
		webSocketMessagePasser.next(msg);
	});
};

export const setWebSocket$ = () => {
	if (isWebSocketOpening) {
		webSocketCore$$.complete();
		return EMIT;
	} else {
		isWebSocketOpening = true;
		return EMIT.pipe(
			subscribeThenDo(webSocketIsOpen.pipe(take(1)), webSocketCoreCaller),
			// mergeMap(() => authMessageSenderWS$()),
			//! [rejoin room] remove it in future when backend will handle it
			mergeMap((packet) => {
				// check if user already in room
				if (roomGlobalRef.currentRoomPid) {
					return EMIT.pipe(
						mergeMap(() => {
							if (!packet.client?.roomId) {
								return EMIT.pipe(
									mergeMap(() => createRoomUserTokenService$({roomPid: roomGlobalRef.currentRoomPid!})),
									mergeMap((token) => userJoinRoomSenderWS$(token, true).pipe(
										catchError(err => {
											console.error(err);
											const data = {
												title: 'Connection Error',
												text: `Error during joining casa.\n${err}\n\nPlease try again and refresh the page.`,
												preventCoverClick: true,
												hideButton: true
											};
											modalsSystem(ModalType.INFO, data).subscribe();
											return EMPTY;
										})
									))
								);
							}
							return EMIT;
						}),
						// TODO: check this date, add logs, debug
						map(() => Date.now()),
						tap((date) => console.log('GET STATE DATE', date)),
						mergeMap((date) => getRoomStateService$().pipe(
							// get extra data about user based on roomState
							mergeMap((roomState) => {
								const clientUserInfo = selectClientUserInfo(store.getState());
								const clientId = selectClientId(store.getState());
								const list = roomState.usersList.filter(user => user.userId !== clientId).map(el => el.userId);
								if (list.length) {
									return listUsersService$(list).pipe(
										map((userInfoList) => ({
											roomState: roomState,
											userInfoList: [...userInfoList, clientUserInfo],
											date
										}))
									);
								} else {
									return of({roomState: roomState, userInfoList: [clientUserInfo], date});
								}
							}),
							tap(({roomState, userInfoList, date}) => {
								// fill store with data about playlist
								const streamsList = roomState.streamsList;
								const zucasaQueuedEventsSids = streamsList.filter(i => i.type === RoomStreamType.RSTYPE_EVENT).map(i => i.metadata!.sid);

								if (zucasaQueuedEventsSids.length) {
									forkJoin([listEventsService$({idsList: zucasaQueuedEventsSids})]).subscribe(
										(results) => {
											const streamsWithInfo = streamsList.map(s => {
												switch (s.type) {
													case RoomStreamType.RSTYPE_EVENT:
														return {...s, info: results[0].eventsList.find((e: EventInfo.AsObject) => e.id === s.metadata!.sid)!};
													default:
														return s as RoomStreamNotEvent;
												}
											});
											store.dispatch(roomStreamsActions.setRoomStreams(streamsWithInfo));
										}
									);
								} else {
									store.dispatch(roomStreamsActions.setRoomStreams(streamsList as RoomStreamNotEvent[]));
								}

								const clientId = selectClientId(store.getState());
								// change flags in response to flags which are set on client's machine
								// TODO: check client flags, if list is empty, resend it
								const usersListWithModifiedClient = roomState.usersList.reduce((acc: RoomUserInfo.AsObject[], cur: RoomUserInfo.AsObject) => {
									if (cur.userId === clientId) {
										const flagsList = [...Object.keys(selectUserFlags(clientId)(store.getState())!)].map(n => Number(n));
										console.log(flagsList);
										const client = {...cur, flagsList};
										return [...acc, client];
									}
									return [...acc, cur];
								}, []);

								// fill stores with all other data
								console.log('refresh room state');
								console.log(JSON.parse(JSON.stringify(roomState.usersList)));
								store.dispatch(roomUsersActions.upsertUsersBasedOnResponses({
									roomState: {...roomState, usersList: usersListWithModifiedClient},
									userInfoList
								}));

								store.dispatch(roomsActions.addRoom({...roomState.info!, event: roomState.event?.info}));
								store.dispatch(roomActions.setInfo(roomState.info!));
								store.dispatch(roomStreamsActions.setEvent(roomState.event!));
								store.dispatch(roomActions.setChat(roomState.chat!));
								const roomChatData = roomState.chat!;
								const eventChatData = roomState.event?.chat;
								store.dispatch(chatsActions.setChat({
									id: roomChatData.id,
									type: 'ROOM',
									status: roomChatData.status,
									createdAt: roomChatData.createdAt,
									updatedAt: roomChatData.updatedAt
								}));
								if (eventChatData) {
									store.dispatch(chatsActions.setChat({
										id: eventChatData.id,
										type: 'EVENT',
										status: eventChatData.status,
										createdAt: eventChatData.createdAt,
										updatedAt: eventChatData.updatedAt
									}));
								}
								roomGlobalRef.flagsQueue?.open(date);
								roomGlobalRef.statusesQueue?.open(date);
								roomGlobalRef.roomUsersStatusesQueue.open(date);
							})
						)),
						mergeMap(({roomState}) => {
							const clientId = selectClientId(store.getState());
							const clientState = roomState.usersList.find(u => u.userId === clientId)!;
							console.log(clientState);
							const shouldReconnectReceiver = roomGlobalRef.pcMediaReceiver && (roomGlobalRef.pcMediaReceiver.connectionState === 'failed' || roomGlobalRef.pcMediaReceiver?.connectionState === 'closed' || clientState.receiverConnectionStatus === RoomUserReceiverConnectionStatus.RURCSTATUS_DISCONNECTED || (isiOS() && clientState.receiverConnectionStatus === RoomUserReceiverConnectionStatus.RURCSTATUS_RECONNECTING));
							if (shouldReconnectReceiver) {
								reconnectPcReceiver(true);
							}

							const shouldReconnectSender = roomGlobalRef.pcMediaSender && (roomGlobalRef.pcMediaSender.connectionState === 'failed' || roomGlobalRef.pcMediaSender?.connectionState === 'closed' || clientState.senderConnectionStatus === RoomUserSenderConnectionStatus.RUSCSTATUS_DISCONNECTED || (isiOS() && clientState.senderConnectionStatus === RoomUserSenderConnectionStatus.RUSCSTATUS_RECONNECTING));
							if (shouldReconnectSender) {
								reconnectPcSender(true);
								return EMIT;
							}

							if (checkIsActiveNotification('reconnection') && !(shouldReconnectSender || shouldReconnectReceiver)) {
								closeNotification('reconnection');
								showNotification(Notification.SUCCESS, 'You have been connected with a server correctly', 'connected');
							}

							if (roomGlobalRef.pcMediaSender && (roomGlobalRef.pcMediaSender.connectionState === 'connected' || clientState.senderConnectionStatus === RoomUserSenderConnectionStatus.RUSCSTATUS_CONNECTED)) {
								getActiveStreams$().pipe(take(1)).subscribe((r) => {
									handleUnusedTransceivers(r);
									roomGlobalRef.currentSubscriptionState.applyTransceiversSubs(r);
									roomGlobalRef.clientSubscriptionState.applyTransceiversSubs(r);
									roomState.usersList.forEach((user) => {
										roomGlobalRef.flagsQueue.applyFlagsUpdate(user.flagsList, user.userId, Date.now());
									});
								});

								return resendClientFlags$();
							}

							roomGlobalRef.shouldResendFlags = true;
							return EMIT;
							// TODO: send it only if sender is connected on frontend and backend side
							// TODO: merge backend and frontend information about connection state
							// TODO: unsub packets when WS is closed - probably we will need to put
							//  them into queue and resend after reconnection - we resend flags anyway, but messages can stuck during reconnection - check it and fix if needed
							// TODO: talk to nesti, sometimes there is no need to clear flags
						})
					);
				} else {
					if (checkIsActiveNotification('reconnection')) {
						closeNotification('reconnection');
						showNotification(Notification.SUCCESS, 'You have been connected with a server correctly', 'connected');
					}
				}
				return EMIT;
			}),
			//! [rejoin room end]
			mapTo(emit)
		);
	}

};

///
/// == Web Socket Message Transmitter ==
///
/// it must be called once per session
/// responsible to receive messages from ws
/// it connects to webSocketOutput$$ and send there incoming messages
/// all WS listeners listen to the webSocketOutput$$
/// the webSocketOutput$$ should never complete
///


export const initWebSocketMessageTransmitter = () => {
	// everything inside this pipe runes once even if there are more subscriptions of webSocketOutput$ / wsOutput$
	const webSocketCore$ = webSocketMessagePasser.pipe(
		// throw an error when no message is received in pingTime
		switchMap(message => merge(
			of(message),
			noPingTimeout
		)),
		// convert Blob to Uint8Array
		mergeMap(message => blobToArray$(message as Blob).pipe(
			catchError(() => EMPTY)
		)),
		// convert Uint8Array to ServerPacket
		map(message => Packet.deserializeBinary(message).toObject()),
		// skip and response to heartbeat message
		mergeMap(serverPacket => {
			if (heartbeatPacketWsGuard(serverPacket)) {
				// TODO: use latency from this packet to count ws no ping delay
				// @ts-ignore
				if (window.wsPing) {
					heartbeatMessageSenderWS$(HeartbeatPacket.deserializeBinary(serverPacket.data).toObject().timestamp);
				}
				return EMPTY;
			} else {
				console.log(`%c ${serverPacket.type} `, 'color: black; font-weight: 900; background: #df8fff');
				return of(serverPacket).pipe(
					catchError(() => EMPTY)
				);
			}
		}),
		// catch any error from above (e.g. blobToArray error, no ping error)
		catchError((error: WebSocketError) => {
			console.error(error);
			return EMPTY;
		}),
		// final actions
		tap({
			error: err => {
				console.error('The webSocketMessagePasser should be active all time.');
				console.error(err);
			},
			complete: () => {
				console.error('The webSocketMessagePasser should be active all time.2');
			}
		}),
		// now, the observable can be called many times and socket will be opened once
		share()
	) as Observable<Packet.AsObject>;

	// main message observable - this part splits to each subscription (code below will be run multiple times (one for each sub))
	webSocketOutput$$ = ((messageType: WsMessageType) => webSocketCore$.pipe(
		// pass only messages of specific type
		mergeMap((message) => webSocketSorter$(message, messageType))
	)) as WebSocketOutputOverload;

};
