[elephant-devel] running out of locks

Ben ben at medianstrip.net
Fri Feb 18 22:32:44 UTC 2005


try this one?

B

On Fri, 18 Feb 2005, [utf-8] Gábor Melis wrote:

> On Thursday 17 February 2005 23:44, you wrote:
>> here's the tarball, let me know if you have problems - B
>
> I got the feedback fever.
>
> add-index with populate maps the btree and runs out of locks here :-)
> degree-2?
>
-------------- next part --------------
;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10 -*-
;;;
;;; collections.lisp -- view Berkeley DBs as Lisp collections
;;; 
;;; Initial version 8/26/2004 by Ben Lee
;;; <blee at common-lisp.net>
;;; 
;;; part of
;;;
;;; Elephant: an object-oriented database for Common Lisp
;;;
;;; Copyright (c) 2004 by Andrew Blumberg and Ben Lee
;;; <ablumberg at common-lisp.net> <blee at common-lisp.net>
;;;
;;; This program is released under the following license
;;; ("GPL").  For differenct licensing terms, contact the
;;; copyright holders.
;;;
;;; This program is free software; you can redistribute it
;;; and/or modify it under the terms of the GNU General
;;; Public License as published by the Free Software
;;; Foundation; either version 2 of the License, or (at
;;; your option) any later version.
;;;
;;; This program is distributed in the hope that it will be
;;; useful, but WITHOUT ANY WARRANTY; without even the
;;; implied warranty of MERCHANTABILITY or FITNESS FOR A
;;; PARTICULAR PURPOSE. See the GNU General Public License
;;; for more details.
;;;
;;; The GNU General Public License can be found in the file
;;; LICENSE which should have been distributed with this
;;; code.  It can also be found at
;;;
;;; http://www.opensource.org/licenses/gpl-license.php
;;;
;;; You should have received a copy of the GNU General
;;; Public License along with this program; if not, write
;;; to the Free Software Foundation, Inc., 59 Temple Place,
;;; Suite 330, Boston, MA 02111-1307 USA
;;;

(in-package "ELEPHANT")

;;; collection types
;;; we're slot-less
(defclass persistent-collection (persistent) ()
  (:documentation "Abstract superclass of all collection types."))

;;; btree access
(defclass btree (persistent-collection) ()
  (:documentation "A hash-table like interface to a BTree,
which stores things in a semi-ordered fashion."))

(defgeneric get-value (key bt)
  (:documentation "Get a value from a Btree."))

(defgeneric (setf get-value) (value key bt)
  (:documentation "Put a key / value pair into a BTree."))

(defgeneric remove-kv (key bt)
  (:documentation "Remove a key / value pair from a BTree."))

(defmethod get-value (key (bt btree))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (let ((buf (db-get-key-buffered 
		(controller-btrees *store-controller*) 
		key-buf value-buf)))
      (if buf (values (deserialize buf) T)
	  (values nil nil)))))

(defmethod (setf get-value) (value key (bt btree))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (serialize value value-buf)
    (db-put-buffered (controller-btrees *store-controller*) 
		     key-buf value-buf
		     :auto-commit *auto-commit*)
    value))

(defmethod remove-kv (key (bt btree))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (db-delete-buffered (controller-btrees *store-controller*) 
			key-buf	:auto-commit *auto-commit*)))


;; Secondary indices

(defclass indexed-btree (btree)
  ((indices :accessor indices :initform (make-hash-table))
   (indices-cache :accessor indices-cache :initform (make-hash-table)
		  :transient t))
  (:metaclass persistent-metaclass)
  (:documentation "A BTree which supports secondary indices."))

(defmethod shared-initialize :after ((instance indexed-btree) slot-names
				     &rest rest)
  (declare (ignore slot-names rest))
  (setf (indices-cache instance) (indices instance)))

