import { chain, filter, forEach, map } from './iter'
import { MapSet } from './mapSet'
import { parallelRunner, type Runner } from './runner'
import safeProp from './safeProp'

/**
 * The implementing function of a {@link DAG} task
 *
 * Receives an options object whose `depend` function dynamically declares a dependency on another task, and returns said task's return value
 *
 * Resolves to the task's own return value
 *
 * @typeParam K - Union of the names of all the tasks in the graph
 * @typeParam R - Record mapping each task name to the result type of that task
 * @typeParam Y - Name(s) of the task(s) implemented by this function; defaults to any task of `K`
 *
 * @see {@link DAG}
 */
export type DagFn<
	K extends string,
	R extends Record<K, any>,
	Y extends K = K,
> = (opts: {
	/**
	 * Dynamically declare a dependency on another task by name
	 *
	 * If the dependency already completed, returns its (awaited) result; if it already failed, throws
	 *
	 * If the dependency didn't complete yet, reschedules the current task to run after the dependency, by throwing a {@link DependencyException}
	 */
	depend: <X extends K>(key: X) => Awaited<R[X]>
}) => Promise<R[Y]>

/**
 * Identity helper that makes TypeScript type a function literal as a {@link DagFn}
 *
 * This overload takes both the task names and the result record as type parameters
 */
export function dagFn<K extends string, R extends Record<K, any>>(
	fn: DagFn<K, R>
): DagFn<K, R>
/**
 * Identity helper that makes TypeScript type a function literal as a {@link DagFn}
 *
 * This overload only takes the result record, and uses its string keys as the task names
 */
export function dagFn<R extends Record<string, any>>(
	fn: DagFn<keyof R & string, R>
): DagFn<keyof R & string, R>
/**
 * Shared implementation of the overloads above: hands the function back untouched, it only exists for its types
 */
export function dagFn(impl: DagFn<any, any>): DagFn<any, any> {
	return impl
}

// The options object a DagFn receives as its single parameter (the object carrying `depend`)
type DagFnArg<K extends string, R extends Record<K, any>> = Parameters<
	DagFn<K, R>
>[0]
// Outcome of one attempt at running a task: it completed, it failed, or it must be re-run after the named task
type RunResult<K extends string> = ['done'] | ['failed'] | ['depends', K]

/**
 * Exception thrown when a task dynamically declares a dependency on another task, but that task is not completed yet
 *
 * Normally not constructed manually, automatically generated by {@link DagFn}'s `depend` helper
 *
 * Its `name` is set to `'DependencyException'`, so that it can be told apart from a plain `Error` in logs and stack traces
 *
 * @see {@link DAG}
 * @see {@link DagFn}
 */
export class DependencyException<K> extends Error {
	readonly #dependency: K

	/**
	 * Construct a new dependency exception
	 *
	 * @param dependency - Identifier of the task that has to complete first
	 */
	constructor(dependency: K) {
		// explicit String() because K is unconstrained: implicitly coercing a symbol in a template literal throws a TypeError
		super(`Task is depending on ${String(dependency)}`)
		this.name = 'DependencyException'
		this.#dependency = dependency
	}

	/**
	 * The dependency that needs to be resolved
	 */
	get dependency(): K {
		return this.#dependency
	}
}

/**
 * How a {@link DAG} reacts when a failure is detected while running its tasks
 */
export type FailureMode =
	/**
	 * Fail immediately, as soon as a failure is detected
	 *
	 * This could be a cycle, a failed task, etc.
	 */
	| 'early'
	/**
	 * Fail as late as possible, even when failure is inevitable
	 */
	| 'late'

/**
 * Status of a {@link DAG} task, as reported by `DAG.status`
 */
export type TaskStatus =
	/**
	 * Task does not exist
	 */
	| 'nonexistent'
	/**
	 * Task exists and is not running, done, failed or impossible
	 *
	 * This status alone does not tell whether the dependencies of the task are completed
	 */
	| 'ready'
	/**
	 * Task is currently running
	 */
	| 'running'
	/**
	 * Task is done running (successfully)
	 */
	| 'done'
	/**
	 * Task failed running
	 */
	| 'failed'
	/**
	 * Task can never run, because it depends (directly or not) on a failed task
	 */
	| 'impossible'

