import { ActiveSync, SomeStringMayBe } from "activeDoc/activeSync"
import { invariant } from "logs"
import Peer, { DataConnection } from "peerjs"
import { BehaviorSubject, Subject } from "rxjs"
import { filter, map, tap } from "rxjs/operators"
import { isSyncMessage, PeerMessage, PeerMessageTracking } from "./messageTypes"
import {
	DocChanges, getMsgTracking, isAck, isInstruction, isRepo, isRollup, PeerConnection, randId,
	wrapAsMessage
} from "./types"

/**
 * Module-wide notification stream: receives the remote peer id every time a
 * PeerSync instance gets an inbound connection from that peer.
 */
export const peerConnectFrom$ = new Subject<string>()

/**
 * Module-wide notification stream: receives the remote peer id every time an
 * outbound connection started by a PeerSync instance has opened.
 */
export const peerConnectTo$ = new Subject<string>()

/**
 * ActiveSync implementation that exchanges messages with other peers over
 * PeerJS data connections.
 *
 * Every payload received on any connection is pushed into `rawMessage$`; the
 * `wire*` methods then fan it out to the more specific subjects (acks, sync
 * changes, seed requests, instructions). Outgoing traffic is funnelled through
 * `send$` / `send()`.
 */
export class PeerSync implements ActiveSync {
	/** Registered connections, keyed by remote peer id. */
	connections:Map<string, PeerConnection> = new Map<string, PeerConnection>()
	/** Human-readable diagnostic messages emitted by this instance. */
	trace$:Subject<any> = new Subject<any>()
	/** Every payload received on any data connection, unfiltered. */
	rawMessage$:Subject<PeerMessage> = new Subject<PeerMessage>()
	/** Received messages that are not acknowledgements. */
	peerMessage$:Subject<PeerMessage> = new Subject<PeerMessage>()
	/** Received non-ack messages whose `requireAck` flag is set. */
	requireAck$:Subject<PeerMessage> = new Subject<PeerMessage>()
	/** Received acknowledgement messages. */
	ack$:Subject<PeerMessage> = new Subject<PeerMessage>()
	/** Sent messages still waiting for an ack, keyed by `${id}-${peerId}`. */
	tracking:Map<string, PeerMessageTracking> = new Map<string, PeerMessageTracking>()
	/** Current list of registered connections; re-emitted whenever the list changes. */
	onConnections$:BehaviorSubject<PeerConnection[]> = new BehaviorSubject<PeerConnection[]>([])
	/** Document changes received from remote peers. */
	syncMessageIn$:Subject<DocChanges> = new Subject<DocChanges>()
	/** Document changes pushed here are wrapped as "repo" messages and sent out. */
	syncMessageOut$:Subject<DocChanges> = new Subject<DocChanges>()
	/** Values pushed here are wrapped as "rollup" messages and sent out. */
	askingSeed$:Subject<SomeStringMayBe> = new Subject<SomeStringMayBe>()
	/** Payload (`data`) of every received "rollup" message. */
	askedSeed$:Subject<SomeStringMayBe> = new Subject<SomeStringMayBe>()
	/** Payload (`data`) of every received "instruction" message. */
	receivedInstructions$:Subject<DocChanges[]> = new Subject<DocChanges[]>()
	/** Changes pushed here are wrapped as "instruction" messages and sent out. */
	sendingInstructions$:Subject<DocChanges[]> = new Subject<DocChanges[]>()
	/** Messages pushed here are sent to every registered connection. */
	send$:Subject<PeerMessage> = new Subject<PeerMessage>()
	
	private local:Peer
	
	/**
	 * Creates the local peer and starts listening for inbound connections.
	 *
	 * @param id peer id to use; when empty a random id from `randId()` is used.
	 */
	constructor (public readonly id = "")
	{
		this.id = (id) ? id : randId()
		this.local = new Peer(this.id)
		// the local peer is registered and has been given its id
		this.local.on("open", (connectionId:string) => {
			this.trace$.next(`local peer open with id ${connectionId}`)
		})
		// something went wrong on the local peer
		this.local.on('error', (e) => {
			this.trace$.next(`Connection error ${this.describeError(e)}`)
		})
		// a remote peer is connecting to us
		this.local.on('connection', (connection:DataConnection) => {
			// NOTE(review): the connection is registered as soon as it arrives, which may be
			// before its 'open' event; messages sent to it before then may not be delivered.
			// Kept as-is because subscribers may rely on this timing — confirm.
			this.addConnection(connection.peer, { connection, inward: true })
			peerConnectFrom$.next(connection.peer)
			this.trace$.next(`A connection from remote: ${connection.peer}`)
			connection.on('data', (data) => {
				this.rawMessage$.next(data)
				this.trace$.next(`** hey! data: ${JSON.stringify(data)}`)
			})
			// forget the connection once it is gone so we stop sending to it
			// and so connectTo() can establish a fresh one
			connection.on('close', () => {
				this.removeConnection(connection.peer, connection)
				this.trace$.next(`[close] connection from remote: ${connection.peer} closed`)
			})
		})
		this.wire()
	}
	
	
	/** @returns the id of the local peer. */
	public instanceId () {return this.id}
	
