commit 14387f3fc0668827d0d1978f95d6c09564c0f5f6
parent 05b9ff75c6cdeee71eaa65ff7246edac979e712b
Author: William Casarin <jb55@jb55.com>
Date: Sun, 15 Feb 2026 20:06:06 -0800
md-stream: fix code block detection after paragraph with single newline
Refactor StreamParser from Vec<String> token storage to a single
contiguous String buffer, eliminating cross-token boundary issues
that prevented code fence detection in char-by-char streaming.
Key fixes:
- Buffer consolidation: push() now appends to the shared buffer (push_str) instead of allocating a new String per token
- could_be_block_start() defers ambiguous 1-2 char prefixes that may still become a code fence or thematic break (e.g. ` -> ```)
- at_line_start properly cleared on non-newline accumulation
- Paragraph partial saved before try_block_start to prevent clobbering
- Code fence closing detection defers on partial fence chars
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
3 files changed, 195 insertions(+), 94 deletions(-)
diff --git a/crates/md-stream/src/parser.rs b/crates/md-stream/src/parser.rs
@@ -6,14 +6,11 @@ use crate::partial::{Partial, PartialKind};
/// Incremental markdown parser for streaming input.
///
-/// Maintains a buffer of incoming tokens and tracks parsing state
-/// to allow progressive rendering as content streams in.
+/// Maintains a single contiguous buffer of incoming text and tracks
+/// a processing cursor to allow progressive rendering as content streams in.
pub struct StreamParser {
- /// Raw token chunks from the stream
- tokens: Vec<String>,
-
- /// Total bytes in tokens (for efficient length tracking)
- total_bytes: usize,
+ /// Contiguous buffer of all pushed text
+ buffer: String,
/// Completed markdown elements
parsed: Vec<MdElement>,
@@ -21,11 +18,8 @@ pub struct StreamParser {
/// Current in-progress element (if any)
partial: Option<Partial>,
- /// Index of first unprocessed token
- process_idx: usize,
-
- /// Byte offset within the token at process_idx
- process_offset: usize,
+ /// Byte offset of first unprocessed content in buffer
+ process_pos: usize,
/// Are we at the start of a line? (for block-level detection)
at_line_start: bool,
@@ -34,12 +28,10 @@ pub struct StreamParser {
impl StreamParser {
pub fn new() -> Self {
Self {
- tokens: Vec::new(),
- total_bytes: 0,
+ buffer: String::new(),
parsed: Vec::new(),
partial: None,
- process_idx: 0,
- process_offset: 0,
+ process_pos: 0,
at_line_start: true,
}
}
@@ -50,8 +42,7 @@ impl StreamParser {
return;
}
- self.tokens.push(token.to_string());
- self.total_bytes += token.len();
+ self.buffer.push_str(token);
self.process_new_content();
}
@@ -84,20 +75,15 @@ impl StreamParser {
)
}
+ /// Get the unprocessed portion of the buffer.
+ fn remaining(&self) -> &str {
+ &self.buffer[self.process_pos..]
+ }
+
/// Process newly added content.
fn process_new_content(&mut self) {
- while self.process_idx < self.tokens.len() {
- // Clone the remaining text to avoid borrow conflicts
- let remaining = {
- let token = &self.tokens[self.process_idx];
- let slice = &token[self.process_offset..];
- if slice.is_empty() {
- self.process_idx += 1;
- self.process_offset = 0;
- continue;
- }
- slice.to_string()
- };
+ while self.process_pos < self.buffer.len() {
+ let remaining = self.remaining().to_string();
// Handle based on current partial state
let partial_kind = self.partial.as_ref().map(|p| p.kind.clone());
@@ -122,17 +108,31 @@ impl StreamParser {
PartialKind::Paragraph => {
// For paragraphs, check if we're at a line start that could be a block element
if self.at_line_start {
+ // Take the paragraph partial first — try_block_start may
+ // replace self.partial with the new block element
+ let para_partial = self.partial.take();
+
if let Some(consumed) = self.try_block_start(&remaining) {
- // Emit the current paragraph before starting the new block
- if let Some(partial) = self.partial.take() {
- if !partial.content.trim().is_empty() {
- let inline_elements = parse_inline(partial.content.trim());
+ // Emit the saved paragraph before the new block
+ if let Some(partial) = para_partial {
+ let trimmed = partial.content.trim();
+ if !trimmed.is_empty() {
+ let inline_elements = parse_inline(trimmed);
self.parsed.push(MdElement::Paragraph(inline_elements));
}
}
self.advance(consumed);
continue;
}
+
+ // Block start failed — restore the paragraph partial
+ self.partial = para_partial;
+ // If remaining could be the start of a block element but we
+ // don't have enough chars yet, wait for more input rather than
+ // consuming into the paragraph (e.g. "`" could become "```")
+ if self.could_be_block_start(&remaining) {
+ return;
+ }
}
// Continue with inline processing
if self.process_inline(&remaining) {
@@ -156,6 +156,9 @@ impl StreamParser {
self.advance(consumed);
continue;
}
+ if self.could_be_block_start(&remaining) {
+ return;
+ }
}
// Fall back to inline processing
@@ -166,6 +169,35 @@ impl StreamParser {
}
}
+ /// Check if text could be the start of a block element but we don't
+ /// have enough characters to confirm yet. Used to defer consuming
+ /// ambiguous prefixes like "`" or "``" that might become "```".
+ fn could_be_block_start(&self, text: &str) -> bool {
+ let trimmed = text.trim_start();
+ if trimmed.is_empty() {
+ return false;
+ }
+
+ // Could be a code fence: need at least 3 backticks or tildes
+ if trimmed.len() < 3 {
+ let first = trimmed.as_bytes()[0];
+ if first == b'`' || first == b'~' {
+ // All chars so far are the same fence char
+ return trimmed.bytes().all(|b| b == first);
+ }
+ }
+
+ // Could be a thematic break: need "---", "***", or "___"
+ if trimmed.len() < 3 {
+ let first = trimmed.as_bytes()[0];
+ if first == b'-' || first == b'*' || first == b'_' {
+ return trimmed.bytes().all(|b| b == first);
+ }
+ }
+
+ false
+ }
+
/// Try to detect a block-level element at line start.
/// Returns bytes consumed if successful.
fn try_block_start(&mut self, text: &str) -> Option<usize> {
@@ -180,7 +212,7 @@ impl StreamParser {
if rest.starts_with(' ') || rest.is_empty() {
self.partial = Some(Partial::new(
PartialKind::Heading { level: level as u8 },
- self.process_idx,
+ self.process_pos,
));
self.at_line_start = false;
return Some(leading_space + level + rest.starts_with(' ') as usize);
@@ -225,7 +257,7 @@ impl StreamParser {
fence_len,
language,
},
- self.process_idx,
+ self.process_pos,
));
self.at_line_start = false;
return Some(leading_space + fence_len + consumed_lang);
@@ -255,34 +287,52 @@ impl StreamParser {
/// Process content inside a code fence.
/// Returns true if we should continue processing, false if we need more input.
fn process_code_fence(&mut self, fence_char: char, fence_len: usize, text: &str) -> bool {
- let closing = std::iter::repeat_n(fence_char, fence_len).collect::<String>();
-
- // Look for closing fence at start of line
let partial = self.partial.as_mut().unwrap();
for line in text.split_inclusive('\n') {
- let trimmed = line.trim_start();
- if trimmed.starts_with(&closing) {
- // Check it's a valid closing fence (only fence chars and whitespace after)
- let after_fence = &trimmed[fence_len..];
- if after_fence.trim().is_empty() || after_fence.starts_with('\n') {
- // Found closing fence! Complete the code block
- let language = if let PartialKind::CodeFence { language, .. } = &partial.kind {
- language.clone()
- } else {
- None
- };
-
- let content = std::mem::take(&mut partial.content);
- self.parsed
- .push(MdElement::CodeBlock(CodeBlock { language, content }));
- self.partial = None;
- self.at_line_start = true;
+ // Check if we're at a line start within the code fence
+ let at_content_line_start =
+ partial.content.is_empty() || partial.content.ends_with('\n');
+
+ if at_content_line_start {
+ let trimmed = line.trim_start();
+
+ // Check for closing fence
+ if trimmed.len() >= fence_len
+ && trimmed.as_bytes().iter().take(fence_len).all(|&b| b == fence_char as u8)
+ {
+ let after_fence = &trimmed[fence_len..];
+ if after_fence.trim().is_empty() || after_fence.starts_with('\n') {
+ // Found closing fence! Complete the code block
+ let language =
+ if let PartialKind::CodeFence { language, .. } = &partial.kind {
+ language.clone()
+ } else {
+ None
+ };
+
+ let content = std::mem::take(&mut partial.content);
+ self.parsed
+ .push(MdElement::CodeBlock(CodeBlock { language, content }));
+ self.partial = None;
+ self.at_line_start = true;
+
+ // Advance past the closing fence line
+ let consumed = text.find(line).unwrap() + line.len();
+ self.advance(consumed);
+ return true;
+ }
+ }
- // Advance past the closing fence line
- let consumed = text.find(line).unwrap() + line.len();
- self.advance(consumed);
- return true;
+ // If this could be the start of a closing fence but we don't
+ // have enough chars yet, wait for more input
+ if !trimmed.is_empty()
+ && trimmed.len() < fence_len
+ && trimmed.bytes().all(|b| b == fence_char as u8)
+ && !line.contains('\n')
+ {
+ // Don't advance — wait for more chars
+ return false;
}
}
@@ -360,6 +410,35 @@ impl StreamParser {
}
if let Some(nl_pos) = text.find('\n') {
+ let after_nl = &text[nl_pos + 1..];
+
+ // Check if text after the newline starts a block element (code fence, heading, etc.)
+ // If so, emit the current paragraph and let the block parser handle the rest.
+ if !after_nl.is_empty() {
+ let trimmed_after = after_nl.trim_start();
+ let is_block_start = trimmed_after.starts_with("```")
+ || trimmed_after.starts_with("~~~")
+ || trimmed_after.starts_with('#');
+ if is_block_start {
+ // Accumulate text before the newline into the paragraph
+ let para_text = if let Some(ref mut partial) = self.partial {
+ partial.content.push_str(&text[..nl_pos]);
+ std::mem::take(&mut partial.content)
+ } else {
+ text[..nl_pos].to_string()
+ };
+ self.partial = None;
+
+ if !para_text.trim().is_empty() {
+ let inline_elements = parse_inline(para_text.trim());
+ self.parsed.push(MdElement::Paragraph(inline_elements));
+ }
+ self.at_line_start = true;
+ self.advance(nl_pos + 1);
+ return true;
+ }
+ }
+
// Single newline - continue accumulating but track position
if let Some(ref mut partial) = self.partial {
partial.content.push_str(&text[..=nl_pos]);
@@ -368,8 +447,7 @@ impl StreamParser {
let content = text[..=nl_pos].to_string();
self.partial = Some(Partial {
kind: PartialKind::Paragraph,
- start_idx: self.process_idx,
- byte_offset: self.process_offset,
+ start_pos: self.process_pos,
content,
});
}
@@ -384,29 +462,18 @@ impl StreamParser {
} else {
self.partial = Some(Partial {
kind: PartialKind::Paragraph,
- start_idx: self.process_idx,
- byte_offset: self.process_offset,
+ start_pos: self.process_pos,
content: text.to_string(),
});
}
+ self.at_line_start = false;
self.advance(text.len());
false
}
/// Advance the processing position by n bytes.
fn advance(&mut self, n: usize) {
- let mut remaining = n;
- while remaining > 0 && self.process_idx < self.tokens.len() {
- let token_remaining = self.tokens[self.process_idx].len() - self.process_offset;
- if remaining >= token_remaining {
- remaining -= token_remaining;
- self.process_idx += 1;
- self.process_offset = 0;
- } else {
- self.process_offset += remaining;
- remaining = 0;
- }
- }
+ self.process_pos += n;
}
/// Finalize parsing (call when stream ends).
diff --git a/crates/md-stream/src/partial.rs b/crates/md-stream/src/partial.rs
@@ -7,32 +7,18 @@ pub struct Partial {
/// What kind of element we're building
pub kind: PartialKind,
- /// Index into the token buffer where this element starts.
- /// Used to resume parsing from the right spot.
- pub start_idx: usize,
-
- /// Byte offset within the starting token (for mid-token starts)
- pub byte_offset: usize,
+ /// Byte offset into the buffer where this element starts
+ pub start_pos: usize,
/// Accumulated content so far (for elements that need it)
pub content: String,
}
impl Partial {
- pub fn new(kind: PartialKind, start_idx: usize) -> Self {
- Self {
- kind,
- start_idx,
- byte_offset: 0,
- content: String::new(),
- }
- }
-
- pub fn with_offset(kind: PartialKind, start_idx: usize, byte_offset: usize) -> Self {
+ pub fn new(kind: PartialKind, start_pos: usize) -> Self {
Self {
kind,
- start_idx,
- byte_offset,
+ start_pos,
content: String::new(),
}
}
diff --git a/crates/md-stream/src/tests.rs b/crates/md-stream/src/tests.rs
@@ -497,6 +497,54 @@ fn test_streaming_multiple_code_spans_with_angle_brackets() {
}
#[test]
+fn test_code_block_after_paragraph_single_newline() {
+ // Reproduces: paragraph text ending with ":\n" then "```\n" code block
+ // This is the common pattern: "All events share these common tags:\n```\n..."
+ let mut parser = StreamParser::new();
+ let input = "All events share these common tags:\n```\n[\"d\", \"<session-id>\"]\n```\n";
+ parser.push(input);
+
+ eprintln!("Before finalize - parsed: {:#?}", parser.parsed());
+ eprintln!("Before finalize - partial: {:#?}", parser.partial());
+
+ parser.finalize();
+
+ eprintln!("After finalize - parsed: {:#?}", parser.parsed());
+
+ // Should have: paragraph + code block
+ let has_paragraph = parser.parsed().iter().any(|e| matches!(e, MdElement::Paragraph(_)));
+ let has_code_block = parser.parsed().iter().any(|e| matches!(e, MdElement::CodeBlock(_)));
+
+ assert!(has_paragraph, "Missing paragraph element");
+ assert!(has_code_block, "Missing code block element");
+}
+
+#[test]
+fn test_code_block_after_paragraph_single_newline_streaming() {
+ // Same test but streaming char-by-char (how LLM tokens arrive)
+ let mut parser = StreamParser::new();
+ let input = "All events share these common tags:\n```\n[\"d\", \"<session-id>\"]\n```\n";
+
+ for ch in input.chars() {
+ parser.push(&ch.to_string());
+ }
+
+ eprintln!("Before finalize - parsed: {:#?}", parser.parsed());
+ eprintln!("Before finalize - partial: {:#?}", parser.partial());
+ eprintln!("Before finalize - in_code_block: {}", parser.in_code_block());
+
+ parser.finalize();
+
+ eprintln!("After finalize - parsed: {:#?}", parser.parsed());
+
+ let has_paragraph = parser.parsed().iter().any(|e| matches!(e, MdElement::Paragraph(_)));
+ let has_code_block = parser.parsed().iter().any(|e| matches!(e, MdElement::CodeBlock(_)));
+
+ assert!(has_paragraph, "Missing paragraph element");
+ assert!(has_code_block, "Missing code block element");
+}
+
+#[test]
fn test_heading_partial_kind_distinct_from_paragraph() {
let mut parser = StreamParser::new();
parser.push("# Heading without newline");