import {trueFn} from '../internals/trueFn';
import {curry2} from '../internals/curry2';
import {curry3} from '../internals/curry3';

// Streams with deferred work: `updaterStreamLater` appends, `flushUpdate` drains from the front.
let streamsToUpdate = [];
// The stream whose `fn` is currently executing inside `updateStream`; undefined otherwise.
let inStream;
// Buffer of streams collected by `findStreamDependencies`; `updateListeners` walks it from
// `orderNextIndex` down to 0.
let order = [];
// Index of the most recently added entry in `order`; -1 means nothing is pending.
let orderNextIndex = -1;
// True while `flushUpdate` is draining `streamsToUpdate`.
let flushingUpdateQueue = false;
// True while `updateStreamValue` is propagating a value to listeners.
let flushingStreamValue = false;

/**
 * A reactive stream holding the latest pushed value and a list of listener
 * streams that are notified when that value changes.
 *
 * Every regular stream owns an `end` stream (created with the `trueFn`
 * sentinel); pushing into `end` detaches the owner from its dependencies.
 */
export class Stream {
  /** Whether a value has been pushed into this stream at least once. */
  hasValue = false;
  /** The most recently pushed value. */
  value = undefined;
  /** Updater functions queued for this stream by `updaterStreamLater`. */
  updateFns = [];
  /** Streams that depend on this stream. */
  listeners = [];
  /** Whether this stream currently sits in the global `order` buffer. */
  isQueued = false;
  /** The stream that ends this stream; stays `undefined` for end streams themselves. */
  end = undefined;

  /**
   * @param {*} [initialValue] - Initial value to push. Passing the `trueFn`
   *   sentinel instead builds an internal end stream (no own `end`, `fn` set
   *   to `trueFn`, empty dependency list).
   */
  constructor(initialValue) {
    if (!(this instanceof Stream)) {
      throw new Error('Stream should be created with `new`.');
    }

    if (initialValue === trueFn) {
      this.fn = trueFn;
      this.dependencies = [];
      this.areDependenciesMet = false;
      this.changedDependencies = undefined;
      this.shouldUpdate = false;
    } else {
      this.end = new Stream(trueFn);
      this.fnArgs = [];

      // The owner listens to its own end stream so that ending detaches it.
      this.end.listeners.push(this);

      // `arguments.length` distinguishes `new Stream()` from `new Stream(undefined)`.
      if (arguments.length > 0) {
        updateStreamValue(initialValue, this);
      }
    }

    return this;
  }

  /**
   * Push one or more values into the stream, in order. A value exposing a
   * truthy `then` is treated as a thenable: its resolved value is pushed later.
   *
   * @param {...*} values - Values (or thenables) to push.
   * @returns {Stream} This stream.
   */
  push = (...values) => {
    for (let i = 0; i < values.length; i++) {
      let value = values[i];

      if (value && value.then) {
        value.then((val) => {
          this.push(val);
        });
      } else {
        updateStreamValue(value, this);
      }
    }

    return this;
  };

  /**
   * @returns {*} The current value of the stream.
   */
  get = () => this.value;

  /**
   * Create a stream from the given constructor arguments.
   *
   * @param {...*} args - Forwarded to the `Stream` constructor.
   * @returns {Stream}
   */
  static of(...args) {
    return new Stream(...args);
  }

  /**
   * @returns {*} The current value, so `JSON.stringify` serializes the value.
   */
  toJSON() {
    return this.value;
  }

  /**
   * @returns {string} `stream(<value>)`.
   */
  toString() {
    return `stream(${this.value})`;
  }

  /**
   * Mark the stream's dependencies as met and run its update right away,
   * unless they were already marked as met.
   *
   * @param {Stream} s - A dependent stream.
   * @returns {Stream} The same stream.
   */
  static immediate(s) {
    if (s.areDependenciesMet === false) {
      s.areDependenciesMet = true; // eslint-disable-line no-param-reassign

      updateStream(s);
    }

    return s;
  }

  /**
   * Instance shorthand for `Stream.immediate(this)`.
   *
   * @returns {Stream} This stream.
   */
  immediate() {
    return Stream.immediate(this);
  }

  /**
   * Make `stream` end when `endS` emits.
   *
   * @param {Stream} endS - Stream whose updates end `stream`.
   * @param {Stream} stream - Stream to re-wire.
   * @returns {Stream} `stream`.
   */
  static endsOn(endS, stream) {
    return stream.endsOn(endS);
  }

