Source code for langchain.document_loaders.concurrent
from __future__ import annotations
import concurrent.futures
from pathlib import Path
from typing import Iterator, Literal, Optional, Sequence, Union
from langchain.document_loaders.base import BaseBlobParser
from langchain.document_loaders.blob_loaders import BlobLoader, FileSystemBlobLoader
from langchain.document_loaders.generic import GenericLoader
from langchain.document_loaders.parsers.registry import get_parser
from langchain.schema import Document
_PathLike = Union[str, Path]
DEFAULT = Literal["default"]
[docs]class ConcurrentLoader(GenericLoader):
"""
A generic document loader that loads and parses documents concurrently.
"""
def __init__(
self, blob_loader: BlobLoader, blob_parser: BaseBlobParser, num_workers: int = 4
) -> None:
super().__init__(blob_loader, blob_parser)
self.num_workers = num_workers
[docs] def lazy_load(
self,
) -> Iterator[Document]:
"""Load documents lazily with concurrent parsing."""
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.num_workers
) as executor:
futures = {
executor.submit(self.blob_parser.lazy_parse, blob)
for blob in self.blob_loader.yield_blobs()
}
for future in concurrent.futures.as_completed(futures):
yield from future.result()
[docs] @classmethod
def from_filesystem(
cls,
path: _PathLike,
*,
glob: str = "**/[!.]*",
suffixes: Optional[Sequence[str]] = None,
show_progress: bool = False,
parser: Union[DEFAULT, BaseBlobParser] = "default",
num_workers: int = 4,
) -> ConcurrentLoader:
"""
Create a concurrent generic document loader using a
filesystem blob loader.
"""
blob_loader = FileSystemBlobLoader(
path, glob=glob, suffixes=suffixes, show_progress=show_progress
)
if isinstance(parser, str):
blob_parser = get_parser(parser)
else:
blob_parser = parser
return cls(blob_loader, blob_parser, num_workers)