Simple Cassandra Template Integrated with ObjectPool

Below is a Cassandra Template Integrated with ObjectPool Created in previous post

This has been tested with 30 parallel sessions opened to cassandra

We create one cluster object per application but we can create as many session as your system and configuration permits..

Note: Best way to create session is not attaching them to any keyspace.. so that we can reuse the same session/connections for multiple threads.

/**
*
*/
package com.linkwithweb.products.daolayer.cassandra;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.linkwithweb.products.daolayer.ObjectPool;

/**
* @author ashwinrayaprolu
*
*/
@Configuration
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraTemplate extends ObjectPool<Session> {
private static final Log LOG = LogFactory.getLog(CassandraTemplate.class);
@Autowired
private Environment env;

public CassandraTemplate() {
this(4, 7, 5, 40);
}

/**
* @param minIdle
* @param maxIdle
* @param validationInterval
* @param maxConnections
*/
public CassandraTemplate(int minIdle, int maxIdle, long validationInterval, int maxConnections) {
super(minIdle, maxIdle, validationInterval, maxConnections);
}

/**
* @return
*/
@Bean
public Cluster cassandraCluster() {

Cluster cluster = null;
try {
PoolingOptions poolingOptions = new PoolingOptions();
;

poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 4).setMaxConnectionsPerHost(HostDistance.LOCAL, 10)
.setCoreConnectionsPerHost(HostDistance.REMOTE, 2).setMaxConnectionsPerHost(HostDistance.REMOTE, 4).setHeartbeatIntervalSeconds(60);

cluster = Cluster.builder()
// (1)
.addContactPoint(env.getProperty("cassandra.contactpoints")).withQueryOptions(new QueryOptions().setFetchSize(2000))
.withPoolingOptions(poolingOptions).build();
} catch (Exception e) {
e.printStackTrace();
} finally {

}

return cluster;

}

/**
* @return
* @throws Exception
*/
public Session cassandraSession() throws Exception {
Session session = cassandraCluster().connect(); // (2)
return session;
}

@Override
protected Session create() {
try {
return cassandraSession();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}

@Override
public void close(Session object) {
object.close();
}
}

 

Generic Object Pool in Java

I recently had to create a generic object pool with minimalistic code. I didn’t want to add too much load by adding thirdparty jar’s.. Moreover i wanted code that can be applied to creation of any objects.. Below is implementation of bounded Object Pool


package com.linkwithweb.products.daolayer;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author ashwinrayaprolu
*
* @param <T>
*/
public abstract class ObjectPool<T> {
private Queue<T> pool;

/**
* Stores number of connections that are being used
*/
private final AtomicInteger usageCount = new AtomicInteger(0);
// Maximum number of connections that can be open. Defaulted to 20
private int maxConnections = 20;

private ScheduledExecutorService executorService;

/**
* Creates the pool.
*
* @param minIdle
* minimum number of objects residing in the pool
*/
public ObjectPool(final int minIdle,final int maxConnections) {
// initialize pool
this.maxConnections = maxConnections;
initialize(minIdle);
}

/**
* Creates the pool.
*
* @param minIdle
* minimum number of objects residing in the pool
* @param maxIdle
* maximum number of objects residing in the pool
* @param validationInterval
* time in seconds for periodical checking of minIdle / maxIdle
* conditions in a separate thread.
* When the number of objects is less than minIdle, missing
* instances will be created.
* When the number of objects is greater than maxIdle, too many
* instances will be removed.
*/
public ObjectPool(final int minIdle, final int maxIdle, final long validationInterval,final int maxConnections) {
this.maxConnections = maxConnections;
// initialize pool
initialize(minIdle);

// check pool conditions in a separate thread
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
int size = pool.size();
if (size < minIdle) {
if(usageCount.compareAndSet(maxConnections, maxConnections)){
return;
}
int sizeToBeAdded = minIdle - size;
for (int i = 0; i < sizeToBeAdded; i++) {
System.out.println("Background Thread Creating Objects");
pool.add(create());
}
} else if (size > maxIdle) {
int sizeToBeRemoved = size - maxIdle;
for (int i = 0; i < sizeToBeRemoved; i++) {
System.out.println("Background Thread dumping Objects");
pool.poll();
}
}
}
}, validationInterval, validationInterval, TimeUnit.SECONDS);
}

/**
* Gets the next free object from the pool. If the pool doesn't contain any
* objects,
* a new object will be created and given to the caller of this method back.
*
* @return T borrowed object
*/
public T borrowObject() {
T object;

if(usageCount.compareAndSet(maxConnections, maxConnections)){
return null;
}

int preBorrowCount = usageCount.get();
if ((object = pool.poll()) == null) {
object = create();
}
while (usageCount.compareAndSet(preBorrowCount, preBorrowCount+1));

return object;
}

/**
* Returns object back to the pool.
*
* @param object
* object to be returned
*/
public void returnObject(T object) {
if (object == null) {
return;
}
int preReturnCount = usageCount.get();
this.pool.offer(object);
while (usageCount.compareAndSet(preReturnCount, preReturnCount-1));
}

/**
* Shutdown this pool.
*/
public void shutdown() {
if (executorService != null) {
executorService.shutdown();
}
}

/**
* Creates a new object.
*
* @return T new object
*/
protected abstract T create();

protected abstract void close(T object);

private void initialize(final int minIdle) {
pool = new ConcurrentLinkedQueue<T>();

for (int i = 0; i < minIdle; i++) {
pool.add(create());
}
}
}