はじめに:なぜ Go + Python のハイブリッド構成か
本記事では、Go言語のWebフレームワーク `chi` と、PythonのLLMライブラリ `LangGraph` を組み合わせたハイブリッドなバックエンドアーキテクチャについて、ドメイン駆動設計(DDD)の観点から解説します。公開APIはGoに一元化し、AI/LLM関連の重い処理はPythonに分離するこの構成は、それぞれの言語の得意分野を活かすための設計判断です。現在、LangGraphの公式実装はPythonのみであり、Go実装にはプロダクション採用レベルのものが存在しないという実情があります。このアーキテクチャでは、認証、認可、レートリミット、監査ログといった横断的関心事をGo製のAPI Gateway層に集約することで、堅牢性とメンテナンス性を両立させることを目指します。
採用アーキテクチャの全体像
本構成では、クリーンアーキテクチャとヘキサゴナルアーキテクチャ(ポーツ&アダプターズ)の思想を基盤に、戦術的DDDのパターンを適用します。これにより、ビジネスロジックを中心とした疎結合でテスト容易性の高いシステムを構築します。
Go (chi) 側の DDD ディレクトリ構成
Go側のAPI Gatewayは、以下のようなディレクトリ構成を採用しています。これは一般的なDDDのレイヤードアーキテクチャに沿っており、各レイヤーの責務を明確に分離しています。
internal/
├── interfaces/http/
│ ├── controller/ # 1 endpoint = 1 method
│ ├── schema/ # request/response + validate tag
│ ├── mapper/ # DTO ⇔ Schema
│ ├── route/ # chi のルート登録
│ └── presenter/ # error → HTTP status
├── application/<ctx>/
│ ├── dto/
│ ├── port/ # 外部サービス interface
│ └── usecase/ # 1 FE endpoint = 1 file
├── domain/<ctx>/
│ ├── entity/
│ ├── value_object/
│ ├── repository/ # interface
│ └── error/
└── infrastructure/
├── persistence/
│ ├── sqlc/queries/ # *.sql
│ ├── sqlc/gen/ # sqlc generate 出力
│ └── repository/ # domain interface 実装
└── dispatcher/ # 外部API集約レイヤごとのサンプルコードと責務
Domain Entity
ドメインエンティティは、ビジネスの中核となる概念とルールを表現します。単なるデータ構造ではなく、不変条件を保護するためのロジックを持ちます。`Validate()`のようなメソッドを配置し、エンティティが常に正当な状態であることを保証します。
// internal/domain/subject/entity/subject.go
package entity
type Subject struct {
ID string
Name string
Code SubjectCode
}
func (s *Subject) Validate() error {
if s.ID == "" || s.Name == "" {
return errors.New("ID and Name are required")
}
return s.Code.Validate()
}Value Object
値オブジェクトは、ドメインの「値」を表現する不変のオブジェクトです。例えば、単なる`string`ではなく、特定のフォーマットを持つ「科目コード」として定義することで、生成時にバリデーションを強制し、不正な値の存在を防ぎます。これにより、システムの堅牢性が向上します。
// internal/domain/subject/value_object/subject_code.go
package value_object
import "fmt"
type SubjectCode string
func NewSubjectCode(code string) (SubjectCode, error) {
if len(code) != 5 {
return "", fmt.Errorf("invalid subject code format")
}
return SubjectCode(code), nil
}
func (sc SubjectCode) Validate() error {
// ... more validation logic
return nil
}Repository, Domain Error, DTO
// internal/domain/subject/repository/subject_repository.go
package repository
import "context"
// ...
type SubjectRepository interface {
FindAll(ctx context.Context) ([]*entity.Subject, error)
}
// internal/domain/subject/error/errors.go
package error
import "errors"
var ErrSubjectNotFound = errors.New("subject not found")
// internal/application/subject/dto/subject_dto.go
package dto
type SubjectDTO struct {
ID string
Name string
Code string
}UseCase
// internal/application/subject/usecase/create_subject.go
package usecase
// ...
type CreateSubjectUseCase struct {
repo repository.SubjectRepository
}
func (uc *CreateSubjectUseCase) Execute(ctx context.Context, inputDTO dto.CreateSubjectInputDTO) (*dto.SubjectDTO, error) {
// ... business logic ...
// 1. DTO to Entity
// 2. repo.Save(entity)
// 3. Entity to DTO
return &dto.SubjectDTO{}, nil
}
Repository 実装 + sqlc
// internal/infrastructure/persistence/repository/subject_repository.go
package repository
import (
"context"
domain_repo "your-project/internal/domain/subject/repository"
"your-project/internal/infrastructure/persistence/sqlc/gen"
)
type subjectRepository struct {
q *gen.Queries
}
func NewSubjectRepository(q *gen.Queries) domain_repo.SubjectRepository {
return &subjectRepository{q: q}
}
func (r *subjectRepository) FindAll(ctx context.Context) ([]*entity.Subject, error) {
// sqlc generated method
dbSubjects, err := r.q.ListSubjects(ctx)
// ... map db model to entity
return subjects, nil
}
-- internal/infrastructure/persistence/sqlc/queries/subject.sql
-- name: ListSubjects :many
SELECT * FROM subjects ORDER BY name;Controller, Route, Presenter
// internal/interfaces/http/controller/subject_controller.go
func (c *SubjectController) ListSubjects(w http.ResponseWriter, r *http.Request) {
// ... call usecase ...
// ... map DTO to Schema ...
presenter.JSON(w, http.StatusOK, schema)
}
// internal/interfaces/http/route/subject_route.go
func (sr *SubjectRoutes) RegisterRoutes(r *chi.Mux) {
r.Get("/subjects", sr.controller.ListSubjects)
}
// internal/interfaces/http/presenter/presenter.go
func Error(w http.ResponseWriter, err error) {
switch {
case errors.Is(err, domain_error.ErrSubjectNotFound):
JSON(w, http.StatusNotFound, map[string]string{"error": err.Error()})
default:
JSON(w, http.StatusInternalServerError, map[string]string{"error": "internal server error"})
}
}Python LangGraph 側のアーキテクチャ
Pythonで実装されたAIワーカーは、LangGraphを利用して複雑なLLMワークフローを構築します。ここでも重要なのは責務の分離です。LangGraphのワークフロー、ノード、ツールから、直接Go側のユースケースやリポジトリを呼び出すことはありません。必要なビジネスロジックは、Goのアプリケーション層で定義されたPort(インターフェース)を通じて抽象化され、Python側はそのインターフェースの実装(アダプター)を呼び出す形を取ります。
StateGraphの組み立て
# infrastructure/langgraph/problem_creation/workflows/problem_creation_workflow.py
from langgraph.graph import StateGraph, END
from .state import ProblemCreationState
from ..nodes import core_logic_node, tool_node
def create_problem_creation_workflow():
workflow = StateGraph(ProblemCreationState)
workflow.add_node("core_logic", core_logic_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("core_logic")
workflow.add_conditional_edges(
"core_logic",
lambda state: "tools" if state.get("tool_calls") else END,
)
workflow.add_edge("tools", "core_logic")
return workflow.compile()PortとAdapterによる依存性逆転
Goのアプリケーション層で定義されたPort(インターフェース)を、Python側で実装(アダプター)することで、依存性の逆転を実現します。LangGraphのワークフローは、具象クラスである`LangGraphShapeExtractor`ではなく、抽象インターフェースである`ShapeExtractorPort`に依存します。これにより、ビジネスロジックと具体的なLLM実装が分離され、テストや将来の変更が容易になります。
# application/problem/port/shape_extractor_port.py (Interface)
from abc import ABC, abstractmethod
class ShapeExtractorPort(ABC):
@abstractmethod
def extract_and_save_shapes(self, problem_id: str, image_url: str) -> None:
pass
# infrastructure/langgraph/problem_creation/adapter/langgraph_shape_extractor.py (Implementation)
from application.problem.port.shape_extractor_port import ShapeExtractorPort
from ..workflows import create_problem_creation_workflow
class LangGraphShapeExtractor(ShapeExtractorPort):
def extract_and_save_shapes(self, problem_id: str, image_url: str) -> None:
workflow = create_problem_creation_workflow()
# ... invoke workflow with inputs ...Go ⇔ Python の接続: Redis Streams + Pub/Sub
同期的なCRUD処理はGoのAPI内で完結させますが、LLMが関わるような時間のかかる非同期処理は、Redis Streamsを介してGoからPythonワーカーに委譲します。Goのコントローラーはジョブをエンキューするだけですぐにレスポンスを返し、Pythonのワーカーがそのジョブを非同期に処理します。処理の進捗や結果はRedis Pub/Subを通じてリアルタイムにフロントエンドに通知されます。
// infrastructure/redis/job_publisher.go
func (p *JobPublisher) EnqueueProblemCreationJob(ctx context.Context, jobData map[string]interface{}) (string, error) {
jobID, err := p.redisClient.XAdd(ctx, &redis.XAddArgs{
Stream: "jobs:problem_creation",
Values: jobData,
}).Result()
return jobID, err
}# infrastructure/redis/problem_creation_stream_consumer.py
class ProblemCreationStreamConsumer:
def __init__(self, redis_client, use_case):
self.redis_client = redis_client
self.use_case = use_case
# ...
def consume(self):
while True:
# XREADGROUP from "jobs:problem_creation"
# ...
self.use_case.execute(job_data)
# XACK ...FE リクエストの全フロー
同期CRUDパス (例: GET /api/v1/subjects)
sequenceDiagram
participant FE as Frontend
participant GW as Go API Gateway
participant DB as PostgreSQL
FE->>+GW: GET /api/v1/subjects
GW->>GW: Middleware (Auth, CORS, etc)
GW->>GW: Controller -> Usecase
GW->>+DB: repo.FindAll()
DB-->>-GW: Rows
GW->>GW: Entity -> DTO -> Schema
GW-->>-FE: 200 OK (JSON)- FrontendがGo API Gatewayにリクエストを送信
- Goのmiddleware群(認証、CORS、ロギング)が実行
- `controller`がリクエストを受け取り、`usecase`を呼び出す
- `usecase`が`repository`(sqlc実装)を呼び出す
- `pgx`経由でPostgreSQLにクエリが発行される
- DBから返された行を`mapper`が`entity`に変換
- `usecase`が`entity`を`DTO`に変換して返す
- `controller`が`DTO`を`schema`に変換
- `presenter`がHTTPステータスコードとJSONレスポンスを生成し、Frontendに返す
非同期AIパス (例: POST /api/problem-creation/start)
sequenceDiagram
participant FE as Frontend
participant GW as Go API Gateway
participant RS as Redis Streams
participant PW as Python Worker
participant LLM as AI/LLM Service
participant RPubSub as Redis Pub/Sub
FE->>+GW: POST /api/problem-creation/start
GW->>+RS: XADD jobs:problem_creation
RS-->>-GW: job_id
GW-->>-FE: 202 Accepted (session_id)
FE->>+GW: GET /sse/stream?sid=...
GW->>+RPubSub: SUBSCRIBE sse:<sid>
PW->>+RS: XREADGROUP
RS-->>-PW: job
PW->>+LLM: Execute LangGraph Workflow
LLM-->>-PW: Result
PW->>+RPubSub: PUBLISH sse:<sid>
RPubSub-->>-GW: chunk
GW-->>-FE: Stream chunk- FrontendがGo API Gatewayに非同期処理開始リクエストを送信
- Goの`controller`が認可とバリデーションを行い、`usecase`を呼び出す
- `usecase`がRedis Streamsにジョブをエンキュー (`XADD`)
- Goは即座に`202 Accepted`とセッションIDをFrontendに返す
- Pythonの`stream-worker`がブロッキング読み取り (`XREADGROUP`) でジョブを取得
- ワーカーは`usecase`を起動し、Port経由でLangGraphワークフローを実行
- ワークフロー内でLLM呼び出しやDB書き込みが行われる
- 進捗があるたびに、ワーカーはRedis Pub/SubにイベントをPUBLISHする
- Frontendは別途EventSourceでSSEストリームに接続 (`/sse/stream`)
- GoのSSEハンドラがPub/SubをSUBSCRIBEし、受け取ったメッセージをそのままFrontendにプロキシする
テスト戦略
- **Entity / Value Object**: ビジネスロジックの正当性を確認するため、単体テストを徹底します。
- **UseCase**: 依存するRepositoryをインラインモックに差し替え、ユースケースのロジックのみをテストします。
- **Repository**: `testcontainers-go`を使い、実際のPostgreSQLコンテナを起動して、DBとの結合をテストします。
- **Controller**: `httptest`パッケージを利用し、UseCaseをモック化して、HTTPリクエストからレスポンスまでをテストします。
- **E2E**: 主要なシナリオ(同期CRUD、非同期AIフローなど)に絞り、システム全体を通したEnd-to-Endテストを実施します。