Ring adapter (HTTP server) with async and WebSocket extensions

(:use org.httpkit.server)

The server uses an event-driven, non-blocking I/O model that makes it lightweight and scalable. It conforms to the Ring spec, the standard Clojure web server interface, and adds asynchronous and WebSocket extensions. HTTP Kit is an (almost) drop-in replacement for ring-jetty-adapter.

Hello, Clojure HTTP server

run-server starts a Ring-compatible HTTP server. You may want to do routing with Compojure.

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})
(run-server app {:port 8080})

Options:

  • :ip: the IP address to bind to; defaults to 0.0.0.0
  • :port: the port to listen on for incoming requests; defaults to 8090
  • :thread: the number of threads used to compute responses from requests; defaults to 4
  • :worker-name-prefix: the worker thread name prefix; defaults to worker-, giving worker-1, worker-2, ...
  • :queue-size: the maximum number of requests queued while waiting for the thread pool to compute a response; once the queue is full, further requests are rejected with 503 (Service Unavailable); defaults to 20K
  • :max-body: the length limit for the request body in bytes; 413 (Request Entity Too Large) is returned if a request exceeds this limit; defaults to 8388608 (8M)
  • :max-line: the length limit for the HTTP initial line and for each header; 414 (Request-URI Too Long) is returned if the limit is exceeded; defaults to 4096 (4K); see the relevant discussion on Stack Overflow
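
As an illustration of the options listed above, here is a minimal sketch that passes all of them to run-server. The namespace name, the start! helper, and every value in server-opts are assumptions chosen for the example, not recommended settings; only the option keys come from the list above.

```clojure
(ns example.options
  (:require [org.httpkit.server :refer [run-server]]))

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/plain"}
   :body    "hello HTTP!"})

;; Illustrative values only; tune them for your own workload.
(def server-opts
  {:ip                 "127.0.0.1"       ; bind to loopback instead of 0.0.0.0
   :port               8080              ; listen on 8080 instead of 8090
   :thread             8                 ; 8 worker threads instead of 4
   :worker-name-prefix "web-"            ; threads are named web-1, web-2, ...
   :queue-size         10240             ; reject with 503 beyond this many queued requests
   :max-body           (* 4 1024 1024)   ; 4M request body limit, 413 above it
   :max-line           8192})            ; 8K limit for the initial line and each header

(defn start!
  "Starts the server and returns the stop function that run-server hands back."
  []
  (run-server app server-opts))
```

Calling (start!) starts the server; keep the returned function around so the server can be stopped later.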

Stop/Restart Server

run-server returns a function that stops the server. The function takes an optional :timeout (in ms) to wait for existing requests to finish.

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})

(defonce server (atom nil))

(defn stop-server []
  (when-not (nil? @server)
    ;; graceful shutdown: wait 100ms for existing requests to be finished
    ;; :timeout is optional, when no timeout, stop immediately
    (@server :timeout 100)
    (reset! server nil)))

(defn -main [& args]
  ;; The #' is useful when you want to hot-reload code
  ;; You may want to take a look: https://github.com/clojure/tools.namespace
  ;; and http://http-kit.org/migration.html#reload
  (reset! server (run-server #'app {:port 8080})))
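
The code above shows how to stop the server; a restart can be sketched as stop followed by start. The start-server and restart-server helpers and the namespace name below are assumptions made for illustration and are not part of http-kit.

```clojure
(ns example.restart
  (:require [org.httpkit.server :refer [run-server]]))

(defn app [req]
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    "hello HTTP!"})

;; Holds the stop function returned by run-server, or nil when not running.
(defonce server (atom nil))

(defn stop-server []
  (when-let [stop-fn @server]
    ;; graceful shutdown: wait up to 100ms for existing requests to finish
    (stop-fn :timeout 100)
    (reset! server nil)))

(defn start-server []
  ;; #'app passes the var, so re-evaluating app takes effect without a restart
  (reset! server (run-server #'app {:port 8080})))

(defn restart-server []
  (stop-server)   ; a no-op when nothing is running
  (start-server))
```

At the REPL, (restart-server) can be called repeatedly; stop-server returns nil when no server is running.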

Unified Async/Websocket API

The with-channel API is not compatible with the RC releases. The new one is better and much easier to understand and use. The old documentation is here

Unified asynchronous channel interface for HTTP (streaming or long-polling) and WebSocket.

Channel defines the following contract:
  • open?: Returns true iff channel is open.
  • close: Closes the channel. Idempotent: returns true if the channel was actually closed, or false if it was already closed.
  • websocket?: Returns true iff channel is a WebSocket.
  • send!: Sends data to client and returns true if the data was successfully written to the output queue, or false if the channel is closed. Normally, checking the returned value is not needed. This function returns immediately (does not block).

    Data is sent directly to the client, NO RING MIDDLEWARE IS APPLIED.

    Data form: {:headers _ :status _ :body _} or just body. Note that :headers and :status will be stripped for WebSockets and for HTTP streaming responses after the first.

  • on-receive: Sets handler (fn [message]) for notification of client WebSocket messages; message is a String or a byte[]. Message ordering is guaranteed by the server.
  • on-close: Sets handler (fn [status]) for notification of the channel being closed by the server or client. The handler will be invoked at most once. Useful for clean-up. Status can be `:normal`, `:going-away`, `:protocol-error`, `:unsupported`, `:unknown`, `:server-close`, or `:client-close`.
(defn handler [req]
  (with-channel req channel              ; get the channel
    ;; communicate with client using method defined above
    (on-close channel (fn [status]
                        (println "channel closed")))
    (if (websocket? channel)
      (println "WebSocket channel")
      (println "HTTP channel"))
    (on-receive channel (fn [data]       ; data received from client
                          ;; send! takes an optional third argument, close-after-send?:
                          ;;   (send! channel data close-after-send?)
                          ;; When unspecified, it defaults to true for HTTP channels
                          ;; and false for WebSocket.
                          (send! channel data))))) ; data is sent directly to the client
(run-server handler {:port 8080})

HTTP Streaming example

  • The first call of send! sends the HTTP status and headers to the client
  • Each subsequent send! sends a chunk to the client
  • close sends an empty chunk to the client, marking the end of the response
  • The client close notification is printed via on-close
(use 'org.httpkit.timer)

(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed, " status)))
    (loop [id 0]
      (when (< id 10)
        (schedule-task (* id 200) ;; send a message every 200ms
                       (send! channel (str "message from server #" id) false)) ; false => don't close after send
        (recur (inc id))))
    (schedule-task 10000 (close channel)))) ;; close in 10s.

;;; open your browser at http://127.0.0.1:9090; a new message shows up every 200ms
(run-server handler {:port 9090})

Long polling example

Long polling is very much like streaming.

chat-polling is a real-time chat example that uses polling.

(def channel-hub (atom {}))

(defn handler [request]
  (with-channel request channel
    ;; Store the channel somewhere, and use it to send a response to the client when an interesting event happens
    (swap! channel-hub assoc channel request)
    (on-close channel (fn [status]
                        ;; remove from the hub when the channel gets closed
                        (swap! channel-hub dissoc channel)))))

(on-some-event                          ;; pseudocode: on-some-event and data are placeholders; send data to every client
 (doseq [channel (keys @channel-hub)]
   (send! channel {:status 200
                   :headers {"Content-Type" "application/json; charset=utf-8"}
                   :body data})))

(run-server handler {:port 9090})
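
The on-some-event form above is pseudocode. One way to make it concrete is a plain function that application code calls whenever there is something to deliver. This is only a sketch: the broadcast! name, the namespace, and the assumption that the caller passes an already-encoded JSON string are all illustrative.

```clojure
(ns example.polling
  (:require [org.httpkit.server :refer [with-channel on-close send!]]))

;; Maps each waiting channel to the request that opened it.
(def channel-hub (atom {}))

(defn handler [request]
  (with-channel request channel
    ;; park the channel until there is something to send
    (swap! channel-hub assoc channel request)
    (on-close channel (fn [status]
                        ;; drop the channel once it is closed
                        (swap! channel-hub dissoc channel)))))

(defn broadcast!
  "Sends json-body (a JSON string) to every waiting client.
  For HTTP channels send! closes the channel after sending by default,
  so on-close then removes each channel from the hub."
  [json-body]
  (doseq [channel (keys @channel-hub)]
    (send! channel {:status  200
                    :headers {"Content-Type" "application/json; charset=utf-8"}
                    :body    json-body})))
```

For example, (broadcast! "{\"msg\":\"hi\"}") answers every pending poll; each client is then expected to issue a new request to wait for the next event.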

WebSocket example

  • Two-way communication between client and server
  • Can easily degrade to HTTP long polling/streaming, due to the unified API
  • send! with a java.lang.String assembles a text frame and sends it to the client
  • send! with a java.io.InputStream or byte[] assembles a binary frame and sends it to the client
  • For WebSocket Secure connections, one option is stud (a self-signed certificate may not work with WebSocket). Nginx can do it, too.
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed: " status)))
    (on-receive channel (fn [data] ;; echo it back
                          (send! channel data)))))

(run-server handler {:port 9090})
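
To illustrate the text versus binary distinction from the list above, the following sketch replies with an upper-cased String (sent as a text frame) for text messages and echoes byte[] messages unchanged (sent as a binary frame). The reply-for helper and the namespace name are hypothetical and exist only for this example.

```clojure
(ns example.ws
  (:require [clojure.string :as str]
            [org.httpkit.server :refer [with-channel on-receive on-close send!]]))

(defn reply-for
  "Chooses the reply for an incoming WebSocket message."
  [data]
  (if (string? data)
    (str/upper-case data)   ; a String reply goes out as a text frame
    data))                  ; a byte[] reply goes out as a binary frame

(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed: " status)))
    (on-receive channel (fn [data]
                          (send! channel (reply-for data))))))
```

Keeping the reply logic in a pure function such as reply-for makes it easy to check at the REPL without opening a socket.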

Control WebSocket handshake

with-channel does the WebSocket handshake automatically. If you want to control it, e.g., to support a WebSocket subprotocol, here is a workaround. cgmartin's gist is a good place to get inspired.

Routing with Compojure

Compojure can be used to do the routing, based on URI and method.

(:use [compojure.route :only [files not-found]]
      [compojure.handler :only [site]] ; form, query params decode; cookie; session, etc
      [compojure.core :only [defroutes GET POST DELETE ANY context]]
      org.httpkit.server)

(defn show-landing-page [req] ;; ordinary clojure function, accepts a request map, returns a response map
  ;; return landing page's html string. Using template library is a good idea:
  ;; mustache (https://github.com/shenfeng/mustache.clj, https://github.com/fhd/clostache...)
  ;; enlive (https://github.com/cgrand/enlive)
  ;; hiccup(https://github.com/weavejester/hiccup)
  )

(defn update-userinfo [req]          ;; ordinary clojure function
  (let [user-id (-> req :params :id)    ; param from uri
        password (-> req :params :password)] ; form param
    ....
    ))

(defroutes all-routes
  (GET "/" [] show-landing-page)
  (GET "/ws" [] chat-handler)     ;; websocket
  (GET "/async" [] async-handler) ;; asynchronous(long polling)
  (context "/user/:id" []
           (GET "/" [] get-user-by-id)
           (POST "/" [] update-userinfo))
  (files "/static/") ;; static file url prefix /static, in `public` folder
  (not-found "<p>Page not found.</p>")) ;; all other, return 404

(run-server (site #'all-routes) {:port 8080})

Recommended server deployment

http-kit runs happily on its own, which is handy for development and quick deployment. In serious production, using a reverse proxy such as Nginx, Lighttpd, etc. is encouraged. A reverse proxy can also be used to add HTTPS support.

  • They are fast and heavily optimized for static content.
  • They can be configured to compress the content sent to browsers.

Sample Nginx configuration:

upstream http_backend {
    server 127.0.0.1:8090;  # http-kit listens on 8090
    # put more servers here for load balancing
    # keepalive (reuse TCP connections) improves performance
    keepalive 32;  # both http-kit and nginx are good at concurrency
}

server {
    location /static/ {  # static content
        alias   /var/www/xxxx/public/;
    }
    location / {
        proxy_pass  http://http_backend;

        # tell http-kit to keep the connection
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;

        access_log  /var/log/nginx/xxxx.access.log;
    }
}