/**
 * Initializer for a {@link DAG}
 *
 * Object or map of tasks, keyed by task name
 *
 * Each entry is either a bare {@link DagFn} (a task without static dependencies), or an object holding the {@link DagFn} as `fn` and a list or set of dependencies as `deps`
 *
 * @see {@link DAG}
 * @see {@link DagFn}
 */
export type DagInitializer<K extends string, R extends Record<K, any>> =
	| Map<K, DagFn<K, R> | { fn: DagFn<K, R>; deps: K[] | Set<K> }>
	| Record<K, DagFn<K, R> | { fn: DagFn<K, R>; deps: K[] | Set<K> }>

/**
 * Builds a typed identity function used to write a {@link DagInitializer} literal
 *
 * Called in two steps: `dagInit<Results>()` fixes the result record, then the returned function takes the initializer object and hands it back with its literal type preserved
 *
 * Every entry of the initializer is optional and uses the `{ fn, deps }` form, where `fn` is typed as the implementation of the task named by its key
 *
 * @see {@link DAG}
 * @see {@link DagFn}
 */
export function dagInit<R extends Record<string, any>>(): <
	X extends Partial<{
		[K in keyof R & string]: {
			fn: DagFn<keyof R & string, R, K>
			deps: (keyof R & string)[]
		}
	}>,
>(
	init: X
) => X
/**
 * Untyped implementation of the overload above: the returned function hands its argument back untouched
 *
 * @see {@link DAG}
 * @see {@link DagFn}
 */
export function dagInit() {
	return function passthrough<X>(init: X): X {
		return init
	}
}

/**
 * A DAG (Directed Acyclic Graph) of tasks, where a task is a node, and a dependency is an edge
 *
 * Used to run potentially large graphs of tasks, where a task can depend on any number of tasks, maximizing concurrency and ensuring dependency order
 *
 * As this is a DAG, it is invalid to specify dependency cycles, or have a mutual dependency
 *
 * Dependencies are either declared statically with {@link DAG.addDependency}, or dynamically by a running task through the `depend` helper of its {@link DagFn}
 *
 * Modeled after {@link https://github.com/natnat-mc/moonbuild | moonbuild}, itself modeled after {@link https://www.gnu.org/software/make/ | Make}
 *
 * @see {@link DagFn}
 * @see {@link DagInitializer}
 * @see {@link Runner}
 */
export class DAG<K extends string, R extends Record<K, any>> {
	// all the tasks in the graph, with their edges in both directions
	// ("depends" = tasks this one needs, "supplies" = tasks that need this one)
	#tasks = new Map<
		K,
		{
			fn: DagFn<K, R>

			dependsDirect: Set<K>
			suppliesDirect: Set<K>

			dependsTransitive: Set<K>
			suppliesTransitive: Set<K>
		}
	>()

	// currently running tasks
	#running = new Set<K>()

	// successfully ran tasks, with their result
	#done = new Map<K, Awaited<R[K]>>()

	// failed tasks, with the error they failed with
	#failed = new Map<K, any>()

	// tasks that can never run (depending on failed or impossible tasks)
	#impossible = new Set<K>()

	// how early should the DAG fail
	#failureMode: FailureMode = 'early'

	// how to run the tasks we have
	#exec: Runner

	// if we're already running a job
	#isRunning = false

	// handlers for when a task starts, completes or fails
	#onStart = new Set<(task: K) => void>()
	#onComplete = new Set<<X extends K>(task: X, result: R[X]) => void>()
	#onFailure = new Set<(task: K, err: any) => void>()