(defgeneric add-index (bt &key index-name key-form populate)
  (:documentation 
   "Add a secondary index.  The indices are stored in an eq
hash-table, so the index-name should be a symbol.  key-form
should be a symbol naming a function, or a list which
defines a lambda -- actual functions aren't supported.  The
function should take 3 arguments: the secondary DB, primary
key and value, and return two values: a boolean indicating
whether to index this key / value, and the secondary key if
so.  If populate = t it will fill in secondary keys for
existing primary entries (may be expensive!)"))

(defgeneric get-index (bt index-name)
  (:documentation "Get a named index."))

(defgeneric remove-index (bt index-name)
  (:documentation "Remove a named index."))

(defmethod add-index ((bt indexed-btree) &key index-name key-form populate)
  (if (and (not (null index-name))
	   (symbolp index-name) (or (symbolp key-form) (listp key-form)))
      (let ((indices (indices bt))
	    (index (make-instance 'btree-index :primary bt 
				  :key-form key-form)))
	(setf (gethash index-name (indices-cache bt)) index)
	(setf (gethash index-name indices) index)
	(setf (indices bt) indices)
	(when populate
	  (let ((key-fn (key-fn index)))
	    (with-buffer-streams (primary-buf secondary-buf)	      
	      (with-transaction ()
		(map-btree 
		 #'(lambda (k v)
		     (multiple-value-bind (index? secondary-key)
			 (funcall key-fn index k v)
		       (when index?
			 (buffer-write-int (oid bt) primary-buf)
			 (serialize k primary-buf)
			 (buffer-write-int (oid index) secondary-buf)
			 (serialize secondary-key secondary-buf)
			 ;; should silently do nothing if
			 ;; the key/value already exists
			 (db-put-buffered 
			  (controller-indices *store-controller*)
			  secondary-buf primary-buf)
			 (reset-buffer-stream primary-buf)
			 (reset-buffer-stream secondary-buf))))
		 bt
		 :degree-2 t)))))
	index)
      (error "Invalid index initargs!")))

(defmethod get-index ((bt indexed-btree) index-name)
  (gethash index-name (indices-cache bt)))

(defmethod remove-index ((bt indexed-btree) index-name)
  (remhash index-name (indices-cache bt))
  (let ((indices (indices bt)))
    (remhash index-name indices)
    (setf (indices bt) indices)))

(defmethod (setf get-value) (value key (bt indexed-btree))
  "Set a key / value pair, and update secondary indices."
  (declare (optimize (speed 3)))
  (let ((indices (indices-cache bt)))
    (with-buffer-streams (key-buf value-buf secondary-buf)
      (buffer-write-int (oid bt) key-buf)
      (serialize key key-buf)
      (serialize value value-buf)
      (with-transaction ()
	(db-put-buffered (controller-btrees *store-controller*) 
			 key-buf value-buf)
	(loop for index being the hash-value of indices
	      do
	      (multiple-value-bind (index? secondary-key)
		  (funcall (key-fn index) index key value)
		(when index?
		  (buffer-write-int (oid index) secondary-buf)
		  (serialize secondary-key secondary-buf)
		  ;; should silently do nothing if the key/value already
		  ;; exists
		  (db-put-buffered (controller-indices *store-controller*)
				   secondary-buf key-buf)
		  (reset-buffer-stream secondary-buf))))
	value))))

(defmethod remove-kv (key (bt indexed-btree))
  "Remove a key / value pair, and update secondary indices."
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf secondary-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (with-transaction ()
      (let ((value (get-value key bt)))
	(when value
	  (let ((indices (indices-cache bt)))
	    (loop 
	     for index being the hash-value of indices
	     do
	     (multiple-value-bind (index? secondary-key)
		 (funcall (key-fn index) index key value)
	       (when index?
		 (buffer-write-int (oid index) secondary-buf)
		 (serialize secondary-key secondary-buf)
		 ;; need to remove kv pairs with a cursor! --
		 ;; this is a C performance hack
		 (sleepycat::db-delete-kv-buffered 
		  (controller-indices *store-controller*)
		  secondary-buf key-buf)
		 (reset-buffer-stream secondary-buf))))
	    (db-delete-buffered (controller-btrees *store-controller*) 
				key-buf)))))))

(defclass btree-index (btree)
  ((primary :type indexed-btree :reader primary :initarg :primary)
   (key-form :reader key-form :initarg :key-form)
   (key-fn :type function :accessor key-fn :transient t))
  (:metaclass persistent-metaclass)
  (:documentation "Secondary index to an indexed-btree."))

(defmethod shared-initialize :after ((instance btree-index) slot-names
				     &rest rest)
  (declare (ignore slot-names rest))
  (let ((key-form (key-form instance)))
    (if (and (symbolp key-form) (fboundp key-form))
	(setf (key-fn instance) (fdefinition key-form))
	(setf (key-fn instance) (compile nil key-form)))))

