mirror of
https://gitlab.com/eql/lqml.git
synced 2025-12-06 10:31:34 -08:00
890 lines
44 KiB
Common Lisp
890 lines
44 KiB
Common Lisp
;;; Copyright 2012-2020 Google LLC
|
|
;;;
|
|
;;; Use of this source code is governed by an MIT-style
|
|
;;; license that can be found in the LICENSE file or at
|
|
;;; https://opensource.org/licenses/MIT.
|
|
|
|
(in-package #:cl-protobufs.implementation)
|
|
|
|
;;; This file provides a stream-like abstraction, a BUFFER, that Protobuf serialization
|
|
;;; logic can use to perform a one-pass traversal of the input object tree such that
|
|
;;; all variable-length pieces are properly length-prefixed but without having to
|
|
;;; precompute lengths. This differs from the C implementation of serialization,
|
|
;;; which (by default) requires a pre-pass to compute the lengths for all constituent
|
|
;;; variable-length pieces such as strings and sub-messages.
|
|
|
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
|
(defparameter $optimize-buffering *optimize-fast-unsafe*)) ; NOLINT
|
|
|
|
(deftype array-index ()
|
|
#+sbcl 'sb-int:index
|
|
#-sbcl `(integer 0 ,(1- array-total-size-limit)))
|
|
|
|
;; A BUFFER is a linked list of blocks (vectors) of unsigned-byte.
|
|
;; It can more-or-less be thought of as a string-output-stream that accepts
|
|
;; (UNSIGNED-BYTE n) as the element-type, instead of character, and which
|
|
;; allows replacement of previously written bytes. CONCATENATE-BLOCKS
|
|
;; is the analogous operation to GET-OUTPUT-STREAM-STRING. It produces a
|
|
;; single vector of all bytes that were written.
|
|
;; This structure has subtypes for 8-bit octets and 32-bit words.
|
|
(defstruct (buffer (:constructor nil))
|
|
;; The current block
|
|
(block nil :type (simple-array * 1))
|
|
;; Index into current block at which next element may be written.
|
|
;; The block is full when index is equal to (LENGTH BLOCK).
|
|
(index 0 :type (unsigned-byte 28))
|
|
;; The entire list of blocks
|
|
(chain nil :type cons)
|
|
;; The cons cell whose car is BLOCK. This slot acts primarily
|
|
;; to optimize nconc onto CHAIN. It is not necessarily the last
|
|
;; cons in CHAIN, but usually it is.
|
|
(next nil :type cons)
|
|
;; Zero-based absolute position of the first element of this block in
|
|
;; the overall output. Updated only when assigning a new BLOCK.
|
|
(%block-absolute-start 0 :type array-index))
|
|
|
|
(defmethod print-object ((self buffer) stream)
|
|
(print-unreadable-object (self stream :type t :identity t)))
|
|
|
|
;; BUFFER-SAP is a macro because it makes little sense to write a function
|
|
;; that returns a pointer to something that can go stale on you.
|
|
;; Otherwise any extraction of a SAP from the buffer would be reliable only
|
|
;; within the scope of a WITHOUT-GCING or WITH-PINNED-OBJECTS.
|
|
;; It would work as an inline function, but this forces the right behavior.
|
|
#+sbcl
|
|
(defmacro buffer-sap (buffer)
|
|
`(sb-sys:vector-sap (buffer-block ,buffer)))
|
|
|
|
(defun-inline buffer-block-capacity (buffer)
|
|
(declare (optimize (safety 0)))
|
|
(length (buffer-block buffer)))
|
|
|
|
(defun-inline buffer-absolute-position (buffer)
|
|
(i+ (buffer-%block-absolute-start buffer)
|
|
(buffer-index buffer)))
|
|
|
|
(defun make-buffer (constructor block)
|
|
(let ((chain (list block)))
|
|
(funcall (the function constructor) block chain chain)))
|
|
|
|
(deftype octet-type () '(unsigned-byte 8))
|
|
(deftype word-buffer-block-type () '(simple-array (unsigned-byte 32) 1))
|
|
|
|
(defstruct (word-buffer (:include buffer (block nil :type word-buffer-block-type))
|
|
(:constructor %make-word-buffer (block chain next))))
|
|
|
|
(defun make-word-buffer (size)
|
|
(declare (array-index size))
|
|
(make-buffer #'%make-word-buffer
|
|
(make-array size :element-type '(unsigned-byte 32))))
|
|
|
|
(defstruct (octet-buffer (:include buffer
|
|
(block nil :type (simple-array octet-type 1)))
|
|
(:constructor %make-octet-buffer (block chain next)))
|
|
;; The collection of backpatches is itself a word buffer
|
|
(backpatches (make-word-buffer 10))
|
|
;; When copying a fixed-size wire-level primitive that crosses a block boundary,
|
|
;; use the scratchpad first, then copy two subsequences of octets.
|
|
(scratchpad (make-array 8 :element-type '(unsigned-byte 8)))
|
|
(n-gap-bytes 0 :type fixnum)
|
|
(target nil) ; the destination of these octets, a STREAM typically
|
|
;; The BUFFER can also pretend to be stream by implementing CHAR-OUT
|
|
;; and STRING-OUT methods. The buffer and stream point to each other.
|
|
;; The stream is created only if needed. No support for non-SBCL Lisps.
|
|
#+sbcl
|
|
(stream nil :type (or null sb-kernel:ansi-stream))
|
|
;; The library does not use this slot, but applications may.
|
|
;; Because the structure type gets frozen (below) it is impolite/incorrect
|
|
;; to create subtypes of it having additional slots.
|
|
(userdata))
|
|
|
|
;; This declaration asserts that there wil not be further descendant types,
|
|
;; and promises to the compiler that TYPEP on the two buffer subtypes
|
|
;; need only be a simple EQ check.
|
|
#+sbcl
|
|
(declaim (sb-ext:freeze-type word-buffer octet-buffer))
|
|
|
|
(defun make-octet-buffer (size &key userdata target)
|
|
(declare (array-index size))
|
|
(let ((b (make-buffer #'%make-octet-buffer
|
|
(make-array size :element-type 'octet-type))))
|
|
(setf (octet-buffer-userdata b) userdata
|
|
(octet-buffer-target b) target)
|
|
b))
|
|
|
|
;; Allocate but do not link in a new block of at least MIN-SIZE, which can be zero
|
|
;; for the default growth rate of 1.5x the previous allocation.
|
|
;; A clever way to make an array of the right kind would be to use introspection
|
|
;; on the TYPE of the CURRENT-BLOCK slot. But clever = slow, so use ETYPECASE instead.
|
|
(defun new-block (buffer min-size)
|
|
(declare (array-index min-size))
|
|
;; For testing the algorithm without growth of buffers - to make it more likely that
|
|
;; data will span buffers - the new-capacity could be (max min-size 128) or similar.
|
|
;; It must never be smaller than the largest primitive type though.
|
|
(let* ((old-capacity (buffer-block-capacity buffer))
|
|
(new-capacity
|
|
(max min-size
|
|
(min (+ old-capacity (ash old-capacity 1)) 100000))))
|
|
(etypecase buffer
|
|
(word-buffer (make-array new-capacity :element-type '(unsigned-byte 32)))
|
|
(octet-buffer (make-array new-capacity :element-type 'octet-type)))))
|
|
|
|
;; After having ensured sufficient space, the "FAST-" output algorithms can avoid
|
|
;; allocating blocks, but might have to advance the block pointer with ADVANCE-BLOCK.
|
|
;; This gets called exponentially less often as block size is automatically grown,
|
|
;; so dot not benefit from being inlined.
|
|
;; Note that this DOES NOT set the 'current-index' slot to 0.
|
|
(declaim (ftype (function (buffer) (values (simple-array octet-type 1) &optional))
|
|
advance-block))
|
|
(defun advance-block (buffer)
|
|
(declare #.$optimize-buffering)
|
|
;; this INCF generates 6 instructions instead of 1. wth?
|
|
(incf (buffer-%block-absolute-start buffer)
|
|
(length (buffer-block buffer)))
|
|
(let ((tail (cdr (buffer-next buffer))))
|
|
(setf (buffer-next buffer) tail
|
|
(buffer-block buffer) (car tail))))
|
|
|
|
;; Create a new block such that there will be at least N bytes available in
|
|
;; total across the current and new block, given that BUFFER-ENSURE-SPACE [q.v.]
|
|
;; has already decided there is not presently enough space.
|
|
;; The new block's size is the greater of the defecit or the standard growth
|
|
;; amount. If there is zero space in the current block, the new block is set
|
|
;; as the current block, otherwise it is not.
|
|
;; Return true if all data will fit in the current block; NIL otherwise.
|
|
(declaim (ftype (function (t t) (values t &optional)) %buffer-ensure-space))
|
|
(defun %buffer-ensure-space (buffer n)
|
|
(declare ((and fixnum unsigned-byte) n) #.$optimize-buffering)
|
|
(let* ((capacity (buffer-block-capacity buffer))
|
|
(space-remaining (- capacity (buffer-index buffer)))
|
|
(defecit (the fixnum (- n space-remaining))))
|
|
;; There might already be a next-block. This can happen if previous write asked
|
|
;; for more space than existed in the current block, but subsequently didn't
|
|
;; use any space in the new block. That block can be smaller than what is
|
|
;; needed now, but don't drop it - push a new next-block in front.
|
|
(unless (and (cdr (buffer-next buffer))
|
|
(>= (length (the vector (second (buffer-next buffer)))) defecit))
|
|
(rplacd (buffer-next buffer)
|
|
(cons (new-block buffer defecit) (cdr (buffer-next buffer)))))
|
|
(when (zerop space-remaining)
|
|
(advance-block buffer)
|
|
;; 0 serves as a generalized T, meaining all N bytes fit in one block
|
|
(setf (buffer-index buffer) 0))))
|
|
|
|
;; Guarantee that BUFFER has room for at least N more elements (words or octets)
|
|
;; considering its current block and possibly one new block.
|
|
;; If all N elements fit into the current block, return true, else return NIL.
|
|
;; If exactly at the end of a block, the return value will be true because
|
|
;; the next block will contain all N bytes.
|
|
;; This inlined wrapper punts to the general case if available space is inadequate.
|
|
;;
|
|
(defun-inline buffer-ensure-space (buffer n)
|
|
(declare ((and fixnum unsigned-byte) n) #.$optimize-buffering)
|
|
(or (>= (- (buffer-block-capacity buffer) (buffer-index buffer)) n)
|
|
(%buffer-ensure-space buffer n)))
|
|
|
|
;; A SERIALIZED-PROTOBUF is the result of serializing in the one-pass algorithm
|
|
;; and then squashing out any of the gaps that were left by allocating length
|
|
;; prefixes in their largest possible size but not using all bytes.
|
|
;;
|
|
(defstruct (serialized-protobuf
|
|
(:constructor make-serialized-protobuf
|
|
(blocks total-length final-block-length)))
|
|
blocks
|
|
total-length
|
|
final-block-length)
|
|
(defmethod print-object ((self serialized-protobuf) stream)
|
|
(declare (stream stream))
|
|
(print-unreadable-object (self stream :type t)
|
|
(format stream "~D byte~:P" (serialized-protobuf-total-length self))))
|
|
|
|
(declaim (ftype (function (t t) (values t &optional))
|
|
word-out octet-out)
|
|
(inline word-out))
|
|
|
|
;; Define OCTET-OUT and WORD-OUT on the respective buffer types.
|
|
(macrolet
|
|
((define-emitter (name buffer-type element-type)
|
|
`(defun ,name (buffer val)
|
|
(declare (,buffer-type buffer) #.$optimize-buffering)
|
|
(let* ((block (buffer-block buffer))
|
|
(index (buffer-index buffer))
|
|
(capacity (length block)))
|
|
;; Structure's slot type isn't enough to provide type information
|
|
;; because of a later setq.
|
|
(declare ((simple-array ,element-type 1) block))
|
|
(when (>= index capacity)
|
|
(incf (buffer-%block-absolute-start buffer) capacity)
|
|
(setf block
|
|
;; see if space was pre-allocated
|
|
(cond ((cdr (buffer-next buffer))
|
|
(pop (buffer-next buffer))
|
|
(car (buffer-next buffer)))
|
|
(t
|
|
(let* ((next (new-block buffer 0))
|
|
(cell (list next)))
|
|
(setf (cdr (buffer-next buffer)) cell
|
|
(buffer-next buffer) cell)
|
|
next)))
|
|
(buffer-block buffer) block
|
|
index 0))
|
|
(setf (aref block index) val
|
|
(buffer-index buffer) (1+ index))))))
|
|
(define-emitter word-out word-buffer (unsigned-byte 32))
|
|
(define-emitter octet-out octet-buffer octet-type))
|
|
|
|
(defun %fast-octet-out (buffer val)
|
|
(let ((block (advance-block buffer)))
|
|
(setf (aref block 0) val
|
|
(buffer-index buffer) 1)))
|
|
|
|
;; Perform OCTET-OUT, but if the current block can hold no more,
|
|
;; assume existence of a pre-made next block.
|
|
(defun-inline fast-octet-out (buffer val)
|
|
(declare (octet-buffer buffer) #.$optimize-buffering)
|
|
(let* ((block (buffer-block buffer))
|
|
(index (buffer-index buffer)))
|
|
(declare ((simple-array octet-type 1) block))
|
|
(if (i< index (length block))
|
|
(setf (aref block index) val (buffer-index buffer) (1+ index))
|
|
(%fast-octet-out buffer val)))) ; punt
|
|
|
|
;; Rapidly copy all of OCTETS into BUFFER as if by FAST-OCTET-OUT.
|
|
;; Space must have been ensured so that at most one additional block beyond
|
|
;; the current-block is needed.
|
|
;;
|
|
(defun fast-octets-out (buffer octets
|
|
&aux (input-length (length octets)))
|
|
(declare (octet-buffer buffer) (optimize (safety 0))
|
|
((simple-array octet-type 1) octets)
|
|
((unsigned-byte 32) input-length))
|
|
(unless (zerop input-length)
|
|
(let* ((block (buffer-block buffer))
|
|
(index (buffer-index buffer))
|
|
(available-space (- (length block) index)))
|
|
(declare ((simple-array octet-type 1) block))
|
|
;; ENSURE-SPACE always leaves room for at least 1 octet in the current block,
|
|
;; and even if it left zero this code would still be correct.
|
|
(let ((n (min available-space input-length)))
|
|
(replace block octets :start1 index)
|
|
(incf index n)
|
|
(decf input-length n))
|
|
(when (plusp input-length)
|
|
;; There is more input. This can only happen if the block's
|
|
;; capacity was reached.
|
|
;; The starting index of the source of the copy is the number
|
|
;; of bytes that were already written into the first block.
|
|
(replace (advance-block buffer) octets
|
|
:start2 available-space)
|
|
;; The ending index in the current block is whatever was just
|
|
;; copied, since the starting index for writing was 0.
|
|
(setq index input-length))
|
|
(setf (buffer-index buffer) index))))
|
|
|
|
;; Bind ITER to an iterator over WORD-BUFFER in the manner of standard
|
|
;; WITH-{mumble}-ITERATOR macros. Each time ITER is invoked, the next
|
|
;; buffer element will be returned, or NIL if no more remain.
|
|
(defmacro with-word-buffer-iterator ((iterator-name word-buffer) &body body)
|
|
(with-gensyms (buffer block more-blocks input-pointer input-limit)
|
|
`(let* ((,buffer, word-buffer)
|
|
(,block ,(coerce #() 'word-buffer-block-type))
|
|
;; if the current block's index is 0, then no blocks were used at all
|
|
(,more-blocks (unless (zerop (buffer-index ,buffer))
|
|
(buffer-chain ,buffer)))
|
|
(,input-pointer 0)
|
|
(,input-limit 0))
|
|
(declare (word-buffer-block-type ,block)
|
|
(array-index ,input-pointer ,input-limit))
|
|
(macrolet
|
|
((,iterator-name ()
|
|
`(locally
|
|
(declare (optimize (safety 0)))
|
|
(when (or (i< ,',input-pointer ,',input-limit)
|
|
(when ,',more-blocks
|
|
(setq ,',block (pop ,',more-blocks)
|
|
,',input-limit
|
|
(if ,',more-blocks
|
|
(length ,',block)
|
|
(buffer-index ,',buffer))
|
|
,',input-pointer 0)))
|
|
(aref ,',block (prog1 ,',input-pointer (incf ,',input-pointer)))))))
|
|
,@body))))
|
|
|
|
;; Put blank space into an octet buffer so that later we can go back and
|
|
;; patch a length-prefix in.
|
|
;; Return fives values: absolute stream position, the cons cell pointing
|
|
;; to the block in which the first octet would be written, and the index to
|
|
;; that octet, and a pointer to the block in the buffer of deletions that
|
|
;; will be performed on finalization, and a pointer into that block.
|
|
;; Multiple values avoid consing anything to represent saved buffer locations.
|
|
(declaim (ftype (function (t) (values t t t t t &optional))
|
|
emit-placeholder))
|
|
(defun emit-placeholder (buffer)
|
|
(declare #.$optimize-buffering)
|
|
;; ABS-POS doesn't change even if BUFFER-ENSURE-SPACE advances a block
|
|
;; so the first two bindings are actually order-insensitive,
|
|
;; but the capturing of BUFFER-NEXT must occur after ENSURE-SPACE.
|
|
;; A length-prefix placeholder reserves 4 octets which is enough to represent
|
|
;; a 28-bit integer (the other bit of each octet being the "more-to-go" flag).
|
|
;; Given the suggested message size limit of a few megabytes, this is fine.
|
|
(symbol-macrolet ((reserve-bytes 4))
|
|
(let ((within-block-p (buffer-ensure-space buffer reserve-bytes))
|
|
(abs-pos (buffer-absolute-position buffer))
|
|
(blocks (buffer-next buffer))
|
|
(index (buffer-index buffer)))
|
|
(setf (buffer-index buffer)
|
|
(if within-block-p
|
|
(+ index reserve-bytes)
|
|
(let ((available-space (- (buffer-block-capacity buffer) index)))
|
|
(advance-block buffer)
|
|
(- reserve-bytes available-space))))
|
|
;; A place is reserved in the deletion buffer to hold a pointer to
|
|
;; the place in the octet buffer that will probably be squeezed out.
|
|
;; This is done now, so that indices stored are monotonic.
|
|
;; Were that not done, and backpatching recorded deletion markers
|
|
;; only at the time of making the patch, the deletion markers would
|
|
;; not be in ascending order - they would have a "treelike" appearance
|
|
;; based on the order in which submessages were completed.
|
|
(let ((patch-buffer (octet-buffer-backpatches buffer)))
|
|
(word-out patch-buffer 0)
|
|
(values abs-pos blocks index
|
|
(buffer-block patch-buffer)
|
|
(1- (buffer-index patch-buffer)))))))
|
|
|
|
;; Patch VAL into the octet buffer by changing the contents of VAL's block at
|
|
;; the specified indices using 'varint' encoding, and also record a pointer
|
|
;; to the range of octets which were reserved for VAL but not consumed by it.
|
|
;; Return the number of bytes used to store VAL.
|
|
(declaim (ftype (function (t t t t t t t) (values fixnum &optional))
|
|
backpatch-varint))
|
|
(defun backpatch-varint (val buffer abs-pos blocks index pointer-block pointer-index)
|
|
(declare #.$optimize-buffering)
|
|
(declare (type (unsigned-byte 32) val)
|
|
((simple-array (unsigned-byte 32) 1) pointer-block)
|
|
(array-index index pointer-index))
|
|
(let* ((block (first blocks)) (limit (length block)) (count 0))
|
|
(declare ((simple-array octet-type 1) block) (fixnum count))
|
|
;; Seven bits at a time, least significant bits first
|
|
(loop do (let ((bits (ildb (byte 7 0) val)))
|
|
(declare (octet-type bits))
|
|
(setq val (iash val -7))
|
|
(when (>= index limit)
|
|
;; This doesn't bother updating LIMIT to its "proper" new value.
|
|
;; It can't possibly be any smaller than a varint.
|
|
(setf index 0 block (second blocks)))
|
|
(setf (aref block index) (ilogior bits (if (i= val 0) 0 128)))
|
|
(iincf index)
|
|
(incf count))
|
|
until (i= val 0))
|
|
;; Record the location of the backpatch so that the unused bytes can be
|
|
;; squashed out later. This is done even if all 4 bytes were used,
|
|
;; because a place was aleady reserved in the word-buffer for this backpatch.
|
|
(cond ((<= count 4)
|
|
;; Encode the deletion using 2 bits for the deletion count (0 .. 3)
|
|
;; ORed with the index at which to delete shifted left 2 bits.
|
|
(let ((gap (i- 4 count)))
|
|
(setf (aref pointer-block pointer-index)
|
|
(ilogior (ash (i+ abs-pos count) 2) gap))
|
|
(incf (octet-buffer-n-gap-bytes buffer) gap)))
|
|
((> count 4)
|
|
(protobuf-error "Backpatch failure on ~S" buffer)))
|
|
count))
|
|
|
|
;; Execute BODY, capturing the state of BUFFER at the start, and *unless* a nonlocal
|
|
;; exit occurs, restore the state of the buffer prior to executing the body
|
|
;; and return no value.
|
|
(defmacro with-bookmark ((buffer) &body body)
|
|
(with-gensyms (block index next abs-pos)
|
|
`(let ((,block (buffer-block ,buffer))
|
|
(,index (buffer-index ,buffer))
|
|
(,next (buffer-next ,buffer))
|
|
(,abs-pos (buffer-%block-absolute-start ,buffer)))
|
|
,@body
|
|
(setf (buffer-block ,buffer) ,block
|
|
(buffer-index ,buffer) ,index
|
|
(buffer-next ,buffer) ,next
|
|
(buffer-%block-absolute-start ,buffer) ,abs-pos)
|
|
(values))))
|
|
|
|
;; Reserve space for a uint32 prior to the start of a variable-length subsequence
|
|
;; of buffer, and also reserve space in the backpatch buffer to point to the space
|
|
;; in the data buffer where unused reserved bytes should be squashed out.
|
|
(defmacro with-placeholder ((buffer &key position) &body body)
|
|
(let* ((name "PLACEHOLDER")
|
|
(abs
|
|
(or position
|
|
(make-symbol (concatenate 'string name "-OCTET-POSITION"))))
|
|
(blocks (make-symbol (concatenate 'string name "-OCTET-BLOCKS")))
|
|
(index (make-symbol (concatenate 'string name "-OCTET-INDEX")))
|
|
(pointer-block (make-symbol (concatenate 'string name "-POINTER-BLOCK")))
|
|
(pointer-index (make-symbol (concatenate 'string name "-POINTER-INDEX"))))
|
|
`(multiple-value-bind (,abs ,blocks ,index ,pointer-block ,pointer-index)
|
|
(emit-placeholder ,buffer)
|
|
(macrolet ((backpatch (value)
|
|
`(backpatch-varint ,value
|
|
,',buffer ,',abs ,',blocks ,',index
|
|
,',pointer-block ,',pointer-index)))
|
|
,@body))))
|
|
|
|
;; A simple wrapper on REPLACE. This function is used only in one place.
|
|
;; It shouldn't be needed, but small copies using REPLACE are slower than a loop.
|
|
;; It turns out that a foreign call to memmove would be faster for 80 bytes or more.
|
|
(defun-inline fast-replace (destination destination-index
|
|
source source-index count)
|
|
(declare (array-index destination-index count)
|
|
((simple-array octet-type 1) destination source))
|
|
(let ((limit (the array-index (+ destination-index count))))
|
|
(if (< count 40)
|
|
(loop (setf (aref destination destination-index) (aref source source-index))
|
|
(incf source-index)
|
|
(when (eql (incf destination-index) limit) (return)))
|
|
(replace destination source
|
|
:start1 destination-index :end1 limit
|
|
:start2 source-index))))
|
|
|
|
(defvar **empty-word-buffer** (make-word-buffer 0))
|
|
|
|
;; Given an octet-buffer BUFFER, squeeze out any octets which "do not exist" in
|
|
;; the virtual octet sequence so they no also longer exist in the physical sequence.
|
|
;; After this operation, BUFFER will be ready for direct consumption, such as
|
|
;; by a client or a compression algorithm or file storage.
|
|
(defun compactify-blocks (buffer)
|
|
(declare #.$optimize-buffering)
|
|
;; OUTPUT and INPUT refer to the same block chain, namely the blocks
|
|
;; that currently exist in BUFFER.
|
|
(let* ((input-block-chain (buffer-chain buffer))
|
|
(output-block-chain input-block-chain)
|
|
;; Output blocks are not popped off the chain until
|
|
;; advancing beyond the current block. This way the tail
|
|
;; can be smashed to NIL when reaching the end of input.
|
|
(output-block (car output-block-chain))
|
|
(output-index 0)
|
|
;; Setting INPUT-BLOCK now is only for type-correctness of the
|
|
;; initial value. It will be set again immediately before reading
|
|
(input-block (car input-block-chain))
|
|
(input-index 0) ; block-relative index
|
|
(input-position 0) ; absolute
|
|
(deletion-point 0)
|
|
(deletion-length 0))
|
|
(declare ((simple-array octet-type 1) output-block input-block)
|
|
(array-index output-index input-index input-position))
|
|
;; Drop any pre-allocated but unused block in the input chain.
|
|
(when (cdr (buffer-next buffer))
|
|
(assert (eq (buffer-block buffer) (car (buffer-next buffer))))
|
|
(rplacd (buffer-next buffer) nil))
|
|
|
|
;; The reason for deferring this POP 'til after the preceding "drop"
|
|
;; is that if there were exactly two input blocks, one used and one not
|
|
;; used at all, INPUT-BLOCK-CHAIN should become NIL.
|
|
(setq input-block (pop input-block-chain))
|
|
(with-word-buffer-iterator
|
|
(deletion-point-getter (octet-buffer-backpatches buffer))
|
|
(labels
|
|
((find-next-deletion-point ()
|
|
;; If the deletion point is one at which no bytes should be deleted -
|
|
;; probably impossible as it means a submessage length took >21 bits
|
|
;; (= 4 bytes) to encode - skip until finding somewhere to delete,
|
|
;; or else finding that there are no further deletion points.
|
|
(let ((word (deletion-point-getter)))
|
|
(if (not word)
|
|
(setq deletion-point most-positive-fixnum deletion-length 0)
|
|
(let ((n-bytes (logand (the fixnum word) #b11)))
|
|
(if (zerop n-bytes)
|
|
(find-next-deletion-point)
|
|
(setq deletion-point (ash word -2)
|
|
deletion-length n-bytes))))))
|
|
(next-output-block ()
|
|
(setq output-block-chain (cdr output-block-chain)
|
|
output-block (car output-block-chain)
|
|
output-index 0)
|
|
(length output-block))
|
|
(copy-to-output (count)
|
|
(declare ((and fixnum unsigned-byte) count))
|
|
(when (zerop count)
|
|
(return-from copy-to-output))
|
|
(let ((space-available (- (length output-block) output-index)))
|
|
(declare (array-index count space-available))
|
|
;; See if the output needs to be advanced to the next block.
|
|
(when (zerop space-available)
|
|
(setq space-available (next-output-block)))
|
|
;; Avoid copying until the earlist point at which bytes need to move.
|
|
;; This rapidly skips over blocks that contain only fixed-length data
|
|
;; provided they are the first blocks in the serialized output.
|
|
;; Not likely, but happens.
|
|
(when (and (eq output-block input-block)
|
|
(eql output-index input-index))
|
|
(incf output-index count)
|
|
(incf input-index count)
|
|
(return-from copy-to-output))
|
|
;; A chunk of input can span more than one block of output due to
|
|
;; variable-length blocks.
|
|
(loop
|
|
(let ((stride (min count space-available)))
|
|
;; COUNT and SPACE-AVAILABLE are both positive,
|
|
;; so this will copy at least one octet.
|
|
(fast-replace output-block output-index
|
|
input-block input-index stride)
|
|
(incf output-index stride)
|
|
(incf input-index stride)
|
|
(if (eql (decf count stride) 0) (return)))
|
|
(when (zerop (setq space-available
|
|
(- (length output-block) output-index)))
|
|
(setq space-available (next-output-block))))))
|
|
(compute-input-block-length ()
|
|
;; Only the final block is possibly shorter than its allocated length.
|
|
;; The others are as long as allocated, each larger than its predecessor.
|
|
(if input-block-chain
|
|
(length input-block)
|
|
(buffer-index buffer))))
|
|
(declare (inline next-output-block compute-input-block-length))
|
|
(prog ((block-length (compute-input-block-length))
|
|
(total-deletion-count 0))
|
|
(declare (array-index block-length total-deletion-count))
|
|
tippytop
|
|
(find-next-deletion-point)
|
|
top
|
|
(let* ((remaining-length (- block-length input-index))
|
|
(n-bytes-to-copy
|
|
(min remaining-length (- deletion-point input-position))))
|
|
(copy-to-output n-bytes-to-copy)
|
|
(incf input-position n-bytes-to-copy)) ; absolute
|
|
(when (eql input-index block-length)
|
|
(unless input-block-chain
|
|
(rplacd output-block-chain nil) ; terminate the list
|
|
;; Free the unnecessary word-buffer blocks. Also makes additional calls
|
|
;; to COMPACTIFY on this buffer do nothing, which seems reasonable.
|
|
(setf (octet-buffer-backpatches buffer) **empty-word-buffer**)
|
|
(return (make-serialized-protobuf
|
|
(buffer-chain buffer)
|
|
(- input-position total-deletion-count)
|
|
output-index)))
|
|
(setq input-block (pop input-block-chain)
|
|
block-length (compute-input-block-length)
|
|
input-index 0)
|
|
(go top))
|
|
;; now we must be at a deletion point
|
|
(unless (and (= input-position deletion-point) (plusp deletion-length))
|
|
(protobuf-error "Octet buffer compaction bug"))
|
|
(let ((remaining-length (- block-length input-index)))
|
|
(if (>= remaining-length deletion-length)
|
|
(incf input-index deletion-length) ; easy case
|
|
;; Skip remainder of this block and start of one more. Deleted ranges
|
|
;; never span more than 2 blocks since deletion-length <= 3
|
|
;; and blocks are much larger than 3 octets.
|
|
(setq input-block (pop input-block-chain)
|
|
block-length (compute-input-block-length)
|
|
input-index (- deletion-length remaining-length))))
|
|
(incf input-position deletion-length)
|
|
(incf total-deletion-count deletion-length)
|
|
(go tippytop))))))
|
|
|
|
(defun reset-buffer-chain (buffer chain)
|
|
"Make BUFFER have CHAIN as its list of octet arrays"
|
|
(setf (buffer-block buffer) (car chain)
|
|
(buffer-index buffer) 0
|
|
(buffer-chain buffer) chain
|
|
(buffer-next buffer) chain
|
|
(buffer-%block-absolute-start buffer) 0)
|
|
;; Zero-fill, or not. This should depend on SAFETY and/or DEBUG,
|
|
;; but there is no way to discover the current policy
|
|
;; without using implementation-specific code.
|
|
#+nil
|
|
(dolist (block chain)
|
|
(fill block 0)))
|
|
|
|
(defun force-to-stream (buffer)
|
|
"Write the octets currently in BUFFER to its target stream,
|
|
and rewind BUFFER so that it is empty."
|
|
;; Before COMPACTIFY-BLOCKS messes up the chain, copy it.
|
|
;; Then compactify and copy to the target stream.
|
|
(let ((chain (copy-list (buffer-chain buffer)))
|
|
(backpatch-chain (buffer-chain (octet-buffer-backpatches buffer)))
|
|
(stream (the stream (octet-buffer-target buffer))))
|
|
(flet ((out-block (block length)
|
|
(write-sequence block stream :start 0 :end length)))
|
|
(declare (dynamic-extent #'out-block))
|
|
(call-with-each-block #'out-block (compactify-blocks buffer)))
|
|
(reset-buffer-chain buffer chain)
|
|
(setf (octet-buffer-n-gap-bytes buffer) 0)
|
|
;; Heuristically resize the backpatch buffer, trying to avoid subsequent expansion
|
|
;; Ideally we would do this only only on the *next* attempted use of the buffer,
|
|
;; but that's not as easy as just sizing up now, even if no further write will occur.
|
|
;; The worst-case is when the backpatch buffer is never needed again,
|
|
;; but was nonetheless resized to be larger. But that's probably not common.
|
|
(let ((backpatches (octet-buffer-backpatches buffer)))
|
|
(reset-buffer-chain
|
|
backpatches
|
|
(if (cdr backpatch-chain)
|
|
(list (new-block backpatches
|
|
(loop for block in backpatch-chain
|
|
sum (length block))))
|
|
backpatch-chain)))))
|
|
|
|
;; Given either a SERIALIZED-PROTOBUF or a BUFFER, return the concatenation
|
|
;; of all BLOCKS. You probably don't want to do this on an uncompacted BUFFER.
|
|
;; That usually makes no sense in any scenario other than debugging.
|
|
(defun concatenate-blocks (buffer)
|
|
(multiple-value-bind (total-length blocks)
|
|
(etypecase buffer
|
|
(serialized-protobuf
|
|
(values (serialized-protobuf-total-length buffer)
|
|
(serialized-protobuf-blocks buffer)))
|
|
(buffer
|
|
(values (loop for (block . rest) on (buffer-chain buffer)
|
|
sum (if rest (length (the (simple-array * 1) block))
|
|
(buffer-index buffer))
|
|
fixnum)
|
|
(buffer-chain buffer))))
|
|
(declare (array-index total-length))
|
|
(let ((result (make-array total-length :element-type 'octet-type))
|
|
(index 0))
|
|
(declare (array-index index))
|
|
(dolist (block blocks result)
|
|
(replace result (the (simple-array octet-type 1) block) :start1 index)
|
|
(incf index (length (the (simple-array * 1) block)))))))
|
|
|
|
;; Given a BUFFER or a SERIALIZED-PROTOBUF, call FUNCTION once with each
|
|
;; block, passing it also the effective length of the block.
|
|
(defun call-with-each-block (function buffer)
|
|
(etypecase buffer
|
|
(serialized-protobuf
|
|
(let ((blocks (serialized-protobuf-blocks buffer)))
|
|
(loop
|
|
(let ((block (car blocks)))
|
|
(funcall function block
|
|
(if (cdr blocks)
|
|
(length (the (simple-array * 1) block))
|
|
(serialized-protobuf-final-block-length buffer))))
|
|
(pop blocks)
|
|
(if (null blocks) (return)))))
|
|
(buffer
|
|
(let ((blocks (buffer-chain buffer)))
|
|
(loop
|
|
(let ((block (car blocks)))
|
|
(funcall function block
|
|
(if (cdr blocks)
|
|
(length (the (simple-array * 1) block))
|
|
(buffer-index buffer))))
|
|
(pop blocks)
|
|
(if (null blocks) (return)))))))
|
|
|
|
;;;
|
|
|
|
#+sbcl
|
|
(declaim (sb-ext:maybe-inline encode-uint32))
|
|
(macrolet ((define-varint-encoder (name reserve-bytes lisp-type
|
|
&optional (expr 'input))
|
|
`(progn
|
|
(declaim (ftype (function (,lisp-type buffer)
|
|
(values (integer 1 ,(or reserve-bytes 5)) &optional))
|
|
,name))
|
|
(defun ,name (input buffer &aux (val ,expr))
|
|
(declare (type ,lisp-type input)
|
|
(type (unsigned-byte ,(second lisp-type)) val))
|
|
;; The locally declare gives us optimizations inside the locally
|
|
;; but leaves the typechecking in the function.
|
|
(locally
|
|
(declare #.$optimize-buffering)
|
|
,@(when reserve-bytes
|
|
`((buffer-ensure-space buffer ,reserve-bytes)))
|
|
(let ((n 0))
|
|
(declare (fixnum n))
|
|
(loop (let ((bits (ldb (byte 7 0) val)))
|
|
(setq val (ash val -7))
|
|
(fast-octet-out buffer
|
|
(ilogior bits (if (i= val 0) 0 128)))
|
|
(iincf n))
|
|
(when (eql val 0) (return n)))))))))
|
|
|
|
(define-varint-encoder encode-uint32 5 (unsigned-byte 32))
|
|
(define-varint-encoder encode-uint64 10 (unsigned-byte 64))
|
|
|
|
;; It is best to keep all occurrences of (LDB (BYTE 64 0) ...) out of calling code
|
|
;; because that forces boxing in many cases, and even it if doesn't create a new bignum,
|
|
;; it causes generic arithmetic routines to be used.
|
|
;; Hiding the LDB operation inside a primitive encoder is better for efficiency.
|
|
(define-varint-encoder encode-int64 10 (signed-byte 64)
|
|
;; On SBCL the LOGAND compiles to nothing.
|
|
#+sbcl (logand input sb-vm::most-positive-word)
|
|
#-sbcl (ldb (byte 64 0) input))
|
|
|
|
;; FAST-ENCODE simply omits the call to ENSURE-SPACE and might not be worth keeping
|
|
(define-varint-encoder fast-encode-uint32 nil (unsigned-byte 32)))
|
|
|
|
(define-compiler-macro encode-uint32 (&whole form val buffer)
|
|
(let (encoded-length)
|
|
(if (and (typep val 'fixnum) (i<= (setq encoded-length (length32 val)) 2))
|
|
(let ((low7 (logand val #x7F)))
|
|
(case encoded-length
|
|
(1 `(progn (octet-out ,buffer ,low7)
|
|
1))
|
|
(2 `(progn (octet-out2 ,buffer ,(logior #x80 low7) ,(ldb (byte 7 7) val))
|
|
2))))
|
|
form)))
|
|
|
|
;; For encoding an object tag + wire-type, we can compile-time convert ENCODE-UINT32
|
|
;; into a few OCTET-OUT calls. I'll only do this for 1 and 2-octet writes though,
|
|
;; which is enough for field-indices up to (2^14)-1.
|
|
(defun octet-out2 (buffer first second)
|
|
(octet-out buffer first)
|
|
(octet-out buffer second))
|
|
|
|
;;;
|
|
|
|
;; A BUFFER does not, in general, interact through a stream interface
|
|
;; (WRITE-BYTE, WRITE-SEQUENCE) however there is some support in SBCL
|
|
;; for treating it as though it were a character output stream.
|
|
;; In general it is faster to use OCTET-OUT, however a stream produces
|
|
;; less garbage if the alternative would be to call WRITE-TO-STRING on
|
|
;; something and serialize the resultant string. The buffer can do this
|
|
;; for you as long as you only write ASCII characters, because the
|
|
;; stream mode does not have a UTF-8 encoder. (It could, but doesn't)
|
|
|
|
#+sbcl
|
|
(progn
|
|
(defstruct (octet-output-stream
|
|
(:conc-name octet-stream-)
|
|
;; Maybe Todo: supply a BOUT (byte-out) handler function.
|
|
(:include sb-kernel:ansi-stream
|
|
;; "OUT" is the old slot name, "COUT" is the modern name
|
|
(#.(if (find-symbol "ANSI-STREAM-OUT" "SB-KERNEL") 'out 'cout)
|
|
#'octet-stream-char-out)
|
|
(sout #'octet-stream-string-out))
|
|
(:constructor make-octet-output-stream (buffer)))
|
|
;; How many characters should the character producer be permitted to write
|
|
;; before we complain about a protocol error.
|
|
(space-available 0 :type fixnum)
|
|
(buffer nil :type octet-buffer))
|
|
|
|
(defun protocol-error (stream)
|
|
(protobuf-error "Octet stream protocol error on ~S" stream))
|
|
|
|
(defun octet-stream-char-out (stream character)
|
|
;; A streamified BUFFER accept only ASCII characters (for now).
|
|
;; This is more of a sanity-check than a limitation, and it's a mild
|
|
;; limitation if that- the ENCODE-STRING protobuf serializer performs
|
|
;; encoding and doesn't use its BUFFER as a stream. It uses OCTETS-OUT.
|
|
(unless (<= (char-code character) 127)
|
|
(protocol-error stream))
|
|
(octet-out (octet-stream-buffer stream) (char-code character)))
|
|
|
|
(defun octet-stream-limited-char-out (stream character)
|
|
(cond ((or (zerop (octet-stream-space-available stream))
|
|
(> (char-code character) 127))
|
|
(protocol-error stream))
|
|
(t
|
|
(decf (octet-stream-space-available stream))
|
|
(octet-out (octet-stream-buffer stream) (char-code character)))))
|
|
|
|
(macrolet ((ansi-stream-char-out-method (x)
|
|
`(,(or (find-symbol "ANSI-STREAM-COUT" "SB-KERNEL")
|
|
(find-symbol "ANSI-STREAM-OUT" "SB-KERNEL"))
|
|
,x)))
|
|
(defun octet-stream-string-out (stream string start end)
|
|
(declare (string string) (array-index start end))
|
|
(let ((f (ansi-stream-char-out-method stream)))
|
|
(sb-kernel:with-array-data ((string string) (start start) (end end))
|
|
(loop for i fixnum from start below end
|
|
do (funcall f stream (char string i))))))
|
|
|
|
(defun %get-buffer-stream (buffer)
|
|
(or (octet-buffer-stream buffer)
|
|
(setf (octet-buffer-stream buffer) (make-octet-output-stream buffer))))
|
|
|
|
(declaim (ftype (function (buffer) (values stream &optional))
|
|
get-unlimited-buffer-stream get-tiny-buffer-stream)
|
|
(ftype (function (buffer fixnum) (values stream &optional))
|
|
get-bounded-buffer-stream))
|
|
|
|
;; Return a stream that accepts any number of characters.
|
|
;; A placeholder must already have been reserved for the length prefix.
|
|
(defun get-unlimited-buffer-stream (buffer)
|
|
(let ((stream (%get-buffer-stream buffer)))
|
|
;; Setting the space to 0 ensures we can't call the 'limited'
|
|
;; char out function without getting an obvious failure.
|
|
(setf (octet-stream-space-available stream) 0
|
|
(ansi-stream-char-out-method stream) #'octet-stream-char-out)
|
|
stream))
|
|
|
|
;; Return a stream that accepts a tiny string. 1 byte is reserved for the length.
|
|
(defun get-tiny-buffer-stream (buffer)
|
|
(buffer-ensure-space buffer 128) ; 1 byte prefix, <= 127 string characters
|
|
(fast-octet-out buffer 0) ; easy way to leave a 1-byte space
|
|
(let ((stream (%get-buffer-stream buffer)))
|
|
(setf (octet-stream-space-available stream) 127
|
|
(ansi-stream-char-out-method stream) #'octet-stream-limited-char-out)
|
|
stream))
|
|
|
|
;; Return a stream that accepts a known-length string. The length gets encoded first.
|
|
(defun get-bounded-buffer-stream (buffer n-chars)
|
|
(encode-uint32 n-chars buffer) ; emit the variable-length length prefix
|
|
(let ((stream (%get-buffer-stream buffer)))
|
|
(setf (octet-stream-space-available stream) n-chars
|
|
(ansi-stream-char-out-method stream) #'octet-stream-limited-char-out)
|
|
stream))
|
|
)
|
|
|
|
;; WITH-BUFFER-AS-STREAM binds STREAM to a character output stream that when written to
|
|
;; places ASCII characters into BUFFER. There are three cases, listed here
|
|
;; in order from most efficient to least efficient:
|
|
;; 1. (WITH-BUFFER-AS-STREAM (stream buffer :length n)
|
|
;; Length specified as an integer N (evaluated at runtime) will encode a prefix of N
|
|
;; then accept N characters. Writing anything other than exactly N will signal an eror.
|
|
;; 2. (WITH-BUFFER-AS-STREAM (stream buffer :length :TINY)
|
|
;; Length specified as the literal symbol :TINY will leave a 1-byte gap for a prefix.
|
|
;; (... :length N) where N runtime evaluates to the keyword :TINY is not legal.
|
|
;; Between 0 and 127 characters may be written, and the prefix will be modified accordingly.
|
|
;; An error will be signaled if more than 127 characters are written.
|
|
;; 3. (WITH-BUFFER-AS-STREAM (stream buffer) ...)
|
|
;; No length specified will leave a 4-byte placeholder for an arbitrary length and
|
|
;; backpatch it in. This relies on buffer compactification in the same way as does
|
|
;; writing of an unknown-length submessage.
|
|
|
|
;; In all cases, non-ASCII characters are rejected.
|
|
;; If TAG is supplied, it is encoded prior to the encoding of the string data.
|
|
;; This macro should be used for effect, not value - its return value is undefined.
|
|
|
|
(defmacro with-buffer-as-stream ((stream-var buffer &key length (tag nil tag-p))
|
|
&body body &environment env)
|
|
(with-gensyms (start-pos start-block start-index)
|
|
`(progn
|
|
,@(if tag-p `((encode-uint32 ,tag ,buffer)))
|
|
,(cond ((not length) ; most general
|
|
`(with-placeholder (,buffer :position ,start-pos)
|
|
(let ((,stream-var (get-unlimited-buffer-stream ,buffer)))
|
|
,@body)
|
|
(backpatch
|
|
(i- (buffer-absolute-position ,buffer)
|
|
;; Buffer's absolute pos was marked at the first octet of the
|
|
;; placeholder for the varint.
|
|
;; Actual number of chars written is 4 less than that.
|
|
,start-pos 4))))
|
|
((eq length :tiny)
|
|
`(let ((,stream-var (get-tiny-buffer-stream ,buffer))
|
|
(,start-block (octet-buffer-block ,buffer))
|
|
(,start-index (1- (buffer-index ,buffer))))
|
|
,@body
|
|
(locally
|
|
,@(when (sb-c:policy env (= safety 0))
|
|
`((declare (optimize (sb-c::insert-array-bounds-checks 0)))))
|
|
(setf (aref ,start-block ,start-index)
|
|
(i- 127 (octet-stream-space-available ,stream-var))))))
|
|
(t
|
|
`(let ((,stream-var (get-bounded-buffer-stream ,buffer ,length)))
|
|
,@body
|
|
,@(when (sb-c:policy env (> safety 0))
|
|
;; The stream will croak upon trying to write >LENGTH chars.
|
|
;; With safety, ensure *exactly* that many were written.
|
|
`((unless (zerop (octet-stream-space-available ,stream-var))
|
|
(protocol-error ,stream-var))))))))))
|
|
|
|
) ; end of #+sbcl (PROGN ...)
|
|
|
|
;; The portable implementation of WITH-BUFFER-AS-STREAM
|
|
#-sbcl
|
|
(defmacro with-buffer-as-stream ((stream-var buffer &key length) &body body)
|
|
(declare (ignore length))
|
|
`(let ((,stream-var (make-string-output-stream)))
|
|
,@body
|
|
(encode-string (get-output-stream-string ,stream-var)
|
|
,buffer)))
|