  /**
   * Replace the dependencies of this stream's end stream with `endS`.
   *
   * @param {Stream} endS - Stream whose updates end this stream.
   * @returns {Stream} This stream.
   */
  endsOn(endS) {
    detachStreamFromDependencies(this.end);
    endS.listeners.push(this.end);
    this.end.dependencies.push(endS);

    return this;
  }

  /**
   * @param {*} stream - Any value.
   * @returns {boolean} Whether `stream` is a `Stream` instance.
   */
  static isStream(stream) {
    return stream instanceof Stream;
  }

  /**
   * Create a new stream computed from `streams` by `fn`.
   *
   * @param {Function} fn - Called as `fn(changed, self, ...dependencies)`.
   * @param {Stream[]} streams - Dependencies; `undefined` entries are skipped.
   * @returns {Stream} The new dependent stream.
   */
  static combine(fn, streams) {
    return new Stream().combine(fn, streams);
  }

  /**
   * Turn this stream into a dependent stream of `streams`, recomputed by `fn`.
   * A fresh end stream is installed that depends on the end streams of all
   * dependencies, and an initial update is attempted.
   *
   * @param {Function} fn - Called as `fn(changed, self, ...dependencies)`.
   * @param {Stream[]} streams - Dependencies; `undefined` entries are skipped.
   * @returns {Stream} This stream.
   */
  combine(fn, streams) {
    let end = createDependentStream([], trueFn);
    let dependencies = [];
    let dependenciesEndStreams = [];

    for (let i = 0; i < streams.length; ++i) {
      if (streams[i] !== undefined) {
        dependencies.push(streams[i]);

        if (streams[i].end !== undefined) {
          dependenciesEndStreams.push(streams[i].end);
        }
      }
    }

    this.fn = fn;
    this.dependencies = dependencies;
    this.areDependenciesMet = false;
    this.changedDependencies = [];
    this.shouldUpdate = false;

    addDependenciesListener(dependencies, this);

    // Argument layout for `fn`: [changedDependencies, self, ...dependencies].
    this.fnArgs = [this.changedDependencies, this].concat(this.dependencies);
    this.end = end;

    end.listeners.push(this);
    addDependenciesListener(dependenciesEndStreams, end);

    end.dependencies = dependenciesEndStreams;

    updateStream(this);

    return this;
  }

  /**
   * Make this stream a reducer over `dependencies`: on each update the values
   * of the changed dependencies are collected into an `actions` array and
   * `reducer(state, actions)` computes the next state, which is pushed only
   * when it differs (`!==`) from the current one.
   *
   * @param {Function} reducer - `(state, actions) => newState`.
   * @param {Stream[]} dependencies - Action streams.
   * @returns {Stream} This stream.
   */
  reduce(reducer, dependencies) {
    this.combine((changed, self, ...streams) => {
      let state = self.value;
      let actions = [];

      streams.forEach((stream) => {
        if (changed.includes(stream)) {
          actions.push(stream.value);
        }
      });

      let newState = reducer(state, actions);

      if (newState !== state) {
        self.push(newState);
      }
    }, dependencies).immediate();

    return this;
  }

  /**
   * Create a reducer stream seeded with `initialValue`.
   *
   * @param {*} initialValue - Initial state.
   * @param {Function} reducer - `(state, actions) => newState`.
   * @param {Stream[]} dependencies - Action streams.
   * @returns {Stream}
   */
  static reduce(initialValue, reducer, dependencies) {
    return new Stream(initialValue).reduce(reducer, dependencies);
  }

  /**
   * Curried `(fn, stream)`: new stream carrying `fn(value)` for each value of `stream`.
   */
  static map = curry2((fn, stream) =>
    Stream.combine(
      (changed, self, s) => {
        self.push(fn(s.value));
      },
      [stream],
    ),
  );

  /**
   * Instance shorthand for `Stream.map(fn, this)`.
   *
   * @param {Function} fn - Mapping function.
   * @returns {Stream}
   */
  map(fn) {
    return Stream.map(fn, this);
  }

  /**
   * Curried `(fn, accumulator, stream)`: new stream carrying the running
   * result of `fn(accumulator, value)`. When the source has no value yet,
   * the initial accumulator is pushed.
   */
  static scan = curry3((fn, accumulator, stream) => {
    let newStream = Stream.combine(
      (changed, self, stream) => {
        accumulator = fn(accumulator, stream.value); // eslint-disable-line no-param-reassign

        self.push(accumulator);
      },
      [stream],
    );

    if (!newStream.hasValue) {
      newStream.push(accumulator);
    }

    return newStream;
  });

