My thread pool

package com.bawi.threads;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Minimal hand-rolled thread pool used to demonstrate {@link Lock} / {@link Condition}
 * based hand-off of tasks from a submitting thread to a fixed set of worker threads.
 *
 * <p>{@link MyBlockingQueue} is the task queue, {@link MyPool} owns the worker threads,
 * and {@link #main(String[])} runs a small demo.
 */
public class MyThreadPool {

    /** Prints {@code message} prefixed with the current thread's name in square brackets. */
    private static void log(String message) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + message);
    }

    /**
     * Unbounded FIFO queue of tasks guarded by a single lock.
     *
     * <p>Consumers block in {@link #take()} while the queue is empty; every
     * {@link #put(Runnable)} signals one waiting consumer.
     */
    static class MyBlockingQueue {
        final Lock lock = new ReentrantLock();
        final Condition notEmpty = lock.newCondition();
        /** Pending tasks in submission order; only accessed while {@link #lock} is held. */
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        public MyBlockingQueue() {
        }

        /**
         * Removes and returns the oldest pending task, waiting while the queue is empty.
         *
         * @return the next task, never {@code null}
         * @throws InterruptedException if the calling thread is interrupted while acquiring
         *         the lock or while waiting for a task; no task is removed in that case
         */
        public Runnable take() throws InterruptedException {
            log("Take: Before lockInterruptibly");
            // Acquire outside the try block: if this throws, the lock is not held and
            // unlocking it in finally would raise IllegalMonitorStateException.
            lock.lockInterruptibly();
            try {
                log("Take: After lockInterruptibly");
                while (tasks.isEmpty()) {
                    log("Take: Before await");
                    // Releases the lock while waiting and re-acquires it before returning
                    // (also when it returns by throwing InterruptedException).
                    notEmpty.await();
                    log("Take: After await");
                }
                return tasks.remove();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Appends a task to the queue and wakes up one waiting consumer.
         *
         * @param newRunnable the task to enqueue
         * @throws NullPointerException if {@code newRunnable} is {@code null}
         * @throws IllegalStateException if the calling thread is interrupted while acquiring
         *         the lock; the task is not enqueued and the interrupt status is restored
         */
        public void put(Runnable newRunnable) {
            if (newRunnable == null) {
                throw new NullPointerException("newRunnable must not be null");
            }
            log("Put: Before lockInterruptibly");
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                // The lock is NOT held here, so the queue must not be touched.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to enqueue a task", e);
            }
            try {
                log("Put: After lockInterruptibly");
                tasks.add(newRunnable);
                log("Put: Before signal");
                notEmpty.signal();
                log("Put: After signal");
            } finally {
                lock.unlock();
            }
        }

    }

    /**
     * Fixed-size pool of worker threads that repeatedly take tasks from a shared
     * {@link MyBlockingQueue} and run them.
     */
    static class MyPool {

        final MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        final List<Thread> workerThreads = new ArrayList<>();

        /**
         * Creates a pool with {@code size} worker threads and starts all of them.
         *
         * @param size number of worker threads to create
         * @return the started pool
         */
        public static MyPool createAndStartThreadPool(int size) {
            MyPool myPool = new MyPool().withWorkerThreadCount(size);
            myPool.start();
            return myPool;
        }

        /** Adds {@code size} not-yet-started workers and returns this pool for chaining. */
        private MyPool withWorkerThreadCount(int size) {
            addWorkers(size);
            return this;
        }

        /** Starts every worker thread created so far. */
        private void start() {
            log("Starting my worker threads in my pool");
            workerThreads.forEach(Thread::start);
        }

        /** Creates {@code size} worker threads without starting them. */
        private void addWorkers(int size) {
            for (int i = 0; i < size; i++) {
                workerThreads.add(createWorkerThread());
            }
        }

        /** Creates a single, not-yet-started worker thread running {@link #runWorkerLoop()}. */
        private Thread createWorkerThread() {
            return new Thread(this::runWorkerLoop);
        }

        /**
         * Body of a worker thread: takes tasks one at a time and runs them until the
         * thread is interrupted while obtaining the next task.
         */
        private void runWorkerLoop() {
            log("Started my worker thread in my pool");
            while (true) {
                Runnable task;
                try {
                    task = myBlockingQueue.take();
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal (see shutdown()).
                    log("My worker thread interrupted, exiting ...");
                    return;
                }

                log("Starting to run a task");
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not take the worker down with it, otherwise the
                    // pool would silently shrink and later tasks might never run.
                    System.err.println("[" + Thread.currentThread().getName() + "] Task failed: " + e);
                    continue;
                }
                log("Finished running a task");
            }
        }

        /**
         * Queues a task for execution by one of the worker threads.
         *
         * @param r the task to run
         * @throws NullPointerException if {@code r} is {@code null}
         * @throws IllegalStateException if the calling thread is interrupted while enqueueing
         */
        public void submit(Runnable r) {
            myBlockingQueue.put(r);
        }

        /**
         * Interrupts every worker thread. A worker exits the next time it tries to take a
         * task with its interrupt status set; tasks still in the queue are not run.
         */
        public void shutdown() {
            workerThreads.forEach(Thread::interrupt);
        }
    }

    /** Demo: runs three one-second tasks on a two-thread pool, then shuts the pool down. */
    public static void main(String[] args) {
        MyPool myPool = MyPool.createAndStartThreadPool(2);
        try {
            myPool.submit(() -> doTheWork("My Runnable 1"));
            myPool.submit(() -> doTheWork("My Runnable 2"));
            myPool.submit(() -> doTheWork("My Runnable 3"));

            sleepMillis(5000);
        } finally {
            // Workers are non-daemon threads; always stop them so the JVM can exit.
            myPool.shutdown();
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted the sleep ends early and the
     * interrupt status is restored so that callers (e.g. a pool worker) can still see it.
     *
     * @param sleepMillis time to sleep in milliseconds
     */
    static void sleepMillis(int sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            log("Sleep interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /** Simulated task: announces itself and sleeps for one second. */
    private static void doTheWork(String name) {
        int sleepMillis = 1000;
        System.out.println("Task: " + name + ", sleeping: " + sleepMillis +  " ms ... [" + Thread.currentThread().getName() + "]");
        sleepMillis(sleepMillis);
    }
}

/*
[main] Starting my worker threads in my pool
[Thread-0] Started my worker thread in my pool
[Thread-1] Started my worker thread in my pool
[Thread-0] Take: Before lockInterruptibly
[Thread-0] Take: After lockInterruptibly
[Thread-0] Take: Before await
[Thread-1] Take: Before lockInterruptibly
[Thread-1] Take: After lockInterruptibly
[Thread-1] Take: Before await
[main] Put: Before lockInterruptibly
[main] Put: After lockInterruptibly
[main] Put: Before signal
[main] Put: After signal
[Thread-0] Take: After await
[Thread-0] Starting to run a task
Task: My Runnable 1, sleeping: 1000 ms ... [Thread-0]
[main] Put: Before lockInterruptibly
[main] Put: After lockInterruptibly
[main] Put: Before signal
[main] Put: After signal
[Thread-1] Take: After await
[Thread-1] Starting to run a task
Task: My Runnable 2, sleeping: 1000 ms ... [Thread-1]
[main] Put: Before lockInterruptibly
[main] Put: After lockInterruptibly
[main] Put: Before signal
[main] Put: After signal
[Thread-0] Finished running a task
[Thread-0] Take: Before lockInterruptibly
[Thread-0] Take: After lockInterruptibly
[Thread-0] Starting to run a task
Task: My Runnable 3, sleeping: 1000 ms ... [Thread-0]
[Thread-1] Finished running a task
[Thread-1] Take: Before lockInterruptibly
[Thread-1] Take: After lockInterruptibly
[Thread-1] Take: Before await
[Thread-0] Finished running a task
[Thread-0] Take: Before lockInterruptibly
[Thread-0] Take: After lockInterruptibly
[Thread-0] Take: Before await
[Thread-1] My worker thread interrupted, exiting ...
[Thread-0] My worker thread interrupted, exiting ...
*/
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s