I am trying to stream tick-level information about YTCc1 using the LSEG-data package in Python. I can pull tick histories without any problems, but I seem to be losing trades on the stream — I am only getting a small subset of the trades. I believe I have the required data permissions, as I can see the full tick stream in the trade log in Workspace.
There are no errors — the script runs fine, but the stream side simply does not capture all of the trades.
Sorry — it actually uses the older refinitiv-data package, not lseg-data, but the logic should be very similar.
import refinitiv.data as rd
import pandas as pd
import datetime as dt
import threading
import queue
import csv
import os
import time
from zoneinfo import ZoneInfo
# ======================
# CONFIG
# ======================
# RICs to backfill and stream.
INSTRUMENTS = ["YTCc1", "YTTc1"]
# Core top-of-book (your current Dash expects these columns)
CORE_FIELDS = ["BID", "ASK", "BIDSIZE", "ASKSIZE"]
# Quote/trade timing + trade identifiers needed for overlays
# (Seen in your full-field probe and your later CSV extract)
EXTRA_FIELDS = [
    # Quote time-of-day
    "QUOTIM_MS", "TIMACT", "EXCHTIM",
    # Optional (sometimes present only on refresh)
    "QUOTE_DATE",
    # Trade fields
    "TRDPRC_1", "TRDVOL_1", "TRADE_ID",
    "TRDTIM_1", "SALTIM", "SALTIM_MS",
    # Optional (sometimes present only on refresh)
    "TRADE_DATE",
    # Useful context for later filtering/annotation
    "LSTSALCOND", "PRCTCK_1",
]
# Full field list requested for both backfill and the live stream.
FIELDS_ALL = CORE_FIELDS + EXTRA_FIELDS
# CSV appended to by the writer thread (see ensure_csv_header/append_rows).
OUT_CSV = "XM_stream.csv"
# Writer tuning
# Bounded hand-off queue between stream callbacks and the writer thread;
# presumably events are dropped when it is full (see DROPPED_EVENTS) — confirm in writer_loop.
event_q = queue.Queue(maxsize=200000)
# Set to request writer-thread shutdown.
stop_flag = threading.Event()
FLUSH_INTERVAL_SEC = 0.05  # max seconds between CSV batch flushes
IDLE_FLUSH_SEC = 0.02      # poll timeout used when the queue is idle
BATCH_MAX_ROWS = 1000      # max rows written per CSV batch
DROPPED_EVENTS = 0         # running count of events lost to a full queue — TODO confirm in writer_loop
# ======================
# TIME HELPERS (UTC)
# ======================
def now_utc() -> dt.datetime:
    """Return the current instant as a timezone-aware UTC datetime."""
    return dt.datetime.now(tz=dt.timezone.utc)
def to_iso_utc(ts: dt.datetime) -> str:
    """Serialize *ts* as an ISO-8601 UTC string; naive inputs are assumed UTC."""
    aware = ts if ts.tzinfo is not None else ts.replace(tzinfo=dt.timezone.utc)
    return aware.astimezone(dt.timezone.utc).isoformat()
def sydney_830_to_utc(today_local_date=None) -> dt.datetime:
    """
    Return 08:30 Sydney time on *today_local_date* (default: today in Sydney),
    converted to UTC.

    This only controls how far back the backfill reaches; all stored
    timestamps remain UTC regardless.
    """
    sydney = ZoneInfo("Australia/Sydney")
    if today_local_date is None:
        today_local_date = dt.datetime.now(sydney).date()
    local_start = dt.datetime.combine(
        today_local_date,
        dt.time(hour=8, minute=30),
        tzinfo=sydney,
    )
    return local_start.astimezone(dt.timezone.utc)
