My thread pool second implementation

package com.bawi.threads.my.thread.pool;

import java.util.LinkedList;

public class MyBlockingQueue<T> {
	
	private LinkedList<T> linkedList = new LinkedList<>();
	private final int size;
	
	public MyBlockingQueue(int size) {
		this.size = size;
	}
	
	public synchronized T take() throws InterruptedException {
		while (linkedList.size() == 0) {
			wait();
		};
		notify();
		return linkedList.removeLast();
	}
	
	public synchronized void put(T element) throws InterruptedException {
		while (linkedList.size() == size) {
			wait();
		}
		linkedList.addFirst(element);
		notify();
	}
}

package com.bawi.threads.my.thread.pool;

public class MyWorkerThread extends Thread {

    private MyBlockingQueue<Runnable> queue;
    private volatile boolean isShutdown = false;

    public MyWorkerThread(MyBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!isShutdown) {
            try {
                Runnable task = queue.take();
                task.run();
            } catch (InterruptedException e) {
                System.out.println("[" + Thread.currentThread().getName() + "] Interrupted waiting on next task");
            }
        }
    }

    public void shutdown() {
        isShutdown = true;
        interrupt();
    }
}

package com.bawi.threads.my.thread.pool;

import java.util.ArrayList;
import java.util.List;

public class MyThreadPoolBuilder {

    private int workerThreadsCount;
    private int blockingQueueSize;

    public MyThreadPoolBuilder withWorkerThreadCount(int count) {
        workerThreadsCount = count;
        return this;
    }

    public MyThreadPoolBuilder withBlockingQueueSize(int size) {
        blockingQueueSize = size;
        return this;
    }

    public MyThreadPool build() {
        validate();
        MyBlockingQueue<Runnable> blockingQueue = new MyBlockingQueue<>(blockingQueueSize);
        List<MyWorkerThread> workerThreads = new ArrayList<>(workerThreadsCount);
        for (int i = 0; i < workerThreadsCount; i++) {
            workerThreads.add(new MyWorkerThread(blockingQueue));
        }
        return new MyThreadPool(workerThreads, blockingQueue);
    }

    private void validate() {
        if (blockingQueueSize <= 0) {
            throw new IllegalArgumentException("Blocking queue size must me positive");
        }
        if (workerThreadsCount <= 0) {
            throw new IllegalArgumentException("Worker thread count must me positive");
        }
    }

    public MyThreadPool buildAndStart() {
        MyThreadPool myThreadPool = build();
        myThreadPool.start();
        return myThreadPool;
    }
}


package com.bawi.threads.my.thread.pool;

import java.util.List;

public class MyThreadPool {

    private final MyBlockingQueue<Runnable> blockingQueue;
    private final List<MyWorkerThread> workerThreads;
    private volatile boolean isShutdown = false;

    public MyThreadPool(List<MyWorkerThread> workerThreads, MyBlockingQueue<Runnable> blockingQueue) {
        this.workerThreads = workerThreads;
        this.blockingQueue = blockingQueue;
    }

    public void start() {
        workerThreads.stream().forEach(MyWorkerThread::start);
    }

    public void submit(Runnable task) {
        if (!isShutdown) {
            try {
                blockingQueue.put(task);
            } catch (InterruptedException e) {
                System.out.println("Interrupted putting new task on a processing blockingQueue");
            }
        }
    }

    public void shutdownNow() {
        isShutdown = true;
        workerThreads.stream().forEach(MyWorkerThread::shutdown);
    }

    public static void main(String[] args) throws InterruptedException {
        MyThreadPool myThreadPool = new MyThreadPoolBuilder()
                .withWorkerThreadCount(2)
                .withBlockingQueueSize(4)
                .buildAndStart();

        myThreadPool.submit(createTask(1));
        myThreadPool.submit(createTask(2));
        myThreadPool.submit(createTask(3));
        myThreadPool.submit(createTask(4));
        myThreadPool.submit(createTask(5));
        myThreadPool.submit(createTask(6));

        Thread.sleep(10); // sleep until all tasks will be picked up for processing
        myThreadPool.shutdownNow();
    }

    private static Runnable createTask(int taskNumber) {
        return () -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Hello from task #" + taskNumber);
        };
    }

}

[Thread-0] Hello from task #1
[Thread-1] Hello from task #2
[Thread-0] Hello from task #3
[Thread-1] Hello from task #4
[Thread-1] Hello from task #6
[Thread-0] Hello from task #5
[Thread-0] Interrupted waiting on next task
[Thread-1] Interrupted waiting on next task


#Ouputput
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s