[nio-cvs] r40 - in branches/home/psmith/restructure: . src/compat src/io src/protocol/yarpc

psmith at common-lisp.net psmith at common-lisp.net
Wed Jan 17 05:06:15 UTC 2007


Author: psmith
Date: Wed Jan 17 00:06:13 2007
New Revision: 40

Added:
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
      - copied, changed from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Modified:
   branches/home/psmith/restructure/run-yarpc-client.lisp
   branches/home/psmith/restructure/run-yarpc.lisp
   branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
   branches/home/psmith/restructure/src/io/async-fd.lisp
   branches/home/psmith/restructure/src/io/async-socket.lisp
   branches/home/psmith/restructure/src/io/nio-server.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
yarpc work, saving...

Modified: branches/home/psmith/restructure/run-yarpc-client.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc-client.lisp	(original)
+++ branches/home/psmith/restructure/run-yarpc-client.lisp	Wed Jan 17 00:06:13 2007
@@ -2,10 +2,9 @@
 (require :asdf)
 (require :nio-yarpc)
 
-(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1" :port 9897)) :name "nio-server")
+;;shouldn't be listenting on the client hence nil for accept SM to start-server
+(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity nil :host "127.0.0.1" :port 9897)) :name "nio-server")
 (sleep 4)
-(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-state-machine)))
+(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-client-state-machine)))
 (format t "toplevel adding conn ~A~%" sm)
 (format t "Result of remote-execute ~A~%" (nio-yarpc:remote-execute sm "(nio-yarpc:test-rpc-list)")))
-
-

Modified: branches/home/psmith/restructure/run-yarpc.lisp
==============================================================================
--- branches/home/psmith/restructure/run-yarpc.lisp	(original)
+++ branches/home/psmith/restructure/run-yarpc.lisp	Wed Jan 17 00:06:13 2007
@@ -2,4 +2,11 @@
 (require :asdf)
 (require :nio-yarpc)
 
-(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1")
+(let ((jobq (nio-compat:concurrent-queue)))
+  (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server")
+  (format t "server toplevel waiting for job~%" )
+  (loop
+    ;;block waiting for jobs
+    (multiple-value-bind (job result-queue) (nio-compat:take jobq)
+      (format t "Server received job ~A~%" job)
+      (nio-compat:add result-queue (nio-yarpc:execute-call job)))))

Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp	(original)
+++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp	Wed Jan 17 00:06:13 2007
@@ -51,18 +51,18 @@
 	 head)
        nil))
 