(defmethod get-value (key (bt btree-index))
  "Get the value in the primary DB from a secondary key."
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (let ((buf (db-get-key-buffered 
		(controller-indices-assoc *store-controller*) 
		key-buf value-buf)))
      (if buf (values (deserialize buf) T)
	  (values nil nil)))))

(defmethod (setf get-value) (value key (bt btree-index))
  "Puts are not allowed on secondary indices.  Try adding to
the primary."
  (declare (ignore value key bt))
  (error "Puts are forbidden on secondary indices.  Try adding to the primary."))

(defgeneric get-primary-key (key bt)
  (:documentation "Get the primary key from a secondary key."))

(defmethod get-primary-key (key (bt btree-index))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (oid bt) key-buf)
    (serialize key key-buf)
    (let ((buf (db-get-key-buffered 
		(controller-indices *store-controller*) 
		key-buf value-buf)))
      (if buf 
	  (let ((oid (buffer-read-fixnum buf)))
	    (values (deserialize buf) oid))
	  (values nil nil)))))

(defmethod remove-kv (key (bt btree-index))
  "Remove a key / value from the PRIMARY by a secondary
lookup, updating ALL other secondary indices."
  (declare (optimize (speed 3)))
  (remove-kv (get-primary-key key bt) (primary bt)))


;; Cursor operations

(defclass cursor ()
  ((handle :accessor cursor-handle :initarg :handle)
   (oid :accessor cursor-oid :type fixnum :initarg :oid)
   (initialized-p :accessor cursor-initialized-p
		  :type boolean :initform nil :initarg :initialized-p)
   (btree :accessor cursor-btree :initarg :btree))
  (:documentation "A cursor for traversing (primary) BTrees."))

(defgeneric make-cursor (bt &key degree-2 dirty-read)
  (:documentation "Construct a cursor for traversing BTrees."))

