Code Examples

An application can run a transaction directly or invoke a function which contains a transaction. This section illustrates these two use cases with code fragments that demonstrate the proper way to program a transaction.

An expected use case operates on two regions within a single transaction. For performance reasons, the Geode transaction implementation requires that the entries of partitioned regions used in a transaction be colocated. See Custom-Partitioning and Colocating Data for details on how to colocate region entries.

Transaction within an Application

An application or client uses the CacheTransactionManager API. This basic code fragment shows the structure of a transaction: begin starts the transaction, commit ends it, and the surrounding try block handles the exceptions that these methods may throw.

CacheTransactionManager txManager =
          cache.getCacheTransactionManager();

try {
    txManager.begin();
    // ... do transactional, region operations
    txManager.commit();
} catch (CommitConflictException conflict) {
    // ... do necessary work for a transaction that failed on commit
} finally {
    // All other exceptions are left for the caller to handle. Examples:
    // the data is not colocated, a rebalance interfered with the
    // transaction, or the server is gone.
    // If any method other than commit() threw an exception, the
    // transaction still exists and must be rolled back to avoid
    // leaking the transaction state.
    if (txManager.exists()) {
        txManager.rollback();
    }
}

More details of a transaction appear in this next application/client code fragment example. In this typical transaction, the put operations must be atomic and two regions are involved.

In this transaction, a customer’s purchase is recorded. The cash region contains each customer’s cash balance available for making trades. The trades region records each customer’s balance spent on trades.

If there is a conflict upon commit of the transaction, an exception is thrown, and this example tries again.

// inputs needed for this transaction; shown as variables for simplicity
final String customer = "Customer1";
final Integer purchase = 1000;

// region set up shown to promote understanding;
// createClientRegionFactory is a ClientCache method, so a client cache is used.
// LOCATOR_PORT is the port of a running locator.
ClientCache cache = new ClientCacheFactory().create();
Pool pool = PoolManager.createFactory()
           .addLocator("localhost", LOCATOR_PORT)
           .create("pool-name");
Region<String, Integer> cash =
           cache.<String, Integer>createClientRegionFactory(ClientRegionShortcut.PROXY)
           .setPoolName(pool.getName())
           .create("cash");
Region<String, Integer> trades =
           cache.<String, Integer>createClientRegionFactory(ClientRegionShortcut.PROXY)
           .setPoolName(pool.getName())
           .create("trades");

// transaction code
CacheTransactionManager txManager = cache.getCacheTransactionManager();
boolean retryTransaction = false;
do {
  try {
    txManager.begin();

    // Subtract out the cost of the trade for this customer's balance
    Integer cashBalance = cash.get(customer);
    Integer newBalance = (cashBalance != null ? cashBalance : 0) - purchase;
    cash.put(customer, newBalance);

    // Add in the cost of the trade for this customer
    Integer tradeBalance = trades.get(customer);
    newBalance = (tradeBalance != null ? tradeBalance : 0) + purchase;
    trades.put(customer, newBalance);

    txManager.commit();
    retryTransaction = false;
  } catch (CommitConflictException conflict) {
    // An entry value changed, causing a conflict for this customer, so try again
    retryTransaction = true;
  } finally {
    // All other exceptions are left for the caller to handle.
    // If any method other than commit() threw an exception, the
    // transaction still exists and must be rolled back to avoid
    // leaking the transaction state.
    if (txManager.exists()) {
      txManager.rollback();
    }
  }
} while (retryTransaction);
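
The loop above retries for as long as commits keep conflicting. Where an upper bound on attempts is preferred, the retry logic could be factored into a helper along the following lines. This is only a sketch under stated assumptions: the names BoundedRetrySketch, runWithRetry, and maxAttempts are hypothetical, and ConflictException is a local stand-in for CommitConflictException so that the fragment compiles without Geode on the classpath.

```java
import java.util.function.Supplier;

final class BoundedRetrySketch {

  /** Hypothetical stand-in for CommitConflictException (assumption, not a Geode class). */
  static class ConflictException extends RuntimeException {}

  /**
   * Runs the transaction body, retrying when it reports a conflict,
   * for at most maxAttempts attempts. The body is expected to perform
   * its own begin, commit, and rollback, as in the example above.
   */
  static <T> T runWithRetry(Supplier<T> transactionBody, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    ConflictException lastConflict = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return transactionBody.get();
      } catch (ConflictException conflict) {
        // Another transaction committed first; remember the conflict and retry
        lastConflict = conflict;
      }
    }
    // Every attempt conflicted; let the caller decide what to do
    throw lastConflict;
  }
}
```

With Geode, the catch clause would name CommitConflictException instead, and the body passed to runWithRetry would contain the begin, the region operations, the commit, and the finally block that rolls back.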

Design transactions so that any get operations occur within the transaction. This makes those entries part of the transactional state, which allows intersecting transactions to be detected and signaled as commit conflicts.
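
The reason for keeping get operations inside the transaction can be illustrated with a toy model of optimistic conflict detection. This is not Geode's implementation. ToyStore, ToyTx, and ConflictException are hypothetical classes written for this sketch, and the version counter is an assumed mechanism chosen only to make the effect visible.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of optimistic conflict detection; NOT Geode's implementation. */
final class ReadInsideTxSketch {

  /** Hypothetical stand-in for a commit conflict. */
  static class ConflictException extends RuntimeException {}

  /** Committed state: a value and a version counter per key. */
  static class ToyStore {
    final Map<String, Integer> values = new HashMap<>();
    final Map<String, Integer> versions = new HashMap<>();

    int version(String key) { return versions.getOrDefault(key, 0); }

    void putCommitted(String key, int value) {
      values.put(key, value);
      versions.merge(key, 1, Integer::sum);
    }
  }

  /** A transaction remembers each key's version when it first touches the key. */
  static class ToyTx {
    private final ToyStore store;
    private final Map<String, Integer> versionSeen = new HashMap<>();
    private final Map<String, Integer> writes = new HashMap<>();

    ToyTx(ToyStore store) { this.store = store; }

    int get(String key) {
      versionSeen.putIfAbsent(key, store.version(key));
      Integer pending = writes.get(key);
      return pending != null ? pending : store.values.getOrDefault(key, 0);
    }

    void put(String key, int value) {
      versionSeen.putIfAbsent(key, store.version(key));
      writes.put(key, value);
    }

    void commit() {
      // A written key conflicts if its version changed since first touch
      for (String key : writes.keySet()) {
        if (store.version(key) != versionSeen.get(key)) {
          throw new ConflictException();
        }
      }
      writes.forEach(store::putCommitted);
    }
  }

  /** Read inside the transaction: the concurrent update is detected. */
  static boolean readInsideDetectsConflict() {
    ToyStore store = new ToyStore();
    store.putCommitted("Customer1", 5000);
    ToyTx tx = new ToyTx(store);
    int balance = tx.get("Customer1");      // tracked by the transaction
    store.putCommitted("Customer1", 4500);  // another writer commits first
    tx.put("Customer1", balance - 1000);
    try {
      tx.commit();
      return false;
    } catch (ConflictException expected) {
      return true;
    }
  }

  /** Read outside the transaction: the stale read goes unnoticed. */
  static int readOutsideLosesUpdate() {
    ToyStore store = new ToyStore();
    store.putCommitted("Customer1", 5000);
    int balance = store.values.get("Customer1"); // not tracked by any transaction
    store.putCommitted("Customer1", 4500);       // another writer commits first
    ToyTx tx = new ToyTx(store);
    tx.put("Customer1", balance - 1000);         // first touch sees the new version
    tx.commit();                                 // no conflict is raised
    return store.values.get("Customer1");        // other writer's update is overwritten
  }
}
```

In this model, the first method reports a conflict because the transaction touched the entry before the other writer changed it. The second method commits a balance computed from a stale read, which is the situation that reading inside the transaction is meant to avoid.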

Transaction within a Function

A transaction may be embedded in a function. The application invokes the function, and the function contains the transaction that does the begin, the region operations, and the commit or rollback.

Running the transaction in a function can improve performance, because both the function and the region data reside on the servers. The region operations that the function invokes stay on the server, so get and put operations on region data incur no network round trip.

This function example accomplishes atomic updates on a single region representing the quantity of products available in inventory. Doing this in a transaction prevents double allocating inventory for two orders placed simultaneously.

/**
 * Atomically reduce inventory quantity
 */
public class TransactionalFunction implements Function {

  /**
   * Sends true as the function result if the requested quantity of
   * inventory was available and the transaction that records the
   * reduced inventory completed successfully; sends false otherwise.
   */
  @Override
  public void execute(FunctionContext context) {
    RegionFunctionContext rfc = (RegionFunctionContext) context;
    Region<ProductId, Integer> inventoryRegion = rfc.getDataSet();

    CacheTransactionManager 
        txManager = context.getCache().getCacheTransactionManager();

    // single argument will be a ProductId and a quantity
    ProductRequest request = (ProductRequest) rfc.getArguments();
    ProductId productRequested = request.getProductId();
    Integer qtyRequested = request.getQuantity();

    boolean success = false;
    // declared outside the loop so that the while condition can see it
    boolean commitConflict;

    do {
      commitConflict = false;
      try {
        txManager.begin();

        Integer qtyAvailable = inventoryRegion.get(productRequested);
        // a missing entry is treated here as no inventory available
        if (qtyAvailable != null && qtyAvailable >= qtyRequested) {
          // enough inventory is available, so process request
          Integer remaining = qtyAvailable - qtyRequested;
          inventoryRegion.put(productRequested, remaining);
          txManager.commit();
          success = true;
        }

      } catch (CommitConflictException conflict) {
        // retry transaction, as another request on this same key succeeded,
        // so this transaction attempt failed
        commitConflict = true;
      } finally {
        // All other exceptions are left for the caller to handle. If the
        // transaction still exists here, because the request could not be
        // filled or a method other than commit() threw an exception, it
        // must be rolled back to avoid leaking the transaction state.
        if (txManager.exists()) {
          txManager.rollback();
        }
      }

    } while (commitConflict);

    context.getResultSender().lastResult(success);
  }

  @Override
  public String getId() {
    return "TxFunction";
  }

  /**
   * Returning true causes this function to execute on the server
   * that holds the primary bucket for the given key. It can save a
   * network hop from the secondary to the primary.
   */
  @Override
  public boolean optimizeForWrite() {
    return true;
  }
}

The application-side details of invoking the function are not covered in this example. The application sets up the function context and the argument. See the section on Function Execution for details on functions.

The function implementation needs to catch the commit conflict exception so that it can retry the entire transaction. The exception occurs only if another request for the same product intersected with this one, and that other request’s transaction committed first.

The optimizeForWrite method is defined to cause the system to execute the function on the server that holds the primary bucket for the given key. It can save a network hop from the secondary to the primary.

Note that the variable qtyAvailable is a reference, because the Region.get operation returns a reference within this server-side code. Read Region Operations Return References for details and how to work around the implications of a reference as a return value when working with server code.
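
The hazard matters most for mutable value types, since an Integer such as qtyAvailable cannot be modified in place. The following sketch illustrates the general issue in plain Java, under loud assumptions: a HashMap stands in for the server-side region, Stock is a hypothetical mutable value class, and a copy constructor is used as one generic way to make a copy. The referenced section describes the options that Geode itself provides.

```java
import java.util.HashMap;
import java.util.Map;

final class ReturnedReferenceSketch {

  /** Hypothetical mutable value type; not part of the function example. */
  static class Stock {
    int quantity;
    Stock(int quantity) { this.quantity = quantity; }
    Stock(Stock other) { this.quantity = other.quantity; } // copy constructor
  }

  /** Builds the stand-in "region" (a plain map, an assumption of this sketch). */
  static Map<String, Stock> newRegion(int quantity) {
    Map<String, Stock> region = new HashMap<>();
    region.put("widget", new Stock(quantity));
    return region;
  }

  /** Mutating the returned reference changes the stored entry without a put. */
  static int mutateInPlace(Map<String, Stock> region) {
    Stock stock = region.get("widget");
    stock.quantity -= 3;                     // alters the stored object directly
    return region.get("widget").quantity;    // the stored value has already changed
  }

  /** Copy first, modify the copy, then put it back explicitly. */
  static int copyThenPut(Map<String, Stock> region) {
    Stock copy = new Stock(region.get("widget"));
    copy.quantity -= 3;
    int storedBeforePut = region.get("widget").quantity; // still the original value
    region.put("widget", copy);              // the change becomes visible only here
    return storedBeforePut;
  }
}
```

In the first method the stored entry changes as a side effect of modifying the returned object. In the second, the stored entry stays intact until the explicit put.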