Ring adapter (HTTP server) with async and WebSocket extension

(:use org.httpkit.server)

The server uses an event-driven, non-blocking I/O model that makes it lightweight and scalable. It is written to conform to the Ring spec, the standard interface for Clojure web servers, and adds asynchronous and WebSocket extensions. HTTP Kit is an (almost) drop-in replacement for ring-jetty-adapter.

Hello, Clojure HTTP server

run-server starts a Ring-compatible HTTP server. You may want to do routing with Compojure.

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})
(run-server app {:port 8080})

Options:

  • :ip: which IP to bind to, defaults to 0.0.0.0
  • :port: which port to listen on for incoming requests, defaults to 8090
  • :thread: how many threads compute responses from requests, defaults to 4
  • :worker-name-prefix: worker thread name prefix, defaults to worker-, giving worker-1, worker-2, ...
  • :queue-size: maximum number of requests queued while waiting for the thread pool to compute a response; 503 (Service Unavailable) is returned to the client if the queue is full, defaults to 20K
  • :max-body: length limit for the request body in bytes; 413 (Request Entity Too Large) is returned if the body exceeds this limit, defaults to 8388608 (8M)
  • :max-line: length limit for the HTTP initial line and for each header; 414 (Request-URI Too Long) is returned if a line exceeds this limit, defaults to 4096 (4K); see the relevant discussion on Stack Overflow
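
As a minimal sketch, the options above are passed to run-server as a single map. The values below are arbitrary illustrations rather than tuning recommendations, and the names server-options and start! are hypothetical.

```clojure
(require '[org.httpkit.server :refer [run-server]])

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/plain"}
   :body    "hello HTTP!"})

;; Illustrative values only; every key is one of the options listed above.
(def server-options
  {:ip                 "127.0.0.1"      ; bind to the loopback interface only
   :port               8080
   :thread             8                ; worker threads computing responses
   :worker-name-prefix "app-worker-"
   :queue-size         1024             ; 503 is returned once the queue is full
   :max-body           (* 1024 1024)    ; 1M request body limit, 413 beyond it
   :max-line           8192})           ; initial line / header limit, 414 beyond it

(defn start! []
  ;; run-server returns a function that stops the server
  (run-server app server-options))
```

Any option left out of the map keeps the default listed above.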

Stop/Restart Server

run-server returns a function that stops the server. That function takes an optional timeout (in ms) to wait for existing requests to finish.

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})

(defonce server (atom nil))

(defn stop-server []
  (when-not (nil? @server)
    ;; graceful shutdown: wait 100ms for existing requests to be finished
    ;; :timeout is optional, when no timeout, stop immediately
    (@server :timeout 100)
    (reset! server nil)))

(defn -main [& args]
  ;; The #' is useful, when you want to hot-reload code
  ;; You may want to take a look: https://github.com/clojure/tools.namespace
  ;; and http://http-kit.org/migration.html#reload
  (reset! server (run-server #'app {:port 8080})))
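
Building on the stop function, a restart helper could be sketched as follows. start-server and restart-server are hypothetical names, the 100 ms timeout is simply the value used above, and the sketch assumes the port can be bound again right after the stop function returns.

```clojure
(require '[org.httpkit.server :refer [run-server]])

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})

(defonce server (atom nil))

(defn stop-server []
  ;; do nothing when no server is running
  (when-let [stop-fn @server]
    (stop-fn :timeout 100)   ; wait up to 100ms for existing requests
    (reset! server nil)))

(defn start-server []
  ;; only start when nothing is stored in the atom yet
  (when (nil? @server)
    (reset! server (run-server #'app {:port 8080}))))

(defn restart-server []
  (stop-server)
  (start-server))
```

At the REPL, (restart-server) can be called after reloading code; because the var #'app is passed rather than the function value, many changes to app are picked up without a restart at all.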

Unified Async/Websocket API

The with-channel API is not compatible with the RC releases. The new API is better and much easier to understand and use. The old API is documented separately.

Unified asynchronous channel interface for HTTP (streaming or long-polling) and WebSocket.

Channel defines the following contract:
  • open?: Returns true iff channel is open.
  • close: Closes the channel. Idempotent: returns true if the channel was actually closed, or false if it was already closed.
  • websocket?: Returns true iff channel is a WebSocket.
  • send!: Sends data to the client and returns true if the data was successfully written to the output queue, or false if the channel is closed. Normally there is no need to check the return value. This function returns immediately and does not block.

    Data is sent directly to the client, NO RING MIDDLEWARE IS APPLIED.

    Data form: {:headers _ :status _ :body _} or just body. Note that :headers and :status will be stripped for WebSockets and for HTTP streaming responses after the first.

  • on-receive: Sets handler (fn [message]), where message is a String or a byte[], for notification of client WebSocket messages. Message ordering is guaranteed by the server.
  • on-close: Sets handler (fn [status]) for notification of channel being closed by the server or client. Handler will be invoked at most once. Useful for clean-up. Status can be `:normal`, `:going-away`, `:protocol-error`, `:unsupported`, `:unknown`, `:server-close`, `:client-close`
(defn handler [req]
  (with-channel req channel              ; get the channel
    ;; communicate with client using method defined above
    (on-close channel (fn [status]
                        (println "channel closed")))
    (if (websocket? channel)
      (println "WebSocket channel")
      (println "HTTP channel"))
    (on-receive channel (fn [data]       ; data received from client
                          ;; send! takes an optional third argument, close-after-send?:
                          ;;   (send! channel data close-after-send?)
                          ;; When unspecified, it defaults to true for HTTP channels
                          ;; and false for WebSocket.
                          (send! channel data))))) ; data is sent directly to the client
(run-server handler {:port 8080})
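
Because send! accepts either a full response map or just a body, one handler can serve both kinds of client. The following is a sketch under the contract described above; unified-handler is a hypothetical name and the response text is a placeholder.

```clojure
(require '[org.httpkit.server :refer [with-channel websocket? on-receive
                                      on-close send!]])

(defn unified-handler [req]
  (with-channel req channel
    (on-close channel (fn [status] (println "channel closed:" status)))
    (if (websocket? channel)
      ;; WebSocket: echo every message; the channel stays open because
      ;; close-after-send? defaults to false for WebSocket
      (on-receive channel (fn [data] (send! channel data)))
      ;; Plain HTTP: send one complete response; the channel is closed
      ;; afterwards because close-after-send? defaults to true for HTTP
      (send! channel {:status  200
                      :headers {"Content-Type" "text/plain"}
                      :body    "hello from an HTTP channel"}))))
```

The handler is passed to run-server like any other Ring handler.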

HTTP Streaming example

  • The first call of send! sends the HTTP status and headers to the client
  • Each later call of send! sends a chunk to the client
  • close sends an empty chunk to the client, marking the end of the response
  • The client closing the connection is reported via on-close
(use 'org.httpkit.timer)

(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed, " status)))
    (loop [id 0]
      (when (< id 10)
        (schedule-task (* id 200) ;; send a message every 200ms
                       (send! channel (str "message from server #" id) false)) ; false => don't close after send
        (recur (inc id))))
    (schedule-task 10000 (close channel)))) ;; close in 10s.

;;; open your browser at http://127.0.0.1:9090, a new message shows up every 200ms
(run-server handler {:port 9090})

Long polling example

Long polling is very much like streaming.

chat-polling is a realtime chat example that uses polling.

(def channel-hub (atom {}))

(defn handler [request]
  (with-channel request channel
    ;; Store the channel somewhere, and use it to send a response to the client when an interesting event happens
    (swap! channel-hub assoc channel request)
    (on-close channel (fn [status]
                        ;; remove from hub when the channel gets closed
                        (swap! channel-hub dissoc channel)))))

(defn on-some-event [data]              ;; call this to send data to every waiting client
  (doseq [channel (keys @channel-hub)]
    (send! channel {:status 200
                    :headers {"Content-Type" "application/json; charset=utf-8"}
                    :body data})))

(run-server handler {:port 9090})

Websocket example

  • Two-way communication between client and server
  • Can easily degrade to HTTP long polling/streaming, thanks to the unified API
  • send! with a java.lang.String: a text frame is assembled and sent to the client
  • send! with a java.io.InputStream or byte[]: a binary frame is assembled and sent to the client
  • For a WebSocket Secure connection, one option is stud (a self-signed certificate may not work with WebSocket). Nginx can do it too.
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed: " status)))
    (on-receive channel (fn [data] ;; echo it back
                          (send! channel data)))))

(run-server handler {:port 9090})
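
To illustrate the text and binary frame behaviour listed above, here is a small sketch of an echo handler that treats the two message types differently. reply-for and typed-echo-handler are hypothetical names, and the "echo: " prefix is purely illustrative.

```clojure
(require '[org.httpkit.server :refer [with-channel on-receive send!]])

(defn reply-for
  "Builds the reply for one incoming message. Strings get a prefix and go
  back as text; anything else (a byte[]) is returned unchanged."
  [data]
  (if (string? data)
    (str "echo: " data)
    data))

(defn typed-echo-handler [request]
  (with-channel request channel
    (on-receive channel
                (fn [data]
                  ;; a String reply is sent as a text frame,
                  ;; a byte[] reply as a binary frame
                  (send! channel (reply-for data))))))
```

Keeping the reply logic in a plain function such as reply-for makes it easy to exercise at the REPL without opening a socket.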

Control WebSocket handshake

with-channel performs the WebSocket handshake automatically. If you need to control the handshake yourself, there is a workaround: cgmartin has a gist that is a good place to get inspired.

Routing with Compojure

Compojure can be used to do the routing, based on URI and method.

(:use [compojure.route :only [files not-found]]
      [compojure.handler :only [site]] ; form, query params decode; cookie; session, etc
      [compojure.core :only [defroutes GET POST DELETE ANY context]]
      org.httpkit.server)

(defn show-landing-page [req] ;; ordinary clojure function, accepts a request map, returns a response map
  ;; return landing page's html string. Using template library is a good idea:
  ;; mustache (https://github.com/shenfeng/mustache.clj, https://github.com/fhd/clostache...)
  ;; enlive (https://github.com/cgrand/enlive)
  ;; hiccup(https://github.com/weavejester/hiccup)
  )

(defn update-userinfo [req]          ;; ordinary clojure function
  (let [user-id (-> req :params :id)    ; param from uri
        password (-> req :params :password)] ; form param
    ....
    ))

(defroutes all-routes
  (GET "/" [] show-landing-page)
  (GET "/ws" [] chat-handler)     ;; websocket
  (GET "/async" [] async-handler) ;; asynchronous(long polling)
  (context "/user/:id" []
           (GET "/" [] get-user-by-id)
           (POST "/" [] update-userinfo))
  (files "/static/") ;; static file url prefix /static, in `public` folder
  (not-found "<p>Page not found.</p>")) ;; all other, return 404

(run-server (site #'all-routes) {:port 8080})
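
The snippet above leaves several handlers undefined. A trimmed-down but complete namespace might look like the sketch below. The namespace name example.web and the handler bodies are placeholders, and it assumes Compojure and http-kit are both on the classpath.

```clojure
(ns example.web
  (:require [compojure.core :refer [defroutes GET context]]
            [compojure.route :refer [not-found]]
            [compojure.handler :refer [site]]
            [org.httpkit.server :refer [run-server]]))

(defn show-landing-page [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "<h1>Welcome</h1>"})

(defn get-user-by-id [req]
  ;; :id is the route parameter captured by the "/user/:id" context
  {:status  200
   :headers {"Content-Type" "text/plain"}
   :body    (str "user " (-> req :params :id))})

(defroutes all-routes
  (GET "/" [] show-landing-page)
  (context "/user/:id" []
    (GET "/" [] get-user-by-id))
  (not-found "<p>Page not found.</p>"))

(defn -main [& args]
  ;; #'all-routes lets reloaded route definitions take effect
  (run-server (site #'all-routes) {:port 8080}))
```

Since all-routes is itself a Ring handler, it can be called directly with a request map such as {:request-method :get :uri "/"} to check routing without starting the server.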
  

Recommended server deployment

http-kit runs happily on its own, which is handy for development and quick deployment. For serious production use, putting it behind a reverse proxy such as Nginx or Lighttpd is encouraged. A reverse proxy can also be used to add HTTPS support.

  • They are fast and heavily optimized for static content.
  • They can be configured to compress the content sent to browsers.

Sample Nginx configuration:

upstream http_backend {
    server 127.0.0.1:8090;  # http-kit listens on 8090
    # put more servers here for load balancing
    # keepalive (reusing TCP connections) improves performance
    keepalive 32;  # both http-kit and nginx are good at concurrency
}

server {
    location /static/ {  # static content
        alias   /var/www/xxxx/public/;
    }
    location / {
        proxy_pass  http://http_backend;

        # tell http-kit to keep the connection
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;

        access_log  /var/log/nginx/xxxx.access.log;
    }
}