[nio-cvs] r41 - in branches/home/psmith/restructure: . src/buffer src/compat src/io src/protocol/yarpc src/statemachine

psmith at common-lisp.net psmith at common-lisp.net
Thu Jan 18 04:01:11 UTC 2007


Author: psmith
Date: Wed Jan 17 23:01:11 2007
New Revision: 41

Modified:
   branches/home/psmith/restructure/run-yarpc.lisp
   branches/home/psmith/restructure/src/buffer/buffer.lisp
   branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
   branches/home/psmith/restructure/src/io/async-fd.lisp
   branches/home/psmith/restructure/src/io/async-socket.lisp
   branches/home/psmith/restructure/src/io/nio-server.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
   branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
   branches/home/psmith/restructure/src/statemachine/state-machine.lisp
Log:
Yarpc working end-to-end

Modified: branches/home/psmith/restructure/run-yarpc.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc.lisp	(original)
+++ branches/home/psmith/restructure/run-yarpc.lisp	Wed Jan 17 23:01:11 2007
@@ -2,11 +2,10 @@
 (require :asdf)
 (require :nio-yarpc)
 
-(let ((jobq (nio-compat:concurrent-queue)))
-  (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server")
-  (format t "server toplevel waiting for job~%" )
-  (loop
+(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1")) :name "nio-server")
+(loop
     ;;block waiting for jobs
-    (multiple-value-bind (job result-queue) (nio-compat:take jobq)
+    (format t "Server toplevel waiting for job~%" )
+    (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue)
       (format t "Server received job ~A~%" job)
-      (nio-compat:add result-queue (nio-yarpc:execute-call job)))))
+      (nio-compat:add result-queue (nio-yarpc:execute-call job))))

Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp
==============================================================================
--- branches/home/psmith/restructure/src/buffer/buffer.lisp	(original)
+++ branches/home/psmith/restructure/src/buffer/buffer.lisp	Wed Jan 17 23:01:11 2007
@@ -55,25 +55,45 @@
 
 ;;Utils by slyrus (http://paste.lisp.org/display/11149)
 (defun hex-dump-byte (address)
-  (format nil "~2,'0X"
-          (sb-alien:deref
-           (sb-alien:sap-alien
-            (sb-alien::int-sap address)
-            (* (sb-alien:unsigned 8))))))
+  (format nil "~2,'0X" (byte-value address)))
+
+(defun byte-value (address)
+  (sb-alien:deref
+   (sb-alien:sap-alien
+    (sb-alien::int-sap address)
+    (* (sb-alien:unsigned 8)))))
 
 (defun hex-dump-memory (start-address length)
   (loop for i from start-address below (+ start-address length)
      collect (format nil (hex-dump-byte i))))
 
+;;-- end utils
+
+
+(defun pretty-hex-dump (start-address length)
+;  (format t "start: ~A length ~A~%" start-address length)
+  (with-output-to-string (str)
+    (let ((rows (floor (/ length 16))))
+;      (format t "rows: ~A remainder ~A~%" rows remainder)
+      (dotimes (row-index (+ 1 rows))
+	(format str "~A~%"
+		(with-output-to-string (readable)
+		  (dotimes (column-index 16)
+		    (let ((address (+ start-address (* row-index 16) column-index)))
+					;	    (format t "Current address : ~A~%" address)
+		      (if (>= address (+ start-address length)) 
+			  (progn 
+			    (format str "--")
+			    (format readable "--"))
+			  (progn 
+			    (format str (if (eql column-index 7) "~A   " "~A ") (hex-dump-byte address))
+			    (format readable "~A" (code-char (byte-value address)))))))))))))
 
 (defun make-uint8-seq (size)
   "Make uint8 sequence."
   (make-sequence '(vector (unsigned-byte 8)) size :initial-element 0))
 
 
-;;-- end utils
-
-
 ;;A buffer that deals with bytes
 (defclass byte-buffer (buffer)())
 
@@ -83,7 +103,7 @@
 
 (defmethod print-object ((byte-buffer byte-buffer) stream)
   (with-slots (capacity position limit buf) byte-buffer
-    (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (hex-dump-memory (cffi:pointer-address buf) limit) nil))))
+    (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (pretty-hex-dump (cffi:pointer-address buf) limit) nil))))
 
 (defmethod free-buffer((byte-buffer byte-buffer))
   (with-slots (capacity position limit buf) byte-buffer

Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp	(original)
+++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp	Wed Jan 17 23:01:11 2007
@@ -47,7 +47,7 @@
   `(if ,a-buffer 
        (let ((head (car ,a-buffer)))
 	 (setf ,a-buffer (cdr ,a-buffer))
-#+nio-debug (format t "reader ~A woke, read ~A as ~A~%" sb-thread:*current-thread* head ,loc)
+#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc)
 	 head)
        nil))
 
@@ -64,6 +64,7 @@
 
 ;Append the element to the tail of this queue
 (defmethod add ((queue concurrent-queue) elt)
+#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
   (sb-thread:with-mutex ((buffer-lock queue))
     (setf (buffer queue) (append (buffer queue) (list elt)) )
     (sb-thread:condition-notify (buffer-queue queue))))

Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp	(original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp	Wed Jan 17 23:01:11 2007
@@ -125,11 +125,12 @@
 	  (error 'read-error)))
 
        ((= new-bytes 0)
-	nil);;(throw 'end-of-file nil))
+	nil);;(throw 'end-of-file nil)
 
        (t
 	  ;;Update buffer position
-	  (inc-position foreign-read-buffer new-bytes))))))
+	  (inc-position foreign-read-buffer new-bytes)
+	  (setf (read-ready state-machine) nil))))))
 
 (defun close-async-fd (async-fd)
   "Close ASYNC-FD's fd after everything has been written from write-queue."

Modified: branches/home/psmith/restructure/src/io/async-socket.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-socket.lisp	(original)
+++ branches/home/psmith/restructure/src/io/async-socket.lisp	Wed Jan 17 23:01:11 2007
@@ -178,7 +178,7 @@
 
 
 
-(defun socket-accept (socket-fd connection-factory)
+(defun socket-accept (socket-fd connection-type)
   "Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
 
   (flet ((parse-inet6-addr (addr) 
@@ -202,7 +202,7 @@
 	;; accept connection
       (let* ((res (%accept socket-fd addr len)) 
 ;;	     (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res)))
-	     (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd))))
+	     (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd))))
 
 	(unless (< res 0)
 	  (let ((len-value (mem-ref len :unsigned-int)))

Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp	Wed Jan 17 23:01:11 2007
@@ -55,7 +55,7 @@
 
   			     
 
-(defun start-server (connection-handler accept-filter connection-factory 
+(defun start-server (connection-handler accept-filter connection-type 
 		     &key 
 		     (protocol :inet) 
 		     (port (+ (random 60000) 1024)) 
@@ -99,7 +99,7 @@
 		  (cond
 		    ;; new connection
 		    ((= fd sock)
-		     (let ((async-fd (socket-accept fd connection-factory)))
+		     (let ((async-fd (socket-accept fd connection-type)))
 #+nio-debug		       (format t "start-server - New conn: ~A~%" async-fd)
 		       (cond
 			 ((null async-fd)

Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	Wed Jan 17 23:01:11 2007
@@ -32,7 +32,7 @@
              yarpc-state-machine-factory get-packet-factory 
 
 	     ;; yarpc-state-machine
-	     yarpc-state-machine 
+	     yarpc-state-machine job-queue
 	     ;to be moved
 	     test-rpc test-rpc-list test-rpc-string execute-call
 	     

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	Wed Jan 17 23:01:11 2007
@@ -38,7 +38,7 @@
 	      :documentation "The queue used to hand off work from an external thread to the io thread")
    (result-queue :initform (nio-compat:concurrent-queue)
 		 :accessor result-queue
-		 :documentation "The queue used to hand off work from an external thread to the io thread")))
+		 :documentation "The queue used to return results from the io thread to an external thread")))
 
 (defun yarpc-client-state-machine ()
     (make-instance 'yarpc-client-state-machine))
@@ -55,26 +55,26 @@
 (defconstant STATE-INITIALISED 0)
 (defconstant STATE-SENT-REQUEST 1)
 
-(defparameter state STATE-INITIALISED)
-
 (defmethod process-outgoing-packet((sm yarpc-client-state-machine))
   (format t "process-outgoing-packet called, polling the job-queue ~%")
   (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
     (when packet
       (format t "process-outgoing-packet got job ~A ~%" packet)
-      (setf state STATE-SENT-REQUEST))
+      (setf (state sm) STATE-SENT-REQUEST))
       packet))
 
 (defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
-  (assert (eql state STATE-SENT-REQUEST))
+  (assert (eql (state sm) STATE-SENT-REQUEST))
   (format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
-  (nio-compat:add (result-queue sm) response)
-  (setf state STATE-INITIALISED))
+  (let* ((*package* (find-package :nio-yarpc))
+         (result  (read-from-string (response response))))
+    (nio-compat:add (result-queue sm) result)
+    (setf (state sm) STATE-INITIALISED)))
   
 ;Called from an external thread i.e. *not* the nio thread
 ;Blocks calling thread on the remote m/c's response
 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
 ;    (queue-outgoing-packet 
-  (assert (eql state STATE-INITIALISED))
+  (assert (eql (state sm) STATE-INITIALISED))
   (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
   (nio-compat:take (result-queue sm)))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	Wed Jan 17 23:01:11 2007
@@ -33,20 +33,13 @@
 ;; A server that processes remote procedure calls and returns results
 ;;
 (defclass yarpc-state-machine (state-machine)
-  ((job-queue :initarg :job-queue
-	      :initform (error "Must supply a job queue to write work to.")
-	      :accessor job-queue
-	      :documentation "The queue used to hand off work from the NIO thread to an external thread for execution")
+  (
    (result-queue :initform (nio-compat:concurrent-queue)
 		 :accessor result-queue
 		 :documentation "The queue used to return results from an external thread to the nio thread")))
 
-(defun yarpc-state-machine (read-fd write-fd socket job-queue)
-  (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue)))
-    (nio-buffer:clear (foreign-read-buffer sm))
-    (nio-buffer:clear (foreign-write-buffer sm))
-    (format t "yarpc-state-machine - Created ~S~%" sm)
-    sm))
+(defparameter job-queue (nio-compat:concurrent-queue)
+  "The queue used to hand off work from the NIO thread to an external thread for execution")
 
 (defparameter yarpc-pf (yarpc-packet-factory))
 
@@ -59,27 +52,18 @@
 (defconstant STATE-INITIALISED 0)
 (defconstant STATE-SEND-RESPONSE 1)
 
-(defparameter state STATE-INITIALISED)
-
-
 (defmethod process-outgoing-packet((sm yarpc-state-machine))
   (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%")
-  (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil)))
-    (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet)
-    packet))
-
+  (let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))
+    (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" result)
+     (when result
+	(method-response-packet result))))
 
-;Process a call method packet, returns 
+;Process a call method packet by placing it in the job-queue
 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
-  (assert (eql state STATE-INITIALISED))
+  (assert (eql (state sm) STATE-INITIALISED))
   (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
-  (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm))))  
-
-
-;Called from an external thread i.e. *not* the nio thread
-;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue
-;(defmethod get-job ((sm yarpc-state-machine))
-;  (values (nio-compat:take (job-queue sm)) (result-queue sm)))
+  (nio-compat:add job-queue (list (call-string call) (result-queue sm))))
 
 
 

Modified: branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp	(original)
+++ branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp	Wed Jan 17 23:01:11 2007
@@ -29,5 +29,5 @@
 	    (:export
 
 	     ;; state-machine
-	     state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet
+	     state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet state
 	     ))

Modified: branches/home/psmith/restructure/src/statemachine/state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/statemachine/state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/statemachine/state-machine.lisp	Wed Jan 17 23:01:11 2007
@@ -37,7 +37,9 @@
 ;This way only the protocols packet heirarchy knows about binary representations and 
 ;  the SM can deal with protocol logic and state maintenance
 ;
-(defclass state-machine (async-fd)())
+(defclass state-machine (async-fd)
+ ((state :initform 0
+ 	 :accessor state)))
 
 (defmethod print-object ((sm state-machine) stream)
   (format stream "#<STATE-MACHINE ~A >" (call-next-method sm nil)))



More information about the Nio-cvs mailing list