zhengxiaoyong

关于生产者-消费者-订阅者模式的那些事

生产者/消费者模式

简介

用来干嘛的?

生产者/消费者模式的产生主要目的就是为了解决非同步的生产与消费之间的问题。

什么是非同步呢?
比如我刚刚生产了某个产品,而此时你正在打游戏,没空来取,要打完游戏来取,这就导致了我生产产品和你取产品是两个非同步的动作,你不知道我什么时候生产完产品,而我也不知道你什么时候来取。

而生产者/消费者模式就是解决这个非同步问题的,因为肯定不可能我生产完一个就给你打个电话叫你来取,然后等你取完我再生产下一个,这是多么低效的一种做法。所以这个模式运用而生,这个模式在生活中也有很好的体现,如:快递员派信这个例子,我就是生产者,快递员就是消费者,而生产者与消费者之间是通过什么来解决这种非同步的问题呢?就是一个存储中介,作为快递员派信这个例子中,信箱就是这个存储中介,每次我只要把写完的信扔入信箱,而快递员隔三差五的就会来取一次信,这两个动作是完全异步的,我把信扔入信箱后就不需要管什么了,之后肯定有快递员来取。

如何设计

设计图

生产者/消费者模式能够解决非同步的生产与消费的问题,归功就是存储中介的作用,因为生产者只要把生产完的物品放入存储中介中就行了,而不必关系消费者什么时候来取,当消费者需要时自然会来取,当存储中介满了的话,那么生产者将停止生产,因为再生产就没地放了,这时候就需要等待消费者消费了,而当存储中介没有时,这时候消费者来取那肯定取不到,所以也需要Wait,等待生产者生产后才能取到。所以这就有了下面这个设计图:
这里写图片描述

定位

从上图中可以知道,一个完整的生产者/消费者模式具有生产者、消费者、存储中介以及产品。这四个缺一不可,而关于它们的定位也至关重要
1 . Product
对于产品,什么样的对象才能成为产品呢,一是根据当时的业务逻辑来判断,比如执行完某些操作后的产生的Result,二是必须保持每个产品之间的完整性和独立性,保证各个产品之间互不影响、互不关联。
2 . Store
对于存储中介,它肯定是一块具有额定大小的存储空间,而这个存储空间一般来说具有FIFO的数据结构,比如JDK内置了具有阻塞作用的有界队列:ArrayBlockingQueue、LinkedBlockingQueue。并且存储中介需要起到生产者与消费者解耦的作用,这样的好处是当后期生产者或者消费者的生产方式或处理方式变了,这样只需要改变一方,而另外一方则不需要调整。而且它负责协调生产者与消费者之间的生产消费关系。
3 . Producer
对于生产者,它是具有配置Product各种属性的一个对象,可以设计成Factory、Builder、装饰者模式等等,一般来说生产者有单独的一个线程用来生产产品,当然如果量大的话可以用多个线程去生产,不过需要处理一下线程同步的问题(Semaphore|synchronized|ThreadLocal)
4 . Consumer
对于消费者,和Producer差不多,主要就是用来处理Product的,一般也有单独的一个线程去处理Product。

实例

1 . 生产者
AppleProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AppleProducer {
private Store<Apple> mStore;
private static ExecutorService mWorkThread = Executors.newFixedThreadPool(5);
private String name;

public AppleProducer setName(String name) {
this.name = name;
return this;
}

public AppleProducer bindStore(Store<Apple> store) {
this.mStore = store;
return this;
}

public void production() {
mWorkThread.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
Apple apple = new Apple("第" + name + "个产品");
mStore.push(apple);
}
});
}
}

2 . 消费者
AppleConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AppleConsumer {
private Store<Apple> mStore;
private ExecutorService mWorkThread = Executors.newFixedThreadPool(1);

public AppleConsumer bindStore(Store<Apple> store) {
this.mStore = store;
return this;
}

public void consume() {
mWorkThread.submit(new Runnable() {
@Override
public void run() {
for (; ; ) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
Apple apple = mStore.take();
System.out.println("apple:" + apple.getName() + "消费了");
}
}
});

}
}

3 . 存储中介
Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Store<T> {
private BlockingQueue<T> mQueue = new ArrayBlockingQueue<>(10, true);

public void push(T t) {
try {
mQueue.put(t);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
System.out.println("product生产了...");
}

public T take() {
T t = null;
try {
t = mQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
System.out.println("product取出了...");
return t;
}

public void release() {
if (mQueue.isEmpty())
return;
mQueue.clear();
}
}

4 . 产品

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Apple {
private String name;

public Apple(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

5 . Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test {
public static void main(String[] args) {
Store<Apple> store = new Store<>();

new AppleConsumer()
.bindStore(store)
.consume();

for (int i = 0; i < 30; i++) {
new AppleProducer()
.bindStore(store)
.setName(i + "")
.production();
}
}
}

result:

product生产了…
product生产了…
product生产了…
product生产了…
product生产了…
product取出了…
apple:第1个产品消费了
product生产了…
product生产了…
product生产了…
product生产了…
product生产了…
product生产了…
product取出了…
apple:第0个产品消费了
product生产了…
product取出了…
product生产了…
apple:第3个产品消费了
product取出了…
apple:第2个产品消费了
product生产了…
product取出了…
apple:第4个产品消费了
product生产了…
product取出了…
apple:第5个产品消费了
product生产了…
product取出了…
apple:第6个产品消费了
product生产了…


对于生产者,为了避免生产线程数量过多采取了一个线程池控制生产线程的数量,而生产者每一件产品都是由单独的一个线程来生产,对于消费者,用一个线程去轮询取队列里的产品,有则取出,没有则阻塞等待,由于ArrayBlockingQueue本身是支持并发的,所以在多线程共同操作一个存储队列的情况下,并不会有并发的问题。
所以生产者/消费者模式也支持多生产——多消费的模式:
这里写图片描述
对于有众多种类不同的生产者,可以用一个工厂类来管理。

数据源/订阅者模式

关于这个模式,其实是生产者/消费者模式的变体,这种模式并不需要存储中介,而是通过一个DataSource空壳来包装数据,对于发布者提交了一个Task后,将立即返回一个DataSource,对于任务执行完后的结果,如果你想获取则必须通过datasource.subscribe(new XxxSubscriber(…))来订阅获取执行后的结果,而如果不通过数据源订阅的方式来获取而直接通过datasource.getData()获取则返回null,因为DataSource只是一个获取数据的空壳。这种模式在Fresco源码中有很好的体现,用了大量的这种模式。
这里写图片描述

栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  DataSource<Apple> dataSource = ProducerFactory.newAppleProducer().submit(name);
//Apple apple = dataSource.getData();// apple is null!
dataSource.subscribe(new BaseDataSubscriber<Apple>() {
@Override
protected void onSuccess(DataSource<Apple> dataSource) {
Apple apple = dataSource.getData();
//...
}

@Override
protected void onFailure(DataSource<Apple> dataSource,Throwable throwable) {
//...
}
});
坚持原创技术分享,您的支持将鼓励我继续创作!