	/**
	 * Sends a message to the registered connections.
	 *
	 * Each recipient gets its own copy stamped with `peerId`, `sender` and
	 * `senderTimestamp`. Copies with `requireAck` set are recorded in `tracking`
	 * before being sent.
	 *
	 * @param msg message to send.
	 * @param peers when given, only connections whose peer id is listed receive the message.
	 */
	public send = (msg:PeerMessage, peers?:string[]) =>
	{
		const interested = (id:string) => {
			return (peers) ? peers.includes(id) : true
		}
		// snapshot the keys so the map can change while we iterate
		Array.from(this.connections.keys())
			 .filter(id => interested(id))
			 .forEach(
				 (peerId:string) => {
					 const individualMsg = {
						 ...msg,
						 peerId, sender: this.instanceId(), senderTimestamp: new Date().valueOf()
					 }
					 this.trace$.next(
						 `sending to ${peerId} .. data ${JSON.stringify(individualMsg)}`)
					 const { connection } = this.connections.get(peerId)!
					 if (individualMsg.requireAck) {
						 this.addTracking(getMsgTracking(individualMsg))
					 }
					 connection.send(individualMsg)
				 }
			 )
	}
	
	/**
	 * Opens a data connection to a remote peer.
	 *
	 * @param peerId id of the remote peer.
	 * @returns a promise resolved with the registered connection once it is open
	 * (immediately when the peer is already registered). It is rejected when the
	 * connection reports an error or closes before it has opened.
	 */
	connectTo = (peerId:string):Promise<PeerConnection> =>
	{
		// we only want to connect to a peer that has not been connected
		const existing = this.connections.get(peerId)
		if (existing) {
			invariant.log('**  SEEN this peerId ')
			this.trace$.next(`connecting to ${peerId} again`)
			return Promise.resolve(existing)
		}
		
		invariant.log('!! have not seen this peerId before ...')
		return new Promise<PeerConnection>((resolve, reject) => {
			invariant.log('A> we are in the promise ...')
			const connection:DataConnection = this.local.connect(peerId, { reliable: true })
			invariant.log(`[peer]`, `started connecting to ${peerId}`)
			
			connection.on('open', () => {
				const entry:PeerConnection = { connection, inward: false }
				this.addConnection(peerId, entry)
				peerConnectTo$.next(peerId)
				this.trace$.next(`[open] remote peer: ${peerId} accepted to local peer`)
				connection.on('data', (data) => {
					this.rawMessage$.next(data)
					this.trace$.next(` --> data from ${connection.peer}: ${JSON.stringify(data)}`)
				})
				invariant.log('B> resolved ...')
				resolve(entry)
			})
			
			connection.on('close', () => {
				invariant.log(`[peer] the remote connection is close`)
				this.removeConnection(peerId, connection)
				// has no effect when the promise was already resolved by 'open'
				reject(new Error(`[peer] connection to ${peerId} closed before it opened`))
			})
			
			// NOTE(review): 'disconnected' is documented as an event of Peer, not of
			// DataConnection, so this handler may never run — confirm before removing.
			connection.on('disconnected', () => {
				invariant.log(`[peer] the remote connection is close`)
			})
			
			connection.on('error', (err) => {
				invariant.log(`[peer] connection error`, err)
				this.trace$.next(`[error] from ${connection.peer}: ${this.describeError(err)}`)
				// has no effect when the promise was already resolved by 'open'
				reject(err)
			})
		})
		
	}
	
	/** @returns the stream of document changes received from remote peers. */
	public incoming$ = ():Subject<DocChanges> => this.syncMessageIn$
	/** @returns the stream to push document changes into so they are sent to remote peers. */
	public outgoing$ = () => this.syncMessageOut$
	
	/** Registers (or replaces) the connection for a peer and publishes the new list. */
	private addConnection = (id:string, conn:PeerConnection) => {
		this.connections.set(id, conn)
		this.onConnections$.next(Array.from(this.connections.values()))
	}
	
