summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-08-17 09:05:45 +0300
committerPaul Buetow <paul@buetow.org>2025-08-17 09:05:45 +0300
commitd5fbb6ef5957894eb5be0854bdb328a6774abddb (patch)
tree7558a3d1c70db85d54f3367a5c81c28e7260392b /internal
parent5d9e197a394089f66539320e77abb3f3689b3381 (diff)
cli: stream responses in hexai when supported (OpenAI, Ollama)
- Add llm.Streamer optional interface - Implement ChatStream for OpenAI (SSE) and Ollama (JSON stream) - CLI uses streaming; LSP unchanged (non-streaming) - README: document streaming behavior for CLI
Diffstat (limited to 'internal')
-rw-r--r--internal/llm/ollama.go116
-rw-r--r--internal/llm/openai.go146
-rw-r--r--internal/llm/provider.go22
3 files changed, 254 insertions, 30 deletions
diff --git a/internal/llm/ollama.go b/internal/llm/ollama.go
index e8b75c9..ffee354 100644
--- a/internal/llm/ollama.go
+++ b/internal/llm/ollama.go
@@ -1,14 +1,15 @@
package llm
import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "strings"
- "time"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
"hexai/internal/logging"
)
@@ -35,10 +36,10 @@ func newOllama(baseURL, model string) Client {
}
type ollamaChatRequest struct {
- Model string `json:"model"`
- Messages []oaMessage `json:"messages"`
- Stream bool `json:"stream"`
- Options any `json:"options,omitempty"`
+ Model string `json:"model"`
+ Messages []oaMessage `json:"messages"`
+ Stream bool `json:"stream"`
+ Options any `json:"options,omitempty"`
}
type ollamaChatResponse struct {
@@ -133,3 +134,94 @@ func (c *ollamaClient) Chat(ctx context.Context, messages []Message, opts ...Req
// Provider metadata
func (c *ollamaClient) Name() string { return "ollama" }
func (c *ollamaClient) DefaultModel() string { return c.defaultModel }
+
+// Streaming support (optional)
+func (c *ollamaClient) ChatStream(ctx context.Context, messages []Message, onDelta func(string), opts ...RequestOption) error {
+ o := Options{Model: c.defaultModel}
+ for _, opt := range opts {
+ opt(&o)
+ }
+ if o.Model == "" {
+ o.Model = c.defaultModel
+ }
+
+ start := time.Now()
+ logging.Logf("llm/ollama ", "stream start model=%s temp=%.2f max_tokens=%d stop=%d messages=%d", o.Model, o.Temperature, o.MaxTokens, len(o.Stop), len(messages))
+ for i, m := range messages {
+ logging.Logf("llm/ollama ", "msg[%d] role=%s size=%d preview=%s%s%s", i, m.Role, len(m.Content), logging.AnsiCyan, logging.PreviewForLog(m.Content), logging.AnsiBase)
+ }
+
+ req := ollamaChatRequest{Model: o.Model, Stream: true}
+ req.Messages = make([]oaMessage, len(messages))
+ for i, m := range messages {
+ req.Messages[i] = oaMessage{Role: m.Role, Content: m.Content}
+ }
+ // Build options map
+ optsMap := map[string]any{}
+ if o.Temperature != 0 {
+ optsMap["temperature"] = o.Temperature
+ }
+ if o.MaxTokens > 0 {
+ optsMap["num_predict"] = o.MaxTokens
+ }
+ if len(o.Stop) > 0 {
+ optsMap["stop"] = o.Stop
+ }
+ if len(optsMap) > 0 {
+ req.Options = optsMap
+ }
+
+ body, err := json.Marshal(req)
+ if err != nil {
+ return err
+ }
+
+ endpoint := c.baseURL + "/api/chat"
+ logging.Logf("llm/ollama ", "POST %s (stream)", endpoint)
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
+ if err != nil {
+ return err
+ }
+ httpReq.Header.Set("Content-Type", "application/json")
+
+ resp, err := c.httpClient.Do(httpReq)
+ if err != nil {
+ logging.Logf("llm/ollama ", "%shttp error after %s: %v%s", logging.AnsiRed, time.Since(start), err, logging.AnsiBase)
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ var apiErr ollamaChatResponse
+ _ = json.NewDecoder(resp.Body).Decode(&apiErr)
+ if strings.TrimSpace(apiErr.Error) != "" {
+ logging.Logf("llm/ollama ", "%sapi error status=%d msg=%s duration=%s%s", logging.AnsiRed, resp.StatusCode, apiErr.Error, time.Since(start), logging.AnsiBase)
+ return fmt.Errorf("ollama error: %s (status %d)", apiErr.Error, resp.StatusCode)
+ }
+ logging.Logf("llm/ollama ", "%shttp non-2xx status=%d duration=%s%s", logging.AnsiRed, resp.StatusCode, time.Since(start), logging.AnsiBase)
+ return fmt.Errorf("ollama http error: status %d", resp.StatusCode)
+ }
+
+ dec := json.NewDecoder(resp.Body)
+ for {
+ var ev ollamaChatResponse
+ if err := dec.Decode(&ev); err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ logging.Logf("llm/ollama ", "%sdecode stream error after %s: %v%s", logging.AnsiRed, time.Since(start), err, logging.AnsiBase)
+ return err
+ }
+ if strings.TrimSpace(ev.Error) != "" {
+ logging.Logf("llm/ollama ", "%sstream event error: %s%s", logging.AnsiRed, ev.Error, logging.AnsiBase)
+ return fmt.Errorf("ollama stream error: %s", ev.Error)
+ }
+ if s := ev.Message.Content; strings.TrimSpace(s) != "" {
+ onDelta(s)
+ }
+ if ev.Done {
+ break
+ }
+ }
+ logging.Logf("llm/ollama ", "stream end duration=%s", time.Since(start))
+ return nil
+}
diff --git a/internal/llm/openai.go b/internal/llm/openai.go
index 03e894a..080d4e9 100644
--- a/internal/llm/openai.go
+++ b/internal/llm/openai.go
@@ -1,16 +1,17 @@
package llm
import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
"net/http"
"strings"
- "time"
+ "time"
- "hexai/internal/logging"
+ "hexai/internal/logging"
)
// openAIClient implements Client against OpenAI's Chat Completions API.
@@ -41,11 +42,12 @@ func newOpenAI(baseURL, model, apiKey string) Client {
}
type oaChatRequest struct {
- Model string `json:"model"`
- Messages []oaMessage `json:"messages"`
- Temperature *float64 `json:"temperature,omitempty"`
- MaxTokens *int `json:"max_tokens,omitempty"`
- Stop []string `json:"stop,omitempty"`
+ Model string `json:"model"`
+ Messages []oaMessage `json:"messages"`
+ Temperature *float64 `json:"temperature,omitempty"`
+ MaxTokens *int `json:"max_tokens,omitempty"`
+ Stop []string `json:"stop,omitempty"`
+ Stream bool `json:"stream,omitempty"`
}
type oaMessage struct {
@@ -163,3 +165,123 @@ func trimPreview(s string, n int) string {
// Provider metadata
func (c *openAIClient) Name() string { return "openai" }
func (c *openAIClient) DefaultModel() string { return c.defaultModel }
+
+// Streaming support (optional)
+type oaStreamChunk struct {
+ Choices []struct {
+ Delta struct {
+ Content string `json:"content"`
+ } `json:"delta"`
+ FinishReason string `json:"finish_reason"`
+ } `json:"choices"`
+ Error *struct {
+ Message string `json:"message"`
+ Type string `json:"type"`
+ Param any `json:"param"`
+ Code any `json:"code"`
+ } `json:"error,omitempty"`
+}
+
+func (c *openAIClient) ChatStream(ctx context.Context, messages []Message, onDelta func(string), opts ...RequestOption) error {
+ if c.apiKey == "" {
+ return errors.New("missing OpenAI API key")
+ }
+ o := Options{Model: c.defaultModel}
+ for _, opt := range opts {
+ opt(&o)
+ }
+ if o.Model == "" {
+ o.Model = c.defaultModel
+ }
+ start := time.Now()
+ logging.Logf("llm/openai ", "stream start model=%s temp=%.2f max_tokens=%d stop=%d messages=%d", o.Model, o.Temperature, o.MaxTokens, len(o.Stop), len(messages))
+ for i, m := range messages {
+ logging.Logf("llm/openai ", "msg[%d] role=%s size=%d preview=%s%s%s", i, m.Role, len(m.Content), logging.AnsiCyan, logging.PreviewForLog(m.Content), logging.AnsiBase)
+ }
+
+ req := oaChatRequest{Model: o.Model, Stream: true}
+ req.Messages = make([]oaMessage, len(messages))
+ for i, m := range messages {
+ req.Messages[i] = oaMessage{Role: m.Role, Content: m.Content}
+ }
+ if o.Temperature != 0 {
+ req.Temperature = &o.Temperature
+ }
+ if o.MaxTokens > 0 {
+ req.MaxTokens = &o.MaxTokens
+ }
+ if len(o.Stop) > 0 {
+ req.Stop = o.Stop
+ }
+
+ body, err := json.Marshal(req)
+ if err != nil {
+ c.logf("marshal error: %v", err)
+ return err
+ }
+ endpoint := c.baseURL + "/chat/completions"
+ logging.Logf("llm/openai ", "POST %s (stream)", endpoint)
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
+ if err != nil {
+ c.logf("new request error: %v", err)
+ return err
+ }
+ httpReq.Header.Set("Content-Type", "application/json")
+ httpReq.Header.Set("Authorization", "Bearer "+c.apiKey)
+ // Streaming uses SSE-style data lines
+ httpReq.Header.Set("Accept", "text/event-stream")
+
+ resp, err := c.httpClient.Do(httpReq)
+ if err != nil {
+ logging.Logf("llm/openai ", "%shttp error after %s: %v%s", logging.AnsiRed, time.Since(start), err, logging.AnsiBase)
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ // try to decode body to surface message
+ var apiErr oaChatResponse
+ _ = json.NewDecoder(resp.Body).Decode(&apiErr)
+ if apiErr.Error != nil && apiErr.Error.Message != "" {
+ logging.Logf("llm/openai ", "%sapi error status=%d type=%s msg=%s duration=%s%s", logging.AnsiRed, resp.StatusCode, apiErr.Error.Type, apiErr.Error.Message, time.Since(start), logging.AnsiBase)
+ return fmt.Errorf("openai error: %s (status %d)", apiErr.Error.Message, resp.StatusCode)
+ }
+ logging.Logf("llm/openai ", "%shttp non-2xx status=%d duration=%s%s", logging.AnsiRed, resp.StatusCode, time.Since(start), logging.AnsiBase)
+ return fmt.Errorf("openai http error: status %d", resp.StatusCode)
+ }
+
+ // Parse SSE: lines starting with "data: " containing JSON or [DONE]
+ scanner := bufio.NewScanner(resp.Body)
+ // Increase buffer for long lines
+ const maxBuf = 1024 * 1024
+ buf := make([]byte, 0, 64*1024)
+ scanner.Buffer(buf, maxBuf)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.HasPrefix(line, "data: ") {
+ continue
+ }
+ payload := strings.TrimPrefix(line, "data: ")
+ if strings.TrimSpace(payload) == "[DONE]" {
+ break
+ }
+ var chunk oaStreamChunk
+ if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
+ continue // skip malformed lines
+ }
+ if chunk.Error != nil && chunk.Error.Message != "" {
+ logging.Logf("llm/openai ", "%sstream error: %s%s", logging.AnsiRed, chunk.Error.Message, logging.AnsiBase)
+ return fmt.Errorf("openai stream error: %s", chunk.Error.Message)
+ }
+ for _, ch := range chunk.Choices {
+ if ch.Delta.Content != "" {
+ onDelta(ch.Delta.Content)
+ }
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ logging.Logf("llm/openai ", "%sstream read error after %s: %v%s", logging.AnsiRed, time.Since(start), err, logging.AnsiBase)
+ return err
+ }
+ logging.Logf("llm/openai ", "stream end duration=%s", time.Since(start))
+ return nil
+}
diff --git a/internal/llm/provider.go b/internal/llm/provider.go
index dda3d16..3e3023e 100644
--- a/internal/llm/provider.go
+++ b/internal/llm/provider.go
@@ -15,12 +15,22 @@ type Message struct {
// Client is a minimal LLM provider interface.
// Future providers (Ollama, etc.) should implement this.
type Client interface {
- // Chat sends chat messages and returns the assistant text.
- Chat(ctx context.Context, messages []Message, opts ...RequestOption) (string, error)
- // Name returns the provider's short name (e.g., "openai", "ollama").
- Name() string
- // DefaultModel returns the configured default model name.
- DefaultModel() string
+ // Chat sends chat messages and returns the assistant text.
+ Chat(ctx context.Context, messages []Message, opts ...RequestOption) (string, error)
+ // Name returns the provider's short name (e.g., "openai", "ollama").
+ Name() string
+ // DefaultModel returns the configured default model name.
+ DefaultModel() string
+}
+
+// Streamer is an optional interface that providers may implement to support
+// token-by-token streaming responses. Callers can type-assert to Streamer and
+// fall back to Client.Chat when not implemented.
+type Streamer interface {
+ // ChatStream sends chat messages and invokes onDelta with incremental text
+ // chunks as they are produced by the model. Implementations should call
+ // onDelta with empty strings sparingly (prefer only non-empty chunks).
+ ChatStream(ctx context.Context, messages []Message, onDelta func(string), opts ...RequestOption) error
}
// Options for a request. Providers may ignore unsupported fields.