	/**
	 * Build a DAG instance
	 *
	 * When `tasks` is given, all the tasks are registered first and their static dependencies second, so that an entry can depend on an entry listed after it
	 */
	constructor({
		failureMode = 'early',
		exec = parallelRunner,
		tasks,
		onStart,
		onComplete,
		onFailure,
	}: {
		/**
		 * What to do when failure is detected
		 *
		 * @defaultValue 'early'
		 */
		failureMode?: FailureMode
		/**
		 * What {@link Runner} implementation to use to run tasks
		 *
		 * @see {@link Runner}
		 * @defaultValue parallelRunner
		 */
		exec?: Runner
		/**
		 * Automatically populate the DAG with tasks
		 */
		tasks?: DagInitializer<K, R>
		/**
		 * Event handler called when a task is started
		 */
		onStart?: (task: K) => void
		/**
		 * Event handler called when a task is completed
		 */
		onComplete?: <X extends K>(task: X, result: R[X]) => void
		/**
		 * Event handler called when a task fails
		 */
		onFailure?: (task: K, err: any) => void
	} = {}) {
		this.#failureMode = failureMode
		this.#exec = exec

		if (tasks) {
			// uniform iteration over both initializer shapes (Map or plain object)
			// eslint-disable-next-line no-inner-declarations
			function* itTasks(): Generator<
				[K, DagFn<K, R> | { fn: DagFn<K, R>; deps: K[] | Set<K> }]
			> {
				if (tasks instanceof Map) {
					for (const task of tasks.entries()) yield task
					return
				}
				for (const k of Object.keys(tasks!)) {
					const key = k as K
					yield [key, tasks![key]]
				}
			}

			// first pass: declare every task
			for (const [k, v] of itTasks()) {
				this.addTask(k as K, typeof v === 'function' ? v : v.fn)
			}
			// second pass: declare the edges, now that both ends of each of them exist
			for (const [k, v] of itTasks()) {
				if (typeof v !== 'function') {
					for (const d of v.deps) {
						this.addDependency(k, d)
					}
				}
			}
		}

		if (onStart) this.#onStart.add(onStart)
		if (onComplete) this.#onComplete.add(onComplete)
		if (onFailure) this.#onFailure.add(onFailure)
	}

	/**
	 * Add an event handler for task start events
	 */
	onStart(handler: (task: K) => void): void {
		this.#onStart.add(handler)
	}
	/**
	 * Remove an event handler for task start events
	 */
	offStart(handler: (task: K) => void): void {
		this.#onStart.delete(handler)
	}
	/**
	 * Add an event handler for task complete events
	 */
	onComplete(handler: <X extends K>(task: X, result: R[X]) => void): void {
		this.#onComplete.add(handler)
	}
	/**
	 * Remove an event handler for task complete events
	 */
	offComplete(handler: <X extends K>(task: X, result: R[X]) => void): void {
		this.#onComplete.delete(handler)
	}
	/**
	 * Add an event handler for task failure events
	 */
	onFailure(handler: (task: K, err: any) => void): void {
		this.#onFailure.add(handler)
	}
	/**
	 * Remove an event handler for task failure events
	 */
	offFailure(handler: (task: K, err: any) => void): void {
		this.#onFailure.delete(handler)
	}