	/**
	 * Unregisters a peer's connection and publishes the new list.
	 * Does nothing when the peer is currently registered with a different
	 * connection object, so a stale 'close' cannot evict a newer connection.
	 */
	private removeConnection = (id:string, connection:DataConnection) => {
		const current = this.connections.get(id)
		if (!current || current.connection !== connection) {
			return
		}
		this.connections.delete(id)
		this.onConnections$.next(Array.from(this.connections.values()))
	}
	
	/**
	 * Serialises an error for the trace stream. `name` and `message` of Error
	 * objects are not enumerable, so a plain JSON.stringify would drop them;
	 * they are copied explicitly next to any enumerable fields.
	 */
	private describeError = (err:unknown):string => {
		const detail = (err instanceof Error)
			? { ...err, name: err.name, message: err.message }
			: err
		return JSON.stringify(detail)
	}
	
	/** Connects the subjects together; called once from the constructor. */
	private wire ()
	{
		// everything goes out
		this.send$.subscribe(msg => {this.send(msg)})
		
		// everything coming in must filter from the top
		this.rawMessage$.pipe(filter((msg) => !!(msg.requireAck) && !isAck(msg))).subscribe(
			this.requireAck$)
		this.rawMessage$.pipe(filter((msg) => isAck(msg))).subscribe(this.ack$)
		this.rawMessage$.pipe(filter((msg) => !isAck(msg))).subscribe(this.peerMessage$)
		this.requireAck$.subscribe(this.responseAckRequirement)
		this.ack$.subscribe(this.matchingAck)
		
		// NOTE(review): syncMessageIn$ is also fed by wireInAndOutMessage(); if a message can
		// satisfy both isSyncMessage and isRepo it will be delivered twice — confirm.
		this.peerMessage$.pipe(
			filter(msg => isSyncMessage(msg)),
			map(msg => msg.data as DocChanges),
		).subscribe(this.syncMessageIn$)
		
		this.wireSeedAndInstruction()
		this.wireInAndOutMessage()
	}
	
	/** Wires the "rollup" (seed request) and "instruction" messages in both directions. */
	private wireSeedAndInstruction ()
	{
		// outgoing seed request
		this.askingSeed$.pipe(map((id) => wrapAsMessage("rollup", id)),
							  tap(msg => {invariant.log(`[roll-up]`, msg )})
							  ).subscribe(this.send$)
		
		// incoming seed request
		this.rawMessage$.pipe(
			filter((msg) => isRollup(msg)),
			map(msg => msg.data), // the payload's data is forwarded as the requested id
			)
			.subscribe(this.askedSeed$)
		
		// outgoing instructions
		this.sendingInstructions$.pipe(map(changes => wrapAsMessage("instruction", changes)))
			.subscribe(this.send$)
		
		// incoming instructions
		this.rawMessage$.pipe(
			filter((msg) => isInstruction(msg)),
			map(msg => msg.data), // the payload's data is forwarded as the instructions
			)
			.subscribe(this.receivedInstructions$)
	}
	
	/** Wires the "repo" messages: incoming into syncMessageIn$, syncMessageOut$ into send$. */
	private wireInAndOutMessage ()
	{
		// handle the changes coming in
		this.rawMessage$
			.pipe(
				filter((msg) => isRepo(msg)), // find the "repo" message
				map(msg => msg.data), // get into the payload
				tap(msg => {invariant.log(`way, we got the msg`, msg)})
			)
			.subscribe(this.syncMessageIn$) // rewire it into the next layer!
		
		// handling the change to go out
		this.syncMessageOut$.pipe(map(msg => wrapAsMessage("repo", msg))).subscribe(this.send$)
		
	}
	
	/** Answers a message that asked for an acknowledgement by sending an "ack" back to its sender. */
	private responseAckRequirement = (msg:PeerMessage) =>
	{
		const { id, sender } = msg
		const resp:PeerMessage = {
			id, peerId: sender!, sender: this.instanceId(),
			ackTimestamp: new Date().valueOf(),
			topic: "ack", subTopic: "", data: null
		}
		this.send(resp, [sender!])
	}
	
	/** Drops the tracking entry matching a received ack, if there is one. */
	private matchingAck = (ackMsg:PeerMessage) => {
		const { id, sender } = ackMsg
		// the ack's sender is the peer the tracked message was sent to,
		// so this is the same key addTracking() built from `${id}-${peerId}`
		const key = id + '-' + sender
		if (this.tracking.has(key)) {
			invariant.log(`delete key ${key}`)
			this.tracking.delete(key)
		}
	}
	
	/** Records a sent message that is waiting for an ack. */
	private addTracking (info:PeerMessageTracking)
	{
		const { id, peerId } = info
		const key = `${id}-${peerId}`
		this.tracking.set(key, info)
	}
	
	
}

