[nio-cvs] r81 - in branches/home/psmith/restructure/src: io nio-logger protocol/yarpc

psmith at common-lisp.net psmith at common-lisp.net
Sat Feb 10 20:36:44 UTC 2007


Author: psmith
Date: Sat Feb 10 15:36:43 2007
New Revision: 81

Modified:
   branches/home/psmith/restructure/src/io/nio-package.lisp
   branches/home/psmith/restructure/src/io/nio-server.lisp
   branches/home/psmith/restructure/src/io/nio.asd
   branches/home/psmith/restructure/src/io/nodes.lisp
   branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
First stab at rpc multiplexing

Modified: branches/home/psmith/restructure/src/io/nio-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-package.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-package.lisp	Sat Feb 10 15:36:43 2007
@@ -42,4 +42,7 @@
 
 	     ;;ip-authorisation
 	     check-ip load-ips
+	     
+	     ;;nodes
+	     node with-connected-nodes active-conn
 	     ))

Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp	Sat Feb 10 15:36:43 2007
@@ -35,7 +35,7 @@
 
 ;TODO thread safety
 (defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
-  "List of sockets that have been connected and are awaiting addition to the event-notification system")
+  "List of node objects that are to be connected to")
 
 ;loop over hashtable 
 (defun process-async-fds (client-hash)
@@ -150,12 +150,18 @@
 			     (when (write-event-p event) (setf (write-ready async-fd) t)))))))))
 
 					;add outgoing sockets to event queue
-#+nio-debug2     (format-log t "nio-server:start-server - Processing client add ~A~%" +connected-sockets-queue+)
-
-             (loop for new-fd = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null new-fd) do
+#+nio-debug2     (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
+             (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
+#+nio-debug	  (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
+		  (push node *nodes-list*))
+	     (with-connect-ready-nodes (a-node)
+#+nio-debug	  (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node)
+	       (let ((new-fd (connect (host a-node) (port a-node) connection-type)))
+		 (update-last-connect-attempt a-node)
+		 (when new-fd
 #+nio-debug	  (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd)
-		  (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
-		  (add-async-fd event-queue new-fd :read-write))
+                   (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
+		   (add-async-fd event-queue new-fd :read-write))))
 	     
 					;loop over async-fd's processing where necessary
 	     (process-async-fds client-hash)
@@ -164,10 +170,10 @@
       (close-fd sock))))
 
 
-(defun add-connection (host port connection-type
-		       &key 
-		       (protocol :inet))
-  (format-log t "nio-server:add-connection - Called with: ~A:~A:~A ~%" protocol host port)
+(defun connect(host port connection-type
+	       &key 
+	       (protocol :inet))
+  (format-log t "nio-server:connect - Called with: ~A:~A:~A ~%" protocol host port)
   (let ((sock nil))
     (setq sock (ecase protocol
 		 (:inet (make-inet-socket)) 
@@ -175,8 +181,10 @@
     
     (if (connect-inet-socket sock host port)
     	(let ((sm (create-state-machine connection-type sock sock sock)))
-	  (nio-compat:add +connected-sockets-queue+ sm)
-  	  (format-log t "nio-server:add-connection - Socket enqueued: ~A~%" +connected-sockets-queue+)
-	  (return-from add-connection sm))
+;	  (nio-compat:add +connected-sockets-queue+ sm)
+;  	  (format-log t "nio-server:connect - Socket enqueued: ~A~%" +connected-sockets-queue+)
+	  (return-from connect sm))
   	(format t "Connect failed!!~A ~%" (get-errno)))))
-   
\ No newline at end of file
+
+(defun add-connection(node)
+  (nio-compat:add +connected-sockets-queue+ node))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/io/nio.asd
==============================================================================
--- branches/home/psmith/restructure/src/io/nio.asd	(original)
+++ branches/home/psmith/restructure/src/io/nio.asd	Sat Feb 10 15:36:43 2007
@@ -9,9 +9,9 @@
 		 (:file "packet" :depends-on ("nio-package"))
 		 (:file "async-fd" :depends-on ("fd-helper"))
 		 (:file "async-socket" :depends-on ("async-fd"))
-		 (:file "nio-server" :depends-on ("async-socket"))
+ 		 (:file "nodes" :depends-on ("nio-package"))
+		 (:file "nio-server" :depends-on ("async-socket" "nodes"))
 		 (:file "ip-authorisation" :depends-on ("nio-package"))
-		 (:file "nodes" :depends-on ("nio-package"))
 		 )
 
     :depends-on (:cffi :event-notification :nio-buffer :nio-compat :nio-utils))