	/**
	 * Add a new task to this DAG
	 *
	 * The key is first passed to `safeProp`
	 *
	 * Throws if a task with the same identifier already exists
	 */
	addTask(
		/**
		 * The task's identifier
		 */
		key: K,
		/**
		 * The task's implementation
		 */
		fn: DagFn<K, R>
	) {
		safeProp(key)
		if (this.#tasks.has(key)) throw new Error(`Task ${key} already exists`)
		this.#tasks.set(key, {
			fn,
			dependsDirect: new Set(),
			suppliesDirect: new Set(),
			dependsTransitive: new Set(),
			suppliesTransitive: new Set(),
		})
	}

	/**
	 * Statically declare a dependency between two tasks
	 *
	 * Does nothing if the dependency is already declared
	 *
	 * Throws if either task does not exist, if the depending task is running or finished, or if the new edge would create a cycle
	 */
	addDependency(
		/**
		 * The task that will depend on the other task
		 */
		task: K,
		/**
		 * The task that is being depended on
		 */
		dependency: K
	): void
	addDependency(a: K, b: K) {
		const statusA = this.status(a)
		const statusB = this.status(b)

		// sanity check
		if (statusA === 'nonexistent')
			throw new Error(`Task ${a} does not exist`)
		if (statusA === 'running')
			throw new Error(`Task ${a} is already running`)
		if (statusA === 'done' || statusA === 'failed')
			throw new Error(`Task ${a} is already finished`)

		if (statusB === 'nonexistent')
			throw new Error(`Task ${b} (dependency of ${a}) does not exist`)

		if (this.#depsD(a).has(b)) return // nothing to do
		// b already needs a (directly or not): a needing b would close a loop
		if (this.#depsT(b).has(a))
			throw new Error(
				`${a} is already a ${
					this.#depsD(b).has(a) ? 'direct' : 'transitive'
				} dependency of ${b}, cycle detected`
			)

		// a depends on b directly
		this.#depsD(a).add(b)

		// a and all of its reverse dependencies depend transitively on b and its transitive dependencies
		forEach(chain([a], this.#rdepsT(a)), x => {
			const deps = this.#depsT(x)
			forEach(chain([b], this.#depsT(b)), y => deps.add(y))
		})

		// b reverse depends on a directly
		this.#rdepsD(b).add(a)

		// b and all of its dependencies reverse depend transitively on a and its transitive reverse dependencies
		forEach(chain([b], this.#depsT(b)), x => {
			const rdeps = this.#rdepsT(x)
			forEach(chain([a], this.#rdepsT(a)), y => rdeps.add(y))
		})
	}

	/**
	 * Checks the status of a task
	 *
	 * The checks are made in this order: nonexistent, running, done, failed, impossible; a task matching none of them is 'ready'
	 */
	status(task: K): TaskStatus {
		if (!this.#tasks.has(task)) return 'nonexistent'
		if (this.#running.has(task)) return 'running'
		if (this.#done.has(task)) return 'done'
		if (this.#failed.has(task)) return 'failed'
		if (this.#impossible.has(task)) return 'impossible'
		return 'ready'
	}
	/**
	 * Checks if a task is declared
	 */
	isPresent(task: K): boolean {
		return this.#tasks.has(task)
	}
	/**
	 * Checks if a task is in the 'ready' status
	 */
	isReady(task: K): boolean {
		return this.status(task) === 'ready'
	}
	/**
	 * Checks if a task is in the 'running' status
	 */
	isRunning(task: K): boolean {
		return this.status(task) === 'running'
	}
	/**
	 * Checks if a task is in the 'done' or 'failed' status
	 */
	isFinished(task: K): boolean {
		const st = this.status(task)
		return st === 'done' || st === 'failed'
	}
	/**
	 * Checks if a task is in the 'ready', 'running' or 'done' status, i.e. if it completed or can still complete
	 */
	isPossible(task: K): boolean {
		const st = this.status(task)
		return st === 'ready' || st === 'done' || st === 'running'
	}

	/**
	 * Return the result of a task if it is in the 'done' status
	 *
	 * Throws otherwise
	 */
	getResultOk(task: K): Awaited<R[K]> {
		if (!this.#done.has(task)) throw new Error(`Task ${task} is not done`)
		return this.#done.get(task)!
	}
	/**
	 * Return the error of a task if it is in the 'failed' status
	 *
	 * Throws otherwise
	 */
	getResultKo(task: K): any {
		if (!this.#failed.has(task))
			throw new Error(`Task ${task} is not failed`)
		return this.#failed.get(task)!
	}
	/**
	 * Returns the result of a task if it is in the 'done' status
	 *
	 * Throws the error of the task if it is in the 'failed' status
	 *
	 * Throws otherwise
	 */
	getResult(task: K): Awaited<R[K]> {
		if (this.#done.has(task)) return this.#done.get(task)!
		if (this.#failed.has(task)) throw this.#failed.get(task)!
		throw new Error(`Task ${task} is not finished`)
	}

	/**
	 * Lists the identifiers of all tasks contained in this DAG
	 */
	*tasks(): Generator<K> {
		yield* this.#tasks.keys()
	}

	/**
	 * What this DAG will do when failure is detected
	 */
	get failureMode(): FailureMode {
		return this.#failureMode
	}

	// flags a task, and recursively every task that depends on it, as impossible to run
	#flagImpossible(task: K) {
		const taskObj = this.#tasks.get(task)
		if (!taskObj) throw new Error(`Task ${task} is not defined`)

		this.#impossible.add(task)
		for (const supplied of taskObj.suppliesDirect)
			this.#flagImpossible(supplied)
	}

	// hands a unit of work to the configured Runner
	async #execute<T>(fn: () => Promise<T>): Promise<T> {
		return this.#exec.run(fn)
	}

	// runs a single task once, and reports the outcome:
	// - ['done']: the task completed, its result is stored
	// - ['failed']: the task threw, its error is stored and the tasks depending on it are flagged impossible
	// - ['depends', key]: the task asked for a task that is not done yet; the edge is recorded and the task must be run again later
	// rejects if the task cannot be started at all (wrong status, unfinished static dependency), or if recording a dynamic edge fails (e.g. cycle)
	async #run(task: K): Promise<RunResult<K>> {
		const st = this.status(task)
		if (st === 'running') throw new Error(`Task ${task} is already running`)
		if (st === 'failed') throw new Error(`Task ${task} is already failed`)
		if (st === 'nonexistent') throw new Error(`Task ${task} is not defined`)

		const {
			fn,
			dependsDirect: depends,
			suppliesDirect: supplies,
		} = this.#tasks.get(task)!

		for (const dep of depends) {
			const depStatus = this.status(dep)
			if (depStatus === 'failed')
				throw new Error(`Task ${dep} (dependency of ${task}) is failed`)
			if (depStatus !== 'done')
				throw new Error(
					`Task ${dep} (dependency of ${task}) is not done`
				)
		}

		this.#running.add(task)
		this.#onStart.forEach(x => x(task))

		try {
			const args: DagFnArg<K, R> = {
				depend: (key: K) => {
					// not done (yet): unwind the task, the catch below decides between rescheduling and failing
					if (!this.#done.has(key)) {
						throw new DependencyException(key)
					}
					return this.#done.get(key)!
				},
			}
			const ret = await this.#execute(() => fn(args))

			this.#running.delete(task)
			this.#done.set(task, ret)
			this.#onComplete.forEach(x => x(task, ret))
			return ['done']
		} catch (e) {
			this.#running.delete(task)

			if (e instanceof DependencyException) {
				const { dependency } = e
				// only reschedule if the dependency can still complete; otherwise fall through and fail with the exception itself
				if (this.isPossible(dependency)) {
					this.addDependency(task, dependency)
					return ['depends', dependency]
				}
			}

			this.#failed.set(task, e)
			this.#onFailure.forEach(x => x(task, e))
			for (const supplied of supplies) this.#flagImpossible(supplied)
			return ['failed']
		}
	}

	// live (mutable) edge sets of a task; all four throw if the task does not exist
	#depsD(task: K) {
		const taskObj = this.#tasks.get(task)
		if (!taskObj) throw new Error(`Task ${task} does not exist`)
		return taskObj.dependsDirect
	}
	#depsT(task: K) {
		const taskObj = this.#tasks.get(task)
		if (!taskObj) throw new Error(`Task ${task} does not exist`)
		return taskObj.dependsTransitive
	}
	#rdepsD(task: K) {
		const taskObj = this.#tasks.get(task)
		if (!taskObj) throw new Error(`Task ${task} does not exist`)
		return taskObj.suppliesDirect
	}
	#rdepsT(task: K) {
		const taskObj = this.#tasks.get(task)
		if (!taskObj) throw new Error(`Task ${task} does not exist`)
		return taskObj.suppliesTransitive
	}

	/**
	 * Checks the direct dependencies of a given task
	 *
	 * Returns a copy; throws if the task does not exist
	 */
	directDependencies(task: K): Set<K> {
		return new Set(this.#depsD(task))
	}
	/**
	 * Checks the transitive dependencies of a given task
	 *
	 * Returns a copy; throws if the task does not exist
	 */
	transitiveDependencies(task: K): Set<K> {
		return new Set(this.#depsT(task))
	}

	/**
	 * Checks the tasks that directly depend on a given task
	 *
	 * Returns a copy; throws if the task does not exist
	 */
	directSupplies(task: K): Set<K> {
		return new Set(this.#rdepsD(task))
	}
	/**
	 * Checks the tasks that transitively depend on a given task
	 *
	 * Returns a copy; throws if the task does not exist
	 */
	transitiveSupplies(task: K): Set<K> {
		return new Set(this.#rdepsT(task))
	}

	/**
	 * Build a new DAG containing a subset of tasks
	 *
	 * This version fails if a dependency is not listed
	 *
	 * The method name is misspelled (`subGrpah`); it is kept as-is so that existing callers keep working
	 */
	subGrpah<X extends K>(
		/**
		 * Tasks that should be included
		 */
		tasks: Iterable<X>,
		params: {
			/**
			 * If this should fail if a dependency is not listed
			 */
			exclusive: true
			/**
			 * Failure mode of the new DAG
			 *
			 * @defaultValue the failure mode of this DAG
			 */
			failureMode?: FailureMode
			/**
			 * {@link Runner} of the new DAG
			 *
			 * @defaultValue the runner of this DAG
			 */
			exec?: Runner
		}
	): DAG<X, Pick<R, X>>
	/**
	 * Build a new DAG containing a subset of tasks
	 *
	 * This version automatically includes dependencies
	 *
	 * The method name is misspelled (`subGrpah`); it is kept as-is so that existing callers keep working
	 */
	subGrpah(
		/**
		 * Tasks that should be included
		 */
		tasks: Iterable<K>,
		params?: {
			/**
			 * If this should fail if a dependency is not listed
			 *
			 * @defaultValue false
			 */
			exclusive?: false
			/**
			 * Failure mode of the new DAG
			 *
			 * @defaultValue the failure mode of this DAG
			 */
			failureMode?: FailureMode
			/**
			 * {@link Runner} of the new DAG
			 *
			 * @defaultValue the runner of this DAG
			 */
			exec?: Runner
		}
	): DAG<K, R>
	subGrpah(
		tasks: Iterable<K>,
		{
			exclusive = false,
			failureMode = this.#failureMode,
			exec = this.#exec,
		}: {
			exclusive?: boolean
			failureMode?: FailureMode
			exec?: Runner
		} = {}
	): DAG<K, R> {
		// read the iterable exactly once: a one-shot iterable (e.g. a generator) would be empty on a second pass
		const requested = [...tasks]
		const actualTasks = new Set<K>(requested)
		for (const task of requested) {
			for (const dep of this.transitiveDependencies(task)) {
				if (exclusive) {
					if (!actualTasks.has(dep))
						throw new Error(`Subgraph doesn't include all tasks`)
				} else {
					actualTasks.add(dep)
				}
			}
		}

		// the new DAG shares task implementations, edges and event handlers, but none of the run state
		const sub = new DAG<K, R>({ failureMode, exec })
		for (const task of actualTasks) {
			sub.addTask(task, this.#tasks.get(task)!.fn)
		}
		for (const task of actualTasks) {
			for (const dep of this.directDependencies(task)) {
				sub.addDependency(task, dep)
			}
		}
		for (const onStart of this.#onStart) sub.onStart(onStart)
		for (const onComplete of this.#onComplete) sub.onComplete(onComplete)
		for (const onFailure of this.#onFailure) sub.onFailure(onFailure)
		return sub
	}

	/**
	 * Run the given set of tasks for this DAG, and return an object containing the return values of these tasks
	 *
	 * The tasks are run in a fresh subgraph, so the statuses and results stored in this DAG are left untouched
	 *
	 * @see {@link runAllTasks}
	 */
	async runTasks<X extends K>(
		/**
		 * The tasks that should be run
		 *
		 * Dependencies will automatically be added, but not returned
		 */
		tasks: Set<X>
	): Promise<Pick<R, X>> {
		const results = await this.subGrpah(tasks, {
			exclusive: false,
		}).runAllTasks()
		return Object.fromEntries(
			Object.entries(results).filter(([k]) => tasks.has(k as X))
		) as Pick<R, X>
	}

	/**
	 * Runs a given task (and its dependencies) of this DAG and returns its result
	 *
	 * The task is run in a fresh subgraph, so the statuses and results stored in this DAG are left untouched
	 */
	async runTask<X extends K>(task: X): Promise<Awaited<R[X]>> {
		const results = await this.subGrpah([task], {
			exclusive: false,
		}).runAllTasks()
		return results[task]
	}

	/**
	 * Run all the tasks of this DAG, and return the entire result object
	 *
	 * The result object is an object with a property per task, containing the task's result value
	 *
	 * Tasks are run by this DAG's {@link Runner}, in order of dependencies; a task is scheduled as soon as it doesn't have any pending dependency
	 *
	 * If this DAG's {@link FailureMode} is set to 'early', this fails as soon as a dependency fails or cannot be scheduled, otherwise it only fails at the last possible moment, running as many tasks as possible
	 *
	 * Throws synchronously if a run is already in progress; every other failure is reported by rejecting the returned promise
	 */
	runAllTasks(): Promise<R> {
		if (this.#isRunning) throw new Error(`Already running tasks`)
		this.#isRunning = true

		// tasks that still have to be looked at during this run
		const toRun = new Set(filter(this.tasks(), task => this.isReady(task)))

		// for each of these tasks, the direct dependencies it is still waiting for
		const blockers = new MapSet<K, K>(
			map(toRun, task => [
				task,
				new Set<K>(
					filter(this.directDependencies(task), dep =>
						this.isReady(dep)
					)
				),
			])
		)

		// set once the returned promise is settled; everything becomes a no-op afterwards
		let stop = false
		// tasks handed to #run whose outcome was not processed yet
		// needed because such a task sits in neither #running nor toRun between the moment #run settles and the moment its outcome is handled
		let inFlight = 0
		return new Promise((ok, ko) => {
			// settles the promise if there is nothing left to run or to wait for
			const checkEnd = () => {
				if (stop) return
				if (toRun.size) return
				if (this.#running.size) return
				if (inFlight) return
				stop = true
				// the run is over whatever the outcome, allow another one
				this.#isRunning = false

				for (const task of this.tasks()) {
					const st = this.status(task)
					if (st !== 'done')
						return ko(new Error(`Task ${task} is in status ${st}`))
				}

				const res = Object.fromEntries(
					map(
						this.tasks(),
						task => [task, this.getResultOk(task)] as const
					)
				) as R

				ok(res)
			}

			// aborts the run with the given error, unless it is already over
			const fail = (err: unknown) => {
				if (stop) return
				stop = true

				this.#isRunning = false
				ko(err)
			}

			// eslint-disable-next-line prefer-const
			let dispatch: (task: K, path: K[]) => void

			// the given task is settled: unblock and dispatch the tasks that were waiting for it
			const triggerSupplied = (task: K, path: K[]) => {
				toRun.delete(task)

				for (const supplied of this.directSupplies(task)) {
					blockers.delSet(supplied, task)
					dispatch(supplied, [...path, task])
				}

				return checkEnd()
			}

			// starts the given task if it is due and nothing blocks it
			dispatch = (task: K, path: K[]) => {
				if (stop) return
				if (blockers.sizeSet(task)) return
				if (!toRun.has(task)) return checkEnd()

				toRun.delete(task)

				const status = this.status(task)
				if (status === 'impossible' || status === 'nonexistent') {
					if (this.#failureMode === 'early') {
						return fail(new Error(`Task ${task} cannot run`))
					}
					return triggerSupplied(task, path)
				} else if (status === 'failed') {
					if (this.#failureMode === 'early') {
						return fail(new Error(`Task ${task} is already failed`))
					}
					return triggerSupplied(task, path)
				} else if (status === 'done') {
					return triggerSupplied(task, path)
				} else if (status === 'running' || blockers.sizeSet(task)) {
					return
				}

				inFlight++
				void this.#run(task)
					.then(
						result => {
							inFlight--
							switch (result[0]) {
								case 'done': {
									triggerSupplied(task, path)
									return
								}
								case 'depends': {
									const dependency = result[1]
									toRun.add(task)

									const depStatus = this.status(dependency)
									if (
										depStatus === 'ready' ||
										depStatus === 'running'
									) {
										// the dependency is still pending: wait for it, it will unblock us when it settles
										blockers.addSet(task, dependency)
										dispatch(dependency, [...path, task])
									} else {
										// the dependency settled in the meantime, so nobody would ever lift a blocker
										// a dependency that did not succeed makes this task impossible
										if (depStatus !== 'done')
											this.#flagImpossible(task)
										dispatch(task, path)
									}
									return
								}
								case 'failed': {
									if (this.#failureMode === 'early') {
										fail(
											new Error(`Task ${task} failed`, {
												cause: this.getResultKo(task),
											})
										)
										return
									}
									triggerSupplied(task, path)
									return
								}
							}
						},
						(err: unknown) => {
							inFlight--
							throw err
						}
					)
					.catch(fail)
			}

			for (const task of toRun) {
				void dispatch(task, [])
			}

			checkEnd()
		})
	}
}