def _parse_time_to_timeobj(time_s) -> dt.time | None:
"""
Parse 'HH:MM:SS' or 'HH:MM:SS.mmm' or 'HH:MM:SS.ffffff' into a datetime.time
"""
if not time_s:
return None
try:
s = str(time_s).strip()
if "." in s:
base, frac = s.split(".", 1)
frac = frac[:6].ljust(6, "0") # to microseconds
s2 = f"{base}.{frac}"
return dt.datetime.strptime(s2, "%H:%M:%S.%f").time()
return dt.datetime.strptime(s, "%H:%M:%S").time()
except Exception:
return None
def _combine_with_recv_date(recv_dt_utc: dt.datetime, t: dt.time) -> dt.datetime:
"""
Combine RecvUTC date with a time-of-day and adjust ±1 day if too far from recv time.
Handles UTC midnight rollover when the stream only provides HH:MM:SS.mmm.
"""
candidate = dt.datetime(
recv_dt_utc.year, recv_dt_utc.month, recv_dt_utc.day,
t.hour, t.minute, t.second, t.microsecond,
tzinfo=dt.timezone.utc
)
delta = (candidate - recv_dt_utc).total_seconds()
if delta > 12 * 3600:
candidate -= dt.timedelta(days=1)
elif delta < -12 * 3600:
candidate += dt.timedelta(days=1)
return candidate
def build_market_ts_from_fields(fields: dict, recv_iso_utc: str) -> str | None:
    """
    Derive a MarketTS (ISO-8601 UTC string) for a streaming row.

    Time-of-day fields are assumed to already be UTC; the calendar date is
    anchored to RecvUTC with midnight-rollover correction.  Returns None
    when no usable time field is present or the receive timestamp cannot
    be parsed.
    """
    if not fields or not recv_iso_utc:
        return None
    try:
        anchor = dt.datetime.fromisoformat(recv_iso_utc)
        if anchor.tzinfo is None:
            anchor = anchor.replace(tzinfo=dt.timezone.utc)
        anchor = anchor.astimezone(dt.timezone.utc)
    except Exception:
        return None
    # Candidate time sources, most precise first: quote ms, trade ms,
    # trade seconds, quote seconds.
    candidates = (
        fields.get("QUOTIM_MS"),
        fields.get("SALTIM_MS"),
        fields.get("TRDTIM_1") or fields.get("SALTIM"),
        fields.get("EXCHTIM") or fields.get("TIMACT"),
    )
    for raw in candidates:
        tod = _parse_time_to_timeobj(raw)
        if tod:  # NOTE: midnight (falsy time) is skipped, as in the original
            return _combine_with_recv_date(anchor, tod).isoformat()
    return None
# ======================
# CSV SCHEMA
# ======================
def ensure_csv_header():
    """
    Create OUT_CSV with its header row if the file does not exist yet, and
    return the full column list.

    The first 9 columns are kept identical to the original Dash schema;
    the EXTRA_FIELDS columns are appended at the end.
    """
    columns = (
        ["RecvUTC", "WriteUTC", "MarketTS", "Instrument", "EventType"]
        + CORE_FIELDS
        + EXTRA_FIELDS
    )
    if not os.path.exists(OUT_CSV):
        with open(OUT_CSV, "w", newline="", encoding="utf-8") as fh:
            csv.DictWriter(fh, fieldnames=columns, delimiter=",").writeheader()
            fh.flush()
    return columns
def append_rows(rows, cols):
    """Append *rows* (dicts) to OUT_CSV, emitting exactly the columns in *cols*."""
    with open(OUT_CSV, "a", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=cols, delimiter=",")
        writer.writerows({col: row.get(col, None) for col in cols} for row in rows)
        fh.flush()
# ======================
# BACKFILL (quotes + trades)
# ======================
def backfill_history(start_utc: dt.datetime, end_utc: dt.datetime, cols):
"""
Backfill tick history using rd.get_history(interval='tick') requesting quote+trade fields.
History timestamp from get_history becomes MarketTS (UTC).
"""
hist = rd.get_history(
universe=INSTRUMENTS,
fields=FIELDS_ALL,
interval="tick",
start=start_utc.strftime("%Y-%m-%d %H:%M:%S"),
end=end_utc.strftime("%Y-%m-%d %H:%M:%S"),
)
if hist is None or len(hist) == 0:
return
# Normalize output into long_df with Instrument + TimestampUTC
if is
That's the full script — the most relevant parts are probably these:
# ---- Main wiring.  The callbacks (on_refresh / on_update / on_status /
# on_error) and writer_loop are defined elsewhere in the full script. ----
rd.open_session()
cols = ensure_csv_header()
# 1) Backfill from 08:30 Sydney (converted to UTC) -> now
start_utc = sydney_830_to_utc()
end_utc = now_utc()
backfill_history(start_utc, end_utc, cols)
# 2) Stream live with same fields
# NOTE(review): the most likely cause of the "missing trades" is that
# rd.content.pricing streams deliver *conflated* market-price updates —
# on a desktop/Workspace session the backend throttles updates, collapsing
# several ticks into one message, so trades occurring between two conflated
# updates never reach on_update even though tick *history* has them all.
# Confirm with your market-data team; a platform session against an
# unconflated (tick-by-tick) service is needed if every trade is required.
definition = rd.content.pricing.Definition(
    universe=INSTRUMENTS,
    # NOTE(review): passing fields= requests a server-side "view"; updates
    # that touch none of these fields are not delivered — verify TRDPRC_1 /
    # TRDVOL_1 are in the view (they are, via FIELDS_ALL).
    fields=FIELDS_ALL
)
stream = (definition.get_stream()
          .on_refresh(on_refresh)
          .on_update(on_update)
          .on_status(on_status)
          .on_error(on_error))
# Writer thread drains event_q into the CSV in batches.
t = threading.Thread(target=writer_loop, args=(cols,), daemon=True)
t.start()
# NOTE(review): stream.open() returns once the subscription is established;
# it does not block.  If nothing keeps the main thread alive after this
# point, the process exits and the daemon writer thread dies with it —
# verify the full script waits/sleeps here.
stream.open()