Modified: branches/home/psmith/restructure/src/io/nodes.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nodes.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nodes.lisp	Sat Feb 10 15:36:43 2007
@@ -75,6 +75,21 @@
       (get-universal-high-res)
       (+ (last-connect-attempt node) (retry-delay node))))
 
+(defun allowed-to-connect(node)
+  (if (null (last-connect-attempt node))
+      t
+      (and (not (active-conn node)) (< (+ (last-connect-attempt node) (retry-delay node))  (get-universal-high-res)))))
+
 (defun update-last-connect-attempt(node)
   (setf (last-connect-attempt node) (get-universal-high-res)))
 
+;;iterates over the nodes list looking for nodes that are ready to be connected to
+;;i.e. the SM is null and the next-allowed-connect time has expired
+(defmacro with-connect-ready-nodes ((node) &rest body)
+  `(dolist (,node *nodes-list*)
+     (when (allowed-to-connect ,node) ,@body)))
+
+
+(defmacro with-connected-nodes ((node) &rest body)
+  `(dolist (,node *nodes-list*)
+     (when (active-conn ,node) ,@body)))

Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp	(original)
+++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp	Sat Feb 10 15:36:43 2007
@@ -41,16 +41,20 @@
 		(sleep ,delay))))))
 
 
+(defun callback(result)
+  (nio-utils:format-log t "Result of remote-log ~A~%" result))
+
+
 ;;Tail the given log and write to remote logger
 ;;e.g. (tail-log "/var/log/httpd/access_log" "192.168.1.1")
 (defun tail-log(filename ip-address)
   (sleep 4)
-  (let ((sm (nio:add-connection ip-address 16323 'nio-yarpc:yarpc-client-state-machine)))
-    (nio-utils:format-log t "toplevel adding conn ~A to ~A~%" sm ip-address)
-    (with-line-from-tailed-file (text filename 1)
-      (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
-	(nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
-	(nio-utils:format-log t "Result of remote-log ~A~%" (nio-yarpc:remote-execute sm rpc))))))
+  (nio:add-connection (nio:node ip-address 16323))
+  (with-line-from-tailed-file (text filename 1)
+    (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
+      (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
+      (nio:with-connected-nodes (node)
+	(nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
 
 ;Runs a multithreaded system with an IO thread dealing with IO only and a 'job'  thread taking and executing jobs
 

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	Sat Feb 10 15:36:43 2007
@@ -36,9 +36,23 @@
   ((job-queue :initform (nio-compat:concurrent-queue)
 	      :accessor job-queue
 	      :documentation "The queue used to hand off work from an external thread to the io thread")
-   (result-queue :initform (nio-compat:concurrent-queue)
-		 :accessor result-queue
-		 :documentation "The queue used to return results from the io thread to an external thread")))
+   (request-map :initform (make-hash-table)
+		:reader request-map
+		:documentation "A map from request-id (a unique id for this request) to remote-job")))
+
+(defclass remote-job()
+  ((callback :accessor callback
+	     :documentation "A function accepting one argument to call with the result of the remote operation")
+   (start-time :initform (get-universal-high-res)
+	      :reader start-time
+	      :documentation "The (floating point) start time")   
+   (timeout :initarg :timeout
+	    :initform 1.5
+	    :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
+
+(defun remote-job(callback)
+  (make-instance 'remote-job :callback callback))
+
 
 (defun yarpc-client-state-machine ()
     (make-instance 'yarpc-client-state-machine))
@@ -55,25 +69,23 @@
 (defconstant STATE-INITIALISED 0)
 (defconstant STATE-SENT-REQUEST 1)
 
+(defparameter +request-id+ 0)
+
 (defmethod process-outgoing-packet((sm yarpc-client-state-machine))
 #+nio-debug2  (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
-  (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
-    (when packet
-      (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" packet)
-      (setf (state sm) STATE-SENT-REQUEST))
-    packet)) 
+  (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
+    (when ttd
+      (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
+      (destructuring-bind (job call-string) ttd
+	(setf (gethash (1+ +request-id+) (request-map sm)) job)
+	(make-instance 'call-method-packet :call-string call-string :request-id +request-id+)))))
 
 (defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
-  (assert (eql (state sm) STATE-SENT-REQUEST))
   (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
   (let* ((*package* (find-package :nio-yarpc))
          (result  (read-from-string (response response))))
-    (setf (state sm) STATE-INITIALISED)    
     (nio-compat:add (result-queue sm) result)))
-  
-;Called from an external thread i.e. *not* the nio thread
-;Blocks calling thread on the remote m/c's response
-(defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
-  (assert (eql (state sm) STATE-INITIALISED))
-  (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
-  (nio-compat:take (result-queue sm)))
\ No newline at end of file
+
+;Execute the call-string on the remote node and call callback with the result
+(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
+  (nio-compat:add (job-queue sm) '((remote-job callback) call-string)))

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp	Sat Feb 10 15:36:43 2007
@@ -40,19 +40,21 @@
 
 (defconstant +PACKET-ID-SIZE+ 1)
 (defconstant +PACKET-LENGTH-SIZE+ 4)
+;(defconstant +PACKET-REQUEST-ID+ 4)
 
 (defconstant +yarpc-packet-header-size+
   (+ +PACKET-ID-SIZE+ +PACKET-LENGTH-SIZE+))
 
 (defmethod get-packet ((pf yarpc-packet-factory) buf)
   (flip buf)
-  (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size
+  (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size ;; 6,7,8,9 request-id
       (let ((packet-id (bytebuffer-read-8 buf))
             (packet-length (bytebuffer-read-32 buf)))
 	(if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer?
-	    (let ((ret-packet (ecase packet-id
-				(0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)))))
-				(1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+))))))))
+	    (let* ((packet-request-id (bytebuffer-read-32 buf))
+		   (ret-packet (ecase packet-id
+				(0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))
+				(1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))))))
 	      (compact buf)
 	      #+nio-debug  (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf)
 	      #+nio-debug  (format-log t "yarpc-packet-factory:get-packet - retuirning packet ~A~%" ret-packet)
@@ -64,7 +66,11 @@
 
 
 
-(defclass call-method-packet (packet)((call-string :initarg :call-string
+(defclass yarpc-packet(packet)
+  ((request-id :initarg :request-id
+	       :reader request-id)))
+
+(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string
                                             :accessor call-string)))
 (defun call-method-packet (call-string)
   (make-instance 'call-method-packet :call-string call-string))
@@ -79,6 +85,7 @@
       (progn
 	(nio-buffer:bytebuffer-write-8 buf +CALL-METHOD-PACKET-ID+)
 	(nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
+	(nio-buffer:bytebuffer-write-32 buf (request-id packet))
 	(nio-buffer:bytebuffer-write-string buf (call-string packet))
 	(nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
   #+nio-debug    (format-log t "yarpc-packet-factory:write-bytes(call-method-packet) - written ~%~A ~%" buf)  
@@ -92,7 +99,7 @@
   (+ +yarpc-packet-header-size+
      (length (sb-ext:string-to-octets (write-to-string (call-string packet))))))
 
-(defclass method-response-packet (packet)
+(defclass method-response-packet (yarpc-packet)
   ((response :initarg :response
 	     :accessor response)))
 
@@ -109,6 +116,7 @@
       (progn
 	(nio-buffer:bytebuffer-write-8 buf +METHOD-RESPONSE-PACKET-ID+)
 	(nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
+	(nio-buffer:bytebuffer-write-32 buf (request-id packet))
 	(nio-buffer:bytebuffer-write-string buf (write-to-string (response packet)))
 	(nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
 #+nio-debug    (format-log t "yarpc-packet-factory:write-bytes - written ~A~%" buf)  

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	Sat Feb 10 15:36:43 2007
@@ -58,26 +58,25 @@
 
 
 
-(defun run-job(&key (wait-on-job-pdw t))
+(defun run-job(&key (blocking t))
   (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
-  (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call wait-on-job-pdw)
+  (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)
     (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
-    (nio-compat:add result-queue (nio-yarpc:execute-call job))))
+    (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))
 
 
 (defmethod process-outgoing-packet((sm yarpc-state-machine))
   (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
-  (let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))
-    (format-log t "yarpc-state-machine:process-outgoing-packet - got result ~A ~%" result)
+  (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil)
+    (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
      (when result
-	(method-response-packet result))))
+	(method-response-packet result :request-id request-id))))
 
 ;Process a call method packet by placing it in the job-queue
 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
-  (assert (eql (state sm) STATE-INITIALISED))
   (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
-  (nio-compat:add job-queue (list (call-string call) (result-queue sm)))
-  (when +process-jobs-inline+ (run-job :wait-on-job-pdw nil)))
+  (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
+  (when +process-jobs-inline+ (run-job :blocking nil)))
 
 
 



More information about the Nio-cvs mailing list