-
-(defmethod take ((queue concurrent-queue))
+;Do an (optionally blocking) remove of the element at the head of this queue
+(defmethod take ((queue concurrent-queue) &key (blocking-call t))
   (sb-thread:with-mutex ((buffer-lock queue))
     ;if its there, pop it
     (let ((ret (pop-elt (buffer queue) "1sttry")))
-      (if ret
+      (if (or ret (not blocking-call))
 	  ret
 	  (progn
 	    (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
 	    (pop-elt (buffer queue) "2ndtry"))))))
 
-
+;Append the element to the tail of this queue
 (defmethod add ((queue concurrent-queue) elt)
   (sb-thread:with-mutex ((buffer-lock queue))
     (setf (buffer queue) (append (buffer queue) (list elt)) )

Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp	(original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp	Wed Jan 17 00:06:13 2007
@@ -84,10 +84,6 @@
 ;;Implement this in concrete SM for read
 (defgeneric process-write (async-fd))
 
-
-
-;Loop over state machines calling process-outgoing-packets via state-machine::process-write
-
 ;;SM factory 
 (defun create-state-machine(sm-type read-fd write-fd socket)
   (let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))

Modified: branches/home/psmith/restructure/src/io/async-socket.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-socket.lisp	(original)
+++ branches/home/psmith/restructure/src/io/async-socket.lisp	Wed Jan 17 00:06:13 2007
@@ -178,7 +178,7 @@
 
 
 
-(defun socket-accept (socket-fd connection-type)
+(defun socket-accept (socket-fd connection-factory)
   "Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
 
   (flet ((parse-inet6-addr (addr) 
@@ -202,7 +202,7 @@
 	;; accept connection
       (let* ((res (%accept socket-fd addr len)) 
 ;;	     (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res)))
-	     (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd))))
+	     (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd))))
 
 	(unless (< res 0)
 	  (let ((len-value (mem-ref len :unsigned-int)))

Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp	Wed Jan 17 00:06:13 2007
@@ -55,7 +55,7 @@
 
   			     
 
-(defun start-server (connection-handler accept-filter connection-type 
+(defun start-server (connection-handler accept-filter connection-factory 
 		     &key 
 		     (protocol :inet) 
 		     (port (+ (random 60000) 1024)) 
@@ -99,7 +99,7 @@
 		  (cond
 		    ;; new connection
 		    ((= fd sock)
-		     (let ((async-fd (socket-accept fd connection-type)))
+		     (let ((async-fd (socket-accept fd connection-factory)))
 #+nio-debug		       (format t "start-server - New conn: ~A~%" async-fd)
 		       (cond
 			 ((null async-fd)

Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	Wed Jan 17 00:06:13 2007
@@ -28,6 +28,14 @@
 	    
 	    (:export
 
+             ;;base
+             yarpc-state-machine-factory get-packet-factory 
+
 	     ;; yarpc-state-machine
-	     yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory remote-execute
+	     yarpc-state-machine 
+	     ;to be moved
+	     test-rpc test-rpc-list test-rpc-string execute-call
+	     
+	     ;;yarpc-client-state-machine
+	     yarpc-client-state-machine remote-execute
 	     ))

Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd	Wed Jan 17 00:06:13 2007
@@ -7,6 +7,7 @@
     :components ((:file "nio-yarpc-package")
                  (:file "yarpc-packet-factory" :depends-on ("nio-yarpc-package"))
 		 (:file "yarpc-state-machine" :depends-on ("yarpc-packet-factory"))
+		 (:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory"))
 		 )
 
-    :depends-on (:nio :nio-sm))
\ No newline at end of file
+    :depends-on (:nio :nio-sm :nio-compat))
\ No newline at end of file

Copied: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp)
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	Wed Jan 17 00:06:13 2007
@@ -28,108 +28,53 @@
 
 (declaim (optimize (debug 3) (speed 3) (space 0)))
 
-;; YetAnotherRPC state machine
+;; YetAnotherRPC Client state machine
 ;;
-;; A server that processes remote procedure calls and returns results
+;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution
 ;;
-;; Test with: 
-;; > telnet 127.0.0.1 16323
-;; Trying 127.0.0.1...
-;; Connected to 127.0.0.1.
-;; Escape character is '^]'.
-;; (test-rpc "who" 2 's)
-;; response - who 2 'S
-;;
-(defclass yarpc-state-machine (state-machine)
-  ((outgoing-packet :initarg :outgoing-packet
-                    :accessor outgoing-packet
-		    :initform nil)))
+(defclass yarpc-client-state-machine (state-machine)
+  ((job-queue :initform (nio-compat:concurrent-queue)
+	      :accessor job-queue
+	      :documentation "The queue used to hand off work from an external thread to the io thread")
+   (result-queue :initform (nio-compat:concurrent-queue)
+		 :accessor result-queue
+		 :documentation "The queue used to hand off work from an external thread to the io thread")))
 
-(defun yarpc-state-machine ()
-    (make-instance 'yarpc-state-machine))
+(defun yarpc-client-state-machine ()
+    (make-instance 'yarpc-client-state-machine))
 
 (defparameter yarpc-pf (yarpc-packet-factory))
 
-(defmethod get-packet-factory((sm yarpc-state-machine))
+(defmethod get-packet-factory((sm yarpc-client-state-machine))
   yarpc-pf)
 
-;;TODO move somewhere suitable
-
-(defparameter *remote-fns* nil)
-
-(defun register-remote-fn(name)
-    (push name *remote-fns*))
-
-(defmacro defremote (name args &rest body)
-  `(progn 
-     (defun ,name (, at args) , at body)
-     (register-remote-fn #',name)))
-
-(defremote test-rpc-list()
-  (list 3 "as" 's (code-char #x2211)))
-
-(defremote test-rpc-string(a b c)
-  (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
-
-;;end move TODO
-
-
-;;;Utils
-
-(defun print-hashtable (table &optional (stream t))
-  (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table))
-;;;
-
 
-(defmethod print-object ((sm yarpc-state-machine) stream)
-  (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
+(defmethod print-object ((sm yarpc-client-state-machine) stream)
+  (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
 
 (defconstant STATE-INITIALISED 0)
-(defconstant STATE-SEND-RESPONSE 1)
+(defconstant STATE-SENT-REQUEST 1)
 
 (defparameter state STATE-INITIALISED)
 
-(define-condition authorization-error (error) ())
-
-
-(defmethod process-outgoing-packet((sm yarpc-state-machine))
-  (format t "process-outgoing-packet called~%")
-  (let ((packet (outgoing-packet sm)))
-    (setf (outgoing-packet sm) nil)
-    packet))
-
-;TODO queue and thread stuf
-(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet)
-  (setf (outgoing-packet sm) packet))
-
-;Process a call method packet, returns 
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
-  (assert (eql state STATE-INITIALISED))
-  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
-  (handler-case
-    (let ((result (execute-call (call-string call))))
-      (when result 
-      	(let ((response-packet (progn 
-                                  (setf state STATE-SEND-RESPONSE)
-                                  (queue-outgoing-packet sm (method-response-packet result)))))
-          t)))
-    (reader-error (re) (format t "No such function ~A~%" (call-string call)))
-    (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))))
-
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet))
-  (assert (eql state STATE-INITIALISED))
-  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response))
+(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
+  (format t "process-outgoing-packet called, polling the job-queue ~%")
+  (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
+    (when packet
+      (format t "process-outgoing-packet got job ~A ~%" packet)
+      (setf state STATE-SENT-REQUEST))
+      packet))
+
+(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
+  (assert (eql state STATE-SENT-REQUEST))
+  (format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
+  (nio-compat:add (result-queue sm) response)
+  (setf state STATE-INITIALISED))
   
-
-(defun execute-call (call-string)
-      (let* ((rpc-call-list (read-from-string call-string ))
-	     (fn (member (symbol-function (first rpc-call-list)) *remote-fns* )))
-	(format t "fn - ~A authorised? : ~A~%" (symbol-function (first rpc-call-list)) fn)
-	(if fn
-	    (apply (first rpc-call-list) (rest rpc-call-list))
-	    (error 'authorization-error))))
-
-
-(defmethod remote-execute ((sm yarpc-state-machine) call-string)
-  (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string)))
-    
\ No newline at end of file
+;Called from an external thread i.e. *not* the nio thread
+;Blocks calling thread on the remote m/c's response
+(defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
+;    (queue-outgoing-packet 
+  (assert (eql state STATE-INITIALISED))
+  (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
+  (nio-compat:take (result-queue sm)))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	Wed Jan 17 00:06:13 2007
@@ -32,27 +32,57 @@
 ;;
 ;; A server that processes remote procedure calls and returns results
 ;;
-;; Test with: 
-;; > telnet 127.0.0.1 16323
-;; Trying 127.0.0.1...
-;; Connected to 127.0.0.1.
-;; Escape character is '^]'.
-;; (test-rpc "who" 2 's)
-;; response - who 2 'S
-;;
 (defclass yarpc-state-machine (state-machine)
-  ((outgoing-packet :initarg :outgoing-packet
-                    :accessor outgoing-packet
-		    :initform nil)))
-
-(defun yarpc-state-machine ()
-    (make-instance 'yarpc-state-machine))
+  ((job-queue :initarg :job-queue
+	      :initform (error "Must supply a job queue to write work to.")
+	      :accessor job-queue
+	      :documentation "The queue used to hand off work from the NIO thread to an external thread for execution")
+   (result-queue :initform (nio-compat:concurrent-queue)
+		 :accessor result-queue
+		 :documentation "The queue used to return results from an external thread to the nio thread")))
+
+(defun yarpc-state-machine (read-fd write-fd socket job-queue)
+  (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue)))
+    (nio-buffer:clear (foreign-read-buffer sm))
+    (nio-buffer:clear (foreign-write-buffer sm))
+    (format t "yarpc-state-machine - Created ~S~%" sm)
+    sm))
 
 (defparameter yarpc-pf (yarpc-packet-factory))
 
 (defmethod get-packet-factory((sm yarpc-state-machine))
   yarpc-pf)
 
+(defmethod print-object ((sm yarpc-state-machine) stream)
+  (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
+
+(defconstant STATE-INITIALISED 0)
+(defconstant STATE-SEND-RESPONSE 1)
+
+(defparameter state STATE-INITIALISED)
+
+
+(defmethod process-outgoing-packet((sm yarpc-state-machine))
+  (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%")
+  (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil)))
+    (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet)
+    packet))
+
+
+;Process a call method packet, returns 
+(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
+  (assert (eql state STATE-INITIALISED))
+  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
+  (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm))))  
+
+
+;Called from an external thread i.e. *not* the nio thread
+;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue
+;(defmethod get-job ((sm yarpc-state-machine))
+;  (values (nio-compat:take (job-queue sm)) (result-queue sm)))
+
+
+
 ;;TODO move somewhere suitable
 
 (defparameter *remote-fns* nil)
@@ -71,56 +101,8 @@
 (defremote test-rpc-string(a b c)
   (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
 
-;;end move TODO
-
-
-;;;Utils
-
-(defun print-hashtable (table &optional (stream t))
-  (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table))
-;;;
-
-
-(defmethod print-object ((sm yarpc-state-machine) stream)
-  (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
-
-(defconstant STATE-INITIALISED 0)
-(defconstant STATE-SEND-RESPONSE 1)
-
-(defparameter state STATE-INITIALISED)
-
 (define-condition authorization-error (error) ())
 
-
-(defmethod process-outgoing-packet((sm yarpc-state-machine))
-  (format t "process-outgoing-packet called~%")
-  (let ((packet (outgoing-packet sm)))
-    (setf (outgoing-packet sm) nil)
-    packet))
-
-;TODO queue and thread stuf
-(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet)
-  (setf (outgoing-packet sm) packet))
-
-;Process a call method packet, returns 
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
-  (assert (eql state STATE-INITIALISED))
-  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
-  (handler-case
-    (let ((result (execute-call (call-string call))))
-      (when result 
-      	(let ((response-packet (progn 
-                                  (setf state STATE-SEND-RESPONSE)
-                                  (queue-outgoing-packet sm (method-response-packet result)))))
-          t)))
-    (reader-error (re) (format t "No such function ~A~%" (call-string call)))
-    (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))))
-
-(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet))
-  (assert (eql state STATE-INITIALISED))
-  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response))
-  
-
 (defun execute-call (call-string)
       (let* ((rpc-call-list (read-from-string call-string ))
 	     (fn (member (symbol-function (first rpc-call-list)) *remote-fns* )))
@@ -129,7 +111,18 @@
 	    (apply (first rpc-call-list) (rest rpc-call-list))
 	    (error 'authorization-error))))
 
+;;end move TODO
+
+
+
+
 
-(defmethod remote-execute ((sm yarpc-state-machine) call-string)
-  (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string)))
-    
\ No newline at end of file
+;  (handler-case
+;    (let ((result (execute-call (call-string call))))
+;      (when result 
+;      	(let ((response-packet (progn 
+;                                  (setf state STATE-SEND-RESPONSE)
+;                                  (queue-outgoing-packet sm (method-response-packet result)))))
+;          t)))
+;    (reader-error (re) (format t "No such function ~A~%" (call-string call)))
+;    (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call))))
\ No newline at end of file



More information about the Nio-cvs mailing list