  /**
   * Instance shorthand for `Stream.scan(fn, accumulator, this)`.
   *
   * This has to be an instance method: a static method named `scan` would be
   * overwritten by the static `scan` field above and never be reachable.
   *
   * @param {Function} fn - `(accumulator, value) => accumulator`.
   * @param {*} accumulator - Initial accumulator.
   * @returns {Stream}
   */
  scan(fn, accumulator) {
    return Stream.scan(fn, accumulator, this);
  }

  /**
   * Curried `(fn, stream)`: for each value of `stream`, `fn` returns an inner
   * stream whose values are forwarded to the result. The previous inner
   * stream is ended whenever a new one is created.
   */
  static chain = curry2((fn, stream) => {
    // Internal state to end flat map stream
    let flatEnd = new Stream(1);
    let internalEnded = Stream.on(() => {
      let alive = flatEnd.value - 1;

      flatEnd.push(alive);

      if (alive <= 0) {
        flatEnd.end.push(true);
      }
    });

    internalEnded(stream.end);

    let last = new Stream();
    let flatStream = Stream.combine(
      (changed, own, stream) => {
        last.end.push(true);

        let newStream = fn(stream.value); // our fn stream makes streams

        flatEnd.push(flatEnd.value + 1);
        internalEnded(newStream.end);

        // Update self on call - newStream is never handed out so deps don't matter
        last = Stream.map((value) => {
          own.push(value);
        }, newStream);
      },
      [stream],
    );

    Stream.endsOn(flatEnd.end, flatStream);

    return flatStream;
  });

  /**
   * Instance shorthand for `Stream.chain(f, this)`.
   *
   * @param {Function} f - Returns a stream for each value.
   * @returns {Stream}
   */
  chain(f) {
    return Stream.chain(f, this);
  }

  /**
   * Curried `(fn, stream)`: call `fn(value)` for each value of `stream`.
   * The returned stream never receives values from `fn`.
   */
  static on = curry2((fn, stream) =>
    Stream.combine(
      (changed, self, s) => {
        fn(s.value);
      },
      [stream],
    ),
  );

  /**
   * Instance shorthand for `Stream.on(f, this)`.
   *
   * @param {Function} f - Side-effect callback.
   * @returns {Stream}
   */
  on(f) {
    return Stream.on(f, this);
  }

  /**
   * Curried `(fn, stream)`: like `on`, but when `stream` already has a value
   * at subscription time, the first invocation is skipped.
   */
  static subscribe = curry2((fn, stream) => {
    let hasRun = false;

    return stream.hasValue
      ? Stream.combine(
          (changed, self, dependency) => {
            if (hasRun) {
              fn(dependency.value);
            }

            hasRun = true;
          },
          [stream],
        )
      : Stream.combine(
          (changed, self, dependency) => {
            fn(dependency.value);
          },
          [stream],
        );
  });

  /**
   * Instance shorthand for `Stream.subscribe(f, this)`.
   *
   * @param {Function} f - Callback for subsequent values.
   * @returns {Stream}
   */
  subscribe(f) {
    return Stream.subscribe(f, this);
  }

  /**
   * Curried `(s1, s2)`: new stream emitting the values of both streams. It
   * ends once a combined stream over both end streams updates.
   */
  static merge = curry2((s1, s2) => {
    let s = Stream.combine(
      (changed, self, s12, s22) => {
        if (changed[0]) {
          self.push(changed[0].value);
        } else if (s12.hasValue) {
          self.push(s12.value);
        } else if (s22.hasValue) {
          self.push(s22.value);
        }
      },
      [s1, s2],
    );

    s.immediate();

    s.endsOn(Stream.combine(() => true, [s1.end, s2.end]));

    return s;
  });

  /**
   * Instance shorthand for `Stream.merge(s2, this)`.
   *
   * @param {Stream} s2 - Stream to merge with this one.
   * @returns {Stream}
   */
  merge(s2) {
    return Stream.merge(s2, this);
  }

  /**
   * Apply `f` to this stream and return its result.
   *
   * @param {Function} f - Receives this stream.
   * @returns {*} Whatever `f` returns.
   */
  pipe(f) {
    return f(this);
  }