(defgeneric cursor-close (cursor)
  (:documentation 
   "Close the cursor.  Make sure to close cursors before the
enclosing transaction is closed!"))

(defgeneric cursor-duplicate (cursor)
  (:documentation "Duplicate a cursor."))

(defgeneric cursor-current (cursor)
  (:documentation 
   "Get the key / value at the cursor position.  Returns
has-pair key value, where has-pair is a boolean indicating
there was a pair."))

(defgeneric cursor-first (cursor)
  (:documentation 
   "Move the cursor to the beginning of the BTree, returning
has-pair key value."))

(defgeneric cursor-last (cursor)
  (:documentation 
   "Move the cursor to the end of the BTree, returning
has-pair key value."))

(defgeneric cursor-next (cursor)   
  (:documentation 
   "Advance the cursor, returning has-pair key value."))

(defgeneric cursor-prev (cursor)
  (:documentation 
   "Move the cursor back, returning has-pair key value."))

(defgeneric cursor-set (cursor key)
  (:documentation 
   "Move the cursor to a particular key, returning has-pair
key value."))

(defgeneric cursor-set-range (cursor key) 
  (:documentation 
   "Move the cursor to the first key-value pair with key
greater or equal to the key argument, according to the lisp
sorter.  Returns has-pair key value."))

(defgeneric cursor-get-both (cursor key value)
  (:documentation 
   "Moves the cursor to a particular key / value pair,
returning has-pair key value."))

(defgeneric cursor-get-both-range (cursor key value)
  (:documentation 
   "Moves the cursor to the first key / value pair with key
equal to the key argument and value greater or equal to the
value argument.  Not really useful for us since primaries
don't have duplicates.  Returns has-pair key value."))

(defgeneric cursor-delete (cursor)
  (:documentation 
   "Delete by cursor.  The cursor is at an invalid position
after a successful delete."))

(defgeneric cursor-put (cursor value &key key)
  (:documentation 
  "Put by cursor.  Currently doesn't properly move the
cursor."))

(defmethod make-cursor ((bt btree) &key degree-2 dirty-read)
  "Make a cursor from a btree."
  (declare (optimize (speed 3)))
  (make-instance 'cursor 
		 :btree bt
		 :handle (db-cursor (controller-btrees *store-controller*)
				    :degree-2 degree-2 :dirty-read dirty-read)
		 :oid (oid bt)))

(defmacro with-btree-cursor ((var bt &key degree-2 dirty-read) &body body)
  "Macro which opens a named cursor on a BTree (primary or
not), evaluates the forms, then closes the cursor."
  `(let ((,var (make-cursor ,bt :degree-2 ,degree-2 :dirty-read ,dirty-read)))
    (unwind-protect
	 (progn , at body)
      (cursor-close ,var))))

(defun map-btree (fn bt &key degree-2 dirty-read)
  "Like maphash."
  (with-btree-cursor (curs bt :degree-2 degree-2 :dirty-read dirty-read)
    (loop
     (multiple-value-bind (more k v) (cursor-next curs)
       (unless more (return nil))
       (funcall fn k v)))))       

(defmethod cursor-close ((cursor cursor))
  (declare (optimize (speed 3)))
  (db-cursor-close (cursor-handle cursor))
  (setf (cursor-initialized-p cursor) nil))

(defmethod cursor-duplicate ((cursor cursor))
  (declare (optimize (speed 3)))
  (make-instance (type-of cursor)
		 :initialized-p (cursor-initialized-p cursor)
		 :oid (cursor-oid cursor)
		 :handle (db-cursor-duplicate 
			  (cursor-handle cursor) 
			  :position (cursor-initialized-p cursor))))

(defmethod cursor-current ((cursor cursor))
  (declare (optimize (speed 3)))
  (when (cursor-initialized-p cursor)
    (with-buffer-streams (key-buf value-buf)
      (multiple-value-bind (key val)
	  (db-cursor-move-buffered (cursor-handle cursor) key-buf value-buf
				   :current t)
	(if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	    (progn (setf (cursor-initialized-p cursor) t)
		   (values t (deserialize key) (deserialize val)))
	    (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-first ((cursor cursor))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (multiple-value-bind (key val)
	(db-cursor-set-buffered (cursor-handle cursor) 
				key-buf value-buf :set-range t)
      (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t (deserialize key) (deserialize val)))
	  (setf (cursor-initialized-p cursor) nil)))))
		 
;;A bit of a hack.....
(defmethod cursor-last ((cursor cursor))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (+ (cursor-oid cursor) 1) key-buf)
    (if (db-cursor-set-buffered (cursor-handle cursor) 
				key-buf value-buf :set-range t)    
	(progn (reset-buffer-stream key-buf)
	       (reset-buffer-stream value-buf)
	       (multiple-value-bind (key val)
		   (db-cursor-move-buffered (cursor-handle cursor) 
					    key-buf value-buf :prev t)
		 (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
		     (progn
		       (setf (cursor-initialized-p cursor) t)
		       (values t (deserialize key) (deserialize val)))
		     (setf (cursor-initialized-p cursor) nil))))
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor) key-buf
				     value-buf :last t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (progn
		(setf (cursor-initialized-p cursor) t)
		(values t (deserialize key) (deserialize val)))
	      (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-next ((cursor cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf value-buf)
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor) 
				     key-buf value-buf :next t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-first cursor)))
	  
(defmethod cursor-prev ((cursor cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf value-buf)
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor)
				     key-buf value-buf :prev t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-last cursor)))
	  
(defmethod cursor-set ((cursor cursor) key)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (multiple-value-bind (k val)
	(db-cursor-set-buffered (cursor-handle cursor)
				key-buf value-buf :set t)
      (if k
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t key (deserialize val)))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-set-range ((cursor cursor) key)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (multiple-value-bind (k val)
	(db-cursor-set-buffered (cursor-handle cursor)
				key-buf value-buf :set-range t)
      (if (and k (= (buffer-read-int k) (cursor-oid cursor)))
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t (deserialize k) (deserialize val)))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-get-both ((cursor cursor) key value)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (serialize value value-buf)
    (multiple-value-bind (k v)
	(db-cursor-get-both-buffered (cursor-handle cursor)
				     key-buf value-buf :get-both t)
      (declare (ignore v))
      (if k
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t key value))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-get-both-range ((cursor cursor) key value)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (serialize value value-buf)
    (multiple-value-bind (k v)
	(db-cursor-get-both-buffered (cursor-handle cursor)
				     key-buf value-buf :get-both-range t)
      (if k
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t key (deserialize v)))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-delete ((cursor cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf value-buf)
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor) key-buf value-buf
				     :current t)
	  (declare (ignore val))
	  (when (and key (= (buffer-read-int key) (cursor-oid cursor)))
	    ;; in case of a secondary index this should delete everything
	    ;; as specified by the BDB docs.
	    (remove-kv (deserialize key) (cursor-btree cursor)))
	  (setf (cursor-initialized-p cursor) nil)))
      (error "Can't delete with uninitialized cursor!")))

(defmethod cursor-put ((cursor cursor) value &key (key nil key-specified-p))
  "Put by cursor.  Not particularly useful since primaries
don't support duplicates.  Currently doesn't properly move
the cursor."
  (declare (optimize (speed 3)))
  (if key-specified-p
      (setf (get-value key (cursor-btree cursor)) value)
      (if (cursor-initialized-p cursor)
	  (with-buffer-streams (key-buf value-buf)
	    (multiple-value-bind (k v)
		(db-cursor-move-buffered (cursor-handle cursor) key-buf 
					 value-buf :current t)
	      (declare (ignore v))
	      (if (and k (= (buffer-read-int k) (cursor-oid cursor)))
		  (setf (get-value (deserialize k) (cursor-btree cursor)) 
			value)
		  (setf (cursor-initialized-p cursor) nil))))
	  (error "Can't put with uninitialized cursor!"))))

;; Secondary cursors

(defclass secondary-cursor (cursor) ()
  (:documentation "Cursor for traversing secondary indices."))

(defgeneric cursor-pcurrent (cursor)
  (:documentation 
   "Returns has-tuple / secondary key / value / primary key
at the current position."))

(defgeneric cursor-pfirst (cursor)
  (:documentation 
   "Moves the key to the beginning of the secondary index.
Returns has-tuple / secondary key / value / primary key."))

(defgeneric cursor-plast (cursor)
  (:documentation 
   "Moves the key to the end of the secondary index.  Returns
has-tuple / secondary key / value / primary key."))

(defgeneric cursor-pnext (cursor)
  (:documentation 
   "Advances the cursor.  Returns has-tuple / secondary key /
value / primary key."))

(defgeneric cursor-pprev (cursor)
  (:documentation 
   "Moves the cursor back.  Returns has-tuple / secondary key
/ value / primary key."))

(defgeneric cursor-pset (cursor key)
  (:documentation 
  "Moves the cursor to a particular key.  Returns has-tuple
/ secondary key / value / primary key."))

(defgeneric cursor-pset-range (cursor key)
  (:documentation 
   "Move the cursor to the first key-value pair with key
greater or equal to the key argument, according to the lisp
sorter.  Returns has-pair secondary key value primary key."))

(defgeneric cursor-pget-both (cursor key value)
  (:documentation 
   "Moves the cursor to a particular secondary key / primary
key pair.  Returns has-tuple / secondary key / value /
primary key."))

(defgeneric cursor-pget-both-range (cursor key value)
  (:documentation 
   "Moves the cursor to a the first secondary key / primary
key pair, with secondary key equal to the key argument, and
primary key greater or equal to the pkey argument.  Returns
has-tuple / secondary key / value / primary key."))

(defgeneric cursor-next-dup (cursor)
  (:documentation 
   "Move to the next duplicate element (with the same key.)
Returns has-pair key value."))

(defgeneric cursor-next-nodup (cursor)
  (:documentation 
   "Move to the next non-duplicate element (with different
key.)  Returns has-pair key value."))

(defgeneric cursor-prev-nodup (cursor)
  (:documentation 
   "Move to the previous non-duplicate element (with
different key.)  Returns has-pair key value."))

(defgeneric cursor-pnext-dup (cursor)
  (:documentation 
   "Move to the next duplicate element (with the same key.)
Returns has-tuple / secondary key / value / primary key."))

(defgeneric cursor-pnext-nodup (cursor)
  (:documentation 
   "Move to the next non-duplicate element (with different
key.)  Returns has-tuple / secondary key / value / primary
key."))

(defgeneric cursor-pprev-nodup (cursor)
  (:documentation 
   "Move to the previous non-duplicate element (with
different key.)  Returns has-tuple / secondary key / value /
primary key."))

(defmethod make-cursor ((bt btree-index) &key degree-2 dirty-read)
  "Make a secondary-cursor from a secondary index."
  (declare (optimize (speed 3)))
  (make-instance 'secondary-cursor 
		 :btree bt
		 :handle (db-cursor 
			  (controller-indices-assoc *store-controller*)
			  :degree-2 degree-2 :dirty-read dirty-read)
		 :oid (oid bt)))

(defmethod cursor-pcurrent ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (when (cursor-initialized-p cursor)
    (with-buffer-streams (key-buf pkey-buf value-buf)
      (multiple-value-bind (key pkey val)
	  (db-cursor-pmove-buffered (cursor-handle cursor) 
				    key-buf pkey-buf value-buf
				    :current t)
	(if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	    (progn (setf (cursor-initialized-p cursor) t)
		   (values t (deserialize key) (deserialize val)
			   (progn (buffer-read-int pkey) (deserialize pkey))))
	    (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-pfirst ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (multiple-value-bind (key pkey val)
	(db-cursor-pset-buffered (cursor-handle cursor) 
				 key-buf pkey-buf value-buf :set-range t)
      (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t (deserialize key) (deserialize val)
			 (progn (buffer-read-int pkey) (deserialize pkey))))
	  (setf (cursor-initialized-p cursor) nil)))))
		 
;;A bit of a hack.....
(defmethod cursor-plast ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (buffer-write-int (+ (cursor-oid cursor) 1) key-buf)
    (if (db-cursor-set-buffered (cursor-handle cursor) 
				key-buf value-buf :set-range t)    
	(progn (reset-buffer-stream key-buf)
	       (reset-buffer-stream value-buf)
	       (multiple-value-bind (key pkey val)
		   (db-cursor-pmove-buffered (cursor-handle cursor) key-buf 
					     pkey-buf value-buf :prev t)
		 (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
		     (progn
		       (setf (cursor-initialized-p cursor) t)
		       (values t (deserialize key) (deserialize val)
			       (progn (buffer-read-int pkey) 
				      (deserialize pkey))))
		     (setf (cursor-initialized-p cursor) nil))))
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor) key-buf
				      pkey-buf value-buf :last t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (progn
		(setf (cursor-initialized-p cursor) t)
		(values t (deserialize key) (deserialize val)
			(progn (buffer-read-int pkey) (deserialize pkey))))
	      (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-pnext ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf pkey-buf value-buf)
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor) 
				     key-buf pkey-buf value-buf :next t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val)
		      (progn (buffer-read-int pkey) (deserialize pkey)))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-pfirst cursor)))
	  
(defmethod cursor-pprev ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf pkey-buf value-buf)
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor)
				      key-buf pkey-buf value-buf :prev t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val)
		      (progn (buffer-read-int pkey) (deserialize pkey)))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-plast cursor)))
	  
(defmethod cursor-pset ((cursor secondary-cursor) key)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (multiple-value-bind (k pkey val)
	(db-cursor-pset-buffered (cursor-handle cursor)
				 key-buf pkey-buf value-buf :set t)
      (if k
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t key (deserialize val)
			 (progn (buffer-read-int pkey) (deserialize pkey))))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-pset-range ((cursor secondary-cursor) key)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (buffer-write-int (cursor-oid cursor) key-buf)
    (serialize key key-buf)
    (multiple-value-bind (k pkey val)
	(db-cursor-pset-buffered (cursor-handle cursor)
				 key-buf pkey-buf value-buf :set-range t)
      (if (and k (= (buffer-read-int k) (cursor-oid cursor)))
	  (progn (setf (cursor-initialized-p cursor) t)
		 (values t (deserialize k) (deserialize val)
			 (progn (buffer-read-int pkey) (deserialize pkey))))
	  (setf (cursor-initialized-p cursor) nil)))))

(defmethod cursor-pget-both ((cursor secondary-cursor) key pkey)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (let ((primary-oid (oid (primary (cursor-btree cursor)))))
      (buffer-write-int (cursor-oid cursor) key-buf)
      (serialize key key-buf)
      (buffer-write-int primary-oid pkey-buf)
      (serialize pkey pkey-buf)
      (multiple-value-bind (k p val)
	  (db-cursor-pget-both-buffered (cursor-handle cursor)
					key-buf pkey-buf value-buf :get-both t)
	(declare (ignore p))
	(if k
	    (progn (setf (cursor-initialized-p cursor) t)
		   (values t key (deserialize val) pkey))
	    (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-pget-both-range ((cursor secondary-cursor) key pkey)
  (declare (optimize (speed 3)))
  (with-buffer-streams (key-buf pkey-buf value-buf)
    (let ((primary-oid (oid (primary (cursor-btree cursor)))))    
      (buffer-write-int (cursor-oid cursor) key-buf)
      (serialize key key-buf)
      (buffer-write-int primary-oid pkey-buf)
      (serialize pkey pkey-buf)
      (multiple-value-bind (k p val)
	  (db-cursor-pget-both-buffered (cursor-handle cursor) key-buf 
					pkey-buf value-buf :get-both-range t)
	(if k
	    (progn (setf (cursor-initialized-p cursor) t)
		   (values t key (deserialize val)
			   (progn (buffer-read-int p) (deserialize p))))
	    (setf (cursor-initialized-p cursor) nil))))))

(defmethod cursor-delete ((cursor secondary-cursor))
  "Delete by cursor: deletes ALL secondary indices."
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf pkey-buf value-buf)
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor) key-buf pkey-buf
				      value-buf :current t)
	  (declare (ignore val))
	  (when (and key (= (buffer-read-int key) (cursor-oid cursor))
		     (= (buffer-read-int pkey) (oid (primary 
						     (cursor-btree cursor)))))
	    (remove-kv (deserialize pkey) (primary (cursor-btree cursor))))
	  (setf (cursor-initialized-p cursor) nil)))
      (error "Can't delete with uninitialized cursor!")))

(defmethod cursor-get-both ((cursor secondary-cursor) key value)
  "cursor-get-both not implemented for secondary indices.
Use cursor-pget-both."
  (declare (ignore cursor key value))
  (error "cursor-get-both not implemented on secondary
indices.  Use cursor-pget-both."))

(defmethod cursor-get-both-range ((cursor secondary-cursor) key value)
  "cursor-get-both-range not implemented for secondary indices.
Use cursor-pget-both-range."
  (declare (ignore cursor key value))
  (error "cursor-get-both-range not implemented on secondary indices.  Use cursor-pget-both-range."))

(defmethod cursor-put ((cursor secondary-cursor) value &rest rest)
  "Puts are forbidden on secondary indices.  Try adding to
the primary."
  (declare (ignore rest value cursor))
  (error "Puts are forbidden on secondary indices.  Try adding to the primary."))

(defmethod cursor-next-dup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (when (cursor-initialized-p cursor)
    (with-buffer-streams (key-buf value-buf)
      (multiple-value-bind (key val)
	  (db-cursor-move-buffered (cursor-handle cursor)
				   key-buf value-buf :next-dup t)
	(if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	    (values t (deserialize key) (deserialize val))
	    (setf (cursor-initialized-p cursor) nil))))))
	  
(defmethod cursor-next-nodup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf value-buf)
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor)
				     key-buf value-buf :next-nodup t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-first cursor)))	  

(defmethod cursor-prev-nodup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf value-buf)
	(multiple-value-bind (key val)
	    (db-cursor-move-buffered (cursor-handle cursor)
				     key-buf value-buf :prev-nodup t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-last cursor)))

(defmethod cursor-pnext-dup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (when (cursor-initialized-p cursor)
    (with-buffer-streams (key-buf pkey-buf value-buf)
      (multiple-value-bind (key pkey val)
	  (db-cursor-pmove-buffered (cursor-handle cursor)
				    key-buf pkey-buf value-buf :next-dup t)
	(if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	    (values t (deserialize key) (deserialize val)
		    (progn (buffer-read-int pkey) (deserialize pkey)))
	    (setf (cursor-initialized-p cursor) nil))))))
	  
(defmethod cursor-pnext-nodup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf pkey-buf value-buf)
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor) key-buf
				      pkey-buf value-buf :next-nodup t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val)
		      (progn (buffer-read-int pkey) (deserialize pkey)))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-pfirst cursor)))

(defmethod cursor-pprev-nodup ((cursor secondary-cursor))
  (declare (optimize (speed 3)))
  (if (cursor-initialized-p cursor)
      (with-buffer-streams (key-buf pkey-buf value-buf)
	(multiple-value-bind (key pkey val)
	    (db-cursor-pmove-buffered (cursor-handle cursor) key-buf
				      pkey-buf value-buf :prev-nodup t)
	  (if (and key (= (buffer-read-int key) (cursor-oid cursor)))
	      (values t (deserialize key) (deserialize val)
		      (progn (buffer-read-int pkey) (deserialize pkey)))
	      (setf (cursor-initialized-p cursor) nil))))
      (cursor-plast cursor)))



More information about the elephant-devel mailing list