|
|
import os |
|
|
import shutil |
|
|
import subprocess |
|
|
import sys |
|
|
import time |
|
|
|
|
|
import pytest |
|
|
|
|
|
import fsspec |
|
|
from fsspec.implementations.cached import CachingFileSystem |
|
|
|
|
|
|
|
|
@pytest.fixture() |
|
|
def m(): |
|
|
""" |
|
|
Fixture providing a memory filesystem. |
|
|
""" |
|
|
m = fsspec.filesystem("memory") |
|
|
m.store.clear() |
|
|
m.pseudo_dirs.clear() |
|
|
m.pseudo_dirs.append("") |
|
|
try: |
|
|
yield m |
|
|
finally: |
|
|
m.store.clear() |
|
|
m.pseudo_dirs.clear() |
|
|
m.pseudo_dirs.append("") |
|
|
|
|
|
|
|
|
@pytest.fixture |
|
|
def ftp_writable(tmpdir): |
|
|
""" |
|
|
Fixture providing a writable FTP filesystem. |
|
|
""" |
|
|
pytest.importorskip("pyftpdlib") |
|
|
from fsspec.implementations.ftp import FTPFileSystem |
|
|
|
|
|
FTPFileSystem.clear_instance_cache() |
|
|
CachingFileSystem.clear_instance_cache() |
|
|
d = str(tmpdir) |
|
|
with open(os.path.join(d, "out"), "wb") as f: |
|
|
f.write(b"hello" * 10000) |
|
|
P = subprocess.Popen( |
|
|
[sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"] |
|
|
) |
|
|
try: |
|
|
time.sleep(1) |
|
|
yield "localhost", 2121, "user", "pass" |
|
|
finally: |
|
|
P.terminate() |
|
|
P.wait() |
|
|
try: |
|
|
shutil.rmtree(tmpdir) |
|
|
except Exception: |
|
|
pass |
|
|
|