Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

groupBy.ts 10.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. import { Subscriber } from '../Subscriber';
  2. import { Subscription } from '../Subscription';
  3. import { Observable } from '../Observable';
  4. import { Operator } from '../Operator';
  5. import { Subject } from '../Subject';
  6. import { OperatorFunction } from '../types';
  /* tslint:disable:max-line-length */
  export function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
  export function groupBy<T, K>(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, T>>;
  export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, R>>;
  export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>>;
  /* tslint:enable:max-line-length */

  /**
   * Splits the items emitted by an Observable into groups according to a key,
   * and emits one {@link GroupedObservable} per distinct key.
   *
   * ![](groupBy.png)
   *
   * For every item emitted by the source, a key is computed by calling the
   * `keySelector` function with that item.
   *
   * If a {@link GroupedObservable} already exists for that key, the item is
   * emitted through it. Otherwise, a new {@link GroupedObservable} is created
   * for the key, emitted to the output, and then the item is emitted through it.
   *
   * A {@link GroupedObservable} carries the values that share one key; that key
   * is exposed as the `key` field of the {@link GroupedObservable} instance.
   *
   * By default the values emitted by a {@link GroupedObservable} are the source
   * items themselves. When an `elementSelector` is given, its return value is
   * emitted instead.
   *
   * ## Examples
   *
   * ### Group objects by id and return as array
   *
   * ```ts
   * import { of } from 'rxjs';
   * import { mergeMap, groupBy, reduce } from 'rxjs/operators';
   *
   * of(
   *   {id: 1, name: 'JavaScript'},
   *   {id: 2, name: 'Parcel'},
   *   {id: 2, name: 'webpack'},
   *   {id: 1, name: 'TypeScript'},
   *   {id: 3, name: 'TSLint'}
   * ).pipe(
   *   groupBy(p => p.id),
   *   mergeMap((group$) => group$.pipe(reduce((acc, cur) => [...acc, cur], []))),
   * )
   * .subscribe(p => console.log(p));
   *
   * // displays:
   * // [ { id: 1, name: 'JavaScript'},
   * //   { id: 1, name: 'TypeScript'} ]
   * //
   * // [ { id: 2, name: 'Parcel'},
   * //   { id: 2, name: 'webpack'} ]
   * //
   * // [ { id: 3, name: 'TSLint'} ]
   * ```
   *
   * ### Pivot data on the id field
   *
   * ```ts
   * import { of } from 'rxjs';
   * import { groupBy, map, mergeMap, reduce } from 'rxjs/operators';
   *
   * of(
   *   { id: 1, name: 'JavaScript' },
   *   { id: 2, name: 'Parcel' },
   *   { id: 2, name: 'webpack' },
   *   { id: 1, name: 'TypeScript' },
   *   { id: 3, name: 'TSLint' }
   * )
   * .pipe(
   *   groupBy(p => p.id, p => p.name),
   *   mergeMap(group$ =>
   *     group$.pipe(reduce((acc, cur) => [...acc, cur], [`${group$.key}`]))
   *   ),
   *   map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
   * )
   * .subscribe(p => console.log(p));
   *
   * // displays:
   * // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
   * // { id: 2, values: [ 'Parcel', 'webpack' ] }
   * // { id: 3, values: [ 'TSLint' ] }
   * ```
   *
   * @param {function(value: T): K} keySelector A function that extracts the key
   * for each item.
   * @param {function(value: T): R} [elementSelector] A function that maps each
   * item to the element emitted by its group.
   * @param {function(grouped: GroupedObservable<K,R>): Observable<any>} [durationSelector]
   * A function that returns an Observable to determine how long each group should
   * exist.
   * @param {function(): Subject<R>} [subjectSelector] A factory for the Subject
   * that backs each newly created group. When omitted, a plain `Subject` is used.
   * @return {OperatorFunction<T, GroupedObservable<K,R>>} A function that maps a
   * source Observable to an Observable of GroupedObservables, each of which
   * corresponds to a unique key value and emits those items from the source
   * Observable that share that key value.
   * @method groupBy
   * @owner Observable
   */
  export function groupBy<T, K, R>(
    keySelector: (value: T) => K,
    elementSelector?: ((value: T) => R) | void,
    durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
    subjectSelector?: () => Subject<R>
  ): OperatorFunction<T, GroupedObservable<K, R>> {
    // A fresh operator instance is created for every source the returned
    // function is applied to.
    return function groupByOperatorFunction(source: Observable<T>) {
      return source.lift(
        new GroupByOperator<T, K, R>(keySelector, elementSelector, durationSelector, subjectSelector)
      );
    };
  }
  /**
   * Shape of a subscription whose real teardown is postponed while other
   * subscriptions are still counted against it.
   */
  export interface RefCountSubscription {
    /** Whether the subscription has already been torn down. */
    closed: boolean;
    /** Set once `unsubscribe` has been requested, even if teardown was postponed. */
    attemptedToUnsubscribe: boolean;
    /** Number of dependants currently counted against this subscription. */
    count: number;
    /** Requests teardown of the subscription. */
    unsubscribe: () => void;
  }
  /**
   * Operator that holds the selectors given to `groupBy` and, when applied to a
   * source, subscribes to it with a new `GroupBySubscriber`.
   */
  class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
    private keySelector: (value: T) => K;
    private elementSelector?: ((value: T) => R) | void;
    private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>;
    private subjectSelector?: () => Subject<R>;

    constructor(keySelector: (value: T) => K,
                elementSelector?: ((value: T) => R) | void,
                durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
                subjectSelector?: () => Subject<R>) {
      this.keySelector = keySelector;
      this.elementSelector = elementSelector;
      this.durationSelector = durationSelector;
      this.subjectSelector = subjectSelector;
    }

    /**
     * Subscribes to `source` with a `GroupBySubscriber` that forwards grouped
     * output to `subscriber`, and returns whatever `source.subscribe` returns.
     */
    call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
      const { keySelector, elementSelector, durationSelector, subjectSelector } = this;
      return source.subscribe(
        new GroupBySubscriber(subscriber, keySelector, elementSelector, durationSelector, subjectSelector)
      );
    }
  }
  /**
   * Subscriber that routes each source value to a per-key Subject and emits a
   * `GroupedObservable` to the destination the first time a key is seen.
   *
   * It also acts as a `RefCountSubscription`: `count` is the number of
   * dependants counted against it, and a requested `unsubscribe` is only carried
   * out while that count is zero.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
    // Created lazily on the first value, so it stays null until then.
    private groups: Map<K, Subject<T | R>> | null = null;
    public attemptedToUnsubscribe: boolean = false;
    public count: number = 0;

    constructor(destination: Subscriber<GroupedObservable<K, R>>,
                private keySelector: (value: T) => K,
                private elementSelector?: ((value: T) => R) | void,
                private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
                private subjectSelector?: () => Subject<R>) {
      super(destination);
    }

    /**
     * Computes the key for `value` and hands both to `_group`. If `keySelector`
     * throws, the error is reported through `this.error` and the value is dropped.
     */
    protected _next(value: T): void {
      let key: K;
      try {
        key = this.keySelector(value);
      } catch (err) {
        this.error(err);
        return;
      }

      this._group(value, key);
    }

    /**
     * Emits `value` (or its `elementSelector` projection) through the group for
     * `key`, creating and announcing that group first when it does not exist yet.
     */
    private _group(value: T, key: K) {
      let groups = this.groups;

      if (!groups) {
        groups = this.groups = new Map<K, Subject<T | R>>();
      }

      let group = groups.get(key);

      let element: R;
      if (this.elementSelector) {
        try {
          element = this.elementSelector(value);
        } catch (err) {
          this.error(err);
          // Stop here. Falling through would go on to create and register a new
          // group (calling subjectSelector and durationSelector) after the error
          // has already been reported and the groups map has been cleared.
          return;
        }
      } else {
        element = <any>value;
      }

      if (!group) {
        group = (this.subjectSelector ? this.subjectSelector() : new Subject<R>()) as Subject<T | R>;
        groups.set(key, group);
        // The observable handed downstream carries `this` so that its
        // subscribers are counted against this subscriber.
        const groupedObservable = new GroupedObservable(key, group, this);
        this.destination.next(groupedObservable);
        if (this.durationSelector) {
          let duration: any;
          try {
            // The observable handed to durationSelector is a separate instance
            // created without a ref-count subscription.
            duration = this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group));
          } catch (err) {
            this.error(err);
            return;
          }
          this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
        }
      }

      if (!group.closed) {
        group.next(element);
      }
    }

    /** Sends the error to every open group, forgets the groups, then errors the destination. */
    protected _error(err: any): void {
      const groups = this.groups;
      if (groups) {
        groups.forEach((group) => {
          group.error(err);
        });

        groups.clear();
      }
      this.destination.error(err);
    }

    /** Completes every open group, forgets the groups, then completes the destination. */
    protected _complete(): void {
      const groups = this.groups;
      if (groups) {
        groups.forEach((group) => {
          group.complete();
        });

        groups.clear();
      }
      this.destination.complete();
    }

    /** Forgets the group stored under `key`; a no-op when no group was ever created. */
    removeGroup(key: K): void {
      const groups = this.groups;
      if (groups) {
        groups.delete(key);
      }
    }

    /**
     * Records that unsubscription was requested and performs it only when no
     * dependants are currently counted against this subscriber.
     */
    unsubscribe() {
      if (!this.closed) {
        this.attemptedToUnsubscribe = true;
        if (this.count === 0) {
          super.unsubscribe();
        }
      }
    }
  }
  /**
   * Subscriber attached to the Observable returned by a duration selector for
   * one group. It is constructed with the group's Subject as its destination,
   * completes itself on the first notification it receives, and removes the
   * group from its parent when it is unsubscribed.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class GroupDurationSubscriber<K, T> extends Subscriber<T> {
    constructor(private key: K,
                private group: Subject<T>,
                private parent: GroupBySubscriber<any, K, T | any>) {
      super(group);
    }

    /** Any notification ends the group's duration; the value itself is ignored. */
    protected _next(value: T): void {
      this.complete();
    }

    /** @deprecated This is an internal implementation detail, do not use. */
    _unsubscribe() {
      const owner = this.parent;
      const groupKey = this.key;
      // Drop the references first so a repeated call finds no parent to notify.
      this.parent = null;
      this.key = null;
      if (!owner) {
        return;
      }
      owner.removeGroup(groupKey);
    }
  }
  /**
   * An Observable of the values that share one key. Its values come from the
   * source Observable, and the shared key is exposed as the `key` field of the
   * instance.
   *
   * @class GroupedObservable<K, T>
   */
  export class GroupedObservable<K, T> extends Observable<T> {
    /** @deprecated Do not construct this type. Internal use only */
    constructor(public key: K,
                private groupSubject: Subject<T>,
                private refCountSubscription?: RefCountSubscription) {
      super();
    }

    /** @deprecated This is an internal implementation detail, do not use. */
    _subscribe(subscriber: Subscriber<T>) {
      const refCount = this.refCountSubscription;
      const subject = this.groupSubject;
      const combined = new Subscription();

      // While the ref-counted owner is still open, count this subscriber
      // against it for as long as the returned subscription lives.
      if (refCount && !refCount.closed) {
        combined.add(new InnerRefCountSubscription(refCount));
      }

      combined.add(subject.subscribe(subscriber));
      return combined;
    }
  }
  /**
   * Subscription that counts itself against a `RefCountSubscription` while it is
   * alive. Creating it increments the parent's `count`; unsubscribing decrements
   * it, and unsubscribes the parent once the count reaches zero if the parent had
   * already been asked to unsubscribe.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class InnerRefCountSubscription extends Subscription {
    constructor(private parent: RefCountSubscription) {
      super();
      parent.count += 1;
    }

    unsubscribe() {
      const owner = this.parent;
      // Nothing to release if either side is already closed.
      if (owner.closed || this.closed) {
        return;
      }

      super.unsubscribe();
      owner.count -= 1;

      const ownerIsWaitingToClose = owner.count === 0 && owner.attemptedToUnsubscribe;
      if (ownerIsWaitingToClose) {
        owner.unsubscribe();
      }
    }
  }