[nio-cvs] r109 - in branches/home/psmith/restructure/src: io nio-logger protocol/yarpc

psmith at common-lisp.net psmith at common-lisp.net
Tue Apr 17 04:26:33 UTC 2007


Author: psmith
Date: Tue Apr 17 00:26:29 2007
New Revision: 109

Modified:
   branches/home/psmith/restructure/src/io/async-fd.lisp
   branches/home/psmith/restructure/src/io/nio-package.lisp
   branches/home/psmith/restructure/src/io/nio-server.lisp
   branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
   branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
Remove +process-jobs-inline+, as it can't work like this. Add a timeout mechanism.

Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp	(original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp	Tue Apr 17 00:26:29 2007
@@ -62,6 +62,9 @@
 ;;Implement this in concrete SM for read
 (defgeneric process-write (async-fd))
 
+;;Timeout hook: each concrete SM supplies a method; nio-server calls it on an async-fd just before the read step
+(defgeneric process-timeout (async-fd))
+
 ;;SM factory 
 (defun create-state-machine(sm-type read-fd write-fd socket)
   (let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))

Modified: branches/home/psmith/restructure/src/io/nio-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-package.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-package.lisp	Tue Apr 17 00:26:29 2007
@@ -30,7 +30,7 @@
 
 	     ;; async-fd.lisp
 	     async-fd process-read process-write foreign-read-buffer foreign-write-buffer close-sm
-	     recommend-buffer-size close-pending
+	     recommend-buffer-size close-pending process-timeout
 
 	     ;; async-socket.lisp
 	     

Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp	(original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp	Tue Apr 17 00:26:29 2007
@@ -46,6 +46,7 @@
 ;process reads
                (handler-case
 		   (progn
+		     (process-timeout async-fd)
 		     (when (read-ready async-fd) (read-more async-fd))
 		     (when (> (buffer-position (foreign-read-buffer async-fd)) 0)
 		       (process-read async-fd))
@@ -63,7 +64,8 @@
 		       (write-more async-fd)
 		       (push async-fd removals)))
 		 (read-error (re) (push async-fd removals))
-		 (write-error (we) (push async-fd removals))))
+		 (write-error (we) (push async-fd removals))
+		 (timeout (to) (push async-fd removals))))
 	   client-hash)
     (dolist (async-fd removals)
       (format-log t "nio-server:process-async-fds processing remove for ~a~%" async-fd)

Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp	(original)
+++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp	Tue Apr 17 00:26:29 2007
@@ -63,7 +63,6 @@
 (defparameter +log-file-name+ "./out")
 
 (defun run-logging-server(listen-ip out-file &optional (allowed-ips "ips.txt"))
-  (setf nio-yarpc:+process-jobs-inline+ nil)
   (setf  +log-file-name+ out-file)
   (nio:load-ips allowed-ips)
   (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :port 16323 :accept-connection 'nio:check-ip)) :name "nio-server")

Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp	Tue Apr 17 00:26:29 2007
@@ -32,9 +32,9 @@
              yarpc-state-machine-factory get-packet-factory 
 
 	     ;; yarpc-state-machine
-	     yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+
+	     yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+ process-timeout
 	     ;to be moved
-	     test-rpc test-rpc-list test-rpc-string execute-call defremote
+	     test-rpc test-rpc-list test-rpc-string execute-call defremote process-timeout
 	     
 	     ;;yarpc-client-state-machine
 	     yarpc-client-state-machine remote-execute simulate-connection

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp	Tue Apr 17 00:26:29 2007
@@ -48,11 +48,12 @@
 	      :reader start-time
 	      :documentation "The (floating point) start time")   
    (timeout :initarg :timeout
-	    :initform 1.5
+	    :initform 15
+	    :reader timeout
 	    :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
 
-(defun remote-job(callback)
-  (make-instance 'remote-job :callback callback))
+(defun remote-job(callback &key (timeout 15)) ; constructor: wraps make-instance for the remote-job class
+  (make-instance 'remote-job :callback callback :timeout timeout)) ; TIMEOUT defaults to 15, the same value as the slot's :initform
 
 
 (defun yarpc-client-state-machine ()
@@ -67,11 +68,37 @@
 (defmethod print-object ((sm yarpc-client-state-machine) stream)
   (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
 
+(defmethod print-object ((job remote-job) stream) ; prints the job's start-time and timeout, read through their slot readers
+  (format stream "#<REMOTE-JOB :start-time ~A :timeout ~A>" (start-time job) (timeout job)))
+
+
 (defconstant STATE-INITIALISED 0)
 (defconstant STATE-SENT-REQUEST 1)
 
 (defparameter +request-id+ 0)
 
+
+(defun check-timeouts(id job)
+  "Return T, after logging ID and JOB, once get-universal-high-res has passed JOB's start-time plus timeout; otherwise NIL."
+  (let ((deadline (+ (start-time job) (timeout job))))
+    (when (< deadline (get-universal-high-res))
+      (format-log t "Timeout detected ~A ~A~%" id job) t)))
+
+(defun finish-job (request-id sm result)
+  "If REQUEST-ID is present in SM's request map, drop that entry and then funcall the job's callback with RESULT; unknown ids are ignored."
+  (let* ((pending (request-map sm))
+	 (job (gethash request-id pending)))
+    (when job
+      (remhash request-id pending) (funcall (callback job) result))))
+
+(defmethod process-timeout((sm yarpc-client-state-machine))
+  "Finish, with a NIL result, every job in SM's request map that check-timeouts reports as expired."
+#+nio-debug  (format-log t "yarpc-client-state-machine:process-timeout called, searching for timeouts in ~A ~%" (request-map sm))
+  (let ((expired '())) ; ids are collected first so that callbacks run by finish-job never fire in the middle of the maphash traversal
+    (maphash #'(lambda (id job) (when (check-timeouts id job) (push id expired))) (request-map sm))
+    (dolist (id expired) (finish-job id sm nil))))
+ 
+
 (defmethod process-outgoing-packet((sm yarpc-client-state-machine))
 #+nio-debug  (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
   (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
@@ -88,8 +115,7 @@
 	 (request-id (request-id response)))
 #+nio-debug    (format-log t "yarpc-client-state-machine:process-incoming-packet :result ~A :request-id ~A~%" result request-id)
 ;    (maphash #'(lambda (k v) (format t "~a -> ~a~%" k v)) (request-map sm))
-    (let ((remote-job (gethash request-id (request-map sm))))
-      (funcall (callback remote-job) result))))
+     (finish-job request-id sm result)))
 
 (defparameter *simulate-calls* nil)
 
@@ -107,3 +133,14 @@
          (setf (nio:active-conn node) (nio::create-state-machine 'yarpc-client-state-machine 1 1 6))
     (push  node nio::*nodes-list*)))
             
+
+
+(defun test-timeout()
+  "Manual check: poll a 30 second remote-job once a second until check-timeouts reports expiry, then fire its callback with NIL."
+  (let* ((done nil)
+	 (job (remote-job #'(lambda(x) (format-log t "~A finished~%" x) (setf done t)) :timeout 30)))
+    (format-log t "Job: ~A~%" job)
+    (loop until done do
+	 (when (check-timeouts 99 job) (funcall (callback job) nil)) ; the result used to be dropped, so DONE was never set and the loop never ended
+	 (format-log t ".~%") (sleep 1))
+    (format-log t "done test~%")))
\ No newline at end of file

Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	(original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp	Tue Apr 17 00:26:29 2007
@@ -52,12 +52,6 @@
 (defconstant STATE-INITIALISED 0)
 (defconstant STATE-SEND-RESPONSE 1)
 
-
-(defparameter +process-jobs-inline+ t
-  "Set this to make the NIO thread process the RPC calls - warning the procedure should not block!")
-
-
-
 (defun run-job(&key (blocking t))
 #+nio-debug  (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
   (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking)))
@@ -66,6 +60,7 @@
 #+nio-debug	(format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
 	(nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
 
+(defmethod process-timeout((sm yarpc-state-machine))) ; empty body: no timeout processing is done for yarpc-state-machine
 
 (defmethod process-outgoing-packet((sm yarpc-state-machine))
 #+nio-debug2  (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
@@ -78,8 +73,7 @@
 ;Process a call method packet by placing it in the job-queue
 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
 #+nio-debug  (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
-  (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
-  (when +process-jobs-inline+ (run-job :blocking nil)))
+  (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))))
 
 
 



More information about the Nio-cvs mailing list