  /**
   * Create a stream that receives the resolved value of `p` and then ends.
   *
   * @param {PromiseLike} p - Promise to adapt.
   * @returns {Stream}
   */
  static fromPromise(p) {
    let s = new Stream();

    p.then((val) => {
      s.push(val);

      s.end.push(true);
    });

    return s;
  }

  /**
   * Create a stream that receives the resolved value of each promise held by `s`.
   *
   * @param {Stream} s - Stream whose values are thenables.
   * @returns {Stream}
   */
  static flattenPromise(s) {
    return Stream.combine(
      (changed, self, s) => {
        s.value.then((result) => {
          self.push(result);
        });
      },
      [s],
    );
  }

  /**
   * Create a new stream holding `v`.
   *
   * @param {*} v - Initial value.
   * @returns {Stream}
   */
  of(v) {
    return new Stream(v);
  }

  /**
   * Curried `(s2, s1)`: new stream carrying `s1.value(s2.value)`, i.e. the
   * function held by `s1` applied to the value held by `s2`.
   */
  static ap = curry2((s2, s1) => {
    return Stream.combine(
      (changed, self, s1, s2) => {
        self.push(s1.value(s2.value));
      },
      [s1, s2],
    );
  });

  /**
   * Instance shorthand for `Stream.ap(s2, this)`; this stream holds the function.
   *
   * @param {Stream} s2 - Stream holding the argument.
   * @returns {Stream}
   */
  ap(s2) {
    return Stream.ap(s2, this);
  }
}

/**
 * Build a new stream configured as a dependent of `dependencies` and
 * register it as a listener on each of them.
 *
 * @param {Stream[]} dependencies - Streams the new stream depends on.
 * @param {Function} fn - Update function stored on the new stream.
 * @returns {Stream} The configured stream.
 */
function createDependentStream(dependencies, fn) {
  const dependent = new Stream();
  const tracksChanges = dependencies.length > 0;

  Object.assign(dependent, {
    fn,
    dependencies,
    areDependenciesMet: false,
    // Only streams with dependencies keep a changed-dependency list.
    changedDependencies: tracksChanges ? [] : undefined,
    shouldUpdate: false,
  });

  addDependenciesListener(dependencies, dependent);

  return dependent;
}

/**
 * Determine whether every dependency of `stream` already holds a value and
 * cache the answer on `stream.areDependenciesMet`.
 *
 * @param {Stream} stream - A dependent stream.
 * @returns {boolean} The freshly cached flag.
 */
function areStreamInitialDependenciesMet(stream) {
  let allHaveValues = true;

  for (const dependency of stream.dependencies) {
    if (!dependency.hasValue) {
      allHaveValues = false;
      break;
    }
  }

  stream.areDependenciesMet = allHaveValues; // eslint-disable-line no-param-reassign

  return allHaveValues;
}

/**
 * Report whether the dependencies of `stream` are met, using the cached
 * flag when it is already `true` and re-checking the dependencies otherwise.
 *
 * @param {Stream} stream - A dependent stream.
 * @returns {boolean}
 */
function areStreamDependenciesMet(stream) {
  if (stream.areDependenciesMet === true) {
    return true;
  }

  return areStreamInitialDependenciesMet(stream);
}

/**
 * Tell whether `stream` has ended, i.e. its end stream holds exactly `true`.
 *
 * Always returns a boolean; streams without an end stream (the end streams
 * themselves) are reported as not ended.
 *
 * @param {Stream} stream - Stream to inspect.
 * @returns {boolean}
 */
function isStreamEnded(stream) {
  return stream.end?.value === true;
}

/**
 * Check whether at least one listener of `stream` is flagged for an update.
 *
 * @param {Stream} stream - Stream whose listeners are inspected.
 * @returns {boolean}
 */
function doListenersNeedUpdating(stream) {
  for (const listener of stream.listeners) {
    if (listener.shouldUpdate) {
      return true;
    }
  }

  return false;
}

/**
 * Update a dependent stream using its dependencies in an atomic way.
 *
 * Does nothing when the stream has ended or its dependencies are not met.
 * When another stream's `fn` is currently running (`inStream` is set), the
 * update is queued through `updaterStreamLater` instead of running nested.
 *
 * A defined return value of `stream.fn` is pushed into the stream; a
 * thenable return value is pushed once it resolves to a defined value.
 *
 * NOTE(review): if `stream.fn` throws, `inStream` is left set because there
 * is no try/finally around the call — confirm whether that is acceptable.
 *
 * @param {Stream} stream - The dependent stream to recompute.
 * @returns {void}
 */
