Hello World

Welcome to Hexo! This is your very first post. Check the documentation for more info. If you run into any problems when using Hexo, you can find the answer in troubleshooting, or you can ask me on GitHub.

Quick Start

Create a new post

``` bash
$ hexo new "My New Post"
```

More info: Writing

Run server

``` bash
$ hexo server
```

More info: Server

Generate static files

``` bash
$ hexo generate
```

More info: Generating

Deploy to remote sites

``` bash
$ hexo deploy
```

More info: Deployment

Processing tasks with a queue


``` java TaskQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class TaskQueue<T> {

    ExecutorService executor = null;

    ArrayBlockingQueue<T> queue;

    private volatile boolean running = false;

    protected abstract void doWork(T item);

    public TaskQueue(ExecutorService executor, ArrayBlockingQueue<T> queue) {
        this.executor = executor;
        this.queue = queue;
        start();
    }

    // started on server startup (called from the constructor)
    public void start() {
        running = true;
        executor.submit(new Runnable() {
            @Override
            public void run() {
                processTask();
            }
        });

        // stop the queue when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                stop();
            }
        }));
    }

    private void processTask() {
        try {
            T item = null;
            while (running) {
                // poll with a timeout so the loop can notice running == false
                item = queue.poll(5, TimeUnit.SECONDS);
                if (item != null) {
                    doWork(item);
                }
            }

            // process the items left on the queue after stop() was requested
            while ((item = queue.poll()) != null) {
                doWork(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        try {
            running = false;
            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);
            executor = null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void put(T item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

}
```

Implementation class


``` java AnswerStatQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AnswerStatQueue extends TaskQueue<Answer> {

    AnswerService answerService = new AnswerService();

    private AnswerStatQueue(ExecutorService executor, ArrayBlockingQueue<Answer> queue) {
        super(executor, queue);
    }

    @Override
    protected void doWork(Answer answer) {
        answerService.incrSurveyStat(answer);
    }


    //*************************** single instance ***************************


    private static volatile AnswerStatQueue instance;

    public static AnswerStatQueue getInstance() {
        if (instance == null) {
            synchronized (AnswerStatQueue.class) {
                if (instance == null) {
                    instance = createAnswerStatQueue();
                }
            }
        }
        return instance;
    }

    private static AnswerStatQueue createAnswerStatQueue() {
        // a single worker thread draining a bounded queue of Answer objects
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ArrayBlockingQueue<Answer> queue = new ArrayBlockingQueue<Answer>(10000);
        return new AnswerStatQueue(executor, queue);
    }

}
```
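For a quick, self-contained illustration of how a TaskQueue subclass is wired up and used, here is a toy subclass invented for this sketch (AnswerStatQueue above follows the same shape; Answer and AnswerService are the application's own classes):

``` java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class PrintQueue extends TaskQueue<String> {

    public PrintQueue() {
        super(Executors.newSingleThreadExecutor(), new ArrayBlockingQueue<String>(100));
    }

    @Override
    protected void doWork(String item) {
        System.out.println("processed: " + item);
    }

    public static void main(String[] args) {
        PrintQueue q = new PrintQueue();
        q.put("hello");   // enqueue; the worker thread calls doWork() asynchronously
        q.put("world");
        q.stop();         // signals the worker to drain what is left, then shuts the executor down
    }
}
```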

ThreadPoolExecutor Study Notes

Part 1: The ExecutorService interface (thread pool service)

##### 1. Submitting tasks

``` java Submitting tasks
<T> Future<T> submit(Callable<T> task)
    Submits a value-returning task for execution and returns a Future representing the pending result of the task.
Future<?> submit(Runnable task)
    Submits a Runnable task for execution and returns a Future representing that task.
<T> Future<T> submit(Runnable task, T result)
    Submits a Runnable task for execution and returns a Future representing that task; its get method returns the given result upon successful completion.
```
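A small usage sketch (my addition, not from the original notes) showing the difference between submitting a Callable and a Runnable:

``` java
import java.util.concurrent.*;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Callable: the Future carries the computed value
        Future<Integer> sum = pool.submit(() -> 1 + 2);

        // Runnable: the Future only signals completion (get() returns null)
        Future<?> done = pool.submit(() -> System.out.println("side effect"));

        System.out.println(sum.get());   // prints 3
        done.get();                      // blocks until the Runnable has finished

        pool.shutdown();
    }
}
```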

##### 2. Executing all tasks, or any one task

``` java invokeAll / invokeAny
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    Executes the given tasks and returns a list of Futures holding their status and results when all complete.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    Executes the given tasks and returns a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    Executes the given tasks and returns the result of one that has completed successfully (i.e. without throwing an exception), if any do.
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    Executes the given tasks and returns the result of one that has completed successfully (i.e. without throwing an exception) before the given timeout elapses, if any do.
```
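A brief sketch (my addition) contrasting the two calls:

``` java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Callable<String>> tasks = Arrays.asList(
                () -> "task-1",
                () -> "task-2",
                () -> "task-3");

        // invokeAll blocks until every task is done and returns all Futures, in order
        for (Future<String> f : pool.invokeAll(tasks)) {
            System.out.println(f.get());
        }

        // invokeAny blocks until the first successful result and cancels the rest
        System.out.println(pool.invokeAny(tasks));

        pool.shutdown();
    }
}
```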

##### 3. Shutting down the executor service

``` java Shutting down the executor service
void shutdown()
    Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted.
List<Runnable> shutdownNow()
    Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

boolean isShutdown()
    Returns true if this executor has been shut down.
boolean isTerminated()
    Returns true if all tasks have completed following shut down.

boolean awaitTermination(long timeout, TimeUnit unit)
    Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
```

##### Shutting down an ExecutorService

The following method shuts down an ExecutorService in two phases: it first calls shutdown to reject incoming tasks, and then, if necessary, calls shutdownNow to cancel any lingering tasks:

``` java shutdownAndAwaitTermination
void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // Disable new tasks from being submitted
    try {
        // Wait a while for existing tasks to terminate
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
}
```

Part 2: ThreadPoolExecutor overview

Core and maximum pool sizes
A ThreadPoolExecutor automatically adjusts the pool size according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()); a construction sketch follows the list below.

  • When a new task is submitted via execute(java.lang.Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.
  • If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full. Setting corePoolSize and maximumPoolSize to the same value yields a fixed-size thread pool.
  • Setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE allows the pool to accommodate an arbitrary number of concurrent tasks.
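A minimal construction sketch (my addition, with arbitrary numbers) that makes the corePoolSize / maximumPoolSize / queue parameters explicit:

``` java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConstruction {
    public static void main(String[] args) {
        // 2 core threads; up to 4 threads once the 100-slot queue is full;
        // non-core threads terminate after 60 seconds of idleness.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                       // corePoolSize
                4,                                       // maximumPoolSize
                60, TimeUnit.SECONDS,                    // keepAliveTime
                new ArrayBlockingQueue<Runnable>(100));  // work queue

        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```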

On-demand construction
By default, even core threads are created and started only when new tasks arrive, but this can be overridden dynamically using prestartCoreThread() or prestartAllCoreThreads(). You probably want to prestart threads if you construct the pool with a non-empty queue.

Keep-alive times
If the pool currently has more than corePoolSize threads, the excess threads are terminated once they have been idle for longer than keepAliveTime (see getKeepAliveTime(java.util.concurrent.TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used.
If the pool becomes more active later, new threads are constructed as needed. The parameter can also be changed dynamically with setKeepAliveTime(long, java.util.concurrent.TimeUnit).
Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shutdown. By default, the keep-alive policy applies only when there are more than corePoolSize threads,
but as long as the keepAliveTime value is non-zero, the allowCoreThreadTimeOut(boolean) method can apply the timeout policy to core threads as well (see the short sketch below).
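For instance (my addition), a pool that scales back down to zero threads when idle could be configured like this:

``` java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        // Let the 4 core threads time out too, so an idle pool eventually holds no threads.
        pool.allowCoreThreadTimeOut(true);

        // The keep-alive time can also be changed at runtime.
        pool.setKeepAliveTime(10, TimeUnit.SECONDS);

        pool.execute(() -> System.out.println("work"));
        pool.shutdown();
    }
}
```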

Queuing
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  1. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  2. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  3. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case the task is rejected.

There are three general strategies for queuing (see the mapping sketch after this list):

  1. Direct handoffs. A good default choice for a work queue is a SynchronousQueue, which hands tasks off to threads without otherwise holding them. Here, an attempt to queue a task fails if no thread is immediately available to run it, so a new thread is constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require an unbounded maximumPoolSize to avoid rejecting newly submitted tasks; this in turn admits the possibility of unbounded thread growth when commands continue to arrive, on average, faster than they can be processed.
  2. Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) causes new tasks to wait in the queue whenever all corePoolSize threads are busy, so no more than corePoolSize threads are ever created (and the value of maximumPoolSize therefore has no effect). This may be appropriate when each task is completely independent of the others, so tasks cannot affect each other's execution, for example in a web page server. While this style of queuing can be useful for smoothing out transient bursts of requests, it admits the possibility of unbounded queue growth when commands continue to arrive, on average, faster than they can be processed.
  3. Bounded queues. A bounded queue (for example an ArrayBlockingQueue) helps prevent resource exhaustion when used with a finite maximumPoolSize, but can be harder to tune and control. Queue size and maximum pool size may be traded off against each other: using large queues and small pools minimizes CPU usage, OS resources and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), the system may be able to schedule time for more threads than you would otherwise allow. Small queues generally require larger pool sizes, which keeps CPUs busier but may incur unacceptable scheduling overhead, which also decreases throughput.
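These three strategies line up with the standard Executors factory methods; a small comparison sketch (my addition, reflecting documented JDK behavior):

``` java
import java.util.concurrent.*;

public class QueueStrategies {
    public static void main(String[] args) {
        // 1. Direct handoff: SynchronousQueue, effectively unbounded maximum pool size.
        ExecutorService cached = Executors.newCachedThreadPool();

        // 2. Unbounded queue: LinkedBlockingQueue, so the pool never grows past its fixed size.
        ExecutorService fixed = Executors.newFixedThreadPool(4);

        // 3. Bounded queue: you build this one yourself and tune both bounds.
        ExecutorService bounded = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

        cached.shutdown();
        fixed.shutdown();
        bounded.shutdown();
    }
}
```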

Rejected tasks
New tasks submitted via execute(java.lang.Runnable) are rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both the maximum number of threads and the work queue capacity and is saturated.
In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler.

Four predefined handler policies are provided (an example of installing one follows this list):

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute runs the task itself. This provides a simple feedback control mechanism that slows down the rate at which new tasks are submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped and the execution is retried (which can fail again, causing this to be repeated).

It is possible to define and use other kinds of RejectedExecutionHandler classes, but doing so requires some care, especially when policies are designed to work only under particular capacity or queuing policies.
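A sketch (my addition) of installing CallerRunsPolicy, so that a saturated pool pushes work back onto the submitting thread instead of throwing:

``` java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy()); // instead of the default AbortPolicy

        for (int i = 0; i < 5; i++) {
            final int n = i;
            // Once the single worker and the 1-slot queue are busy,
            // the extra tasks run right here on the submitting (main) thread.
            pool.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```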

Hook methods
This class provides protected, overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods, which are called before and after execution of each task.
They can be used to manipulate the execution environment, for example reinitializing ThreadLocals, gathering statistics, or adding log entries. In addition, the terminated() method can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate. A small subclass sketch follows.
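A minimal subclass sketch (my addition) that times each task with the hooks:

``` java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimingPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public TimingPool(int threads) {
        super(threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());   // runs on the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long micros = (System.nanoTime() - startNanos.get()) / 1000;
        System.out.println(r + " took " + micros + "us" + (t != null ? " and threw " + t : ""));
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        System.out.println("pool fully terminated");
        super.terminated();
    }
}
```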

Queue maintenance
The getQueue() method allows access to the work queue for purposes of monitoring and debugging; use of this method for any other purpose is strongly discouraged. The remove(java.lang.Runnable) and purge() methods are available to assist in storage reclamation when large numbers of queued tasks become cancelled; a tiny sketch of these calls follows.
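A tiny sketch (my addition) of the maintenance calls. Note that remove(Runnable) only works for the exact Runnable placed on the queue, which is why execute is used here rather than submit:

``` java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MaintenanceDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        Runnable sleepy = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        pool.execute(sleepy);                 // occupies the single worker
        Runnable queued = () -> System.out.println("never mind");
        pool.execute(queued);                 // waits in the queue

        System.out.println("queued tasks: " + pool.getQueue().size()); // monitoring only
        pool.remove(queued);                  // pull it back out before it runs
        pool.purge();                         // would also drop cancelled Futures, if any
        pool.shutdown();
    }
}
```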

Finalization
A pool that is no longer referenced in a program and has no remaining threads will be shut down automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown(), you must arrange for unused threads to eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads, and/or setting allowCoreThreadTimeOut(boolean).

Part 3: The internal implementation of ThreadPoolExecutor

``` java ThreadPoolExecutor


public class ThreadPoolExecutor extends AbstractExecutorService {

/**
* The main pool control state, ctl, is an atomic integer packing two
* conceptual fields:
*   workerCount, indicating the effective number of threads
*   runState,    indicating whether running, shutting down etc.
* In order to pack them into one int, workerCount is limited to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable.
*
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, allowing ordered
* comparisons. The runState monotonically increases over time, but need
* not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING needs a check
* because the queue may go from non-empty to empty during SHUTDOWN,
* but we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* Attempt to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

/**
* Attempt to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}


// 保存任务的工作队列
private final BlockingQueue<Runnable> workQueue;

// 保存当前正在执行线程的workers集合
private final HashSet<Worker> workers = new HashSet<Worker>();


// 主要的lock
private final ReentrantLock mainLock = new ReentrantLock();

private final Condition termination = mainLock.newCondition();



private volatile long keepAliveTime;

private volatile boolean allowCoreThreadTimeOut;


//*************************Worker***********************************


/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 初始Worker状态为-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建一个新的Thread线程对象
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
// 这里调用外部的runWorker!!!
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

// 用于外部的中断控制(强制中断)
void interruptIfStarted() {
Thread t;
// 当前Worker状态已启动,并且线程不为空
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}



private static final boolean ONLY_ONE = true;

protected void terminated() { }


/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
// 在每个 Worker 退时,会调用 tryTerminate 方法,所以这个方法会被调用多次
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
// 处理以下状态直接返回
// 正在运行,
// 已经>TIDYING 清理中
// SHUTDOWN ,但 workQueue不为空
return;

// 活动线程数不为0里,中断一个空闲Worker并return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

// 当活动工作线程数为0时
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置为 TIDYING 状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 设置为 TERMINATED 终止状态
ctl.set(ctlOf(TERMINATED, 0));
// 发送 termination.signalAll 信号, 在 awaitTermination 方法中等待该信号
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

//******************************** interruptWorkers ***************************


/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
// 强制中断所有Worker,即使Worker 正在运行中
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
// 中断空闲Worker
// @param onlyOne 为true,只来自 tryTerminate的调用。

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 这里获取Worker的锁,说明当前线程处getTask 的wait状态
// Worker 在运行时获取独占锁,在getTask释放锁,这里只中断空闲线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}

// 只中断一个,返回
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}



/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
// 参数校验
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to start a new
* thread to run the current task as its first task. The call to addWorker
* checks runState and workerCount, and by returning false avoids adding
* unnecessary core threads.
*
* 2. If a task can be successfully queued, we still need to double-check
* whether we should have added a thread (because existing ones may have
* died since the last check) or whether the pool shut down since entry
* into this method. So we recheck state and, if necessary, roll back the
* enqueuing if stopped, or start a new thread if there are none.
*
* 3. If we cannot queue the task, we try to add a new thread. If that
* fails, we know we are shut down or saturated and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// worker数小于corePoolSize启动新的Worker
if (addWorker(command, true))
return;// 启动新的core线程处理任务成功,直接返回
c = ctl.get();
}//启动core线程失败,继续后续处理

if (isRunning(c) && workQueue.offer(command)) {
// 线程池正在运行,且成功添加到workQueue中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// recheck 当前状态不是运行,则移除并拒绝这个任务
reject(command);
else if (workerCountOf(recheck) == 0)
// 当前没有正在执行的Worker,则启动一个新Worker
addWorker(null, false);
}
else if (!addWorker(command, false))
// 在队列放不下的情况下,启动新的Worker失败(线程数超过maxPoolSize)
reject(command);
}

/*
* Methods for creating, running and cleaning up after workers
*/

/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread#start), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// runState >= SHUTDOWN
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 在于线程池的容量返回
// 大于corePoolSize 或 maximumPoolSize 返回
return false;
if (compareAndIncrementWorkerCount(c))
break retry;

// 再次检查运行状态
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);//new一个新的Worker
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);// 添加到workers集合
int s = workers.size();
if (s > largestPoolSize)//记录线程最大时的线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动Worker线程
workerStarted = true;
}
}
} finally {
// 添加失败,进行worker 清理工作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

// Worker退出时,尝试终止整个线程池
tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果正在 SHUTDOWN 并且
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

// workers 是否需要判断超时退出
boolean timed; // Are workers subject to culling?

for (;;) {
// 判断当前线程数是否需要超时退出
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 当前线程不需要超时break
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;

// 递减WorkerCount计数,直接返回null
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and
* clearInterruptsForTaskRun called to ensure that unless pool is
* stopping, this thread does not have its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to
* afterExecute. We separately handle RuntimeException, Error
* (both of which the specs guarantee that we trap) and arbitrary
* Throwables. Because we cannot rethrow Throwables within
* Runnable.run, we wrap them within Errors on the way out (to the
* thread's UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();// 获取当前Worker的执行线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 允许被中断
boolean completedAbruptly = true;
try {
// 循环获取任务, getTask 为空时,结束循环,完成当前线程
while (task != null || (task = getTask()) != null) {
// 执行任务时,获取独占锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt

// 如果线程池正在停止,强制线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//中断worker线程

try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}



//**************************** shutdown ***************************************



/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
interruptIdleWorkers();//中断所有线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}

// shutdown后,尝试终止线程池
tryTerminate();
}

/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}

// shutdown后,尝试终止线程池
tryTerminate();
return tasks;
}

public boolean isShutdown() {
return ! isRunning(ctl.get());
}

/**
* Returns true if this executor is in the process of terminating
* after {@link #shutdown} or {@link #shutdownNow} but has not
* completely terminated. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
*
* @return true if terminating but not yet terminated
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}

public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 等待 termination
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
shutdown();
}



//******************************预启动线程 *************************

/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}

/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
*
* <p> This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return true if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 在 SHUTDOWN 并且队列为空时,尝试终止 tryTerminate
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}

// 在 SHUTDOWN 并且队列为空时,尝试终止 tryTerminate
tryTerminate(); // In case SHUTDOWN and now empty
}

```

Part 5: ExecutorCompletionService

A CompletionService that uses a supplied Executor to execute tasks. It arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.

(In other words: the results of completed tasks are fed into a queue as they finish.)
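A short usage sketch (my addition): results become available in completion order, not submission order:

``` java
import java.util.concurrent.*;

public class CompletionDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);

        ecs.submit(() -> { Thread.sleep(300); return "slow"; });
        ecs.submit(() -> { Thread.sleep(100); return "fast"; });
        ecs.submit(() -> "instant");

        // take() blocks until the next task finishes, whichever one that is.
        for (int i = 0; i < 3; i++) {
            System.out.println(ecs.take().get());
        }
        pool.shutdown();
    }
}
```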

nsq internals

### NSQ Internals
NSQ内幕

NSQ is composed of 3 daemons:
NSQ 由3个守护进程组成:

  • nsqd is the daemon that receives, queues, and delivers messages to clients.
    nsqd是接收、队列和传送 消息到客户端的守护进程

  • nsqlookupd is the daemon that manages topology information and provides an eventually consistent discovery service.
    nsqlookupd是管理的拓扑信息,并提供了最终一致发现服务的守护进程。

  • nsqadmin is a web UI to introspect the cluster in realtime (and perform various administrative tasks).
    nsqadmin是一个Web UI来实时监控集群(和执行各种管理任务)。

Data flow in NSQ is modeled as a tree of streams and consumers. A topic is a distinct stream of data. A channel is a logical grouping of consumers subscribed to a given topic.
在NSQ数据流建模为一个消息流和消费者的树。一个topic是一个独特的数据流。一个channel是消费者订阅了某个topic的逻辑分组。

数据流图

A single nsqd can have many topics and each topic can have many channels. A channel receives a copy of all the messages for the topic, enabling multicast style delivery while each message on a channel is distributed amongst its subscribers, enabling load-balancing.
单个nsqd可以有很多的topic,每个topic可以有多channel。一个channel接收到一个topic中所有消息的副本,启用组播方式的传输,使消息同时在每个channel的所有订阅用户间分发,从而实现负载平衡。

These primitives form a powerful framework for expressing a variety of simple and complex topologies.
这些原语组成一个强大的框架,用于表示各种简单和复杂的拓扑结构

For more information about the design of NSQ see the design doc.
有关NSQ的设计的更多信息请参见设计文档

topics/channels

Topics and channels, the core primitives of NSQ, best exemplify how the design of the system translates seamlessly to the features of Go.
topics和channels,NSQ的核心基础,最能说明如何把go语言的特点无缝地转化为系统设计。

Go’s channels (henceforth referred to as “go-chan” for disambiguation) are a natural way to express queues, thus an NSQ topic/channel, at its core, is just a buffered go-chan of Message pointers. The size of the buffer is equal to the –mem-queue-size configuration parameter.
Go语言中的channel(为消除歧义以下简称为“go-chan”)是实现队列一种自然的方式,因此一个NSQ topic/channel,其核心,只是一个缓冲的go-chan Message指针。缓冲区的大小等于 –mem-queue-size 的配置参数。

After reading data off the wire, the act of publishing a message to a topic involves:
在懂了读数据后,发布消息到一个topic的行为涉及到:

  1. instantiation of a Message struct (and allocation of the message body []byte)
  2. read-lock to get the Topic
  3. read-lock to check for the ability to publish
  4. send on a buffered go-chan

To get messages from a topic to its channels the topic cannot rely on typical go-chan receive semantics, because multiple goroutines receiving on a go-chan would distribute the messages while the desired end result is to copy each message to every channel (goroutine).
从一个topic中的channel获取消息不能依赖于经典的 go-chan 语义,因为多个goroutines 在一个go-chan上接收消息将会分发消息,而最终要的结果是复制每个消息到每一个channel(goroutine).

Instead, each topic maintains 3 primary goroutines. The first one, called router, is responsible for reading newly published messages off the incoming go-chan and storing them in a queue (memory or disk).
替代的是,每个topic维护着3个主要的goroutines。第一个被称为router,它负责用来从 incoming go-chan 读取最近发布的消息,并把消息保存到队列中(内存或硬盘)。

The second one, called messagePump, is responsible for copying and pushing messages to channels as described above.
第二个,称为messagePump,是负责复制和推送消息到如上所述的channels。

The third is responsible for DiskQueue IO and will be discussed later.
第三个是负责DiskQueue IO和将在后面讨论。

Channels are a little more complicated but share the underlying goal of exposing a single input and single output go-chan (to abstract away the fact that, internally, messages might be in memory or on disk):
Channels是一个有点复杂,但共享着 go-chan 单一输入和输出(抽象出来的事实是,在内部,消息可能会在内存或磁盘上):

queue goroutine

Additionally, each channel maintains 2 time-ordered priority queues responsible for deferred and in-flight message timeouts (and 2 accompanying goroutines for monitoring them).
此外,每个channel的维护负责2个时间排序优先级队列,用来实现传输中(in-flight)消息超时(第2个随行goroutines用于监视它们)。

Parallelization is improved by managing a per-channel data structure, rather than relying on the Go runtime’s global timer scheduler.
并行化的提高是通过每个数据结构管理一个channel,而不是依靠Go 运行时的全局定时器调度。

Note: Internally, the Go runtime uses a single priority queue and goroutine to manage timers. This supports (but is not limited to) the entirety of the time package. It normally obviates the need for a user-land time-ordered priority queue but it’s important to keep in mind that it’s a single data structure with a single lock, potentially impacting GOMAXPROCS > 1 performance. See runtime/time.goc.
注意:在内部,Go 运行时使用一个单一优先级队列和的goroutine来管理定时器。这支持(但不局限于)的整个time package。它通常避免了需要一个用户空间的时间顺序的优先级队列,但要意识到这是一个很重要的一个有着单一锁的数据结构,有可能影响GOMAXPROCS> 1的表现。请参阅 runtime/time.goc

#### Backend / DiskQueue

One of NSQ’s design goals is to bound the number of messages kept in memory. It does this by transparently writing message overflow to disk via DiskQueue (which owns the third primary goroutine for a topic or channel).
NSQ的设计目标之一就是要限定保持在内存中的消息数。它通过 DiskQueue 透明地将溢出的消息写入到磁盘上(对于一个topic或 channel 而言,DiskQueue 拥有的第三个主要的goroutine)。

Since the memory queue is just a go-chan, it’s trivial to route messages to memory first, if possible, then fallback to disk:
由于内存队列只是一个 go-chan,把消息放到内存中显得不重要,如果可能的话,则退回到磁盘:

for msg := range c.incomingMsgChan {
    select {
    case c.memoryMsgChan <- msg:
    default:
        err := WriteMessageToBackend(&msgBuf, msg, c.backend)
        if err != nil {
            // ... handle errors ...
        }
    }
}

Taking advantage of Go’s select statement allows this functionality to be expressed in just a few lines of code: the default case above only executes if memoryMsgChan is full.
说到Go select 语句的优势在于用在短短的几行代码实现这个功能:default 语句只在memoryMsgChan已满的情况下执行。

NSQ also has the concept of ephemeral channels. Ephemeral channels discard message overflow (rather than write to disk) and disappear when they no longer have clients subscribed. This is a perfect use case for Go’s interfaces. Topics and channels have a struct member declared as a Backend interface rather than a concrete type. Normal topics and channels use a DiskQueue while ephemeral channels stub in a DummyBackendQueue, which implements a no-op Backend.
NSQ还具有的临时channel的概念。临时的channel将丢弃溢出的消息(而不是写入到磁盘),在没有客户端订阅时消失。这是一个完美的Go’s Interface 案例。topics 和 channel 有一个结构成员声明为一个 Backend interface,而不是一个具体的类型。正常的topic和channel使用DiskQueue,而临时channel stub in a DummyBackendQueue,它实现了一个no-op 的Backend。

Reducing GC Pressure 降低GC的压力

In any garbage collected environment you’re subject to the tension between throughput (doing useful work), latency (responsiveness), and resident set size (footprint).
在任何垃圾回收环境中,你可能会关注到吞吐量量(做无用功),延迟(响应),并驻留集大小(footprint)。

As of Go 1.2, the GC is mark-and-sweep (parallel), non-generational, non-compacting, stop-the-world and mostly precise . It’s mostly precise because the remainder of the work wasn’t completed in time (it’s slated for Go 1.3).
Go的1.2版本,GC采用,mark-and-sweep (parallel), non-generational, non-compacting, stop-the-world and mostly precise。这主要是因为剩余的工作未完成(它预定于Go 1.3 实现)。

The Go GC will certainly continue to improve, but the universal truth is: the less garbage you create the less time you’ll collect.
Go 的 GC一定会不断改进,但普遍的真理是:你创建的垃圾越少,收集的时间越少。

First, it’s important to understand how the GC is behaving under real workloads. To this end, nsqd publishes GC stats in statsd format (alongside other internal metrics). nsqadmin displays graphs of these metrics, giving you insight into the GC’s impact in both frequency and duration:
首先,重要的是要了解GC在真实的工作负载下是如何表现。为此,nsqd以statsd格式发布的GC统计(伴随着其他的内部指标)。nsqadmin显示这些度量的图表,让您洞察GC的影响,频率和持续时间:

single node view

In order to actually reduce garbage you need to know where it’s being generated. Once again the Go toolchain provides the answers:
为了切实减少垃圾,你需要知道它是如何生成的。再次Go toolchain 提供了答案:

  1. Use the testing package and go test -benchmem to benchmark hot code paths. It profiles the number of allocations per iteration (and benchmark runs can be compared with benchcmp).
    使用testing package 和 go test benchmem来 benchmark 热点代码路径。它分析每个迭代分配的内存数量(和benchmark 运行可以用 benchcmp 进行比较)。
  2. Build using go build -gcflags -m, which outputs the result of escape analysis.
    编译时使用 go build -gcflags -m , 会输出逃逸分析的结果。

With that in mind, the following optimizations proved useful for nsqd:
考虑到这一点,下面的优化证明对nsqd是有用的:

  1. Avoid []byte to string conversions.
    避免[]byte 到 string 的转换
  2. Re-use buffers or objects (and someday possibly sync.Pool aka issue 4720).
    buffers 或 object 的重新利用(并且某一天可能面临 ync.Pool 又名 issue 4720
  3. Pre-allocate slices (specify capacity in make) and always know the number and size of items over the wire.
    预先分配 slices(在make时指定容量)并且总是知道其中承载元素的数量和大小
  4. Apply sane limits to various configurable dials (such as message size).
    对各种配置项目使用一些明智的限制(例如消息大小)
  5. Avoid boxing (use of interface{}) or unnecessary wrapper types (like a struct for a “multiple value” go-chan).
    避免装箱(使用 interface{})或一些不必要的包装类型(例如一个 多值的”go-chan” 结构体)
  6. Avoid the use of defer in hot code paths (it allocates).
    避免在热点代码路径使用defer(它也消耗内存)

TCP Protocol

The NSQ TCP protocol is a shining example of a section where these GC optimization concepts are utilized to great effect.
NSQ的TCP协议是一个这些GC优化概念发挥了很大作用的的辉榜样。

The protocol is structured with length prefixed frames, making it straightforward and performant to encode and decode:
该协议用含有长度前缀的帧构造,使其可以直接高效的编码和解码:

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

Since the exact type and size of a frame’s components are known ahead of time, we can avoid the encoding/binary package’s convenience Read() and Write() wrappers (and their extraneous interface lookups and conversions) and instead call the appropriate binary.BigEndian methods directly.
由于提前知道了帧部件的确切类型与大小,我们避免了 encodgin/binary 便利 Read() 和 Write() 包装(以及它们外部interface 的查询与转换),而是直接调用相应的 binary.BigEndian 方法。

To reduce socket IO syscalls, client net.Conn are wrapped with bufio.Reader and bufio.Writer. The Reader exposes ReadSlice(), which reuses its internal buffer. This nearly eliminates allocations while reading off the socket, greatly reducing GC pressure. This is possible because the data associated with most commands does not escape (in the edge cases where this is not true, the data is explicitly copied).
为了减少 socket 的IO系统调用,客户端 net.Conn 都用 bufio.Reader和bufio.Writer 包装。Reader 暴露了 ReadSlice() ,它会重复使用其内部缓冲区。这几乎消除了从socket 读出数据的内存分配,大大降低GC的压力。这可能是因为与大多数命令关联的数据does not escape(在边缘情况下,这是不正确的,数据是显示复制的)。

At an even lower level, a MessageID is declared as [16]byte to be able to use it as a map key (slices cannot be used as map keys). However, since data read from the socket is stored as []byte, rather than produce garbage by allocating string keys, and to avoid a copy from the slice to the backing array of the MessageID, the unsafe package is used to cast the slice directly to a MessageID:
在一个更低的水平,提供一个 MessageID 被声明为[16]byte,以便能够把它作为一个map key(slice不能被用作map key)。然而,由于从socket读取数据存储为[]byte,而不是通过分配字符串键产生垃圾,并避免从slice的副本拷贝的数组形式的MessageID, unsafe package是用来直接把 slice 转换成一个MessageID:

id := *(*nsq.MessageID)(unsafe.Pointer(&msgID))

Note: This is a hack. It wouldn’t be necessary if this was optimized by the compiler and Issue 3512 is open to potentially resolve this. It’s also worth reading through issue 5376, which talks about the possibility of a “const like” byte type that could be used interchangeably where string is accepted, without allocating and copying.
注: 这是一个hack。它将不是必要的,如果编译器优化 和 Issue 3512 解决这个问题。另外值得一读通过issue 5376,其中谈到的“const like” byte 类型 与 string 类型可以互换使用,而不需要分配和复制。

Similarly, the Go standard library only provides numeric conversion methods on a string. In order to avoid string allocations, nsqd uses a custom base 10 conversion method that operates directly on a []byte.

同样,Go 标准库只提供了一个数字转换成string的方法。为了避免string分配,nsqd使用一个自定义的10进制转换方法在[]byte直接操作。

These may seem like micro-optimizations but the TCP protocol contains some of the hottest code paths. In aggregate, at the rate of tens of thousands of messages per second, they have a significant impact on the number of allocations and overhead:

这些看似微观优化,但却包含了TCP协议中一些最热门的代码路径。总体而言,每秒上万消息的速度,对分配和开销的数目显著影响:

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Data           3575         1963  -45.09%

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Sub256        57964        14568  -74.87%
BenchmarkProtocolV2Sub512        58212        16193  -72.18%
BenchmarkProtocolV2Sub1k         58549        19490  -66.71%
BenchmarkProtocolV2Sub2k         63430        27840  -56.11%

benchmark                   old allocs   new allocs    delta
BenchmarkProtocolV2Sub256           56           39  -30.36%
BenchmarkProtocolV2Sub512           56           39  -30.36%
BenchmarkProtocolV2Sub1k            56           39  -30.36%
BenchmarkProtocolV2Sub2k            58           42  -27.59%

HTTP

NSQ’s HTTP API is built on top of Go’s net/http package. Because it’s just HTTP, it can be leveraged in almost any modern programming environment without special client libraries.
NSQ的HTTP API是建立在Go的 net/http 包之上。因为它只是 HTTP,它可以利用没有特殊的客户端库的几乎所有现代编程环境。

Its simplicity belies its power, as one of the most interesting aspects of Go’s HTTP tool-chest is the wide range of debugging capabilities it supports. The net/http/pprof package integrates directly with the native HTTP server, exposing endpoints to retrieve CPU, heap, goroutine, and OS thread profiles. These can be targeted directly from the go tool:
它的简单性掩盖了它的能力,作为Go的HTTP tool-chest最有趣的方面之一是广泛的调试功能支持。该 net/http/pprof 包直接集成了原生的HTTP服务器,暴露获取CPU,堆,goroutine和操作系统线程性能的endpoints。这些可以直接从 go tool 找到:

$ go tool pprof http://127.0.0.1:4151/debug/pprof/profile

This is a tremendously valuable for debugging and profiling a running process!
这对调试和分析一个运行的进程非常有价值!

In addition, a /stats endpoint returns a slew of metrics in either JSON or pretty-printed text, making it easy for an administrator to introspect from the command line in realtime:
此外,/stats endpoint 返回的指标摆在任何JSON或良好格式的文本,很容易使管理员能够实时从命令行监控:

$ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected'

This produces continuous output like:
这产生的连续输出如下:


[page_views ] depth: 0 be-depth: 0 msgs: 105525994 e2e%: 6.6s, 6.2s, 6.2s
[page_view_counter ] depth: 0 be-depth: 0 inflt: 432 def: 0 re-q: 34684 timeout: 34038 msgs: 105525994 e2e%: 5.1s, 5.1s, 4.6s
[realtime_score ] depth: 1828 be-depth: 0 inflt: 1368 def: 0 re-q: 25188 timeout: 11336 msgs: 105525994 e2e%: 9.0s, 9.0s, 7.8s
[variants_writer ] depth: 0 be-depth: 0 inflt: 592 def: 0 re-q: 37068 timeout: 37068 msgs: 105525994 e2e%: 8.2s, 8.2s, 8.2s

[poll_requests  ] depth: 0     be-depth: 0     msgs: 11485060 e2e%: 167.5ms, 167.5ms, 138.1ms
    [social_data_collector    ] depth: 0     be-depth: 0     inflt: 2    def: 3    re-q: 7568  timeout: 402   msgs: 11485060 e2e%: 186.6ms, 186.6ms, 138.1ms

[social_data    ] depth: 0     be-depth: 0     msgs: 60145188 e2e%: 199.0s, 199.0s, 199.0s
    [events_writer            ] depth: 0     be-depth: 0     inflt: 226  def: 0    re-q: 32584 timeout: 30542 msgs: 60145188 e2e%: 6.7s, 6.7s, 6.7s
    [social_delta_counter     ] depth: 17328 be-depth: 7327  inflt: 179  def: 1    re-q: 155843 timeout: 11514 msgs: 60145188 e2e%: 234.1s, 234.1s, 231.8s

[time_on_site_ticks] depth: 0     be-depth: 0     msgs: 35717814 e2e%: 0.0ns, 0.0ns, 0.0ns
    [tail821042#ephemeral     ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 33909699 e2e%: 0.0ns, 0.0ns, 0.0ns

Finally, Go 1.2 brought measurable HTTP performance gains. It’s always nice when recompiling against the latest version of Go provides a free performance boost!
最后,Go 1.2带来可观的HTTP性能提升。与Go的最新版本重新编译时,它总是很高兴为您提供免费的性能提升!

#### Dependencies 依赖

Coming from other ecosystems, Go’s philosophy (or lack thereof) on managing dependencies takes a little time to get used to.
对于其它生态系统,Go 依赖关系管理(或缺乏)的哲学需要一点时间去适应。

NSQ evolved from being a single giant repo, with relative imports and little to no separation between internal packages, to fully embracing the recommended best practices with respect to structure and dependency management.
NSQ从一个单一的巨大仓库衍化而来的,包含相关的imports 和 小到未分离的内部 packages,完全遵守构建和依赖管理的最佳实践。

There are two main schools of thought:
有两大流派的思想:

  1. Vendoring: copy dependencies at the correct revision into your application’s repo and modify your import paths to reference the local copy.
    Vendoring:拷贝正确版本的依赖到你的应用程序的仓库,并修改您的import 路径来引用本地副本。
  2. Virtual Env: list the revisions of dependencies you require and at build time, produce a pristine GOPATH environment containing those pinned dependencies.
    Virtual Env:列出你在构建时所需要的依赖版本,产生一种原生的GOPATH环境变量包含这些固定依赖。
    Note: This really only applies to binary packages as it doesn’t make sense for an importable package to make intermediate decisions as to which version of a dependency to use.
    注:这确实只适用于二进制包,因为它没有任何意义的一个导入的包,使中间的决定,如一种依赖使用的版本。

NSQ uses godep to provide support for (2) above.
NSQ 使用 godep 提供如上述2中的支持。

It works by recording your dependencies in a Godeps file, which it later uses to construct a GOPATH environment. To build, it wraps and executes the standard Go toolchain inside that environment. The Godeps file is just JSON and can be edited by hand.
它的工作原理是在Godeps文件记录你的依赖,方便日后构建GOPATH环境。为了编译,它在环境里包装并执行的标准Go toolchain。该Godeps文件仅仅是JSON格式,可以进行手工编辑。

It even supports go get like semantics. For example, to produce a reliable build of NSQ:
它甚至还支持像 go get 的语义。例如,用来产生可靠的NSQ构建:

$ godep get github.com/bitly/nsq/...

Testing

Go provides solid built-in support for writing tests and benchmarks and, because Go makes it so easy to model concurrent operations, it’s trivial to stand up a full-fledged instance of nsqd inside your test environment.
Go 提供了编写测试和基准测试的内建支持,这使用 Go 很容易并发操作进行建模,这是微不足道的站起来的一个完整的实例nsqd您的测试环境中。

However, there was one aspect of the initial implementation that became problematic for testing: global state. The most obvious offender was the use of a global variable that held the reference to the instance of nsqd at runtime, i.e. var nsqd *NSQd.
然而，最初的实现中有一个方面后来给测试带来了麻烦：全局状态。最明显的问题是使用了一个在运行时持有 nsqd 实例引用的全局变量，即 var nsqd *NSQd。

Certain tests would inadvertently mask this global variable in their local scope by using short-form variable assignment, i.e. nsqd := NewNSQd(…). This meant that the global reference did not point to the instance that was currently running, breaking tests.
某些测试会使用短变量声明（即 nsqd := NewNSQd(…)），无意中在局部作用域里遮蔽了这个全局变量。这意味着全局引用不再指向当前正在运行的实例，从而破坏了测试。

To resolve this, a Context struct is passed around that contains configuration metadata and a reference to the parent nsqd. All references to global state were replaced with this local Context, allowing children (topics, channels, protocol handlers, etc.) to safely access this data and making it more reliable to test.
要解决这个问题，代码中改为传递一个包含配置元数据和指向父 nsqd 引用的 Context 结构体。所有对全局状态的引用都被替换成这个局部 Context，让子对象（topic、channel、协议处理器等）可以安全地访问这些数据，也让测试变得更可靠。
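
As a rough illustration of this pattern (a minimal sketch with made-up field names, not nsqd's actual types), a Context simply carries the options and the parent reference, and children receive it explicitly instead of reading a package-level variable:

package main

import "fmt"

// Options stands in for nsqd's configuration metadata (hypothetical fields).
type Options struct {
    MemQueueSize int64
}

// NSQD is a stand-in for the parent daemon type.
type NSQD struct {
    opts *Options
}

// Context bundles what children need, replacing global state.
type Context struct {
    nsqd *NSQD
    opts *Options
}

// Topic is a child that receives the Context explicitly instead of using a global.
type Topic struct {
    name string
    ctx  *Context
}

func NewTopic(name string, ctx *Context) *Topic {
    return &Topic{name: name, ctx: ctx}
}

func main() {
    opts := &Options{MemQueueSize: 10000}
    n := &NSQD{opts: opts}
    ctx := &Context{nsqd: n, opts: opts}
    t := NewTopic("events", ctx)
    fmt.Println(t.name, t.ctx.opts.MemQueueSize)
}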

Robustness

A system that isn’t robust in the face of changing network conditions or unexpected events is a system that will not perform well in a distributed production environment.
一个面对不断变化的网络条件或突发事件不健壮的系统,不会是一个在分布式生产环境中表现良好的系统。

NSQ is designed and implemented in a way that allows the system to tolerate failure and behave in a consistent, predictable, and unsurprising way.
NSQ 的设计与实现方式使系统能够容忍故障，并以一致的、可预测的、不出意外的方式运行。

The overarching philosophy is to fail fast, treat errors as fatal, and provide a means to debug any issues that do occur.
总体理念是：快速失败、把错误当作致命错误对待，并提供一种手段来调试确实发生的任何问题。

But, in order to react you need to be able to detect exceptional conditions
但是,为了应对,你需要能够检测异常情况

Heartbeats and Timeouts

The NSQ TCP protocol is push oriented. After connection, handshake, and subscription the consumer is placed in a RDY state of 0. When the consumer is ready to receive messages it updates that RDY state to the number of messages it is willing to accept. NSQ client libraries continually manage this behind the scenes, resulting in a flow-controlled stream of messages.
NSQ 的 TCP 协议是面向推送的。在建立连接、握手和订阅之后，消费者处于 RDY 为 0 的状态。当消费者准备好接收消息时，它把 RDY 状态更新为自己愿意接受的消息数量。NSQ 客户端库在幕后持续管理这一状态，从而形成一个受流量控制的消息流。

Periodically, nsqd will send a heartbeat over the connection. The client can configure the interval between heartbeats but nsqd expects a response before it sends the next one.
nsqd 会定期通过连接发送心跳。客户端可以配置心跳之间的间隔，但 nsqd 在发送下一个心跳之前会期待收到响应。

The combination of application level heartbeats and RDY state avoids head-of-line blocking, which can otherwise render heartbeats useless (i.e. if a consumer is behind in processing message flow the OS’s receive buffer will fill up, blocking heartbeats).
应用级心跳与 RDY 状态的组合避免了队头阻塞，否则心跳可能变得毫无用处（例如，消费者处理消息流滞后时，操作系统的接收缓冲区会被填满，把心跳也堵在里面）。

To guarantee progress, all network IO is bound with deadlines relative to the configured heartbeat interval. This means that you can literally unplug the network connection between nsqd and a consumer and it will detect and properly handle the error.
为了保证进度，所有网络 IO 都绑定了与配置的心跳间隔相关的截止时间（deadline）。这意味着你甚至可以直接拔掉 nsqd 与消费者之间的网线，它也会检测到并正确地处理这个错误。
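
A small sketch of what deadline-bound IO looks like in Go (illustrative only; the address and the deadline multiplier are assumptions, not nsqd's actual values):

package main

import (
    "bufio"
    "log"
    "net"
    "time"
)

// readLoop reads newline-delimited data and treats a missed deadline as fatal.
// The deadline is derived from the heartbeat interval, so an unplugged peer is
// detected within a bounded amount of time.
func readLoop(conn net.Conn, heartbeatInterval time.Duration) error {
    r := bufio.NewReader(conn)
    for {
        if err := conn.SetReadDeadline(time.Now().Add(2 * heartbeatInterval)); err != nil {
            return err
        }
        line, err := r.ReadString('\n')
        if err != nil {
            return err // timeouts and disconnects both surface here
        }
        log.Printf("got: %q", line)
    }
}

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:4150") // hypothetical nsqd TCP address
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    if err := readLoop(conn, 30*time.Second); err != nil {
        log.Printf("connection error: %v", err)
    }
}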

When a fatal error is detected the client connection is forcibly closed. In-flight messages are timed out and re-queued for delivery to another consumer. Finally, the error is logged and various internal metrics are incremented.
当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。

Managing Goroutines 管理 Goroutines

It’s surprisingly easy to start goroutines. Unfortunately, it isn’t quite as easy to orchestrate their cleanup. Avoiding deadlocks is also challenging. Most often this boils down to an ordering problem, where a goroutine receiving on a go-chan exits before the upstream goroutines sending on it.
启动 goroutine 非常容易。不幸的是，协调它们的清理工作就没那么简单了，避免死锁也很有挑战性。大多数情况下这归结为一个顺序问题：从 go-chan 接收数据的 goroutine 比向它发送数据的上游 goroutine 先退出了。

Why care at all though? It’s simple, an orphaned goroutine is a memory leak. Memory leaks in long running daemons are bad, especially when the expectation is that your process will be stable when all else fails.
可为什么要在意这些呢？很简单：被遗弃的 goroutine 就是内存泄漏。对长期运行的守护进程来说，内存泄漏非常糟糕，尤其是当人们期望即使其它一切都出了问题、你的进程仍然保持稳定的时候。

To further complicate things, a typical nsqd process has many goroutines involved in message delivery. Internally, message “ownership” changes often. To be able to shutdown cleanly, it’s incredibly important to account for all intraprocess messages.
更复杂的是,一个典型的nsqd进程中有许多参与消息传递 goroutines。在内部,消息的“所有权”频繁变化。为了能够完全关闭,统计全部进程内的消息是非常重要的。

Although there aren’t any magic bullets, the following techniques make it a little easier to manage
虽然目前还没有任何灵丹妙药,下列技术使它变得更轻松管理

WaitGroups

The sync package provides sync.WaitGroup, which can be used to perform accounting of how many goroutines are live (and provide a means to wait on their exit).
sync 包提供了 sync.WaitGroup，可以用来统计有多少个 goroutine 还活着（并提供一种等待它们退出的手段）。

To reduce the typical boilerplate, nsqd uses this wrapper:
为了减少常见的样板代码，nsqd 使用下面这个包装器：

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
...
wg.Wait()

Exit Signaling 退出信号

The easiest way to trigger an event in multiple child goroutines is to provide a single go-chan that you close when ready. All pending receives on that go-chan will activate, rather than having to send a separate signal to each goroutine.
在多个子 goroutine 中触发一个事件，最简单的方式是提供一个 go-chan，在准备好时把它关闭。这样，所有在该 go-chan 上挂起的接收操作都会被激活，而不需要向每个 goroutine 单独发送信号。

func work() {
    exitChan := make(chan int)
    go task1(exitChan)
    go task2(exitChan)
    time.Sleep(5 * time.Second)
    close(exitChan)
}
func task1(exitChan chan int) {
    <-exitChan
    log.Printf("task1 exiting")
}

func task2(exitChan chan int) {
    <-exitChan
    log.Printf("task2 exiting")
}

Synchronizing Exit 退出时的同步

It was quite difficult to implement a reliable, deadlock free, exit path that accounted for all in-flight messages. A few tips:
实现一个可靠的、无死锁的、能妥善处理所有在途消息的退出路径相当困难。这里有几条提示：

  1. Ideally the goroutine responsible for sending on a go-chan should also be responsible for closing it.
    理想的情况是负责发送到go-chan的goroutine中也应负责关闭它。

  2. If messages cannot be lost, ensure that pertinent go-chans are emptied (especially unbuffered ones!) to guarantee senders can make progress.
    如果message不能丢失,确保相关的 go-chan 被清空(尤其是无缓冲的!),以保证发送者可以取得进展。

  3. Alternatively, if a message is no longer relevant, sends on a single go-chan should be converted to a select with the addition of an exit signal (as discussed above) to guarantee progress (see the sketch after this list).
    另外，如果消息已经不再重要，对单个 go-chan 的发送应改写成带有退出信号的 select（如上所述），以保证发送方能够继续推进（见本列表之后的示意代码）。

  4. The general order should be:
    一般的顺序应该是:

    1. Stop accepting new connections (close listeners)
      停止接受新的连接(close listeners)
    2. Signal exit to child goroutines (see above)
      发送退出信号给child goroutines(如上文)
    3. Wait on WaitGroup for goroutine exit (see above)
      在 WaitGroup 等待 goroutine 退出(如上文)
    4. Recover buffered data
      恢复缓冲数据
    5. Flush anything left to disk
      刷新所有东西到硬盘
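
The sketch below (assumed names, not nsqd's real shutdown path) illustrates tip 3 together with the exit ordering from tip 4: the sender selects on an exit channel so it can always make progress, and the WaitGroup enforces "signal exit, then wait":

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    msgChan := make(chan string)    // unbuffered: a bare send could block forever
    exitChan := make(chan struct{}) // closed to signal shutdown
    var wg sync.WaitGroup

    // Consumer drains msgChan until exit is signaled.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case m := <-msgChan:
                log.Printf("processed %s", m)
            case <-exitChan:
                return
            }
        }
    }()

    // Producer: the select guarantees progress even if the consumer is gone.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case msgChan <- "msg":
                time.Sleep(10 * time.Millisecond)
            case <-exitChan:
                return // the message is dropped, but nothing deadlocks
            }
        }
    }()

    time.Sleep(100 * time.Millisecond)
    close(exitChan) // signal exit to children
    wg.Wait()       // wait for goroutines to finish
    log.Println("clean shutdown")
}
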
Logging 日志

Finally, the most important tool at your disposal is to log the entrance and exit of your goroutines! It makes it infinitely easier to identify the culprit in the case of deadlocks or leaks.
最后，你手头最重要的工具就是在 goroutine 的入口和出口处打日志！这会让排查死锁或泄漏的罪魁祸首变得容易得多。

nsqd log lines include information to correlate goroutines with their siblings (and parent), such as the client’s remote address or the topic/channel name.
nsqd 的日志行包含了可以把 goroutine 与其兄弟（及父级）goroutine 关联起来的信息，例如客户端的远程地址或 topic/channel 名称。

The logs are verbose, but not verbose to the point where the log is overwhelming. There’s a fine line, but nsqd leans towards the side of having more information in the logs when a fault occurs rather than trying to reduce chattiness at the expense of usefulness.
日志比较详细，但还没有详细到让人难以招架的程度。这之间的界限很微妙，nsqd 倾向于在故障发生时在日志里提供更多信息，而不是以牺牲有用性为代价来减少日志量。

nsq design

NSQ Design设计

NSQ is a successor to simplequeue (part of simplehttp[https://github.com/bitly/simplehttp]) and as such is designed to (in no particular order):
NSQ 是 simplequeue（simplehttp[https://github.com/bitly/simplehttp] 的一部分）的后继者，因此其设计目标包括（排名不分先后）：

  • provide easy topology solutions that enable high-availability and eliminate SPOFs
    提供简单的拓扑方案，实现高可用性并消除单点故障
  • address the need for stronger message delivery guarantees
    满足更强的消息可靠传递的保证
  • bound the memory footprint of a single process (by persisting some messages to disk)
    限制单个进程的内存占用(通过持久化一些消息到硬盘上)
  • greatly simplify configuration requirements for producers and consumers
    极大简化了生产者和消费者的配置要求
  • provide a straightforward upgrade path
    提供了一个简单的升级路径
  • improve efficiency
    提升效率

Simplifying Configuration and Administration 简化配置和管理

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.
单个 nsqd 实例被设计成可以同时处理多个数据流。数据流称为 “topic”，一个 topic 有一个或多个 “channel”。每个 channel 都会收到该 topic 全部消息的一份拷贝。实践中，一个 channel 对应一个消费该 topic 的下游服务。

Topics and channels are not configured a priori. Topics are created on first use by publishing to the named topic or by subscribing to a channel on the named topic. Channels are created on first use by subscribing to the named channel.
topic 和 channel 都不需要预先配置。topic 在首次使用时创建：要么向该名称的 topic 发布消息，要么订阅该名称 topic 上的一个 channel。channel 则在首次订阅指定名称的 channel 时创建。

Topics and channels all buffer data independently of each other, preventing a slow consumer from causing a backlog for other channels (the same applies at the topic level).
topic和channel的所有缓冲的数据相互独立,防止缓慢消费者造成对其他channel的积压(同样适用于topic级别)。

A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client. For example:
一个 channel 可以（通常也确实会）有多个客户端连接。假设所有已连接的客户端都处于准备接收消息的状态，每条消息将被传递给一个随机的客户端。例如：

nsqd clients

To summarize, messages are multicast from topic -> channel (every channel receives a copy of all messages for that topic) but evenly distributed from channel -> consumers (each consumer receives a portion of the messages for that channel).
总之，消息从 topic 到 channel 是多播的（每个 channel 都会收到该 topic 所有消息的副本），而从 channel 到消费者是均匀分发的（每个消费者收到该 channel 的一部分消息）。

NSQ also includes a helper application, nsqlookupd, which provides a directory service where consumers can lookup the addresses of nsqd instances that provide the topics they are interested in subscribing to. In terms of configuration, this decouples the consumers from the producers (they both individually only need to know where to contact common instances of nsqlookupd, never each other), reducing complexity and maintenance.
NSQ还包括一个辅助应用程序,nsqlookupd,它提供了一个目录服务,消费者可以查找到提供他们感兴趣订阅topic的 nsqd 地址 。在配置方面,把消费者与生产者解耦开(它们都分别只需要知道哪里去连接nsqlookupd的共同实例,而不是对方),降低复杂性和维护。

At a lower level each nsqd has a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
在更底的层面,每个nsqd有一个与 nsqlookupd 的长期TCP连接,定期推动其状态。这个数据被nsqlookupd用于给消费者通知nsqd地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。

To introduce a new distinct consumer of a topic, simply start up an NSQ client configured with the addresses of your nsqlookupd instances. There are no configuration changes needed to add either new consumers or new publishers, greatly reducing overhead and complexity.
为一个 topic 引入一个新的独立消费者，只需启动一个配置了 nsqlookupd 实例地址的 NSQ 客户端。添加新的消费者或新的生产者都无需更改配置，大大降低了开销和复杂性。
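
For illustration, a brand-new consumer using the official go-nsq client library might look roughly like this (the topic, channel, and addresses are placeholders; check the go-nsq documentation for the authoritative API):

package main

import (
    "log"

    nsq "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()
    // "events" / "archive" are placeholder topic/channel names.
    consumer, err := nsq.NewConsumer("events", "archive", cfg)
    if err != nil {
        log.Fatal(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        log.Printf("got message: %s", m.Body)
        return nil // returning an error would cause the message to be requeued
    }))

    // Only the nsqlookupd address is needed; nsqd producers are discovered at runtime.
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }

    select {} // block forever; a real program would wait for a signal and call consumer.Stop()
}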

NOTE: in future versions, the heuristic nsqlookupd uses to return addresses could be based on depth, number of connected clients, or other “intelligent” strategies. The current implementation is simply all. Ultimately, the goal is to ensure that all producers are being read from such that depth stays near zero.
注：在将来的版本中，nsqlookupd 返回地址所用的启发式策略可以基于深度、已连接客户端数量或其他“智能”策略。当前的实现只是简单地返回全部地址。最终目标是确保所有生产者都被消费，使深度保持在接近零的水平。

It is important to note that the nsqd and nsqlookupd daemons are designed to operate independently, without communication or coordination between siblings.
值得注意的是，nsqd 和 nsqlookupd 这两类守护进程被设计为独立运行，同类节点之间没有任何通信或协调。

We also think that it’s really important to have a way to view, introspect, and manage the cluster in aggregate. We built nsqadmin to do this. It provides a web UI to browse the hierarchy of topics/channels/consumers and inspect depth and other key statistics for each layer. Additionally it supports a few administrative commands such as removing and emptying a channel (which is a useful tool when messages in a channel can be safely thrown away in order to bring depth back to 0).
我们还认为重要的是有一个方式来聚合 查看,监测,并管理集群。我们建立nsqadmin做到这一点。它提供了一个Web UI来浏览topic/channels/consumers 和深度检查等每一层的关键统计数据。此外,它还支持几个管理命令例如 移除channel 和 清空channel(这是一个有用的工具,当在一个channel中的信息可以被安全地扔掉,以使深度返回到0)。

nsqadmin

Straightforward Upgrade Path 简单的升级路径

This was one of our highest priorities. Our production systems handle a large volume of traffic, all built upon our existing messaging tools, so we needed a way to slowly and methodically upgrade specific parts of our infrastructure with little to no impact.
这是我们的高优先级之一。我们的生产系统处理大量的流量,都建立在我们现有的消息工具上,所以我们需要一种方法来慢慢地,有条不紊地升级我们特定部分的基础设施,而不产生任何影响。

First, on the message producer side we built nsqd to match simplequeue. Specifically, nsqd exposes an HTTP /put endpoint, just like simplequeue, to POST binary data (with the one caveat that the endpoint takes an additional query parameter specifying the “topic”). Services that wanted to switch to start publishing to nsqd only have to make minor code changes.
首先，在消息生产者这一侧，我们让 nsqd 与 simplequeue 保持兼容。具体来说，nsqd 像 simplequeue 一样暴露了一个 HTTP /put 端点用于 POST 二进制数据（唯一的差别是该端点需要一个额外的查询参数来指定 “topic”）。想切换到向 nsqd 发布消息的服务只需要做很小的代码改动。

Second, we built libraries in both Python and Go that matched the functionality and idioms we had been accustomed to in our existing libraries. This eased the transition on the message consumer side by limiting the code changes to bootstrapping. All business logic remained the same.
第二，我们用 Python 和 Go 编写了与现有库在功能和习惯用法上一致的客户端库。这把消费端需要改动的代码限制在初始化部分，从而简化了迁移，所有业务逻辑保持不变。

Finally, we built utilities to glue old and new components together. These are all available in the examples directory in the repository:
最后,我们建立工具连接起新旧组件。这些都在仓库的示例目录中:

1. nsq_pubsub - expose a pubsub like HTTP interface to topics in an NSQ cluster  
nsq_pubsub -在NSQ集群中以HTTP接口的形式暴露的一个pubsub  
2. nsq_to_file - durably write all messages for a given topic to a file  
nsq_to_file -将一个给定topic的所有消息持久化到文件  
3. nsq_to_http - perform HTTP requests for all messages in a topic to (multiple) endpoints  
nsq_to_http -对一个topic的所有消息的执行HTTP请求到(多个)endpoints

Eliminating SPOFs

NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to all instances providing the specified topic. There are no middle-men, no message brokers, and no SPOFs:
NSQ被设计以分布的方式被使用。nsqd客户端(通过TCP)连接到指定topic的所有生产者实例。没有中间人,没有消息代理,也没有单点故障:

nsq clients

This topology eliminates the need to chain single, aggregated, feeds. Instead you consume directly from all producers. Technically, it doesn’t matter which client connects to which NSQ, as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all will eventually be processed.
这种拓扑结构消除了对单一、聚合式消息通道进行串联的需求，消费者直接从所有生产者消费。从技术上讲，哪个客户端连接到哪个 NSQ 并不重要，只要有足够多的客户端连接到所有生产者、足以消化消息量，就能保证所有消息最终都会被处理。

For nsqlookupd, high availability is achieved by running multiple instances. They don’t communicate directly to each other and data is considered eventually consistent. Consumers poll all of their configured nsqlookupd instances and union the responses. Stale, inaccessible, or otherwise faulty nodes don’t grind the system to a halt.
对于 nsqlookupd，高可用性是通过运行多个实例来实现的。它们彼此之间不直接通信，数据被认为是最终一致的。消费者轮询自己配置的所有 nsqlookupd 实例并把响应合并。过期的、不可达的或出故障的节点不会让系统陷入停顿。

Message Delivery Guarantees 消息传递担保

NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and de-dupe or perform idempotent operations.
NSQ保证消息将交付至少一次,虽然重复的消息是可能的。消费者应该关注到这一点,删除重复数据或执行幂等操作。

This guarantee is enforced as part of the protocol and works as follows (assume the client has successfully connected and subscribed to a topic):
这一保证是作为协议的一部分强制执行的，其工作流程如下（假设客户端已成功连接并订阅了一个 topic）：

  1. client indicates they are ready to receive messages
    客户表示他们已经准备好接收消息
  2. NSQ sends a message and temporarily stores the data locally (in the event of re-queue or timeout)
    NSQ 发送一条消息，并暂时把数据存储在本地（以备重新排队或超时时使用）
  3. client replies FIN (finish) or REQ (re-queue) indicating success or failure respectively. If client does not reply NSQ will timeout after a configurable duration and automatically re-queue the message
    客户端回复FIN(结束)或REQ(重新排队)分别指示成功或失败。如果客户端没有回复,NSQ会在设定的时间超时,自动重新排队消息

This ensures that the only edge case that would result in message loss is an unclean shutdown of an nsqd process. In that case, any messages that were in memory (or any buffered writes not flushed to disk) would be lost.
这确保了唯一会导致消息丢失的边缘情况是 nsqd 进程的非正常退出。在那种情况下，任何还在内存中的消息（以及尚未刷写到磁盘的缓冲写入）都会丢失。

If preventing message loss is of the utmost importance, even this edge case can be mitigated. One solution is to stand up redundant nsqd pairs (on separate hosts) that receive copies of the same portion of messages. Because you’ve written your consumers to be idempotent, doing double-time on these messages has no downstream impact and allows the system to endure any single node failure without losing messages.
如果防止消息丢失是重中之重，那么即使这个边缘情况也可以得到缓解。一种解决方案是搭建冗余的 nsqd 成对节点（部署在不同主机上），接收同一部分消息的副本。因为你的消费者已经实现为幂等的，重复处理这些消息不会对下游产生影响，同时也让系统能够承受任意单节点故障而不丢失消息。

The takeaway is that NSQ provides the building blocks to support a variety of production use cases and configurable degrees of durability.
结论是：NSQ 提供了可以支撑多种生产用例、且持久化程度可配置的构建模块。

Bounded Memory Footprint 限定内存占用

nsqd provides a configuration option –mem-queue-size that will determine the number of messages that are kept in memory for a given queue. If the depth of a queue exceeds this threshold messages are transparently written to disk. This bounds the memory footprint of a given nsqd process to mem-queue-size * #_of_channels_and_topics:
nsqd 提供一个 –mem-queue-size 配置选项，用来决定一个队列在内存中保留的消息数量。如果队列深度超过该阈值，消息将被透明地写入磁盘。这样，一个 nsqd 进程的内存占用就被限定在 mem-queue-size * #_of_channels_and_topics 以内：

message overflow

Also, an astute observer might have identified that this is a convenient way to gain an even higher guarantee of delivery by setting this value to something low (like 1 or even 0). The disk-backed queue is designed to survive unclean restarts (although messages might be delivered twice).
此外，敏锐的观察者可能已经发现：把这个值设得很低（比如 1 甚至 0），就能借此获得更高的投递保证。这个磁盘支持的队列被设计为能够在非正常重启后幸存（尽管消息可能会被投递两次）。

Also, related to message delivery guarantees, clean shutdowns (by sending a nsqd process the TERM signal) safely persist the messages currently in memory, in-flight, deferred, and in various internal buffers.
此外，与消息投递保证相关的是：正常关闭（向 nsqd 进程发送 TERM 信号）会把当前位于内存中、在途、延迟队列以及各种内部缓冲区中的消息安全地持久化。

Note, a channel whose name ends in the string #ephemeral will not be buffered to disk and will instead drop messages after passing the mem-queue-size. This enables consumers which do not need message guarantees to subscribe to a channel. These ephemeral channels will also not persist after its last client disconnects.
请注意，名称以 #ephemeral 结尾的 channel 不会写入磁盘，而是在超过 mem-queue-size 之后直接丢弃消息。这让不需要消息保证的消费者也可以订阅 channel。这类临时 channel 在最后一个客户端断开连接后也不会保留。

Efficiency 效率

NSQ was designed to communicate over a “memcached-like” command protocol with simple size-prefixed responses. All message data is kept in the core including metadata like number of attempts, timestamps, etc. This eliminates the copying of data back and forth from server to client, an inherent property of the previous toolchain when re-queueing a message. This also simplifies clients as they no longer need to be responsible for maintaining message state.
NSQ 的通信协议被设计成一种“类 memcached”的命令协议，响应带有简单的长度前缀。所有消息数据都保存在核心中，包括尝试次数、时间戳等元数据。这消除了数据在服务器与客户端之间来回拷贝——这是之前的工具链在重新排队消息时固有的开销，同时也简化了客户端，因为它们不再需要负责维护消息状态。

Also, by reducing configuration complexity, setup and development time is greatly reduced (especially in cases where there are >1 consumers of a topic).
此外,通过降低配置的复杂性,安装和开发的时间大大缩短(尤其是在有超过 > 1 消费者的topic)。

For the data protocol, we made a key design decision that maximizes performance and throughput by pushing data to the client instead of waiting for it to pull. This concept, which we call RDY state, is essentially a form of client-side flow control.
对于数据的协议,我们做了一个重要的设计决策,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,我们称之为RDY状态,基本上是客户端流量控制的一种形式。

When a client connects to nsqd and subscribes to a channel it is placed in a RDY state of 0. This means that no messages will be sent to the client. When a client is ready to receive messages it sends a command that updates its RDY state to some # it is prepared to handle, say 100. Without any additional commands, 100 messages will be pushed to the client as they are available (each time decrementing the server-side RDY count for that client).
当客户端连接到 nsqd 并订阅一个 channel 时，它处于 RDY 为 0 的状态，这意味着不会有任何消息被发送给它。当客户端准备好接收消息时，它发送一条命令，把自己的 RDY 状态更新为它准备处理的数量，比如 100。此后无需任何额外指令，只要有消息可用，就会向该客户端推送最多 100 条（服务端每推送一条就递减该客户端的 RDY 计数）。

Client libraries are designed to send a command to update RDY count when it reaches ~25% of the configurable max-in-flight setting (and properly account for connections to multiple nsqd instances, dividing appropriately).
客户端库被设计成在 RDY 计数降到可配置的 max-in-flight 的约 25% 时发送命令更新 RDY 计数（并且在连接到多个 nsqd 实例时做恰当的处理，把配额合理地分摊到各个连接上）。
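
A rough sketch of that bookkeeping (assumed numbers and helper names, not the actual client-library code): divide max-in-flight across connections, then refresh RDY once the remaining count for a connection falls to about a quarter of its share:

package main

import "fmt"

// rdyShare splits maxInFlight evenly across connections (at least 1 each).
func rdyShare(maxInFlight, numConns int) int {
    share := maxInFlight / numConns
    if share < 1 {
        share = 1
    }
    return share
}

// shouldUpdateRDY reports whether the remaining server-side RDY count for a
// connection has dropped to ~25% of its share, the point at which a client
// would send a fresh RDY command.
func shouldUpdateRDY(remaining, share int) bool {
    return remaining <= share/4
}

func main() {
    share := rdyShare(100, 4) // e.g. max-in-flight 100 spread over 4 nsqd connections
    fmt.Println("per-connection RDY:", share)
    fmt.Println("update at remaining=6?", shouldUpdateRDY(6, share))
}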

nsq protocol

This is a significant performance knob as some downstream systems are able to more-easily batch process messages and benefit greatly from a higher max-in-flight.
这是一个重要的性能控制,使一些下游系统能够更轻松地批量处理信息,并从更高的 max-in-flight 中受益。

Notably, because it is both buffered and push based with the ability to satisfy the need for independent copies of streams (channels), we’ve produced a daemon that behaves like simplequeue and pubsub combined . This is powerful in terms of simplifying the topology of our systems where we would have traditionally maintained the older toolchain discussed above.
值得注意的是，由于它既带缓冲、又基于推送，并且能够满足对数据流独立副本（channel）的需求，我们实际上得到了一个把 simplequeue 和 pubsub 的行为合二为一的守护进程。在简化系统拓扑方面这非常有用——在过去，这些场景需要我们维护上面讨论过的旧工具链。

Go

We made a strategic decision early on to build the NSQ core in Go. We recently blogged about our use of Go at bitly and alluded to this very project - it might be helpful to browse through that post to get an understanding of our thinking with respect to the language.
我们很早就做了一个战略决策：用 Go 来构建 NSQ 的核心。我们最近在博客上写过 bitly 对 Go 的使用，并提到了这个项目——浏览那篇文章或许有助于理解我们对这门语言的想法。

Regarding NSQ, Go channels (not to be confused with NSQ channels) and the language’s built in concurrency features are a perfect fit for the internal workings of nsqd. We leverage buffered channels to manage our in memory message queues and seamlessly write overflow to disk.
就 NSQ 而言，Go 的 channel（不要与 NSQ 的 channel 混淆）以及语言内置的并发特性非常适合 nsqd 的内部工作方式。我们利用带缓冲的 channel 管理内存中的消息队列，并把溢出的消息无缝地写到磁盘。

The standard library makes it easy to write the networking layer and client code. The built in memory and cpu profiling hooks highlight opportunities for optimization and require very little effort to integrate. We also found it really easy to test components in isolation, mock types using interfaces, and iteratively build functionality.
标准库让编写网络层和客户端代码变得很容易。内置的内存和 CPU 性能剖析钩子能够凸显优化机会，而且几乎不需要什么集成工作。我们还发现，单独测试组件、用 interface mock 类型、以迭代方式构建功能都非常容易。

nsq introduction

NSQ

a realtime distributed messaging platform
一个实时的分布式消息平台

####Introduction 介绍
NSQ is a realtime distributed messaging platform designed to operate at bitly’s scale, handling billions of messages per day (current peak of 90k+ messages per second).
NSQ是一个实时的分布式消息平台,被设计用来承载 bitly 处理每天数十亿的消息(每秒90k+的峰值)。

It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee. See features & guarantees.
它提倡分布式、去中心化的拓扑结构，没有单点故障，支持容错和高可用，并提供可靠的消息投递保证。请参阅特性与保证。

Operationally, NSQ is easy to configure and deploy (all parameters are specified on the command line and compiled binaries have no runtime dependencies). For maximum flexibility, it is agnostic to data format (messages can be JSON, MsgPack, Protocol Buffers, or anything else). Official Go and Python libraries are available out of the box and, if you’re interested in building your own client, there’s a protocol spec (see client libraries).
使用上,NSQ易于配置和部署(所有参数都指定在命令行和编译的二进制文件没有运行时依赖)。为了获得最大的灵活性,它是不可知的数据格式(消息可以是JSON,MsgPack,Protocol Buffers,或其他任何东西)。官方的go和Python库可开箱即用,如果你有兴趣建立自己的客户端,这里有一个协议规范(参见客户端库)。

####Features & Guarantees 特性与保证

Features特性

  • support distributed topologies with no SPOF
    支持分布式拓扑,没有单点故障
  • horizontally scalable (no brokers, seamlessly add more nodes to the cluster)
    横向拓展(没有brokers, 无缝地往集群里添加更多节点)
  • low-latency push based message delivery (performance)
    基于推送的低延迟消息传递(性能)
  • combination load-balanced and multicast style message routing
    结合负载平衡和组播方式的消息路由
  • excel at both streaming (high-throughput) and job oriented (low-throughput) workloads
    同时擅长于流(高流量)与工作导向(低流量)工作负载
  • primarily in-memory (beyond a high-water mark messages are transparently kept on disk)
    主要是在内存中(超过上限的消息将被透明地保存在磁盘上)
  • runtime discovery service for consumers to find producers (nsqlookupd)
    运行发现服务,为消费者找到生产者(nsqlookupd)
  • transport layer security (TLS)
    传输层安全协议(TLS)
  • data format agnostic
    数据格式无关
  • few dependencies (easy to deploy) and a sane, bounded, default configuration
    依赖很少（易于部署），并附有一份健全的、有界的默认配置
  • simple TCP protocol supporting client libraries in any language
    支持任何语言客户端库的简单TCP协议
  • HTTP interface for stats, admin actions, and producers (no client library needed to publish)
    HTTP接口进行统计,管理操作和生产(无需客户端库发布)
  • integrates with statsd for realtime instrumentation
    集成了statsd用于实时监控
  • robust cluster administration interface (nsqadmin)
    强大的集群管理界面(nsqadmin)

####Guarantees保证

As with any distributed system, achieving your goal is a matter of making intelligent tradeoffs. By being transparent about the reality of these tradeoffs we hope to set expectations about how NSQ will behave when deployed in production.
与任何分布式系统一样，达成目标靠的是做出明智的取舍。通过坦诚地说明这些取舍的实际情况，我们希望为 NSQ 部署到生产环境后的行为设定合理的预期。

messages are not durable (by default)
默认情况下消息不进行持久化。

Although the system supports a “release valve” (–mem-queue-size) after which messages will be transparently kept on disk, it is primarily an in-memory messaging platform.
虽然该系统支持超过“阈值”(–mem-queue-size)的值之后,信息将被透明地保存在磁盘上,但它主要是一个内存中的消息传递平台。

–mem-queue-size can be set to 0 to to ensure that all incoming messages are persisted to disk. In this case, if a node failed, you are susceptible to a reduced failure surface (i.e. did the OS or underlying IO subsystem fail).
–mem-queue-size 可以设置为 0，以确保所有进入的消息都被持久化到磁盘。在这种情况下，如果节点失败，你面对的故障面就缩小了（即只需关心是操作系统还是底层 IO 子系统出了问题）。

There is no built in replication. However, there are a variety of ways this tradeoff is managed such as deployment topology and techniques which actively slave and persist topics to disk in a fault-tolerant fashion.
没有内置的复制机制。不过，可以用多种方式来管理这一取舍，例如通过部署拓扑，或者采用以容错方式主动把 topic 做从属复制并持久化到磁盘的技术。

messages are delivered at least once
消息至少传递一次

Closely related to above, this assumes that the given nsqd node does not fail.
与上述密切相关的是,这里假设给定nsqd节点不会失败。

This means, for a variety of reasons, messages can be delivered multiple times (client timeouts, disconnections, requeues, etc.). It is the client’s responsibility to perform idempotent operations or de-dupe.
这意味着,由于各种原因,消息可能会被传送多次(客户端超时,断线,对其重新排队,等等)。客户端承担起履行幂等操作或重复数据删除的责任。
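
Since delivery is at-least-once, a consumer either performs idempotent operations or de-dupes. A minimal, illustrative de-dupe guard keyed by message ID might look like the following; production code would bound the set (e.g. with an LRU or a TTL):

package main

import (
    "fmt"
    "sync"
)

// Deduper remembers message IDs it has already seen.
// Unbounded for simplicity; a real implementation would evict old entries.
type Deduper struct {
    mu   sync.Mutex
    seen map[string]struct{}
}

func NewDeduper() *Deduper {
    return &Deduper{seen: make(map[string]struct{})}
}

// Seen records the ID and reports whether it was already present.
func (d *Deduper) Seen(id string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    if _, ok := d.seen[id]; ok {
        return true
    }
    d.seen[id] = struct{}{}
    return false
}

func main() {
    d := NewDeduper()
    for _, id := range []string{"a1", "b2", "a1"} {
        if d.Seen(id) {
            fmt.Println("duplicate, skipping:", id)
            continue
        }
        fmt.Println("processing:", id)
    }
}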

messages received are un-ordered
消息的接收是无序的

You cannot rely on the order of messages being delivered to consumers.
你不能依赖于消息传递到消费者的顺序。

Similar to message delivery semantics, this is the result of requeues, the combination of in-memory and on disk storage, and the fact that each nsqd node shares nothing.
类似的消息传递语义,这是对其重新排队的结果,结合了内存和磁盘上的存储,每个nsqd节点间不共享任何内容。

It is relatively straightforward to achieve loose ordering (i.e. for a given consumer its messages are ordered but not across the cluster as a whole) by introducing a window of latency in your consumer to accept messages and order them before processing (although, in order to preserve this invariant one must drop messages falling outside that window).
通过在消费者中引入一个延迟窗口——先接收消息、在处理之前排序——就能相对容易地实现松散有序（即对某个给定的消费者来说其消息是有序的，但在整个集群范围内并不保证）。当然，为了维持这一不变量，必须丢弃落在窗口之外的消息。
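
A sketch of that latency-window idea (illustrative only, with assumed types): buffer messages for a fixed delay, emit them sorted by timestamp, and drop anything that arrives behind the watermark that has already been flushed:

package main

import (
    "fmt"
    "sort"
    "time"
)

type Msg struct {
    TS   time.Time
    Body string
}

// Window buffers messages for `delay`, emits them in timestamp order, and
// drops anything older than the watermark it has already flushed past.
type Window struct {
    delay     time.Duration
    buf       []Msg
    watermark time.Time
}

// Add buffers a message; it returns false (drop) if the message arrived too late.
func (w *Window) Add(m Msg) bool {
    if m.TS.Before(w.watermark) {
        return false // preserving the ordering invariant means dropping it
    }
    w.buf = append(w.buf, m)
    return true
}

// Flush returns, in timestamp order, every buffered message older than now-delay.
func (w *Window) Flush(now time.Time) []Msg {
    cutoff := now.Add(-w.delay)
    sort.Slice(w.buf, func(i, j int) bool { return w.buf[i].TS.Before(w.buf[j].TS) })
    var out []Msg
    i := 0
    for ; i < len(w.buf) && w.buf[i].TS.Before(cutoff); i++ {
        out = append(out, w.buf[i])
    }
    w.buf = w.buf[i:]
    if n := len(out); n > 0 {
        w.watermark = out[n-1].TS
    }
    return out
}

func main() {
    w := &Window{delay: 2 * time.Second}
    now := time.Now()
    w.Add(Msg{TS: now.Add(-3 * time.Second), Body: "b"})
    w.Add(Msg{TS: now.Add(-4 * time.Second), Body: "a"})
    for _, m := range w.Flush(now) {
        fmt.Println(m.Body) // prints "a" then "b"
    }
}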

consumers eventually find all topic producers
消费者最终会发现所有topic的生产者。

The discovery service (nsqlookupd) is designed to be eventually consistent. nsqlookupd nodes do not coordinate to maintain state or answer queries.
发现服务(nsqlookupd)被设计为最终一致。nsqlookupd节点并不需要互相协调来保持状态或回复查询。

Network partitions do not affect availability in the sense that both sides of the partition can still answer queries. Deployment topology has the most significant effect of mitigating these types of issues.
网络分区不影响可用性,分区双方仍然可以回复查询。部署拓扑结构能在很大程度减轻这类问题。

###FAQ

####Deployment部署

What is the recommended topology for nsqd?
推荐的nsqd拓扑结构是什么样的?

We strongly recommend running an nsqd alongside any service(s) that produce messages.
我们强烈建议nsqd与任何产生消息的服务一起运行。

nsqd is a relatively lightweight process with a bounded memory footprint, which makes it well suited to “playing nice with others”.
nsqd是一个占用有限内存的相对轻量级进程,这使得它非常适合于“playing nice with others”。

This pattern aids in structuring message flow as a consumption problem rather than a production one.
这种模式有助于把消息流的组织变成一个消费端问题，而不是生产端问题。

Another benefit is that it essentially forms an independent, sharded, silo of data for that topic on a given host.
另一个好处是，它实质上在给定主机上为该 topic 形成了一个独立、分片的数据孤岛。

NOTE: this isn’t an absolute requirement though, it’s just simpler (see question below).
注意：这并不是绝对要求，只是这样做更简单（见下面的问题）。

Why can’t nsqlookupd be used by producers to find where to publish to?
为什么生产者不能用 nsqlookupd 来查找应该发布到哪里？

NSQ promotes a consumer-side discovery model that alleviates the upfront configuration burden of having to tell consumers where to find the topic(s) they need.
NSQ 提倡消费端的发现模型，免去了必须事先告诉消费者去哪里找到所需 topic 的前期配置负担。

However, it does not provide any means to solve the problem of where a service should publish to. This is a chicken and egg problem, the topic does not exist prior to the first publish.
然而,它不提供任何手段来解决一个服务应发布到哪的问题。这是一个鸡和蛋的问题,topic 不会在第一个发布之前存在。

By co-locating nsqd (see question above), you sidestep this problem entirely (your service simply publishes to the local nsqd) and allow NSQ’s runtime discovery system to work naturally.
通过协同定位nsqd(见上面的问题),你完全回避这个问题(您的服务只是发布到本地nsqd),并允许NSQ运行时发现系统自然工作。

I just want to use nsqd as a work queue on a single node, is that a suitable use case?
我只想在单个节点上把 nsqd 当作工作队列使用，这是一个合适的用例吗？

Yep, nsqd can run standalone just fine.
是的,独立运行nsqd就好了。

nsqlookupd is beneficial in larger distributed environments.
nsqlookupd 有利于在更大的分布式环境。

How many nsqlookupd should I run?
我需要启动多少个nsqlookupd?

Typically only a few depending on your cluster size, number of nsqd nodes and consumers, and your desired fault tolerance.
通常只要几个,取决于你集群大小,nsqd节点和消费者的数目,以及您需要的容错性。

3 or 5 works really well for deployments involving up to several hundred hosts and thousands of consumers.
对于多达数百台主机、数千个消费者的部署，3 个或 5 个实例就能工作得很好。

nsqlookupd nodes do not require coordination to answer queries. The metadata in the cluster is eventually consistent.
nsqlookupd节点并没有需要协调来回答查询。集群中的元数据是最终一致。

####Publishing发布消息

Do I need a client library to publish messages?
是否需要一个客户端库发布消息?

NO! Just use the HTTP endpoints for publishing (/pub and /mpub). It’s simple, it’s easy, and it’s ubiquitous in almost any programming environment.

In fact, the overwhelming majority of NSQ deployments use HTTP to publish.
事实上,绝大多数NSQ部署使用HTTP进行发布。
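
For example, publishing over HTTP from Go needs nothing beyond the standard library (this assumes an nsqd listening on its default HTTP port, 4151, and uses a placeholder topic name):

package main

import (
    "bytes"
    "log"
    "net/http"
)

func main() {
    // "test" is a placeholder topic name; nsqd creates the topic on first publish.
    resp, err := http.Post(
        "http://127.0.0.1:4151/pub?topic=test",
        "application/octet-stream",
        bytes.NewBufferString(`{"hello":"world"}`),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    log.Println("publish status:", resp.Status)
}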

Why force a client to handle responses to the TCP protocol’s PUB and MPUB commands?
为什么要强制客户端处理响应TCP协议的PUB和MPUB命令?

We believe NSQ’s default mode of operation should prioritize safety and we wanted the protocol to be simple and consistent.
我们认为NSQ的默认模式应优先考虑安全性,我们希望协议是简单和一致。

When can a PUB or MPUB fail?
PUB 或 MPUB 何时会失败？

  1. The topic name is not formatted correctly (to character/length restrictions). See the topic and channel name spec.
    topic 名称格式不正确（不符合字符/长度限制）。请参阅 topic 和 channel 名称规范。
  2. The message is too large (this limit is exposed as a parameter to nsqd).
    消息太大（这个限制以参数形式暴露给 nsqd）。
  3. The topic is in the middle of being deleted.
    topic 正在被删除。
  4. nsqd is in the middle of cleanly exiting.
    nsqd正在退出中。
  5. Any client connection-related failures during the publish.
    发布时任何客户端连接相关的故障。

(1) and (2) should be considered programming errors. (3) and (4) are rare and (5) is a natural part of any TCP based protocol.
（1）和（2）应被视为编程错误。（3）和（4）很少见，而（5）是任何基于 TCP 的协议都会自然遇到的情况。

How can I mitigate scenario (3) above?
我怎样才能减轻上述场景(3)?

Deleting topics is a relatively infrequent operation. If you need to delete a topic, orchestrate the timing such that publishes eliciting topic creations will never be performed until a sufficient amount of time has elapsed since deletion.
删除 topic 是相对不常见的操作。如果需要删除一个 topic，请安排好操作的时机：在删除之后、经过足够长的时间之前，不要执行任何会触发该 topic 重新创建的发布操作。

####Design and Theory设计与理念

How do you recommend naming topics and channels?
你推荐如何命名 topic 和 channel？

A topic name should describe the data in the stream.
一个 topic name 应该描述流里的数据。

A channel name should describe the work performed by its consumers.
一个channel name 应该描述消费者执行的工作。

For example, good topic names are encodes, decodes, api_requests, page_views and good channel names are archive, analytics_increment, spam_analysis.

Are there any limitations to the number of topics and channels a single nsqd can support?
是否有单个nsqd结点上支持topics和channels的数量上限?

There are no built-in limits imposed. It is only limited by the memory and CPU of the host nsqd is running on (per-client CPU usage was greatly reduced in issue #236).
没有内置的硬性限制。它只受运行 nsqd 的主机的内存和 CPU 限制（issue #236 中大幅降低了每个客户端的 CPU 占用）。

How are new topics announced to the cluster?
新 topic 是如何向集群公布的？

The first PUB or SUB to a topic will create the topic on an nsqd. Topic metadata will then propagate to the configured nsqlookupd. Other readers will discover this topic by periodically querying the nsqlookupd.
在topic 上的第一个 PUB或SUB 将在nsqd上创建 topic。然后topic的元数据将传播到配置nsqlookupd。其它readers 将通过定期查询nsqlookupd发现这个topic。

Can NSQ do RPC?
NSQ 可以用来做 RPC 吗？

Yes, it’s possible, but NSQ was not designed with this use case in mind.
是的,这是可以的,但NSQ最初并没有设计这个用例。

We intend to publish some docs on how this could be structured but in the meantime reach out if you’re interested.

程序员的工具箱

figure out how Logger gets the source method name

info in Logger

package java.util.logging;

public class Logger {

public void info(String msg) {
if (Level.INFO.intValue() < levelValue) {
return;
}
log(Level.INFO, msg);
}

/**
* Log a message, with no arguments.
* <p>
* If the logger is currently enabled for the given message
* level then the given message is forwarded to all the
* registered output Handler objects.
* <p>
* @param level One of the message level identifiers, e.g., SEVERE
* @param msg The string message (or a key in the message catalog)
*/
public void log(Level level, String msg) {
if (level.intValue() < levelValue || levelValue == offValue) {
return;
}
LogRecord lr = new LogRecord(level, msg);
doLog(lr);
}

LogRecord

public LogRecord(Level level, String msg) {
// Make sure level isn't null, by calling random method.
level.getClass();
this.level = level;
message = msg;
// Assign a thread ID and a unique sequence number.
sequenceNumber = globalSequenceNumber.getAndIncrement();
threadID = defaultThreadID();
millis = System.currentTimeMillis();
needToInferCaller = true;
}


public String getSourceClassName() {
if (needToInferCaller) {
inferCaller();
}
return sourceClassName;
}

// 这里就是找出当前执行的类与方法的地方
// Private method to infer the caller's class and method names
private void inferCaller() {
needToInferCaller = false;
JavaLangAccess access = SharedSecrets.getJavaLangAccess();
Throwable throwable = new Throwable();
int depth = access.getStackTraceDepth(throwable);

String logClassName = "java.util.logging.Logger";
String plogClassName = "sun.util.logging.PlatformLogger";
boolean lookingForLogger = true;
for (int ix = 0; ix < depth; ix++) {
// Calling getStackTraceElement directly prevents the VM
// from paying the cost of building the entire stack frame.
StackTraceElement frame =
access.getStackTraceElement(throwable, ix);
String cname = frame.getClassName();
if (lookingForLogger) {
// 跳过所有looger之前的栈帧
// Skip all frames until we have found the first logger frame.
if (cname.equals(logClassName) || cname.startsWith(plogClassName)) {
lookingForLogger = false;
}
} else {
if (!cname.equals(logClassName) && !cname.startsWith(plogClassName)) {
// 跳过反射调用
// skip reflection call
if (!cname.startsWith("java.lang.reflect.") && !cname.startsWith("sun.reflect.")) {
// We've found the relevant frame.
setSourceClassName(cname);
setSourceMethodName(frame.getMethodName());
return;
}
}
}
}
// We haven't found a suitable frame, so just punt. This is
// OK as we are only committed to making a "best effort" here.
}



Selector Provider SPI

Selector Provider SPI

Opens a selector.

public abstract class Selector implements Closeable {

/**
* Initializes a new instance of this class.
*/
protected Selector() { }

/**
* Opens a selector.
*
* <p> The new selector is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
* of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new selector
*
* @throws IOException
* If an I/O error occurs
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

SelectorProvider

public abstract class SelectorProvider {

private static final Object lock = new Object();
private static SelectorProvider provider = null;

/**
* Initializes a new instance of this class. </p>
*
* @throws SecurityException
* If a security manager has been installed and it denies
* {@link RuntimePermission}<tt>("selectorProvider")</tt>
*/
protected SelectorProvider() {
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkPermission(new RuntimePermission("selectorProvider"));
}

private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
// 通过反射加载类,并创建类的实例
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
} catch (ClassNotFoundException x) {
throw new ServiceConfigurationError(null, x);
} catch (IllegalAccessException x) {
throw new ServiceConfigurationError(null, x);
} catch (InstantiationException x) {
throw new ServiceConfigurationError(null, x);
} catch (SecurityException x) {
throw new ServiceConfigurationError(null, x);
}
}

private static boolean loadProviderAsService() {

// 这里通过 ServiceLoader 加载 SelectorProvider
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();

// XXX 这里为什么要使用一个for 循环呢,底下不是只判断一次就可以了?
// 并且实际上也只执行一次???
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}

/**
* Returns the system-wide default selector provider for this invocation of
* the Java virtual machine.
*
* <p> The first invocation of this method locates the default provider
* object as follows: </p>
*
* <ol>
*
* <li><p> If the system property
* <tt>java.nio.channels.spi.SelectorProvider</tt> is defined then it is
* taken to be the fully-qualified name of a concrete provider class.
* The class is loaded and instantiated; if this process fails then an
* unspecified error is thrown. </p></li>
*
* <li><p> If a provider class has been installed in a jar file that is
* visible to the system class loader, and that jar file contains a
* provider-configuration file named
* <tt>java.nio.channels.spi.SelectorProvider</tt> in the resource
* directory <tt>META-INF/services</tt>, then the first class name
* specified in that file is taken. The class is loaded and
* instantiated; if this process fails then an unspecified error is
* thrown. </p></li>
*
* <li><p> Finally, if no provider has been specified by any of the above
* means then the system-default provider class is instantiated and the
* result is returned. </p></li>
*
* </ol>
*
* <p> Subsequent invocations of this method return the provider that was
* returned by the first invocation. </p>
*
* @return The system-wide default selector provider
*/
public static SelectorProvider provider() {
// 同步锁
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
// 从系统属性配置中加载
if (loadProviderFromProperty())
return provider;
// 以SPI的方式加载
if (loadProviderAsService())
return provider;

// 返回默认的SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

ServiceLoader

public final class ServiceLoader<S> implements Iterable<S>{

private static final String PREFIX = "META-INF/services/";

// 服务接口的class
// The class or interface representing the service being loaded
private Class<S> service;

// The class loader used to locate, load, and instantiate providers
private ClassLoader loader;

// 根据实例化的顺序缓存providers
// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<String,S>();

// The current lazy-lookup iterator
private LazyIterator lookupIterator;

// 清除此加载器的服务者缓存,以重载所有服务者。
/**
* Clear this loader's provider cache so that all providers will be
* reloaded.
*
* <p> After invoking this method, subsequent invocations of the {@link
* #iterator() iterator} method will lazily look up and instantiate
* providers from scratch, just as is done by a newly-created loader.
*
* <p> This method is intended for use in situations in which new providers
* can be installed into a running Java virtual machine.
*/
public void reload() {
providers.clear();// 清除缓存
lookupIterator = new LazyIterator(service, loader);
}

// 私有构造函数
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = svc;
loader = cl;
reload();
}


// 针对给定服务类型和类加载器创建新的服务加载器。
/**
* Creates a new service loader for the given service type and class
* loader.
*
* @param service
* The interface or abstract class representing the service
*
* @param loader
* The class loader to be used to load provider-configuration files
* and provider classes, or <tt>null</tt> if the system class
* loader (or, failing that, the bootstrap class loader) is to be
* used
*
* @return A new service loader
*/
public static <S> ServiceLoader<S> load(Class<S> service,
ClassLoader loader){
return new ServiceLoader<S>(service, loader);
}


// 针对给定服务类型创建新的服务加载器,使用当前线程的上下文类加载器。
/**
* Creates a new service loader for the given service type, using the
* current thread's {@linkplain java.lang.Thread#getContextClassLoader
* context class loader}.
*
* <p> An invocation of this convenience method of the form
*
* <blockquote><pre>
* ServiceLoader.load(<i>service</i>)</pre></blockquote>
*
* is equivalent to
*
* <blockquote><pre>
* ServiceLoader.load(<i>service</i>,
* Thread.currentThread().getContextClassLoader())</pre></blockquote>
*
* @param service
* The interface or abstract class representing the service
*
* @return A new service loader
*/
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}

// 针对给定服务类型创建新的服务加载器,使用扩展类加载器。
/**
* Creates a new service loader for the given service type, using the
* extension class loader.
*
* <p> This convenience method simply locates the extension class loader,
* call it <tt><i>extClassLoader</i></tt>, and then returns
*
* <blockquote><pre>
* ServiceLoader.load(<i>service</i>, <i>extClassLoader</i>)</pre></blockquote>
*
* <p> If the extension class loader cannot be found then the system class
* loader is used; if there is no system class loader then the bootstrap
* class loader is used.
*
* <p> This method is intended for use when only installed providers are
* desired. The resulting service will only find and load providers that
* have been installed into the current Java virtual machine; providers on
* the application's class path will be ignored.
*
* @param service
* The interface or abstract class representing the service
*
* @return A new service loader
*/
public static <S> ServiceLoader<S> loadInstalled(Class<S> service) {
ClassLoader cl = ClassLoader.getSystemClassLoader();
ClassLoader prev = null;
while (cl != null) {
prev = cl;
cl = cl.getParent();
}
return ServiceLoader.load(service, prev);
}

ServiceLoader → LazyIterator

private class LazyIterator implements Iterator<S>{

Class<S> service;
ClassLoader loader;
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null;

private LazyIterator(Class<S> service, ClassLoader loader) {
this.service = service;
this.loader = loader;
}

public boolean hasNext() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 配置文件的完整文件名
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}

// 解析文件
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
// 得到nextName
nextName = pending.next();
return true;
}

public S next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
String cn = nextName;
nextName = null;
try {
// 加载相应的服务实现类
S p = service.cast(Class.forName(cn, true, loader)
.newInstance());
// 放入缓存中
providers.put(cn, p);
return p;
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated: " + x,
x);
}
throw new Error(); // This cannot happen
}

public void remove() {
throw new UnsupportedOperationException();
}

}

// 以延迟方式加载此加载器服务的可用提供者。
/**
* Lazily loads the available providers of this loader's service.
*
* <p> The iterator returned by this method first yields all of the
* elements of the provider cache, in instantiation order. It then lazily
* loads and instantiates any remaining providers, adding each one to the
* cache in turn.
*
* <p> To achieve laziness the actual work of parsing the available
* provider-configuration files and instantiating providers must be done by
* the iterator itself. Its {@link java.util.Iterator#hasNext hasNext} and
* {@link java.util.Iterator#next next} methods can therefore throw a
* {@link ServiceConfigurationError} if a provider-configuration file
* violates the specified format, or if it names a provider class that
* cannot be found and instantiated, or if the result of instantiating the
* class is not assignable to the service type, or if any other kind of
* exception or error is thrown as the next provider is located and
* instantiated. To write robust code it is only necessary to catch {@link
* ServiceConfigurationError} when using a service iterator.
*
* <p> If such an error is thrown then subsequent invocations of the
* iterator will make a best effort to locate and instantiate the next
* available provider, but in general such recovery cannot be guaranteed.
*
* <blockquote style="font-size: smaller; line-height: 1.2"><span
* style="padding-right: 1em; font-weight: bold">Design Note</span>
* Throwing an error in these cases may seem extreme. The rationale for
* this behavior is that a malformed provider-configuration file, like a
* malformed class file, indicates a serious problem with the way the Java
* virtual machine is configured or is being used. As such it is
* preferable to throw an error rather than try to recover or, even worse,
* fail silently.</blockquote>
*
* <p> The iterator returned by this method does not support removal.
* Invoking its {@link java.util.Iterator#remove() remove} method will
* cause an {@link UnsupportedOperationException} to be thrown.
*
* @return An iterator that lazily loads providers for this loader's
* service
*/
public Iterator<S> iterator() {
return new Iterator<S>() {

Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();

public boolean hasNext() {
if (knownProviders.hasNext())
return true;
// 在reload方法中, lookupIterator = new LazyIterator(service, loader);

return lookupIterator.hasNext();
}

public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

解析SPI配置文件的方法

// Parse a single line from the given configuration file, adding the name
// on the line to the names list.
//
private int parseLine(Class service, URL u, BufferedReader r, int lc,
List<String> names)
throws IOException, ServiceConfigurationError
{
String ln = r.readLine();
if (ln == null) {
return -1;
}
int ci = ln.indexOf('#');
if (ci >= 0) ln = ln.substring(0, ci);
ln = ln.trim();
int n = ln.length();
if (n != 0) {
if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0))
fail(service, u, lc, "Illegal configuration-file syntax");
int cp = ln.codePointAt(0);
if (!Character.isJavaIdentifierStart(cp))
fail(service, u, lc, "Illegal provider-class name: " + ln);
for (int i = Character.charCount(cp); i < n; i += Character.charCount(cp)) {
cp = ln.codePointAt(i);
if (!Character.isJavaIdentifierPart(cp) && (cp != '.'))
fail(service, u, lc, "Illegal provider-class name: " + ln);
}
if (!providers.containsKey(ln) && !names.contains(ln))
// 添加当前行解析出的类名
names.add(ln);
}
return lc + 1;
}

// Parse the content of the given URL as a provider-configuration file.
//
// @param service
// The service type for which providers are being sought;
// used to construct error detail strings
//
// @param u
// The URL naming the configuration file to be parsed
//
// @return A (possibly empty) iterator that will yield the provider-class
// names in the given configuration file that are not yet members
// of the returned set
//
// @throws ServiceConfigurationError
// If an I/O error occurs while reading from the given URL, or
// if a configuration-file format error is detected
//
private Iterator<String> parse(Class service, URL u)
throws ServiceConfigurationError
{
InputStream in = null;
BufferedReader r = null;
ArrayList<String> names = new ArrayList<String>();
try {
in = u.openStream();
r = new BufferedReader(new InputStreamReader(in, "utf-8"));
int lc = 1;
while ((lc = parseLine(service, u, r, lc, names)) >= 0);
} catch (IOException x) {
fail(service, "Error reading configuration file", x);
} finally {
try {
if (r != null) r.close();
if (in != null) in.close();
} catch (IOException y) {
fail(service, "Error closing configuration file", y);
}
}
return names.iterator();
}

默认情况下的DefaultSelectorProvider

public class DefaultSelectorProvider {

/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }

/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}

// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}

return new sun.nio.ch.PollSelectorProvider();
}

}

thrift async client

####一、Java的thrift异步Client

#####1.异步的thrift client类 TAsyncClient

TAsyncClient.java

public abstract class TAsyncClient {
protected final TProtocolFactory ___protocolFactory;
protected final TNonblockingTransport ___transport;
protected final TAsyncClientManager ___manager;
protected TAsyncMethodCall ___currentMethod;
private Exception ___error;
private long ___timeout;

public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
this(protocolFactory, manager, transport, 0);
}

public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
this.___protocolFactory = protocolFactory;
this.___manager = manager;
this.___transport = transport;
this.___timeout = timeout;
}

public TProtocolFactory getProtocolFactory() {
return ___protocolFactory;
}

public long getTimeout() {
return ___timeout;
}

public boolean hasTimeout() {
return ___timeout > 0;
}

public void setTimeout(long timeout) {
this.___timeout = timeout;
}

/**
* Is the client in an error state?
* @return If client in an error state?
*/
public boolean hasError() {
return ___error != null;
}

/**
* Get the client's error - returns null if no error
* @return Get the client's error. <br /> returns null if no error
*/
public Exception getError() {
return ___error;
}

protected void checkReady() {
// 同时只能调用一个方法
// Ensure we are not currently executing a method
if (___currentMethod != null) {
throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName());
}

// Ensure we're not in an error state
if (___error != null) {
throw new IllegalStateException("Client has an error!", ___error);
}
}

/**
* Called by delegate method when finished
*/
protected void onComplete() {
// 当前方法完成
___currentMethod = null;
}

/**
* Called by delegate method on error
*/
protected void onError(Exception exception) {
// 发生错误,关闭连接,赋值给__error
___transport.close();
___currentMethod = null;
___error = exception;
}
}

2.异步方法调用
TAsyncMethodCall.java

public abstract class TAsyncMethodCall<T> {

private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
private static AtomicLong sequenceIdCounter = new AtomicLong(0);

public static enum State {
CONNECTING,
WRITING_REQUEST_SIZE,
WRITING_REQUEST_BODY,
READING_RESPONSE_SIZE,
READING_RESPONSE_BODY,
RESPONSE_READ,
ERROR;
}

/**
* Next step in the call, initialized by start()
*/
private State state = null;

protected final TNonblockingTransport transport;
private final TProtocolFactory protocolFactory;
protected final TAsyncClient client;
private final AsyncMethodCallback<T> callback;
private final boolean isOneway;
private long sequenceId;

private ByteBuffer sizeBuffer;
private final byte[] sizeBufferArray = new byte[4];
private ByteBuffer frameBuffer;

private long startTime = System.currentTimeMillis();

protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
this.transport = transport;
this.callback = callback;
this.protocolFactory = protocolFactory;
this.client = client;
this.isOneway = isOneway;
this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
}

protected State getState() {
return state;
}

protected boolean isFinished() {
return state == State.RESPONSE_READ;
}

protected long getStartTime() {
return startTime;
}

protected long getSequenceId() {
return sequenceId;
}

public TAsyncClient getClient() {
return client;
}

public boolean hasTimeout() {
return client.hasTimeout();
}

public long getTimeoutTimestamp() {
return client.getTimeout() + startTime;
}

// Each generated method call writes its request (method name, arguments, etc.) here
protected abstract void write_args(TProtocol protocol) throws TException;

// Prepare the method call: initialize the buffers and serialize the arguments into them
/**
* Initialize buffers.
* @throws TException if buffer initialization fails
*/
protected void prepareMethodCall() throws TException {
// Serialize the call into an in-memory buffer
TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);

// Write the method's request arguments
write_args(protocol);

int length = memoryBuffer.length();
frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length);

// Encode the 4-byte frame size; the frame body is already wrapped in frameBuffer
TFramedTransport.encodeFrameSize(length, sizeBufferArray);
sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
}

/**
* Register with selector and start first state, which could be either connecting or writing.
* @throws IOException if register or starting fails
*/
void start(Selector sel) throws IOException {
SelectionKey key;
if (transport.isOpen()) {
// Transport already open: start by writing the request size
state = State.WRITING_REQUEST_SIZE;
// Register with the selector for OP_WRITE
key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
} else {
// Not connected yet: establish the connection first
state = State.CONNECTING;
// Register with the selector for OP_CONNECT
key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);

// non-blocking connect can complete immediately,
// in which case we should not expect the OP_CONNECT
if (transport.startConnect()) {
registerForFirstWrite(key);
}
}

// Attach this TAsyncMethodCall to the key so the selector thread can drive it
key.attach(this);
}

void registerForFirstWrite(SelectionKey key) throws IOException {
state = State.WRITING_REQUEST_SIZE;
key.interestOps(SelectionKey.OP_WRITE);
}

protected ByteBuffer getFrameBuffer() {
return frameBuffer;
}


// Move the call to its next state
/**
* Transition to next state, doing whatever work is required. Since this
* method is only called by the selector thread, we can make changes to our
* select interests without worrying about concurrency.
* @param key
*/
protected void transition(SelectionKey key) {
// Ensure key is valid
if (!key.isValid()) {
key.cancel();
Exception e = new TTransportException("Selection key not valid!");
onError(e);
return;
}

// Transition function
try {
switch (state) {
case CONNECTING:
doConnecting(key);
break;
case WRITING_REQUEST_SIZE:
doWritingRequestSize();
break;
case WRITING_REQUEST_BODY:
doWritingRequestBody(key);
break;
case READING_RESPONSE_SIZE:
doReadingResponseSize();
break;
case READING_RESPONSE_BODY:
doReadingResponseBody(key);
break;
default: // RESPONSE_READ, ERROR, or bug
throw new IllegalStateException("Method call in state " + state
+ " but selector called transition method. Seems like a bug...");
}
} catch (Exception e) {
key.cancel();
key.attach(null);
onError(e);
}
}

protected void onError(Exception e) {
client.onError(e);
callback.onError(e);
state = State.ERROR;
}

private void doReadingResponseBody(SelectionKey key) throws IOException {
if (transport.read(frameBuffer) < 0) {
throw new IOException("Read call frame failed");
}
if (frameBuffer.remaining() == 0) {
cleanUpAndFireCallback(key);
}
}

private void cleanUpAndFireCallback(SelectionKey key) {
state = State.RESPONSE_READ;
key.interestOps(0);
// this ensures that the TAsyncMethod instance doesn't hang around
key.attach(null);
client.onComplete();
callback.onComplete((T)this);
}

private void doReadingResponseSize() throws IOException {
if (transport.read(sizeBuffer) < 0) {
throw new IOException("Read call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.READING_RESPONSE_BODY;
frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray));
}
}

private void doWritingRequestBody(SelectionKey key) throws IOException {
if (transport.write(frameBuffer) < 0) {
throw new IOException("Write call frame failed");
}
if (frameBuffer.remaining() == 0) {
if (isOneway) {
cleanUpAndFireCallback(key);
} else {
state = State.READING_RESPONSE_SIZE;
sizeBuffer.rewind(); // Prepare to read incoming frame size
key.interestOps(SelectionKey.OP_READ);
}
}
}

private void doWritingRequestSize() throws IOException {
if (transport.write(sizeBuffer) < 0) {
throw new IOException("Write call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.WRITING_REQUEST_BODY;
}
}

private void doConnecting(SelectionKey key) throws IOException {
if (!key.isConnectable() || !transport.finishConnect()) {
throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
}
registerForFirstWrite(key);
}
}
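
The sizeBuffer/frameBuffer pair above implements the usual framed wire format: a 4-byte big-endian length prefix followed by the serialized message, which is exactly what the WRITING_REQUEST_SIZE/WRITING_REQUEST_BODY and READING_RESPONSE_SIZE/READING_RESPONSE_BODY states write and read. A stand-alone sketch of that framing (plain ByteBuffer, no Thrift classes, purely illustrative):

FrameSketch.java

import java.nio.ByteBuffer;

public class FrameSketch {

    // Encode: 4-byte big-endian length prefix + payload
    // (what WRITING_REQUEST_SIZE and WRITING_REQUEST_BODY put on the wire)
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // ByteBuffer defaults to big-endian
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Decode: read the prefix first (READING_RESPONSE_SIZE), then read exactly
    // that many bytes for the body (READING_RESPONSE_BODY)
    static byte[] unframe(ByteBuffer wire) {
        byte[] body = new byte[wire.getInt()];
        wire.get(body);
        return body;
    }

    public static void main(String[] args) {
        ByteBuffer onTheWire = frame("serialized add_args".getBytes());
        System.out.println(new String(unframe(onTheWire)));
    }
}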

3. TAsyncClientManager

A single TAsyncClientManager can drive asynchronous calls from many clients at once; it is essentially the client-side NIO selector plus the thread that manages it (see the sketch after the listing below).

TAsyncClientManager.java

/**
* Contains selector thread which transitions method call objects
*/
public class TAsyncClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());

private final SelectThread selectThread;
private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();

public TAsyncClientManager() throws IOException {
// A SelectThread is created and started as soon as the manager is constructed
this.selectThread = new SelectThread();
selectThread.start();
}

// Entry point for issuing an asynchronous method call
public void call(TAsyncMethodCall method) throws TException {
if (!isRunning()) {
throw new TException("SelectThread is not running");
}
// Prepare the call: serialize the method name and arguments
method.prepareMethodCall();
// Add it to the pending queue
pendingCalls.add(method);
// Wake the selector immediately so a blocking select() returns right away
selectThread.getSelector().wakeup();
}

public void stop() {
selectThread.finish();
}

public boolean isRunning() {
return selectThread.isAlive();
}

private class SelectThread extends Thread {
private final Selector selector;
private volatile boolean running;
private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());

public SelectThread() throws IOException {
// Create the selector
this.selector = SelectorProvider.provider().openSelector();
this.running = true;
this.setName("TAsyncClientManager#SelectorThread " + this.getId());

// We don't want to hold up the JVM when shutting down
setDaemon(true);
}

public Selector getSelector() {
return selector;
}

public void finish() {
running = false;
selector.wakeup();
}

public void run() {
while (running) {
try {
// Perform the select
try {
if (timeoutWatchSet.size() == 0) {
// No timeouts, so select indefinitely
selector.select();
} else {
// We have a timeout pending, so calculate the time until then and select appropriately
long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
long selectTime = nextTimeout - System.currentTimeMillis();
if (selectTime > 0) {
// Next timeout is in the future, select and wake up then
selector.select(selectTime);
} else {
// Next timeout is now or in past, select immediately so we can time out
selector.selectNow();
}
}
} catch (IOException e) {
LOGGER.error("Caught IOException in TAsyncClientManager!", e);
}
// Transition ready method calls to their next state
transitionMethods();
// Time out expired method calls
timeoutMethods();
// Start any pending method calls
startPendingMethods();
} catch (Exception exception) {
LOGGER.error("Ignoring uncaught exception in SelectThread", exception);
}
}
}

// Transition methods for ready keys
private void transitionMethods() {
try {
// Handle all ready keys
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) {
// this can happen if the method call experienced an error and the
// key was cancelled. can also happen if we timeout a method, which
// results in a channel close.
// just skip
continue;
}
TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
methodCall.transition(key);

// If done or an error occurred, remove the call from the timeout watch set
if (methodCall.isFinished() || methodCall.getClient().hasError()) {
timeoutWatchSet.remove(methodCall);
}
}
} catch (ClosedSelectorException e) {
LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
}
}

// Time out any method calls that have passed their deadline
private void timeoutMethods() {
Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
long currentTime = System.currentTimeMillis();
while (iterator.hasNext()) {
TAsyncMethodCall methodCall = iterator.next();
if (currentTime >= methodCall.getTimeoutTimestamp()) {
iterator.remove();
methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
} else {
break;
}
}
}

// Start any new calls
private void startPendingMethods() {
TAsyncMethodCall methodCall;
while ((methodCall = pendingCalls.poll()) != null) {
// Catch registration errors. method will catch transition errors and cleanup.
try {
// Start the call: register it with the selector
methodCall.start(selector);

// If a timeout is specified and the first transition went smoothly, add to the timeout watch set
TAsyncClient client = methodCall.getClient();
if (client.hasTimeout() && !client.hasError()) {
timeoutWatchSet.add(methodCall);
}
} catch (Exception exception) {
LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
methodCall.onError(exception);
}
}
}
}

/** Comparator used in TreeSet */
private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall> {
public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
return (int)(left.getSequenceId() - right.getSequenceId());
} else {
return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
}
}
}
}
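
As the note above says, one TAsyncClientManager (one SelectThread, one selector) can serve many clients concurrently, while each individual client allows only a single in-flight call. A small sketch of that sharing (my own example; Calculator.AsyncClient again stands in for any generated async client):

SharedManagerSketch.java

import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TNonblockingSocket;

public class SharedManagerSketch {
    public static void main(String[] args) throws Exception {
        // One selector thread serving every async client in the process
        TAsyncClientManager sharedManager = new TAsyncClientManager();
        TBinaryProtocol.Factory protocols = new TBinaryProtocol.Factory();

        // Each client keeps its own non-blocking connection but shares the manager,
        // so concurrency comes from having several clients, not from one client.
        Calculator.AsyncClient clientA = new Calculator.AsyncClient(
                protocols, sharedManager, new TNonblockingSocket("host-a", 9090));
        Calculator.AsyncClient clientB = new Calculator.AsyncClient(
                protocols, sharedManager, new TNonblockingSocket("host-b", 9090));

        // clientA.add(...) and clientB.add(...) may now be outstanding at the same time;
        // a second call on the same client before its callback fires would hit the
        // IllegalStateException thrown by TAsyncClient.checkReady().
    }
}
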
4. A concrete TAsyncMethodCall example: the generated add_call
TAsyncMethodCall-add_call

public static class add_call extends org.apache.thrift.async.TAsyncMethodCall {

private long a;
private long b;
public add_call(long a, long b, org.apache.thrift.async.AsyncMethodCallback<add_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.a = a;
this.b = b;
}

// Generated implementation that serializes the request arguments
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
// Write the message header carrying the method name
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("add", org.apache.thrift.protocol.TMessageType.CALL, 0));
add_args args = new add_args();
args.setA(a);
args.setB(b);
args.write(prot);
prot.writeMessageEnd();
}
// the rest of the generated add_call class (e.g. getResult()) is omitted in this excerpt
}

#### Part 2: The Python async client

1. The generated Python interface (Iface)
OAuthProviderService.py-Iface

class Iface(object):
def registerApp(self, app, callback):
"""
Parameters:
- app
"""
pass

2. The generated Client implementation
OAuthProviderService.py-Client

class Client(Iface):
def __init__(self, transport, iprot_factory, oprot_factory=None):
self._transport = transport
self._iprot_factory = iprot_factory
self._oprot_factory = (oprot_factory if oprot_factory is not None
else iprot_factory)
# initialize the request sequence id
self._seqid = 0
self._reqs = {}

# dispatcher invoked when a response comes back
@gen.engine
def recv_dispatch(self):
"""read a response from the wire. schedule exactly one per send that
expects a response, but it doesn't matter which callee gets which
response; they're dispatched here properly"""

# Wait for data to come back from the wire.
# If request A is sent and then request B, B's response may arrive first,
# so the seqid carried in the response is used to find the matching callback.

# wait for a complete frame (header + payload) to arrive from the network
frame = yield gen.Task(self._transport.readFrame)
# wrap the frame in an in-memory TTransport
tr = TTransport.TMemoryBuffer(frame)
# read the method name, message type and response seqid
iprot = self._iprot_factory.getProtocol(tr)
(fname, mtype, rseqid) = iprot.readMessageBegin()
# look up the recv_<method> handler via getattr
method = getattr(self, 'recv_' + fname)
# invoke the handler, which will fire the user callback
method(iprot, mtype, rseqid)

# the actual service method: register an App
def registerApp(self, app, callback):
"""
Parameters:
- app
"""
self._seqid += 1  # bump the sequence id
# store the callback in the pending-request map, keyed by seqid
self._reqs[self._seqid] = callback
# send the request
self.send_registerApp(app)
# wait for the response
self.recv_dispatch()

def send_registerApp(self, app):
# get the output protocol
oprot = self._oprot_factory.getProtocol(self._transport)
# write the message header: method name, message type, seqid
# write the request arguments
# flush the buffered data onto the network
oprot.writeMessageBegin('registerApp', TMessageType.CALL, self._seqid)
args = registerApp_args()
args.app = app
args.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()

def recv_registerApp(self, iprot, mtype, rseqid):
# use rseqid to pop the matching callback from the pending-request map
callback = self._reqs.pop(rseqid)
# handle the response according to its message type
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
x.read(iprot)
iprot.readMessageEnd()
callback(x)
return
result = registerApp_result()
result.read(iprot)
iprot.readMessageEnd()
if result.success is not None:
callback(result.success)
return
callback(TApplicationException(TApplicationException.MISSING_RESULT, "registerApp failed: unknown result"))
return

3. TTornadoStreamTransport: a transport built on Tornado's IOLoop
TTornadoStreamTransport.py

class TTornadoStreamTransport(TTransport.TTransportBase):

"""a framed, buffered transport over a Tornado stream"""
# constructor: takes the target host and port
def __init__(self, host, port, stream=None):
self.host = host
self.port = port
self.is_queuing_reads = False
self.read_queue = []
self.__wbuf = StringIO()

# servers provide a ready-to-go stream
self.stream = stream
if self.stream is not None:
self._set_close_callback()

# open the transport (establish the network connection)
# not the same number of parameters as TTransportBase.open
def open(self, callback):
# create a socket
logging.debug('socket connecting')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)

# wrap it in a Tornado IOStream
self.stream = iostream.IOStream(sock)

# callback invoked if the connection fails
def on_close_in_connect(*_):
message = 'could not connect to %s:%s' % (self.host, self.port)
raise TTransportException(
type=TTransportException.NOT_OPEN,
message=message)
self.stream.set_close_callback(on_close_in_connect)

# callback invoked once the connection succeeds
def finish(*_):
self._set_close_callback()
callback()

self.stream.connect((self.host, self.port), callback=finish)

def _set_close_callback(self):
# what is this locally defined on_close actually for? (it is never registered; self.close is passed below)
def on_close():
raise TTransportException(
type=TTransportException.END_OF_FILE,
message='socket closed')
self.stream.set_close_callback(self.close)

def close(self):
# don't raise if we intend to close
self.stream.set_close_callback(None)
self.stream.close()

# standalone reads are never performed; data is only consumed one frame at a time
def read(self, _):
# The generated code for Tornado shouldn't do individual reads -- only
# frames at a time
assert "you're doing it wrong" is True

@gen.engine
def readFrame(self, callback):
self.read_queue.append(callback)
logging.debug('read queue: %s', self.read_queue)

if self.is_queuing_reads:
# If a read is already in flight, then the while loop below should
# pull it from self.read_queue
return

self.is_queuing_reads = True
while self.read_queue:
next_callback = self.read_queue.pop()
result = yield gen.Task(self._readFrameFromStream)
next_callback(result)
self.is_queuing_reads = False

# read one data frame from the stream
@gen.engine
def _readFrameFromStream(self, callback):
logging.debug('_readFrameFromStream')
# read the 4-byte frame header to get the frame length
frame_header = yield gen.Task(self.stream.read_bytes, 4)
frame_length, = struct.unpack('!i', frame_header)
logging.debug('received frame header, frame length = %i', frame_length)
# then read the entire frame body
frame = yield gen.Task(self.stream.read_bytes, frame_length)
logging.debug('received frame payload')
callback(frame)

# buffer writes into the current write buffer
def write(self, buf):
self.__wbuf.write(buf)

def flush(self, callback=None):
# grab the buffered output
wout = self.__wbuf.getvalue()
# and its length
wsz = len(wout)

# frame layout: a 4-byte integer frame size followed by the frame payload

# reset wbuf before write/flush to preserve state on underlying failure
self.__wbuf = StringIO()
# N.B.: Doing this string concatenation is WAY cheaper than making
# two separate calls to the underlying socket object. Socket writes in
# Python turn out to be REALLY expensive, but it seems to do a pretty
# good job of managing string buffer operations without excessive copies
buf = struct.pack("!i", wsz) + wout

logging.debug('writing frame length = %i', wsz)
self.stream.write(buf, callback)

4. A client wrapper that ties Tornado and Thrift together
ClientWrapper

class Client():
"""
Thrift Client proxying thrift methods defined on `iface_cls`.
A simple load balancer is built in.
A MISSING_RESULT TApplicationException is converted to None before being returned to the caller.

"""
def __init__(self, iface_cls, servers):
self._iface_cls = iface_cls
self._servers = servers


def __getattr__(self, attr):
@gen.engine
def client_call(*args, **kwargs):
server = self._find_server()
host, port = server.split(":")
transport = cloudatlas.thrift.TTornado.TTornadoStreamTransport(host, int(port))
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
_client = self._iface_cls(transport, pfactory)

try:
yield gen.Task(transport.open)

_callback = kwargs['callback']
del(kwargs['callback'])

result = yield gen.Task(getattr(_client, attr), *args, **kwargs)
#print result
if type(result) == Thrift.TApplicationException and result.type == Thrift.TApplicationException.MISSING_RESULT:
result = None  # hack: surface MISSING_RESULT to the caller as None
_client._transport.close()
_callback(result)
except TTransport.TTransportException as e:
_client._transport.close()
raise
except Exception as e:
_client._transport.close()
raise

setattr(self, attr, client_call)
return getattr(self, attr)



def _find_server(self):
''' no round robin; just pick a random server '''
return choice(self._servers)