function updateStream(stream) {
  if (isStreamEnded(stream) || !areStreamDependenciesMet(stream)) {
    return;
  }

  // Another stream body is executing: defer rather than run re-entrantly.
  if (inStream !== undefined) {
    updaterStreamLater(updateStream, stream);

    return;
  }

  inStream = stream;

  if (stream.changedDependencies) {
    // Slot 0 of `fnArgs` carries the list of dependencies that changed since the last run.
    stream.fnArgs[0] = stream.changedDependencies; // eslint-disable-line no-param-reassign
  }

  let returnValue = stream.fn.apply(stream.fn, stream.fnArgs);

  if (returnValue && returnValue.then) {
    returnValue.then((val) => {
      if (typeof val !== 'undefined') {
        stream.push(val);
      }
    });
  } else if (typeof returnValue !== 'undefined') {
    // `inStream` is still this stream here, so the push only marks listeners.
    stream.push(returnValue);
  }

  inStream = undefined;

  // Start a fresh changed-list for the next run (only for streams that track one).
  if (typeof stream.changedDependencies !== 'undefined') {
    stream.changedDependencies = []; // eslint-disable-line no-param-reassign
  }

  stream.shouldUpdate = false; // eslint-disable-line no-param-reassign

  // Drain deferred work unless an outer flush/propagation is already doing so.
  if (!(flushingUpdateQueue || flushingStreamValue)) {
    flushUpdate();
  }

  if (doListenersNeedUpdating(stream)) {
    if (flushingStreamValue) {
      // A propagation is in progress: queue the flagged listeners for later.
      stream.listeners.forEach((listener) => {
        if (listener.shouldUpdate) {
          updaterStreamLater(updateStream, listener);
        }
      });
    } else {
      // No propagation running: re-push the current value to trigger one.
      stream.push(stream.value);
    }
  }
}

/**
 * Notify the listeners of `stream` that it changed and run the resulting updates.
 *
 * A listener whose `end` is `stream` is ended. Every other listener records
 * `stream` as a changed dependency (when it tracks changes), is flagged for
 * update and is added, together with its own listeners, to the global
 * `order` buffer. The buffer is then walked from `orderNextIndex` down to 0,
 * updating each flagged stream and clearing its `isQueued` mark.
 *
 * @param {Stream} stream - The stream whose value just changed.
 * @returns {void}
 */
function updateListeners(stream) {
  for (let i = 0; i < stream.listeners.length; ++i) {
    if (stream.listeners[i].end === stream) {
      // `stream` is this listener's end stream: ending takes priority over updating.
      endStream(stream.listeners[i]);
    } else {
      if (stream.listeners[i].changedDependencies !== undefined) {
        stream.listeners[i].changedDependencies.push(stream);
      }

      stream.listeners[i].shouldUpdate = true; // eslint-disable-line no-param-reassign

      findStreamDependencies(stream.listeners[i]);
    }
  }

  // `findStreamDependencies` appends a stream after its listeners, so walking
  // downwards visits each stream before the streams listening to it.
  for (; orderNextIndex >= 0; --orderNextIndex) {
    if (order[orderNextIndex].shouldUpdate === true) {
      updateStream(order[orderNextIndex]);
    }

    order[orderNextIndex].isQueued = false;
  }
}

/**
 * Append `stream` and, recursively, every stream listening to it to the
 * global `order` buffer. Listeners are appended before the stream itself,
 * and streams already marked `isQueued` are skipped.
 *
 * @param {Stream} stream - Root of the listener graph to enqueue.
 * @returns {void}
 */
function findStreamDependencies(stream) {
  if (stream.isQueued !== false) {
    return;
  }

  stream.isQueued = true; // eslint-disable-line no-param-reassign

  for (const listener of stream.listeners) {
    findStreamDependencies(listener);
  }

  orderNextIndex += 1;
  order[orderNextIndex] = stream;
}

/**
 * Defer work on `stream`: enqueue the stream globally, remember `updater`
 * on the stream's own queue and flag the stream as needing an update.
 *
 * @param {Function} updater - Called later with `stream` by `flushUpdate`.
 * @param {Stream} stream - Stream the deferred work applies to.
 * @returns {void}
 */
function updaterStreamLater(updater, stream) {
  const target = stream;

  streamsToUpdate.push(target);
  target.updateFns.push(updater);
  target.shouldUpdate = true;
}

/**
 * Drain the global `streamsToUpdate` queue in FIFO order. For each entry the
 * oldest deferred updater of that stream is taken and invoked with the
 * stream, provided one exists and the stream is still flagged for update.
 * `flushingUpdateQueue` is raised for the duration of the drain.
 *
 * @returns {void}
 */
function flushUpdate() {
  flushingUpdateQueue = true;

  while (streamsToUpdate.length !== 0) {
    const pending = streamsToUpdate.shift();
    const updater = pending.updateFns.shift();

    if (!updater || !pending.shouldUpdate) {
      continue;
    }

    updater(pending);
  }

  flushingUpdateQueue = false;
}

/**
 * Store `value` on `stream` and propagate the change.
 *
 * - While `stream`'s own `fn` is running, its listeners are only marked.
 * - While another stream's `fn` is running, the push is replayed later.
 * - Otherwise listeners are updated right away and any deferred work that
 *   accumulated is flushed, with `flushingStreamValue` raised meanwhile.
 *
 * @param {*} value - The value to store.
 * @param {Stream} stream - The receiving stream.
 * @returns {void}
 */
function updateStreamValue(value, stream) {
  stream.value = value; // eslint-disable-line no-param-reassign
  stream.hasValue = true; // eslint-disable-line no-param-reassign

  if (inStream === stream) {
    markStreamListeners(stream, stream.listeners);

    return;
  }

  if (inStream !== undefined) {
    const replayPush = (streamToUpdateLater) => {
      updateStreamValue(value, streamToUpdateLater);
    };

    updaterStreamLater(replayPush, stream);

    return;
  }

  flushingStreamValue = true;

  updateListeners(stream);

  if (streamsToUpdate.length > 0) {
    flushUpdate();
  }

  flushingStreamValue = false;
}

/**
 * Flag `listeners` as needing an update because `stream` changed, without
 * running any update. A listener whose `end` is `stream` is ended instead.
 *
 * @param {Stream} stream - The stream that changed.
 * @param {Stream[]} listeners - Listeners to mark.
 * @returns {void}
 */
function markStreamListeners(stream, listeners) {
  for (const listener of listeners) {
    if (listener.end === stream) {
      endStream(listener);
      continue;
    }

    const {changedDependencies} = listener;

    if (changedDependencies !== undefined) {
      changedDependencies.push(stream);
    }

    listener.shouldUpdate = true;
  }
}

/**
 * Register `listener` on every stream in `dependencies`.
 *
 * @param {Stream[]} dependencies - Streams to listen to.
 * @param {Stream} listener - Stream appended to each dependency's `listeners`.
 * @returns {void}
 */
function addDependenciesListener(dependencies, listener) {
  for (const dependency of dependencies) {
    dependency.listeners.push(listener);
  }
}

/**
 * Remove a stream from a listener array, in place.
 *
 * The removed slot is filled with the last element and the array is
 * shortened by one, so element order is not preserved. When `stream` is not
 * present the array is left untouched (previously the last, unrelated
 * listener was dropped, and an empty array raised a `RangeError`).
 *
 * @param {Stream} stream - The stream to remove.
 * @param {Stream[]} listeners - The array to remove it from.
 * @returns {void}
 */
function removeStreamFromListeners(stream, listeners) {
  const index = listeners.indexOf(stream);

  if (index === -1) {
    return;
  }

  listeners[index] = listeners[listeners.length - 1]; // eslint-disable-line no-param-reassign

  listeners.length--; // eslint-disable-line no-param-reassign
}

/**
 * Unregister `stream` from the listener list of each of its dependencies,
 * then empty its dependency array in place.
 *
 * @param {Stream} stream - The stream to detach.
 * @returns {void}
 */
function detachStreamFromDependencies(stream) {
  const {dependencies} = stream;

  for (const dependency of dependencies) {
    removeStreamFromListeners(stream, dependency.listeners);
  }

  dependencies.length = 0;
}

/**
 * End `stream`: detach it from its dependencies (when it has a dependency
 * list) and detach its end stream (when it has one) from its own.
 *
 * @param {Stream} stream - The stream to end.
 * @returns {void}
 */
function endStream(stream) {
  const hasDependencies = stream.dependencies !== undefined;

  if (hasDependencies) {
    detachStreamFromDependencies(stream);
  }

  const {end} = stream;

  if (end !== undefined) {
    detachStreamFromDependencies(end);
  }
}

export default Stream;
